/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitState;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch.KafkaSourceFetcherManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSourceReader
extends SingleThreadMultiplexSourceReaderBase<ConsumerRecord<byte[], byte[]>, SeaTunnelRow, KafkaSourceSplit, KafkaSourceSplitState> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSourceReader.class);
    private final SourceReader.Context context;
    private final KafkaSourceConfig kafkaSourceConfig;
    private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> checkpointOffsetMap;
    private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits;

    KafkaSourceReader(BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue, SingleThreadFetcherManager<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit> splitFetcherManager, RecordEmitter<ConsumerRecord<byte[], byte[]>, SeaTunnelRow, KafkaSourceSplitState> recordEmitter, SourceReaderOptions options, KafkaSourceConfig kafkaSourceConfig, SourceReader.Context context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, options, context);
        this.kafkaSourceConfig = kafkaSourceConfig;
        this.context = context;
        this.checkpointOffsetMap = Collections.synchronizedSortedMap(new TreeMap());
        this.offsetsOfFinishedSplits = new ConcurrentHashMap<TopicPartition, OffsetAndMetadata>();
    }

    @Override
    protected void onSplitFinished(Map<String, KafkaSourceSplitState> finishedSplitIds) {
        finishedSplitIds.forEach((ignored, splitState) -> {
            if (splitState.getCurrentOffset() > 0L) {
                this.offsetsOfFinishedSplits.put(splitState.getTopicPartition(), new OffsetAndMetadata(splitState.getCurrentOffset()));
            } else if (splitState.getEndOffset() > 0L) {
                this.offsetsOfFinishedSplits.put(splitState.getTopicPartition(), new OffsetAndMetadata(splitState.getEndOffset()));
            }
        });
    }

    @Override
    protected KafkaSourceSplitState initializedState(KafkaSourceSplit split) {
        return new KafkaSourceSplitState(split);
    }

    @Override
    protected KafkaSourceSplit toSplitType(String splitId, KafkaSourceSplitState splitState) {
        return splitState.toKafkaSourceSplit();
    }

    @Override
    public List<KafkaSourceSplit> snapshotState(long checkpointId) {
        List<KafkaSourceSplit> sourceSplits = super.snapshotState(checkpointId);
        if (!this.kafkaSourceConfig.isCommitOnCheckpoint()) {
            return sourceSplits;
        }
        if (sourceSplits.isEmpty() && this.offsetsOfFinishedSplits.isEmpty()) {
            logger.debug("checkpoint {} does not have an offset to submit for splits", (Object)checkpointId);
            this.checkpointOffsetMap.put(checkpointId, Collections.emptyMap());
        } else {
            Map offsetAndMetadataMap = this.checkpointOffsetMap.computeIfAbsent(checkpointId, id -> new HashMap());
            for (KafkaSourceSplit kafkaSourceSplit : sourceSplits) {
                if (kafkaSourceSplit.getStartOffset() < 0L) continue;
                offsetAndMetadataMap.put(kafkaSourceSplit.getTopicPartition(), new OffsetAndMetadata(kafkaSourceSplit.getStartOffset()));
            }
            offsetAndMetadataMap.putAll(this.offsetsOfFinishedSplits);
        }
        return sourceSplits;
    }

    public void notifyCheckpointComplete(long checkpointId) {
        logger.debug("Committing offsets for checkpoint {}", (Object)checkpointId);
        if (!this.kafkaSourceConfig.isCommitOnCheckpoint()) {
            logger.debug("Submitting offsets after snapshot completion is prohibited");
            return;
        }
        Map committedPartitions = (Map)this.checkpointOffsetMap.get(checkpointId);
        if (committedPartitions == null) {
            logger.debug("Offsets for checkpoint {} have already been committed.", (Object)checkpointId);
            return;
        }
        if (committedPartitions.isEmpty()) {
            logger.debug("There are no offsets to commit for checkpoint {}.", (Object)checkpointId);
            this.removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
            return;
        }
        ((KafkaSourceFetcherManager)this.splitFetcherManager).commitOffsets(committedPartitions, (ignored, e) -> {
            if (e != null) {
                logger.warn("Failed to commit consumer offsets for checkpoint {}", (Object)checkpointId, (Object)e);
                return;
            }
            this.offsetsOfFinishedSplits.keySet().removeIf(committedPartitions::containsKey);
            this.removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
        });
    }

    private void removeAllOffsetsToCommitUpToCheckpoint(long checkpointId) {
        while (!this.checkpointOffsetMap.isEmpty() && this.checkpointOffsetMap.firstKey() <= checkpointId) {
            this.checkpointOffsetMap.remove(this.checkpointOffsetMap.firstKey());
        }
    }
}

