/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;

import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhousePart;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceTable;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.split.ClickhouseSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClickhouseValueReader
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(ClickhouseValueReader.class);
    private static final long serialVersionUID = 4588012013447713463L;
    private final ClickhouseSourceSplit clickhouseSourceSplit;
    private final SeaTunnelRowType rowTypeInfo;
    private final ClickhouseSourceTable clickhouseSourceTable;
    private StreamValueReader streamValueReader;
    private ClickhouseProxy proxy;
    protected int currentPartIndex = 0;
    private List<SeaTunnelRow> rowBatch;

    public ClickhouseValueReader(ClickhouseSourceSplit clickhouseSourceSplit, SeaTunnelRowType seaTunnelRowType, ClickhouseSourceTable clickhouseSourceTable) {
        this.clickhouseSourceSplit = clickhouseSourceSplit;
        this.rowTypeInfo = seaTunnelRowType;
        this.clickhouseSourceTable = clickhouseSourceTable;
        this.proxy = new ClickhouseProxy(clickhouseSourceSplit.getShard().getNode());
    }

    public boolean hasNext() {
        if (this.shouldUseStreamReader()) {
            if (this.streamValueReader == null) {
                this.streamValueReader = new StreamValueReader();
            }
            return this.streamValueReader.hasNext();
        }
        if (this.clickhouseSourceTable.isSqlStrategyRead()) {
            return this.sqlBatchStrategyRead();
        }
        return this.partBatchStrategyRead();
    }

    public List<SeaTunnelRow> next() {
        if (this.rowBatch == null) {
            throw new ClickhouseConnectorException((SeaTunnelErrorCode)ClickhouseConnectorErrorCode.SHOULD_NEVER_HAPPEN, "never happen error !");
        }
        return this.rowBatch;
    }

    private boolean partBatchStrategyRead() {
        List<ClickhousePart> parts = this.clickhouseSourceSplit.getParts();
        int partSize = parts.size();
        if (this.currentPartIndex >= partSize) {
            return false;
        }
        ClickhousePart currentPart = parts.get(this.currentPartIndex);
        if (currentPart.isEndOfPart()) {
            ++this.currentPartIndex;
            return this.currentPartIndex < partSize && this.partBatchStrategyRead();
        }
        try {
            String query = this.buildPartQuery(currentPart);
            this.rowBatch = this.proxy.batchFetchRecords(query, this.clickhouseSourceTable.getTablePath(), this.rowTypeInfo);
            log.debug("SplitId: {}, partName: {} read rowBatch size: {}", new Object[]{this.clickhouseSourceSplit.getSplitId(), currentPart.getName(), this.rowBatch.size()});
            if (this.rowBatch.isEmpty()) {
                currentPart.setEndOfPart(true);
                ++this.currentPartIndex;
                return this.currentPartIndex < partSize && this.partBatchStrategyRead();
            }
            currentPart.setOffset(currentPart.getOffset() + this.rowBatch.size());
            return true;
        }
        catch (Exception e) {
            throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.QUERY_DATA_ERROR, String.format("Failed to read data from part %s, shard: %s, splitId: %s, message: %s", currentPart.getName(), currentPart.getShard().getNode(), this.clickhouseSourceSplit.getSplitId(), e.getMessage()), e);
        }
    }

    private boolean sqlBatchStrategyRead() {
        String query = this.buildSqlQuery();
        try {
            this.rowBatch = this.proxy.batchFetchRecords(query, this.clickhouseSourceTable.getTablePath(), this.rowTypeInfo);
            this.clickhouseSourceSplit.setSqlOffset(this.clickhouseSourceSplit.getSqlOffset() + this.rowBatch.size());
            return !this.rowBatch.isEmpty();
        }
        catch (Exception e) {
            throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.QUERY_DATA_ERROR, String.format("Failed to read data from sql %s, shard: %s, splitId %s, message: %s", query, this.clickhouseSourceSplit.getShard().getNode(), this.clickhouseSourceSplit.getSplitId(), e.getMessage()), e);
        }
    }

    public void close() {
        if (this.proxy != null) {
            this.proxy.close();
        }
        if (this.streamValueReader != null) {
            this.streamValueReader.close();
        }
    }

    private boolean shouldUseStreamReader() {
        return this.clickhouseSourceTable.isComplexSql() || StringUtils.isEmpty(this.clickhouseSourceTable.getClickhouseTable().getSortingKey());
    }

    private String buildPartQuery(ClickhousePart part) {
        TablePath tablePath = TablePath.of((String)part.getDatabase(), (String)part.getTable());
        String whereClause = String.format("_part = '%s'", part.getName());
        if (StringUtils.isNotEmpty(this.clickhouseSourceTable.getFilterQuery())) {
            whereClause = whereClause + " AND (" + this.clickhouseSourceTable.getFilterQuery() + ")";
        }
        String orderByClause = "";
        if (StringUtils.isNotEmpty(this.clickhouseSourceTable.getClickhouseTable().getSortingKey())) {
            orderByClause = " ORDER BY " + this.clickhouseSourceTable.getClickhouseTable().getSortingKey();
        }
        String sql = StringUtils.isNotEmpty(orderByClause) ? String.format("SELECT * FROM %s.%s WHERE %s %s LIMIT %d, %d WITH TIES", tablePath.getDatabaseName(), tablePath.getTableName(), whereClause, orderByClause, part.getOffset(), this.clickhouseSourceTable.getBatchSize()) : String.format("SELECT * FROM %s.%s WHERE %s", tablePath.getDatabaseName(), tablePath.getTableName(), whereClause);
        return sql;
    }

    private String buildSqlQuery() {
        String orderByClause = "";
        if (StringUtils.isNotEmpty(this.clickhouseSourceTable.getClickhouseTable().getSortingKey())) {
            orderByClause = " ORDER BY " + this.clickhouseSourceTable.getClickhouseTable().getSortingKey();
        }
        String executeSql = StringUtils.isNotEmpty(orderByClause) ? String.format("SELECT * FROM (%s) AS t %s LIMIT %d, %d WITH TIES", this.clickhouseSourceSplit.getSplitQuery(), orderByClause, this.clickhouseSourceSplit.getSqlOffset(), this.clickhouseSourceTable.getBatchSize()) : String.format("SELECT * FROM (%s) AS t", this.clickhouseSourceSplit.getSplitQuery());
        return executeSql;
    }

    private class StreamValueReader
    implements Serializable {
        private static final long serialVersionUID = -7037116446966849773L;
        private final BlockingQueue<SeaTunnelRow> rowQueue;
        private AtomicBoolean eos = new AtomicBoolean(false);
        private final List<String> sqlList;
        private final Thread asyncReadThread = new Thread(new Runnable(){

            @Override
            public void run() {
                String executeSql = "";
                try {
                    Iterator iterator = StreamValueReader.this.sqlList.iterator();
                    while (iterator.hasNext()) {
                        String sql;
                        executeSql = sql = (String)iterator.next();
                        ClickHouseResponse response = ((ClickHouseRequest)ClickhouseValueReader.this.proxy.getClickhouseConnection().query(sql)).executeAndWait();
                        Throwable throwable = null;
                        try {
                            response.records().forEach(record -> {
                                SeaTunnelRow seaTunnelRow = ClickhouseUtil.convertToSeaTunnelRow(record, ClickhouseValueReader.this.rowTypeInfo, ClickhouseValueReader.this.clickhouseSourceTable.getTablePath().getFullName());
                                try {
                                    StreamValueReader.this.rowQueue.put(seaTunnelRow);
                                }
                                catch (InterruptedException e) {
                                    throw new ClickhouseConnectorException((SeaTunnelErrorCode)ClickhouseConnectorErrorCode.ROW_BATCH_GET_FAILED, e);
                                }
                            });
                        }
                        catch (Throwable throwable2) {
                            throwable = throwable2;
                            throw throwable2;
                        }
                        finally {
                            if (response == null) continue;
                            if (throwable != null) {
                                try {
                                    response.close();
                                }
                                catch (Throwable throwable3) {
                                    throwable.addSuppressed(throwable3);
                                }
                                continue;
                            }
                            response.close();
                        }
                    }
                }
                catch (ClickHouseException e) {
                    throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.QUERY_DATA_ERROR, String.format("Failed to execute query: %s", executeSql), e);
                }
                finally {
                    StreamValueReader.this.eos.set(true);
                    log.info("StreamValueReader finished reading data");
                }
            }
        }, "clickhouse-stream-reader-" + ClickhouseValueReader.access$700(ClickhouseValueReader.this).getSplitId());

        public StreamValueReader() {
            this.rowQueue = new LinkedBlockingDeque<SeaTunnelRow>(ClickhouseValueReader.this.clickhouseSourceTable.getBatchSize());
            this.sqlList = this.buildSqlList();
            this.asyncReadThread.start();
            log.info("StreamValueReader start.");
        }

        public boolean hasNext() {
            ArrayList<SeaTunnelRow> rows = new ArrayList<SeaTunnelRow>();
            while (!this.eos.get() || !this.rowQueue.isEmpty()) {
                if (!this.rowQueue.isEmpty()) {
                    try {
                        SeaTunnelRow seaTunnelRow = this.rowQueue.take();
                        rows.add(seaTunnelRow);
                        if (rows.size() < ClickhouseValueReader.this.clickhouseSourceTable.getBatchSize()) continue;
                        ClickhouseValueReader.this.rowBatch = rows;
                        return true;
                    }
                    catch (InterruptedException e) {
                        throw new ClickhouseConnectorException((SeaTunnelErrorCode)ClickhouseConnectorErrorCode.ROW_BATCH_GET_FAILED, e);
                    }
                }
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException interruptedException) {}
            }
            if (!rows.isEmpty()) {
                ClickhouseValueReader.this.rowBatch = rows;
                return true;
            }
            return false;
        }

        private List<String> buildSqlList() {
            if (ClickhouseValueReader.this.clickhouseSourceTable.isSqlStrategyRead()) {
                return Collections.singletonList(ClickhouseValueReader.this.clickhouseSourceSplit.getSplitQuery());
            }
            return ClickhouseValueReader.this.clickhouseSourceSplit.getParts().stream().map(x$0 -> ClickhouseValueReader.this.buildPartQuery(x$0)).collect(Collectors.toList());
        }

        public void close() {
            if (this.rowQueue != null) {
                this.rowQueue.clear();
            }
            this.eos.set(true);
        }
    }
}

