/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.util.Preconditions;

/**
 * A {@link RecordPublisherFactory} that creates {@link FanOutRecordPublisher} instances.
 *
 * <p>Every publisher created by one factory instance is handed the same
 * {@link KinesisProxyV2Interface} (the one passed to the constructor) and the same
 * class-wide {@link FullJitterBackoff}.
 */
@Internal
public class FanOutRecordPublisherFactory implements RecordPublisherFactory {

    /** Backoff instance shared by all publishers created by any instance of this factory. */
    private static final FullJitterBackoff BACKOFF = new FullJitterBackoff();

    /** Proxy passed to every created publisher; closed by {@link #close()}. */
    private final KinesisProxyV2Interface kinesisProxy;

    /**
     * Creates a factory whose publishers all use the given proxy.
     *
     * @param kinesisProxy the proxy handed to every {@link FanOutRecordPublisher} this factory
     *     creates
     */
    public FanOutRecordPublisherFactory(final KinesisProxyV2Interface kinesisProxy) {
        this.kinesisProxy = kinesisProxy;
    }

    /**
     * Creates a {@link FanOutRecordPublisher} for the given shard.
     *
     * <p>A {@link FanOutRecordPublisherConfiguration} is built from {@code consumerConfig} for
     * the shard's stream only, and the stream consumer ARN is looked up from that configuration.
     *
     * @param startingPosition the position the publisher starts from
     * @param consumerConfig the consumer properties used to build the publisher configuration
     * @param metricGroup the metric group; only checked for null here, it is not passed on to the
     *     publisher
     * @param streamShardHandle the shard to create the publisher for
     * @return a new publisher for the shard
     * @throws NullPointerException if any argument is null
     * @throws IllegalStateException if the configuration yields no stream consumer ARN for the
     *     shard's stream
     */
    @Override
    public FanOutRecordPublisher create(
            final StartingPosition startingPosition,
            final Properties consumerConfig,
            final MetricGroup metricGroup,
            final StreamShardHandle streamShardHandle) {
        Preconditions.checkNotNull(startingPosition, "startingPosition must not be null");
        Preconditions.checkNotNull(consumerConfig, "consumerConfig must not be null");
        Preconditions.checkNotNull(metricGroup, "metricGroup must not be null");
        Preconditions.checkNotNull(streamShardHandle, "streamShardHandle must not be null");

        final String stream = streamShardHandle.getStreamName();
        final FanOutRecordPublisherConfiguration configuration =
                new FanOutRecordPublisherConfiguration(
                        consumerConfig, Collections.singletonList(stream));

        // Same exception type as the former checkState(isPresent()), but the message now names
        // the stream so the failure can be diagnosed from the stack trace alone.
        final String streamConsumerArn =
                configuration
                        .getStreamConsumerArn(stream)
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "Stream consumer ARN is not available for stream: "
                                                        + stream));

        return new FanOutRecordPublisher(
                startingPosition,
                streamConsumerArn,
                streamShardHandle,
                this.kinesisProxy,
                configuration,
                BACKOFF);
    }

    /** Closes the proxy that was passed to the constructor. */
    @Override
    public void close() {
        this.kinesisProxy.close();
    }
}

