/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.BitSet;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.PartitionConsumptionFailedEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SerializerManagerUtility;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.SumAndCount;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.SelectedReadingBarrierHandler;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.ReusingRecordValueDeserializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamInputProcessor<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
    private final DeserializationDelegate<StreamElement> deserializationDelegate;
    private final SelectedReadingBarrierHandler barrierHandler;
    private final CheckpointLockDelegate lockDelegate;
    private StatusWatermarkValve statusWatermarkValve;
    private final int numInputChannels;
    private final BitSet channelsWithEndOfPartitionEvents;
    private int currentChannel = -1;
    private final StreamStatusMaintainer streamStatusMaintainer;
    private final OneInputStreamOperator<IN, ?> streamOperator;
    private final WatermarkGauge watermarkGauge;
    private Counter numRecordsIn;
    private Counter numRecordsReceived;
    private boolean enableTracingMetrics;
    private int tracingMetricsInterval;
    private long tracingInputCount;
    private SumAndCount taskLatency;
    private SumAndCount waitInput;
    private long lastProcessedTime = -1L;
    private boolean isFinished;

    public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean isCheckpointingEnabled, StreamTask<?, ?> checkpointedTask, CheckpointingMode checkpointMode, Object lock, IOManager ioManager, Configuration taskManagerConfig, StreamStatusMaintainer streamStatusMaintainer, OneInputStreamOperator<IN, ?> streamOperator, TaskIOMetricGroup metrics, WatermarkGauge watermarkGauge, boolean objectReuse, boolean enableTracingMetrics, int tracingMetricsInterval) throws IOException {
        this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(isCheckpointingEnabled, checkpointedTask, checkpointMode, ioManager, taskManagerConfig, new InputGate[][]{inputGates});
        this.lockDelegate = new CheckpointLockDelegate(Preconditions.checkNotNull((Object)lock));
        StreamElementSerializer<IN> ser = new StreamElementSerializer<IN>(inputSerializer);
        this.deserializationDelegate = objectReuse ? new ReusingRecordValueDeserializationDelegate<IN>(ser) : new NonReusingDeserializationDelegate(ser);
        this.numInputChannels = this.barrierHandler.getNumberOfInputChannels();
        SerializerManagerUtility serializerManagerUtility = new SerializerManagerUtility(taskManagerConfig);
        this.recordDeserializers = serializerManagerUtility.createRecordDeserializers(this.barrierHandler.getAllInputChannels(), ioManager.getSpillingDirectoriesPaths());
        this.channelsWithEndOfPartitionEvents = new BitSet(this.numInputChannels);
        this.streamStatusMaintainer = (StreamStatusMaintainer)Preconditions.checkNotNull((Object)streamStatusMaintainer);
        this.streamOperator = (OneInputStreamOperator)Preconditions.checkNotNull(streamOperator);
        this.statusWatermarkValve = new StatusWatermarkValve(this.numInputChannels, new ForwardingValveOutputHandler(streamOperator, this.lockDelegate.getLock()));
        this.watermarkGauge = watermarkGauge;
        metrics.gauge("checkpointAlignmentTime", this.barrierHandler::getAlignmentDurationNanos);
        this.enableTracingMetrics = enableTracingMetrics;
        this.tracingMetricsInterval = tracingMetricsInterval;
        this.tracingInputCount = 0L;
    }

    public boolean processInput() throws Exception {
        block19: {
            AbstractEvent event;
            if (this.isFinished) {
                return false;
            }
            if (this.numRecordsIn == null) {
                try {
                    this.numRecordsIn = ((OperatorMetricGroup)this.streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
                }
                catch (Exception e) {
                    LOG.warn("An exception occurred during the metrics setup.", (Throwable)e);
                    this.numRecordsIn = new SimpleCounter();
                }
            }
            if (this.numRecordsReceived == null) {
                try {
                    this.numRecordsReceived = ((OperatorMetricGroup)this.streamOperator.getMetricGroup()).parent().getIOMetricGroup().getNumRecordsReceived();
                }
                catch (Exception e) {
                    LOG.warn("An exception occurred during the metrics setup.", (Throwable)e);
                    this.numRecordsReceived = new SimpleCounter();
                }
            }
            if (this.enableTracingMetrics) {
                if (this.taskLatency == null) {
                    this.taskLatency = new SumAndCount("taskLatency", (MetricGroup)((OperatorMetricGroup)this.streamOperator.getMetricGroup()).parent());
                }
                if (this.waitInput == null) {
                    this.waitInput = new SumAndCount("waitInput", (MetricGroup)((OperatorMetricGroup)this.streamOperator.getMetricGroup()).parent());
                }
            }
            while (true) {
                BufferOrEvent bufferOrEvent;
                if (this.currentRecordDeserializer != null) {
                    RecordDeserializer.DeserializationResult result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
                    if (result.isBufferConsumed()) {
                        this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                        this.currentRecordDeserializer = null;
                    }
                    if (result.isFullRecord()) {
                        boolean recordProcessed;
                        StreamElement recordOrMark = (StreamElement)this.deserializationDelegate.getInstance();
                        this.numRecordsReceived.inc();
                        ++this.tracingInputCount;
                        if (this.enableTracingMetrics && this.tracingInputCount % (long)this.tracingMetricsInterval == 0L) {
                            long start = System.nanoTime();
                            this.waitInput.update(start - this.lastProcessedTime);
                            recordProcessed = this.processRecordOrMark(recordOrMark);
                            this.lastProcessedTime = System.nanoTime();
                            this.taskLatency.update(this.lastProcessedTime - start);
                        } else {
                            recordProcessed = this.processRecordOrMark(recordOrMark);
                        }
                        if (!recordProcessed) continue;
                        return true;
                    }
                }
                if ((bufferOrEvent = this.barrierHandler.getNextNonBlocked()) == null) break block19;
                if (bufferOrEvent.isBuffer()) {
                    this.currentChannel = bufferOrEvent.getChannelIndex();
                    this.currentRecordDeserializer = this.recordDeserializers[this.currentChannel];
                    this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                    continue;
                }
                event = bufferOrEvent.getEvent();
                if (event.getClass() == EndOfPartitionEvent.class) {
                    this.channelsWithEndOfPartitionEvents.set(bufferOrEvent.getChannelIndex());
                    if (this.channelsWithEndOfPartitionEvents.cardinality() != this.numInputChannels) continue;
                    this.lockDelegate.lockAndRun(() -> this.streamOperator.endInput());
                    continue;
                }
                if (event.getClass() != PartitionConsumptionFailedEvent.class) break;
                this.currentRecordDeserializer = this.recordDeserializers[bufferOrEvent.getChannelIndex()];
                this.clearDeserializer(this.currentRecordDeserializer);
                this.currentRecordDeserializer = null;
            }
            throw new IOException("Unexpected event: " + event);
        }
        this.isFinished = true;
        if (!this.barrierHandler.isEmpty()) {
            throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
        }
        return false;
    }

    private boolean processRecordOrMark(StreamElement recordOrMark) throws Exception {
        if (recordOrMark.isRecord()) {
            StreamRecord record = recordOrMark.asRecord();
            this.lockDelegate.lockAndRun(() -> {
                this.numRecordsIn.inc();
                this.streamOperator.setKeyContextElement1(record);
                this.streamOperator.processElement(record);
            });
            return true;
        }
        if (recordOrMark.isWatermark()) {
            this.statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), this.currentChannel);
            return false;
        }
        if (recordOrMark.isStreamStatus()) {
            this.statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), this.currentChannel);
            return false;
        }
        if (recordOrMark.isLatencyMarker()) {
            this.lockDelegate.lockAndRun(() -> this.streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker()));
            return false;
        }
        throw new RuntimeException("Unexpected stream element type " + recordOrMark);
    }

    public void cleanup() throws IOException {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer : this.recordDeserializers) {
            this.clearDeserializer(deserializer);
        }
        this.barrierHandler.cleanup();
    }

    private void clearDeserializer(RecordDeserializer<?> deserializer) {
        Buffer buffer = deserializer.getCurrentBuffer();
        if (buffer != null && !buffer.isRecycled()) {
            buffer.recycleBuffer();
        }
        deserializer.clear();
    }

    private class ForwardingValveOutputHandler
    implements StatusWatermarkValve.ValveOutputHandler {
        private final OneInputStreamOperator<IN, ?> operator;
        private final CheckpointLockDelegate lockDelegate;

        private ForwardingValveOutputHandler(OneInputStreamOperator<IN, ?> operator, Object lock) {
            this.operator = (OneInputStreamOperator)Preconditions.checkNotNull(operator);
            this.lockDelegate = new CheckpointLockDelegate(Preconditions.checkNotNull((Object)lock));
        }

        @Override
        public void handleWatermark(Watermark watermark) {
            try {
                this.lockDelegate.lockAndRun(() -> {
                    StreamInputProcessor.this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark(watermark);
                });
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        @Override
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                this.lockDelegate.lockAndRun(() -> StreamInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(streamStatus));
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }
}

