/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseColumn;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRecord;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhousePart;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseCatalogUtil;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.DistributedEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Facade over the ClickHouse Java client used by the connector to run metadata queries
 * (tables, schemas, cluster shards, parts), DDL statements and data fetches.
 *
 * <p>One primary {@link ClickHouseClient} is created for the node passed to the constructor;
 * additional clients are created lazily per {@link Shard} and cached. All of them are released
 * by {@link #close()}.
 *
 * <p>Database, table, cluster and partition names are interpolated directly into the SQL text
 * without escaping, so callers must only pass trusted identifiers.
 */
public class ClickhouseProxy implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ClickhouseProxy.class);

    /** Substrings of an EXPLAIN output line that mark a query as "complex" in {@link #isComplexSql}. */
    private static final String[] COMPLEX_PLAN_MARKERS = {
        "JOIN", "UNION", "GROUP BY", "LIMIT", "Sorting", "Aggregating", "Merging", "subquery"
    };

    private final ClickHouseRequest<?> clickhouseRequest;
    private final ClickHouseClient client;
    private final ClickHouseNode node;
    private final Map<Shard, ClickHouseClient> shardToDataSource = new ConcurrentHashMap<Shard, ClickHouseClient>(16);

    /**
     * Creates the primary client for {@code node} and a reusable request bound to it that uses
     * the {@code RowBinaryWithNamesAndTypes} format.
     *
     * @param node the ClickHouse node to connect to
     */
    public ClickhouseProxy(ClickHouseNode node) {
        this.client = ClickHouseClient.newInstance(node.getProtocol());
        this.clickhouseRequest = this.client.connect(node).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
        this.node = node;
    }

    /**
     * @return the request bound to the node given at construction time
     */
    public ClickHouseRequest<?> getClickhouseConnection() {
        return this.clickhouseRequest;
    }

    /**
     * Returns a new request against the node of {@code shard}. The underlying client is created
     * on first use for that shard and cached for later calls.
     *
     * @param shard the shard whose node should be queried
     * @return a request in {@code RowBinaryWithNamesAndTypes} format bound to the shard's node
     */
    public ClickHouseRequest<?> getClickhouseConnection(Shard shard) {
        ClickHouseClient shardClient = this.shardToDataSource.computeIfAbsent(shard, s -> ClickHouseClient.newInstance(s.getNode().getProtocol()));
        return shardClient.connect(shard.getNode()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
    }

    /**
     * Resolves the cluster and local table behind a {@code Distributed} table.
     *
     * <p>The {@code engine_full} value is parsed positionally: the first 12 characters
     * (the length of {@code "Distributed("}) are dropped and the remainder is split on commas
     * into cluster name, local database and local table.
     *
     * @param connection request used to look up the distributed table itself
     * @param database database of the distributed table
     * @param table name of the distributed table
     * @return the description of the distributed engine and its local table
     * @throws ClickhouseConnectorException if either lookup returns no rows or the query fails
     */
    public DistributedEngine getClickhouseDistributedTable(ClickHouseRequest<?> connection, String database, String table) {
        String sql = String.format("select engine_full from system.tables where database = '%s' and name = '%s' and engine = 'Distributed'", database, table);
        try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
            List<ClickHouseRecord> records = response.stream().collect(Collectors.toList());
            if (records.isEmpty()) {
                throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get distributed table from clickhouse, resultSet is empty");
            }
            String engineFull = records.get(0).getValue(0).asString();
            List<String> infos = Arrays.stream(engineFull.substring(12).split(","))
                    .map(s -> s.replace("'", "").trim())
                    .collect(Collectors.toList());
            String clusterName = infos.get(0);
            String localDatabase = infos.get(1);
            // The table is the last argument when no sharding key is given, so strip the closing ")".
            String localTable = infos.get(2).replace(")", "").trim();
            String localTableSQL = String.format("select engine,create_table_query,sorting_key from system.tables where database = '%s' and name = '%s'", localDatabase, localTable);
            // NOTE(review): the local table is looked up through this proxy's own request,
            // not through the `connection` argument - confirm this is intended.
            try (ClickHouseResponse localTableResponse = this.clickhouseRequest.query(localTableSQL).executeAndWait()) {
                List<ClickHouseRecord> localTableRecords = localTableResponse.stream().collect(Collectors.toList());
                if (localTableRecords.isEmpty()) {
                    throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get table from clickhouse, resultSet is empty");
                }
                ClickHouseRecord localRecord = localTableRecords.get(0);
                String localTableEngine = localRecord.getValue(0).asString();
                String localTableDDL = this.localizationEngine(localTableEngine, localRecord.getValue(1).asString());
                String sortingKey = localRecord.getValue(2).asString();
                return new DistributedEngine(clusterName, localDatabase, localTable, localTableEngine, localTableDDL, sortingKey);
            }
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get distributed table from clickhouse", e);
        }
    }

    /**
     * Same as {@link #getClickhouseTableSchema(ClickHouseRequest, String)} using this proxy's
     * own request.
     *
     * @param table table name as it should appear after {@code desc}
     * @return column name to column type, in the order returned by the server
     */
    public Map<String, String> getClickhouseTableSchema(String table) {
        return this.getClickhouseTableSchema(this.getClickhouseConnection(), table);
    }

    /**
     * Runs {@code desc <table>} and returns the column name (first result column) mapped to the
     * column type (second result column). Rows whose third result column equals
     * {@code "MATERIALIZED"} are skipped.
     *
     * @param request request to execute the query with
     * @param table table name as it should appear after {@code desc}
     * @return an insertion-ordered map of column name to column type
     * @throws ClickhouseConnectorException if the query fails
     */
    public Map<String, String> getClickhouseTableSchema(ClickHouseRequest<?> request, String table) {
        String sql = "desc " + table;
        Map<String, String> schema = new LinkedHashMap<String, String>();
        try (ClickHouseResponse response = request.query(sql).executeAndWait()) {
            for (ClickHouseRecord row : response.records()) {
                if (!"MATERIALIZED".equals(row.getValue(2).asString())) {
                    schema.put(row.getValue(0).asString(), row.getValue(1).asString());
                }
            }
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, "Cannot get table schema from clickhouse", e);
        }
        return schema;
    }

    /**
     * Runs {@code desc <table>} and returns the column metadata reported by the response.
     *
     * @param table table name as it should appear after {@code desc}
     * @return the columns of the query response
     * @throws ClickhouseConnectorException if the query fails
     */
    public List<ClickHouseColumn> getClickHouseColumns(String table) {
        String sql = "desc " + table;
        try (ClickHouseResponse response = this.clickhouseRequest.query(sql).executeAndWait()) {
            return response.getColumns();
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, "Cannot get table schema from clickhouse", e);
        }
    }

    /**
     * Lists the first replica of every shard of {@code clusterName} from {@code system.clusters}.
     *
     * <p>The {@code port} stored in each {@link Shard} is the {@code port} argument, not the
     * port column selected from {@code system.clusters}.
     *
     * @param connection request to execute the query with
     * @param clusterName cluster to look up
     * @param database database recorded in every returned shard
     * @param port port recorded in every returned shard
     * @param username user name recorded in every returned shard
     * @param password password recorded in every returned shard
     * @param options extra options recorded in every returned shard
     * @return a mutable list with one shard per result row
     * @throws ClickhouseConnectorException if the query fails
     */
    public List<Shard> getClusterShardList(ClickHouseRequest<?> connection, String clusterName, String database, int port, String username, String password, Map<String, String> options) {
        String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "' and replica_num=1";
        List<Shard> shardList = new ArrayList<Shard>();
        try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
            for (ClickHouseRecord row : response.records()) {
                shardList.add(new Shard(
                        row.getValue(0).asInteger(),
                        row.getValue(1).asInteger(),
                        row.getValue(2).asInteger(),
                        row.getValue(3).asString(),
                        row.getValue(4).asString(),
                        port,
                        database,
                        username,
                        password,
                        options));
            }
            return shardList;
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.CLUSTER_LIST_GET_FAILED, "Cannot get cluster shard list from clickhouse", e);
        }
    }

    /**
     * Loads engine, DDL, data paths, sorting key and schema of a table from
     * {@code system.tables}. For a {@code Distributed} table the DDL and sorting key are taken
     * from the local table resolved by {@link #getClickhouseDistributedTable}.
     *
     * @param clickhouseRequest request to execute the queries with
     * @param database database of the table
     * @param table table name
     * @return the table description
     * @throws ClickhouseConnectorException if the table is not found or a query fails
     */
    public ClickhouseTable getClickhouseTable(ClickHouseRequest<?> clickhouseRequest, String database, String table) {
        String sql = String.format("select engine,create_table_query,engine_full,data_paths,sorting_key from system.tables where database = '%s' and name = '%s'", database, table);
        try (ClickHouseResponse response = clickhouseRequest.query(sql).executeAndWait()) {
            List<ClickHouseRecord> records = response.stream().collect(Collectors.toList());
            if (records.isEmpty()) {
                throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get table from clickhouse, resultSet is empty");
            }
            ClickHouseRecord record = records.get(0);
            String engine = record.getValue(0).asString();
            String createTableDDL = record.getValue(1).asString();
            String engineFull = record.getValue(2).asString();
            List<String> dataPaths = record.getValue(3).asTuple().stream().map(Object::toString).collect(Collectors.toList());
            String sortingKey = record.getValue(4).asString();
            DistributedEngine distributedEngine = null;
            if ("Distributed".equals(engine)) {
                distributedEngine = this.getClickhouseDistributedTable(clickhouseRequest, database, table);
                createTableDDL = distributedEngine.getTableDDL();
                sortingKey = distributedEngine.getSortingKey();
            }
            Map<String, String> schema = this.getClickhouseTableSchema(clickhouseRequest, table);
            return new ClickhouseTable(database, table, distributedEngine, engine, createTableDDL, engineFull, dataPaths, sortingKey, schema);
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get clickhouse table", e);
        }
    }

    /**
     * Rewrites a {@code ReplicatedMergeTree(...)} engine clause in {@code ddl} to
     * {@code MergeTree()} when {@code engine} is {@code ReplicatedMergeTree} (case-insensitive);
     * otherwise returns {@code ddl} unchanged.
     *
     * @param engine engine name of the table
     * @param ddl the CREATE TABLE statement
     * @return the possibly rewritten DDL
     */
    public String localizationEngine(String engine, String ddl) {
        if ("ReplicatedMergeTree".equalsIgnoreCase(engine)) {
            return ddl.replaceAll("ReplicatedMergeTree(\\([^\\)]*\\))", "MergeTree()");
        }
        return ddl;
    }

    /**
     * Checks {@code system.tables} for a table with the given database and name.
     *
     * @param database database name
     * @param table table name
     * @return {@code true} if at least one matching row exists
     * @throws ClickhouseConnectorException if the query fails
     */
    public boolean tableExists(String database, String table) {
        String sql = String.format("select count(1) from system.tables where database = '%s' and name = '%s'", database, table);
        try (ClickHouseResponse response = this.clickhouseRequest.query(sql).executeAndWait()) {
            // count() is a 64-bit value; read it as long so it cannot be narrowed.
            return response.firstRecord().getValue(0).asLong() > 0;
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get table from clickhouse", e);
        }
    }

    /**
     * @return the distinct database names found in {@code system.tables}
     * @throws ClickhouseConnectorException if the query fails
     */
    public List<String> listDatabases() {
        String sql = "select distinct database from system.tables";
        try {
            return this.queryFirstColumnAsStrings(sql);
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.LIST_DATABASES_FAILED, "Cannot list databases from clickhouse", e);
        }
    }

    /**
     * @param database database whose tables should be listed
     * @return the table names found in {@code system.tables} for {@code database}
     * @throws ClickhouseConnectorException if the query fails
     */
    public List<String> listTable(String database) {
        String sql = "SELECT name FROM system.tables WHERE database = '" + database + "'";
        try {
            return this.queryFirstColumnAsStrings(sql);
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.LIST_TABLES_FAILED, "Cannot list tables from clickhouse", e);
        }
    }

    /**
     * Executes a statement through the write (mutation) API and waits for it to finish.
     *
     * <p>The response is closed once the statement completes. If the waiting thread is
     * interrupted, its interrupt status is restored before the exception is rethrown.
     *
     * @param sql the statement to execute
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *     {@link ExecutionException} raised while waiting for the result
     */
    public void executeSql(String sql) {
        try (ClickHouseResponse ignored = this.clickhouseRequest.write().format(ClickHouseFormat.RowBinaryWithNamesAndTypes).query(sql).execute().get()) {
            // Nothing to read; the response is only opened so that it gets closed.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the CREATE TABLE statement from {@code template} via {@link ClickhouseCatalogUtil}
     * and executes it.
     *
     * @param database target database
     * @param table target table
     * @param template CREATE TABLE template
     * @param comment table comment
     * @param tableSchema schema used to fill the template
     */
    public void createTable(String database, String table, String template, String comment, TableSchema tableSchema) {
        String createTableSql = ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(template, database, table, tableSchema, comment, ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
        log.debug("Create Clickhouse table sql: {}", createTableSql);
        this.executeSql(createTableSql);
    }

    /**
     * Reads the primary-key columns of a table from {@code system.columns}, ordered by position.
     *
     * @param schema database name
     * @param table table name
     * @return a key named {@code pk_<col1>_<col2>...}, or empty if the table has no key columns
     * @throws SQLException declared for interface compatibility; not thrown by this implementation
     * @throws ClickhouseConnectorException if the query fails
     */
    public Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException {
        String sql = "SELECT\n    name as column_name\nFROM system.columns\nWHERE table = '" + table + "'\n  AND database = '" + schema + "'\n  AND is_in_primary_key = 1\nORDER BY position;";
        List<String> pkFields;
        try {
            pkFields = this.queryFirstColumnAsStrings(sql);
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.GET_PRIMARY_KEY_FAILED, "Cannot get primary key from clickhouse", e);
        }
        if (pkFields.isEmpty()) {
            return Optional.empty();
        }
        String pkName = "pk_" + String.join("_", pkFields);
        return Optional.of(PrimaryKey.of(pkName, pkFields));
    }

    /**
     * Checks whether {@code tableName} contains at least one row.
     *
     * <p>The count is read as a {@code long}: a row count above {@code Integer.MAX_VALUE} would
     * otherwise be narrowed to {@code int} and could be reported as "no data".
     *
     * @param tableName table name as it should appear after {@code FROM}
     * @return {@code true} if {@code count(*)} is greater than zero
     * @throws ExecutionException declared for interface compatibility; not thrown by this implementation
     * @throws InterruptedException declared for interface compatibility; not thrown by this implementation
     * @throws ClickhouseConnectorException if the query fails
     */
    public boolean isExistsData(String tableName) throws ExecutionException, InterruptedException {
        String queryDataSql = "SELECT count(*) FROM " + tableName;
        try (ClickHouseResponse response = this.clickhouseRequest.query(queryDataSql).executeAndWait()) {
            return response.firstRecord().getValue(0).asLong() > 0;
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get table from clickhouse", e);
        }
    }

    /**
     * Drops the table at {@code tablePath}.
     *
     * @param tablePath table to drop
     * @param ignoreIfNotExists passed to the SQL builder
     */
    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) {
        this.executeSql(ClickhouseCatalogUtil.INSTANCE.getDropTableSql(tablePath, ignoreIfNotExists));
    }

    /**
     * Truncates the table at {@code tablePath}.
     *
     * @param tablePath table to truncate
     * @param ignoreIfNotExists currently unused: it is not passed to the SQL builder
     */
    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
        this.executeSql(ClickhouseCatalogUtil.INSTANCE.getTruncateTableSql(tablePath));
    }

    /**
     * Creates {@code database}.
     *
     * @param database database to create
     * @param ignoreIfExists passed to the SQL builder
     */
    public void createDatabase(String database, boolean ignoreIfExists) {
        this.executeSql(ClickhouseCatalogUtil.INSTANCE.getCreateDatabaseSql(database, ignoreIfExists));
    }

    /**
     * Drops {@code database}.
     *
     * @param database database to drop
     * @param ignoreIfNotExists passed to the SQL builder
     */
    public void dropDatabase(String database, boolean ignoreIfNotExists) {
        this.executeSql(ClickhouseCatalogUtil.INSTANCE.getDropDatabaseSql(database, ignoreIfNotExists));
    }

    /**
     * Lists the distinct part names of a table from {@code system.parts}, optionally restricted
     * to the given partitions. The query runs on this proxy's own request; {@code shard} is only
     * recorded in the returned parts.
     *
     * @param database database of the table
     * @param table table name
     * @param shard shard recorded in every returned part
     * @param partitionList partitions to restrict to; {@code null} or empty means no restriction
     * @return one {@link ClickhousePart} per part name
     * @throws ClickhouseConnectorException if the query fails
     */
    public List<ClickhousePart> getPartList(String database, String table, Shard shard, List<String> partitionList) {
        StringBuilder sqlBuilder = new StringBuilder(String.format("select name from system.parts where database = '%s' and table = '%s'", database, table));
        if (partitionList != null && !partitionList.isEmpty()) {
            StringJoiner joiner = new StringJoiner("', '", "('", "')");
            partitionList.forEach(joiner::add);
            sqlBuilder.append(" and partition in ").append(joiner);
        }
        sqlBuilder.append(" group by name");
        String sql = sqlBuilder.toString();
        log.debug("get part sql: {}", sql);
        try {
            return this.queryFirstColumnAsStrings(sql).stream()
                    .map(partName -> new ClickhousePart(partName, database, table, shard))
                    .collect(Collectors.toList());
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.GET_PART_ERROR, "Cannot get part name from system.parts", e);
        }
    }

    /**
     * Executes {@code sql} and converts every returned record into a {@link SeaTunnelRow}.
     *
     * @param sql the query to run
     * @param tablePath table path whose full name is passed to the row converter
     * @param seaTunnelRowType row type used for conversion
     * @return all converted rows, in result order
     * @throws ClickhouseConnectorException if the query fails
     */
    public List<SeaTunnelRow> batchFetchRecords(String sql, TablePath tablePath, SeaTunnelRowType seaTunnelRowType) {
        List<SeaTunnelRow> seaTunnelRowList = new ArrayList<SeaTunnelRow>();
        log.debug("run query data sql: {}", sql);
        try (ClickHouseResponse response = this.clickhouseRequest.query(sql).executeAndWait()) {
            for (ClickHouseRecord record : response.records()) {
                seaTunnelRowList.add(ClickhouseUtil.convertToSeaTunnelRow(record, seaTunnelRowType, tablePath.getFullName()));
            }
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.QUERY_DATA_ERROR, String.format("Query data with sql error. sql: %s, message: %s", sql, e.getMessage()), e);
        }
        return seaTunnelRowList;
    }

    /**
     * Runs {@code EXPLAIN <sql>} and reports whether any output line looks like a join, union,
     * aggregation, sort, limit, merge or subquery step.
     *
     * <p>This is a best-effort check: if the EXPLAIN query fails for any reason the method logs
     * a warning and returns {@code true}.
     *
     * @param sql the query to analyze
     * @return {@code true} if a complex step was detected or the analysis failed
     */
    public boolean isComplexSql(String sql) {
        try {
            String explainSql = "EXPLAIN " + sql;
            try (ClickHouseResponse response = this.getClickhouseConnection().query(explainSql).executeAndWait()) {
                List<String> explainOutput = response.stream().map(record -> record.getValue(0).asString()).collect(Collectors.toList());
                for (String explainLine : explainOutput) {
                    if (isComplexPlanLine(explainLine)) {
                        log.info("Complex SQL detected, explain line: {}", explainLine);
                        return true;
                    }
                }
                return false;
            }
        } catch (Exception e) {
            // Deliberate catch-all: any failure falls back to treating the query as complex.
            log.warn("Failed to analyze SQL complexity using EXPLAIN, fallback to default true. e: {}", e.getMessage());
            return true;
        }
    }

    /**
     * Closes the primary client and every cached shard client. The shard clients are closed
     * even if closing the primary client throws, and the cache is emptied afterwards.
     */
    @Override
    public void close() {
        try {
            if (this.client != null) {
                this.client.close();
            }
        } finally {
            this.shardToDataSource.values().forEach(ClickHouseClient::close);
            this.shardToDataSource.clear();
        }
    }

    /** Runs {@code sql} on this proxy's request and returns the first column of every row as a string. */
    private List<String> queryFirstColumnAsStrings(String sql) throws ClickHouseException {
        try (ClickHouseResponse response = this.clickhouseRequest.query(sql).executeAndWait()) {
            return response.stream().map(row -> row.getValue(0).asString()).collect(Collectors.toList());
        }
    }

    /** Returns whether an EXPLAIN line does not start with {@code ReadFrom} and contains a complex-plan marker. */
    private static boolean isComplexPlanLine(String explainLine) {
        if (explainLine.startsWith("ReadFrom")) {
            return false;
        }
        return Arrays.stream(COMPLEX_PLAN_MARKERS).anyMatch(explainLine::contains);
    }
}

