/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements StatefulTask,
AsyncExceptionHandler {
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    private final Object lock = new Object();
    protected OP headOperator;
    private OperatorChain<OUT, OP> operatorChain;
    private StreamConfig configuration;
    private AbstractStateBackend stateBackend;
    private AbstractKeyedStateBackend<?> keyedStateBackend;
    private ProcessingTimeService timerService;
    private Map<String, Accumulator<?, ?>> accumulatorMap;
    private TaskStateHandles restoreStateHandles;
    private final CloseableRegistry cancelables = new CloseableRegistry();
    private volatile boolean isRunning;
    private volatile boolean canceled;
    private long lastCheckpointSize = 0L;
    private ExecutorService asyncOperationsThreadPool;

    protected abstract void init() throws Exception;

    protected abstract void run() throws Exception;

    protected abstract void cleanup() throws Exception;

    protected abstract void cancelTask() throws Exception;

    public void setProcessingTimeService(ProcessingTimeService timeProvider) {
        if (timeProvider == null) {
            throw new RuntimeException("The timeProvider cannot be set to null.");
        }
        this.timerService = timeProvider;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void invoke() throws Exception {
        boolean disposed = false;
        try {
            Object timerThreadFactory;
            LOG.debug("Initializing {}.", (Object)this.getName());
            this.asyncOperationsThreadPool = Executors.newCachedThreadPool();
            this.configuration = new StreamConfig(this.getTaskConfiguration());
            this.stateBackend = this.createStateBackend();
            this.accumulatorMap = this.getEnvironment().getAccumulatorRegistry().getUserMap();
            if (this.timerService == null) {
                timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + this.getName());
                this.timerService = new SystemProcessingTimeService(this, this.getCheckpointLock(), (ThreadFactory)timerThreadFactory);
            }
            this.operatorChain = new OperatorChain(this);
            this.headOperator = this.operatorChain.getHeadOperator();
            this.getEnvironment().getMetricGroup().gauge("lastCheckpointSize", (Gauge)new Gauge<Long>(){

                public Long getValue() {
                    return StreamTask.this.lastCheckpointSize;
                }
            });
            this.init();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            LOG.debug("Invoking {}", (Object)this.getName());
            this.initializeState();
            timerThreadFactory = this.lock;
            synchronized (timerThreadFactory) {
                this.openAllOperators();
            }
            if (this.canceled) {
                throw new CancelTaskException();
            }
            this.isRunning = true;
            this.run();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            this.timerService.quiesceAndAwaitPending();
            LOG.debug("Finished task {}", (Object)this.getName());
            timerThreadFactory = this.lock;
            synchronized (timerThreadFactory) {
                this.isRunning = false;
                this.closeAllOperators();
            }
            LOG.debug("Closed operators for task {}", (Object)this.getName());
            this.operatorChain.flushOutputs();
            this.tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            this.isRunning = false;
            if (this.timerService != null) {
                try {
                    this.timerService.shutdownService();
                }
                catch (Throwable t) {
                    LOG.error("Could not shut down timer service", t);
                }
            }
            try {
                this.cancelables.close();
                this.shutdownAsyncThreads();
            }
            catch (Throwable t) {
                LOG.error("Could not shut down async checkpoint threads", t);
            }
            try {
                this.cleanup();
            }
            catch (Throwable t) {
                LOG.error("Error during cleanup of stream task", t);
            }
            if (!disposed) {
                this.disposeAllOperators();
            }
            if (this.operatorChain != null) {
                this.operatorChain.releaseOutputs();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        try {
            this.cancelTask();
        }
        finally {
            this.cancelables.close();
        }
    }

    public final boolean isRunning() {
        return this.isRunning;
    }

    public final boolean isCanceled() {
        return this.canceled;
    }

    private void openAllOperators() throws Exception {
        for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
            if (operator == null) continue;
            operator.open();
        }
    }

    private void closeAllOperators() throws Exception {
        StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
        for (int i = allOperators.length - 1; i >= 0; --i) {
            StreamOperator<?> operator = allOperators[i];
            if (operator == null) continue;
            operator.close();
        }
    }

    private void tryDisposeAllOperators() throws Exception {
        for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
            if (operator == null) continue;
            operator.dispose();
        }
    }

    private void shutdownAsyncThreads() throws Exception {
        if (!this.asyncOperationsThreadPool.isShutdown()) {
            this.asyncOperationsThreadPool.shutdownNow();
        }
    }

    private void disposeAllOperators() {
        if (this.operatorChain != null) {
            for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
                try {
                    if (operator == null) continue;
                    operator.dispose();
                }
                catch (Throwable t) {
                    LOG.error("Error during disposal of stream operator.", t);
                }
            }
        }
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if (this.timerService != null && !this.timerService.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            this.timerService.shutdownService();
        }
        this.cancelables.close();
    }

    boolean isSerializingTimestamps() {
        TimeCharacteristic tc = this.configuration.getTimeCharacteristic();
        return tc == TimeCharacteristic.EventTime | tc == TimeCharacteristic.IngestionTime;
    }

    public String getName() {
        return this.getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
    }

    public Object getCheckpointLock() {
        return this.lock;
    }

    public StreamConfig getConfiguration() {
        return this.configuration;
    }

    public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
        return this.accumulatorMap;
    }

    Output<StreamRecord<OUT>> getHeadOutput() {
        return this.operatorChain.getChainEntryPoint();
    }

    RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    public void setInitialState(TaskStateHandles taskStateHandles) {
        this.restoreStateHandles = taskStateHandles;
    }

    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
        try {
            checkpointMetaData.setBytesBufferedInAlignment(0L).setAlignmentDurationNanos(0L);
            return this.performCheckpoint(checkpointMetaData);
        }
        catch (Exception e) {
            if (this.isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + this.getName() + '.', e);
            }
            LOG.debug("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", new Object[]{checkpointMetaData.getCheckpointId(), this.getName(), e});
            return false;
        }
    }

    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
        try {
            this.performCheckpoint(checkpointMetaData);
        }
        catch (CancelTaskException e) {
            throw new Exception("Operator " + this.getName() + " was cancelled while performing checkpoint " + checkpointMetaData.getCheckpointId() + '.');
        }
        catch (Exception e) {
            throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + this.getName() + '.', e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
        LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", (Object)checkpointId, (Object)this.getName());
        this.getEnvironment().declineCheckpoint(checkpointId, cause);
        Object object = this.lock;
        synchronized (object) {
            this.operatorChain.broadcastCheckpointCancelMarker(checkpointId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
        LOG.debug("Starting checkpoint {} on task {}", (Object)checkpointMetaData.getCheckpointId(), (Object)this.getName());
        Object object = this.lock;
        synchronized (object) {
            if (this.isRunning) {
                this.operatorChain.broadcastCheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
                this.checkpointState(checkpointMetaData);
                return true;
            }
            CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exception = null;
            for (ResultPartitionWriter output : this.getEnvironment().getAllWriters()) {
                try {
                    output.writeEventToAllChannels((AbstractEvent)message);
                }
                catch (Exception e) {
                    exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)new Exception("Could not send cancel checkpoint marker to downstream tasks.", e), exception);
                }
            }
            if (exception != null) {
                throw exception;
            }
            return false;
        }
    }

    public ExecutorService getAsyncOperationsThreadPool() {
        return this.asyncOperationsThreadPool;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (this.isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", (Object)this.getName());
                for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
                    if (operator == null) continue;
                    operator.notifyOfCompletedCheckpoint(checkpointId);
                }
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", (Object)this.getName());
            }
        }
    }

    private void checkpointState(CheckpointMetaData checkpointMetaData) throws Exception {
        CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData);
        checkpointingOperation.executeCheckpointing();
    }

    private void initializeState() throws Exception {
        boolean restored;
        boolean bl = restored = null != this.restoreStateHandles;
        if (restored) {
            this.checkRestorePreconditions(this.operatorChain.getChainLength());
            this.initializeOperators(true);
            this.restoreStateHandles = null;
        } else {
            this.initializeOperators(false);
        }
    }

    private void initializeOperators(boolean restored) throws Exception {
        StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
        for (int chainIdx = 0; chainIdx < allOperators.length; ++chainIdx) {
            StreamOperator<?> operator = allOperators[chainIdx];
            if (null == operator) continue;
            if (restored && this.restoreStateHandles != null) {
                operator.initializeState(new OperatorStateHandles(this.restoreStateHandles, chainIdx));
                continue;
            }
            operator.initializeState(null);
        }
    }

    private void checkRestorePreconditions(int operatorChainLength) {
        ChainedStateHandle nonPartitionableOperatorStates = this.restoreStateHandles.getLegacyOperatorState();
        List operatorStates = this.restoreStateHandles.getManagedOperatorState();
        if (nonPartitionableOperatorStates != null) {
            Preconditions.checkState((nonPartitionableOperatorStates.getLength() == operatorChainLength ? 1 : 0) != 0, (Object)("Invalid Invalid number of operator states. Found :" + nonPartitionableOperatorStates.getLength() + ". Expected: " + operatorChainLength));
        }
        if (!CollectionUtil.isNullOrEmpty((Collection)operatorStates)) {
            Preconditions.checkArgument((operatorStates.size() == operatorChainLength ? 1 : 0) != 0, (Object)("Invalid number of operator states. Found :" + operatorStates.size() + ". Expected: " + operatorChainLength));
        }
    }

    private AbstractStateBackend createStateBackend() throws Exception {
        AbstractStateBackend stateBackend = this.configuration.getStateBackend(this.getUserCodeClassLoader());
        if (stateBackend != null) {
            LOG.info("Using user-defined state backend: {}.", (Object)stateBackend);
        } else {
            Configuration flinkConfig = this.getEnvironment().getTaskManagerInfo().getConfiguration();
            String backendName = flinkConfig.getString("state.backend", null);
            if (backendName == null) {
                LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
                backendName = "jobmanager";
            }
            switch (backendName.toLowerCase()) {
                case "jobmanager": {
                    LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
                    stateBackend = new MemoryStateBackend();
                    break;
                }
                case "filesystem": {
                    FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
                    LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")", (Object)backend.getBasePath());
                    stateBackend = backend;
                    break;
                }
                case "rocksdb": {
                    backendName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
                }
                default: {
                    try {
                        Class<StateBackendFactory> clazz = Class.forName(backendName, false, this.getUserCodeClassLoader()).asSubclass(StateBackendFactory.class);
                        stateBackend = clazz.newInstance().createFromConfig(flinkConfig);
                        break;
                    }
                    catch (ClassNotFoundException e) {
                        throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
                    }
                    catch (ClassCastException e) {
                        throw new IllegalConfigurationException("The class configured under 'state.backend' is not a valid state backend factory (" + backendName + ')');
                    }
                    catch (Throwable t) {
                        throw new IllegalConfigurationException("Cannot create configured state backend", t);
                    }
                }
            }
        }
        return stateBackend;
    }

    public OperatorStateBackend createOperatorStateBackend(StreamOperator<?> op, Collection<OperatorStateHandle> restoreStateHandles) throws Exception {
        Environment env = this.getEnvironment();
        String opId = this.createOperatorIdentifier(op, this.getConfiguration().getVertexID());
        OperatorStateBackend operatorStateBackend = this.stateBackend.createOperatorStateBackend(env, opId);
        this.cancelables.registerClosable((Closeable)operatorStateBackend);
        if (null != restoreStateHandles) {
            operatorStateBackend.restore(restoreStateHandles);
        }
        return operatorStateBackend;
    }

    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange) throws Exception {
        if (this.keyedStateBackend != null) {
            throw new RuntimeException("The keyed state backend can only be created once.");
        }
        String operatorIdentifier = this.createOperatorIdentifier((StreamOperator<?>)this.headOperator, this.configuration.getVertexID());
        this.keyedStateBackend = this.stateBackend.createKeyedStateBackend(this.getEnvironment(), this.getEnvironment().getJobID(), operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, this.getEnvironment().getTaskKvStateRegistry());
        this.cancelables.registerClosable(this.keyedStateBackend);
        if (null != this.restoreStateHandles && null != this.restoreStateHandles.getManagedKeyedState()) {
            this.keyedStateBackend.restore(this.restoreStateHandles.getManagedKeyedState());
        }
        AbstractKeyedStateBackend<?> typedBackend = this.keyedStateBackend;
        return typedBackend;
    }

    public CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator<?> operator) throws IOException {
        return this.stateBackend.createStreamFactory(this.getEnvironment().getJobID(), this.createOperatorIdentifier(operator, this.configuration.getVertexID()));
    }

    private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
        return operator.getClass().getSimpleName() + "_" + vertexId + "_" + this.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
    }

    public ProcessingTimeService getProcessingTimeService() {
        if (this.timerService == null) {
            throw new IllegalStateException("The timer service has not been initialized.");
        }
        return this.timerService;
    }

    @Override
    public void handleAsyncException(String message, Throwable exception) {
        this.getEnvironment().failExternally(exception);
    }

    public String toString() {
        return this.getName();
    }

    public CloseableRegistry getCancelables() {
        return this.cancelables;
    }

    private static final class CheckpointingOperation {
        private final StreamTask<?, ?> owner;
        private final CheckpointMetaData checkpointMetaData;
        private final StreamOperator<?>[] allOperators;
        private long startSyncPartNano;
        private long startAsyncPartNano;
        private CheckpointStreamFactory streamFactory;
        private final List<StreamStateHandle> nonPartitionedStates;
        private final List<OperatorSnapshotResult> snapshotInProgressList;

        public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData) {
            this.owner = (StreamTask)Preconditions.checkNotNull(owner);
            this.checkpointMetaData = (CheckpointMetaData)Preconditions.checkNotNull((Object)checkpointMetaData);
            this.allOperators = ((StreamTask)owner).operatorChain.getAllOperators();
            this.nonPartitionedStates = new ArrayList<StreamStateHandle>(this.allOperators.length);
            this.snapshotInProgressList = new ArrayList<OperatorSnapshotResult>(this.allOperators.length);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void executeCheckpointing() throws Exception {
            block20: {
                this.startSyncPartNano = System.nanoTime();
                boolean failed = true;
                try {
                    for (StreamOperator<?> op : this.allOperators) {
                        this.checkpointStreamOperator(op);
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", (Object)this.checkpointMetaData.getCheckpointId(), (Object)this.owner.getName());
                    }
                    this.startAsyncPartNano = System.nanoTime();
                    this.checkpointMetaData.setSyncDurationMillis((this.startAsyncPartNano - this.startSyncPartNano) / 1000000L);
                    this.runAsyncCheckpointingAndAcknowledge();
                    failed = false;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} - finished synchronous part of checkpoint {}.Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.owner.getName(), this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getAlignmentDurationNanos() / 1000000L, this.checkpointMetaData.getSyncDurationMillis()});
                    }
                    if (!failed) break block20;
                }
                catch (Throwable throwable) {
                    if (failed) {
                        for (OperatorSnapshotResult operatorSnapshotResult : this.snapshotInProgressList) {
                            if (null == operatorSnapshotResult) continue;
                            try {
                                operatorSnapshotResult.cancel();
                            }
                            catch (Exception e) {
                                LOG.warn("Could not properly cancel an operator snapshot result.", (Throwable)e);
                            }
                        }
                        for (StreamStateHandle nonPartitionedState : this.nonPartitionedStates) {
                            if (nonPartitionedState == null) continue;
                            try {
                                nonPartitionedState.discardState();
                            }
                            catch (Exception e) {
                                LOG.warn("Could not properly discard a non partitioned state. This might leave some orphaned files behind.", (Throwable)e);
                            }
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} - did NOT finish synchronous part of checkpoint {}.Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.owner.getName(), this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getAlignmentDurationNanos() / 1000000L, this.checkpointMetaData.getSyncDurationMillis()});
                        }
                    }
                    throw throwable;
                }
                for (OperatorSnapshotResult operatorSnapshotResult : this.snapshotInProgressList) {
                    if (null == operatorSnapshotResult) continue;
                    try {
                        operatorSnapshotResult.cancel();
                    }
                    catch (Exception e) {
                        LOG.warn("Could not properly cancel an operator snapshot result.", (Throwable)e);
                    }
                }
                for (StreamStateHandle nonPartitionedState : this.nonPartitionedStates) {
                    if (nonPartitionedState == null) continue;
                    try {
                        nonPartitionedState.discardState();
                    }
                    catch (Exception e) {
                        LOG.warn("Could not properly discard a non partitioned state. This might leave some orphaned files behind.", (Throwable)e);
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}.Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.owner.getName(), this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getAlignmentDurationNanos() / 1000000L, this.checkpointMetaData.getSyncDurationMillis()});
                }
            }
        }

        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {
                this.createStreamFactory(op);
                this.snapshotNonPartitionableState(op);
                OperatorSnapshotResult snapshotInProgress = op.snapshotState(this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getTimestamp(), this.streamFactory);
                this.snapshotInProgressList.add(snapshotInProgress);
            } else {
                this.nonPartitionedStates.add(null);
                OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
                this.snapshotInProgressList.add(emptySnapshotInProgress);
            }
        }

        private void createStreamFactory(StreamOperator<?> operator) throws IOException {
            String operatorId = ((StreamTask)this.owner).createOperatorIdentifier(operator, ((StreamTask)this.owner).configuration.getVertexID());
            this.streamFactory = ((StreamTask)this.owner).stateBackend.createStreamFactory(this.owner.getEnvironment().getJobID(), operatorId);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void snapshotNonPartitionableState(StreamOperator<?> operator) throws Exception {
            StreamStateHandle stateHandle = null;
            if (operator instanceof StreamCheckpointedOperator) {
                CheckpointStreamFactory.CheckpointStateOutputStream outStream = this.streamFactory.createCheckpointStateOutputStream(this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getTimestamp());
                ((StreamTask)this.owner).cancelables.registerClosable((Closeable)outStream);
                try {
                    ((StreamCheckpointedOperator)((Object)operator)).snapshotState((FSDataOutputStream)outStream, this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getTimestamp());
                    stateHandle = outStream.closeAndGetHandle();
                }
                finally {
                    ((StreamTask)this.owner).cancelables.unregisterClosable((Closeable)outStream);
                    outStream.close();
                }
            }
            this.nonPartitionedStates.add(stateHandle);
        }

        public void runAsyncCheckpointingAndAcknowledge() throws IOException {
            AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(this.owner, this.nonPartitionedStates, this.snapshotInProgressList, this.checkpointMetaData, this.startAsyncPartNano);
            ((StreamTask)this.owner).cancelables.registerClosable((Closeable)asyncCheckpointRunnable);
            ((StreamTask)this.owner).asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
        }

        private static enum AsynCheckpointState {
            RUNNING,
            DISCARDED,
            COMPLETED;

        }
    }

    private static final class AsyncCheckpointRunnable
    implements Runnable,
    Closeable {
        private final StreamTask<?, ?> owner;
        private final List<OperatorSnapshotResult> snapshotInProgressList;
        private RunnableFuture<KeyGroupsStateHandle> futureKeyedBackendStateHandles;
        private RunnableFuture<KeyGroupsStateHandle> futureKeyedStreamStateHandles;
        private List<StreamStateHandle> nonPartitionedStateHandles;
        private final CheckpointMetaData checkpointMetaData;
        private final long asyncStartNanos;
        private final AtomicReference<CheckpointingOperation.AsynCheckpointState> asyncCheckpointState = new AtomicReference<CheckpointingOperation.AsynCheckpointState>(CheckpointingOperation.AsynCheckpointState.RUNNING);

        AsyncCheckpointRunnable(StreamTask<?, ?> owner, List<StreamStateHandle> nonPartitionedStateHandles, List<OperatorSnapshotResult> snapshotInProgressList, CheckpointMetaData checkpointMetaData, long asyncStartNanos) {
            int headIndex;
            OperatorSnapshotResult snapshotInProgress;
            this.owner = (StreamTask)Preconditions.checkNotNull(owner);
            this.snapshotInProgressList = (List)Preconditions.checkNotNull(snapshotInProgressList);
            this.checkpointMetaData = (CheckpointMetaData)Preconditions.checkNotNull((Object)checkpointMetaData);
            this.nonPartitionedStateHandles = nonPartitionedStateHandles;
            this.asyncStartNanos = asyncStartNanos;
            if (!snapshotInProgressList.isEmpty() && null != (snapshotInProgress = snapshotInProgressList.get(headIndex = snapshotInProgressList.size() - 1))) {
                this.futureKeyedBackendStateHandles = snapshotInProgress.getKeyedStateManagedFuture();
                this.futureKeyedStreamStateHandles = snapshotInProgress.getKeyedStateRawFuture();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                KeyGroupsStateHandle keyedStateHandleBackend = (KeyGroupsStateHandle)FutureUtil.runIfNotDoneAndGet(this.futureKeyedBackendStateHandles);
                KeyGroupsStateHandle keyedStateHandleStream = (KeyGroupsStateHandle)FutureUtil.runIfNotDoneAndGet(this.futureKeyedStreamStateHandles);
                ArrayList<Object> operatorStatesBackend = new ArrayList<Object>(this.snapshotInProgressList.size());
                ArrayList<Object> operatorStatesStream = new ArrayList<Object>(this.snapshotInProgressList.size());
                for (OperatorSnapshotResult snapshotInProgress : this.snapshotInProgressList) {
                    if (null != snapshotInProgress) {
                        operatorStatesBackend.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
                        operatorStatesStream.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
                        continue;
                    }
                    operatorStatesBackend.add(null);
                    operatorStatesStream.add(null);
                }
                long asyncEndNanos = System.nanoTime();
                long asyncDurationMillis = (asyncEndNanos - this.asyncStartNanos) / 1000000L;
                this.checkpointMetaData.setAsyncDurationMillis(asyncDurationMillis);
                ChainedStateHandle chainedNonPartitionedOperatorsState = new ChainedStateHandle(this.nonPartitionedStateHandles);
                ChainedStateHandle chainedOperatorStateBackend = new ChainedStateHandle(operatorStatesBackend);
                ChainedStateHandle chainedOperatorStateStream = new ChainedStateHandle(operatorStatesStream);
                SubtaskState subtaskState = new SubtaskState(chainedNonPartitionedOperatorsState, chainedOperatorStateBackend, chainedOperatorStateStream, keyedStateHandleBackend, keyedStateHandleStream);
                if (this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
                    this.owner.getEnvironment().acknowledgeCheckpoint(this.checkpointMetaData, subtaskState);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", new Object[]{this.owner.getName(), this.checkpointMetaData.getCheckpointId(), asyncDurationMillis});
                    }
                } else {
                    LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", (Object)this.owner.getName(), (Object)this.checkpointMetaData.getCheckpointId());
                }
            }
            catch (Exception e) {
                this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.COMPLETED, CheckpointingOperation.AsynCheckpointState.RUNNING);
                try {
                    this.cleanup();
                }
                catch (Exception cleanupException) {
                    e.addSuppressed(cleanupException);
                }
                AsynchronousException asyncException = new AsynchronousException(new Exception("Could not materialize checkpoint " + this.checkpointMetaData.getCheckpointId() + " for operator " + this.owner.getName() + '.', e));
                this.owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
            }
            finally {
                ((StreamTask)this.owner).cancelables.unregisterClosable((Closeable)this);
            }
        }

        @Override
        public void close() {
            try {
                this.cleanup();
            }
            catch (Exception cleanupException) {
                LOG.warn("Could not properly clean up the async checkpoint runnable.", (Throwable)cleanupException);
            }
        }

        private void cleanup() throws Exception {
            if (this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
                LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", (Object)this.checkpointMetaData.getCheckpointId(), (Object)this.owner.getName());
                Exception exception = null;
                for (OperatorSnapshotResult operatorSnapshotResult : this.snapshotInProgressList) {
                    if (operatorSnapshotResult == null) continue;
                    try {
                        operatorSnapshotResult.cancel();
                    }
                    catch (Exception cancelException) {
                        exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)cancelException, (Throwable)exception);
                    }
                }
                try {
                    StateUtil.bestEffortDiscardAllStateObjects(this.nonPartitionedStateHandles);
                }
                catch (Exception discardException) {
                    exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)discardException, exception);
                }
                if (null != exception) {
                    throw exception;
                }
            } else {
                LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has already been completed. Thus, the state handles are not cleaned up.", (Object)this.owner.getName(), (Object)this.checkpointMetaData.getCheckpointId());
            }
        }
    }
}

