/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.dynamic.source.reader;

import java.io.IOException;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;

@Internal
public class KafkaPartitionSplitReaderWrapper
extends KafkaPartitionSplitReader
implements AutoCloseable {
    private final String kafkaClusterId;

    public KafkaPartitionSplitReaderWrapper(Properties props, SourceReaderContext context, KafkaSourceReaderMetrics kafkaSourceReaderMetrics, String kafkaClusterId) {
        super(props, context, kafkaSourceReaderMetrics);
        this.kafkaClusterId = kafkaClusterId;
    }

    @Override
    public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
        return new WrappedRecordsWithSplitIds(super.fetch(), this.kafkaClusterId);
    }

    private static final class WrappedRecordsWithSplitIds
    implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {
        private final RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> delegate;
        private final String kafkaClusterId;

        public WrappedRecordsWithSplitIds(RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> delegate, String kafkaClusterId) {
            this.delegate = delegate;
            this.kafkaClusterId = kafkaClusterId;
        }

        @Nullable
        public String nextSplit() {
            String nextSplit = this.delegate.nextSplit();
            if (nextSplit == null) {
                return nextSplit;
            }
            return this.kafkaClusterId + "-" + nextSplit;
        }

        @Nullable
        public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
            return (ConsumerRecord)this.delegate.nextRecordFromSplit();
        }

        public Set<String> finishedSplits() {
            return this.delegate.finishedSplits().stream().map(finishedSplit -> this.kafkaClusterId + "-" + finishedSplit).collect(Collectors.toSet());
        }

        public void recycle() {
            this.delegate.recycle();
        }
    }
}

