/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.SeaTunnelRowSnapshotRecordDeserializer;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.SeaTunnelRowStreamingRecordDeserializer;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.RowKeyWithTs;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils.TableKeyRangeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.cdc.CDCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.key.RowKey;
import org.tikv.common.meta.TiTableInfo;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.shade.com.google.protobuf.ByteString;
import org.tikv.txn.KVClient;

public class TiDBSourceReader
implements SourceReader<SeaTunnelRow, TiDBSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(TiDBSourceReader.class);
    private final SourceReader.Context context;
    private final TiDBSourceConfig config;
    private final List<TiDBSourceSplit> sourceSplits;
    private final Map<TiDBSourceSplit, CDCClient> cacheCDCClient;
    private SeaTunnelRowSnapshotRecordDeserializer snapshotRecordDeserializer;
    private SeaTunnelRowStreamingRecordDeserializer streamingRecordDeserializer;
    private transient TiSession session;
    private transient TreeMap<RowKeyWithTs, Cdcpb.Event.Row> preWrites;
    private transient TreeMap<RowKeyWithTs, Cdcpb.Event.Row> commits;
    private transient BlockingQueue<Cdcpb.Event.Row> committedEvents;
    private CatalogTable catalogTable;

    public TiDBSourceReader(SourceReader.Context context, TiDBSourceConfig config, CatalogTable catalogTable) {
        this.context = context;
        this.config = config;
        this.sourceSplits = new ArrayList<TiDBSourceSplit>();
        this.cacheCDCClient = new HashMap<TiDBSourceSplit, CDCClient>();
        this.preWrites = new TreeMap();
        this.commits = new TreeMap();
        this.committedEvents = new LinkedBlockingQueue<Cdcpb.Event.Row>();
        this.catalogTable = catalogTable;
    }

    public void open() throws Exception {
        this.session = TiSession.create((TiConfiguration)this.config.getTiConfiguration());
        TiTableInfo tableInfo = this.session.getCatalog().getTable(this.config.getDatabaseName(), this.config.getTableName());
        this.snapshotRecordDeserializer = new SeaTunnelRowSnapshotRecordDeserializer(tableInfo, this.catalogTable);
        this.streamingRecordDeserializer = new SeaTunnelRowStreamingRecordDeserializer(tableInfo, this.catalogTable);
    }

    public void close() throws IOException {
        if (this.session != null) {
            try {
                this.session.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        if (this.config.getStartupMode() == StartupMode.INITIAL) {
            for (TiDBSourceSplit sourceSplit : this.sourceSplits) {
                if (sourceSplit.isSnapshotCompleted()) continue;
                this.snapshotEvents(sourceSplit, output);
                sourceSplit.setSnapshotCompleted(true);
            }
        }
        for (TiDBSourceSplit sourceSplit : this.sourceSplits) {
            this.captureStreamingEvents(sourceSplit, output);
        }
    }

    protected void snapshotEvents(TiDBSourceSplit split, Collector<SeaTunnelRow> output) throws Exception {
        log.info(String.format("[%s] Snapshot events start.", split.splitId()));
        Coprocessor.KeyRange keyRange = split.getKeyRange();
        try (KVClient scanClient = this.session.createKVClient();){
            long startTs = this.session.getTimestamp().getVersion();
            ByteString start = split.getSnapshotStart();
            while (true) {
                List segment;
                if ((segment = scanClient.scan(start, keyRange.getEnd(), startTs)).isEmpty()) {
                    split.setResolvedTs(startTs);
                    break;
                }
                for (Kvrpcpb.KvPair record : segment) {
                    if (!TableKeyRangeUtils.isRecordKey(record.getKey().toByteArray())) continue;
                    this.snapshotRecordDeserializer.deserialize(record, output);
                }
                start = RowKey.toRawKey((ByteString)((Kvrpcpb.KvPair)segment.get(segment.size() - 1)).getKey()).next().toByteString();
                split.setSnapshotStart(start);
            }
        }
    }

    protected void captureStreamingEvents(TiDBSourceSplit split, Collector<SeaTunnelRow> output) throws Exception {
        Cdcpb.Event.Row row;
        long resolvedTs = split.getResolvedTs();
        log.info("Capture streaming event from resolvedTs:{}", (Object)resolvedTs);
        CDCClient cdcClient = this.getCdcClient(split, resolvedTs);
        for (int i = 0; i < this.config.getBatchSize() && (row = cdcClient.get()) != null; ++i) {
            this.handleRow(row);
        }
        resolvedTs = cdcClient.getMaxResolvedTs();
        if (this.commits.size() > 0) {
            this.flushRows(resolvedTs);
        }
        while (!this.committedEvents.isEmpty()) {
            Cdcpb.Event.Row row2 = this.committedEvents.take();
            this.streamingRecordDeserializer.deserialize(row2, output);
        }
        log.info("Capture streaming event next resolvedTs:{}", (Object)resolvedTs);
        split.setResolvedTs(resolvedTs);
    }

    private CDCClient getCdcClient(TiDBSourceSplit split, long finalResolvedTs) {
        CDCClient cdcClient = this.cacheCDCClient.computeIfAbsent(split, k -> {
            CDCClient client = new CDCClient(this.session, k.getKeyRange());
            client.start(finalResolvedTs);
            return client;
        });
        return cdcClient;
    }

    public List<TiDBSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new ArrayList<TiDBSourceSplit>(this.sourceSplits);
    }

    public void addSplits(List<TiDBSourceSplit> splits) {
        this.sourceSplits.addAll(splits);
    }

    public void handleNoMoreSplits() {
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    private void handleRow(Cdcpb.Event.Row row) {
        if (!TableKeyRangeUtils.isRecordKey(row.getKey().toByteArray())) {
            return;
        }
        log.debug("binlog record, type: {}, data: {}", (Object)row.getType(), (Object)row);
        switch (row.getType()) {
            case COMMITTED: {
                this.preWrites.put(RowKeyWithTs.ofStart(row), row);
                this.commits.put(RowKeyWithTs.ofCommit(row), row);
                break;
            }
            case COMMIT: {
                this.commits.put(RowKeyWithTs.ofCommit(row), row);
                break;
            }
            case PREWRITE: {
                this.preWrites.put(RowKeyWithTs.ofStart(row), row);
                break;
            }
            case ROLLBACK: {
                this.preWrites.remove(RowKeyWithTs.ofStart(row));
                break;
            }
            default: {
                log.warn("Unsupported row type:" + row.getType());
            }
        }
    }

    protected void flushRows(long resolvedTs) throws Exception {
        while (!this.commits.isEmpty() && this.commits.firstKey().getTimestamp() <= resolvedTs) {
            Cdcpb.Event.Row commitRow = this.commits.pollFirstEntry().getValue();
            Cdcpb.Event.Row prewriteRow = this.preWrites.remove(RowKeyWithTs.ofStart(commitRow));
            this.committedEvents.offer(prewriteRow);
        }
    }
}

