/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.frame.processor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
import it.unimi.dsi.fastutil.longs.LongRBTreeSet;
import it.unimi.dsi.fastutil.longs.LongSortedSet;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFileFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameFileChannel;
import org.apache.druid.frame.file.FrameFile;
import org.apache.druid.frame.file.FrameFileWriter;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.frame.processor.FrameChannelBatcher;
import org.apache.druid.frame.processor.FrameChannelMerger;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.frame.processor.OutputChannels;
import org.apache.druid.frame.processor.SuperSorterProgressTracker;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.CloseableUtils;

public class SuperSorter {
    private static final Logger log = new Logger(SuperSorter.class);
    public static final int UNKNOWN_LEVEL = -1;
    public static final long UNKNOWN_TOTAL = -1L;
    private final List<ReadableFrameChannel> inputChannels;
    private final FrameReader frameReader;
    private final ClusterBy clusterBy;
    private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture;
    private final FrameProcessorExecutor exec;
    private final File directory;
    private final OutputChannelFactory outputChannelFactory;
    private final Supplier<MemoryAllocator> innerFrameAllocatorMaker;
    private final int maxChannelsPerProcessor;
    private final int maxActiveProcessors;
    private final long rowLimit;
    private final String cancellationId;
    private final Object runWorkersLock = new Object();
    @GuardedBy(value="runWorkersLock")
    private boolean batcherIsRunning = false;
    @GuardedBy(value="runWorkersLock")
    private IntSet inputChannelsToRead = new IntOpenHashSet();
    @GuardedBy(value="runWorkersLock")
    private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new Int2ObjectArrayMap();
    @GuardedBy(value="runWorkersLock")
    private List<OutputChannel> outputChannels = null;
    @GuardedBy(value="runWorkersLock")
    private int activeProcessors = 0;
    @GuardedBy(value="runWorkersLock")
    private long totalInputFrames = -1L;
    @GuardedBy(value="runWorkersLock")
    private int totalMergingLevels = -1;
    @GuardedBy(value="runWorkersLock")
    private final Queue<Frame> inputBuffer = new ArrayDeque<Frame>();
    @GuardedBy(value="runWorkersLock")
    private long inputFramesReadSoFar = 0L;
    @GuardedBy(value="runWorkersLock")
    private long levelZeroMergersRunSoFar = 0L;
    @GuardedBy(value="runWorkersLock")
    private int ultimateMergersRunSoFar = 0;
    @GuardedBy(value="runWorkersLock")
    private final Map<File, FrameFile> penultimateFrameFileCache = new HashMap<File, FrameFile>();
    @GuardedBy(value="runWorkersLock")
    private SettableFuture<OutputChannels> allDone = null;
    @GuardedBy(value="runWorkersLock")
    SuperSorterProgressTracker superSorterProgressTracker;
    @GuardedBy(value="runWorkersLock")
    private Runnable noWorkRunnable = null;

