/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ScheduledFuture;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;

public class StreamSourceContexts {
    public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object checkpointLock, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> output, long watermarkInterval, long idleTimeout) {
        SourceFunction.SourceContext ctx;
        switch (timeCharacteristic) {
            case EventTime: {
                ctx = new ManualWatermarkContext(output, processingTimeService, checkpointLock, streamStatusMaintainer, idleTimeout);
                break;
            }
            case IngestionTime: {
                ctx = new AutomaticWatermarkContext(output, watermarkInterval, processingTimeService, checkpointLock, streamStatusMaintainer, idleTimeout);
                break;
            }
            case ProcessingTime: {
                ctx = new NonTimestampContext(checkpointLock, output);
                break;
            }
            default: {
                throw new IllegalArgumentException(String.valueOf((Object)timeCharacteristic));
            }
        }
        return ctx;
    }

    private static abstract class WatermarkContext<T>
    implements SourceFunction.SourceContext<T> {
        protected final ProcessingTimeService timeService;
        protected final Object checkpointLock;
        protected final StreamStatusMaintainer streamStatusMaintainer;
        protected final long idleTimeout;
        private ScheduledFuture<?> nextCheck;
        private final CheckpointLockDelegate checkpointLockDelegate;
        private volatile boolean failOnNextCheck;

        public WatermarkContext(ProcessingTimeService timeService, Object checkpointLock, StreamStatusMaintainer streamStatusMaintainer, long idleTimeout) {
            this.timeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)timeService, (String)"Time Service cannot be null.");
            this.checkpointLock = Preconditions.checkNotNull((Object)checkpointLock, (String)"Checkpoint Lock cannot be null.");
            this.streamStatusMaintainer = (StreamStatusMaintainer)Preconditions.checkNotNull((Object)streamStatusMaintainer, (String)"Stream Status Maintainer cannot be null.");
            this.checkpointLockDelegate = new CheckpointLockDelegate(checkpointLock);
            if (idleTimeout != -1L) {
                Preconditions.checkArgument((idleTimeout >= 1L ? 1 : 0) != 0, (Object)"The idle timeout cannot be smaller than 1 ms.");
            }
            this.idleTimeout = idleTimeout;
            this.scheduleNextIdleDetectionTask();
        }

        @Override
        public void collect(T element) {
            this.checkpointLockDelegate.lockAndRunQuietly(() -> {
                this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                if (this.nextCheck != null) {
                    this.failOnNextCheck = false;
                } else {
                    this.scheduleNextIdleDetectionTask();
                }
                this.processAndCollect(element);
            });
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            this.checkpointLockDelegate.lockAndRunQuietly(() -> {
                this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                if (this.nextCheck != null) {
                    this.failOnNextCheck = false;
                } else {
                    this.scheduleNextIdleDetectionTask();
                }
                this.processAndCollectWithTimestamp(element, timestamp);
            });
        }

        @Override
        public void emitWatermark(Watermark mark) {
            if (this.allowWatermark(mark)) {
                this.checkpointLockDelegate.lockAndRunQuietly(() -> {
                    this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                    if (this.nextCheck != null) {
                        this.failOnNextCheck = false;
                    } else {
                        this.scheduleNextIdleDetectionTask();
                    }
                    this.processAndEmitWatermark(mark);
                });
            }
        }

        @Override
        public void markAsTemporarilyIdle() {
            this.checkpointLockDelegate.lockAndRunQuietly(() -> this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE));
        }

        @Override
        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        @Override
        public void close() {
            this.cancelNextIdleDetectionTask();
        }

        private void scheduleNextIdleDetectionTask() {
            if (this.idleTimeout != -1L) {
                this.failOnNextCheck = true;
                this.nextCheck = this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + this.idleTimeout, new IdlenessDetectionTask());
            }
        }

        protected void cancelNextIdleDetectionTask() {
            ScheduledFuture<?> nextCheck = this.nextCheck;
            if (nextCheck != null) {
                nextCheck.cancel(true);
            }
        }

        protected abstract void processAndCollect(T var1);

        protected abstract void processAndCollectWithTimestamp(T var1, long var2);

        protected abstract boolean allowWatermark(Watermark var1);

        protected abstract void processAndEmitWatermark(Watermark var1);

        private class IdlenessDetectionTask
        implements ProcessingTimeCallback {
            private IdlenessDetectionTask() {
            }

            @Override
            public void onProcessingTime(long timestamp) throws Exception {
                WatermarkContext.this.checkpointLockDelegate.lockAndRunQuietly(() -> {
                    WatermarkContext.this.nextCheck = null;
                    if (WatermarkContext.this.failOnNextCheck) {
                        WatermarkContext.this.markAsTemporarilyIdle();
                    } else {
                        WatermarkContext.this.scheduleNextIdleDetectionTask();
                    }
                });
            }
        }
    }

    private static class ManualWatermarkContext<T>
    extends WatermarkContext<T> {
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;

        private ManualWatermarkContext(Output<StreamRecord<T>> output, ProcessingTimeService timeService, Object checkpointLock, StreamStatusMaintainer streamStatusMaintainer, long idleTimeout) {
            super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
            this.output = (Output)Preconditions.checkNotNull(output, (String)"The output cannot be null.");
            this.reuse = new StreamRecord<Object>(null);
        }

        @Override
        protected void processAndCollect(T element) {
            this.output.collect(this.reuse.replace(element));
        }

        @Override
        protected void processAndCollectWithTimestamp(T element, long timestamp) {
            this.output.collect(this.reuse.replace(element, timestamp));
        }

        @Override
        protected void processAndEmitWatermark(Watermark mark) {
            this.output.emitWatermark(mark);
        }

        @Override
        protected boolean allowWatermark(Watermark mark) {
            return true;
        }
    }

    private static class AutomaticWatermarkContext<T>
    extends WatermarkContext<T> {
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;
        private final long watermarkInterval;
        private volatile ScheduledFuture<?> nextWatermarkTimer;
        private volatile long nextWatermarkTime;
        private long lastRecordTime;

        private AutomaticWatermarkContext(Output<StreamRecord<T>> output, long watermarkInterval, ProcessingTimeService timeService, Object checkpointLock, StreamStatusMaintainer streamStatusMaintainer, long idleTimeout) {
            super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
            this.output = (Output)Preconditions.checkNotNull(output, (String)"The output cannot be null.");
            Preconditions.checkArgument((watermarkInterval >= 1L ? 1 : 0) != 0, (Object)"The watermark interval cannot be smaller than 1 ms.");
            this.watermarkInterval = watermarkInterval;
            this.reuse = new StreamRecord<Object>(null);
            this.lastRecordTime = Long.MIN_VALUE;
            long now = this.timeService.getCurrentProcessingTime();
            this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval, new WatermarkEmittingTask(this.timeService, checkpointLock, output));
        }

        @Override
        protected void processAndCollect(T element) {
            this.lastRecordTime = this.timeService.getCurrentProcessingTime();
            this.output.collect(this.reuse.replace(element, this.lastRecordTime));
            if (this.lastRecordTime > this.nextWatermarkTime) {
                long watermarkTime = this.lastRecordTime - this.lastRecordTime % this.watermarkInterval;
                this.nextWatermarkTime = watermarkTime + this.watermarkInterval;
                this.output.emitWatermark(new Watermark(watermarkTime));
            }
        }

        @Override
        protected void processAndCollectWithTimestamp(T element, long timestamp) {
            this.processAndCollect(element);
        }

        @Override
        protected boolean allowWatermark(Watermark mark) {
            return mark.getTimestamp() == Long.MAX_VALUE && this.nextWatermarkTime != Long.MAX_VALUE;
        }

        @Override
        protected void processAndEmitWatermark(Watermark mark) {
            this.nextWatermarkTime = Long.MAX_VALUE;
            this.output.emitWatermark(mark);
            ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
            if (nextWatermarkTimer != null) {
                nextWatermarkTimer.cancel(true);
            }
        }

        @Override
        public void close() {
            super.close();
            ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
            if (nextWatermarkTimer != null) {
                nextWatermarkTimer.cancel(true);
            }
        }

        private class WatermarkEmittingTask
        implements ProcessingTimeCallback {
            private final ProcessingTimeService timeService;
            private final Object checkpointLock;
            private final Output<StreamRecord<T>> output;
            private final CheckpointLockDelegate checkpointLockDelegate;

            private WatermarkEmittingTask(ProcessingTimeService timeService, Object checkpointLock, Output<StreamRecord<T>> output) {
                this.timeService = timeService;
                this.checkpointLock = checkpointLock;
                this.output = output;
                this.checkpointLockDelegate = new CheckpointLockDelegate(checkpointLock);
            }

            @Override
            public void onProcessingTime(long timestamp) {
                long currentTime = this.timeService.getCurrentProcessingTime();
                this.checkpointLockDelegate.lockAndRunQuietly(() -> {
                    if (AutomaticWatermarkContext.this.streamStatusMaintainer.getStreamStatus().isActive()) {
                        if (AutomaticWatermarkContext.this.idleTimeout != -1L && currentTime - AutomaticWatermarkContext.this.lastRecordTime > AutomaticWatermarkContext.this.idleTimeout) {
                            AutomaticWatermarkContext.this.markAsTemporarilyIdle();
                            AutomaticWatermarkContext.this.cancelNextIdleDetectionTask();
                        } else if (currentTime > AutomaticWatermarkContext.this.nextWatermarkTime) {
                            long watermarkTime = currentTime - currentTime % AutomaticWatermarkContext.this.watermarkInterval;
                            this.output.emitWatermark(new Watermark(watermarkTime));
                            AutomaticWatermarkContext.this.nextWatermarkTime = watermarkTime + AutomaticWatermarkContext.this.watermarkInterval;
                        }
                    }
                });
                long nextWatermark = currentTime + AutomaticWatermarkContext.this.watermarkInterval;
                AutomaticWatermarkContext.this.nextWatermarkTimer = this.timeService.registerTimer(nextWatermark, new WatermarkEmittingTask(this.timeService, this.checkpointLock, this.output));
            }
        }
    }

    private static class NonTimestampContext<T>
    implements SourceFunction.SourceContext<T> {
        private final Object checkpointLock;
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;
        private final CheckpointLockDelegate checkpointLockDelegate;

        private NonTimestampContext(Object checkpointLock, Output<StreamRecord<T>> output) {
            this.checkpointLock = Preconditions.checkNotNull((Object)checkpointLock, (String)"The checkpoint lock cannot be null.");
            this.output = (Output)Preconditions.checkNotNull(output, (String)"The output cannot be null.");
            this.reuse = new StreamRecord<Object>(null);
            this.checkpointLockDelegate = new CheckpointLockDelegate(checkpointLock);
        }

        @Override
        public void collect(T element) {
            this.checkpointLockDelegate.lockAndRunQuietly(() -> this.output.collect(this.reuse.replace(element)));
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            this.collect(element);
        }

        @Override
        public void emitWatermark(Watermark mark) {
        }

        @Override
        public void markAsTemporarilyIdle() {
        }

        @Override
        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        @Override
        public void close() {
        }
    }
}

