/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseNode;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceTable;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseValueReader;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.split.ClickhouseSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader that drains {@link ClickhouseSourceSplit}s from an internal queue and emits the
 * rows produced for each split to the downstream {@link Collector}.
 *
 * <p>Splits are handed over through {@link #addSplits(List)}. Once {@link #handleNoMoreSplits()}
 * has been called and the queue is empty, the reader tells its context that no more elements
 * will be produced.
 */
public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, ClickhouseSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(ClickhouseSourceReader.class);

    /** Nodes per configured table, as supplied to the constructor. */
    private final Map<TablePath, List<ClickHouseNode>> servers;
    /** Closed in {@link #close()} when non-null; never assigned inside this class. */
    private ClickHouseClient client;
    private final SourceReader.Context context;
    /** Set by {@link #handleNoMoreSplits()}. */
    private volatile boolean noMoreSplits;
    /** Splits that have been assigned to this reader but not yet read. */
    private final Queue<ClickhouseSourceSplit> splitQueue;
    /** Table definitions keyed by the table path carried in each split. */
    private final Map<TablePath, ClickhouseSourceTable> tables;

    /**
     * Creates a reader with an empty split queue.
     *
     * @param servers nodes per table path
     * @param readerContext context used to signal that the reader has finished
     * @param tables source table definitions, looked up by each split's configured table path
     */
    ClickhouseSourceReader(
            Map<TablePath, List<ClickHouseNode>> servers,
            SourceReader.Context readerContext,
            Map<TablePath, ClickhouseSourceTable> tables) {
        this.servers = servers;
        this.context = readerContext;
        this.splitQueue = new ArrayDeque<>();
        this.tables = tables;
    }

    /** No-op: this reader needs no initialization. */
    public void open() {
    }

    /**
     * Closes the ClickHouse client if one is present.
     *
     * @throws IOException declared by the reader contract
     */
    public void close() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    /**
     * Reads at most one queued split per call and emits all of its rows to {@code output}.
     *
     * <p>The whole call runs while holding the collector's checkpoint lock. When the queue is
     * empty and no further splits will arrive, the context is told that the reader is done.
     *
     * @param output collector receiving the rows
     * @throws ClickhouseConnectorException if the split refers to a table that is not present in
     *     the configured table map
     * @throws Exception if reading the split or closing the value reader fails
     */
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        synchronized (output.getCheckpointLock()) {
            ClickhouseSourceSplit split = this.splitQueue.poll();
            if (split != null) {
                this.readSplit(split, output);
            } else if (this.noMoreSplits && this.splitQueue.isEmpty()) {
                this.signalNoMoreElement();
            }
        }
    }

    /** Emits every row of {@code split} to {@code output}, always closing the value reader. */
    private void readSplit(ClickhouseSourceSplit split, Collector<SeaTunnelRow> output)
            throws Exception {
        TablePath tablePath = split.getConfigTablePath();
        ClickhouseSourceTable sourceTable = this.tables.get(tablePath);
        if (sourceTable == null) {
            throw new ClickhouseConnectorException(
                    ClickhouseConnectorErrorCode.TABLE_NOT_FOUND_ERROR,
                    String.format(
                            "Table %s.%s not found in table list of job configuration.",
                            tablePath.getDatabaseName(),
                            tablePath.getTableName()));
        }
        CatalogTable catalogTable = sourceTable.getCatalogTable();
        // The reader is only created after the table lookup succeeded, so there is nothing to
        // close on the failure path above. A resource variable of a try-with-resources statement
        // cannot be reassigned, hence the explicit try/finally.
        ClickhouseValueReader valueReader =
                new ClickhouseValueReader(split, catalogTable.getSeaTunnelRowType(), sourceTable);
        try {
            while (valueReader.hasNext()) {
                for (SeaTunnelRow row : valueReader.next()) {
                    output.collect(row);
                }
            }
        } finally {
            valueReader.close();
        }
    }

    /**
     * Returns a copy of the splits that are still waiting in the queue. A split already removed
     * from the queue by {@link #pollNext(Collector)} is not part of the snapshot.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @return a new list holding the queued splits in queue order
     */
    public List<ClickhouseSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new ArrayList<>(this.splitQueue);
    }

    /**
     * Appends the given splits to the queue of splits to read.
     *
     * @param splits splits assigned to this reader
     */
    public void addSplits(List<ClickhouseSourceSplit> splits) {
        this.splitQueue.addAll(splits);
    }

    /** Records that no further splits will be assigned to this reader. */
    public void handleNoMoreSplits() {
        log.info("Reader received NoMoreSplits event.");
        this.noMoreSplits = true;
    }

    /** Logs the end of the bounded read and notifies the reader context. */
    private void signalNoMoreElement() {
        log.info("Closed the bounded ClickHouse source");
        this.context.signalNoMoreElement();
    }

    /** No-op: nothing has to be committed when a checkpoint completes. */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

