/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Parallel source function that reads records from one or more Kinesis streams.
 *
 * <p>The actual reading is delegated to a {@link KinesisDataFetcher} that is created in
 * {@link #run(SourceFunction.SourceContext)}. Before the fetcher is started it is seeded with one
 * {@link KinesisStreamShardState} per discovered shard; the starting sequence number of each shard
 * comes either from restored checkpoint state or from the configured initial position.
 *
 * <p>Checkpointed state is a list of {@code (StreamShardMetadata, SequenceNumber)} tuples kept in
 * union list state registered under the name {@code "Kinesis-Stream-Shard-State"}.
 *
 * @param <T> type of the records produced by the configured deserialization schema
 */
@PublicEvolving
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
        implements ResultTypeQueryable<T>, CheckpointedFunction {

    private static final long serialVersionUID = 4724006128720664870L;

    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);

    // ------------------------------------------------------------------------
    //  Consumer configuration (part of the serialized function)
    // ------------------------------------------------------------------------

    /** Names of the streams this consumer reads; validated to be non-empty in the constructor. */
    private final List<String> streams;

    /** Consumer properties; validated by {@link KinesisConfigUtil} in the constructor. */
    private final Properties configProps;

    /** Schema used to turn Kinesis records into elements of type {@code T}. */
    private final KinesisDeserializationSchema<T> deserializer;

    /** Assigner handed to the fetcher and used in {@link #snapshotState} to filter restored shards. */
    private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;

    /** Optional periodic watermark assigner handed to the fetcher; may be {@code null}. */
    private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;

    /** Optional watermark tracker handed to the fetcher; may be {@code null}. */
    private WatermarkTracker watermarkTracker;

    // ------------------------------------------------------------------------
    //  Runtime state (not serialized, except for the running flag)
    // ------------------------------------------------------------------------

    /**
     * Fetcher created by {@link #run}; only published to this field after it has been seeded, and
     * reset to {@code null} by {@link #close()}.
     */
    private transient KinesisDataFetcher<T> fetcher;

    /** Sequence numbers read back in {@link #initializeState}; {@code null} if nothing was restored. */
    private transient HashMap<StreamShardMetadata.EquivalenceWrapper, SequenceNumber>
            sequenceNumsToRestore;

    /** Set to {@code false} by {@link #cancel()}; consulted by {@link #run} and {@link #snapshotState}. */
    private volatile boolean running = true;

    /** Name under which the shard state is registered in the operator state store. */
    private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";

    /** Handle to the union list state that holds the checkpointed shard sequence numbers. */
    private transient ListState<Tuple2<StreamShardMetadata, SequenceNumber>>
            sequenceNumsStateForCheckpoint;

    // ------------------------------------------------------------------------
    //  Constructors
    // ------------------------------------------------------------------------

    /**
     * Creates a consumer for a single stream using a plain {@link DeserializationSchema}, which is
     * wrapped in a {@link KinesisDeserializationSchemaWrapper}.
     *
     * @param stream name of the stream to read
     * @param deserializer schema used to deserialize record payloads
     * @param configProps consumer configuration properties
     */
    public FlinkKinesisConsumer(
            String stream, DeserializationSchema<T> deserializer, Properties configProps) {
        this(stream, new KinesisDeserializationSchemaWrapper<T>(deserializer), configProps);
    }

    /**
     * Creates a consumer for a single stream.
     *
     * @param stream name of the stream to read
     * @param deserializer schema used to deserialize records
     * @param configProps consumer configuration properties
     */
    public FlinkKinesisConsumer(
            String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
        this(Collections.singletonList(stream), deserializer, configProps);
    }

    /**
     * Creates a consumer for several streams.
     *
     * <p>Besides validating its arguments, this constructor calls
     * {@link StreamConsumerRegistrarUtil#eagerlyRegisterStreamConsumers} with the given properties
     * and streams.
     *
     * @param streams names of the streams to read; must be non-null, non-empty and must not
     *     contain the empty string
     * @param deserializer schema used to deserialize records; must be non-null and serializable
     * @param configProps consumer configuration properties; must be non-null and pass
     *     {@link KinesisConfigUtil#validateConsumerConfiguration}
     */
    public FlinkKinesisConsumer(
            List<String> streams,
            KinesisDeserializationSchema<T> deserializer,
            Properties configProps) {
        Preconditions.checkNotNull(streams, "streams can not be null");
        Preconditions.checkArgument(streams.size() != 0, "must be consuming at least 1 stream");
        Preconditions.checkArgument(
                !streams.contains(""), "stream names cannot be empty Strings");
        this.streams = streams;

        this.configProps = Preconditions.checkNotNull(configProps, "configProps can not be null");
        KinesisConfigUtil.validateConsumerConfiguration(this.configProps, streams);

        Preconditions.checkNotNull(deserializer, "deserializer can not be null");
        Preconditions.checkArgument(
                InstantiationUtil.isSerializable(deserializer),
                "The provided deserialization schema is not serializable: "
                        + deserializer.getClass().getName()
                        + ". Please check that it does not contain references to non-serializable instances.");
        this.deserializer = deserializer;

        StreamConsumerRegistrarUtil.eagerlyRegisterStreamConsumers(configProps, streams);

        if (LOG.isInfoEnabled()) {
            // String.join avoids the dangling ", " a hand-rolled append loop leaves behind.
            LOG.info(
                    "Flink Kinesis Consumer is going to read the following streams: {}",
                    String.join(", ", streams));
        }
    }

    // ------------------------------------------------------------------------
    //  Optional configuration
    // ------------------------------------------------------------------------

    /** Returns the shard assigner currently configured for this consumer. */
    public KinesisShardAssigner getShardAssigner() {
        return this.shardAssigner;
    }

    /**
     * Replaces the shard assigner and runs the closure cleaner on it.
     *
     * @param shardAssigner the assigner to use; must not be {@code null}
     */
    public void setShardAssigner(KinesisShardAssigner shardAssigner) {
        this.shardAssigner = Preconditions.checkNotNull(shardAssigner, "function can not be null");
        ClosureCleaner.clean(shardAssigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
    }

    /** Returns the configured periodic watermark assigner, or {@code null} if none was set. */
    public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() {
        return this.periodicWatermarkAssigner;
    }

    /**
     * Sets the periodic watermark assigner and runs the closure cleaner on it.
     *
     * @param periodicWatermarkAssigner the assigner handed to the fetcher (not null-checked here)
     */
    public void setPeriodicWatermarkAssigner(
            AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
        this.periodicWatermarkAssigner = periodicWatermarkAssigner;
        ClosureCleaner.clean(
                this.periodicWatermarkAssigner,
                ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
                true);
    }

    /** Returns the configured watermark tracker, or {@code null} if none was set. */
    public WatermarkTracker getWatermarkTracker() {
        return this.watermarkTracker;
    }

    /**
     * Sets the watermark tracker and runs the closure cleaner on it.
     *
     * @param watermarkTracker the tracker handed to the fetcher (not null-checked here)
     */
    public void setWatermarkTracker(WatermarkTracker watermarkTracker) {
        this.watermarkTracker = watermarkTracker;
        ClosureCleaner.clean(
                this.watermarkTracker, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
    }

    // ------------------------------------------------------------------------
    //  Source life cycle
    // ------------------------------------------------------------------------

    /**
     * Creates the fetcher, seeds it with the state of every discovered shard, and then runs it
     * until it terminates.
     *
     * <p>Seeding rules per discovered shard:
     *
     * <ul>
     *   <li>no restored state at all: start from the configured initial position
     *       ({@code "flink.stream.initpos"}, defaulting to
     *       {@link ConsumerConfigConstants#DEFAULT_STREAM_INITIAL_POSITION});
     *   <li>restored state contains the shard: start from the restored sequence number;
     *   <li>restored state exists but does not contain the shard: start from
     *       {@link SentinelSequenceNumber#SENTINEL_EARLIEST_SEQUENCE_NUM}.
     * </ul>
     *
     * @param sourceContext context passed on to the fetcher; closed after the fetcher terminated
     * @throws Exception propagated from the fetcher
     */
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        KinesisDataFetcher<T> newFetcher =
                this.createFetcher(
                        this.streams,
                        sourceContext,
                        this.getRuntimeContext(),
                        this.configProps,
                        this.deserializer);

        List<StreamShardHandle> allShards = newFetcher.discoverNewShardsToSubscribe();
        for (StreamShardHandle shard : allShards) {
            StreamShardMetadata.EquivalenceWrapper kinesisStreamShard =
                    new StreamShardMetadata.EquivalenceWrapper(
                            KinesisDataFetcher.convertToStreamShardMetadata(shard));

            if (this.sequenceNumsToRestore == null) {
                // Fresh start: every shard begins at the configured initial position.
                SentinelSequenceNumber startingSeqNum =
                        ConsumerConfigConstants.InitialPosition.valueOf(
                                        this.configProps.getProperty(
                                                "flink.stream.initpos",
                                                ConsumerConfigConstants
                                                        .DEFAULT_STREAM_INITIAL_POSITION))
                                .toSentinelSequenceNumber();

                newFetcher.registerNewSubscribedShardState(
                        new KinesisStreamShardState(
                                kinesisStreamShard.getShardMetadata(),
                                shard,
                                startingSeqNum.get()));

                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
                            this.getRuntimeContext().getIndexOfThisSubtask(),
                            shard.toString(),
                            startingSeqNum.get());
                }
            } else if (this.sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
                // Shard is part of the restored state: resume from its restored sequence number.
                newFetcher.registerNewSubscribedShardState(
                        new KinesisStreamShardState(
                                kinesisStreamShard.getShardMetadata(),
                                shard,
                                this.sequenceNumsToRestore.get(kinesisStreamShard)));

                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} is seeding the fetcher with restored shard {}, starting state set to the restored sequence number {}",
                            this.getRuntimeContext().getIndexOfThisSubtask(),
                            shard.toString(),
                            this.sequenceNumsToRestore.get(kinesisStreamShard));
                }
            } else {
                // Restored run, but this shard is not in the restored state: read it from the
                // earliest sequence number.
                newFetcher.registerNewSubscribedShardState(
                        new KinesisStreamShardState(
                                kinesisStreamShard.getShardMetadata(),
                                shard,
                                SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} is seeding the fetcher with new discovered shard {}, starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
                            this.getRuntimeContext().getIndexOfThisSubtask(),
                            shard.toString());
                }
            }
        }

        // cancel() may already have been called while the fetcher was being seeded.
        if (!this.running) {
            return;
        }

        // Publish the fetcher only now, so snapshotState() reads from it only once it is seeded.
        this.fetcher = newFetcher;

        newFetcher.runFetcher();
        newFetcher.awaitTermination();
        sourceContext.close();
    }

    /**
     * Marks the source as no longer running and shuts down the fetcher if one has been published.
     * Failures during fetcher shutdown are logged as warnings and not rethrown.
     */
    public void cancel() {
        this.running = false;

        // Work on a local copy: close() may reset the field to null.
        KinesisDataFetcher<T> currentFetcher = this.fetcher;
        if (currentFetcher != null) {
            try {
                currentFetcher.shutdownFetcher();
            } catch (Exception e) {
                LOG.warn("Error while closing Kinesis data fetcher", e);
            }
        }
    }

    /**
     * Cancels the source, waits for a published fetcher to terminate, drops the reference to it
     * and finally delegates to {@code super.close()}.
     *
     * @throws Exception propagated from the fetcher or from {@code super.close()}
     */
    public void close() throws Exception {
        this.cancel();

        KinesisDataFetcher<T> currentFetcher = this.fetcher;
        if (currentFetcher != null) {
            currentFetcher.awaitTermination();
        }
        this.fetcher = null;

        super.close();
    }

    /** Returns the type information reported by the configured deserialization schema. */
    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    // ------------------------------------------------------------------------
    //  Checkpointing
    // ------------------------------------------------------------------------

    /**
     * Registers the union list state for shard sequence numbers and, if the context reports a
     * restore, copies its content into {@link #sequenceNumsToRestore} (only if that map has not
     * been populated yet).
     *
     * @param context initialization context giving access to the operator state store
     * @throws Exception propagated from the state backend
     */
    public void initializeState(FunctionInitializationContext context) throws Exception {
        TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> shardsStateTypeInfo =
                new TupleTypeInfo<>(
                        TypeInformation.of(StreamShardMetadata.class),
                        TypeInformation.of(SequenceNumber.class));

        this.sequenceNumsStateForCheckpoint =
                context.getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>(
                                        sequenceNumsStateStoreName, shardsStateTypeInfo));

        if (!context.isRestored()) {
            LOG.info("No restore state for FlinkKinesisConsumer.");
            return;
        }

        if (this.sequenceNumsToRestore == null) {
            this.sequenceNumsToRestore = new HashMap<>();
            for (Tuple2<StreamShardMetadata, SequenceNumber> kinesisSequenceNumber :
                    this.sequenceNumsStateForCheckpoint.get()) {
                // Keys are wrapped so that lookups in run() use EquivalenceWrapper equality.
                this.sequenceNumsToRestore.put(
                        new StreamShardMetadata.EquivalenceWrapper(kinesisSequenceNumber.f0),
                        kinesisSequenceNumber.f1);
            }

            LOG.info(
                    "Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {}",
                    this.sequenceNumsToRestore);
        }
    }

    /**
     * Writes the current shard sequence numbers into the checkpointed list state.
     *
     * <ul>
     *   <li>If the source is no longer running, nothing is written and the state is left as is.
     *   <li>If no fetcher has been published yet, the restored entries (if any) that the shard
     *       assigner maps to this subtask are written back.
     *   <li>Otherwise the fetcher's own snapshot is written.
     * </ul>
     *
     * @param context snapshot context; only used for debug logging here
     * @throws Exception propagated from the state backend
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source; returning null.");
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotting state ...");
        }

        this.sequenceNumsStateForCheckpoint.clear();

        // Read the field once: close() resets it to null, so a second read after the null check
        // could otherwise dereference null. cancel() and close() use the same local-copy pattern.
        KinesisDataFetcher<T> currentFetcher = this.fetcher;

        if (currentFetcher == null) {
            if (this.sequenceNumsToRestore != null) {
                for (Map.Entry<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> entry :
                        this.sequenceNumsToRestore.entrySet()) {
                    // Only write back the restored shards that the assigner maps to this subtask.
                    int hashCode =
                            this.shardAssigner.assign(
                                    KinesisDataFetcher.convertToStreamShardHandle(
                                            entry.getKey().getShardMetadata()),
                                    this.getRuntimeContext().getNumberOfParallelSubtasks());
                    if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
                            hashCode,
                            this.getRuntimeContext().getNumberOfParallelSubtasks(),
                            this.getRuntimeContext().getIndexOfThisSubtask())) {
                        this.sequenceNumsStateForCheckpoint.add(
                                Tuple2.of(entry.getKey().getShardMetadata(), entry.getValue()));
                    }
                }
            }
            return;
        }

        HashMap<StreamShardMetadata, SequenceNumber> lastStateSnapshot =
                currentFetcher.snapshotState();

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
                    lastStateSnapshot,
                    context.getCheckpointId(),
                    context.getCheckpointTimestamp());
        }

        for (Map.Entry<StreamShardMetadata, SequenceNumber> entry :
                lastStateSnapshot.entrySet()) {
            this.sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }

    // ------------------------------------------------------------------------
    //  Extension and test hooks
    // ------------------------------------------------------------------------

    /**
     * Factory for the fetcher used by {@link #run}. Protected so subclasses can supply a different
     * fetcher; the default passes the configured shard assigner, periodic watermark assigner and
     * watermark tracker to the {@link KinesisDataFetcher} constructor.
     *
     * @param streams names of the streams to read
     * @param sourceContext source context the fetcher is constructed with
     * @param runtimeContext runtime context of this subtask
     * @param configProps consumer configuration properties
     * @param deserializationSchema schema used to deserialize records
     * @return a new, not yet started fetcher
     */
    protected KinesisDataFetcher<T> createFetcher(
            List<String> streams,
            SourceFunction.SourceContext<T> sourceContext,
            RuntimeContext runtimeContext,
            Properties configProps,
            KinesisDeserializationSchema<T> deserializationSchema) {
        return new KinesisDataFetcher<T>(
                streams,
                sourceContext,
                runtimeContext,
                configProps,
                deserializationSchema,
                this.shardAssigner,
                this.periodicWatermarkAssigner,
                this.watermarkTracker);
    }

    /** Returns the restored sequence numbers, or {@code null} if nothing has been restored. */
    @VisibleForTesting
    HashMap<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> getRestoredState() {
        return this.sequenceNumsToRestore;
    }
}

