/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SplitFetcherManager} for Pulsar that tracks which fetcher serves which split id.
 *
 * <p>A split id without a recorded fetcher always gets a newly created fetcher (see
 * {@link #getOrCreateFetcher(String)}), and each fetcher is handed to the parent's
 * {@code startFetcher} at most once while it is tracked in {@link #fetcherStatus}.
 *
 * <p>The bookkeeping maps are plain {@link HashMap}s; this class performs no synchronization of
 * its own. NOTE(review): presumably all calls arrive from a single thread - confirm against the
 * caller.
 */
@Internal
public class PulsarSourceFetcherManager
        extends SplitFetcherManager<Message<byte[]>, PulsarPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceFetcherManager.class);

    /** Split id to the id of the fetcher most recently resolved for it. */
    private final Map<String, Integer> splitFetcherMapping = new HashMap<>();

    /** Fetcher id to {@code true} once the fetcher has been passed to the parent's start logic. */
    private final Map<Integer, Boolean> fetcherStatus = new HashMap<>();

    /**
     * Creates the manager; all arguments are forwarded unchanged to the parent constructor.
     *
     * @param elementsQueue queue passed to the parent manager
     * @param splitReaderSupplier supplier of split readers passed to the parent manager
     * @param configuration configuration passed to the parent manager
     */
    public PulsarSourceFetcherManager(FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue, Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier, Configuration configuration) {
        super(elementsQueue, splitReaderSupplier, configuration);
    }

    /**
     * Hands every split to the fetcher resolved for its split id and starts that fetcher.
     *
     * @param splitsToAdd the splits to distribute; each one is added to its fetcher individually
     */
    @Override
    public void addSplits(List<PulsarPartitionSplit> splitsToAdd) {
        for (PulsarPartitionSplit split : splitsToAdd) {
            SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher = this.getOrCreateFetcher(split.splitId());
            fetcher.addSplits(Collections.singletonList(split));
            this.startFetcher(fetcher);
        }
    }

    /**
     * Starts the fetcher through the parent unless it is already marked as started.
     *
     * @param fetcher the fetcher to start
     */
    @Override
    protected void startFetcher(SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher) {
        int fetcherId = fetcher.fetcherId();
        // Value comparison instead of reference comparison on the boxed Boolean; a missing
        // entry (null) counts as "not started".
        if (!Boolean.TRUE.equals(this.fetcherStatus.get(fetcherId))) {
            // Mark before delegating, so the status is recorded ahead of the parent call.
            this.fetcherStatus.put(fetcherId, true);
            super.startFetcher(fetcher);
        }
    }

    /**
     * Drops the bookkeeping for the given split id and shuts down the fetcher recorded for it,
     * if that fetcher is still registered in the parent's fetcher map. Does nothing when no
     * fetcher is recorded for the split id.
     *
     * @param splitId id of the split whose fetcher should be closed
     */
    public void closeFetcher(String splitId) {
        Integer fetchId = this.splitFetcherMapping.remove(splitId);
        if (fetchId == null) {
            return;
        }
        this.fetcherStatus.remove(fetchId);
        SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher = this.fetchers.remove(fetchId);
        if (fetcher != null) {
            fetcher.shutdown();
        }
    }

    /**
     * Forwards each partition's message id to the split reader of the fetcher resolved for that
     * partition. The fetcher is looked up by {@code partition.toString()}; if none is recorded,
     * a new fetcher is created for it.
     *
     * @param cursorsToCommit message id to acknowledge, keyed by partition
     * @throws PulsarClientException if notifying a split reader fails
     */
    public void acknowledgeMessages(Map<TopicPartition, MessageId> cursorsToCommit) throws PulsarClientException {
        LOG.debug("Acknowledge messages {}", cursorsToCommit);
        for (Map.Entry<TopicPartition, MessageId> entry : cursorsToCommit.entrySet()) {
            TopicPartition partition = entry.getKey();
            MessageId messageId = entry.getValue();
            SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher = this.getOrCreateFetcher(partition.toString());
            this.triggerAcknowledge(fetcher, partition, messageId);
        }
    }

    /**
     * Notifies the fetcher's split reader about the message id for the partition, then makes sure
     * the fetcher has been started.
     */
    private void triggerAcknowledge(SplitFetcher<Message<byte[]>, PulsarPartitionSplit> splitFetcher, TopicPartition partition, MessageId messageId) throws PulsarClientException {
        PulsarPartitionSplitReader splitReader = (PulsarPartitionSplitReader)splitFetcher.getSplitReader();
        splitReader.notifyCheckpointComplete(partition, messageId);
        // No-op when the fetcher is already marked as started; otherwise this covers a fetcher
        // that was just created by the lookup in acknowledgeMessages.
        this.startFetcher(splitFetcher);
    }

    /**
     * Returns the fetcher recorded for the split id, creating a new one when no fetcher id is
     * recorded or the recorded id is no longer present in the parent's fetcher map. The mapping
     * is (re)written with the id of the returned fetcher.
     */
    private SplitFetcher<Message<byte[]>, PulsarPartitionSplit> getOrCreateFetcher(String splitId) {
        Integer fetcherId = this.splitFetcherMapping.get(splitId);
        SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher =
                fetcherId == null ? null : this.fetchers.get(fetcherId);
        if (fetcher == null) {
            if (fetcherId != null) {
                // The recorded fetcher is gone from the parent's map; forget its started flag.
                this.fetcherStatus.remove(fetcherId);
            }
            fetcher = this.createSplitFetcher();
        }
        this.splitFetcherMapping.put(splitId, fetcher.fetcherId());
        return fetcher;
    }
}

