/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.graphalgo.beta.pregel;

import com.carrotsearch.hppc.BitSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.BaseStream;
import java.util.stream.LongStream;
import org.jctools.queues.MpscLinkedQueue;
import org.neo4j.collection.primitive.PrimitiveLongCollections;
import org.neo4j.collection.primitive.PrimitiveLongIterable;
import org.neo4j.graphalgo.api.Degrees;
import org.neo4j.graphalgo.api.Graph;
import org.neo4j.graphalgo.api.NodeProperties;
import org.neo4j.graphalgo.api.RelationshipIterator;
import org.neo4j.graphalgo.beta.pregel.PregelComputation;
import org.neo4j.graphalgo.beta.pregel.PregelConfig;
import org.neo4j.graphalgo.beta.pregel.PregelContext;
import org.neo4j.graphalgo.core.concurrency.ParallelUtil;
import org.neo4j.graphalgo.core.utils.LazyBatchCollection;
import org.neo4j.graphalgo.core.utils.LazyMappingCollection;
import org.neo4j.graphalgo.core.utils.paged.AllocationTracker;
import org.neo4j.graphalgo.core.utils.paged.HugeDoubleArray;
import org.neo4j.graphalgo.core.utils.paged.HugeObjectArray;

public final class Pregel {
    private static final Double TERMINATION_SYMBOL = Double.NaN;
    private final PregelConfig config;
    private final PregelComputation computation;
    private final Graph graph;
    private final HugeDoubleArray nodeValues;
    private final HugeObjectArray<MpscLinkedQueue<Double>> messageQueues;
    private final int batchSize;
    private final int concurrency;
    private final ExecutorService executor;
    private int iterations;

    public static Pregel withDefaultNodeValues(Graph graph, PregelConfig config, PregelComputation computation, int batchSize, int concurrency, ExecutorService executor, AllocationTracker tracker) {
        double defaultNodeValue = config.getInitialNodeValue();
        HugeDoubleArray hugeDoubleArray = HugeDoubleArray.newArray((long)graph.nodeCount(), (AllocationTracker)tracker);
        ParallelUtil.parallelStreamConsume((BaseStream)LongStream.range(0L, graph.nodeCount()), (int)concurrency, nodeIds -> nodeIds.forEach(nodeId -> hugeDoubleArray.set(nodeId, defaultNodeValue)));
        return new Pregel(graph, config, computation, hugeDoubleArray, batchSize, concurrency, executor, tracker);
    }

    public static Pregel withInitialNodeValues(Graph graph, PregelConfig config, PregelComputation computation, NodeProperties initialNodeValues, int batchSize, int concurrency, ExecutorService executor, AllocationTracker tracker) {
        HugeDoubleArray hugeDoubleArray = HugeDoubleArray.newArray((long)graph.nodeCount(), (AllocationTracker)tracker);
        ParallelUtil.parallelStreamConsume((BaseStream)LongStream.range(0L, graph.nodeCount()), (int)concurrency, nodeIds -> nodeIds.forEach(nodeId -> hugeDoubleArray.set(nodeId, initialNodeValues.nodeProperty(nodeId))));
        return new Pregel(graph, config, computation, hugeDoubleArray, batchSize, concurrency, executor, tracker);
    }

    private Pregel(Graph graph, PregelConfig config, PregelComputation computation, HugeDoubleArray initialNodeValues, int batchSize, int concurrency, ExecutorService executor, AllocationTracker tracker) {
        this.graph = graph;
        this.config = config;
        this.computation = computation;
        this.nodeValues = initialNodeValues;
        this.batchSize = batchSize;
        this.concurrency = concurrency;
        this.executor = executor;
        this.messageQueues = this.initLinkedQueues(graph, tracker);
    }

    public HugeDoubleArray run(int maxIterations) {
        this.iterations = 0;
        boolean canHalt = false;
        BitSet receiverBits = new BitSet(this.graph.nodeCount());
        BitSet voteBits = new BitSet(this.graph.nodeCount());
        Collection nodeBatches = LazyBatchCollection.of((long)this.graph.nodeCount(), (long)this.batchSize, (start, length) -> () -> PrimitiveLongCollections.range((long)start, (long)(start + length - 1L)));
        while (this.iterations < maxIterations && !canHalt) {
            int iteration;
            ++this.iterations;
            List<ComputeStep> computeSteps = this.runComputeSteps(nodeBatches, iteration, receiverBits, voteBits);
            receiverBits = this.unionBitSets(computeSteps, ComputeStep::getSenders);
            voteBits = this.unionBitSets(computeSteps, ComputeStep::getVotes);
            if (receiverBits.nextSetBit(0) != -1) continue;
            canHalt = true;
        }
        return this.nodeValues;
    }

    public int getIterations() {
        return this.iterations;
    }

    private BitSet unionBitSets(Collection<ComputeStep> computeSteps, Function<ComputeStep, BitSet> fn) {
        return (BitSet)ParallelUtil.parallelStream(computeSteps.stream(), (int)this.concurrency, stream -> stream.map(fn).reduce((bitSet1, bitSet2) -> {
            bitSet1.union(bitSet2);
            return bitSet1;
        }).orElseGet(BitSet::new));
    }

