/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source;

import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.ChangeTable;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.enumerator.SqlServerChunkSplitter;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.scan.SqlServerSnapshotFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.transactionlog.SqlServerTransactionLogFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.TableDiscoveryUtils;

public class SqlServerDialect
implements JdbcDataSourceDialect {
    private static final long serialVersionUID = 1L;
    private final SqlServerSourceConfig sourceConfig;
    private transient SqlServerSchema sqlServerSchema;
    private final Map<TableId, CatalogTable> tableMap;

    public SqlServerDialect(SqlServerSourceConfigFactory configFactory, List<CatalogTable> catalogTables) {
        this.sourceConfig = configFactory.create(0);
        this.tableMap = CatalogTableUtils.convertTables(catalogTables);
    }

    public String getName() {
        return "SqlServer";
    }

    public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
        return true;
    }

    public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
        return SqlServerConnectionUtils.createSqlServerConnection(sourceConfig.getDbzConfiguration());
    }

    public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
        return new SqlServerChunkSplitter(sourceConfig, this);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
        SqlServerSourceConfig sqlServerSourceConfig = (SqlServerSourceConfig)sourceConfig;
        try (JdbcConnection jdbcConnection = this.openJdbcConnection(sourceConfig);){
            List<TableId> tables = TableDiscoveryUtils.listTables(jdbcConnection, sqlServerSourceConfig.getTableFilters());
            this.checkAllTablesEnabledCapture(jdbcConnection, tables);
            List<TableId> list = tables;
            return list;
        }
        catch (SQLException e) {
            throw new SeaTunnelException("Error to discover tables: " + e.getMessage(), (Throwable)e);
        }
    }

    public void checkAllTablesEnabledCapture(JdbcConnection jdbcConnection, List<TableId> tableIds) throws SQLException {
        Map databases = tableIds.stream().collect(Collectors.groupingBy(TableId::catalog, Collectors.toList()));
        for (String database : databases.keySet()) {
            Set tables = ((SqlServerConnection)jdbcConnection).getChangeTables(database).stream().map(ChangeTable::getSourceTableId).collect(Collectors.toSet());
            for (TableId tableId : databases.get(database)) {
                if (tables.contains(tableId)) continue;
                throw new SeaTunnelException("Table " + tableId + " is not enabled for capture");
            }
        }
    }

    public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
        if (this.sqlServerSchema == null) {
            this.sqlServerSchema = new SqlServerSchema(this.sourceConfig.getDbzConnectorConfig(), this.tableMap);
        }
        return this.sqlServerSchema.getTableSchema(jdbc, tableId);
    }

    public SqlServerSourceFetchTaskContext createFetchTaskContext(SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
        return new SqlServerSourceFetchTaskContext((SqlServerSourceConfig)taskSourceConfig, this);
    }

    public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
        if (sourceSplitBase.isSnapshotSplit()) {
            return new SqlServerSnapshotFetchTask(sourceSplitBase.asSnapshotSplit());
        }
        try (JdbcConnection jdbcConnection = this.openJdbcConnection(this.sourceConfig);){
            List tables = sourceSplitBase.asIncrementalSplit().getTableIds();
            this.checkAllTablesEnabledCapture(jdbcConnection, tables);
        }
        catch (SQLException e) {
            throw new SeaTunnelException("Error to check tables: " + e.getMessage(), (Throwable)e);
        }
        return new SqlServerTransactionLogFetchTask(sourceSplitBase.asIncrementalSplit());
    }

    public Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId) {
        return Optional.ofNullable(this.tableMap.get(tableId).getTableSchema().getPrimaryKey());
    }

    public List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) {
        return this.tableMap.get(tableId).getTableSchema().getConstraintKeys();
    }
}

