/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async;

import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.async.Emitter;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

@Internal
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>,
OperatorActions {
    private static final long serialVersionUID = 1L;
    private static final String STATE_NAME = "_async_wait_operator_state_";
    private final int capacity;
    private final AsyncDataStream.OutputMode outputMode;
    private final long timeout;
    protected transient Object checkpointingLock;
    private transient StreamElementSerializer<IN> inStreamElementSerializer;
    private transient ListState<StreamElement> recoveredStreamElements;
    private transient StreamElementQueue queue;
    private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;
    private transient ExecutorService executor;
    private transient Emitter<OUT> emitter;
    private transient Thread emitterThread;

    public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction, long timeout, int capacity, AsyncDataStream.OutputMode outputMode) {
        super(asyncFunction);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        Preconditions.checkArgument((capacity > 0 ? 1 : 0) != 0, (Object)"The number of concurrent async operation should be greater than 0.");
        this.capacity = capacity;
        this.outputMode = (AsyncDataStream.OutputMode)((Object)Preconditions.checkNotNull((Object)((Object)outputMode), (String)"outputMode"));
        this.timeout = timeout;
    }

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        this.checkpointingLock = this.getContainingTask().getCheckpointLock();
        this.inStreamElementSerializer = new StreamElementSerializer(this.getOperatorConfig().getTypeSerializerIn1(this.getUserCodeClassloader()));
        this.executor = Executors.newSingleThreadExecutor();
        switch (this.outputMode) {
            case ORDERED: {
                this.queue = new OrderedStreamElementQueue(this.capacity, this.executor, this);
                break;
            }
            case UNORDERED: {
                this.queue = new UnorderedStreamElementQueue(this.capacity, this.executor, this);
                break;
            }
            default: {
                throw new IllegalStateException("Unknown async mode: " + (Object)((Object)this.outputMode) + '.');
            }
        }
    }

    @Override
    public void open() throws Exception {
        super.open();
        if (this.recoveredStreamElements != null) {
            for (StreamElement element : (Iterable)this.recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    this.processElement(element.asRecord());
                    continue;
                }
                if (element.isWatermark()) {
                    this.processWatermark(element.asWatermark());
                    continue;
                }
                if (element.isLatencyMarker()) {
                    this.processLatencyMarker(element.asLatencyMarker());
                    continue;
                }
                throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator.");
            }
            this.recoveredStreamElements = null;
        }
        this.emitter = new Emitter(this.checkpointingLock, this.output, this.queue, this);
        this.emitterThread = new Thread(this.emitter, "AsyncIO-Emitter-Thread (" + this.getOperatorName() + ')');
        this.emitterThread.setDaemon(true);
        this.emitterThread.start();
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final StreamRecordQueueEntry streamRecordBufferEntry = new StreamRecordQueueEntry(element);
        if (this.timeout > 0L) {
            long timeoutTimestamp = this.timeout + this.getProcessingTimeService().getCurrentProcessingTime();
            this.getProcessingTimeService().registerTimer(timeoutTimestamp, new ProcessingTimeCallback(){

                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                    streamRecordBufferEntry.collect(new TimeoutException("Async function call has timed out."));
                }
            });
        }
        this.addAsyncBufferEntry(streamRecordBufferEntry);
        ((AsyncFunction)this.userFunction).asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);
        this.addAsyncBufferEntry(watermarkBufferEntry);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        ListState partitionableState = this.getOperatorStateBackend().getOperatorState(new ListStateDescriptor(STATE_NAME, this.inStreamElementSerializer));
        partitionableState.clear();
        Collection<StreamElementQueueEntry<?>> values = this.queue.values();
        try {
            for (StreamElementQueueEntry<?> value : values) {
                partitionableState.add((Object)value.getStreamElement());
            }
            if (this.pendingStreamElementQueueEntry != null) {
                partitionableState.add((Object)this.pendingStreamElementQueueEntry.getStreamElement());
            }
        }
        catch (Exception e) {
            partitionableState.clear();
            throw new Exception("Could not add stream element queue entries to operator state backend of operator " + this.getOperatorName() + '.', e);
        }
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        this.recoveredStreamElements = context.getOperatorStateStore().getOperatorState(new ListStateDescriptor(STATE_NAME, this.inStreamElementSerializer));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws Exception {
        try {
            assert (Thread.holdsLock(this.checkpointingLock));
            while (!this.queue.isEmpty()) {
                this.checkpointingLock.wait();
            }
        }
        finally {
            Exception exception = null;
            try {
                super.close();
            }
            catch (InterruptedException interrupted) {
                exception = interrupted;
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                exception = e;
            }
            try {
                this.stopResources(true);
            }
            catch (InterruptedException interrupted) {
                exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)interrupted, (Throwable)exception);
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
            }
            if (exception != null) {
                LOG.warn("Errors occurred while closing the AsyncWaitOperator.", (Throwable)exception);
            }
        }
    }

    @Override
    public void dispose() throws Exception {
        Exception exception = null;
        try {
            super.dispose();
        }
        catch (InterruptedException interrupted) {
            exception = interrupted;
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            exception = e;
        }
        try {
            this.stopResources(false);
        }
        catch (InterruptedException interrupted) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)interrupted, (Throwable)exception);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        if (exception != null) {
            throw exception;
        }
    }

    private void stopResources(boolean waitForShutdown) throws InterruptedException {
        this.emitter.stop();
        this.emitterThread.interrupt();
        this.executor.shutdown();
        if (waitForShutdown) {
            try {
                if (!this.executor.awaitTermination(365L, TimeUnit.DAYS)) {
                    this.executor.shutdownNow();
                }
            }
            catch (InterruptedException e) {
                this.executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
            if (Thread.holdsLock(this.checkpointingLock)) {
                while (this.emitterThread.isAlive()) {
                    this.checkpointingLock.wait(100L);
                }
            }
            this.emitterThread.join();
        } else {
            this.executor.shutdownNow();
        }
    }

    private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        assert (Thread.holdsLock(this.checkpointingLock));
        this.pendingStreamElementQueueEntry = streamElementQueueEntry;
        while (!this.queue.tryPut(streamElementQueueEntry)) {
            this.checkpointingLock.wait();
        }
        this.pendingStreamElementQueueEntry = null;
    }

    @Override
    public void failOperator(Throwable throwable) {
        this.getContainingTask().getEnvironment().failExternally(throwable);
    }
}

