/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionPoolProviderProxy;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.AbstractJdbcSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.ConnectionPoolManager;
import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
import org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JDBC sink writer that pushes rows through a JDBC output format and, when the
 * connection is not in auto-commit mode, commits explicitly on
 * {@link #prepareCommit()} and {@link #close()}.
 *
 * <p>The output format is opened lazily on first use. This writer keeps no
 * checkpoint state: {@link #snapshotState(long)} always returns an empty list
 * and {@link #prepareCommit()} always returns an empty {@link Optional}.
 */
public class JdbcSinkWriter
extends AbstractJdbcSinkWriter<ConnectionPoolManager> {
    private static final Logger log = LoggerFactory.getLogger(JdbcSinkWriter.class);
    /** Index of the primary key column, or {@code null} when none was supplied. */
    private final Integer primaryKeyIndex;

    /**
     * Creates a writer with its own connection provider and output format.
     *
     * @param sinkTablePath   path of the target table
     * @param dialect         JDBC dialect used to obtain the connection provider
     * @param jdbcSinkConfig  sink configuration (connection settings are read from it)
     * @param tableSchema     schema handed to the output format builder
     * @param primaryKeyIndex primary key index exposed via {@link #primaryKey()}; may be {@code null}
     */
    public JdbcSinkWriter(TablePath sinkTablePath, JdbcDialect dialect, JdbcSinkConfig jdbcSinkConfig, TableSchema tableSchema, Integer primaryKeyIndex) {
        this.sinkTablePath = sinkTablePath;
        this.dialect = dialect;
        this.tableSchema = tableSchema;
        this.jdbcSinkConfig = jdbcSinkConfig;
        this.primaryKeyIndex = primaryKeyIndex;
        this.connectionProvider = dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
        this.outputFormat = new JdbcOutputFormatBuilder(dialect, this.connectionProvider, jdbcSinkConfig, tableSchema).build();
    }

    /**
     * Builds the shared resource manager backed by a Hikari connection pool.
     *
     * @param tableSize number of tables; not used by this implementation
     * @param queueSize used as the maximum pool size
     * @return a resource manager wrapping a {@link ConnectionPoolManager} over the new pool
     * @throws RuntimeException if the configured driver class cannot be loaded
     */
    public MultiTableResourceManager<ConnectionPoolManager> initMultiTableResourceManager(int tableSize, int queueSize) {
        // Load the driver BEFORE allocating the data source, so a missing driver
        // fails fast without leaving an unclosed HikariDataSource behind.
        try {
            Class.forName(this.jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
        HikariDataSource ds = new HikariDataSource();
        ds.setIdleTimeout(30000L);
        ds.setMaximumPoolSize(queueSize);
        ds.setJdbcUrl(this.jdbcSinkConfig.getJdbcConnectionConfig().getUrl());
        // Credentials are optional; only set what the configuration provides.
        this.jdbcSinkConfig.getJdbcConnectionConfig().getUsername().ifPresent(ds::setUsername);
        this.jdbcSinkConfig.getJdbcConnectionConfig().getPassword().ifPresent(ds::setPassword);
        ds.setAutoCommit(this.jdbcSinkConfig.getJdbcConnectionConfig().isAutoCommit());
        return new JdbcMultiTableResourceManager(new ConnectionPoolManager(ds));
    }

    /**
     * Switches this writer from its own connection provider to the shared pool.
     *
     * <p>Closes the connection of the current provider, replaces the provider with a
     * pool-backed proxy for {@code queueIndex}, and rebuilds the output format on top of it.
     * NOTE(review): {@code isOpen} is not reset here, so this is presumably expected to be
     * called before the first write — confirm against the caller.
     *
     * @param multiTableResourceManager manager whose shared resource must be present
     * @param queueIndex                queue index passed to the pool proxy
     */
    public void setMultiTableResourceManager(MultiTableResourceManager<ConnectionPoolManager> multiTableResourceManager, int queueIndex) {
        this.connectionProvider.closeConnection();
        this.connectionProvider = new SimpleJdbcConnectionPoolProviderProxy((ConnectionPoolManager)multiTableResourceManager.getSharedResource().get(), this.jdbcSinkConfig.getJdbcConnectionConfig(), queueIndex);
        this.outputFormat = new JdbcOutputFormatBuilder(this.dialect, this.connectionProvider, this.jdbcSinkConfig, this.tableSchema).build();
    }

    /**
     * @return the primary key index, or an empty {@link Optional} when none was supplied
     */
    public Optional<Integer> primaryKey() {
        return Optional.ofNullable(this.primaryKeyIndex);
    }

    /**
     * Opens the output format on first use.
     *
     * @throws IOException if opening the output format fails
     */
    private void tryOpen() throws IOException {
        if (!this.isOpen) {
            this.outputFormat.open();
            // Mark as open only after open() succeeded; otherwise a failed open would
            // leave the writer flagged open and later calls would skip opening.
            this.isOpen = true;
        }
    }

    /**
     * @param checkpointId checkpoint identifier; not used
     * @return always an empty list, this writer keeps no state
     */
    public List<JdbcSinkState> snapshotState(long checkpointId) {
        return Collections.emptyList();
    }

    /**
     * Writes one row through the output format, opening it first if necessary.
     *
     * @param element row to write
     * @throws IOException if opening or writing fails
     */
    public void write(SeaTunnelRow element) throws IOException {
        this.tryOpen();
        this.outputFormat.writeRecord(element);
    }

    /**
     * Flushes buffered rows and commits when the connection is not in auto-commit mode.
     *
     * @return always an empty {@link Optional}
     * @throws IOException            if opening or flushing fails
     * @throws JdbcConnectorException if the commit fails
     */
    public Optional<XidInfo> prepareCommit() throws IOException {
        this.tryOpen();
        this.outputFormat.checkFlushException();
        this.outputFormat.flush();
        try {
            this.commitIfNotAutoCommit();
        }
        catch (SQLException e) {
            throw new JdbcConnectorException(JdbcConnectorErrorCode.TRANSACTION_OPERATION_FAILED, "commit failed," + e.getMessage(), e);
        }
        return Optional.empty();
    }

    /** No-op: there is nothing to roll back in this writer. */
    public void abortPrepare() {
    }

    /**
     * Flushes pending rows, commits if needed, and always closes the output format.
     *
     * @throws IOException            if opening or flushing fails
     * @throws JdbcConnectorException if the final commit fails
     */
    public void close() throws IOException {
        this.tryOpen();
        try {
            // flush() lives inside the try so that the finally block still closes the
            // output format when the flush itself fails.
            this.outputFormat.flush();
            this.commitIfNotAutoCommit();
        }
        catch (SQLException e) {
            throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "unable to close JDBC sink write", e);
        }
        finally {
            this.outputFormat.close();
        }
    }

    /** Commits on the provider's connection unless it is in auto-commit mode. */
    private void commitIfNotAutoCommit() throws SQLException {
        if (!this.connectionProvider.getConnection().getAutoCommit()) {
            this.connectionProvider.getConnection().commit();
        }
    }
}

