/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

@PublicEvolving
public class StatefulSequenceSource
extends RichParallelSourceFunction<Long>
implements CheckpointedFunction {
    private static final long serialVersionUID = 1L;
    private final long start;
    private final long end;
    private volatile boolean isRunning = true;
    private transient Deque<Long> valuesToEmit;
    private transient ListState<Long> checkpointedState;

    public StatefulSequenceSource(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState((this.checkpointedState == null ? 1 : 0) != 0, (Object)("The " + this.getClass().getSimpleName() + " has already been initialized."));
        this.checkpointedState = context.getOperatorStateStore().getOperatorState(new ListStateDescriptor("stateful-sequence-source-state", (TypeSerializer)LongSerializer.INSTANCE));
        this.valuesToEmit = new ArrayDeque<Long>();
        if (context.isRestored()) {
            for (Long v : (Iterable)this.checkpointedState.get()) {
                this.valuesToEmit.add(v);
            }
        } else {
            int stepSize = this.getRuntimeContext().getNumberOfParallelSubtasks();
            int taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();
            long congruence = this.start + (long)taskIdx;
            long totalNoOfElements = Math.abs(this.end - this.start + 1L);
            int baseSize = StatefulSequenceSource.safeDivide(totalNoOfElements, stepSize);
            int toCollect = totalNoOfElements % (long)stepSize > (long)taskIdx ? baseSize + 1 : baseSize;
            for (long collected = 0L; collected < (long)toCollect; ++collected) {
                this.valuesToEmit.add(collected * (long)stepSize + congruence);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
        while (this.isRunning && !this.valuesToEmit.isEmpty()) {
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                ctx.collect(this.valuesToEmit.poll());
            }
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState((this.checkpointedState != null ? 1 : 0) != 0, (Object)("The " + this.getClass().getSimpleName() + " state has not been properly initialized."));
        this.checkpointedState.clear();
        for (Long v : this.valuesToEmit) {
            this.checkpointedState.add((Object)v);
        }
    }

    private static int safeDivide(long left, long right) {
        Preconditions.checkArgument((right > 0L ? 1 : 0) != 0);
        Preconditions.checkArgument((left >= 0L ? 1 : 0) != 0);
        Preconditions.checkArgument((left <= Integer.MAX_VALUE * right ? 1 : 0) != 0);
        return (int)(left / right);
    }
}

