package org.neo4j.gds.beta.pregel;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import org.immutables.value.Value;
import org.neo4j.gds.api.Graph;
import org.neo4j.gds.beta.pregel.PregelConfig;
import org.neo4j.gds.beta.pregel.context.MasterComputeContext;
import org.neo4j.gds.core.concurrency.Pools;
import org.neo4j.gds.core.utils.mem.AllocationTracker;
import org.neo4j.gds.core.utils.mem.MemoryEstimation;
import org.neo4j.gds.core.utils.mem.MemoryEstimations;
import org.neo4j.gds.core.utils.paged.HugeAtomicBitSet;
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
import org.neo4j.gds.core.utils.progress.tasks.Task;
import org.neo4j.gds.core.utils.progress.tasks.Tasks;

/**
 * Drives a Pregel computation over a {@link Graph}.
 *
 * <p>An instance wires together the user-supplied {@link PregelComputation}, the per-node
 * value store, a {@link Messenger} and a {@link PregelComputer}. {@link #run()} then executes
 * compute iterations, each followed by a master-compute step, until either step reports
 * convergence or {@code config.maxIterations()} iterations have been executed.
 *
 * @param <CONFIG> the concrete configuration type handed to the computation
 */
@Value.Style(builderVisibility = Value.Style.BuilderVisibility.PUBLIC, depluralize = true, deepImmutablesDetection = true)
public final class Pregel<CONFIG extends PregelConfig> {

    /**
     * Matches the {@code Config} suffix of a configuration class name, together with any
     * directly preceding run of procedure-mode names. Compiled once instead of on every
     * {@link #progressTask(Graph, PregelConfig)} call.
     */
    private static final Pattern CONFIG_NAME_SUFFIX = Pattern.compile("(Mutate|Stream|Write|Stats)*Config");

    private final CONFIG config;
    private final PregelComputation<CONFIG> computation;
    private final Graph graph;
    private final NodeValue nodeValues;
    private final Messenger<?> messenger;
    private final PregelComputer<CONFIG> computer;
    private final ProgressTracker progressTracker;
    private final ExecutorService executor;

    /**
     * Creates a Pregel instance for the given graph and computation.
     *
     * <p>The node value store is allocated here from the computation's schema, the graph's
     * node count and the configured concurrency.
     *
     * @param graph the graph to run on
     * @param config the configuration passed to the computation
     * @param pregelComputation the user-defined computation
     * @param executorService executor handed to the master-compute context and, unless
     *     {@code config.useForkJoin()} is set, to the computer
     * @param allocationTracker tracker passed to every data structure allocated here
     * @param progressTracker tracker that receives the sub-task begin/end notifications
     * @return a ready-to-run instance
     */
    public static <CONFIG extends PregelConfig> Pregel<CONFIG> create(Graph graph, CONFIG config, PregelComputation<CONFIG> pregelComputation, ExecutorService executorService, AllocationTracker allocationTracker, ProgressTracker progressTracker) {
        // The copy is intentionally discarded. NOTE(review): presumably this call is kept for
        // the validation that building an immutable copy performs - confirm before removing.
        ImmutablePregelConfig.copyOf(config);

        NodeValue nodeValues = NodeValue.of(
            pregelComputation.schema(config),
            graph.nodeCount(),
            config.concurrency(),
            allocationTracker
        );
        return new Pregel<>(graph, config, pregelComputation, nodeValues, executorService, allocationTracker, progressTracker);
    }

    /**
     * Assembles the memory estimation of a Pregel run.
     *
     * <p>The estimation always contains the vote bits (per node), the compute steps (per
     * thread) and the node values for the given schema. The messenger component depends on
     * the two flags: a reducing messenger when {@code isQueueBased} is {@code false},
     * otherwise an asynchronous or synchronous queue messenger depending on {@code isAsync}.
     *
     * @param pregelSchema schema used to estimate the node value store
     * @param isQueueBased {@code true} to estimate queue-based messaging, {@code false} to
     *     estimate the reducing messenger
     * @param isAsync selects the asynchronous queue messenger; only consulted when
     *     {@code isQueueBased} is {@code true}
     * @return the combined estimation
     */
    public static MemoryEstimation memoryEstimation(PregelSchema pregelSchema, boolean isQueueBased, boolean isAsync) {
        MemoryEstimations.Builder estimation = MemoryEstimations.builder(Pregel.class)
            .perNode("vote bits", HugeAtomicBitSet::memoryEstimation)
            .perThread("compute steps", MemoryEstimations.builder(PartitionedComputeStep.class).build())
            .add("node value", NodeValue.memoryEstimation(pregelSchema));

        if (!isQueueBased) {
            estimation.add("message arrays", ReducingMessenger.memoryEstimation());
        } else if (isAsync) {
            estimation.add("message queues", AsyncQueueMessenger.memoryEstimation());
        } else {
            estimation.add("message queues", SyncQueueMessenger.memoryEstimation());
        }
        return estimation.build();
    }

    /**
     * Creates the progress task describing a Pregel run.
     *
     * <p>The task is built with {@code Tasks.iterativeDynamic}; the supplied sub-tasks are a
     * "Compute iteration" leaf and a "Master compute iteration" leaf, both sized to the
     * graph's node count, and {@code config.maxIterations()} is passed as the iteration
     * argument.
     *
     * @param graph graph whose node count sizes the leaf tasks
     * @param config configuration providing {@code maxIterations()}
     * @param taskName name of the resulting task
     * @return the progress task
     */
    public static <CONFIG extends PregelConfig> Task progressTask(Graph graph, CONFIG config, String taskName) {
        return Tasks.iterativeDynamic(
            taskName,
            () -> List.of(
                Tasks.leaf("Compute iteration", graph.nodeCount()),
                Tasks.leaf("Master compute iteration", graph.nodeCount())
            ),
            config.maxIterations()
        );
    }

