/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream operator that receives {@link TimestampedFileInputSplit}s as input elements and reads
 * them with the configured {@link FileInputFormat}, emitting the resulting records downstream.
 *
 * <p>Reading happens in a dedicated {@link SplitReader} thread. Incoming splits are queued in a
 * priority queue; the reader thread takes them one at a time. The queue, the currently open
 * split and record emission are all guarded by the task's checkpoint lock.
 *
 * <p>The state that is checkpointed is the list of splits still to be processed: the split
 * currently being read (including the format's offset state when the format is a
 * {@link CheckpointableInputFormat}) followed by all pending splits.
 *
 * @param <OUT> type of the records produced by the input format
 */
@Internal
public class ContinuousFileReaderOperator<OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>,
OutputTypeConfigurable<OUT>,
CheckpointedRestoringOperator {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);

    /** Format used to read the splits; cleared in {@link #dispose()}. */
    private FileInputFormat<OUT> format;

    /** Serializer used to create the reusable record instance; set by {@link #setOutputType}. */
    private TypeSerializer<OUT> serializer;

    /** Lock obtained from the containing task in {@link #open()}. */
    private transient Object checkpointLock;

    /** Thread that performs the actual reading; created and started in {@link #open()}. */
    private transient SplitReader<OUT> reader;

    /** Source context through which the reader thread emits records and watermarks. */
    private transient SourceFunction.SourceContext<OUT> readerContext;

    /** Operator state holding the splits that still have to be processed. */
    private transient ListState<TimestampedFileInputSplit> checkpointedState;

    /** Splits recovered on restore, handed to the reader in {@link #open()} and then dropped. */
    private transient List<TimestampedFileInputSplit> restoredReaderState;

    /**
     * Creates the operator.
     *
     * @param format the input format used to read the splits; must not be {@code null}
     */
    public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
        this.format = Preconditions.checkNotNull(format);
    }

    /**
     * Creates the record serializer from the given output type information.
     *
     * @param outTypeInfo type information of the produced records
     * @param executionConfig execution config used to create the serializer
     */
    @Override
    public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
        this.serializer = outTypeInfo.createSerializer(executionConfig);
    }

    /**
     * Registers the {@code "splits"} list state and, when the context reports a restore, copies
     * its content into {@link #restoredReaderState}.
     *
     * @param context the state initialization context
     * @throws IllegalStateException if the state was already initialized
     * @throws Exception if the state cannot be accessed
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        Preconditions.checkState(this.checkpointedState == null, "The reader state has already been initialized.");

        this.checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
        int subtaskIdx = this.getRuntimeContext().getIndexOfThisSubtask();

        if (!context.isRestored()) {
            LOG.info("No state to restore for the {} (taskIdx={}).", this.getClass().getSimpleName(), subtaskIdx);
            return;
        }

        LOG.info("Restoring state for the {} (taskIdx={}).", this.getClass().getSimpleName(), subtaskIdx);

        // The list is only non-null here if restoreState(FSDataInputStream) has already filled it
        // with legacy state; in that case the legacy content is kept untouched.
        if (this.restoredReaderState == null) {
            this.restoredReaderState = new ArrayList<>();
            for (TimestampedFileInputSplit split : this.checkpointedState.get()) {
                this.restoredReaderState.add(split);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} (taskIdx={}) restored {}.", this.getClass().getSimpleName(), subtaskIdx, this.restoredReaderState);
            }
        }
    }

    /**
     * Configures the input format, builds the source context and starts the reader thread,
     * seeding it with any restored splits.
     *
     * @throws IllegalStateException if the reader already exists or the serializer was never set
     * @throws Exception if the superclass fails to open
     */
    @Override
    public void open() throws Exception {
        super.open();
        Preconditions.checkState(this.reader == null, "The reader is already initialized.");
        Preconditions.checkState(this.serializer != null, "The serializer has not been set. Probably the setOutputType() was not called. Please report it.");

        this.format.setRuntimeContext(this.getRuntimeContext());
        this.format.configure(new Configuration());

        this.checkpointLock = this.getContainingTask().getCheckpointLock();

        TimeCharacteristic timeCharacteristic = this.getOperatorConfig().getTimeCharacteristic();
        long watermarkInterval = this.getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
        // NOTE(review): the trailing -1L is passed through unchanged; its meaning is defined by
        // StreamSourceContexts.getSourceContext — confirm against that API.
        this.readerContext = StreamSourceContexts.getSourceContext(
                timeCharacteristic,
                this.getProcessingTimeService(),
                this.checkpointLock,
                this.getContainingTask().getStreamStatusMaintainer(),
                this.output,
                watermarkInterval,
                -1L);

        this.reader = new SplitReader<>(this.format, this.serializer, this.readerContext, this.checkpointLock, this.restoredReaderState);
        // The reader copied the restored splits into its own queue; drop our reference.
        this.restoredReaderState = null;
        this.reader.start();
    }

    /**
     * Enqueues the received split for the reader thread.
     *
     * @param element record wrapping the split to read
     */
    @Override
    public void processElement(StreamRecord<TimestampedFileInputSplit> element) throws Exception {
        this.reader.addSplit(element.getValue());
    }

    /** Incoming watermarks are intentionally dropped; this method does nothing. */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
    }

    /**
     * Cancels the reader thread, interrupting it repeatedly until it has terminated, and then
     * releases all references held by the operator.
     *
     * <p>Safe to call when {@link #open()} never created the reader (for example because it
     * failed early); in that case only the references are cleared.
     */
    @Override
    public void dispose() throws Exception {
        super.dispose();

        SplitReader<OUT> activeReader = this.reader;
        if (activeReader != null) {
            // First ask the reader to stop cooperatively and give it a short grace period.
            activeReader.cancel();
            try {
                activeReader.join(200L);
            } catch (InterruptedException ignored) {
                // Best-effort shutdown: the loop below keeps waiting for the reader regardless.
            }

            // Still alive: log where it is stuck and interrupt it until it exits.
            while (activeReader.isAlive()) {
                StringBuilder bld = new StringBuilder();
                for (StackTraceElement e : activeReader.getStackTrace()) {
                    bld.append(e).append('\n');
                }
                LOG.warn("The reader is stuck in method:\n {}", bld.toString());

                activeReader.interrupt();
                try {
                    activeReader.join(50L);
                } catch (InterruptedException ignored) {
                    // Best-effort shutdown: retry until the reader thread has terminated.
                }
            }
        }

        this.reader = null;
        this.readerContext = null;
        this.restoredReaderState = null;
        this.format = null;
        this.serializer = null;
    }

    /**
     * Signals the reader that no more splits will arrive, waits until it has drained its queue
     * and terminated, then emits {@link Watermark#MAX_WATERMARK} and closes the output.
     *
     * <p>The wait is performed on the checkpoint lock inside a loop, so a spurious wakeup cannot
     * make this method proceed while the reader is still running. The reader calls
     * {@code notifyAll()} on the same lock when it terminates.
     *
     * <p>NOTE(review): {@code checkpointLock.wait()} requires the calling thread to own the
     * checkpoint lock's monitor; this method does not acquire it itself, so the caller is
     * assumed to hold it — confirm against the task implementation.
     */
    @Override
    public void close() throws Exception {
        super.close();

        while (this.reader != null && this.reader.isAlive() && this.reader.isRunning()) {
            this.reader.close();
            this.checkpointLock.wait();
        }

        if (this.readerContext != null) {
            this.readerContext.emitWatermark(Watermark.MAX_WATERMARK);
            this.readerContext.close();
        }
        this.output.close();
    }

    /**
     * Replaces the content of the operator state with the reader's current list of splits.
     *
     * @param context the snapshot context
     * @throws IllegalStateException if the operator state was never initialized
     * @throws Exception if a split cannot be added to the state; the state is cleared first
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        Preconditions.checkState(this.checkpointedState != null, "The operator state has not been properly initialized.");

        int subtaskIdx = this.getRuntimeContext().getIndexOfThisSubtask();

        this.checkpointedState.clear();
        List<TimestampedFileInputSplit> readerState = this.reader.getReaderState();
        try {
            for (TimestampedFileInputSplit split : readerState) {
                this.checkpointedState.add(split);
            }
        } catch (Exception e) {
            // Do not leave a partially written list behind.
            this.checkpointedState.clear();
            throw new Exception("Could not add timestamped file input splits to the operator state backend of operator " + this.getOperatorName() + '.', e);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.", this.getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
        }
    }

    /**
     * Restores reader state written in the legacy stream layout: one flag byte (must be 0), the
     * current split, an int count followed by that many pending splits, and the format state.
     *
     * <p>The restored splits carry no modification time, so artificial ones are assigned: the
     * counter starts at {@code max(now, noOfSplits + 1)} and is decremented before each use,
     * first for the current split and then for every pending split in read order. Starting at
     * least at {@code noOfSplits + 1} keeps every assigned value non-negative.
     *
     * @param in stream positioned at the legacy operator state; closed by this method
     * @throws IllegalArgumentException if the leading flag byte is not 0
     * @throws Exception if the stream cannot be read or deserialized
     */
    @Override
    public void restoreState(FSDataInputStream in) throws Exception {
        LOG.info("{} (taskIdx={}) restoring state from an older Flink version.", this.getClass().getSimpleName(), this.getRuntimeContext().getIndexOfThisSubtask());

        // Leading byte of the legacy layout; only the value 0 is accepted.
        int hasUdfState = in.read();
        Preconditions.checkArgument(hasUdfState == 0);

        // Both wrappers read from the same underlying stream; the read order below is the
        // on-disk layout and must not be changed.
        ObjectInputStream ois = new ObjectInputStream(in);
        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);

        FileInputSplit currSplit = (FileInputSplit) ois.readObject();

        List<FileInputSplit> pendingSplits = new ArrayList<>();
        int noOfSplits = div.readInt();
        for (int i = 0; i < noOfSplits; ++i) {
            pendingSplits.add((FileInputSplit) ois.readObject());
        }

        Serializable formatState = (Serializable) ois.readObject();
        div.close();

        if (this.restoredReaderState == null) {
            this.restoredReaderState = new ArrayList<>();
        }

        long now = this.getProcessingTimeService().getCurrentProcessingTime();
        long runningModTime = Math.max(now, (long) (noOfSplits + 1));

        // Only the split that was being read carries the format's offset state.
        this.restoredReaderState.add(this.createTimestampedFileSplit(currSplit, --runningModTime, formatState));
        for (FileInputSplit split : pendingSplits) {
            this.restoredReaderState.add(this.createTimestampedFileSplit(split, --runningModTime));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} (taskIdx={}) restored {} splits from legacy: {}.", this.getClass().getSimpleName(), this.getRuntimeContext().getIndexOfThisSubtask(), this.restoredReaderState.size(), this.restoredReaderState);
        }
    }

    /** Wraps {@code split} with the given modification time and no split state. */
    private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime) {
        return this.createTimestampedFileSplit(split, modificationTime, null);
    }

    /**
     * Copies {@code split} into a {@link TimestampedFileInputSplit} with the given modification
     * time, attaching {@code state} as split state when it is not {@code null}.
     */
    private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime, Serializable state) {
        TimestampedFileInputSplit timestampedSplit = new TimestampedFileInputSplit(
                modificationTime,
                split.getSplitNumber(),
                split.getPath(),
                split.getStart(),
                split.getLength(),
                split.getHostnames());
        if (state != null) {
            timestampedSplit.setSplitState(state);
        }
        return timestampedSplit;
    }

    /**
     * Thread that takes splits from a priority queue and reads them one after the other,
     * emitting every record through the source context while holding the checkpoint lock.
     *
     * @param <OT> type of the records produced by the input format
     */
    private class SplitReader<OT>
    extends Thread {
        /** Set by {@link #close()}: terminate once the queue is empty. */
        private volatile boolean shouldClose;

        /** Cleared by {@link #cancel()} or on termination: leave the main loop. */
        private volatile boolean isRunning;

        private final FileInputFormat<OT> format;
        private final TypeSerializer<OT> serializer;
        private final Object checkpointLock;
        private final SourceFunction.SourceContext<OT> readerContext;

        /** Splits waiting to be read; accessed by the reader under the checkpoint lock. */
        private final Queue<TimestampedFileInputSplit> pendingSplits;

        /** Split currently being read, or {@code null} between splits. */
        private TimestampedFileInputSplit currentSplit;

        /** Whether the format currently has {@link #currentSplit} open. */
        private volatile boolean isSplitOpen;

        private SplitReader(
                FileInputFormat<OT> format,
                TypeSerializer<OT> serializer,
                SourceFunction.SourceContext<OT> readerContext,
                Object checkpointLock,
                List<TimestampedFileInputSplit> restoredState) {
            this.format = Preconditions.checkNotNull(format, "Unspecified FileInputFormat.");
            this.serializer = Preconditions.checkNotNull(serializer, "Unspecified Serializer.");
            this.readerContext = Preconditions.checkNotNull(readerContext, "Unspecified Reader Context.");
            this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "Unspecified checkpoint lock.");

            this.shouldClose = false;
            this.isRunning = true;

            this.pendingSplits = new PriorityQueue<>();
            // restoredState is null when there was nothing to restore.
            if (restoredState != null) {
                this.pendingSplits.addAll(restoredState);
            }
        }

        /** Adds a split to the pending queue while holding the checkpoint lock. */
        private void addSplit(TimestampedFileInputSplit split) {
            Preconditions.checkNotNull(split, "Cannot insert a null value in the pending splits queue.");
            synchronized (this.checkpointLock) {
                this.pendingSplits.add(split);
            }
        }

        /** Returns whether the reader has neither been cancelled nor terminated. */
        public boolean isRunning() {
            return this.isRunning;
        }

        /**
         * Main loop: takes the next split, opens (or re-opens from its stored state) the format
         * on it, emits all its records and closes it again, until cancelled or until
         * {@link #close()} was requested and the queue is empty. Any failure is reported to the
         * containing task. On exit the format is closed and all threads waiting on the
         * checkpoint lock are notified.
         */
        @Override
        public void run() {
            try {
                Counter completedSplitsCounter = ContinuousFileReaderOperator.this.getMetricGroup().counter("numSplitsProcessed");
                this.format.openInputFormat();

                while (this.isRunning) {
                    synchronized (this.checkpointLock) {
                        if (this.currentSplit == null) {
                            this.currentSplit = this.pendingSplits.poll();

                            // Nothing queued: terminate if close() was requested, otherwise
                            // wait up to 50 ms (releasing the lock) and look again.
                            if (this.currentSplit == null) {
                                if (this.shouldClose) {
                                    this.isRunning = false;
                                } else {
                                    this.checkpointLock.wait(50L);
                                }
                                continue;
                            }
                        }

                        if (this.format instanceof CheckpointableInputFormat && this.currentSplit.getSplitState() != null) {
                            // The split carries stored format state: resume from it.
                            @SuppressWarnings("unchecked")
                            CheckpointableInputFormat<TimestampedFileInputSplit, Serializable> checkpointableFormat =
                                    (CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format;
                            checkpointableFormat.reopen(this.currentSplit, this.currentSplit.getSplitState());
                        } else {
                            // No stored state, or the format cannot resume: read from the start.
                            this.format.open(this.currentSplit);
                        }

                        // The stored state has been consumed; clear it on the split.
                        this.currentSplit.resetSplitState();
                        this.isSplitOpen = true;
                    }

                    LOG.debug("Reading split: {}", this.currentSplit);

                    try {
                        OT nextElement = this.serializer.createInstance();
                        while (!this.format.reachedEnd()) {
                            // Reading and emitting one record happens under the checkpoint lock.
                            synchronized (this.checkpointLock) {
                                nextElement = this.format.nextRecord(nextElement);
                                if (nextElement == null) {
                                    break;
                                }
                                this.readerContext.collect(nextElement);
                            }
                        }
                        completedSplitsCounter.inc();
                    } finally {
                        // Close the split and reset for the next iteration.
                        synchronized (this.checkpointLock) {
                            this.format.close();
                            this.isSplitOpen = false;
                            this.currentSplit = null;
                        }
                    }
                }
            } catch (Throwable e) {
                ContinuousFileReaderOperator.this.getContainingTask().handleAsyncException("Caught exception when processing split: " + this.currentSplit, e);
            } finally {
                synchronized (this.checkpointLock) {
                    LOG.debug("Reader terminated, and exiting...");

                    try {
                        this.format.closeInputFormat();
                    } catch (IOException e) {
                        ContinuousFileReaderOperator.this.getContainingTask().handleAsyncException("Caught exception from " + this.format.getClass().getName() + ".closeInputFormat() : " + e.getMessage(), e);
                    }
                    this.isSplitOpen = false;
                    this.currentSplit = null;
                    this.isRunning = false;

                    // Wake up anyone waiting on the checkpoint lock for the reader to finish.
                    this.checkpointLock.notifyAll();
                }
            }
        }

        /**
         * Returns the splits that still have to be processed: the current split first (with the
         * format's current state attached when the format is checkpointable and the split is
         * open), followed by all pending splits.
         *
         * <p>NOTE(review): this method does not synchronize by itself; presumably the caller
         * holds the checkpoint lock — confirm.
         *
         * @throws IOException if the format fails to report its current state
         */
        private List<TimestampedFileInputSplit> getReaderState() throws IOException {
            List<TimestampedFileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
            if (this.currentSplit != null) {
                if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
                    @SuppressWarnings("unchecked")
                    CheckpointableInputFormat<TimestampedFileInputSplit, Serializable> checkpointableFormat =
                            (CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format;
                    Serializable formatState = checkpointableFormat.getCurrentState();
                    this.currentSplit.setSplitState(formatState);
                }
                snapshot.add(this.currentSplit);
            }
            snapshot.addAll(this.pendingSplits);
            return snapshot;
        }

        /** Requests the reader to stop after the split it is currently reading. */
        public void cancel() {
            this.isRunning = false;
        }

        /** Requests the reader to terminate once no pending splits are left. */
        public void close() {
            this.shouldClose = true;
        }
    }
}

