/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.internals;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.DynamoDBStreamsProxy;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

public class DynamoDBStreamsDataFetcher<T>
extends KinesisDataFetcher<T> {
    private final RecordPublisherFactory recordPublisherFactory;

    public DynamoDBStreamsDataFetcher(List<String> streams, SourceFunction.SourceContext<T> sourceContext, RuntimeContext runtimeContext, Properties configProps, KinesisDeserializationSchema<T> deserializationSchema, KinesisShardAssigner shardAssigner) {
        this(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, DynamoDBStreamsProxy::create);
    }

    @VisibleForTesting
    DynamoDBStreamsDataFetcher(List<String> streams, SourceFunction.SourceContext<T> sourceContext, RuntimeContext runtimeContext, Properties configProps, KinesisDeserializationSchema<T> deserializationSchema, KinesisShardAssigner shardAssigner, KinesisDataFetcher.FlinkKinesisProxyFactory flinkKinesisProxyFactory) {
        super(streams, sourceContext, sourceContext.getCheckpointLock(), runtimeContext, configProps, deserializationSchema, shardAssigner, null, null, new AtomicReference<Throwable>(), new ArrayList<KinesisStreamShardState>(), DynamoDBStreamsDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(streams), flinkKinesisProxyFactory, null);
        this.recordPublisherFactory = new PollingRecordPublisherFactory(flinkKinesisProxyFactory);
    }

    @Override
    protected boolean shouldAdvanceLastDiscoveredShardId(String shardId, String lastSeenShardIdOfStream) {
        return DynamoDBStreamsShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) > 0;
    }

    @Override
    protected RecordPublisher createRecordPublisher(SequenceNumber sequenceNumber, Properties configProps, MetricGroup metricGroup, StreamShardHandle subscribedShard) throws InterruptedException {
        StartingPosition startingPosition = StartingPosition.continueFromSequenceNumber(sequenceNumber);
        return this.recordPublisherFactory.create(startingPosition, this.getConsumerConfiguration(), metricGroup, subscribedShard);
    }
}

