/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.BitSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.SelectedReadingBarrierHandler;
import org.apache.flink.streaming.runtime.io.StreamArbitraryInputProcessor;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusSubMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskConfigSnapshot;
import org.apache.flink.util.Preconditions;

@Internal
public class ArbitraryInputStreamTask<OUT>
extends StreamTask<OUT, StreamOperator<OUT>> {
    private StreamArbitraryInputProcessor processor;

    public ArbitraryInputStreamTask(Environment env) {
        super(env);
    }

    public ArbitraryInputStreamTask(Environment environment, @Nullable ProcessingTimeService timeProvider) {
        super(environment, timeProvider);
    }

    @Override
    public void init() throws Exception {
        StreamTaskConfigSnapshot configuration = this.getStreamTaskConfig();
        ClassLoader userClassLoader = this.getUserCodeClassLoader();
        SelectedReadingBarrierHandler barrierHandler = null;
        if (this.getEnvironment().getAllInputGates().length > 0) {
            barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(configuration.isCheckpointingEnabled(), this, configuration.getCheckpointMode(), this.getEnvironment().getIOManager(), this.getEnvironment().getTaskManagerInfo().getConfiguration(), new InputGate[][]{this.getEnvironment().getAllInputGates()});
        }
        this.processor = new StreamArbitraryInputProcessor(this.getEnvironment().getIOManager(), this.getCheckpointLock(), this.operatorChain, this.getEnvironment().getMetricGroup(), barrierHandler);
        BitSet subStreamStatus = new BitSet();
        int sourceCount = 0;
        for (int headId : configuration.getChainedHeadNodeIds()) {
            StreamOperator streamOperator = this.operatorChain.getHeadOperator(headId);
            if (streamOperator instanceof StreamSource) {
                throw new UnsupportedOperationException("Source operator v1 is not supported in ArbitraryInputStreamTask");
            }
            if (!(streamOperator instanceof StreamSourceV2)) continue;
            StreamSourceV2 operator = (StreamSourceV2)streamOperator;
            StreamStatusSubMaintainer streamStatusSubMaintainer = new StreamStatusSubMaintainer(this.operatorChain, subStreamStatus, sourceCount);
            TimeCharacteristic timeCharacteristic = operator.getOperatorConfig().getTimeCharacteristic();
            Output collector = operator.getOutput();
            long watermarkInterval = operator.getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
            SourceFunction.SourceContext context = StreamSourceContexts.getSourceContext(timeCharacteristic, this.getProcessingTimeService(), this.getCheckpointLock(), streamStatusSubMaintainer, collector, watermarkInterval, -1L);
            this.processor.bindSourceOperator(headId, operator, (OneInputStreamOperator)((Object)this.operatorChain.getOperatorProxy(headId)), context, streamStatusSubMaintainer);
        }
        Map<Integer, StreamConfig> operatorConfigs = configuration.getChainedNodeConfigs();
        int channelCount = 0;
        List<StreamEdge> inEdges = configuration.getInStreamEdgesOfChain();
        for (int i = 0; i < inEdges.size(); ++i) {
            StreamEdge streamEdge = inEdges.get(i);
            InputGate inputGate = this.getEnvironment().getInputGate(i);
            OperatorChain.AbstractStreamOperatorProxy headOperator = this.operatorChain.getOperatorProxy(streamEdge.getTargetId());
            Preconditions.checkNotNull((Object)headOperator);
            StreamConfig operatorConfig = operatorConfigs.get(streamEdge.getTargetId());
            Preconditions.checkNotNull((Object)operatorConfig);
            StreamStatusSubMaintainer streamStatusSubMaintainer = new StreamStatusSubMaintainer(this.operatorChain, subStreamStatus, sourceCount + i);
            if (headOperator instanceof OneInputStreamOperator) {
                this.processor.bindOneInputOperator(streamEdge, inputGate, channelCount, (OneInputStreamOperator)((Object)headOperator), operatorConfig.getTypeSerializerIn1(userClassLoader), streamStatusSubMaintainer, this.getExecutionConfig().isObjectReuseEnabled(), this.getEnvironment().getTaskManagerInfo().getConfiguration());
            } else if (headOperator instanceof TwoInputStreamOperator) {
                if (streamEdge.getTypeNumber() == 1) {
                    this.processor.bindFirstOfTwoInputOperator(streamEdge, inputGate, channelCount, (TwoInputStreamOperator)((Object)headOperator), operatorConfig.getTypeSerializerIn1(userClassLoader), streamStatusSubMaintainer, this.getExecutionConfig().isObjectReuseEnabled(), this.getEnvironment().getTaskManagerInfo().getConfiguration());
                } else {
                    this.processor.bindSecondOfTwoInputOperator(streamEdge, inputGate, channelCount, (TwoInputStreamOperator)((Object)headOperator), operatorConfig.getTypeSerializerIn2(userClassLoader), streamStatusSubMaintainer, this.getExecutionConfig().isObjectReuseEnabled(), this.getEnvironment().getTaskManagerInfo().getConfiguration());
                }
            } else {
                throw new RuntimeException("Unsupported of " + headOperator + " yet");
            }
            channelCount += inputGate.getNumberOfInputChannels();
        }
    }

    @Override
    protected void run() throws Exception {
        this.processor.process();
    }

    @Override
    protected void cleanup() throws Exception {
        if (this.processor != null) {
            this.processor.cleanup();
        }
    }

    @Override
    protected void cancelTask() {
        if (this.processor != null) {
            this.processor.stop();
        }
    }
}