    private List<ComputeStep> runComputeSteps(Collection<PrimitiveLongIterable> nodeBatches, int iteration, BitSet messageBits, BitSet voteToHaltBits) {
        ArrayList<ComputeStep> tasks = new ArrayList<ComputeStep>(nodeBatches.size());
        if (!this.config.isAsynchronous() && iteration > 0) {
            ParallelUtil.parallelStreamConsume((BaseStream)LongStream.range(0L, this.graph.nodeCount()), (int)this.concurrency, nodeIds -> nodeIds.forEach(nodeId -> {
                if (messageBits.get(nodeId)) {
                    ((MpscLinkedQueue)this.messageQueues.get(nodeId)).add((Object)TERMINATION_SYMBOL);
                }
            }));
        }
        Collection computeSteps = LazyMappingCollection.of(nodeBatches, nodeBatch -> {
            ComputeStep task = new ComputeStep(this.computation, this.config, this.graph.nodeCount(), iteration, (PrimitiveLongIterable)nodeBatch, (Degrees)this.graph, this.nodeValues, messageBits, voteToHaltBits, (HugeObjectArray)this.messageQueues, (RelationshipIterator)this.graph);
            tasks.add(task);
            return task;
        });
        ParallelUtil.runWithConcurrency((int)this.concurrency, (Collection)computeSteps, (ExecutorService)this.executor);
        return tasks;
    }

    private HugeObjectArray<MpscLinkedQueue<Double>> initLinkedQueues(Graph graph, AllocationTracker tracker) {
        Class<?> queueClass = MpscLinkedQueue.newMpscLinkedQueue().getClass();
        HugeObjectArray messageQueues = HugeObjectArray.newArray(queueClass, (long)graph.nodeCount(), (AllocationTracker)tracker);
        ParallelUtil.parallelStreamConsume((BaseStream)LongStream.range(0L, graph.nodeCount()), (int)this.concurrency, nodeIds -> nodeIds.forEach(nodeId -> messageQueues.set(nodeId, (Object)MpscLinkedQueue.newMpscLinkedQueue())));
        return messageQueues;
    }

    public static final class ComputeStep
    implements Runnable {
        private final int iteration;
        private final PregelComputation computation;
        private final PregelContext pregelContext;
        private final BitSet senderBits;
        private final BitSet receiverBits;
        private final BitSet voteBits;
        private final PrimitiveLongIterable nodeBatch;
        private final Degrees degrees;
        private final HugeDoubleArray nodeValues;
        private final HugeObjectArray<? extends Queue<Double>> messageQueues;
        private final RelationshipIterator relationshipIterator;

        private ComputeStep(PregelComputation computation, PregelConfig config, long globalNodeCount, int iteration, PrimitiveLongIterable nodeBatch, Degrees degrees, HugeDoubleArray nodeValues, BitSet receiverBits, BitSet voteBits, HugeObjectArray<? extends Queue<Double>> messageQueues, RelationshipIterator relationshipIterator) {
            this.iteration = iteration;
            this.computation = computation;
            this.senderBits = new BitSet(globalNodeCount);
            this.receiverBits = receiverBits;
            this.voteBits = voteBits;
            this.nodeBatch = nodeBatch;
            this.degrees = degrees;
            this.nodeValues = nodeValues;
            this.messageQueues = messageQueues;
            this.relationshipIterator = relationshipIterator.concurrentCopy();
            this.pregelContext = new PregelContext(this, config);
        }

        @Override
        public void run() {
            for (long nodeId : this.nodeBatch) {
                if (!this.receiverBits.get(nodeId) && this.voteBits.get(nodeId)) continue;
                this.voteBits.clear(nodeId);
                this.computation.compute(this.pregelContext, nodeId, this.receiveMessages(nodeId));
            }
        }

        BitSet getSenders() {
            return this.senderBits;
        }

        BitSet getVotes() {
            return this.voteBits;
        }

        public int getIteration() {
            return this.iteration;
        }

        int getDegree(long nodeId) {
            return this.degrees.degree(nodeId);
        }

        double getNodeValue(long nodeId) {
            return this.nodeValues.get(nodeId);
        }

        void setNodeValue(long nodeId, double value) {
            this.nodeValues.set(nodeId, value);
        }

        void voteToHalt(long nodeId) {
            this.voteBits.set(nodeId);
        }

        void sendMessages(long nodeId, double message) {
            this.relationshipIterator.forEachRelationship(nodeId, (sourceNodeId, targetNodeId) -> {
                ((Queue)this.messageQueues.get(targetNodeId)).add(message);
                this.senderBits.set(targetNodeId);
                return true;
            });
        }

        private Queue<Double> receiveMessages(long nodeId) {
            return this.receiverBits.get(nodeId) ? (Queue)this.messageQueues.get(nodeId) : null;
        }
    }
}