    public SuperSorter(List<ReadableFrameChannel> inputChannels, FrameReader frameReader, ClusterBy clusterBy, ListenableFuture<ClusterByPartitions> outputPartitionsFuture, FrameProcessorExecutor exec, File temporaryDirectory, OutputChannelFactory outputChannelFactory, Supplier<MemoryAllocator> innerFrameAllocatorMaker, int maxActiveProcessors, int maxChannelsPerProcessor, long rowLimit, @Nullable String cancellationId, SuperSorterProgressTracker superSorterProgressTracker) {
        this.inputChannels = inputChannels;
        this.frameReader = frameReader;
        this.clusterBy = clusterBy;
        this.outputPartitionsFuture = outputPartitionsFuture;
        this.exec = exec;
        this.directory = temporaryDirectory;
        this.outputChannelFactory = outputChannelFactory;
        this.innerFrameAllocatorMaker = innerFrameAllocatorMaker;
        this.maxChannelsPerProcessor = maxChannelsPerProcessor;
        this.maxActiveProcessors = maxActiveProcessors;
        this.rowLimit = rowLimit;
        this.cancellationId = cancellationId;
        this.superSorterProgressTracker = superSorterProgressTracker;
        for (int i = 0; i < inputChannels.size(); ++i) {
            this.inputChannelsToRead.add(i);
        }
        if (maxActiveProcessors < 1) {
            throw new IAE("maxActiveProcessors[%d] < 1", new Object[]{maxActiveProcessors});
        }
        if (maxChannelsPerProcessor < 2) {
            throw new IAE("maxChannelsPerProcessor[%d] < 2", new Object[]{maxChannelsPerProcessor});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ListenableFuture<OutputChannels> run() {
        Object object = this.runWorkersLock;
        synchronized (object) {
            if (this.allDone != null) {
                throw new ISE("Cannot run() more than once.", new Object[0]);
            }
            this.allDone = SettableFuture.create();
            this.runWorkersIfPossible();
            this.outputPartitionsFuture.addListener(() -> {
                Object object = this.runWorkersLock;
                synchronized (object) {
                    if (this.outputPartitionsFuture.isDone()) {
                        this.superSorterProgressTracker.setTotalMergersForUltimateLevel(this.getOutputPartitions().size());
                    }
                    this.runWorkersIfPossible();
                    this.setAllDoneIfPossible();
                }
            }, (Executor)this.exec.getExecutorService());
            return FutureUtils.futureWithBaggage(this.allDone, () -> {
                Object object = this.runWorkersLock;
                synchronized (object) {
                    if (this.activeProcessors == 0) {
                        this.cleanUp();
                    }
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void setNoWorkRunnable(Runnable runnable) {
        Object object = this.runWorkersLock;
        synchronized (object) {
            this.noWorkRunnable = runnable;
        }
    }

    @GuardedBy(value="runWorkersLock")
    private void workerFinished() {
        --this.activeProcessors;
        if (log.isDebugEnabled()) {
            log.debug(this.stateString(), new Object[0]);
        }
        this.runWorkersIfPossible();
        this.setAllDoneIfPossible();
        if (this.isAllDone() && this.activeProcessors == 0) {
            this.cleanUp();
        }
    }

    @GuardedBy(value="runWorkersLock")
    private void runWorkersIfPossible() {
        if (this.isAllDone()) {
            return;
        }
        try {
            while (this.activeProcessors < this.maxActiveProcessors && (this.runNextUltimateMerger() || this.runNextMiddleMerger() || this.runNextLevelZeroMerger() || this.runNextBatcher())) {
                ++this.activeProcessors;
                if (!log.isDebugEnabled()) continue;
                log.debug(this.stateString(), new Object[0]);
            }
            if (this.activeProcessors == 0 && this.noWorkRunnable != null) {
                log.debug("No active workers and no work left to start.", new Object[0]);
                this.noWorkRunnable.run();
            }
        }
        catch (Throwable e) {
            this.allDone.setException(e);
        }
    }

    @GuardedBy(value="runWorkersLock")
    private void setAllDoneIfPossible() {
        if (this.totalInputFrames == 0L && this.outputPartitionsFuture.isDone()) {
            ClusterByPartitions partitions = this.getOutputPartitions();
            ArrayList<OutputChannel> channels = new ArrayList<OutputChannel>(partitions.size());
            for (int partitionNum = 0; partitionNum < partitions.size(); ++partitionNum) {
                channels.add(this.outputChannelFactory.openNilChannel(partitionNum));
            }
            this.allDone.set((Object)OutputChannels.wrap(channels));
        } else if (this.totalMergingLevels != -1 && this.outputsReadyByLevel.containsKey(this.totalMergingLevels - 1) && ((LongSortedSet)this.outputsReadyByLevel.get(this.totalMergingLevels - 1)).size() == this.getOutputPartitions().size()) {
            try {
                this.allDone.set((Object)OutputChannels.wrap(this.outputChannels));
            }
            catch (Throwable e) {
                this.allDone.setException(e);
            }
        }
    }

    @GuardedBy(value="runWorkersLock")
    private boolean runNextBatcher() {
        if (this.batcherIsRunning || this.inputChannelsToRead.isEmpty()) {
            return false;
        }
        this.batcherIsRunning = true;
        this.runWorker(new FrameChannelBatcher(this.inputChannels, this.maxChannelsPerProcessor), result -> {
            List batch = (List)result.lhs;
            IntSet keepReading = (IntSet)result.rhs;
            Object object = this.runWorkersLock;
            synchronized (object) {
                this.inputBuffer.addAll(batch);
                this.inputFramesReadSoFar += (long)batch.size();
                this.inputChannelsToRead = keepReading;
                if (this.inputChannelsToRead.isEmpty()) {
                    this.inputChannels.forEach(ReadableFrameChannel::close);
                    this.setTotalInputFrames(this.inputFramesReadSoFar);
                    this.runWorkersIfPossible();
                } else if (this.inputBuffer.size() >= this.maxChannelsPerProcessor) {
                    this.runWorkersIfPossible();
                }
                this.batcherIsRunning = false;
            }
        });
        return true;
    }

    @GuardedBy(value="runWorkersLock")
    private boolean runNextLevelZeroMerger() {
        Frame frame;
        if (this.inputBuffer.isEmpty() || this.inputBuffer.size() < this.maxChannelsPerProcessor && !this.allInputRead()) {
            return false;
        }
        ArrayList<ReadableFrameChannel> in = new ArrayList<ReadableFrameChannel>();
        while (in.size() < this.maxChannelsPerProcessor && (frame = this.inputBuffer.poll()) != null) {
            in.add(SuperSorter.singleReadableFrameChannel(new FrameWithPartition(frame, -1)));
        }
        this.runMerger(0, this.levelZeroMergersRunSoFar++, in, null);
        return true;
    }

    @GuardedBy(value="runWorkersLock")
    private boolean runNextMiddleMerger() {
        for (int inLevel = this.outputsReadyByLevel.size() - 1; inLevel >= 0; --inLevel) {
            ClusterByPartitions outPartitions;
            int outLevel = inLevel + 1;
            long totalInputs = this.getTotalMergersInLevel(inLevel);
            LongSortedSet inputsReady = (LongSortedSet)this.outputsReadyByLevel.get(inLevel);
            if (this.totalMergingLevels != -1 && outLevel >= this.totalMergingLevels - 1 || this.totalMergingLevels == -1 && LongMath.divide((long)inputsReady.size(), (long)this.maxChannelsPerProcessor, (RoundingMode)RoundingMode.CEILING) <= (long)this.maxChannelsPerProcessor) continue;
            if (this.totalMergingLevels != -1 && outLevel == this.totalMergingLevels - 2) {
                if (!this.outputPartitionsFuture.isDone()) continue;
                outPartitions = this.getOutputPartitions();
            } else {
                outPartitions = null;
            }
            LongBidirectionalIterator iter = inputsReady.iterator();
            long currentSetStart = -1L;
            long currentSetIndex = -1L;
            while (iter.hasNext()) {
                long w = iter.nextLong();
                if (w % (long)this.maxChannelsPerProcessor == 0L) {
                    currentSetStart = w;
                    currentSetIndex = -1L;
                }
                if (currentSetStart < 0L) continue;
                long pos = w - currentSetStart;
                if (pos == currentSetIndex + 1L && (pos == (long)(this.maxChannelsPerProcessor - 1) || totalInputs != -1L && w == totalInputs - 1L)) {
                    ArrayList<ReadableFrameChannel> in = new ArrayList<ReadableFrameChannel>();
                    for (long i = currentSetStart; i < currentSetStart + (long)this.maxChannelsPerProcessor; ++i) {
                        if (!inputsReady.remove(i)) continue;
                        try {
                            FrameFile handle = FrameFile.open(this.mergerOutputFile(inLevel, i), FrameFile.Flag.DELETE_ON_CLOSE);
                            in.add(new ReadableFileFrameChannel(handle));
                            continue;
                        }
                        catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    this.runMerger(outLevel, currentSetStart / (long)this.maxChannelsPerProcessor, in, outPartitions);
                    return true;
                }
                if (w == currentSetStart + currentSetIndex + 1L) {
                    ++currentSetIndex;
                    continue;
                }
                currentSetStart = -1L;
                currentSetIndex = -1L;
            }
        }
        return false;
    }

    @GuardedBy(value="runWorkersLock")
    private boolean runNextUltimateMerger() {
        if (this.totalMergingLevels == -1 || !this.outputPartitionsFuture.isDone() || this.ultimateMergersRunSoFar >= this.getOutputPartitions().size()) {
            return false;
        }
        int inLevel = this.totalMergingLevels - 2;
        int outLevel = inLevel + 1;
        LongSortedSet inputsReady = (LongSortedSet)this.outputsReadyByLevel.get(inLevel);
        if (inputsReady == null) {
            return false;
        }
        int numInputs = inputsReady.size();
        if ((long)numInputs != this.getTotalMergersInLevel(inLevel)) {
            return false;
        }
        ArrayList<ReadableFrameChannel> in = new ArrayList<ReadableFrameChannel>(numInputs);
        for (long i = 0L; i < (long)numInputs; ++i) {
            FrameFile fileHandle = this.penultimateFrameFileCache.computeIfAbsent(this.mergerOutputFile(inLevel, i), file -> {
                try {
                    return FrameFile.open(file, FrameFile.Flag.DELETE_ON_CLOSE);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).newReference();
            in.add(new ReadableFileFrameChannel(fileHandle, fileHandle.getPartitionStartFrame(this.ultimateMergersRunSoFar), fileHandle.getPartitionStartFrame(this.ultimateMergersRunSoFar + 1)));
        }
        if (this.outputChannels == null) {
            this.outputChannels = Arrays.asList(new OutputChannel[this.getOutputPartitions().size()]);
        }
        this.runMerger(outLevel, this.ultimateMergersRunSoFar, in, null);
        ++this.ultimateMergersRunSoFar;
        return true;
    }

    @GuardedBy(value="runWorkersLock")
    private void runMerger(int level, long rank, List<ReadableFrameChannel> in, @Nullable ClusterByPartitions partitions) {
        try {
            MemoryAllocator frameAllocator;
            WritableFrameChannel writableChannel;
            if (this.totalMergingLevels != -1 && level == this.totalMergingLevels - 1) {
                int intRank = Ints.checkedCast((long)rank);
                OutputChannel outputChannel = this.outputChannelFactory.openChannel(intRank);
                this.outputChannels.set(intRank, outputChannel.readOnly());
                writableChannel = outputChannel.getWritableChannel();
                frameAllocator = outputChannel.getFrameMemoryAllocator();
            } else {
                frameAllocator = this.innerFrameAllocatorMaker.get();
                writableChannel = new WritableFrameFileChannel(FrameFileWriter.open(Files.newByteChannel(this.mergerOutputFile(level, rank).toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE), ByteBuffer.allocate(Frame.compressionBufferSize(frameAllocator.capacity()))));
            }
            FrameChannelMerger worker = new FrameChannelMerger(in, this.frameReader, writableChannel, FrameWriters.makeFrameWriterFactory(FrameType.ROW_BASED, frameAllocator, this.frameReader.signature(), Collections.emptyList()), this.clusterBy, partitions, this.rowLimit);
            this.runWorker(worker, ignored1 -> {
                Object object = this.runWorkersLock;
                synchronized (object) {
                    ((LongSortedSet)this.outputsReadyByLevel.computeIfAbsent(level, ignored2 -> new LongRBTreeSet())).add(rank);
                    this.superSorterProgressTracker.addMergedBatchesForLevel(level, 1L);
                }
            });
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private <T> void runWorker(FrameProcessor<T> worker, final Consumer<T> outConsumer) {
        Futures.addCallback(this.exec.runFully(worker, this.cancellationId), (FutureCallback)new FutureCallback<T>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onSuccess(T result) {
                try {
                    outConsumer.accept(result);
                    Object object = SuperSorter.this.runWorkersLock;
                    synchronized (object) {
                        SuperSorter.this.workerFinished();
                    }
                }
                catch (Throwable e) {
                    Object object = SuperSorter.this.runWorkersLock;
                    synchronized (object) {
                        SuperSorter.this.allDone.setException(e);
                    }
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onFailure(Throwable t) {
                Object object = SuperSorter.this.runWorkersLock;
                synchronized (object) {
                    SuperSorter.this.allDone.setException(t);
                }
            }
        }, (Executor)this.exec.getExecutorService());
    }

    @GuardedBy(value="runWorkersLock")
    private void setTotalInputFrames(long totalInputFrames) {
        this.totalInputFrames = totalInputFrames;
        if (totalInputFrames == 0L) {
            this.superSorterProgressTracker.markTriviallyComplete();
        }
        long totalMergersInLevel = totalInputFrames;
        int level = 0;
        while (totalMergersInLevel > (long)this.maxChannelsPerProcessor) {
            totalMergersInLevel = LongMath.divide((long)totalMergersInLevel, (long)this.maxChannelsPerProcessor, (RoundingMode)RoundingMode.CEILING);
            this.superSorterProgressTracker.setTotalMergersForLevel(level, totalMergersInLevel);
            ++level;
        }
        this.totalMergingLevels = Math.max(level + 1, 3);
        IntStream.range(level, this.totalMergingLevels).forEach(curLevel -> {
            Object object = this.runWorkersLock;
            synchronized (object) {
                this.superSorterProgressTracker.setTotalMergersForLevel(curLevel, 1L);
            }
        });
        this.superSorterProgressTracker.setTotalMergingLevels(this.totalMergingLevels);
    }

    private ClusterByPartitions getOutputPartitions() {
        if (!this.outputPartitionsFuture.isDone()) {
            throw new ISE("Output partitions are not ready yet", new Object[0]);
        }
        try {
            return (ClusterByPartitions)this.outputPartitionsFuture.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @GuardedBy(value="runWorkersLock")
    private long getTotalMergersInLevel(int level) {
        if (this.totalInputFrames == -1L || this.totalMergingLevels == -1) {
            return -1L;
        }
        if (level >= this.totalMergingLevels) {
            throw new ISE("Invalid level %d", new Object[]{level});
        }
        if (level == this.totalMergingLevels - 1) {
            return this.outputPartitionsFuture.isDone() ? (long)this.getOutputPartitions().size() : -1L;
        }
        long totalMergersInLevel = this.totalInputFrames;
        for (int i = 0; i <= level; ++i) {
            totalMergersInLevel = LongMath.divide((long)totalMergersInLevel, (long)this.maxChannelsPerProcessor, (RoundingMode)RoundingMode.CEILING);
        }
        return totalMergersInLevel;
    }

    @GuardedBy(value="runWorkersLock")
    private boolean allInputRead() {
        return this.totalInputFrames != -1L;
    }

    @GuardedBy(value="runWorkersLock")
    private boolean isAllDone() {
        return this.allDone.isDone() || this.allDone.isCancelled();
    }

    @GuardedBy(value="runWorkersLock")
    private void cleanUp() {
        if (!this.isAllDone() || this.activeProcessors != 0) {
            throw new ISE("Improper cleanup", new Object[0]);
        }
        if (log.isDebugEnabled()) {
            log.debug(this.stateString(), new Object[0]);
        }
        this.outputsReadyByLevel.clear();
        this.inputBuffer.clear();
        for (FrameFile frameFile : this.penultimateFrameFileCache.values()) {
            CloseableUtils.closeAndSuppressExceptions((Closeable)frameFile, e -> log.warn(e, "Could not close intermediate file [%s]", new Object[]{frameFile.file()}));
        }
        this.penultimateFrameFileCache.clear();
        if (!this.inputChannelsToRead.isEmpty()) {
            for (ReadableFrameChannel inputChannel : this.inputChannels) {
                CloseableUtils.closeAndSuppressExceptions(inputChannel::close, e -> log.warn(e, "Could not close input channel", new Object[0]));
            }
            this.inputChannels.forEach(ReadableFrameChannel::close);
        }
        this.inputChannelsToRead.clear();
    }

    private File mergerOutputFile(int level, long rank) {
        return new File(this.directory, StringUtils.format((String)"merged.%d.%d", (Object[])new Object[]{level, rank}));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String stateString() {
        Object object = this.runWorkersLock;
        synchronized (object) {
            return "frames-in=" + this.inputFramesReadSoFar + "/" + this.totalInputFrames + " frames-buffered=" + this.inputBuffer.size() + " lvls=" + this.totalMergingLevels + " parts=" + (this.outputPartitionsFuture.isDone() ? ((ClusterByPartitions)FutureUtils.getUncheckedImmediately(this.outputPartitionsFuture)).size() : -1) + " p=" + this.activeProcessors + "/" + this.maxActiveProcessors + " ch-pending=" + this.inputChannelsToRead + " to-merge=" + this.outputsReadyByLevel + " done=" + (this.isAllDone() ? "y" : "n");
        }
    }

    private static ReadableFrameChannel singleReadableFrameChannel(FrameWithPartition frame) {
        try {
            BlockingQueueFrameChannel channel = BlockingQueueFrameChannel.minimal();
            channel.writable().write(frame);
            channel.writable().close();
            return channel.readable();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

