/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.base.source.reader;

import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsAckEvent;
import com.ververica.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsReportEvent;
import com.ververica.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsRequestEvent;
import com.ververica.cdc.connectors.base.source.meta.events.StreamSplitMetaEvent;
import com.ververica.cdc.connectors.base.source.meta.events.StreamSplitMetaRequestEvent;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplitState;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplitState;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class IncrementalSourceReader<T, C extends SourceConfig>
extends SingleThreadMultiplexSourceReaderBase<SourceRecords, T, SourceSplitBase, SourceSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceReader.class);
    private final Map<String, SnapshotSplit> finishedUnackedSplits;
    private final Map<String, StreamSplit> uncompletedStreamSplits;
    private final int subtaskId;
    private final SourceSplitSerializer sourceSplitSerializer;
    private final C sourceConfig;
    private final DataSourceDialect<C> dialect;

    public IncrementalSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue, Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier, RecordEmitter<SourceRecords, T, SourceSplitState> recordEmitter, Configuration config, SourceReaderContext context, C sourceConfig, SourceSplitSerializer sourceSplitSerializer, DataSourceDialect<C> dialect) {
        super(elementQueue, new SingleThreadFetcherManager(elementQueue, splitReaderSupplier::get), recordEmitter, config, context);
        this.sourceConfig = sourceConfig;
        this.finishedUnackedSplits = new HashMap<String, SnapshotSplit>();
        this.uncompletedStreamSplits = new HashMap<String, StreamSplit>();
        this.subtaskId = context.getIndexOfSubtask();
        this.sourceSplitSerializer = (SourceSplitSerializer)Preconditions.checkNotNull((Object)sourceSplitSerializer);
        this.dialect = dialect;
    }

    public void start() {
        if (this.getNumberOfCurrentlyAssignedSplits() == 0) {
            this.context.sendSplitRequest();
        }
    }

    protected SourceSplitState initializedState(SourceSplitBase split) {
        if (split.isSnapshotSplit()) {
            return new SnapshotSplitState(split.asSnapshotSplit());
        }
        return new StreamSplitState(split.asStreamSplit());
    }

    public List<SourceSplitBase> snapshotState(long checkpointId) {
        List stateSplits = super.snapshotState(checkpointId);
        stateSplits.addAll(this.finishedUnackedSplits.values());
        stateSplits.addAll(this.uncompletedStreamSplits.values());
        return stateSplits;
    }

    protected void onSplitFinished(Map<String, SourceSplitState> finishedSplitIds) {
        for (SourceSplitState splitState : finishedSplitIds.values()) {
            SourceSplitBase sourceSplit = splitState.toSourceSplit();
            Preconditions.checkState((boolean)sourceSplit.isSnapshotSplit(), (Object)String.format("Only snapshot split could finish, but the actual split is stream split %s", sourceSplit));
            this.finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit());
        }
        this.reportFinishedSnapshotSplitsIfNeed();
        this.context.sendSplitRequest();
    }

    public void addSplits(List<SourceSplitBase> splits) {
        ArrayList<SourceSplitBase> unfinishedSplits = new ArrayList<SourceSplitBase>();
        for (SourceSplitBase split : splits) {
            if (split.isSnapshotSplit()) {
                SnapshotSplit snapshotSplit = split.asSnapshotSplit();
                if (snapshotSplit.isSnapshotReadFinished()) {
                    this.finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                    continue;
                }
                unfinishedSplits.add(split);
                continue;
            }
            if (!split.asStreamSplit().isCompletedSplit()) {
                this.uncompletedStreamSplits.put(split.splitId(), split.asStreamSplit());
                this.requestStreamSplitMetaIfNeeded(split.asStreamSplit());
                continue;
            }
            this.uncompletedStreamSplits.remove(split.splitId());
            StreamSplit streamSplit = this.discoverTableSchemasForStreamSplit(split.asStreamSplit());
            unfinishedSplits.add(streamSplit);
        }
        this.reportFinishedSnapshotSplitsIfNeed();
        super.addSplits(unfinishedSplits);
    }

    private StreamSplit discoverTableSchemasForStreamSplit(StreamSplit split) {
        String splitId = split.splitId();
        if (split.getTableSchemas().isEmpty()) {
            try {
                Map<TableId, TableChanges.TableChange> tableSchemas = this.dialect.discoverDataCollectionSchemas(this.sourceConfig);
                LOG.info("The table schema discovery for stream split {} success", (Object)splitId);
                return StreamSplit.fillTableSchemas(split, tableSchemas);
            }
            catch (Exception e2) {
                LOG.error("Failed to obtains table schemas due to {}", (Object)e2.getMessage());
                throw new FlinkRuntimeException((Throwable)e2);
            }
        }
        LOG.warn("The stream split {} has table schemas yet, skip the table schema discovery", (Object)split);
        return split;
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
            FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent)sourceEvent;
            LOG.debug("The subtask {} receives ack event for {} from enumerator.", (Object)this.subtaskId, ackEvent.getFinishedSplits());
            for (String splitId : ackEvent.getFinishedSplits()) {
                this.finishedUnackedSplits.remove(splitId);
            }
        } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
            LOG.debug("The subtask {} receives request to report finished snapshot splits.", (Object)this.subtaskId);
            this.reportFinishedSnapshotSplitsIfNeed();
        } else if (sourceEvent instanceof StreamSplitMetaEvent) {
            LOG.debug("The subtask {} receives stream meta with group id {}.", (Object)this.subtaskId, (Object)((StreamSplitMetaEvent)sourceEvent).getMetaGroupId());
            this.fillMetaDataForStreamSplit((StreamSplitMetaEvent)sourceEvent);
        } else {
            super.handleSourceEvents(sourceEvent);
        }
    }

    private void fillMetaDataForStreamSplit(StreamSplitMetaEvent metadataEvent) {
        StreamSplit streamSplit = this.uncompletedStreamSplits.get(metadataEvent.getSplitId());
        if (streamSplit != null) {
            int expectedMetaGroupId;
            int receivedMetaGroupId = metadataEvent.getMetaGroupId();
            if (receivedMetaGroupId == (expectedMetaGroupId = IncrementalSourceReader.getNextMetaGroupId(streamSplit.getFinishedSnapshotSplitInfos().size(), this.sourceConfig.getSplitMetaGroupSize()))) {
                List<FinishedSnapshotSplitInfo> metaDataGroup = metadataEvent.getMetaGroup().stream().map(this.sourceSplitSerializer::deserialize).collect(Collectors.toList());
                this.uncompletedStreamSplits.put(streamSplit.splitId(), StreamSplit.appendFinishedSplitInfos(streamSplit, metaDataGroup));
                LOG.info("Fill meta data of group {} to stream split", (Object)metaDataGroup.size());
            } else {
                LOG.warn("Received out of oder metadata event for split {}, the received meta group id is {}, but expected is {}, ignore it", new Object[]{metadataEvent.getSplitId(), receivedMetaGroupId, expectedMetaGroupId});
            }
            this.requestStreamSplitMetaIfNeeded(streamSplit);
        } else {
            LOG.warn("Received metadata event for split {}, but the uncompleted split map does not contain it", (Object)metadataEvent.getSplitId());
        }
    }

    private void requestStreamSplitMetaIfNeeded(StreamSplit streamSplit) {
        String splitId = streamSplit.splitId();
        if (!streamSplit.isCompletedSplit()) {
            int nextMetaGroupId = IncrementalSourceReader.getNextMetaGroupId(streamSplit.getFinishedSnapshotSplitInfos().size(), this.sourceConfig.getSplitMetaGroupSize());
            StreamSplitMetaRequestEvent splitMetaRequestEvent = new StreamSplitMetaRequestEvent(splitId, nextMetaGroupId);
            this.context.sendSourceEventToCoordinator((SourceEvent)splitMetaRequestEvent);
        } else {
            LOG.info("The meta of stream split {} has been collected success", (Object)splitId);
            this.addSplits(Collections.singletonList(streamSplit));
        }
    }

    private void reportFinishedSnapshotSplitsIfNeed() {
        if (!this.finishedUnackedSplits.isEmpty()) {
            HashMap<String, Offset> finishedOffsets = new HashMap<String, Offset>();
            for (SnapshotSplit split : this.finishedUnackedSplits.values()) {
                finishedOffsets.put(split.splitId(), split.getHighWatermark());
            }
            FinishedSnapshotSplitsReportEvent reportEvent = new FinishedSnapshotSplitsReportEvent(finishedOffsets);
            this.context.sendSourceEventToCoordinator((SourceEvent)reportEvent);
            LOG.debug("The subtask {} reports offsets of finished snapshot splits {}.", (Object)this.subtaskId, finishedOffsets);
        }
    }

    public static int getNextMetaGroupId(int receivedMetaNum, int metaGroupSize) {
        Preconditions.checkState((metaGroupSize > 0 ? 1 : 0) != 0);
        return receivedMetaNum % metaGroupSize == 0 ? receivedMetaNum / metaGroupSize : receivedMetaNum / metaGroupSize + 1;
    }

    protected SourceSplitBase toSplitType(String splitId, SourceSplitState splitState) {
        return splitState.toSourceSplit();
    }
}

