package com.ververica.cdc.connectors.mysql.source.reader;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader;
import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader;
import com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlRecords;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import io.debezium.connector.mysql.MySqlConnection;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/reader/MySqlSplitReader.class */
public class MySqlSplitReader implements SplitReader<SourceRecords, MySqlSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSplitReader.class);
    private final ArrayDeque<MySqlSnapshotSplit> snapshotSplits = new ArrayDeque<>();
    private final ArrayDeque<MySqlBinlogSplit> binlogSplits = new ArrayDeque<>(1);
    private final MySqlSourceConfig sourceConfig;
    private final int subtaskId;
    private final MySqlSourceReaderContext context;
    private final SnapshotPhaseHooks snapshotHooks;

    @Nullable
    private String currentSplitId;

    @Nullable
    private DebeziumReader<SourceRecords, MySqlSplit> currentReader;

    @Nullable
    private SnapshotSplitReader reusedSnapshotReader;

    @Nullable
    private BinlogSplitReader reusedBinlogReader;

    public MySqlSplitReader(MySqlSourceConfig mySqlSourceConfig, int i, MySqlSourceReaderContext mySqlSourceReaderContext, SnapshotPhaseHooks snapshotPhaseHooks) {
        this.sourceConfig = mySqlSourceConfig;
        this.subtaskId = i;
        this.context = mySqlSourceReaderContext;
        this.snapshotHooks = snapshotPhaseHooks;
    }

    public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
        try {
            suspendBinlogReaderIfNeed();
            return pollSplitRecords();
        } catch (InterruptedException e) {
            LOG.warn("fetch data failed.", e);
            throw new IOException(e);
        }
    }

    private void suspendBinlogReaderIfNeed() {
        if (this.currentReader == null || !(this.currentReader instanceof BinlogSplitReader) || !this.context.isBinlogSplitReaderSuspended() || this.currentReader.isFinished()) {
            return;
        }
        ((BinlogSplitReader) this.currentReader).stopBinlogReadTask();
        LOG.info("Suspend binlog reader to wait the binlog split update.");
    }

    private MySqlRecords pollSplitRecords() throws InterruptedException {
        MySqlRecords forRecords;
        if (this.currentReader == null) {
            if (this.binlogSplits.size() > 0) {
                MySqlBinlogSplit poll = this.binlogSplits.poll();
                this.currentSplitId = poll.splitId();
                this.currentReader = getBinlogSplitReader();
                this.currentReader.submitSplit(poll);
            } else if (this.snapshotSplits.size() > 0) {
                MySqlSnapshotSplit poll2 = this.snapshotSplits.poll();
                this.currentSplitId = poll2.splitId();
                this.currentReader = getSnapshotSplitReader();
                this.currentReader.submitSplit(poll2);
            } else {
                LOG.info("No available split to read.");
            }
            Iterator<SourceRecords> pollSplitRecords = this.currentReader.pollSplitRecords();
            return pollSplitRecords == null ? finishedSplit() : forRecords(pollSplitRecords);
        }
        if (this.currentReader instanceof SnapshotSplitReader) {
            Iterator<SourceRecords> pollSplitRecords2 = this.currentReader.pollSplitRecords();
            if (pollSplitRecords2 == null) {
                return finishedSplit();
            }
            if (this.context.isHasAssignedBinlogSplit()) {
                forRecords = forNewAddedTableFinishedSplit(this.currentSplitId, pollSplitRecords2);
                closeSnapshotReader();
                closeBinlogReader();
            } else {
                forRecords = forRecords(pollSplitRecords2);
                MySqlSnapshotSplit poll3 = this.snapshotSplits.poll();
                if (poll3 != null) {
                    this.currentSplitId = poll3.splitId();
                    this.currentReader.submitSplit(poll3);
                } else {
                    closeSnapshotReader();
                }
            }
            return forRecords;
        }
        if (!(this.currentReader instanceof BinlogSplitReader)) {
            throw new IllegalStateException("Unsupported reader type.");
        }
        Iterator<SourceRecords> pollSplitRecords3 = this.currentReader.pollSplitRecords();
        if (pollSplitRecords3 == null) {
            closeBinlogReader();
            return finishedSplit();
        }
        MySqlSnapshotSplit poll4 = this.snapshotSplits.poll();
        if (poll4 != null) {
            closeBinlogReader();
            LOG.info("It's turn to switch next fetch reader to snapshot split reader");
            this.currentSplitId = poll4.splitId();
            this.currentReader = getSnapshotSplitReader();
            this.currentReader.submitSplit(poll4);
        }
        return MySqlRecords.forBinlogRecords(MySqlBinlogSplitAssigner.BINLOG_SPLIT_ID, pollSplitRecords3);
    }

    private MySqlRecords finishedSplit() {
        MySqlRecords forFinishedSplit = MySqlRecords.forFinishedSplit(this.currentSplitId);
        this.currentSplitId = null;
        return forFinishedSplit;
    }

    private MySqlRecords forRecords(Iterator<SourceRecords> it) {
        if (!(this.currentReader instanceof SnapshotSplitReader)) {
            return MySqlRecords.forBinlogRecords(this.currentSplitId, it);
        }
        MySqlRecords forSnapshotRecords = MySqlRecords.forSnapshotRecords(this.currentSplitId, it);
        closeSnapshotReader();
        return forSnapshotRecords;
    }

    private MySqlRecords forNewAddedTableFinishedSplit(String str, Iterator<SourceRecords> it) {
        HashSet hashSet = new HashSet();
        hashSet.add(str);
        hashSet.add(MySqlBinlogSplitAssigner.BINLOG_SPLIT_ID);
        this.currentSplitId = null;
        return new MySqlRecords(str, it, hashSet);
    }

    public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        LOG.info("Handling split change {}", splitsChange);
        for (MySqlSplit mySqlSplit : splitsChange.splits()) {
            if (mySqlSplit.isSnapshotSplit()) {
                this.snapshotSplits.add(mySqlSplit.asSnapshotSplit());
            } else {
                this.binlogSplits.add(mySqlSplit.asBinlogSplit());
            }
        }
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        closeSnapshotReader();
        closeBinlogReader();
    }

    private SnapshotSplitReader getSnapshotSplitReader() {
        if (this.reusedSnapshotReader == null) {
            MySqlConnection createMySqlConnection = DebeziumUtils.createMySqlConnection(this.sourceConfig);
            this.reusedSnapshotReader = new SnapshotSplitReader(new StatefulTaskContext(this.sourceConfig, DebeziumUtils.createBinaryClient(this.sourceConfig.getDbzConfiguration()), createMySqlConnection), this.subtaskId, this.snapshotHooks);
        }
        return this.reusedSnapshotReader;
    }

    private BinlogSplitReader getBinlogSplitReader() {
        if (this.reusedBinlogReader == null) {
            MySqlConnection createMySqlConnection = DebeziumUtils.createMySqlConnection(this.sourceConfig);
            this.reusedBinlogReader = new BinlogSplitReader(new StatefulTaskContext(this.sourceConfig, DebeziumUtils.createBinaryClient(this.sourceConfig.getDbzConfiguration()), createMySqlConnection), this.subtaskId);
        }
        return this.reusedBinlogReader;
    }

    private void closeSnapshotReader() {
        if (this.reusedSnapshotReader != null) {
            LOG.debug("Close snapshot reader {}", this.reusedSnapshotReader.getClass().getCanonicalName());
            this.reusedSnapshotReader.close();
            if (this.reusedSnapshotReader == this.currentReader) {
                this.currentReader = null;
            }
            this.reusedSnapshotReader = null;
        }
    }

    private void closeBinlogReader() {
        if (this.reusedBinlogReader != null) {
            LOG.debug("Close binlog reader {}", this.reusedBinlogReader.getClass().getCanonicalName());
            this.reusedBinlogReader.close();
            if (this.reusedBinlogReader == this.currentReader) {
                this.currentReader = null;
            }
            this.reusedBinlogReader = null;
        }
    }
}
