/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceRecord;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.InputFetcher;
import org.apache.flink.streaming.runtime.io.SourceInputProcessor;
import org.apache.flink.streaming.runtime.tasks.InputSelector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;

/**
 * {@link InputFetcher} that pulls {@link SourceRecord}s from a {@link StreamSourceV2}
 * operator one at a time and forwards their contents through a
 * {@link SourceFunction.SourceContext}.
 *
 * <p>Once the operator reports that it is finished, a final
 * {@link Watermark#MAX_WATERMARK} is emitted and the {@link SourceInputProcessor} is
 * ended and released while holding the context's checkpoint lock. This happens at most
 * once per fetcher.
 */
class SourceFetcher implements InputFetcher {
    private final StreamSourceV2 operator;
    private final SourceInputProcessor processor;
    private final SourceFunction.SourceContext context;
    private final InputSelector.InputSelection inputSelection;

    /** Whether the most recent call to {@code operator.next()} returned no record. */
    private boolean isIdle;

    /** Set once the end-of-input sequence in {@link #finishInput()} has completed. */
    private volatile boolean finishedInput;

    /** Listener handed in via {@link #registerAvailableListener}; only stored by this class. */
    private InputFetcher.InputFetcherAvailableListener listener;

    /**
     * Creates a fetcher for the given source operator.
     *
     * @param inputSelection the selection reported by {@link #getInputSelection()}; must not be null
     * @param operator the source operator records are pulled from; must not be null
     * @param context the source context used for emitting records and watermarks; must not be null
     * @param processor the processor that is ended and released when the input finishes; must not be null
     */
    public SourceFetcher(InputSelector.InputSelection inputSelection, StreamSourceV2 operator, SourceFunction.SourceContext context, SourceInputProcessor processor) {
        this.inputSelection = Preconditions.checkNotNull(inputSelection);
        this.operator = Preconditions.checkNotNull(operator);
        this.processor = Preconditions.checkNotNull(processor);
        this.context = Preconditions.checkNotNull(context);
    }

    /** No setup work is required for this fetcher. */
    @Override
    public void setup() throws Exception {
    }

    /**
     * Pulls at most one record from the operator and forwards it.
     *
     * <p>If the operator is already finished before the pull, or becomes finished after
     * it, the input is finished (see {@link #finishInput()}) and {@code false} is returned.
     * If the operator yields no record, the context is marked as temporarily idle.
     *
     * @return {@code true} if a record was obtained and the operator is not finished;
     *     {@code false} if the operator is finished or yielded no record
     * @throws Exception if the operator, the context or the processor fails
     */
    @Override
    public boolean fetchAndProcess() throws Exception {
        if (this.finishIfOperatorDone()) {
            return false;
        }

        final SourceRecord next = this.operator.next();
        if (next == null) {
            this.context.markAsTemporarilyIdle();
            this.isIdle = true;
        } else {
            this.forward(next);
            this.isIdle = false;
        }

        // The pull above may have exhausted the operator, so check again before reporting.
        if (this.finishIfOperatorDone()) {
            return false;
        }
        return !this.isIdle;
    }

    /**
     * Emits the watermark (if any) and then the payload (if any) carried by {@code record}.
     * A payload is collected with its timestamp only when that timestamp is strictly
     * positive; otherwise it is collected without one.
     */
    private void forward(SourceRecord record) {
        final Object payload = record.getRecord();
        if (record.getWatermark() != null) {
            this.context.emitWatermark(record.getWatermark());
        }
        if (payload == null) {
            return;
        }
        if (record.getTimestamp() > 0L) {
            this.context.collectWithTimestamp(payload, record.getTimestamp());
        } else {
            this.context.collect(payload);
        }
    }

    /**
     * Runs the end-of-input sequence when the operator reports that it is finished.
     *
     * @return {@code true} if the operator is finished, {@code false} otherwise
     */
    private boolean finishIfOperatorDone() throws Exception {
        if (!this.isFinished()) {
            return false;
        }
        this.finishInput();
        return true;
    }

    /**
     * Emits {@link Watermark#MAX_WATERMARK} and then ends and releases the processor under
     * the context's checkpoint lock. Does nothing if this sequence has already completed.
     */
    private void finishInput() throws Exception {
        if (this.finishedInput) {
            return;
        }
        this.context.emitWatermark(Watermark.MAX_WATERMARK);
        final CheckpointLockDelegate lockDelegate = new CheckpointLockDelegate(this.context.getCheckpointLock());
        lockDelegate.lockAndRun(() -> {
            this.processor.endInput();
            this.processor.release();
        });
        // Only flagged after the sequence above completed without throwing.
        this.finishedInput = true;
    }

    /** @return whether the underlying operator reports that it is finished */
    @Override
    public boolean isFinished() {
        return this.operator.isFinished();
    }

    /** @return {@code true} as long as {@link #isFinished()} is {@code false} */
    @Override
    public boolean moreAvailable() {
        return !this.isFinished();
    }

    /** No cleanup work is required for this fetcher. */
    @Override
    public void cleanup() {
    }

    /** Forwards the cancellation to the underlying operator. */
    @Override
    public void cancel() {
        this.operator.cancel();
    }

    /** @return the input selection supplied at construction time */
    @Override
    public InputSelector.InputSelection getInputSelection() {
        return this.inputSelection;
    }

    /**
     * Stores the given listener. May be called only while no listener is stored yet.
     *
     * @param listener the listener to store
     * @throws IllegalStateException if a listener is already stored (raised by
     *     {@code Preconditions.checkState})
     */
    @Override
    public void registerAvailableListener(InputFetcher.InputFetcherAvailableListener listener) {
        Preconditions.checkState(this.listener == null);
        this.listener = listener;
    }
}

