/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.transforms.outbox;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigDef;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.connector.ConnectRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.header.Headers;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.ExtractField;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.RegexRouter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.Transformation;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.util.Requirements;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.transforms.SmtManager;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import io.debezium.transforms.tracing.ActivateTracingSpan;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Single message transform that turns Debezium change events read from an outbox table
 * into routed event records.
 *
 * <p>For every create event the transform extracts the {@code after} struct, derives the
 * target topic from the configured route-by field, builds the record key, value and headers
 * from the configured outbox columns, and finally passes the new record through a
 * {@link RegexRouter} configured with the topic regex/replacement settings.
 *
 * <p>The instance keeps mutable state (configuration values and value-schema caches) in plain
 * fields and a {@link HashMap}; no synchronization is performed here.
 *
 * @param <R> the concrete {@link ConnectRecord} type handled by this transform
 */
@Incubating
public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventRouter.class);

    /** Name of the payload field inside the emitted envelope struct. */
    private static final String ENVELOPE_PAYLOAD = "payload";

    // Delegate transforms; configured in configure(Map).
    private final ExtractField<R> afterExtractor = new ExtractField.Value<>();
    private final RegexRouter<R> regexRouter = new RegexRouter<>();
    private final ActivateTracingSpan<R> tracingSmt = new ActivateTracingSpan<>();

    // Values read from the configuration in configure(Map).
    private EventRouterConfigDefinition.InvalidOperationBehavior invalidOperationBehavior;
    private String fieldEventId;
    private String fieldEventKey;
    private String fieldEventTimestamp;
    private String fieldPayload;
    private String fieldPayloadId;
    private String fieldSchemaVersion;
    private String routeByField;
    private boolean routeTombstoneOnEmptyPayload;
    private List<EventRouterConfigDefinition.AdditionalField> additionalFields;

    // Value-schema caches.
    // NOTE(review): both caches ignore the routed topic although the generated schema name embeds
    // it (see getSchemaName), so the first topic seen wins - confirm this is intended.
    private Schema defaultValueSchema;
    private final Map<Integer, Schema> versionedValueSchema = new HashMap<>();

    /** True when no additional field is placed in the envelope; the raw payload is then emitted as value. */
    private boolean onlyHeadersInOutputMessage = false;
    private SmtManager<R> smtManager;

    /**
     * Converts one Debezium outbox change event into a routed event record.
     *
     * @param r2 the incoming record
     * @return {@code null} for tombstones, delete events and (non-fatal) update events; the
     *         unchanged record when it is not a valid Debezium envelope; otherwise the new
     *         record produced by the regex router
     * @throws ConnectException if the payload, event-id, timestamp or a header additional field
     *         is missing from the outbox event schema
     * @throws IllegalStateException if an update event is received and the invalid-operation
     *         behavior is {@code FATAL}
     */
    @Override
    public R apply(R r2) {
        this.tracingSmt.apply(r2);
        if (r2.value() == null) {
            LOGGER.debug("Tombstone message ignored. Message key: \"{}\"", r2.key());
            return null;
        }
        if (!this.smtManager.isValidEnvelope(r2)) {
            return r2;
        }

        Struct debeziumEventValue = Requirements.requireStruct(r2.value(), "Detect Debezium Operation");
        String op = debeziumEventValue.getString("op");
        if (op.equals(Envelope.Operation.DELETE.code())) {
            LOGGER.info("Delete message {} ignored", r2.key());
            return null;
        }
        if (op.equals(Envelope.Operation.UPDATE.code())) {
            this.handleUnexpectedOperation(r2);
            return null;
        }

        R afterRecord = this.afterExtractor.apply(r2);
        Struct eventStruct = Requirements.requireStruct(afterRecord.value(), "Read Outbox Event");
        Schema eventValueSchema = afterRecord.valueSchema();

        Schema payloadSchema = requireEventField(eventValueSchema, this.fieldPayload, "payload").schema();
        Long timestamp = this.getEventTimestampMs(debeziumEventValue, eventStruct);

        // Validate the event-id field BEFORE reading its value: Struct.get(String) raises its own
        // exception for unknown fields, which would otherwise hide the descriptive message below.
        com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field eventIdField =
                requireEventField(eventValueSchema, this.fieldEventId, "event-id");
        Object eventId = eventStruct.get(this.fieldEventId);
        Object payload = eventStruct.get(this.fieldPayload);

        // The payload-id column is optional; it only serves as fallback record key.
        com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field fallbackPayloadIdField =
                eventValueSchema.field(this.fieldPayloadId);
        Object payloadId = fallbackPayloadIdField != null ? eventStruct.get(this.fieldPayloadId) : null;

        Headers headers = r2.headers();
        headers.add("id", eventId, eventIdField.schema());

        // The envelope struct is only needed when at least one additional field lives in the envelope.
        Schema structValueSchema = null;
        Struct structValue = null;
        if (!this.onlyHeadersInOutputMessage) {
            if (this.fieldSchemaVersion == null) {
                structValueSchema = this.getValueSchema(eventValueSchema, eventStruct.getString(this.routeByField));
            } else {
                Integer schemaVersion = eventStruct.getInt32(this.fieldSchemaVersion);
                structValueSchema = this.getValueSchema(eventValueSchema, schemaVersion, eventStruct.getString(this.routeByField));
            }
            structValue = new Struct(structValueSchema).put(ENVELOPE_PAYLOAD, payload);
        }

        for (EventRouterConfigDefinition.AdditionalField additionalField : this.additionalFields) {
            switch (additionalField.getPlacement()) {
                case ENVELOPE: {
                    // structValue is non-null here: onlyHeadersInOutputMessage is false whenever
                    // an ENVELOPE additional field is configured (see configure).
                    structValue.put(additionalField.getAlias(), eventStruct.get(additionalField.getField()));
                    break;
                }
                case HEADER: {
                    Schema headerSchema = requireEventField(eventValueSchema, additionalField.getField(), "additional").schema();
                    headers.add(additionalField.getAlias(), eventStruct.get(additionalField.getField()), headerSchema);
                    break;
                }
                default: {
                    break;
                }
            }
        }

        // A null or blank payload is treated as a delete of the aggregate.
        boolean isDeleteEvent = payload == null || payload.toString().trim().isEmpty();
        Object updatedValue;
        Schema updatedSchema;
        if (isDeleteEvent && this.routeTombstoneOnEmptyPayload) {
            updatedValue = null;
            updatedSchema = null;
        } else if (this.onlyHeadersInOutputMessage) {
            updatedValue = payload;
            updatedSchema = payloadSchema;
        } else {
            updatedValue = structValue;
            updatedSchema = structValueSchema;
        }

        R newRecord = r2.newRecord(
                eventStruct.getString(this.routeByField),
                null,
                this.defineRecordKeySchema(eventValueSchema, fallbackPayloadIdField),
                this.defineRecordKey(eventStruct, payloadId),
                updatedSchema,
                updatedValue,
                timestamp,
                headers);
        return this.regexRouter.apply(newRecord);
    }

    /**
     * Looks up a field of the outbox event schema and fails with a descriptive message if it is absent.
     *
     * @param eventSchema schema of the outbox event struct
     * @param fieldName name of the field to look up
     * @param role short description of the field's role, used in the error message
     * @return the schema field, never {@code null}
     * @throws ConnectException if the schema has no field with the given name
     */
    private static com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field requireEventField(Schema eventSchema, String fieldName, String role) {
        com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field schemaField = eventSchema.field(fieldName);
        if (schemaField == null) {
            throw new ConnectException(String.format("Unable to find %s field %s in event", role, fieldName));
        }
        return schemaField;
    }

    /**
     * Determines the timestamp of the emitted record.
     *
     * <p>Without a configured timestamp field, or when that field's value is {@code null}, the
     * {@code ts_ms} value of the Debezium envelope is used. Otherwise the field value is returned
     * as-is for {@code io.debezium.time.Timestamp}, divided by 1000 for
     * {@code io.debezium.time.MicroTimestamp} and divided by 1000000 for
     * {@code io.debezium.time.NanoTimestamp}.
     *
     * @param debeziumEventValue the Debezium envelope struct
     * @param eventStruct the outbox event ({@code after}) struct
     * @return the event timestamp, possibly {@code null} if {@code ts_ms} is {@code null}
     * @throws ConnectException if the configured field is missing or has an unsupported schema name
     */
    private Long getEventTimestampMs(Struct debeziumEventValue, Struct eventStruct) {
        if (this.fieldEventTimestamp == null) {
            return debeziumEventValue.getInt64("ts_ms");
        }
        com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field timestampField =
                requireEventField(eventStruct.schema(), this.fieldEventTimestamp, "timestamp");
        Long timestamp = eventStruct.getInt64(this.fieldEventTimestamp);
        if (timestamp == null) {
            return debeziumEventValue.getInt64("ts_ms");
        }
        String schemaName = timestampField.schema().name();
        if (schemaName == null) {
            throw new ConnectException(String.format("Unsupported field type %s (without logical schema name) for event timestamp", timestampField.schema().type()));
        }
        switch (schemaName) {
            case "io.debezium.time.Timestamp": {
                return timestamp;
            }
            case "io.debezium.time.MicroTimestamp": {
                return timestamp / 1000L;
            }
            case "io.debezium.time.NanoTimestamp": {
                return timestamp / 1000000L;
            }
            default: {
                throw new ConnectException(String.format("Unsupported field type %s for event timestamp", schemaName));
            }
        }
    }

    /**
     * Chooses the key schema of the emitted record: the schema of the configured event-key field
     * if present in the event, else the schema of the fallback field, else a string schema.
     *
     * @param eventStruct schema of the outbox event struct
     * @param fallbackKeyField payload-id field of the event, or {@code null} if the event has none
     * @return the key schema to use
     */
    private Schema defineRecordKeySchema(Schema eventStruct, com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field fallbackKeyField) {
        if (this.fieldEventKey != null) {
            com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field configuredKeyField = eventStruct.field(this.fieldEventKey);
            if (configuredKeyField != null) {
                return configuredKeyField.schema();
            }
        }
        if (fallbackKeyField != null) {
            return fallbackKeyField.schema();
        }
        return Schema.STRING_SCHEMA;
    }

    /**
     * Chooses the key of the emitted record: the value of the configured event-key field when it
     * is non-null, otherwise the supplied fallback key.
     *
     * @param eventStruct the outbox event struct
     * @param fallbackKey key to use when no event-key field is configured or its value is {@code null}
     * @return the record key, possibly {@code null}
     */
    private Object defineRecordKey(Struct eventStruct, Object fallbackKey) {
        if (this.fieldEventKey == null) {
            return fallbackKey;
        }
        Object eventKey = eventStruct.get(this.fieldEventKey);
        return eventKey != null ? eventKey : fallbackKey;
    }

    /**
     * Reacts to an update event according to the configured invalid-operation behavior:
     * logs a warning, logs an error, or throws.
     *
     * @param r2 the offending record
     * @throws IllegalStateException if the configured behavior is {@code FATAL}
     */
    private void handleUnexpectedOperation(R r2) {
        switch (this.invalidOperationBehavior) {
            case SKIP_AND_WARN: {
                LOGGER.warn("Unexpected update message received {} and ignored", r2.key());
                break;
            }
            case SKIP_AND_ERROR: {
                LOGGER.error("Unexpected update message received {} and ignored", r2.key());
                break;
            }
            case FATAL: {
                throw new IllegalStateException(String.format("Unexpected update message received %s, fail.", r2.key()));
            }
            default: {
                break;
            }
        }
    }

    /**
     * @return the configuration definition supplied by {@link EventRouterConfigDefinition}
     */
    @Override
    public ConfigDef config() {
        return EventRouterConfigDefinition.configDef();
    }

    /** Closes the tracing delegate. */
    @Override
    public void close() {
        this.tracingSmt.close();
    }

    /**
     * Validates the configuration and initializes this transform and its delegates.
     *
     * @param configMap the transform configuration
     * @throws ConnectException if the configuration does not validate
     */
    @Override
    public void configure(Map<String, ?> configMap) {
        this.tracingSmt.configure(configMap);
        // When the option is not set explicitly, the tracing delegate is told to require the context field.
        if (!configMap.containsKey(ActivateTracingSpan.TRACING_CONTEXT_FIELD_REQUIRED.name())) {
            this.tracingSmt.setRequireContextField(true);
        }

        Configuration config = Configuration.from(configMap);
        this.smtManager = new SmtManager<>(config);

        Field.Set allFields = Field.setOf(EventRouterConfigDefinition.CONFIG_FIELDS);
        if (!config.validateAndRecord(allFields, problem -> LOGGER.error(problem))) {
            throw new ConnectException("Unable to validate config.");
        }

        this.invalidOperationBehavior = EventRouterConfigDefinition.InvalidOperationBehavior.parse(config.getString(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR));
        this.fieldEventId = config.getString(EventRouterConfigDefinition.FIELD_EVENT_ID);
        this.fieldEventKey = config.getString(EventRouterConfigDefinition.FIELD_EVENT_KEY);
        this.fieldEventTimestamp = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP);
        this.fieldPayload = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD);
        this.fieldPayloadId = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_ID);
        this.fieldSchemaVersion = config.getString(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION);
        this.routeByField = config.getString(EventRouterConfigDefinition.ROUTE_BY_FIELD);
        this.routeTombstoneOnEmptyPayload = config.getBoolean(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD);

        Map<String, String> regexRouterConfig = new HashMap<>();
        regexRouterConfig.put("regex", config.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX));
        regexRouterConfig.put("replacement", config.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REPLACEMENT));
        this.regexRouter.configure(regexRouterConfig);

        Map<String, String> afterExtractorConfig = new HashMap<>();
        afterExtractorConfig.put("field", "after");
        this.afterExtractor.configure(afterExtractorConfig);

        this.additionalFields = EventRouterConfigDefinition.parseAdditionalFieldsConfig(config);
        this.onlyHeadersInOutputMessage = this.additionalFields.stream()
                .noneMatch(field -> field.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE);
    }

    /**
     * Returns the unversioned envelope schema, building and caching it on first use.
     *
     * @param debeziumEventSchema schema of the outbox event struct
     * @param routedTopic route value of the current event; only used when the schema is first built
     * @return the cached envelope schema
     */
    private Schema getValueSchema(Schema debeziumEventSchema, String routedTopic) {
        if (this.defaultValueSchema == null) {
            this.defaultValueSchema = this.getSchemaBuilder(debeziumEventSchema, routedTopic).build();
        }
        return this.defaultValueSchema;
    }

    /**
     * Returns the envelope schema for the given schema version, building and caching it on first use.
     *
     * @param debeziumEventSchema schema of the outbox event struct
     * @param version schema version read from the event; used as cache key
     * @param routedTopic route value of the current event; only used when the schema is first built
     * @return the cached envelope schema for {@code version}
     */
    private Schema getValueSchema(Schema debeziumEventSchema, Integer version, String routedTopic) {
        return this.versionedValueSchema.computeIfAbsent(
                version,
                key -> this.getSchemaBuilder(debeziumEventSchema, routedTopic).version(key).build());
    }

    /**
     * Creates a struct schema builder holding the payload field plus every additional field
     * configured for envelope placement.
     *
     * @param debeziumEventSchema schema of the outbox event struct
     * @param routedTopic route value used to derive the schema name
     * @return the populated, not yet built, schema builder
     * @throws ConnectException if an envelope additional field is missing from the event schema
     */
    private SchemaBuilder getSchemaBuilder(Schema debeziumEventSchema, String routedTopic) {
        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(this.getSchemaName(debeziumEventSchema, routedTopic));
        schemaBuilder.field(ENVELOPE_PAYLOAD, debeziumEventSchema.field(this.fieldPayload).schema());
        for (EventRouterConfigDefinition.AdditionalField additionalField : this.additionalFields) {
            if (additionalField.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE) {
                // Fail with a clear message instead of a NullPointerException on a misconfigured field name.
                Schema additionalSchema = requireEventField(debeziumEventSchema, additionalField.getField(), "additional").schema();
                schemaBuilder.field(additionalField.getAlias(), additionalSchema);
            }
        }
        return schemaBuilder;
    }

    /**
     * Derives the envelope schema name by inserting the routed topic before the last segment of
     * the original schema name.
     *
     * <p>{@code a.b.Value} becomes {@code a.b.<topic>.Value}; a name without dots becomes
     * {@code <topic>.<name>}; a {@code null} name yields just the topic.
     *
     * @param debeziumEventSchema schema of the outbox event struct
     * @param routedTopic route value of the current event
     * @return the derived schema name
     */
    private String getSchemaName(Schema debeziumEventSchema, String routedTopic) {
        String originalSchemaName = debeziumEventSchema.name();
        if (originalSchemaName == null) {
            return routedTopic;
        }
        int lastDot = originalSchemaName.lastIndexOf('.');
        if (lastDot == -1) {
            return routedTopic + "." + originalSchemaName;
        }
        return originalSchemaName.substring(0, lastDot + 1) + routedTopic + "." + originalSchemaName.substring(lastDot + 1);
    }
}

