/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.redshift.commit;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.redshift.RedshiftJdbcClient;
import org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3RedshiftSinkAggregatedCommitter
extends FileSinkAggregatedCommitter {
    private static final Logger log = LoggerFactory.getLogger(S3RedshiftSinkAggregatedCommitter.class);
    private final String executeSql;
    private Config pluginConfig;

    public S3RedshiftSinkAggregatedCommitter(HadoopConf hadoopConf, Config pluginConfig) {
        super(hadoopConf);
        this.pluginConfig = pluginConfig;
        this.executeSql = pluginConfig.getString(S3RedshiftConfigOptions.EXECUTE_SQL.key());
    }

    @Override
    public List<FileAggregatedCommitInfo> commit(List<FileAggregatedCommitInfo> aggregatedCommitInfos) {
        ArrayList<FileAggregatedCommitInfo> errorAggregatedCommitInfoList = new ArrayList<FileAggregatedCommitInfo>();
        aggregatedCommitInfos.forEach(aggregatedCommitInfo -> {
            try {
                for (Map.Entry<String, LinkedHashMap<String, String>> entry : aggregatedCommitInfo.getTransactionMap().entrySet()) {
                    for (Map.Entry<String, String> mvFileEntry : entry.getValue().entrySet()) {
                        this.hadoopFileSystemProxy.renameFile(mvFileEntry.getKey(), mvFileEntry.getValue(), true);
                        String sql = this.convertSql(mvFileEntry.getValue());
                        log.debug("execute redshift sql is:" + sql);
                        RedshiftJdbcClient.getInstance(this.pluginConfig).execute(sql);
                        this.hadoopFileSystemProxy.deleteFile(mvFileEntry.getValue());
                    }
                    this.hadoopFileSystemProxy.deleteFile(entry.getKey());
                }
            }
            catch (Exception e) {
                log.error("commit aggregatedCommitInfo error ", (Throwable)e);
                errorAggregatedCommitInfoList.add((FileAggregatedCommitInfo)aggregatedCommitInfo);
                throw new S3RedshiftJdbcConnectorException((SeaTunnelErrorCode)S3RedshiftConnectorErrorCode.AGGREGATE_COMMIT_ERROR, e);
            }
        });
        return errorAggregatedCommitInfoList;
    }

    @Override
    public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) {
        if (aggregatedCommitInfos == null || aggregatedCommitInfos.isEmpty()) {
            return;
        }
        aggregatedCommitInfos.forEach(aggregatedCommitInfo -> {
            try {
                for (Map.Entry<String, LinkedHashMap<String, String>> entry : aggregatedCommitInfo.getTransactionMap().entrySet()) {
                    this.hadoopFileSystemProxy.deleteFile(entry.getKey());
                }
            }
            catch (Exception e) {
                log.error("abort aggregatedCommitInfo error ", (Throwable)e);
            }
        });
    }

    @Override
    public void close() throws IOException {
        super.close();
        try {
            RedshiftJdbcClient.getInstance(this.pluginConfig).close();
        }
        catch (SQLException e) {
            throw new S3RedshiftJdbcConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.SQL_OPERATION_FAILED, "close redshift jdbc client failed", e);
        }
    }

    private String convertSql(String path) {
        return StringUtils.replace(this.executeSql, "${path}", path);
    }
}