    /**
     * Creates the progress task with a name derived from the configuration class.
     *
     * <p>The name is the simple class name of {@code config} with every match of
     * {@code (Mutate|Stream|Write|Stats)*Config} removed, e.g. {@code PageRankStreamConfig}
     * becomes {@code PageRank}.
     *
     * @param graph graph whose node count sizes the leaf tasks
     * @param config configuration providing the name and {@code maxIterations()}
     * @return the progress task
     */
    public static <CONFIG extends PregelConfig> Task progressTask(Graph graph, CONFIG config) {
        String simpleName = config.getClass().getSimpleName();
        String taskName = CONFIG_NAME_SUFFIX.matcher(simpleName).replaceAll("");
        return progressTask(graph, config, taskName);
    }

    /**
     * Stores the collaborators and builds the messenger and the computer.
     *
     * <p>When {@code config.useForkJoin()} is set, the computer receives a newly created
     * fork-join pool sized to the configured concurrency instead of {@code executorService};
     * the master-compute step keeps using {@code executorService} in either case.
     */
    private Pregel(Graph graph, CONFIG config, PregelComputation<CONFIG> pregelComputation, NodeValue nodeValue, ExecutorService executorService, AllocationTracker allocationTracker, ProgressTracker progressTracker) {
        this.graph = graph;
        this.config = config;
        this.computation = pregelComputation;
        this.nodeValues = nodeValue;
        this.executor = executorService;
        this.progressTracker = progressTracker;
        this.messenger = createMessenger(graph, config, pregelComputation, allocationTracker);
        this.computer = PregelComputer.builder()
            .graph(graph)
            .computation(pregelComputation)
            .config(config)
            .nodeValues(this.nodeValues)
            .messenger(this.messenger)
            .voteBits(HugeAtomicBitSet.fixed(graph.nodeCount(), allocationTracker))
            .executorService(config.useForkJoin() ? Pools.createForkJoinPool(config.concurrency()) : executorService)
            .progressTracker(progressTracker)
            .build();
    }

    /**
     * Picks the messenger implementation: a reducing messenger when the computation provides
     * a reducer, otherwise an asynchronous or synchronous queue messenger depending on
     * {@code config.isAsynchronous()}.
     */
    private static <CONFIG extends PregelConfig> Messenger<?> createMessenger(Graph graph, CONFIG config, PregelComputation<CONFIG> pregelComputation, AllocationTracker allocationTracker) {
        Optional<Reducer> reducer = pregelComputation.reducer();
        if (reducer.isPresent()) {
            return new ReducingMessenger(graph, config, reducer.get(), allocationTracker);
        }
        if (config.isAsynchronous()) {
            return new AsyncQueueMessenger(graph.nodeCount(), allocationTracker);
        }
        return new SyncQueueMessenger(graph.nodeCount(), allocationTracker);
    }

    /**
     * Runs the computation until it converges or {@code config.maxIterations()} is reached.
     *
     * <p>Each iteration consists of a compute phase (computer and messenger are initialised
     * for the iteration, then the computer runs it) and a master-compute phase. The run has
     * converged as soon as the master-compute step returns {@code true} or the computer
     * reports convergence.
     *
     * <p>{@code ranIterations} in the result is the loop counter at exit: the zero-based
     * index of the converging iteration when the loop stops early, and
     * {@code config.maxIterations()} when it runs to completion.
     *
     * <p>Regardless of outcome, the enclosing progress sub-task is ended and the computer is
     * released exactly once; the computer is released even if ending the sub-task throws.
     *
     * @return node values, convergence flag and iteration counter of this run
     */
    public PregelResult run() {
        boolean didConverge = false;
        this.computer.initComputation();
        try {
            this.progressTracker.beginSubTask();

            int iteration = 0;
            for (; iteration < this.config.maxIterations(); iteration++) {
                // Compute phase.
                this.progressTracker.beginSubTask();
                this.computer.initIteration(iteration);
                this.messenger.initIteration(iteration);
                this.computer.runIteration();
                this.progressTracker.endSubTask();

                // Master-compute phase; hasConverged() is only consulted when the master
                // step did not already signal termination.
                this.progressTracker.beginSubTask();
                didConverge = runMasterComputeStep(iteration) || this.computer.hasConverged();
                this.progressTracker.endSubTask();

                if (didConverge) {
                    // Leave without incrementing so the counter keeps its original meaning.
                    break;
                }
            }

            return ImmutablePregelResult.builder()
                .nodeValues(this.nodeValues)
                .didConverge(didConverge)
                .ranIterations(iteration)
                .build();
        } finally {
            // NOTE(review): when an iteration fails midway, the phase sub-task begun above
            // has not been ended and only this single endSubTask() is issued - confirm that
            // matches the ProgressTracker contract.
            try {
                this.progressTracker.endSubTask();
            } finally {
                this.computer.release();
            }
        }
    }

    /**
     * Releases the progress tracker and the messenger. The computer is not touched here;
     * {@link #run()} releases it at the end of each run.
     */
    public void release() {
        this.progressTracker.release();
        this.messenger.release();
    }

    /**
     * Invokes the computation's master-compute hook for the given iteration.
     *
     * @param iteration zero-based index of the iteration that just finished its compute phase
     * @return the hook's result; {@code true} is treated as convergence by {@link #run()}
     */
    private boolean runMasterComputeStep(int iteration) {
        MasterComputeContext<CONFIG> context =
            new MasterComputeContext<>(this.config, this.graph, iteration, this.nodeValues, this.executor);
        return this.computation.masterCompute(context);
    }
}
