/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
import io.debezium.connector.sqlserver.SqlServerErrorHandler;
import io.debezium.connector.sqlserver.SqlServerOffsetContext;
import io.debezium.connector.sqlserver.SqlServerPartition;
import io.debezium.connector.sqlserver.SqlServerTaskContext;
import io.debezium.connector.sqlserver.SqlServerTopicSelector;
import io.debezium.connector.sqlserver.SqlServerValueConverters;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetch-task context for the SQL Server CDC source.
 *
 * <p>The context owns a data connection that is created in the constructor and, when required, a
 * lazily created metadata connection. {@link #configure(SourceSplitBase)} builds the Debezium
 * collaborators (schema, offset context, partition, queue, dispatcher, metrics and error handler)
 * that the accessors of this class expose.
 */
public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
    private static final Logger log = LoggerFactory.getLogger(SqlServerSourceFetchTaskContext.class);

    /** Connection created eagerly in the constructor; closed by {@link #close()}. */
    private final SqlServerConnection dataConnection;

    /**
     * Lazily created connection. Declared {@code volatile} because
     * {@link #initMetadataConnection()} reads it outside of the lock before deciding whether to
     * synchronize.
     */
    private volatile SqlServerConnection metadataConnection;

    private final SqlServerEventMetadataProvider metadataProvider;
    private SqlServerDatabaseSchema databaseSchema;
    private SqlServerOffsetContext offsetContext;
    private SqlServerPartition partition;
    private TopicSelector<TableId> topicSelector;
    private JdbcSourceEventDispatcher<SqlServerPartition> dispatcher;
    private ChangeEventQueue<DataChangeEvent> queue;
    private SqlServerErrorHandler errorHandler;
    private SqlServerTaskContext taskContext;
    private SnapshotChangeEventSourceMetrics<SqlServerPartition> snapshotChangeEventSourceMetrics;

    /**
     * Creates the context and opens the data connection from the Debezium configuration of the
     * given source config.
     *
     * @param sourceConfig SQL Server source configuration
     * @param dataSourceDialect dialect handed to the parent context
     */
    public SqlServerSourceFetchTaskContext(SqlServerSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
        super(sourceConfig, dataSourceDialect);
        this.dataConnection = SqlServerConnectionUtils.createSqlServerConnection(sourceConfig.getDbzConfiguration());
        this.metadataProvider = new SqlServerEventMetadataProvider();
    }

    /**
     * Initializes all per-split state of this context.
     *
     * <p>The metadata connection is only opened for incremental splits or when exactly-once is
     * enabled.
     *
     * @param sourceSplitBase the split this context is about to process
     */
    public void configure(SourceSplitBase sourceSplitBase) {
        super.registerDatabaseHistory(sourceSplitBase, this.dataConnection);
        SqlServerConnectorConfig connectorConfig = this.getDbzConnectorConfig();

        this.topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
        this.databaseSchema = SqlServerUtils.createSqlServerDatabaseSchema(connectorConfig, this.dataConnection);

        String serverName = connectorConfig.getLogicalName();
        String dbName = connectorConfig.getJdbcConfig().getDatabase();
        this.partition = new SqlServerPartition(serverName, dbName, false);

        this.offsetContext = this.loadStartingOffsetState(new SqlServerOffsetContext.Loader(connectorConfig), sourceSplitBase);
        this.validateAndLoadDatabaseHistory(this.offsetContext, this.databaseSchema);
        this.taskContext = new SqlServerTaskContext(connectorConfig, this.databaseSchema);

        // Snapshot splits under exactly-once use an effectively unbounded queue; every other case
        // uses the configured maximum queue size.
        int queueSize = sourceSplitBase.isSnapshotSplit() && this.isExactlyOnce()
                ? Integer.MAX_VALUE
                : this.getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
        this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                .pollInterval(connectorConfig.getPollInterval())
                .maxBatchSize(connectorConfig.getMaxBatchSize())
                .maxQueueSize(queueSize)
                .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
                .loggingContextSupplier(() -> this.taskContext.configureLoggingContext("sqlServer-cdc-connector-task"))
                .build();

        this.dispatcher = new JdbcSourceEventDispatcher<>(
                connectorConfig,
                this.topicSelector,
                this.databaseSchema,
                this.queue,
                connectorConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                this.metadataProvider,
                this.schemaNameAdjuster);

        DefaultChangeEventSourceMetricsFactory<SqlServerPartition> metricsFactory =
                new DefaultChangeEventSourceMetricsFactory<>();
        this.snapshotChangeEventSourceMetrics =
                metricsFactory.getSnapshotMetrics(this.taskContext, this.queue, this.metadataProvider);
        this.errorHandler = new SqlServerErrorHandler(connectorConfig, this.queue);

        if (sourceSplitBase.isIncrementalSplit() || this.isExactlyOnce()) {
            this.initMetadataConnection();
        }
    }

    /**
     * Creates the metadata connection if it does not exist yet.
     *
     * <p>The field is checked once without the lock and once more while holding the monitor of
     * this instance, so at most one connection is created through this method.
     */
    private void initMetadataConnection() {
        if (this.metadataConnection == null) {
            synchronized (this) {
                if (this.metadataConnection == null) {
                    this.metadataConnection = SqlServerConnectionUtils.createSqlServerConnection(this.sourceConfig.getDbzConfiguration());
                }
            }
        }
    }

    /**
     * Closes the data connection and, if it was created, the metadata connection.
     *
     * <p>Each connection is closed independently so that an {@link SQLException} raised by one of
     * them does not prevent the other from being closed. Failures are logged, not rethrown.
     */
    public void close() {
        closeAndLog(this.dataConnection);
        closeAndLog(this.metadataConnection);
    }

    /** Closes {@code connection} when it is non-null, logging any {@link SQLException}. */
    private static void closeAndLog(SqlServerConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (SQLException e) {
            log.warn("Failed to close connection", e);
        }
    }

    /** Returns the source configuration narrowed to {@link SqlServerSourceConfig}. */
    public SqlServerSourceConfig getSourceConfig() {
        return (SqlServerSourceConfig) this.sourceConfig;
    }

    /** Returns the connection created in the constructor. */
    public SqlServerConnection getDataConnection() {
        return this.dataConnection;
    }

    /**
     * Returns the metadata connection, or {@code null} when it has not been created (see
     * {@link #configure(SourceSplitBase)}).
     */
    public SqlServerConnection getMetadataConnection() {
        return this.metadataConnection;
    }

    /** Returns the snapshot metrics created by {@link #configure(SourceSplitBase)}. */
    public SnapshotChangeEventSourceMetrics<SqlServerPartition> getSnapshotChangeEventSourceMetrics() {
        return this.snapshotChangeEventSourceMetrics;
    }

    /** Returns the parent's connector configuration narrowed to {@link SqlServerConnectorConfig}. */
    @Override
    public SqlServerConnectorConfig getDbzConnectorConfig() {
        return (SqlServerConnectorConfig) super.getDbzConnectorConfig();
    }

    /** Returns the offset context loaded by {@link #configure(SourceSplitBase)}. */
    public SqlServerOffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    /** Returns the partition created by {@link #configure(SourceSplitBase)}. */
    public SqlServerPartition getPartition() {
        return this.partition;
    }

    /** Returns the error handler created by {@link #configure(SourceSplitBase)}. */
    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    /** Returns the database schema created by {@link #configure(SourceSplitBase)}. */
    public SqlServerDatabaseSchema getDatabaseSchema() {
        return this.databaseSchema;
    }

    /** Delegates to {@link SqlServerUtils#getSplitType(Table)}. */
    public SeaTunnelRowType getSplitType(Table table) {
        return SqlServerUtils.getSplitType(table);
    }

    /** Returns the event dispatcher created by {@link #configure(SourceSplitBase)}. */
    public JdbcSourceEventDispatcher<SqlServerPartition> getDispatcher() {
        return this.dispatcher;
    }

    /** Returns the change event queue created by {@link #configure(SourceSplitBase)}. */
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    /** Returns the data collection filter of the connector configuration's table filters. */
    public Tables.TableFilter getTableFilter() {
        return this.getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
    }

    /** Extracts the stream offset of a record via {@code SqlServerUtils.getLsn}. */
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        return SqlServerUtils.getLsn(sourceRecord);
    }

    /** Initializes the schema storage and recovers the schema for the current partition and offset. */
    private void validateAndLoadDatabaseHistory(SqlServerOffsetContext offset, SqlServerDatabaseSchema schema) {
        schema.initializeStorage();
        schema.recover(this.partition, offset);
    }

    /**
     * Loads the offset context a split starts from: {@link LsnOffset#INITIAL_OFFSET} for snapshot
     * splits, otherwise the startup offset of the incremental split.
     */
    private SqlServerOffsetContext loadStartingOffsetState(SqlServerOffsetContext.Loader loader, SourceSplitBase split) {
        Offset offset = split.isSnapshotSplit()
                ? LsnOffset.INITIAL_OFFSET
                : split.asIncrementalSplit().getStartupOffset();
        return loader.load(offset.getOffset());
    }

    /**
     * Reads event metadata from the {@code "source"} struct of a change event value.
     *
     * <p>Every method returns {@code null} when the value, the data collection id or the
     * {@code "source"} struct is {@code null}.
     */
    public static class SqlServerEventMetadataProvider implements EventMetadataProvider {

        /** Returns the {@code ts_ms} field of the source struct as an {@link Instant}, or {@code null}. */
        @Override
        public Instant getEventTimestamp(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            Struct sourceInfo = sourceInfoOf(source, value);
            if (sourceInfo == null) {
                return null;
            }
            Long timestamp = sourceInfo.getInt64("ts_ms");
            return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
        }

        /** Returns a map holding the {@code commit_lsn} and {@code change_lsn} fields, or {@code null}. */
        @Override
        public Map<String, String> getEventSourcePosition(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            Struct sourceInfo = sourceInfoOf(source, value);
            if (sourceInfo == null) {
                return null;
            }
            return Collect.hashMapOf(
                    "commit_lsn", sourceInfo.getString("commit_lsn"),
                    "change_lsn", sourceInfo.getString("change_lsn"));
        }

        /** Returns the {@code commit_lsn} field of the source struct, or {@code null}. */
        @Override
        public String getTransactionId(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            Struct sourceInfo = sourceInfoOf(source, value);
            return sourceInfo == null ? null : sourceInfo.getString("commit_lsn");
        }

        /**
         * Returns the {@code "source"} struct of {@code value}, or {@code null} when {@code value},
         * {@code source} or the struct itself is {@code null}.
         *
         * <p>The struct is read before {@code source} is inspected, matching the original
         * evaluation order.
         */
        private static Struct sourceInfoOf(DataCollectionId source, Struct value) {
            if (value == null) {
                return null;
            }
            Struct sourceInfo = value.getStruct("source");
            if (source == null || sourceInfo == null) {
                return null;
            }
            return sourceInfo;
        }
    }
}

