/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.StateBootstrapTransformation;
import org.apache.flink.state.api.output.FileCopyFunction;
import org.apache.flink.state.api.output.MergeOperatorStates;
import org.apache.flink.state.api.output.SavepointOutputFormat;
import org.apache.flink.state.api.output.StatePathExtractor;
import org.apache.flink.state.api.output.operators.GroupReduceOperator;
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.state.api.runtime.StateBootstrapTransformationWithID;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadataV2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Preconditions;

/**
 * Builder-style entry point for writing a savepoint with the DataStream API.
 *
 * <p>A writer is obtained either from an existing savepoint ({@link
 * #fromExistingSavepoint(String)}) or from scratch ({@link #newSavepoint(int)}). Operators are
 * then added or removed, and {@link #write(String)} finally wires the operators and sinks that
 * produce the savepoint into the DataStream topology.
 */
@PublicEvolving
public class SavepointWriter {
    /** Largest max-parallelism accepted by {@code newSavepoint} (32768). */
    private static final int MAX_PARALLELISM_UPPER_BOUND = 1 << 15;

    /** Metadata tracking the existing, removed and newly added operators of the savepoint. */
    protected final SavepointMetadataV2 metadata;

    /**
     * State backend handed to every bootstrap transformation; may be {@code null}.
     *
     * <p>NOTE(review): presumably a default backend is chosen downstream when this is null —
     * confirm in {@code StateBootstrapTransformation#writeOperatorState}.
     */
    @Nullable
    protected final StateBackend stateBackend;

    /** Extra options collected through {@link #withConfiguration(ConfigOption, Object)}. */
    private final Configuration configuration;

    /**
     * Loads an existing savepoint as the base for a new one, without an explicit state backend.
     *
     * @param path location of the existing savepoint
     * @return a writer seeded with the master and operator states of that savepoint
     * @throws IOException if the savepoint metadata cannot be loaded
     * @throws RuntimeException if the savepoint contains no operator state
     */
    public static SavepointWriter fromExistingSavepoint(String path) throws IOException {
        return new SavepointWriter(readSavepointMetadata(path), null);
    }

    /**
     * Loads an existing savepoint as the base for a new one.
     *
     * @param path location of the existing savepoint
     * @param stateBackend state backend passed to the bootstrap transformations of new operators
     * @return a writer seeded with the master and operator states of that savepoint
     * @throws IOException if the savepoint metadata cannot be loaded
     * @throws RuntimeException if the savepoint contains no operator state
     */
    public static SavepointWriter fromExistingSavepoint(String path, StateBackend stateBackend) throws IOException {
        return new SavepointWriter(readSavepointMetadata(path), stateBackend);
    }

    /**
     * Creates a writer for a brand-new, empty savepoint without an explicit state backend.
     *
     * @param maxParallelism max parallelism of the savepoint; must be in {@code [1, 32768]}
     * @return a writer with no master states and no operator states
     * @throws IllegalArgumentException if {@code maxParallelism} is out of range
     */
    public static SavepointWriter newSavepoint(int maxParallelism) {
        return new SavepointWriter(createSavepointMetadata(maxParallelism), null);
    }

    /**
     * Creates a writer for a brand-new, empty savepoint.
     *
     * @param stateBackend state backend passed to the bootstrap transformations of new operators
     * @param maxParallelism max parallelism of the savepoint; must be in {@code [1, 32768]}
     * @return a writer with no master states and no operator states
     * @throws IllegalArgumentException if {@code maxParallelism} is out of range
     */
    public static SavepointWriter newSavepoint(StateBackend stateBackend, int maxParallelism) {
        return new SavepointWriter(createSavepointMetadata(maxParallelism), stateBackend);
    }

