/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

@Internal
public class InputFormatSourceFunction<OUT>
extends RichParallelSourceFunction<OUT> {
    private static final long serialVersionUID = 1L;
    private TypeInformation<OUT> typeInfo;
    private transient TypeSerializer<OUT> serializer;
    private InputFormat<OUT, InputSplit> format;
    private transient InputSplitProvider provider;
    private transient Iterator<InputSplit> splitIterator;
    private volatile boolean isRunning = true;

    public InputFormatSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
        this.format = format;
        this.typeInfo = typeInfo;
    }

    public void open(Configuration parameters) throws Exception {
        StreamingRuntimeContext context = (StreamingRuntimeContext)this.getRuntimeContext();
        if (this.format instanceof RichInputFormat) {
            ((RichInputFormat)this.format).setRuntimeContext((RuntimeContext)context);
        }
        this.format.configure(parameters);
        this.provider = context.getInputSplitProvider();
        this.serializer = this.typeInfo.createSerializer(this.getRuntimeContext().getExecutionConfig());
        this.splitIterator = this.getInputSplits();
        this.isRunning = this.splitIterator.hasNext();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run(SourceFunction.SourceContext<OUT> ctx) throws Exception {
        try {
            Counter completedSplitsCounter = this.getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (this.isRunning && this.format instanceof RichInputFormat) {
                ((RichInputFormat)this.format).openInputFormat();
            }
            Object nextElement = this.serializer.createInstance();
            while (this.isRunning) {
                this.format.open(this.splitIterator.next());
                while (this.isRunning && !this.format.reachedEnd() && (nextElement = this.format.nextRecord(nextElement)) != null) {
                    ctx.collect(nextElement);
                }
                this.format.close();
                completedSplitsCounter.inc();
                if (!this.isRunning) continue;
                this.isRunning = this.splitIterator.hasNext();
            }
        }
        finally {
            this.format.close();
            if (this.format instanceof RichInputFormat) {
                ((RichInputFormat)this.format).closeInputFormat();
            }
            this.isRunning = false;
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    public void close() throws Exception {
        this.format.close();
        if (this.format instanceof RichInputFormat) {
            ((RichInputFormat)this.format).closeInputFormat();
        }
    }

    public InputFormat<OUT, InputSplit> getFormat() {
        return this.format;
    }

    private Iterator<InputSplit> getInputSplits() {
        return new Iterator<InputSplit>(){
            private InputSplit nextSplit;
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }
                InputSplit split = InputFormatSourceFunction.this.provider.getNextInputSplit(InputFormatSourceFunction.this.getRuntimeContext().getUserCodeClassLoader());
                if (split != null) {
                    this.nextSplit = split;
                    return true;
                }
                this.exhausted = true;
                return false;
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !this.hasNext()) {
                    throw new NoSuchElementException();
                }
                InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}

