/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ScheduledFuture;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

public class StreamSourceContexts {
    /**
     * Creates the {@link SourceFunction.SourceContext} that corresponds to the given
     * time characteristic.
     *
     * <ul>
     *   <li>{@code EventTime}: a {@code ManualWatermarkContext} built from the lock and output.</li>
     *   <li>{@code IngestionTime}: an {@code AutomaticWatermarkContext}, which additionally
     *       receives the processing time service and the watermark interval.</li>
     *   <li>{@code ProcessingTime}: a {@code NonTimestampContext} built from the lock and output.</li>
     * </ul>
     *
     * @param timeCharacteristic the time characteristic selecting the context implementation
     * @param processingTimeService time service; only passed on for {@code IngestionTime}
     * @param checkpointLock the lock object handed to the created context
     * @param output the output handed to the created context
     * @param watermarkInterval watermark interval; only passed on for {@code IngestionTime}
     * @param <OUT> the type of the elements emitted through the context
     * @return the newly created source context
     * @throws IllegalArgumentException if the characteristic is none of the three handled values
     * @throws NullPointerException if {@code timeCharacteristic} is {@code null} (switching on a
     *         {@code null} enum reference)
     */
    public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
            TimeCharacteristic timeCharacteristic,
            ProcessingTimeService processingTimeService,
            Object checkpointLock,
            Output<StreamRecord<OUT>> output,
            long watermarkInterval) {
        switch (timeCharacteristic) {
            case EventTime:
                return new ManualWatermarkContext<>(checkpointLock, output);
            case IngestionTime:
                return new AutomaticWatermarkContext<>(
                        processingTimeService, checkpointLock, output, watermarkInterval);
            case ProcessingTime:
                return new NonTimestampContext<>(checkpointLock, output);
            default:
                throw new IllegalArgumentException(String.valueOf(timeCharacteristic));
        }
    }

    /**
     * Source context that passes records (with or without a caller-supplied timestamp) and
     * watermarks straight through to the output. Every emission happens while holding the
     * checkpoint lock.
     *
     * @param <T> the type of the emitted elements
     */
    private static class ManualWatermarkContext<T>
            implements SourceFunction.SourceContext<T> {
        /** Monitor held around every call into {@link #output}. */
        private final Object lock;
        /** Receiver of records and watermarks. */
        private final Output<StreamRecord<T>> output;
        /** Record object that is re-populated via {@code replace(...)} for each element. */
        private final StreamRecord<T> reuse;

        /**
         * @param checkpointLock the lock to synchronize emissions on; must not be {@code null}
         * @param output the output to emit to; must not be {@code null}
         * @throws NullPointerException if either argument is {@code null}
         */
        private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) {
            this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
            this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
            this.reuse = new StreamRecord<>(null);
        }

        /** Emits the element without setting a timestamp on the reused record. */
        @Override
        public void collect(T element) {
            synchronized (lock) {
                output.collect(reuse.replace(element));
            }
        }

        /** Emits the element together with the timestamp supplied by the caller. */
        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            synchronized (lock) {
                output.collect(reuse.replace(element, timestamp));
            }
        }

        /** Forwards the given watermark to the output unchanged. */
        @Override
        public void emitWatermark(Watermark mark) {
            synchronized (lock) {
                output.emitWatermark(mark);
            }
        }

        /** Returns the lock object passed to the constructor. */
        @Override
        public Object getCheckpointLock() {
            return lock;
        }

        /** Nothing to release; this context holds no timers or other resources. */
        @Override
        public void close() {
        }
    }

    private static class AutomaticWatermarkContext<T>
    implements SourceFunction.SourceContext<T> {
        private final ProcessingTimeService timeService;
        private final Object lock;
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;
        private final long watermarkInterval;
        private volatile ScheduledFuture<?> nextWatermarkTimer;
        private volatile long nextWatermarkTime;

        private AutomaticWatermarkContext(ProcessingTimeService timeService, Object checkpointLock, Output<StreamRecord<T>> output, long watermarkInterval) {
            this.timeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)timeService, (String)"Time Service cannot be null.");
            this.lock = Preconditions.checkNotNull((Object)checkpointLock, (String)"The checkpoint lock cannot be null.");
            this.output = (Output)Preconditions.checkNotNull(output, (String)"The output cannot be null.");
            Preconditions.checkArgument((watermarkInterval >= 1L ? 1 : 0) != 0, (Object)"The watermark interval cannot be smaller than 1 ms.");
            this.watermarkInterval = watermarkInterval;
            this.reuse = new StreamRecord<Object>(null);
            long now = this.timeService.getCurrentProcessingTime();
            this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval, new WatermarkEmittingTask(this.timeService, this.lock, output));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void collect(T element) {
            Object object = this.lock;
            synchronized (object) {
                long currentTime = this.timeService.getCurrentProcessingTime();
                this.output.collect(this.reuse.replace(element, currentTime));
                if (currentTime > this.nextWatermarkTime) {
                    long watermarkTime = currentTime - currentTime % this.watermarkInterval;
                    this.nextWatermarkTime = watermarkTime + this.watermarkInterval;
                    this.output.emitWatermark(new Watermark(watermarkTime));
                }
            }
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            this.collect(element);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void emitWatermark(Watermark mark) {
            if (mark.getTimestamp() == Long.MAX_VALUE) {
                Object object = this.lock;
                synchronized (object) {
                    this.nextWatermarkTime = Long.MAX_VALUE;
                    this.output.emitWatermark(mark);
                }
                ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
                if (nextWatermarkTimer != null) {
                    nextWatermarkTimer.cancel(true);
                }
            }
        }

        @Override
        public Object getCheckpointLock() {
            return this.lock;
        }

        @Override
        public void close() {
            ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
            if (nextWatermarkTimer != null) {
                nextWatermarkTimer.cancel(true);
            }
        }

        private class WatermarkEmittingTask
        implements ProcessingTimeCallback {
            private final ProcessingTimeService timeService;
            private final Object lock;
            private final Output<StreamRecord<T>> output;

            private WatermarkEmittingTask(ProcessingTimeService timeService, Object checkpointLock, Output<StreamRecord<T>> output) {
                this.timeService = timeService;
                this.lock = checkpointLock;
                this.output = output;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onProcessingTime(long timestamp) {
                long currentTime = this.timeService.getCurrentProcessingTime();
                if (currentTime > AutomaticWatermarkContext.this.nextWatermarkTime) {
                    long watermarkTime = currentTime - currentTime % AutomaticWatermarkContext.this.watermarkInterval;
                    Object object = this.lock;
                    synchronized (object) {
                        if (currentTime > AutomaticWatermarkContext.this.nextWatermarkTime) {
                            this.output.emitWatermark(new Watermark(watermarkTime));
                            AutomaticWatermarkContext.this.nextWatermarkTime = watermarkTime + AutomaticWatermarkContext.this.watermarkInterval;
                        }
                    }
                }
                long nextWatermark = currentTime + AutomaticWatermarkContext.this.watermarkInterval;
                AutomaticWatermarkContext.this.nextWatermarkTimer = this.timeService.registerTimer(nextWatermark, new WatermarkEmittingTask(this.timeService, this.lock, this.output));
            }
        }
    }

    /**
     * Source context that emits records without timestamps and discards all watermarks.
     * Records are emitted while holding the checkpoint lock.
     *
     * @param <T> the type of the emitted elements
     */
    private static class NonTimestampContext<T>
            implements SourceFunction.SourceContext<T> {
        /** Monitor held around every call into {@link #output}. */
        private final Object lock;
        /** Receiver of the emitted records. */
        private final Output<StreamRecord<T>> output;
        /** Record object that is re-populated via {@code replace(...)} for each element. */
        private final StreamRecord<T> reuse;

        /**
         * @param checkpointLock the lock to synchronize emissions on; must not be {@code null}
         * @param output the output to emit to; must not be {@code null}
         * @throws NullPointerException if either argument is {@code null}
         */
        private NonTimestampContext(Object checkpointLock, Output<StreamRecord<T>> output) {
            this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
            this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
            this.reuse = new StreamRecord<>(null);
        }

        /** Emits the element without setting a timestamp on the reused record. */
        @Override
        public void collect(T element) {
            synchronized (lock) {
                output.collect(reuse.replace(element));
            }
        }

        /** Drops the supplied timestamp and delegates to {@link #collect(Object)}. */
        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            collect(element);
        }

        /** Intentionally a no-op: this context never forwards watermarks. */
        @Override
        public void emitWatermark(Watermark mark) {
        }

        /** Returns the lock object passed to the constructor. */
        @Override
        public Object getCheckpointLock() {
            return lock;
        }

        /** Nothing to release; this context holds no timers or other resources. */
        @Override
        public void close() {
        }
    }
}