    /** Reads the savepoint at {@code path} and converts it into writer metadata. */
    private static SavepointMetadataV2 readSavepointMetadata(String path) throws IOException {
        CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(path);

        // The new savepoint adopts the largest max parallelism among the existing operators.
        int maxParallelism =
                metadata.getOperatorStates().stream()
                        .map(OperatorState::getMaxParallelism)
                        .max(Comparator.naturalOrder())
                        .orElseThrow(
                                () ->
                                        new RuntimeException(
                                                "Savepoint must contain at least one operator state."));

        return new SavepointMetadataV2(
                maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
    }

    /** Validates {@code maxParallelism} and builds metadata for an empty savepoint. */
    private static SavepointMetadataV2 createSavepointMetadata(int maxParallelism) {
        Preconditions.checkArgument(
                maxParallelism > 0 && maxParallelism <= MAX_PARALLELISM_UPPER_BOUND,
                "Maximum parallelism must be between 1 and "
                        + MAX_PARALLELISM_UPPER_BOUND
                        + ". Found: "
                        + maxParallelism);

        return new SavepointMetadataV2(
                maxParallelism, Collections.emptyList(), Collections.emptyList());
    }

    private SavepointWriter(SavepointMetadataV2 metadata, @Nullable StateBackend stateBackend) {
        Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
        this.metadata = metadata;
        this.stateBackend = stateBackend;
        this.configuration = new Configuration();
    }

    /**
     * Removes the operator registered under {@code uid} from the savepoint metadata.
     *
     * @param uid uid of the operator to remove
     * @return this writer, for chaining
     */
    public SavepointWriter removeOperator(String uid) {
        this.metadata.removeOperator(uid);
        return this;
    }

    /**
     * Registers a new operator, whose state is produced by {@code transformation}.
     *
     * @param uid uid of the new operator
     * @param transformation bootstrap transformation that produces the operator's state
     * @param <T> element type consumed by the transformation
     * @return this writer, for chaining
     */
    public <T> SavepointWriter withOperator(String uid, StateBootstrapTransformation<T> transformation) {
        this.metadata.addOperator(uid, transformation);
        return this;
    }

    /**
     * Sets a configuration option that is forwarded to every bootstrap transformation on write.
     *
     * @param option option to set
     * @param value value for the option
     * @param <T> value type of the option
     * @return this writer, for chaining
     */
    public <T> SavepointWriter withConfiguration(ConfigOption<T> option, T value) {
        this.configuration.set(option, value);
        return this;
    }

    /**
     * Adds the operators and sinks that write the savepoint to the DataStream topology.
     *
     * <p>This method only builds the topology; it does not call {@code execute} on the
     * environment itself. The execution environment is taken from the stream of newly
     * bootstrapped operators, so at least one operator must have been added through {@link
     * #withOperator(String, StateBootstrapTransformation)} even when existing operators are
     * present.
     *
     * @param path target location of the savepoint; also used as the name of the final sink
     * @throws IllegalStateException if no new operator has been added to this writer
     */
    public final void write(String path) {
        final Path savepointPath = new Path(path);

        List<StateBootstrapTransformationWithID<?>> newOperatorTransformations =
                this.metadata.getNewOperators();
        DataStream<OperatorState> newOperatorStates =
                writeOperatorStates(newOperatorTransformations, this.configuration, savepointPath);

        List<OperatorState> existingOperators = this.metadata.getExistingOperators();

        final DataStream<OperatorState> finalOperatorStates;
        if (existingOperators.isEmpty()) {
            finalOperatorStates = newOperatorStates;
        } else {
            DataStream<OperatorState> existingOperatorStates =
                    newOperatorStates
                            .getExecutionEnvironment()
                            .fromCollection(existingOperators)
                            .name("existingOperatorStates");

            // Side branch: extract the state paths of the retained operators and hand them to
            // FileCopyFunction, which is constructed with the target savepoint path.
            existingOperatorStates
                    .flatMap(new StatePathExtractor())
                    .setParallelism(1)
                    .addSink(new OutputFormatSinkFunction<>(new FileCopyFunction(path)));

            finalOperatorStates = newOperatorStates.union(existingOperatorStates);
        }

        // Merge all operator states with the master states into a single CheckpointMetadata
        // record in a non-parallel operator, then write it out with parallelism 1.
        finalOperatorStates
                .transform(
                        "reduce(OperatorState)",
                        TypeInformation.of(CheckpointMetadata.class),
                        new GroupReduceOperator<OperatorState, CheckpointMetadata>(
                                new MergeOperatorStates(this.metadata.getMasterStates())))
                .forceNonParallel()
                .addSink(new OutputFormatSinkFunction<>(new SavepointOutputFormat(savepointPath)))
                .setParallelism(1)
                .name(path);
    }

    /**
     * Builds one operator-state stream per new operator and unions them into a single stream.
     *
     * @param newOperatorStates bootstrap transformations of the newly added operators
     * @param config configuration forwarded to every transformation
     * @param savepointWritePath target location of the savepoint
     * @return the union of all per-operator state streams
     * @throws IllegalStateException if {@code newOperatorStates} is empty
     */
    private DataStream<OperatorState> writeOperatorStates(
            List<StateBootstrapTransformationWithID<?>> newOperatorStates,
            Configuration config,
            Path savepointWritePath) {
        return newOperatorStates.stream()
                .map(
                        newOperatorState ->
                                newOperatorState
                                        .getBootstrapTransformation()
                                        .writeOperatorState(
                                                newOperatorState.getOperatorID(),
                                                this.stateBackend,
                                                config,
                                                this.metadata.getMaxParallelism(),
                                                savepointWritePath))
                .reduce(DataStream::union)
                .orElseThrow(
                        () ->
                                new IllegalStateException(
                                        "Savepoint must contain at least one operator"));
    }
}

