/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.Checkpoint;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsCache;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskType;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import org.apache.flink.kinesis.shaded.org.apache.commons.logging.Log;
import org.apache.flink.kinesis.shaded.org.apache.commons.logging.LogFactory;

class InitializeTask
implements ITask {
    private static final Log LOG = LogFactory.getLog(InitializeTask.class);
    private static final String RECORD_PROCESSOR_INITIALIZE_METRIC = "RecordProcessor.initialize";
    private final ShardInfo shardInfo;
    private final IRecordProcessor recordProcessor;
    private final KinesisDataFetcher dataFetcher;
    private final TaskType taskType = TaskType.INITIALIZE;
    private final ICheckpoint checkpoint;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final long backoffTimeMillis;
    private final StreamConfig streamConfig;
    private final GetRecordsCache getRecordsCache;

    InitializeTask(ShardInfo shardInfo, IRecordProcessor recordProcessor, ICheckpoint checkpoint, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, StreamConfig streamConfig, GetRecordsCache getRecordsCache) {
        this.shardInfo = shardInfo;
        this.recordProcessor = recordProcessor;
        this.checkpoint = checkpoint;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.dataFetcher = dataFetcher;
        this.backoffTimeMillis = backoffTimeMillis;
        this.streamConfig = streamConfig;
        this.getRecordsCache = getRecordsCache;
    }

    @Override
    public TaskResult call() {
        boolean applicationException = false;
        Exception exception = null;
        try {
            Checkpoint initialCheckpointObject;
            LOG.debug("Initializing ShardId " + this.shardInfo.getShardId());
            try {
                initialCheckpointObject = this.checkpoint.getCheckpointObject(this.shardInfo.getShardId());
            }
            catch (KinesisClientLibNonRetryableException e) {
                LOG.error("Caught exception while fetching checkpoint for " + this.shardInfo.getShardId(), e);
                TaskResult result = new TaskResult(e);
                result.leaseNotFound();
                return result;
            }
            ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint();
            this.dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), this.streamConfig.getInitialPositionInStream());
            this.getRecordsCache.start();
            this.recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
            this.recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);
            LOG.debug("Calling the record processor initialize().");
            InitializationInput initializationInput = new InitializationInput().withShardId(this.shardInfo.getShardId()).withExtendedSequenceNumber(initialCheckpoint).withPendingCheckpointSequenceNumber(initialCheckpointObject.getPendingCheckpoint());
            long recordProcessorStartTimeMillis = System.currentTimeMillis();
            try {
                this.recordProcessor.initialize(initializationInput);
                LOG.debug("Record processor initialize() completed.");
            }
            catch (Exception e) {
                applicationException = true;
                throw e;
            }
            finally {
                MetricsHelper.addLatency(RECORD_PROCESSOR_INITIALIZE_METRIC, recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
            }
            return new TaskResult(null);
        }
        catch (Exception e) {
            if (applicationException) {
                LOG.error("Application initialize() threw exception: ", e);
            } else {
                LOG.error("Caught exception: ", e);
            }
            exception = e;
            try {
                Thread.sleep(this.backoffTimeMillis);
            }
            catch (InterruptedException ie) {
                LOG.debug("Interrupted sleep", ie);
            }
            return new TaskResult(exception);
        }
    }

    @Override
    public TaskType getTaskType() {
        return this.taskType;
    }
}

