/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.internals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.HashKeyRange;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.SequenceNumberRange;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Factory;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class KinesisDataFetcher<T> {
    /** Default shard assigner: returns the shard handle's {@code hashCode()}; the subtask count is ignored. */
    public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER = (shard, subtasks) -> shard.hashCode();
    private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
    /** Consumer configuration; read for the record publisher type, discovery interval and watermark settings. */
    private final Properties configProps;
    /** Names of the subscribed streams. */
    private final List<String> streams;
    /** Template schema; a clone is created and opened for every shard consumer. */
    private final KinesisDeserializationSchema<T> deserializationSchema;
    /** Produces the hash that decides which subtask subscribes to a discovered shard. */
    private final KinesisShardAssigner shardAssigner;
    /** Metric group named "KinesisConsumer" under the runtime context's metric group. */
    private final MetricGroup consumerMetricGroup;
    private final RuntimeContext runtimeContext;
    private final int totalNumberOfConsumerSubtasks;
    private final int indexOfThisConsumerSubtask;
    /** Executor that runs the shard consumers. */
    private final ExecutorService shardConsumersExecutor;
    /** Stream name to id of the last discovered shard; the value is {@code null} until a shard was discovered. */
    private final Map<String, String> subscribedStreamsToLastDiscoveredShardIds;
    /** State of every registered shard; a shard's position in this list is its shard state index. */
    private final List<KinesisStreamShardState> subscribedShardsState;
    private final SourceFunction.SourceContext<T> sourceContext;
    /** Lock held while emitting records and while reading or updating {@link #subscribedShardsState}. */
    private final Object checkpointLock;
    /** First error reported through {@code stopWithError}; rethrown at the end of {@code runFetcher}. */
    private final AtomicReference<Throwable> error;
    private final FlinkKinesisProxyFactory kinesisProxyFactory;
    /** May be {@code null}; only dereferenced when the EFO record publisher is configured. */
    private final FlinkKinesisProxyV2Factory kinesisProxyV2Factory;
    /** Proxy used for shard discovery. */
    private final KinesisProxyInterface kinesis;
    private final RecordPublisherFactory recordPublisherFactory;
    /** Completed by {@code shutdownFetcher} to cut short the wait between two shard discoveries. */
    private final CompletableFuture<Void> cancelFuture = new CompletableFuture();
    /** Number of registered shards that have not reached the shard-ending sentinel sequence number. */
    private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
    /** Set to {@code false} by {@code shutdownFetcher}; checked by the discovery loop. */
    private volatile boolean running = true;
    /** Optional template assigner; cloned per shard when shard state is registered. */
    private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
    /** Optional tracker; when set together with the assigner, the asynchronous record emitter is used. */
    private final WatermarkTracker watermarkTracker;
    private final RecordEmitter recordEmitter;
    /** Set by {@code emitWatermark}: true after marking the source idle, false after emitting a watermark. */
    private boolean isIdle;
    /** Per-shard watermark state keyed by shard state index. */
    private ConcurrentHashMap<Integer, ShardWatermarkState> shardWatermarks = new ConcurrentHashMap();
    /** Timestamp of the last watermark emitted by {@code emitWatermark}. */
    private long lastWatermark = Long.MIN_VALUE;
    /** Minimum upcoming watermark across shards, as computed by {@code emitWatermark}. */
    private long nextWatermark = Long.MIN_VALUE;
    /** Read from "flink.shard.idle.interval" in {@code runFetcher}; defaults to -1. */
    private long shardIdleIntervalMillis = -1L;

    /**
     * Creates a fetcher with default internals: the checkpoint lock of {@code sourceContext}, an
     * empty error reference, an empty shard state list, an initial "no shard discovered yet" map
     * for every stream, and the default Kinesis proxy factories.
     *
     * @param streams names of the streams to subscribe to
     * @param sourceContext source context used to emit records and watermarks
     * @param runtimeContext runtime context of the consuming subtask
     * @param configProps consumer configuration
     * @param deserializationSchema schema cloned for every shard consumer
     * @param shardAssigner decides which subtask reads which shard
     * @param periodicWatermarkAssigner optional watermark assigner, may be {@code null}
     * @param watermarkTracker optional watermark tracker, may be {@code null}
     */
    public KinesisDataFetcher(
            List<String> streams,
            SourceFunction.SourceContext<T> sourceContext,
            RuntimeContext runtimeContext,
            Properties configProps,
            KinesisDeserializationSchema<T> deserializationSchema,
            KinesisShardAssigner shardAssigner,
            AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
            WatermarkTracker watermarkTracker) {
        this(
                streams,
                sourceContext,
                sourceContext.getCheckpointLock(),
                runtimeContext,
                configProps,
                deserializationSchema,
                shardAssigner,
                periodicWatermarkAssigner,
                watermarkTracker,
                new AtomicReference<Throwable>(),
                new ArrayList<KinesisStreamShardState>(),
                createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
                KinesisProxy::create,
                KinesisProxyV2Factory::createKinesisProxyV2);
    }

    @VisibleForTesting
    protected KinesisDataFetcher(List<String> streams, SourceFunction.SourceContext<T> sourceContext, Object checkpointLock, RuntimeContext runtimeContext, Properties configProps, KinesisDeserializationSchema<T> deserializationSchema, KinesisShardAssigner shardAssigner, AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner, WatermarkTracker watermarkTracker, AtomicReference<Throwable> error, List<KinesisStreamShardState> subscribedShardsState, HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds, FlinkKinesisProxyFactory kinesisProxyFactory, @Nullable FlinkKinesisProxyV2Factory kinesisProxyV2Factory) {
        this.streams = (List)Preconditions.checkNotNull(streams);
        this.configProps = (Properties)Preconditions.checkNotNull((Object)configProps);
        this.sourceContext = (SourceFunction.SourceContext)Preconditions.checkNotNull(sourceContext);
        this.checkpointLock = Preconditions.checkNotNull((Object)checkpointLock);
        this.runtimeContext = (RuntimeContext)Preconditions.checkNotNull((Object)runtimeContext);
        this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
        this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
        this.deserializationSchema = (KinesisDeserializationSchema)Preconditions.checkNotNull(deserializationSchema);
        this.shardAssigner = (KinesisShardAssigner)Preconditions.checkNotNull((Object)shardAssigner);
        this.periodicWatermarkAssigner = periodicWatermarkAssigner;
        this.watermarkTracker = watermarkTracker;
        this.kinesisProxyFactory = (FlinkKinesisProxyFactory)Preconditions.checkNotNull((Object)kinesisProxyFactory);
        this.kinesisProxyV2Factory = kinesisProxyV2Factory;
        this.kinesis = kinesisProxyFactory.create(configProps);
        this.recordPublisherFactory = this.createRecordPublisherFactory();
        this.consumerMetricGroup = runtimeContext.getMetricGroup().addGroup("KinesisConsumer");
        this.error = (AtomicReference)Preconditions.checkNotNull(error);
        this.subscribedShardsState = (List)Preconditions.checkNotNull(subscribedShardsState);
        this.subscribedStreamsToLastDiscoveredShardIds = (Map)Preconditions.checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
        this.shardConsumersExecutor = this.createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
        this.recordEmitter = this.createRecordEmitter(configProps);
        StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(configProps, streams);
    }

    /**
     * Chooses the record emitter implementation.
     *
     * <p>The asynchronous emitter is used only when both a periodic watermark assigner and a
     * watermark tracker are present; its queue capacity comes from
     * {@code flink.watermark.sync.queue.capacity} (default 100). Otherwise the synchronous emitter
     * is returned.
     *
     * @param configProps consumer configuration
     * @return the emitter to use for this fetcher
     */
    private RecordEmitter createRecordEmitter(Properties configProps) {
        boolean watermarkSyncConfigured = this.periodicWatermarkAssigner != null && this.watermarkTracker != null;
        if (!watermarkSyncConfigured) {
            return new SyncKinesisRecordEmitter();
        }
        String configuredCapacity = configProps.getProperty("flink.watermark.sync.queue.capacity", Integer.toString(100));
        return new AsyncKinesisRecordEmitter(Integer.parseInt(configuredCapacity));
    }

    /**
     * Builds the consumer for one subscribed shard.
     *
     * @param subscribedShardStateIndex index of the shard's state in the subscribed shard list
     * @param subscribedShard handle of the shard to consume
     * @param lastSequenceNum sequence number to resume from
     * @param metricGroup metric group used for the publisher and the consumer metrics reporter
     * @param shardDeserializer deserialization schema instance dedicated to this shard
     * @return a new shard consumer bound to this fetcher
     * @throws InterruptedException if creating the record publisher is interrupted
     */
    protected ShardConsumer<T> createShardConsumer(Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, SequenceNumber lastSequenceNum, MetricGroup metricGroup, KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
        RecordPublisher publisher = createRecordPublisher(lastSequenceNum, this.configProps, metricGroup, subscribedShard);
        ShardConsumerMetricsReporter metricsReporter = new ShardConsumerMetricsReporter(metricGroup);
        return new ShardConsumer<T>(
                this,
                publisher,
                subscribedShardStateIndex,
                subscribedShard,
                lastSequenceNum,
                metricsReporter,
                shardDeserializer);
    }

    /**
     * Creates the record publisher factory selected by {@code flink.stream.recordpublisher}
     * (default: POLLING).
     *
     * <p>EFO yields a fan-out factory backed by a proxy from the V2 proxy factory; every other
     * type yields a polling factory.
     *
     * @return the factory used to create per-shard record publishers
     */
    protected RecordPublisherFactory createRecordPublisherFactory() {
        String configuredType = this.configProps.getProperty("flink.stream.recordpublisher", ConsumerConfigConstants.RecordPublisherType.POLLING.name());
        ConsumerConfigConstants.RecordPublisherType publisherType = ConsumerConfigConstants.RecordPublisherType.valueOf(configuredType);
        if (publisherType == ConsumerConfigConstants.RecordPublisherType.EFO) {
            // NOTE(review): kinesisProxyV2Factory is declared @Nullable in the constructor; a null factory fails here.
            return new FanOutRecordPublisherFactory(this.kinesisProxyV2Factory.create(this.configProps));
        }
        return new PollingRecordPublisherFactory(this.kinesisProxyFactory);
    }

    /**
     * Creates a record publisher for a shard, starting at the position derived from the given
     * sequence number.
     *
     * @param sequenceNumber sequence number the starting position is computed from
     * @param configProps consumer configuration
     * @param metricGroup metric group handed to the publisher factory
     * @param subscribedShard shard the publisher reads from
     * @return the publisher created by the configured factory
     * @throws InterruptedException if the factory is interrupted while creating the publisher
     */
    protected RecordPublisher createRecordPublisher(SequenceNumber sequenceNumber, Properties configProps, MetricGroup metricGroup, StreamShardHandle subscribedShard) throws InterruptedException {
        return this.recordPublisherFactory.create(
                AWSUtil.getStartingPosition(sequenceNumber, configProps),
                configProps,
                metricGroup,
                subscribedShard);
    }

    /**
     * Runs the fetcher until it is shut down.
     *
     * <p>Steps, in order: verify that at least one stream has a known last discovered shard;
     * start a shard consumer for every seeded shard that has not already ended; start the
     * periodic watermark emitter (and, with a watermark tracker, the sync callback and the record
     * emitter thread) when a watermark assigner is configured; loop discovering and subscribing
     * to new shards; after the loop, wait for the shard consumers to terminate and rethrow any
     * recorded error.
     *
     * @throws RuntimeException if none of the subscribed streams has a last discovered shard id
     * @throws Exception the first recorded error, wrapped in {@code Exception} when it is neither
     *     an {@code Exception} nor an {@code Error}
     */
    public void runFetcher() throws Exception {
        // Nothing to do if the fetcher was shut down before it started.
        if (!this.running) {
            return;
        }
        // A stream whose last discovered shard id is still null has no known shards.
        boolean hasShards = false;
        StringBuilder streamsWithNoShardsFound = new StringBuilder();
        for (Map.Entry<String, String> streamToLastDiscoveredShardEntry : this.subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
            if (streamToLastDiscoveredShardEntry.getValue() != null) {
                hasShards = true;
                continue;
            }
            streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(", ");
        }
        if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
            LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}", (Object)this.indexOfThisConsumerSubtask, (Object)streamsWithNoShardsFound.toString());
        }
        if (!hasShards) {
            throw new RuntimeException("No shards can be found for all subscribed streams: " + this.streams);
        }
        // Start one consumer per seeded shard, skipping shards already read to their end.
        for (int seededStateIndex = 0; seededStateIndex < this.subscribedShardsState.size(); ++seededStateIndex) {
            KinesisStreamShardState seededShardState = this.subscribedShardsState.get(seededStateIndex);
            if (seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) continue;
            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}", new Object[]{this.indexOfThisConsumerSubtask, seededShardState.getStreamShardHandle().toString(), seededShardState.getLastProcessedSequenceNum(), seededStateIndex});
            }
            StreamShardHandle streamShardHandle = this.subscribedShardsState.get(seededStateIndex).getStreamShardHandle();
            // Each consumer gets its own opened clone of the deserialization schema.
            KinesisDeserializationSchema<T> shardDeserializationSchema = this.getClonedDeserializationSchema();
            shardDeserializationSchema.open(RuntimeContextInitializationContextAdapters.deserializationAdapter((RuntimeContext)this.runtimeContext, metricGroup -> this.consumerMetricGroup.addGroup("subtaskId", String.valueOf(this.indexOfThisConsumerSubtask)).addGroup("shardId", streamShardHandle.getShard().getShardId()).addGroup("user")));
            this.shardConsumersExecutor.submit(this.createShardConsumer(seededStateIndex, streamShardHandle, this.subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum(), this.registerShardMetricGroup(this.consumerMetricGroup, this.subscribedShardsState.get(seededStateIndex)), shardDeserializationSchema));
        }
        // Watermark machinery is only set up when a periodic watermark assigner is configured.
        if (this.periodicWatermarkAssigner != null) {
            long periodicWatermarkIntervalMillis = this.runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
            if (periodicWatermarkIntervalMillis > 0L) {
                ProcessingTimeService timerService = ((StreamingRuntimeContext)this.runtimeContext).getProcessingTimeService();
                LOG.info("Starting periodic watermark emitter with interval {}", (Object)periodicWatermarkIntervalMillis);
                new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis).start();
                if (this.watermarkTracker != null) {
                    long watermarkSyncMillis = Long.parseLong(this.getConsumerConfiguration().getProperty("flink.watermark.sync.interval", Long.toString(30000L)));
                    // The tracker's update timeout is three sync intervals.
                    this.watermarkTracker.setUpdateTimeoutMillis(watermarkSyncMillis * 3L);
                    this.watermarkTracker.open(this.runtimeContext);
                    new WatermarkSyncCallback(timerService, watermarkSyncMillis).start();
                    long lookaheadMillis = Long.parseLong(this.getConsumerConfiguration().getProperty("flink.watermark.lookahead.millis", Long.toString(0L)));
                    // The lookahead is never smaller than three sync intervals.
                    this.recordEmitter.setMaxLookaheadMillis(Math.max(lookaheadMillis, watermarkSyncMillis * 3L));
                    // Run the record emitter on its own daemon thread; any failure stops the fetcher.
                    Runnable recordEmitterRunnable = new Runnable(){

                        @Override
                        public void run() {
                            try {
                                KinesisDataFetcher.this.recordEmitter.run();
                            }
                            catch (Throwable error) {
                                KinesisDataFetcher.this.stopWithError(error);
                            }
                        }
                    };
                    Thread thread = new Thread(recordEmitterRunnable);
                    thread.setName("recordEmitter-" + this.runtimeContext.getTaskNameWithSubtasks());
                    thread.setDaemon(true);
                    thread.start();
                }
            }
            this.shardIdleIntervalMillis = Long.parseLong(this.getConsumerConfiguration().getProperty("flink.shard.idle.interval", Long.toString(-1L)));
        }
        long discoveryIntervalMillis = Long.parseLong(this.configProps.getProperty("flink.shard.discovery.intervalmillis", Long.toString(10000L)));
        if (this.numberOfActiveShards.get() == 0) {
            LOG.info("Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...", (Object)this.indexOfThisConsumerSubtask);
            this.sourceContext.markAsTemporarilyIdle();
        }
        // Discovery loop: runs until shutdownFetcher() clears the running flag.
        while (this.running) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...", (Object)this.indexOfThisConsumerSubtask);
            }
            List<StreamShardHandle> newShardsDueToResharding = this.discoverNewShardsToSubscribe();
            for (StreamShardHandle shard : newShardsDueToResharding) {
                // Newly discovered shards are registered with the earliest-sequence-number sentinel.
                KinesisStreamShardState newShardState = new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
                int newStateIndex = this.registerNewSubscribedShardState(newShardState);
                if (LOG.isInfoEnabled()) {
                    LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming the shard from sequence number {} with ShardConsumer {}", new Object[]{this.indexOfThisConsumerSubtask, newShardState.getStreamShardHandle().toString(), newShardState.getLastProcessedSequenceNum(), newStateIndex});
                }
                StreamShardHandle streamShardHandle = newShardState.getStreamShardHandle();
                KinesisDeserializationSchema<T> shardDeserializationSchema = this.getClonedDeserializationSchema();
                shardDeserializationSchema.open(RuntimeContextInitializationContextAdapters.deserializationAdapter((RuntimeContext)this.runtimeContext, metricGroup -> this.consumerMetricGroup.addGroup("subtaskId", String.valueOf(this.indexOfThisConsumerSubtask)).addGroup("shardId", streamShardHandle.getShard().getShardId()).addGroup("user")));
                this.shardConsumersExecutor.submit(this.createShardConsumer(newStateIndex, newShardState.getStreamShardHandle(), newShardState.getLastProcessedSequenceNum(), this.registerShardMetricGroup(this.consumerMetricGroup, newShardState), shardDeserializationSchema));
            }
            // An interval of 0 means discover again immediately, without waiting.
            if (!this.running || discoveryIntervalMillis == 0L) continue;
            try {
                // Waits up to one discovery interval; returns early once shutdownFetcher() completes the future.
                this.cancelFuture.get(discoveryIntervalMillis, TimeUnit.MILLISECONDS);
                LOG.debug("Cancelled discovery");
            }
            // Timing out is the normal case: the interval elapsed without a shutdown.
            catch (TimeoutException timeoutException) {}
        }
        try {
            this.awaitTermination();
        }
        catch (InterruptedException ie) {
            // Record the interruption unless an earlier error is already stored.
            this.error.compareAndSet(null, ie);
        }
        // Surface the first recorded error to the caller.
        Throwable throwable = this.error.get();
        if (throwable != null) {
            if (throwable instanceof Exception) {
                throw (Exception)throwable;
            }
            if (throwable instanceof Error) {
                throw (Error)throwable;
            }
            throw new Exception(throwable);
        }
    }

    /**
     * Captures the last processed sequence number of every subscribed shard.
     *
     * <p>The caller is expected to hold the checkpoint lock; this is checked with an
     * {@code assert}.
     *
     * @return a new map from shard metadata to its last processed sequence number
     */
    public HashMap<StreamShardMetadata, SequenceNumber> snapshotState() {
        assert (Thread.holdsLock(this.checkpointLock));
        HashMap<StreamShardMetadata, SequenceNumber> snapshot = new HashMap<>();
        this.subscribedShardsState.forEach(
                shardState -> snapshot.put(shardState.getStreamShardMetadata(), shardState.getLastProcessedSequenceNum()));
        return snapshot;
    }

    /**
     * Stops the fetcher: clears the running flag, deregisters stream consumers, closes the record
     * publisher factory, shuts down the shard consumer executor, completes the cancel future,
     * closes the watermark tracker (if any) and stops the record emitter.
     *
     * <p>Exceptions from deregistration and from closing the publisher factory are logged and
     * do not prevent the remaining steps.
     */
    public void shutdownFetcher() {
        // The recorded error (possibly null) is passed as the trailing logging argument.
        LOG.info("Starting shutdown of shard consumer threads and AWS SDK resources of subtask {} ...", (Object)this.indexOfThisConsumerSubtask, (Object)this.error.get());
        this.running = false;
        try {
            try {
                this.deregisterStreamConsumer();
            }
            catch (Exception e) {
                LOG.warn("Encountered exception deregistering stream consumers", (Throwable)e);
            }
            try {
                this.closeRecordPublisherFactory();
            }
            catch (Exception e) {
                LOG.warn("Encountered exception closing record publisher factory", (Throwable)e);
            }
        }
        finally {
            // Runs even if the steps above fail with something other than an Exception.
            this.gracefulShutdownShardConsumers();
            // Wakes up the discovery loop if it is waiting between two discoveries.
            this.cancelFuture.complete(null);
            if (this.watermarkTracker != null) {
                this.watermarkTracker.close();
            }
            this.recordEmitter.stop();
        }
        LOG.info("Shutting down the shard consumer threads of subtask {} ...", (Object)this.indexOfThisConsumerSubtask);
    }

    /** Closes the record publisher factory; overridable so tests can intercept the call. */
    @VisibleForTesting
    protected void closeRecordPublisherFactory() {
        recordPublisherFactory.close();
    }

    /**
     * Deregisters the stream consumers for the subscribed streams using the consumer
     * configuration; overridable so tests can intercept the call.
     */
    @VisibleForTesting
    protected void deregisterStreamConsumer() {
        StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);
    }

    /** Initiates an orderly shutdown of the shard consumer executor. */
    private void gracefulShutdownShardConsumers() {
        shardConsumersExecutor.shutdown();
    }

    /**
     * @return {@code true} until {@code shutdownFetcher()} has been called
     */
    boolean isRunning() {
        return running;
    }

    /**
     * Blocks until the shard consumer executor has terminated, re-checking once a minute.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void awaitTermination() throws InterruptedException {
        boolean terminated;
        do {
            terminated = this.shardConsumersExecutor.awaitTermination(1L, TimeUnit.MINUTES);
        } while (!terminated);
    }

    /**
     * Records the given error and shuts the fetcher down. Only the first reported error is kept;
     * later calls are ignored.
     *
     * @param throwable the failure that should stop the fetcher
     */
    protected void stopWithError(Throwable throwable) {
        boolean isFirstError = this.error.compareAndSet(null, throwable);
        if (!isFirstError) {
            return;
        }
        shutdownFetcher();
    }

    /**
     * Updates the last discovered shard id of a stream.
     *
     * <p>The id is stored when the stream has no recorded shard yet, or when
     * {@link #shouldAdvanceLastDiscoveredShardId(String, String)} accepts the new id.
     *
     * @param stream name of the stream
     * @param shardId candidate shard id
     */
    public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
        String previousShardId = this.subscribedStreamsToLastDiscoveredShardIds.get(stream);
        boolean nothingRecordedYet = previousShardId == null;
        if (nothingRecordedYet || shouldAdvanceLastDiscoveredShardId(shardId, previousShardId)) {
            this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
        }
    }

    /**
     * Decides whether a shard id should replace the recorded last discovered shard id.
     *
     * @param shardId candidate shard id
     * @param lastSeenShardIdOfStream currently recorded shard id
     * @return {@code true} if the candidate compares greater than the recorded id
     */
    protected boolean shouldAdvanceLastDiscoveredShardId(String shardId, String lastSeenShardIdOfStream) {
        int comparison = StreamShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream);
        return comparison > 0;
    }

    /**
     * Queries Kinesis for shards after the last discovered shard of each stream and returns the
     * ones assigned to this subtask.
     *
     * <p>For every stream with retrieved shards, the last discovered shard id is advanced to the
     * last shard seen in the result, whether or not any shard was assigned to this subtask.
     *
     * @return newly discovered shards this subtask should subscribe to; empty if there are none
     * @throws InterruptedException if the shard list request is interrupted
     */
    public List<StreamShardHandle> discoverNewShardsToSubscribe() throws InterruptedException {
        LinkedList<StreamShardHandle> assignedShards = new LinkedList<StreamShardHandle>();
        GetShardListResult discovery = this.kinesis.getShardList(this.subscribedStreamsToLastDiscoveredShardIds);
        if (!discovery.hasRetrievedShards()) {
            return assignedShards;
        }
        for (String stream : discovery.getStreamsWithRetrievedShards()) {
            for (StreamShardHandle candidate : discovery.getRetrievedShardListOfStream(stream)) {
                int assignmentHash = this.shardAssigner.assign(candidate, this.totalNumberOfConsumerSubtasks);
                if (isThisSubtaskShouldSubscribeTo(assignmentHash, this.totalNumberOfConsumerSubtasks, this.indexOfThisConsumerSubtask)) {
                    assignedShards.add(candidate);
                }
            }
            String lastSeenShardId = discovery.getLastSeenShardOfStream(stream).getShard().getShardId();
            advanceLastDiscoveredShardOfStream(stream, lastSeenShardId);
        }
        return assignedShards;
    }

    /**
     * @return the consumer configuration this fetcher was created with (the same instance, not a copy)
     */
    protected Properties getConsumerConfiguration() {
        return configProps;
    }

    /**
     * Clones the template deserialization schema using the user code class loader.
     *
     * @return a fresh copy of the deserialization schema
     * @throws RuntimeException wrapping the {@code IOException} or {@code ClassNotFoundException}
     *     raised while cloning
     */
    private KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
        ClassLoader userCodeClassLoader = this.runtimeContext.getUserCodeClassLoader();
        try {
            return (KinesisDeserializationSchema)InstantiationUtil.clone(this.deserializationSchema, userCodeClassLoader);
        }
        catch (IOException | ClassNotFoundException cloneFailure) {
            throw new RuntimeException(cloneFailure);
        }
    }

    /**
     * Wraps a record with its shard bookkeeping and places it on the shard's emit queue.
     *
     * <p>When the shard has a watermark assigner, the record timestamp is replaced by the
     * assigner's extracted timestamp and the assigner's current watermark is attached to the
     * record. The shard's last record timestamp and last-updated time are refreshed.
     *
     * @param record the deserialized record
     * @param recordTimestamp timestamp to use when no watermark assigner is present
     * @param shardStateIndex index of the shard's state
     * @param lastSequenceNumber sequence number of the record
     * @throws RuntimeException wrapping an {@code InterruptedException} raised while enqueueing
     */
    protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
        ShardWatermarkState shardWatermarkState = this.shardWatermarks.get(shardStateIndex);
        Preconditions.checkNotNull(shardWatermarkState, "shard watermark state initialized in registerNewSubscribedShardState");

        long effectiveTimestamp = recordTimestamp;
        Watermark currentWatermark = null;
        if (shardWatermarkState.periodicWatermarkAssigner != null) {
            effectiveTimestamp = shardWatermarkState.periodicWatermarkAssigner.extractTimestamp(record, shardWatermarkState.lastRecordTimestamp);
            currentWatermark = shardWatermarkState.periodicWatermarkAssigner.getCurrentWatermark();
        }
        shardWatermarkState.lastRecordTimestamp = effectiveTimestamp;
        shardWatermarkState.lastUpdated = getCurrentTimeMillis();

        RecordWrapper wrapped = new RecordWrapper(record, effectiveTimestamp);
        wrapped.shardStateIndex = shardStateIndex;
        wrapped.lastSequenceNumber = lastSequenceNumber;
        wrapped.watermark = currentWatermark;
        try {
            shardWatermarkState.emitQueue.put(wrapped);
        }
        catch (InterruptedException interrupted) {
            // NOTE(review): the thread's interrupt flag is not restored here - confirm this is intended.
            throw new RuntimeException(interrupted);
        }
    }

    /**
     * Emits a queued record downstream and advances the shard's state, all under the checkpoint
     * lock.
     *
     * <p>A wrapper without a value is not emitted; a warning is logged instead. In both cases the
     * shard's last processed sequence number is updated.
     *
     * @param rw the queued record together with its shard index, sequence number and watermark
     */
    private void emitRecordAndUpdateState(RecordWrapper<T> rw) {
        synchronized (this.checkpointLock) {
            if (rw.getValue() == null) {
                LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.", rw.lastSequenceNumber, this.subscribedShardsState.get(rw.shardStateIndex).getStreamShardHandle());
            } else {
                this.sourceContext.collectWithTimestamp(rw.getValue(), rw.timestamp);
                ShardWatermarkState shardWatermarkState = this.shardWatermarks.get(rw.shardStateIndex);
                shardWatermarkState.lastEmittedRecordWatermark = rw.watermark;
            }
            updateState(rw.shardStateIndex, rw.lastSequenceNumber);
        }
    }

    /**
     * Stores the last processed sequence number of a shard, under the checkpoint lock.
     *
     * <p>When the sequence number is the shard-ending sentinel, the active shard count is
     * decremented; if it reaches zero the source is marked as temporarily idle.
     *
     * @param shardStateIndex index of the shard's state
     * @param lastSequenceNumber sequence number that was just processed
     */
    protected final void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
        synchronized (this.checkpointLock) {
            KinesisStreamShardState shardState = this.subscribedShardsState.get(shardStateIndex);
            shardState.setLastProcessedSequenceNum(lastSequenceNumber);

            boolean shardEnded = lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
            if (!shardEnded) {
                return;
            }
            LOG.info("Subtask {} has reached the end of subscribed shard: {}", this.indexOfThisConsumerSubtask, shardState.getStreamShardHandle());
            int remainingActiveShards = this.numberOfActiveShards.decrementAndGet();
            if (remainingActiveShards == 0) {
                LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...", this.indexOfThisConsumerSubtask);
                this.sourceContext.markAsTemporarilyIdle();
            }
        }
    }

    /**
     * Registers the state of a newly subscribed shard, under the checkpoint lock.
     *
     * <p>The state is appended to the subscribed shard list and its position becomes the shard
     * state index. Shards that have not already ended count as active. If no watermark state
     * exists for the index yet, one is created with a clone of the watermark assigner and the
     * emit queue for that index.
     *
     * @param newSubscribedShardState state of the shard to register
     * @return the index assigned to the shard's state
     * @throws RuntimeException if the watermark assigner cannot be cloned
     */
    public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
        synchronized (this.checkpointLock) {
            this.subscribedShardsState.add(newSubscribedShardState);

            boolean alreadyEnded = newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
            if (!alreadyEnded) {
                this.numberOfActiveShards.incrementAndGet();
            }

            int shardStateIndex = this.subscribedShardsState.size() - 1;
            if (this.shardWatermarks.get(shardStateIndex) != null) {
                return shardStateIndex;
            }

            ShardWatermarkState watermarkState = new ShardWatermarkState();
            try {
                watermarkState.periodicWatermarkAssigner = (AssignerWithPeriodicWatermarks)InstantiationUtil.clone(this.periodicWatermarkAssigner);
            }
            catch (Exception cloneFailure) {
                throw new RuntimeException("Failed to instantiate new WatermarkAssigner", cloneFailure);
            }
            watermarkState.emitQueue = this.recordEmitter.getQueue(shardStateIndex);
            watermarkState.lastUpdated = getCurrentTimeMillis();
            watermarkState.lastRecordTimestamp = Long.MIN_VALUE;
            this.shardWatermarks.put(shardStateIndex, watermarkState);
            return shardStateIndex;
        }
    }

    /**
     * Clock used for shard idleness bookkeeping; overridable so tests can control time.
     *
     * @return the current wall-clock time in milliseconds
     */
    @VisibleForTesting
    protected long getCurrentTimeMillis() {
        long now = System.currentTimeMillis();
        return now;
    }

    /**
     * Computes the minimum watermark across the shards of this subtask and emits it when it is
     * ahead of the last emitted watermark.
     *
     * <p>A shard is skipped when it has not emitted a watermark yet, or when it was last updated
     * before the idle threshold, has an empty emit queue and its watermark would not advance the
     * last emitted watermark. If no shard contributes, the source is marked idle when there is no
     * shard watermark state at all or an idle interval is configured. Otherwise the minimum
     * upcoming watermark (head of each queue, else the shard's last watermark) is stored as the
     * next watermark.
     */
    @VisibleForTesting
    protected void emitWatermark() {
        LOG.debug("Evaluating watermark for subtask {} time {}", this.indexOfThisConsumerSubtask, this.getCurrentTimeMillis());

        long idleThreshold = Long.MAX_VALUE;
        if (this.shardIdleIntervalMillis > 0L) {
            idleThreshold = this.getCurrentTimeMillis() - this.shardIdleIntervalMillis;
        }

        long minWatermark = Long.MAX_VALUE;
        long minUpcomingWatermark = Long.MAX_VALUE;
        for (ShardWatermarkState shardState : this.shardWatermarks.values()) {
            Watermark shardWatermark = shardState.lastEmittedRecordWatermark;
            if (shardWatermark == null) {
                continue;
            }
            // Consider only active shards, or those that would advance the watermark.
            boolean contributes = shardState.lastUpdated >= idleThreshold
                    || shardState.emitQueue.getSize() > 0
                    || shardWatermark.getTimestamp() > this.lastWatermark;
            if (!contributes) {
                continue;
            }
            minWatermark = Math.min(minWatermark, shardWatermark.getTimestamp());
            // Prefer the watermark of the next queued record when one is available.
            RecordWrapper queuedHead = (RecordWrapper)shardState.emitQueue.peek();
            Watermark upcoming = queuedHead == null ? shardWatermark : queuedHead.watermark;
            minUpcomingWatermark = Math.min(minUpcomingWatermark, upcoming.getTimestamp());
        }

        if (minWatermark == Long.MAX_VALUE) {
            if (this.shardWatermarks.isEmpty() || this.shardIdleIntervalMillis > 0L) {
                LOG.info("No active shard for subtask {}, marking the source idle.", this.indexOfThisConsumerSubtask);
                this.sourceContext.markAsTemporarilyIdle();
                this.isIdle = true;
            }
            return;
        }

        if (minWatermark > this.lastWatermark) {
            LOG.debug("Emitting watermark {} from subtask {}", minWatermark, this.indexOfThisConsumerSubtask);
            this.sourceContext.emitWatermark(new Watermark(minWatermark));
            this.lastWatermark = minWatermark;
            this.isIdle = false;
        }
        this.nextWatermark = minUpcomingWatermark;
    }

    /**
     * Creates the per-shard metric group: a "stream" group keyed by stream name with a nested
     * "shardId" group keyed by the shard id.
     *
     * @param metricGroup parent metric group
     * @param shardState state of the shard the group is created for
     * @return the nested shard metric group
     */
    private MetricGroup registerShardMetricGroup(MetricGroup metricGroup, KinesisStreamShardState shardState) {
        MetricGroup streamGroup = metricGroup.addGroup("stream", shardState.getStreamShardHandle().getStreamName());
        String shardId = shardState.getStreamShardHandle().getShard().getShardId();
        return streamGroup.addGroup("shardId", shardId);
    }

    /**
     * Tells whether a shard with the given assignment hash belongs to the given subtask.
     *
     * <p>The shard belongs to the subtask whose index equals the absolute value of
     * {@code shardHash % totalNumberOfConsumerSubtasks}.
     *
     * @param shardHash hash produced by the shard assigner
     * @param totalNumberOfConsumerSubtasks parallelism of the consumer
     * @param indexOfThisConsumerSubtask index of the subtask being checked
     * @return {@code true} if the subtask should subscribe to the shard
     */
    public static boolean isThisSubtaskShouldSubscribeTo(int shardHash, int totalNumberOfConsumerSubtasks, int indexOfThisConsumerSubtask) {
        int remainder = shardHash % totalNumberOfConsumerSubtasks;
        // The remainder's magnitude is below the divisor's, so negating it cannot overflow.
        int targetSubtask = remainder < 0 ? -remainder : remainder;
        return targetSubtask == indexOfThisConsumerSubtask;
    }

    /**
     * Creates the cached thread pool that runs the shard consumers.
     *
     * <p>Threads are daemon threads named
     * {@code shardConsumers-<subtaskName>-thread-<n>}, where {@code n} counts up from 0.
     *
     * @param subtaskName task name (with subtask index) used in the thread names
     * @return the executor for shard consumers
     */
    @VisibleForTesting
    protected ExecutorService createShardConsumersThreadPool(final String subtaskName) {
        final AtomicLong threadCount = new AtomicLong(0L);
        return Executors.newCachedThreadPool(runnable -> {
            Thread consumerThread = new Thread(runnable);
            consumerThread.setName("shardConsumers-" + subtaskName + "-thread-" + threadCount.getAndIncrement());
            consumerThread.setDaemon(true);
            return consumerThread;
        });
    }

    /**
     * Exposes the fetcher's list of subscribed shard states.
     *
     * <p>The internal list itself is returned, not a copy.
     *
     * @return the {@code subscribedShardsState} list held by this fetcher
     */
    @VisibleForTesting
    public List<KinesisStreamShardState> getSubscribedShardsState() {
        return subscribedShardsState;
    }

    /**
     * Builds the initial "stream name to last discovered shard id" map.
     *
     * <p>Every given stream is registered with a {@code null} value; duplicate stream names
     * collapse into a single entry.
     *
     * @param streams names of the subscribed streams
     * @return a new mutable map containing one {@code null}-valued entry per distinct stream
     */
    protected static HashMap<String, String> createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
        HashMap<String, String> lastDiscoveredShardPerStream = new HashMap<>();
        streams.forEach(streamName -> lastDiscoveredShardPerStream.put(streamName, null));
        return lastDiscoveredShardPerStream;
    }

    /**
     * Copies the contents of a {@link StreamShardHandle} into a new {@link StreamShardMetadata}.
     *
     * <p>The stream name, shard id, parent shard id and adjacent parent shard id are always
     * copied. The hash key bounds and sequence number bounds are copied only when the shard
     * carries the corresponding range object; otherwise those metadata fields are left unset.
     *
     * @param streamShardHandle handle to convert
     * @return a freshly created metadata object mirroring the handle
     */
    public static StreamShardMetadata convertToStreamShardMetadata(StreamShardHandle streamShardHandle) {
        StreamShardMetadata metadata = new StreamShardMetadata();
        metadata.setStreamName(streamShardHandle.getStreamName());

        Shard shard = streamShardHandle.getShard();
        metadata.setShardId(shard.getShardId());
        metadata.setParentShardId(shard.getParentShardId());
        metadata.setAdjacentParentShardId(shard.getAdjacentParentShardId());

        HashKeyRange hashKeys = shard.getHashKeyRange();
        if (hashKeys != null) {
            metadata.setStartingHashKey(hashKeys.getStartingHashKey());
            metadata.setEndingHashKey(hashKeys.getEndingHashKey());
        }

        SequenceNumberRange sequenceNumbers = shard.getSequenceNumberRange();
        if (sequenceNumbers != null) {
            metadata.setStartingSequenceNumber(sequenceNumbers.getStartingSequenceNumber());
            metadata.setEndingSequenceNumber(sequenceNumbers.getEndingSequenceNumber());
        }
        return metadata;
    }

    /**
     * Rebuilds a {@link StreamShardHandle} from a {@link StreamShardMetadata}.
     *
     * <p>A new {@link Shard} is populated with the ids stored in the metadata. A
     * {@link HashKeyRange} and a {@link SequenceNumberRange} are always attached to the shard,
     * carrying whatever boundary values the metadata holds.
     *
     * @param streamShardMetadata metadata to convert
     * @return a handle for the metadata's stream name wrapping the rebuilt shard
     */
    public static StreamShardHandle convertToStreamShardHandle(StreamShardMetadata streamShardMetadata) {
        HashKeyRange hashKeys = new HashKeyRange()
                .withStartingHashKey(streamShardMetadata.getStartingHashKey())
                .withEndingHashKey(streamShardMetadata.getEndingHashKey());

        SequenceNumberRange sequenceNumbers = new SequenceNumberRange()
                .withStartingSequenceNumber(streamShardMetadata.getStartingSequenceNumber())
                .withEndingSequenceNumber(streamShardMetadata.getEndingSequenceNumber());

        Shard rebuiltShard = new Shard()
                .withShardId(streamShardMetadata.getShardId())
                .withParentShardId(streamShardMetadata.getParentShardId())
                .withAdjacentParentShardId(streamShardMetadata.getAdjacentParentShardId())
                .withHashKeyRange(hashKeys)
                .withSequenceNumberRange(sequenceNumbers);

        return new StreamShardHandle(streamShardMetadata.getStreamName(), rebuiltShard);
    }

    private class WatermarkSyncCallback
    implements ProcessingTimeService.ProcessingTimeCallback {
        private static final long LOG_INTERVAL_MILLIS = 60000L;
        private final ProcessingTimeService timerService;
        private final long interval;
        private long lastGlobalWatermark = Long.MIN_VALUE;
        private long propagatedLocalWatermark = Long.MIN_VALUE;
        private int stalledWatermarkIntervalCount = 0;
        private long lastLogged;

        WatermarkSyncCallback(ProcessingTimeService timerService, long interval) {
            this.timerService = (ProcessingTimeService)Preconditions.checkNotNull((Object)timerService);
            this.interval = interval;
            MetricGroup shardMetricsGroup = KinesisDataFetcher.this.consumerMetricGroup.addGroup("subtaskId", String.valueOf(KinesisDataFetcher.this.indexOfThisConsumerSubtask));
            shardMetricsGroup.gauge("localWatermark", () -> KinesisDataFetcher.this.nextWatermark);
            shardMetricsGroup.gauge("globalWatermark", () -> this.lastGlobalWatermark);
        }

        public void start() {
            LOG.info("Registering watermark tracker with interval {}", (Object)this.interval);
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, (ProcessingTimeService.ProcessingTimeCallback)this);
        }

        public void onProcessingTime(long timestamp) {
            if (KinesisDataFetcher.this.nextWatermark != Long.MIN_VALUE) {
                long globalWatermark = this.lastGlobalWatermark;
                if (!KinesisDataFetcher.this.isIdle || KinesisDataFetcher.this.nextWatermark != this.propagatedLocalWatermark) {
                    globalWatermark = KinesisDataFetcher.this.watermarkTracker.updateWatermark(KinesisDataFetcher.this.nextWatermark);
                    this.propagatedLocalWatermark = KinesisDataFetcher.this.nextWatermark;
                } else {
                    LOG.info("WatermarkSyncCallback subtask: {} is idle", (Object)KinesisDataFetcher.this.indexOfThisConsumerSubtask);
                }
                if (timestamp - this.lastLogged > 60000L) {
                    this.lastLogged = System.currentTimeMillis();
                    LOG.info("WatermarkSyncCallback subtask: {} local watermark: {}, global watermark: {}, delta: {} timeouts: {}, emitter: {}", new Object[]{KinesisDataFetcher.this.indexOfThisConsumerSubtask, KinesisDataFetcher.this.nextWatermark, globalWatermark, KinesisDataFetcher.this.nextWatermark - globalWatermark, KinesisDataFetcher.this.watermarkTracker.getUpdateTimeoutCount(), KinesisDataFetcher.this.recordEmitter.printInfo()});
                    if (globalWatermark == KinesisDataFetcher.this.nextWatermark && globalWatermark == this.lastGlobalWatermark && this.stalledWatermarkIntervalCount++ > 5) {
                        this.stalledWatermarkIntervalCount = 0;
                        for (Map.Entry e : KinesisDataFetcher.this.shardWatermarks.entrySet()) {
                            RecordEmitter.RecordQueue q = ((ShardWatermarkState)e.getValue()).emitQueue;
                            RecordWrapper nextRecord = (RecordWrapper)((Object)q.peek());
                            if (nextRecord == null) continue;
                            LOG.info("stalled watermark {} key {} next watermark {} next timestamp {}", new Object[]{KinesisDataFetcher.this.nextWatermark, e.getKey(), nextRecord.watermark, nextRecord.timestamp});
                        }
                    }
                }
                this.lastGlobalWatermark = globalWatermark;
                KinesisDataFetcher.this.recordEmitter.setCurrentWatermark(globalWatermark);
            }
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, (ProcessingTimeService.ProcessingTimeCallback)this);
        }
    }

    private class PeriodicWatermarkEmitter
    implements ProcessingTimeService.ProcessingTimeCallback {
        private final ProcessingTimeService timerService;
        private final long interval;

        PeriodicWatermarkEmitter(ProcessingTimeService timerService, long autoWatermarkInterval) {
            this.timerService = (ProcessingTimeService)Preconditions.checkNotNull((Object)timerService);
            this.interval = autoWatermarkInterval;
        }

        public void start() {
            LOG.debug("registering periodic watermark timer with interval {}", (Object)this.interval);
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, (ProcessingTimeService.ProcessingTimeCallback)this);
        }

        public void onProcessingTime(long timestamp) {
            KinesisDataFetcher.this.emitWatermark();
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, (ProcessingTimeService.ProcessingTimeCallback)this);
        }
    }

    /**
     * Mutable holder for the per-shard watermark bookkeeping kept by the fetcher.
     *
     * <p>The fields are assigned directly by the enclosing class. The three {@code volatile}
     * fields are declared that way presumably because they are touched from more than one
     * thread; confirm against the code that writes them.
     *
     * @param <T> type of the records flowing through the shard's emit queue
     */
    private static class ShardWatermarkState<T> {
        /** Watermark assigner associated with this shard. */
        private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;

        /** Queue of records pending emission for this shard. */
        private RecordEmitter.RecordQueue<RecordWrapper<T>> emitQueue;

        /** Timestamp of the most recent record; NOTE(review): exact update point is outside this block. */
        private volatile long lastRecordTimestamp;

        /** Time of the most recent update; NOTE(review): clock source is not visible here. */
        private volatile long lastUpdated;

        /** Watermark carried by the last emitted record of this shard. */
        private volatile Watermark lastEmittedRecordWatermark;

        private ShardWatermarkState() {}
    }

    /**
     * Record emitter whose queues do not buffer: a record put into a queue is emitted
     * immediately on the calling thread.
     *
     * <p>One pass-through queue is created lazily per producer index and then reused.
     */
    private class SyncKinesisRecordEmitter
    extends AsyncKinesisRecordEmitter {
        /** Pass-through queues, keyed by producer index. */
        private final ConcurrentHashMap<Integer, RecordEmitter.RecordQueue<RecordWrapper<T>>> queues = new ConcurrentHashMap<>();

        private SyncKinesisRecordEmitter() {
        }

        /**
         * Returns the pass-through queue of the given producer, creating it on first use.
         *
         * @param producerIndex index of the producer requesting its queue
         * @return the queue bound to {@code producerIndex}
         */
        @Override
        public RecordEmitter.RecordQueue<RecordWrapper<T>> getQueue(int producerIndex) {
            return queues.computeIfAbsent(producerIndex, unusedIndex -> newPassThroughQueue());
        }

        /** Builds a queue that forwards every record straight to {@code emit} and never holds one. */
        private RecordEmitter.RecordQueue<RecordWrapper<T>> newPassThroughQueue() {
            return new RecordEmitter.RecordQueue<RecordWrapper<T>>(){

                @Override
                public void put(RecordWrapper<T> record) {
                    // Emit right away instead of enqueueing.
                    SyncKinesisRecordEmitter.this.emit(record, this);
                }

                @Override
                public int getSize() {
                    // Nothing is ever buffered.
                    return 0;
                }

                @Override
                public RecordWrapper<T> peek() {
                    // Nothing is ever buffered, so there is no head element.
                    return null;
                }
            };
        }
    }

    /**
     * Record emitter that hands each record to the enclosing fetcher's
     * {@code emitRecordAndUpdateState}.
     */
    private class AsyncKinesisRecordEmitter
    extends RecordEmitter<RecordWrapper<T>> {
        /** Queue capacity used by the no-argument constructor. */
        private static final int DEFAULT_QUEUE_CAPACITY = 100;

        private AsyncKinesisRecordEmitter() {
            this(DEFAULT_QUEUE_CAPACITY);
        }

        /**
         * @param queueCapacity capacity value passed through to the {@link RecordEmitter} constructor
         */
        private AsyncKinesisRecordEmitter(int queueCapacity) {
            super(queueCapacity);
        }

        /**
         * Forwards the record to the fetcher; the source queue is not consulted.
         *
         * @param record record to emit
         * @param queue queue the record came from (unused)
         */
        @Override
        public void emit(RecordWrapper<T> record, RecordEmitter.RecordQueue<RecordWrapper<T>> queue) {
            emitRecordAndUpdateState(record);
        }
    }

    /**
     * A timestamped record together with the extra bookkeeping fields the fetcher attaches to it.
     *
     * <p>Only the record and its timestamp are set by the constructor; the remaining fields
     * are assigned directly by the enclosing class.
     *
     * @param <T> type of the wrapped record
     */
    private static class RecordWrapper<T>
    extends TimestampedValue<T> {
        /** Index of the owning shard's state; presumably into the subscribed-shards list, confirm at the write site. */
        int shardStateIndex;
        /** Sequence number associated with this record. */
        SequenceNumber lastSequenceNumber;
        /** Timestamp of the record, mirrored from the constructor argument. */
        long timestamp;
        /** Watermark associated with this record. */
        Watermark watermark;

        /**
         * @param record the wrapped record
         * @param timestamp timestamp of the record, also passed to the superclass
         */
        private RecordWrapper(T record, long timestamp) {
            super(record, timestamp);
            this.timestamp = timestamp;
        }

        /**
         * @return the timestamp stored in this wrapper's own {@code timestamp} field
         */
        public long getTimestamp() {
            return timestamp;
        }
    }

    public static interface FlinkKinesisProxyV2Factory {
        public KinesisProxyV2Interface create(Properties var1);
    }

    public static interface FlinkKinesisProxyFactory {
        public KinesisProxyInterface create(Properties var1);
    }
}

