/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;

import com.google.common.base.Strings;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import lombok.NonNull;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;

public class JdbcOutputFormatBuilder {
    @NonNull
    private final JdbcDialect dialect;
    @NonNull
    private final JdbcConnectionProvider connectionProvider;
    @NonNull
    private final JdbcSinkOptions jdbcSinkOptions;
    @NonNull
    private final SeaTunnelRowType seaTunnelRowType;

    public JdbcOutputFormat build() {
        String table = this.jdbcSinkOptions.getTable();
        List<String> primaryKeys = this.jdbcSinkOptions.getPrimaryKeys();
        JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory = Strings.isNullOrEmpty((String)table) ? () -> JdbcOutputFormatBuilder.createSimpleBufferedExecutor(this.jdbcSinkOptions.getSimpleSQL(), this.seaTunnelRowType, this.dialect.getRowConverter()) : (primaryKeys == null || primaryKeys.isEmpty() ? () -> JdbcOutputFormatBuilder.createSimpleBufferedExecutor(this.dialect, table, this.seaTunnelRowType) : () -> JdbcOutputFormatBuilder.createUpsertBufferedExecutor(this.dialect, table, this.seaTunnelRowType, primaryKeys.toArray(new String[0]), this.jdbcSinkOptions.isSupportUpsertByQueryPrimaryKeyExist()));
        return new JdbcOutputFormat(this.connectionProvider, this.jdbcSinkOptions.getJdbcConnectionOptions(), statementExecutorFactory);
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(JdbcDialect dialect, String table, SeaTunnelRowType rowType) {
        String insertSQL = dialect.getInsertIntoStatement(table, rowType.getFieldNames());
        return JdbcOutputFormatBuilder.createSimpleBufferedExecutor(insertSQL, rowType, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(String sql, SeaTunnelRowType rowType, JdbcRowConverter rowConverter) {
        JdbcBatchStatementExecutor<SeaTunnelRow> simpleRowExecutor = JdbcOutputFormatBuilder.createSimpleExecutor(sql, rowType, rowConverter);
        return new BufferedBatchStatementExecutor(simpleRowExecutor, Function.identity());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertBufferedExecutor(JdbcDialect dialect, String table, SeaTunnelRowType rowType, String[] pkNames, boolean supportUpsertByQueryPrimaryKeyExist) {
        int[] pkFields = Arrays.stream(pkNames).mapToInt(Arrays.asList(rowType.getFieldNames())::indexOf).toArray();
        SeaTunnelDataType[] pkTypes = (SeaTunnelDataType[])Arrays.stream(pkFields).mapToObj(index -> rowType.getFieldType(index)).toArray(SeaTunnelDataType[]::new);
        Function<SeaTunnelRow, SeaTunnelRow> keyExtractor = JdbcOutputFormatBuilder.createKeyExtractor(pkFields);
        JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor = JdbcOutputFormatBuilder.createDeleteExecutor(dialect, table, pkNames, pkTypes);
        JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor = JdbcOutputFormatBuilder.createUpsertExecutor(dialect, table, rowType, pkNames, pkTypes, keyExtractor, supportUpsertByQueryPrimaryKeyExist);
        return new BufferReducedBatchStatementExecutor(upsertExecutor, deleteExecutor, keyExtractor, Function.identity());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertExecutor(JdbcDialect dialect, String table, SeaTunnelRowType rowType, String[] pkNames, SeaTunnelDataType[] pkTypes, Function<SeaTunnelRow, SeaTunnelRow> keyExtractor, boolean supportUpsertByQueryPrimaryKeyExist) {
        return dialect.getUpsertStatement(table, rowType.getFieldNames(), pkNames).map(upsertSQL -> JdbcOutputFormatBuilder.createSimpleExecutor(upsertSQL, rowType, dialect.getRowConverter())).orElseGet(() -> {
            if (supportUpsertByQueryPrimaryKeyExist) {
                return JdbcOutputFormatBuilder.createInsertOrUpdateByQueryExecutor(dialect, table, rowType, pkNames, pkTypes, keyExtractor);
            }
            return JdbcOutputFormatBuilder.createInsertOrUpdateExecutor(dialect, table, rowType, pkNames);
        });
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOrUpdateExecutor(JdbcDialect dialect, String table, SeaTunnelRowType rowType, String[] pkNames) {
        return new InsertOrUpdateBatchStatementExecutor(connection -> connection.prepareStatement(dialect.getInsertIntoStatement(table, rowType.getFieldNames())), connection -> connection.prepareStatement(dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames)), rowType, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOrUpdateByQueryExecutor(JdbcDialect dialect, String table, SeaTunnelRowType rowType, String[] pkNames, SeaTunnelDataType[] pkTypes, Function<SeaTunnelRow, SeaTunnelRow> keyExtractor) {
        SeaTunnelRowType keyRowType = new SeaTunnelRowType(pkNames, pkTypes);
        return new InsertOrUpdateBatchStatementExecutor(connection -> connection.prepareStatement(dialect.getRowExistsStatement(table, pkNames)), connection -> connection.prepareStatement(dialect.getInsertIntoStatement(table, rowType.getFieldNames())), connection -> connection.prepareStatement(dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames)), keyRowType, keyExtractor, rowType, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createDeleteExecutor(JdbcDialect dialect, String table, String[] pkNames, SeaTunnelDataType[] pkTypes) {
        String deleteSQL = dialect.getDeleteStatement(table, pkNames);
        return JdbcOutputFormatBuilder.createSimpleExecutor(deleteSQL, pkNames, pkTypes, dialect.getRowConverter());
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(String sql, String[] fieldNames, SeaTunnelDataType[] fieldTypes, JdbcRowConverter rowConverter) {
        SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, fieldTypes);
        return JdbcOutputFormatBuilder.createSimpleExecutor(sql, rowType, rowConverter);
    }

    private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(String sql, SeaTunnelRowType rowType, JdbcRowConverter rowConverter) {
        return new SimpleBatchStatementExecutor(connection -> connection.prepareStatement(sql), rowType, rowConverter);
    }

    private static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
        return row -> {
            Object[] fields = new Object[pkFields.length];
            for (int i = 0; i < pkFields.length; ++i) {
                fields[i] = row.getField(pkFields[i]);
            }
            SeaTunnelRow newRow = new SeaTunnelRow(fields);
            newRow.setTableId(row.getTableId());
            newRow.setRowKind(row.getRowKind());
            return row;
        };
    }

    public JdbcOutputFormatBuilder(@NonNull JdbcDialect dialect, @NonNull JdbcConnectionProvider connectionProvider, @NonNull JdbcSinkOptions jdbcSinkOptions, @NonNull SeaTunnelRowType seaTunnelRowType) {
        if (dialect == null) {
            throw new NullPointerException("dialect is marked @NonNull but is null");
        }
        if (connectionProvider == null) {
            throw new NullPointerException("connectionProvider is marked @NonNull but is null");
        }
        if (jdbcSinkOptions == null) {
            throw new NullPointerException("jdbcSinkOptions is marked @NonNull but is null");
        }
        if (seaTunnelRowType == null) {
            throw new NullPointerException("seaTunnelRowType is marked @NonNull but is null");
        }
        this.dialect = dialect;
        this.connectionProvider = connectionProvider;
        this.jdbcSinkOptions = jdbcSinkOptions;
        this.seaTunnelRowType = seaTunnelRowType;
    }
}

