/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.collections4.MapUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.format.avro.AvroSerializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.json.maxwell.MaxWellJsonSerializationSchema;
import org.apache.seatunnel.format.json.ogg.OggJsonSerializationSchema;
import org.apache.seatunnel.format.protobuf.ProtobufSerializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;

/**
 * {@link SeaTunnelRowSerializer} that turns a {@link SeaTunnelRow} into a Kafka
 * {@link ProducerRecord} by applying six independent per-row extractor functions
 * (topic, partition, timestamp, key, value and headers).
 *
 * <p>Instances are normally obtained through one of the static {@code create*} factories,
 * which pick the extractor implementations that match the requested {@link MessageFormat}.
 */
public class DefaultSeaTunnelRowSerializer
implements SeaTunnelRowSerializer {
    /** Matches the first {@code ${field}} placeholder of a topic template; group 1 is the field name. */
    private static final Pattern TOPIC_FIELD_PATTERN = Pattern.compile("\\$\\{(.*?)\\}", Pattern.DOTALL);

    private final Function<SeaTunnelRow, String> topicExtractor;
    private final Function<SeaTunnelRow, Integer> partitionExtractor;
    private final Function<SeaTunnelRow, Long> timestampExtractor;
    private final Function<SeaTunnelRow, byte[]> keyExtractor;
    private final Function<SeaTunnelRow, byte[]> valueExtractor;
    private final Function<SeaTunnelRow, Iterable<Header>> headersExtractor;

    /**
     * Creates a serializer from explicit extractor functions.
     *
     * @param topicExtractor     computes the record topic for a row
     * @param partitionExtractor computes the record partition for a row (may yield {@code null})
     * @param timestampExtractor computes the record timestamp for a row (may yield {@code null})
     * @param keyExtractor       computes the serialized record key for a row (may yield {@code null})
     * @param valueExtractor     computes the serialized record value for a row
     * @param headersExtractor   computes the record headers for a row (may yield {@code null})
     */
    public DefaultSeaTunnelRowSerializer(Function<SeaTunnelRow, String> topicExtractor, Function<SeaTunnelRow, Integer> partitionExtractor, Function<SeaTunnelRow, Long> timestampExtractor, Function<SeaTunnelRow, byte[]> keyExtractor, Function<SeaTunnelRow, byte[]> valueExtractor, Function<SeaTunnelRow, Iterable<Header>> headersExtractor) {
        this.topicExtractor = topicExtractor;
        this.partitionExtractor = partitionExtractor;
        this.timestampExtractor = timestampExtractor;
        this.keyExtractor = keyExtractor;
        this.valueExtractor = valueExtractor;
        this.headersExtractor = headersExtractor;
    }

    /**
     * Builds the Kafka record for {@code row} by applying every configured extractor to it.
     *
     * @param row the row to serialize
     * @return a record whose key and value are {@code byte[]}
     */
    public ProducerRecord serializeRow(SeaTunnelRow row) {
        String topic = this.topicExtractor.apply(row);
        Integer partition = this.partitionExtractor.apply(row);
        Long timestamp = this.timestampExtractor.apply(row);
        byte[] key = this.keyExtractor.apply(row);
        byte[] value = this.valueExtractor.apply(row);
        Iterable<Header> headers = this.headersExtractor.apply(row);
        return new ProducerRecord<byte[], byte[]>(topic, partition, timestamp, key, value, headers);
    }

    /**
     * Creates a serializer that copies partition, timestamp, key, value and headers verbatim
     * from the row fields named {@code "partition"}, {@code "timestamp"}, {@code "key"},
     * {@code "value"} and {@code "headers"}.
     *
     * @param topic   fixed topic name or {@code ${field}} template; see
     *                {@link #topicExtractor(String, SeaTunnelRowType, MessageFormat)}
     * @param format  message format, only used to decide how the topic is resolved
     * @param rowType type of the rows that will be serialized
     * @return the configured serializer
     */
    public static DefaultSeaTunnelRowSerializer create(String topic, MessageFormat format, SeaTunnelRowType rowType) {
        return new DefaultSeaTunnelRowSerializer(
                topicExtractor(topic, rowType, format),
                partitionNativeExtractor(rowType),
                timestampExtractor(rowType),
                keyExtractor(rowType),
                valueExtractor(rowType),
                headersExtractor(rowType));
    }

    /**
     * Creates a serializer that reads partition and timestamp from the row fields named
     * {@code "partition"} and {@code "timestamp"}, serializes the whole row as the value,
     * and emits no headers. No key fields are configured, so the key is {@code null} unless
     * the format is {@code COMPATIBLE_DEBEZIUM_JSON}.
     *
     * @param topic        fixed topic name or {@code ${field}} template
     * @param format       message format used for the value (and the Debezium-compatible key)
     * @param rowType      type of the rows that will be serialized
     * @param delimiter    field delimiter handed to the value serialization schema
     * @param pluginConfig connector configuration handed to the value serialization schema
     * @return the configured serializer
     */
    public static DefaultSeaTunnelRowSerializer createWithPartitionAndTimestampFields(String topic, MessageFormat format, SeaTunnelRowType rowType, String delimiter, ReadonlyConfig pluginConfig) {
        return new DefaultSeaTunnelRowSerializer(
                topicExtractor(topic, rowType, format),
                partitionNativeExtractor(rowType),
                timestampExtractor(rowType),
                keyExtractor(null, rowType, format, null, null),
                valueExtractor(rowType, format, delimiter, pluginConfig),
                headersExtractor());
    }

    /**
     * Creates a serializer with no explicit partition, timestamp, key fields or headers.
     *
     * @param topic        fixed topic name or {@code ${field}} template
     * @param rowType      type of the rows that will be serialized
     * @param format       message format used for key and value
     * @param delimiter    field delimiter handed to the serialization schema
     * @param pluginConfig connector configuration handed to the serialization schema
     * @return the configured serializer
     */
    public static DefaultSeaTunnelRowSerializer create(String topic, SeaTunnelRowType rowType, MessageFormat format, String delimiter, ReadonlyConfig pluginConfig) {
        return new DefaultSeaTunnelRowSerializer(
                topicExtractor(topic, rowType, format),
                partitionExtractor(null),
                timestampExtractor(),
                keyExtractor(null, rowType, format, delimiter, pluginConfig),
                valueExtractor(rowType, format, delimiter, pluginConfig),
                headersExtractor());
    }

    /**
     * Creates a serializer that sends every record to the given fixed partition.
     *
     * @param topic        fixed topic name or {@code ${field}} template
     * @param partition    partition assigned to every record (may be {@code null})
     * @param rowType      type of the rows that will be serialized
     * @param format       message format used for key and value
     * @param delimiter    field delimiter handed to the serialization schema
     * @param pluginConfig connector configuration handed to the serialization schema
     * @return the configured serializer
     */
    public static DefaultSeaTunnelRowSerializer create(String topic, Integer partition, SeaTunnelRowType rowType, MessageFormat format, String delimiter, ReadonlyConfig pluginConfig) {
        return new DefaultSeaTunnelRowSerializer(
                topicExtractor(topic, rowType, format),
                partitionExtractor(partition),
                timestampExtractor(),
                keyExtractor(null, rowType, format, delimiter, pluginConfig),
                valueExtractor(rowType, format, delimiter, pluginConfig),
                headersExtractor());
    }

    /**
     * Creates a serializer whose record key is built from the given row fields.
     *
     * @param topic        fixed topic name or {@code ${field}} template
     * @param keyFields    names of the row fields that form the key; {@code null} or empty
     *                     means no key (except for {@code COMPATIBLE_DEBEZIUM_JSON})
     * @param rowType      type of the rows that will be serialized
     * @param format       message format used for key and value
     * @param delimiter    field delimiter handed to the serialization schema
     * @param pluginConfig connector configuration handed to the serialization schema
     * @return the configured serializer
     */
    public static DefaultSeaTunnelRowSerializer create(String topic, List<String> keyFields, SeaTunnelRowType rowType, MessageFormat format, String delimiter, ReadonlyConfig pluginConfig) {
        return new DefaultSeaTunnelRowSerializer(
                topicExtractor(topic, rowType, format),
                partitionExtractor(null),
                timestampExtractor(),
                keyExtractor(keyFields, rowType, format, delimiter, pluginConfig),
                valueExtractor(rowType, format, delimiter, pluginConfig),
                headersExtractor());
    }

    /** Reads the partition from the row field named {@code "partition"}; the index is looked up per row. */
    private static Function<SeaTunnelRow, Integer> partitionNativeExtractor(SeaTunnelRowType rowType) {
        return row -> (Integer) row.getField(rowType.indexOf("partition"));
    }

    /** Always yields the given partition, which may be {@code null}. */
    private static Function<SeaTunnelRow, Integer> partitionExtractor(Integer partition) {
        return row -> partition;
    }

    /** Always yields a {@code null} timestamp. */
    private static Function<SeaTunnelRow, Long> timestampExtractor() {
        return row -> null;
    }

    /** Reads the timestamp from the row field named {@code "timestamp"}; the index is looked up per row. */
    private static Function<SeaTunnelRow, Long> timestampExtractor(SeaTunnelRowType rowType) {
        return row -> (Long) row.getField(rowType.indexOf("timestamp"));
    }

    /** Always yields {@code null} headers. */
    private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor() {
        return row -> null;
    }

    /** Converts the map stored in the row field named {@code "headers"} into Kafka headers. */
    private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor(SeaTunnelRowType rowType) {
        return row -> {
            @SuppressWarnings("unchecked")
            Map<String, String> headers = (Map<String, String>) row.getField(rowType.indexOf("headers"));
            return convertToKafkaHeaders(headers);
        };
    }

    /**
     * Chooses how the topic of each record is determined.
     *
     * <ul>
     *   <li>{@code topic == null} with format {@code COMPATIBLE_DEBEZIUM_JSON} or {@code NATIVE}:
     *       the topic is read from the row field named {@code "topic"}.</li>
     *   <li>{@code topic} contains a {@code ${field}} placeholder: the topic is read from that
     *       row field (only the first placeholder is considered).</li>
     *   <li>otherwise: {@code topic} is used as-is for every row.</li>
     * </ul>
     *
     * @throws KafkaConnectorException if {@code topic} is {@code null} for a format that does not
     *                                 carry its own topic column, or if the placeholder names a
     *                                 field that is not part of {@code rowType}
     */
    private static Function<SeaTunnelRow, String> topicExtractor(String topic, SeaTunnelRowType rowType, MessageFormat format) {
        boolean formatCarriesTopic = format == MessageFormat.COMPATIBLE_DEBEZIUM_JSON || format == MessageFormat.NATIVE;
        if (formatCarriesTopic && topic == null) {
            int topicFieldIndex = rowType.indexOf("topic");
            return row -> topicFromField(row, topicFieldIndex);
        }
        if (topic == null) {
            // Fail with a descriptive error instead of a NullPointerException from the matcher.
            throw new KafkaConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Topic must not be null for format: " + format);
        }
        Matcher matcher = TOPIC_FIELD_PATTERN.matcher(topic);
        if (!matcher.find()) {
            return row -> topic;
        }
        String topicField = matcher.group(1);
        List<String> fieldNames = Arrays.asList(rowType.getFieldNames());
        if (!fieldNames.contains(topicField)) {
            // Report the field that is actually missing, not the whole topic template.
            throw new KafkaConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, String.format("Field name { %s } is not found!", topicField));
        }
        int topicFieldIndex = rowType.indexOf(topicField);
        return row -> topicFromField(row, topicFieldIndex);
    }

    /**
     * Returns the string form of the row field at {@code fieldIndex} for use as a topic name.
     *
     * @throws KafkaConnectorException if the field value is {@code null}
     */
    private static String topicFromField(SeaTunnelRow row, int fieldIndex) {
        Object topicFieldValue = row.getField(fieldIndex);
        if (topicFieldValue == null) {
            throw new KafkaConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "The column value is empty!");
        }
        return topicFieldValue.toString();
    }

    /**
     * Chooses how the record key is produced.
     *
     * <p>For {@code COMPATIBLE_DEBEZIUM_JSON} the whole row is passed to a
     * {@link CompatibleDebeziumJsonSerializationSchema} created in key mode, regardless of
     * {@code keyFields}. Otherwise a {@code null} or empty {@code keyFields} yields a
     * {@code null} key, and a non-empty list projects those fields out of the row and
     * serializes the projection with the schema for {@code format}.
     */
    private static Function<SeaTunnelRow, byte[]> keyExtractor(List<String> keyFields, SeaTunnelRowType rowType, MessageFormat format, String delimiter, ReadonlyConfig pluginConfig) {
        if (format == MessageFormat.COMPATIBLE_DEBEZIUM_JSON) {
            CompatibleDebeziumJsonSerializationSchema debeziumKeySchema = new CompatibleDebeziumJsonSerializationSchema(rowType, true);
            return row -> debeziumKeySchema.serialize(row);
        }
        if (keyFields == null || keyFields.isEmpty()) {
            return row -> null;
        }
        SeaTunnelRowType keyType = createKeyType(keyFields, rowType);
        Function<SeaTunnelRow, SeaTunnelRow> keyRowExtractor = createKeyRowExtractor(keyType, rowType);
        SerializationSchema keySchema = createSerializationSchema(keyType, format, delimiter, true, pluginConfig);
        return row -> keySchema.serialize(keyRowExtractor.apply(row));
    }

    /** Reads the already-serialized key from the row field named {@code "key"}; the index is looked up per row. */
    private static Function<SeaTunnelRow, byte[]> keyExtractor(SeaTunnelRowType rowType) {
        return row -> (byte[]) row.getField(rowType.indexOf("key"));
    }

    /** Serializes the whole row with the schema selected for {@code format}. */
    private static Function<SeaTunnelRow, byte[]> valueExtractor(SeaTunnelRowType rowType, MessageFormat format, String delimiter, ReadonlyConfig pluginConfig) {
        SerializationSchema valueSchema = createSerializationSchema(rowType, format, delimiter, false, pluginConfig);
        return row -> valueSchema.serialize(row);
    }

    /** Reads the already-serialized value from the row field named {@code "value"}; the index is looked up per row. */
    private static Function<SeaTunnelRow, byte[]> valueExtractor(SeaTunnelRowType rowType) {
        return row -> (byte[]) row.getField(rowType.indexOf("value"));
    }

    /**
     * Builds the row type of the key: the given field names, in the given order, each paired
     * with the data type that field has in {@code rowType}.
     */
    private static SeaTunnelRowType createKeyType(List<String> keyFieldNames, SeaTunnelRowType rowType) {
        SeaTunnelDataType<?>[] keyFieldTypes = new SeaTunnelDataType<?>[keyFieldNames.size()];
        for (int i = 0; i < keyFieldTypes.length; ++i) {
            keyFieldTypes[i] = rowType.getFieldType(rowType.indexOf(keyFieldNames.get(i)));
        }
        return new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldTypes);
    }

    /**
     * Returns a function that projects a full row onto the key fields. The positions of the
     * key fields inside {@code rowType} are resolved once, when this method is called.
     */
    private static Function<SeaTunnelRow, SeaTunnelRow> createKeyRowExtractor(SeaTunnelRowType keyType, SeaTunnelRowType rowType) {
        int keyFieldCount = keyType.getTotalFields();
        int[] keyIndex = new int[keyFieldCount];
        for (int i = 0; i < keyFieldCount; ++i) {
            keyIndex[i] = rowType.indexOf(keyType.getFieldName(i));
        }
        return row -> {
            Object[] fields = new Object[keyFieldCount];
            for (int i = 0; i < keyIndex.length; ++i) {
                fields[i] = row.getField(keyIndex[i]);
            }
            return new SeaTunnelRow(fields);
        };
    }

    /**
     * Instantiates the serialization schema that corresponds to {@code format}.
     *
     * @param rowType      type of the rows the schema will serialize
     * @param format       requested message format
     * @param delimiter    field delimiter, only used for {@code TEXT}
     * @param isKey        whether the schema serializes keys, only used for
     *                     {@code COMPATIBLE_DEBEZIUM_JSON}
     * @param pluginConfig connector configuration, only read for {@code PROTOBUF}
     * @throws SeaTunnelJsonFormatException if {@code format} has no matching schema
     */
    private static SerializationSchema createSerializationSchema(SeaTunnelRowType rowType, MessageFormat format, String delimiter, boolean isKey, ReadonlyConfig pluginConfig) {
        switch (format) {
            case JSON:
            case NATIVE:
                return new JsonSerializationSchema(rowType);
            case TEXT:
                return TextSerializationSchema.builder().seaTunnelRowType(rowType).delimiter(delimiter).build();
            case CANAL_JSON:
                return new CanalJsonSerializationSchema(rowType);
            case OGG_JSON:
                return new OggJsonSerializationSchema(rowType);
            case DEBEZIUM_JSON:
                return new DebeziumJsonSerializationSchema(rowType);
            case MAXWELL_JSON:
                return new MaxWellJsonSerializationSchema(rowType);
            case COMPATIBLE_DEBEZIUM_JSON:
                return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
            case AVRO:
                return new AvroSerializationSchema(rowType);
            case PROTOBUF: {
                String protobufMessageName = (String)pluginConfig.get(KafkaBaseOptions.PROTOBUF_MESSAGE_NAME);
                String protobufSchema = (String)pluginConfig.get(KafkaBaseOptions.PROTOBUF_SCHEMA);
                return new ProtobufSerializationSchema(rowType, protobufMessageName, protobufSchema);
            }
            default:
                throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
        }
    }

    /**
     * Converts a string map into Kafka record headers, encoding values as UTF-8.
     *
     * @param headersMap header names mapped to header values; may be {@code null}
     * @return the headers, or {@code null} when the map is {@code null} or empty
     */
    private static Iterable<Header> convertToKafkaHeaders(Map<String, String> headersMap) {
        if (MapUtils.isEmpty(headersMap)) {
            return null;
        }
        RecordHeaders kafkaHeaders = new RecordHeaders();
        for (Map.Entry<String, String> entry : headersMap.entrySet()) {
            String headerValue = entry.getValue();
            // A null header value is kept as null rather than failing on getBytes().
            byte[] encodedValue = headerValue == null ? null : headerValue.getBytes(StandardCharsets.UTF_8);
            kafkaHeaders.add(new RecordHeader(entry.getKey(), encodedValue));
        }
        return kafkaHeaders;
    }
}

