/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

class DefaultKafkaSinkContext
implements KafkaRecordSerializationSchema.KafkaSinkContext {
    private final int subtaskId;
    private final int numberOfParallelInstances;
    private final Properties kafkaProducerConfig;
    private final Map<String, int[]> cachedPartitions = new HashMap<String, int[]>();

    public DefaultKafkaSinkContext(int subtaskId, int numberOfParallelInstances, Properties kafkaProducerConfig) {
        this.subtaskId = subtaskId;
        this.numberOfParallelInstances = numberOfParallelInstances;
        this.kafkaProducerConfig = kafkaProducerConfig;
    }

    @Override
    public int getParallelInstanceId() {
        return this.subtaskId;
    }

    @Override
    public int getNumberOfParallelInstances() {
        return this.numberOfParallelInstances;
    }

    @Override
    public int[] getPartitionsForTopic(String topic) {
        return this.cachedPartitions.computeIfAbsent(topic, this::fetchPartitionsForTopic);
    }

    private int[] fetchPartitionsForTopic(String topic) {
        try (KafkaProducer producer = new KafkaProducer(this.kafkaProducerConfig);){
            ArrayList partitionsList = new ArrayList(producer.partitionsFor(topic));
            int[] nArray = partitionsList.stream().sorted(Comparator.comparing(PartitionInfo::partition)).map(PartitionInfo::partition).mapToInt(Integer::intValue).toArray();
            return nArray;
        }
    }
}

