/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.FileChunk;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ExceptionReporter;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetchedInputAllocatorOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeThread;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MergeManager
implements FetchedInputAllocatorOrderedGrouped {
    private static final Logger LOG = LoggerFactory.getLogger(MergeManager.class);
    // Configuration handed to the constructor; also passed to disk map outputs and the final merge.
    private final Configuration conf;
    // Local file system supplied by the caller, and the raw file system obtained from it
    // via LocalFileSystem#getRaw() in the constructor.
    private final FileSystem localFS;
    private final FileSystem rfs;
    private final LocalDirAllocator localDirAllocator;
    private final TezTaskOutputFiles mapOutputFile;
    // Progress callback passed to TezMerger; it forwards to InputContext#notifyProgress().
    private final Progressable progressable = new Progressable(){

        public void progress() {
            MergeManager.this.inputContext.notifyProgress();
        }
    };
    private final Combiner combiner;
    // Outputs registered through closeInMemoryMergedFile(); they are moved into
    // inMemoryMapOutputs when a memory-to-disk merge is started.
    @VisibleForTesting
    final Set<MapOutput> inMemoryMergedMapOutputs = new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
    // Null unless "tez.runtime.shuffle.memory-to-memory.enable" is true (see constructor).
    private final IntermediateMemoryToMemoryMerger memToMemMerger;
    // Outputs registered through closeInMemoryFile().
    @VisibleForTesting
    final Set<MapOutput> inMemoryMapOutputs = new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
    private final InMemoryMerger inMemoryMerger;
    // File chunks registered through closeOnDiskFile().
    @VisibleForTesting
    final Set<FileChunk> onDiskMapOutputs = new TreeSet<FileChunk>();
    @VisibleForTesting
    final OnDiskMerger onDiskMerger;
    // Smaller of initialMemoryAvailable and the requested shuffle memory.
    private final long memoryLimit;
    // Smaller of initialMemoryAvailable and the requested post-merge buffer.
    @VisibleForTesting
    final long postMergeMemLimit;
    // Bytes currently reserved; increased in unconditionalReserve(), decreased in unreserve().
    private long usedMemory;
    // Bytes of closed in-memory outputs; decreased in releaseCommittedMemory().
    private long commitMemory;
    private final int ioSortFactor;
    // Requests of at least this size are shuffled to disk (see canShuffleToMemory()).
    private final long maxSingleShuffleLimit;
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    // Number of in-memory outputs at which a memory-to-memory merge is started.
    private final int memToMemMergeOutputsThreshold;
    // commitMemory level at which a memory-to-disk merge is started.
    private final long mergeThreshold;
    private final long initialMemoryAvailable;
    private final ExceptionReporter exceptionReporter;
    private final InputContext inputContext;
    private final TezCounter spilledRecordsCounter;
    private final TezCounter reduceCombineInputCounter;
    private final TezCounter mergedMapOutputsCounter;
    private final TezCounter numMemToDiskMerges;
    private final TezCounter numDiskToDiskMerges;
    private final TezCounter additionalBytesWritten;
    private final TezCounter additionalBytesRead;
    private final CompressionCodec codec;
    // Set to true once close(true) has obtained the final merge iterator.
    private volatile boolean finalMergeComplete = false;
    private final boolean ifileReadAhead;
    // Forced to 0 by the constructor when read-ahead is disabled.
    private final int ifileReadAheadLength;
    private final int ifileBufferSize;
    // Time.monotonicNow() value of the last periodic in-memory info log; -1 before the first one.
    private long lastInMemSegmentLogTime = -1L;
    // Size statistics of in-memory outputs: overall, and since the last periodic info log.
    private final SegmentStatsTracker statsInMemTotal = new SegmentStatsTracker();
    private final SegmentStatsTracker statsInMemLastLog = new SegmentStatsTracker();
    // NOTE(review): not referenced in the visible part of this file - presumably used by the merger classes.
    private AtomicInteger mergeFileSequenceId = new AtomicInteger(0);
    // Value of "tez.runtime.cleanup.files.on.interrupt"; gates file deletion when close() is interrupted.
    private final boolean cleanup;
    // Shared instance returned by reserve() when the fetcher has to wait for memory.
    private final MapOutput stallShuffle = MapOutput.createWaitMapOutput(null);
    // Time.monotonicNow() value of the last periodic on-disk info log; -1 before the first one.
    private long lastOnDiskSegmentLogTime = -1L;

    /**
     * Builds a merge manager and derives its memory limits and merge thresholds from {@code conf}.
     * The merger threads are created here but only started by {@link #configureAndStart()}.
     *
     * @param conf configuration the shuffle/merge settings are read from
     * @param localFS local file system; must be a {@link LocalFileSystem} because its raw file system is used
     * @param localDirAllocator allocator kept for on-disk merge output
     * @param inputContext context used for counters, progress notification and identifiers
     * @param combiner combiner invoked by {@link #runCombineProcessor}
     * @param spilledRecordsCounter counter handed to the merges
     * @param reduceCombineInputCounter counter kept for the combiner
     * @param mergedMapOutputsCounter counter attached to segments of primary map outputs
     * @param exceptionReporter reporter kept for the merger threads
     * @param initialMemoryAvailable upper bound applied to both the shuffle and the post-merge memory
     * @param codec compression codec for intermediate files
     * @param ifileReadAheadEnabled whether IFile read-ahead is enabled
     * @param ifileReadAheadLength read-ahead length; ignored (treated as 0) when read-ahead is disabled
     * @throws IllegalArgumentException if the fetch-buffer or single-shuffle-limit percentage is out of range
     * @throws TezUncheckedException if the post-merge buffer percentage is outside [0, 1]
     * @throws RuntimeException if the single shuffle limit is not below the merge threshold
     */
    public MergeManager(Configuration conf, FileSystem localFS, LocalDirAllocator localDirAllocator, InputContext inputContext, Combiner combiner, TezCounter spilledRecordsCounter, TezCounter reduceCombineInputCounter, TezCounter mergedMapOutputsCounter, ExceptionReporter exceptionReporter, long initialMemoryAvailable, CompressionCodec codec, boolean ifileReadAheadEnabled, int ifileReadAheadLength) {
        this.inputContext = inputContext;
        this.conf = conf;
        this.localDirAllocator = localDirAllocator;
        this.exceptionReporter = exceptionReporter;
        this.initialMemoryAvailable = initialMemoryAvailable;
        this.combiner = combiner;
        this.reduceCombineInputCounter = reduceCombineInputCounter;
        this.spilledRecordsCounter = spilledRecordsCounter;
        this.mergedMapOutputsCounter = mergedMapOutputsCounter;
        this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier(), inputContext.getDagIdentifier());
        this.localFS = localFS;
        this.rfs = ((LocalFileSystem)localFS).getRaw();
        this.numDiskToDiskMerges = inputContext.getCounters().findCounter((Enum)TaskCounter.NUM_DISK_TO_DISK_MERGES);
        this.numMemToDiskMerges = inputContext.getCounters().findCounter((Enum)TaskCounter.NUM_MEM_TO_DISK_MERGES);
        this.additionalBytesWritten = inputContext.getCounters().findCounter((Enum)TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
        this.additionalBytesRead = inputContext.getCounters().findCounter((Enum)TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
        this.cleanup = conf.getBoolean("tez.runtime.cleanup.files.on.interrupt", false);
        this.codec = codec;
        this.ifileReadAhead = ifileReadAheadEnabled;
        this.ifileReadAheadLength = this.ifileReadAhead ? ifileReadAheadLength : 0;
        this.ifileBufferSize = conf.getInt("io.file.buffer.size", -1);

        // Fraction of the task memory that may be used for fetched data.
        float maxInMemCopyUse = conf.getFloat("tez.runtime.shuffle.fetch.buffer.percent", 0.9f);
        if ((double)maxInMemCopyUse > 1.0 || (double)maxInMemCopyUse < 0.0) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.fetch.buffer.percent: " + maxInMemCopyUse);
        }
        long memLimit = conf.getLong("tez.runtime.task.memory", (long)((float)inputContext.getTotalMemoryAvailableToTask() * maxInMemCopyUse));

        // Fraction of the task memory that may stay occupied after the final merge.
        float maxRedPer = conf.getFloat("tez.runtime.task.input.post-merge.buffer.percent", 0.0f);
        if ((double)maxRedPer > 1.0 || (double)maxRedPer < 0.0) {
            throw new TezUncheckedException("Invalid value for tez.runtime.task.input.post-merge.buffer.percent: " + maxRedPer);
        }
        long maxRedBuffer = (long)((float)inputContext.getTotalMemoryAvailableToTask() * maxRedPer);

        // Neither limit may exceed what was actually granted to this input.
        this.memoryLimit = Math.min(this.initialMemoryAvailable, memLimit);
        this.postMergeMemLimit = Math.min(this.initialMemoryAvailable, maxRedBuffer);
        if (LOG.isDebugEnabled()) {
            LOG.debug(inputContext.getSourceVertexName() + ": " + "InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + ". Updated to: ShuffleMem=" + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
        }

        this.ioSortFactor = conf.getInt("tez.runtime.io.sort.factor", 100);
        float singleShuffleMemoryLimitPercent = conf.getFloat("tez.runtime.shuffle.memory.limit.percent", 0.25f);
        if (singleShuffleMemoryLimitPercent <= 0.0f || singleShuffleMemoryLimitPercent > 1.0f) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.memory.limit.percent: " + singleShuffleMemoryLimitPercent);
        }
        // Capped so that an in-memory reservation always fits an int-sized buffer
        // (canShuffleToMemory() uses a strict comparison against this limit).
        this.maxSingleShuffleLimit = (long)Math.min((float)this.memoryLimit * singleShuffleMemoryLimitPercent, (float)Integer.MAX_VALUE);
        this.memToMemMergeOutputsThreshold = conf.getInt("tez.runtime.shuffle.memory-to-memory.segments", this.ioSortFactor);
        this.mergeThreshold = (long)((float)this.memoryLimit * conf.getFloat("tez.runtime.shuffle.merge.percent", 0.9f));
        LOG.info(inputContext.getSourceVertexName() + ": MergerManager: memoryLimit=" + this.memoryLimit + ", " + "maxSingleShuffleLimit=" + this.maxSingleShuffleLimit + ", " + "mergeThreshold=" + this.mergeThreshold + ", " + "ioSortFactor=" + this.ioSortFactor + ", " + "postMergeMem=" + this.postMergeMemLimit + ", " + "memToMemMergeOutputsThreshold=" + this.memToMemMergeOutputsThreshold);
        if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
            throw new RuntimeException("Invalid configuration: maxSingleShuffleLimit should be less than mergeThreshold. maxSingleShuffleLimit: " + this.maxSingleShuffleLimit + ", mergeThreshold: " + this.mergeThreshold);
        }

        boolean allowMemToMemMerge = conf.getBoolean("tez.runtime.shuffle.memory-to-memory.enable", false);
        this.memToMemMerger = allowMemToMemMerge ? new IntermediateMemoryToMemoryMerger(this, this.memToMemMergeOutputsThreshold) : null;
        this.inMemoryMerger = new InMemoryMerger(this);
        this.onDiskMerger = new OnDiskMerger(this);
    }

    /**
     * Starts the merger threads: the optional memory-to-memory merger first (when configured),
     * then the in-memory merger and the on-disk merger.
     */
    @InterfaceAudience.Private
    void configureAndStart() {
        if (memToMemMerger != null) {
            memToMemMerger.start();
        }
        inMemoryMerger.start();
        onDiskMerger.start();
    }

    /**
     * Computes how much memory this component asks for up front: the larger of the shuffle
     * buffer and the post-merge buffer derived from {@code conf}.
     *
     * @param conf configuration the buffer percentages (and optional explicit task memory) are read from
     * @param maxAvailableTaskMemory total memory available to the task, used as the base for both percentages
     * @return the larger of the shuffle buffer size and the post-merge buffer size, in bytes
     * @throws IllegalArgumentException if the fetch buffer percentage is outside [0, 1]
     * @throws TezUncheckedException if the post-merge buffer percentage is outside [0, 1]
     */
    @InterfaceAudience.Private
    static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
        float maxInMemCopyUse = conf.getFloat("tez.runtime.shuffle.fetch.buffer.percent", 0.9f);
        if ((double)maxInMemCopyUse > 1.0 || (double)maxInMemCopyUse < 0.0) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.fetch.buffer.percent: " + maxInMemCopyUse);
        }
        long memLimit = conf.getLong("tez.runtime.task.memory", (long)((float)maxAvailableTaskMemory * maxInMemCopyUse));
        float maxRedPer = conf.getFloat("tez.runtime.task.input.post-merge.buffer.percent", 0.0f);
        if ((double)maxRedPer > 1.0 || (double)maxRedPer < 0.0) {
            throw new TezUncheckedException("Invalid value for tez.runtime.task.input.post-merge.buffer.percent: " + maxRedPer);
        }
        long maxRedBuffer = (long)((float)maxAvailableTaskMemory * maxRedPer);
        LOG.info("Initial Memory required for SHUFFLE_BUFFER=" + memLimit + " based on INPUT_BUFFER_FACTOR=" + maxInMemCopyUse + ",  for final merged output=" + maxRedBuffer + ", using factor: " + maxRedPer);
        return Math.max(maxRedBuffer, memLimit);
    }

    /**
     * Blocks until the in-memory merger has finished. If the committed memory is still at or
     * above the merge threshold afterwards, one more memory-to-disk merge is started and
     * waited for.
     *
     * @throws InterruptedException if interrupted while waiting for a merge
     */
    public void waitForInMemoryMerge() throws InterruptedException {
        inMemoryMerger.waitForMerge();
        final boolean startedExtraMerge;
        synchronized (this) {
            startedExtraMerge = commitMemory >= mergeThreshold;
            if (startedExtraMerge) {
                startMemToDiskMerge();
            }
        }
        if (!startedExtraMerge) {
            return;
        }
        inMemoryMerger.waitForMerge();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Additional in-memory merge triggered");
        }
    }

    /**
     * Tells whether a fetch of the given size may be held in memory.
     *
     * @param requestedSize size of the requested reservation in bytes
     * @return {@code true} only if the size is strictly below the single shuffle limit
     */
    private boolean canShuffleToMemory(long requestedSize) {
        return requestedSize < maxSingleShuffleLimit;
    }

    /**
     * Waits on this manager's monitor until the reserved memory no longer exceeds the memory
     * limit; {@link #unreserve(long)} wakes up waiters.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized void waitForShuffleToMergeMemory() throws InterruptedException {
        final long waitStart = System.currentTimeMillis();
        while (usedMemory > memoryLimit) {
            wait();
        }
        if (LOG.isDebugEnabled()) {
            final long waitedMillis = System.currentTimeMillis() - waitStart;
            LOG.debug("Waited for " + waitedMillis + " for memory to become available");
        }
    }

    /**
     * Chooses where a fetched output goes: to disk when it is too large for memory, a "wait"
     * output when memory is currently over-committed, otherwise a fresh in-memory reservation.
     *
     * @param srcAttemptIdentifier source attempt the output belongs to
     * @param requestedSize size used for the in-memory decision and reservation
     * @param compressedLength length passed on when the output goes to disk
     * @param fetcher id of the requesting fetcher
     * @return a disk output, the shared stall output, or a memory output
     * @throws IOException if the output cannot be created
     */
    @Override
    public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, long compressedLength, int fetcher) throws IOException {
        if (canShuffleToMemory(requestedSize)) {
            if (usedMemory > memoryLimit) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory + ") is greater than memoryLimit (" + memoryLimit + ")." + " CommitMemory is (" + commitMemory + ")");
                }
                return stallShuffle;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory (" + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")." + "CommitMemory is (" + commitMemory + ")");
            }
            return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
        }
        // Too large for the in-memory path: write straight to disk.
        if (LOG.isDebugEnabled()) {
            LOG.debug(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + maxSingleShuffleLimit + ")");
        }
        return MapOutput.createDiskMapOutput(srcAttemptIdentifier, this, compressedLength, conf, fetcher, true, mapOutputFile);
    }

    /**
     * Accounts for {@code requestedSize} bytes as used and creates the matching in-memory
     * output, without checking any limit.
     *
     * @param srcAttemptIdentifier source attempt the output belongs to
     * @param requestedSize number of bytes to reserve; narrowed to {@code int} for the buffer
     * @param primaryMapOutput flag forwarded to the created output
     * @return the new in-memory output
     * @throws IOException declared for callers; propagated from output creation
     */
    private synchronized MapOutput unconditionalReserve(InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) throws IOException {
        usedMemory += requestedSize;
        final int bufferSize = (int) requestedSize;
        return MapOutput.createMemoryMapOutput(srcAttemptIdentifier, this, bufferSize, primaryMapOutput);
    }

    /**
     * Returns {@code size} bytes to the pool of usable memory and wakes every thread waiting
     * on this manager.
     *
     * @param size number of bytes to release
     */
    @Override
    public synchronized void unreserve(long size) {
        usedMemory -= size;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Notifying unreserve : size=" + size + ", commitMemory=" + commitMemory + ", usedMemory=" + usedMemory + ", mergeThreshold=" + mergeThreshold);
        }
        notifyAll();
    }

    /**
     * Releases memory that had already been committed: lowers the committed total and then
     * unreserves the same amount.
     *
     * @param size number of bytes to release
     */
    @Override
    public synchronized void releaseCommittedMemory(long size) {
        commitMemory -= size;
        unreserve(size);
    }

    /**
     * Registers a completely fetched in-memory output, adds its size to the committed memory
     * and starts a memory-to-disk merge and/or a memory-to-memory merge when the respective
     * threshold has been reached.
     *
     * @param mapOutput the finished in-memory output
     */
    @Override
    public synchronized void closeInMemoryFile(MapOutput mapOutput) {
        inMemoryMapOutputs.add(mapOutput);
        // Logged before commitMemory is updated, so the log shows the previous total.
        trackAndLogCloseInMemoryFile(mapOutput);
        commitMemory += mapOutput.getSize();
        if (commitMemory >= mergeThreshold) {
            startMemToDiskMerge();
        }
        if (memToMemMerger == null) {
            return;
        }
        synchronized (memToMemMerger) {
            if (memToMemMerger.isInProgress()) {
                return;
            }
            if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
                memToMemMerger.startMerge(inMemoryMapOutputs);
            }
        }
    }

    /**
     * Records the size of a closed in-memory output and logs it: every output at debug level,
     * otherwise an aggregated info line when more than 30 seconds have passed since the
     * previous one.
     *
     * @param mapOutput the output that was just closed
     */
    private void trackAndLogCloseInMemoryFile(MapOutput mapOutput) {
        statsInMemTotal.updateStats(mapOutput.getSize());
        if (LOG.isDebugEnabled()) {
            LOG.debug("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size() + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput=" + mapOutput);
            return;
        }
        statsInMemLastLog.updateStats(mapOutput.getSize());
        final long now = Time.monotonicNow();
        if (now <= lastInMemSegmentLogTime + 30000L) {
            return;
        }
        final Object[] logArgs = new Object[]{
            inMemoryMapOutputs.size(),
            commitMemory,
            usedMemory,
            statsInMemLastLog.count,
            statsInMemLastLog.minSize,
            statsInMemLastLog.maxSize,
            statsInMemLastLog.size,
            statsInMemLastLog.count == 0 ? "nan" : Double.valueOf((double)statsInMemLastLog.size / (double)statsInMemLastLog.count)
        };
        LOG.info("CloseInMemoryFile. Current state: inMemoryMapOutputs.size={}, commitMemory={}, usedMemory={}. Since last log: count={}, min={}, max={}, total={}, avg={}", logArgs);
        statsInMemLastLog.reset();
        lastInMemSegmentLogTime = now;
    }

    /**
     * Starts a memory-to-disk merge unless one is already running. Previously merged in-memory
     * outputs are folded back into the regular in-memory set before it is handed to the merger.
     */
    private void startMemToDiskMerge() {
        synchronized (inMemoryMerger) {
            if (inMemoryMerger.isInProgress()) {
                return;
            }
            LOG.info(inputContext.getSourceVertexName() + ": " + "Starting inMemoryMerger's merge since commitMemory=" + commitMemory + " > mergeThreshold=" + mergeThreshold + ". Current usedMemory=" + usedMemory);
            inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
            inMemoryMergedMapOutputs.clear();
            inMemoryMerger.startMerge(inMemoryMapOutputs);
        }
    }

    /**
     * Registers an in-memory output produced by merging, adds its size to the committed memory
     * and starts a memory-to-disk merge once the merge threshold is reached.
     *
     * @param mapOutput the merged in-memory output
     */
    public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
        inMemoryMergedMapOutputs.add(mapOutput);
        if (LOG.isDebugEnabled()) {
            LOG.debug("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + ", inMemoryMergedMapOutputs.size() -> " + inMemoryMergedMapOutputs.size());
        }
        commitMemory += mapOutput.getSize();
        final boolean thresholdReached = commitMemory >= mergeThreshold;
        if (thresholdReached) {
            startMemToDiskMerge();
        }
    }

    /**
     * @return the local file system this manager was constructed with
     */
    @Override
    public FileSystem getLocalFileSystem() {
        return localFS;
    }

    /**
     * Registers a finished on-disk chunk and starts a disk-to-disk merge when no merge is
     * running and at least {@code 2 * ioSortFactor - 1} chunks have accumulated.
     *
     * @param file the chunk to register
     * @throws IllegalArgumentException if a chunk with the same path and offset is already registered
     */
    @Override
    public synchronized void closeOnDiskFile(FileChunk file) {
        for (FileChunk existing : onDiskMapOutputs) {
            if (existing.getPath().equals((Object)file.getPath())) {
                Preconditions.checkArgument(existing.getOffset() != file.getOffset(), (Object)("Can't have a file with same path and offset.OldFilePath=" + existing.getPath() + ", OldFileOffset=" + existing.getOffset() + ", newFilePath=" + file.getPath() + ", newFileOffset=" + file.getOffset()));
            }
        }
        onDiskMapOutputs.add(file);
        logCloseOnDiskFile(file);
        synchronized (onDiskMerger) {
            if (onDiskMerger.isInProgress()) {
                return;
            }
            final int mergeTrigger = 2 * ioSortFactor - 1;
            if (onDiskMapOutputs.size() >= mergeTrigger) {
                onDiskMerger.startMerge(onDiskMapOutputs);
            }
        }
    }

    /**
     * Logs the registration of an on-disk chunk: always at debug level, otherwise as an info
     * line when more than 30 seconds have passed since the previous one.
     *
     * @param file the chunk that was just registered
     */
    private void logCloseOnDiskFile(FileChunk file) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("close onDiskFile=" + file.getPath() + ", len=" + file.getLength() + ", onDisMapOutputs=" + onDiskMapOutputs.size());
            return;
        }
        final long now = Time.monotonicNow();
        if (now <= lastOnDiskSegmentLogTime + 30000L) {
            return;
        }
        final Object[] logArgs = new Object[]{onDiskMapOutputs.size(), file.getPath(), file.getLength()};
        LOG.info("close onDiskFile. State: NumOnDiskFiles={}. Current: path={}, len={}", logArgs);
        lastOnDiskSegmentLogTime = now;
    }

    /**
     * @return {@code true} once {@link #close(boolean)} has produced the final merge iterator
     */
    @InterfaceAudience.Private
    public boolean isMergeComplete() {
        return finalMergeComplete;
    }

    /**
     * Shuts the manager down. On the first call the merger threads are closed and the outputs
     * collected so far are drained; when {@code tryFinalMerge} is set they are merged into a
     * single iterator. Later calls do nothing.
     *
     * @param tryFinalMerge whether to run the final merge over the remaining outputs
     * @return the iterator over the merged data, or {@code null} if this manager was already
     *         shut down or no final merge was requested
     * @throws InterruptedException if the final merge is interrupted; the interrupt flag is
     *         restored and, when cleanup is enabled, the on-disk files are deleted first
     * @throws Throwable any other failure raised while closing the mergers or merging
     */
    public TezRawKeyValueIterator close(boolean tryFinalMerge) throws Throwable {
        if (this.isShutdown.getAndSet(true)) {
            return null;
        }
        if (this.memToMemMerger != null) {
            this.memToMemMerger.close();
        }
        this.inMemoryMerger.close();
        this.onDiskMerger.close();

        // Drain everything that is still pending so the final merge owns it.
        ArrayList<MapOutput> memory = new ArrayList<MapOutput>(this.inMemoryMergedMapOutputs);
        this.inMemoryMergedMapOutputs.clear();
        memory.addAll(this.inMemoryMapOutputs);
        this.inMemoryMapOutputs.clear();
        ArrayList<FileChunk> disk = new ArrayList<FileChunk>(this.onDiskMapOutputs);
        this.onDiskMapOutputs.clear();

        if (this.statsInMemTotal.count > 0) {
            // Average segment size is total bytes divided by the number of segments.
            float avgSize = (float)this.statsInMemTotal.size / (float)this.statsInMemTotal.count;
            LOG.info("TotalInMemFetchStats: count={}, totalSize={}, min={}, max={}, avg={}", new Object[]{this.statsInMemTotal.count, this.statsInMemTotal.size, this.statsInMemTotal.minSize, this.statsInMemTotal.maxSize, Float.valueOf(avgSize)});
        }
        if (!tryFinalMerge) {
            return null;
        }
        try {
            TezRawKeyValueIterator kvIter = this.finalMerge(this.conf, this.rfs, memory, disk);
            this.finalMergeComplete = true;
            return kvIter;
        }
        catch (InterruptedException e) {
            if (this.cleanup) {
                MergeManager.cleanup(this.localFS, disk);
                MergeManager.cleanup(this.localFS, this.onDiskMapOutputs);
            }
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /**
     * @return {@code true} once {@link #close(boolean)} has been called
     */
    @VisibleForTesting
    public boolean isShutdown() {
        return isShutdown.get();
    }

    /**
     * Deletes the file behind every chunk in the collection, one path at a time.
     *
     * @param fs file system the paths live on
     * @param fileChunkList chunks whose paths are to be removed
     */
    static void cleanup(FileSystem fs, Collection<FileChunk> fileChunkList) {
        final Iterator<FileChunk> chunks = fileChunkList.iterator();
        while (chunks.hasNext()) {
            final Path chunkPath = chunks.next().getPath();
            MergeManager.cleanup(fs, chunkPath);
        }
    }

    /**
     * Best-effort recursive delete of {@code path}. A {@code null} path is ignored, and a
     * failed delete is only logged, never propagated.
     *
     * @param fs file system to delete from
     * @param path path to delete; may be {@code null}
     */
    static void cleanup(FileSystem fs, Path path) {
        if (path == null) {
            return;
        }
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Deleting " + path);
            }
            fs.delete(path, true);
        }
        catch (IOException e) {
            // Deliberately not rethrown, but keep the cause so failures can be diagnosed.
            LOG.info("Error in deleting " + path, e);
        }
    }

    /**
     * Runs the configured combiner over {@code kvIter}, writing its output to {@code writer}.
     *
     * @param kvIter source of key/value pairs
     * @param writer destination for the combined records
     * @throws IOException propagated from the combiner
     * @throws InterruptedException propagated from the combiner
     */
    void runCombineProcessor(TezRawKeyValueIterator kvIter, IFile.Writer writer) throws IOException, InterruptedException {
        combiner.combine(kvIter, writer);
    }

    /**
     * Converts leading entries of {@code inMemoryMapOutputs} into merge segments until the
     * bytes left in the list are no more than {@code leaveBytes} (or the thread is
     * interrupted). Converted entries are removed from the list.
     *
     * @param inMemoryMapOutputs outputs to consume from, front first; modified in place
     * @param inMemorySegments list the created segments are appended to
     * @param leaveBytes number of bytes that may remain in {@code inMemoryMapOutputs}
     * @return total length of the buffers that were turned into segments
     * @throws IOException declared for callers; propagated from reader/segment creation
     */
    private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs, List<TezMerger.Segment> inMemorySegments, long leaveBytes) throws IOException {
        long remainingBytes = 0L;
        for (MapOutput pending : inMemoryMapOutputs) {
            remainingBytes += pending.getSize();
        }
        long consumedBytes = 0L;
        int consumedCount = 0;
        while (remainingBytes > leaveBytes && !Thread.currentThread().isInterrupted()) {
            final MapOutput output = inMemoryMapOutputs.get(consumedCount++);
            final byte[] buffer = output.getMemory();
            final long bufferLength = buffer.length;
            consumedBytes += bufferLength;
            remainingBytes -= bufferLength;
            final InMemoryReader reader = new InMemoryReader(this, output.getAttemptIdentifier(), buffer, 0, (int)bufferLength);
            // Only primary outputs are counted as merged map outputs.
            final TezCounter counter = output.isPrimaryMapOutput() ? this.mergedMapOutputsCounter : null;
            inMemorySegments.add(new TezMerger.Segment(reader, counter));
        }
        // Drop everything that now lives in a segment.
        inMemoryMapOutputs.subList(0, consumedCount).clear();
        return consumedBytes;
    }

    /**
     * Builds the iterator handed to the consumer from whatever is left in memory and on disk.
     * <ol>
     *   <li>In-memory outputs beyond {@code postMergeMemLimit} are turned into segments. If
     *       there are fewer on-disk files than {@code ioSortFactor}, those segments are merged
     *       into one new ".merged" file on disk; otherwise they are kept and merged together
     *       with the disk files in step 2.</li>
     *   <li>All on-disk chunks (sorted by length) plus any segments kept from step 1 are merged.</li>
     *   <li>The in-memory outputs that were allowed to stay are merged with the result of
     *       step 2; if none stayed, the disk merge is returned directly.</li>
     * </ol>
     *
     * @param job configuration used to look up key/value classes and the comparator
     * @param fs file system the merge reads from and writes to
     * @param inMemoryMapOutputs remaining in-memory outputs; consumed by this method
     * @param onDiskMapOutputs remaining on-disk chunks; a merged file may be appended
     * @return iterator over the fully merged data
     * @throws IOException if reading or writing intermediate data fails
     * @throws InterruptedException propagated from the underlying merges
     */
    private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs, List<MapOutput> inMemoryMapOutputs, List<FileChunk> onDiskMapOutputs) throws IOException, InterruptedException {
        this.logFinalMergeStart(inMemoryMapOutputs, onDiskMapOutputs);
        StringBuilder finalMergeLog = new StringBuilder();
        this.inputContext.notifyProgress();
        Class keyClass = ConfigUtils.getIntermediateInputKeyClass(job);
        Class valueClass = ConfigUtils.getIntermediateInputValueClass(job);
        Path tmpDir = new Path(this.inputContext.getUniqueIdentifier());
        RawComparator comparator = ConfigUtils.getIntermediateInputKeyComparator(job);

        // Step 1: push in-memory data beyond the post-merge limit towards disk.
        List<TezMerger.Segment> memDiskSegments = new ArrayList<TezMerger.Segment>();
        long inMemToDiskBytes = 0L;
        if (inMemoryMapOutputs.size() > 0) {
            int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier();
            inMemToDiskBytes = this.createInMemorySegments(inMemoryMapOutputs, memDiskSegments, this.postMergeMemLimit);
            int numMemDiskSegments = memDiskSegments.size();
            if (numMemDiskSegments > 0 && this.ioSortFactor > onDiskMapOutputs.size()) {
                // Few enough disk files: write the memory segments out as one more disk file.
                Path outputPath = this.mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE, inMemToDiskBytes).suffix(".merged");
                TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, this.progressable, this.spilledRecordsCounter, null, this.additionalBytesRead, null);
                IFile.Writer writer = new IFile.Writer(job, fs, outputPath, keyClass, valueClass, this.codec, null, null);
                try {
                    TezMerger.writeFile(rIter, writer, this.progressable, 10000L);
                }
                catch (IOException e) {
                    try {
                        fs.delete(outputPath, true);
                    }
                    catch (IOException ignored) {
                        // Best-effort removal of the partial file; the original failure is rethrown below.
                    }
                    throw e;
                }
                finally {
                    writer.close();
                    this.additionalBytesWritten.increment(writer.getCompressedLength());
                }
                FileStatus fStatus = this.localFS.getFileStatus(outputPath);
                onDiskMapOutputs.add(new FileChunk(outputPath, 0L, fStatus.getLen()));
                if (LOG.isInfoEnabled()) {
                    finalMergeLog.append("MemMerged: " + numMemDiskSegments + ", " + inMemToDiskBytes);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Merged " + numMemDiskSegments + "segments, size=" + inMemToDiskBytes + " to " + outputPath);
                    }
                }
                inMemToDiskBytes = 0L;
                memDiskSegments.clear();
            } else if (inMemToDiskBytes != 0L && LOG.isInfoEnabled()) {
                finalMergeLog.append("DelayedMemMerge: " + numMemDiskSegments + ", " + inMemToDiskBytes);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Keeping " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes in memory for " + "intermediate, on-disk merge");
                }
            }
        }

        // Step 2: build one segment per on-disk chunk.
        List<TezMerger.Segment> diskSegments = new ArrayList<TezMerger.Segment>();
        long onDiskBytes = inMemToDiskBytes;
        FileChunk[] onDisk = onDiskMapOutputs.toArray(new FileChunk[onDiskMapOutputs.size()]);
        for (FileChunk fileChunk : onDisk) {
            Path file = fileChunk.getPath();
            long fileLength = fileChunk.getLength();
            onDiskBytes += fileLength;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Disk file=" + file + ", len=" + fileLength + ", isLocal=" + fileChunk.isLocalFile());
            }
            // Files produced by a merge get no merged-map-outputs counter.
            TezCounter counter = file.toString().endsWith(".merged") ? null : this.mergedMapOutputsCounter;
            long fileOffset = fileChunk.getOffset();
            boolean preserve = fileChunk.isLocalFile();
            diskSegments.add(new TezMerger.DiskSegment(fs, file, fileOffset, fileLength, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, this.ifileBufferSize, preserve, counter));
        }
        if (LOG.isInfoEnabled()) {
            finalMergeLog.append(". DiskSeg: " + onDisk.length + ", " + onDiskBytes);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Merging " + onDisk.length + " files, " + onDiskBytes + " bytes from disk");
            }
        }
        // Order the disk segments by ascending length.
        Collections.sort(diskSegments, new Comparator<TezMerger.Segment>(){

            @Override
            public int compare(TezMerger.Segment o1, TezMerger.Segment o2) {
                return Long.compare(o1.getLength(), o2.getLength());
            }
        });

        // Step 3: whatever is still in memory goes straight into the final merge.
        List<TezMerger.Segment> finalSegments = new ArrayList<TezMerger.Segment>();
        long inMemBytes = this.createInMemorySegments(inMemoryMapOutputs, finalSegments, 0L);
        if (LOG.isInfoEnabled()) {
            finalMergeLog.append(". MemSeg: " + finalSegments.size() + ", " + inMemBytes);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Merging " + finalSegments.size() + " segments, " + inMemBytes + " bytes from memory into reduce");
            }
        }
        if (0L != onDiskBytes) {
            int numInMemSegments = memDiskSegments.size();
            diskSegments.addAll(0, memDiskSegments);
            memDiskSegments.clear();
            TezRawKeyValueIterator diskMerge = TezMerger.merge(job, fs, keyClass, valueClass, this.codec, diskSegments, this.ioSortFactor, numInMemSegments, tmpDir, comparator, this.progressable, false, this.spilledRecordsCounter, null, this.additionalBytesRead, null);
            diskSegments.clear();
            if (finalSegments.isEmpty()) {
                // Nothing left in memory: the disk merge is the result. Emit the summary
                // here as well so it is not lost on this early return.
                if (LOG.isInfoEnabled()) {
                    LOG.info(finalMergeLog.toString());
                }
                return diskMerge;
            }
            finalSegments.add(new TezMerger.Segment(new RawKVIteratorReader(diskMerge, onDiskBytes), null));
        }
        if (LOG.isInfoEnabled()) {
            LOG.info(finalMergeLog.toString());
        }
        return TezMerger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, comparator, this.progressable, this.spilledRecordsCounter, null, this.additionalBytesRead, null);
    }

    /**
     * Logs what the final merge starts with: one debug line per input and a single info line
     * with the count and total size of the in-memory and on-disk inputs.
     *
     * @param inMemoryMapOutputs in-memory inputs of the final merge
     * @param onDiskMapOutputs on-disk inputs of the final merge
     */
    private void logFinalMergeStart(List<MapOutput> inMemoryMapOutputs, List<FileChunk> onDiskMapOutputs) {
        long memoryBytes = 0L;
        for (MapOutput memoryOutput : inMemoryMapOutputs) {
            memoryBytes += memoryOutput.getSize();
            if (LOG.isDebugEnabled()) {
                LOG.debug("finalMerge: inMemoryOutput=" + memoryOutput + ", size=" + memoryOutput.getSize());
            }
        }
        long diskBytes = 0L;
        for (FileChunk diskOutput : onDiskMapOutputs) {
            diskBytes += diskOutput.getLength();
            if (LOG.isDebugEnabled()) {
                LOG.debug("finalMerge: onDiskMapOutput=" + diskOutput.getPath() + ", size=" + diskOutput.getLength());
            }
        }
        final Object[] summary = new Object[]{inMemoryMapOutputs.size(), memoryBytes, onDiskMapOutputs.size(), diskBytes};
        LOG.info("finalMerge with #inMemoryOutputs={}, size={} and #onDiskOutputs={}, size={}", summary);
    }

    /**
     * Test hook exposing the {@code commitMemory} field.
     *
     * @return the current value of {@code commitMemory}
     */
    @VisibleForTesting
    long getCommitMemory() {
        return commitMemory;
    }

    /**
     * Test hook exposing the {@code usedMemory} field. The read is performed
     * while holding this manager's monitor.
     *
     * @return the current value of {@code usedMemory}
     */
    @VisibleForTesting
    synchronized long getUsedMemory() {
        return usedMemory;
    }

    /**
     * Test hook that delegates to {@code memToMemMerger.waitForMerge()}.
     *
     * @throws InterruptedException if the delegated wait is interrupted
     */
    @VisibleForTesting
    void waitForMemToMemMerge() throws InterruptedException {
        memToMemMerger.waitForMerge();
    }

    /**
     * Accumulates simple statistics over a series of segment sizes: the summed
     * size, the number of segments seen, and the smallest and largest size.
     */
    private static class SegmentStatsTracker {
        private long size;
        private int count;
        private long minSize;
        private long maxSize;

        SegmentStatsTracker() {
            this.reset();
        }

        /**
         * Returns the tracker to its empty state. The extremes start at the
         * opposite ends of the {@code long} range so that the first recorded
         * size replaces both of them.
         */
        void reset() {
            size = 0L;
            count = 0;
            minSize = Long.MAX_VALUE;
            maxSize = Long.MIN_VALUE;
        }

        /**
         * Records one more segment.
         *
         * @param segSize size of the segment being recorded
         */
        void updateStats(long segSize) {
            size += segSize;
            count++;
            minSize = Math.min(minSize, segSize);
            maxSize = Math.max(maxSize, segSize);
        }
    }

    /**
     * An {@code IFile.Reader} whose records come from a
     * {@code TezRawKeyValueIterator} instead of a stream, so that the result of
     * one merge can be wrapped and used as a segment of another.
     */
    class RawKVIteratorReader
    extends IFile.Reader {
        private final TezRawKeyValueIterator kvIter;
        private final long size;

        /**
         * @param kvIter iterator supplying the key/value pairs
         * @param size   length reported by {@link #getLength()}
         * @throws IOException if the superclass constructor fails
         */
        public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size) throws IOException {
            super(null, size, null, MergeManager.this.spilledRecordsCounter, null, MergeManager.this.ifileReadAhead, MergeManager.this.ifileReadAheadLength, MergeManager.this.ifileBufferSize);
            this.kvIter = kvIter;
            this.size = size;
        }

        /**
         * Advances the wrapped iterator and, when another record exists, points
         * {@code key} at that record's key bytes.
         *
         * @return {@code NEW_KEY} if a record was available, otherwise {@code NO_KEY}
         */
        @Override
        public IFile.Reader.KeyState readRawKey(DataInputBuffer key) throws IOException {
            if (!kvIter.next()) {
                return IFile.Reader.KeyState.NO_KEY;
            }
            bytesRead += exposeRemaining(kvIter.getKey(), key);
            return IFile.Reader.KeyState.NEW_KEY;
        }

        /**
         * Points {@code value} at the value bytes of the record the wrapped
         * iterator is currently positioned on.
         */
        @Override
        public void nextRawValue(DataInputBuffer value) throws IOException {
            bytesRead += exposeRemaining(kvIter.getValue(), value);
        }

        /** Reports the number of key and value bytes handed out so far. */
        @Override
        public long getPosition() throws IOException {
            return bytesRead;
        }

        /** Closes the wrapped iterator. */
        @Override
        public void close() throws IOException {
            kvIter.close();
        }

        /** Returns the size supplied at construction time. */
        @Override
        public long getLength() {
            return size;
        }

        /**
         * Resets {@code target} onto the backing array of {@code source},
         * covering the range from the source's current position up to its
         * length. No bytes are copied.
         *
         * @return the number of bytes in the exposed range
         */
        private int exposeRemaining(DataInputBuffer source, DataInputBuffer target) {
            int start = source.getPosition();
            int remaining = source.getLength() - start;
            target.reset(source.getData(), start, remaining);
            return remaining;
        }
    }

    @VisibleForTesting
    class OnDiskMerger
    extends MergeThread<FileChunk> {
        @VisibleForTesting
        volatile Path outputPath;
        @VisibleForTesting
        volatile Path tmpDir;

        public OnDiskMerger(MergeManager manager) {
            super(manager, MergeManager.this.ioSortFactor, MergeManager.this.exceptionReporter);
            this.setName("DiskToDiskMerger [" + TezUtilsInternal.cleanVertexName((String)MergeManager.this.inputContext.getSourceVertexName()) + "]");
            this.setDaemon(true);
        }

        @Override
        public void merge(List<FileChunk> inputs) throws IOException, InterruptedException {
            if (inputs == null || inputs.isEmpty()) {
                LOG.info("No ondisk files to merge...");
                return;
            }
            MergeManager.this.numDiskToDiskMerges.increment(1L);
            MergeManager.this.inputContext.notifyProgress();
            long approxOutputSize = 0L;
            int bytesPerSum = MergeManager.this.conf.getInt("io.bytes.per.checksum", 512);
            LOG.info("OnDiskMerger: We have  " + inputs.size() + " map outputs on disk. Triggering merge...");
            ArrayList<TezMerger.Segment> inputSegments = new ArrayList<TezMerger.Segment>(inputs.size());
            for (FileChunk fileChunk : inputs) {
                long offset = fileChunk.getOffset();
                long size = fileChunk.getLength();
                boolean preserve = fileChunk.isLocalFile();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("InputAttemptIdentifier=" + fileChunk.getInputAttemptIdentifier() + ", len=" + fileChunk.getLength() + ", offset=" + fileChunk.getOffset() + ", path=" + fileChunk.getPath());
                }
                Path file = fileChunk.getPath();
                approxOutputSize += size;
                TezMerger.DiskSegment segment = new TezMerger.DiskSegment(MergeManager.this.rfs, file, offset, size, MergeManager.this.codec, MergeManager.this.ifileReadAhead, MergeManager.this.ifileReadAheadLength, MergeManager.this.ifileBufferSize, preserve);
                inputSegments.add(segment);
            }
            approxOutputSize += ChecksumFileSystem.getChecksumLength((long)approxOutputSize, (int)bytesPerSum);
            FileChunk file0 = inputs.get(0);
            String namePart = file0.isLocalFile() ? MergeManager.this.mapOutputFile.getSpillFileName(file0.getInputAttemptIdentifier().getInputIdentifier(), file0.getInputAttemptIdentifier().getSpillEventId()) : file0.getPath().getName().toString();
            namePart = FilenameUtils.removeExtension((String)namePart);
            this.outputPath = MergeManager.this.localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, MergeManager.this.conf);
            this.outputPath = this.outputPath.suffix(".merged" + MergeManager.this.mergeFileSequenceId.getAndIncrement());
            IFile.Writer writer = new IFile.Writer(MergeManager.this.conf, MergeManager.this.rfs, this.outputPath, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), MergeManager.this.codec, null, null);
            this.tmpDir = new Path(MergeManager.this.inputContext.getUniqueIdentifier());
            try {
                TezRawKeyValueIterator iter = TezMerger.merge(MergeManager.this.conf, MergeManager.this.rfs, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), inputSegments, MergeManager.this.ioSortFactor, this.tmpDir, ConfigUtils.getIntermediateInputKeyComparator(MergeManager.this.conf), MergeManager.this.progressable, true, MergeManager.this.spilledRecordsCounter, null, MergeManager.this.mergedMapOutputsCounter, null);
                TezMerger.writeFile(iter, writer, MergeManager.this.progressable, 10000L);
                writer.close();
                MergeManager.this.additionalBytesWritten.increment(writer.getCompressedLength());
            }
            catch (IOException e) {
                MergeManager.this.localFS.delete(this.outputPath, true);
                throw e;
            }
            long outputLen = MergeManager.this.localFS.getFileStatus(this.outputPath).getLen();
            MergeManager.this.closeOnDiskFile(new FileChunk(this.outputPath, 0L, outputLen));
            LOG.info(MergeManager.this.inputContext.getSourceVertexName() + " Finished merging " + inputs.size() + " map output files on disk of total-size " + approxOutputSize + "." + " Local output file is " + this.outputPath + " of size " + outputLen);
        }

        @Override
        public void cleanup(List<FileChunk> inputs, boolean deleteData) throws IOException, InterruptedException {
            if (deleteData && MergeManager.this.cleanup) {
                LOG.info("Try deleting stale data");
                MergeManager.cleanup(MergeManager.this.localFS, inputs);
                MergeManager.cleanup(MergeManager.this.localFS, this.outputPath);
                MergeManager.cleanup(MergeManager.this.localFS, this.tmpDir);
            }
        }
    }

    /**
     * Merge thread that writes a batch of in-memory map outputs to a single
     * local file and hands that file back to the enclosing manager through
     * {@code closeOnDiskFile}.
     */
    private class InMemoryMerger
    extends MergeThread<MapOutput> {
        /** Attempt identifier of the first input of the most recent merge; used to name the output. */
        @VisibleForTesting
        volatile InputAttemptIdentifier srcTaskIdentifier;
        /** Destination file of the merge most recently started by this thread. */
        @VisibleForTesting
        volatile Path outputPath;
        /** Temporary directory passed to {@code TezMerger.merge} for that merge. */
        @VisibleForTesting
        volatile Path tmpDir;

        public InMemoryMerger(MergeManager manager) {
            super(manager, Integer.MAX_VALUE, MergeManager.this.exceptionReporter);
            this.setName("MemtoDiskMerger [" + TezUtilsInternal.cleanVertexName((String)MergeManager.this.inputContext.getSourceVertexName()) + "]");
            this.setDaemon(true);
        }

        /**
         * Merges {@code inputs} into one new local file. When a combiner is
         * configured, the merged stream is passed through it; otherwise it is
         * written out directly.
         *
         * <p>If anything fails before the writer has been closed normally, the
         * writer is closed on the way out. The output file is not deleted here;
         * see {@link #cleanup(List, boolean)}.
         *
         * @param inputs in-memory outputs to merge; {@code null} or empty is a no-op
         * @throws IOException          if merging or writing the output fails
         * @throws InterruptedException declared by the merge contract
         */
        @Override
        public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
            if (inputs == null || inputs.isEmpty()) {
                return;
            }
            MergeManager.this.numMemToDiskMerges.increment(1L);
            MergeManager.this.inputContext.notifyProgress();
            this.srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
            List<TezMerger.Segment> inMemorySegments = new ArrayList<TezMerger.Segment>();
            long mergeOutputSize = MergeManager.this.createInMemorySegments(inputs, inMemorySegments, 0L);
            int noInMemorySegments = inMemorySegments.size();
            this.outputPath = MergeManager.this.mapOutputFile.getInputFileForWrite(this.srcTaskIdentifier.getInputIdentifier(), this.srcTaskIdentifier.getSpillEventId(), mergeOutputSize).suffix(".merged");
            long outFileLen = 0L;
            // Stays non-null only while the writer still needs closing; reset to null after a normal close.
            IFile.Writer writer = null;
            try {
                writer = new IFile.Writer(MergeManager.this.conf, MergeManager.this.rfs, this.outputPath, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), MergeManager.this.codec, null, null);
                LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments...");
                this.tmpDir = new Path(MergeManager.this.inputContext.getUniqueIdentifier());
                // The merge factor equals the number of segments, i.e. all of them are merged in one pass.
                TezRawKeyValueIterator rIter = TezMerger.merge(MergeManager.this.conf, MergeManager.this.rfs, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), inMemorySegments, inMemorySegments.size(), this.tmpDir, ConfigUtils.getIntermediateInputKeyComparator(MergeManager.this.conf), MergeManager.this.progressable, MergeManager.this.spilledRecordsCounter, null, MergeManager.this.additionalBytesRead, null);
                if (null == MergeManager.this.combiner) {
                    TezMerger.writeFile(rIter, writer, MergeManager.this.progressable, 10000L);
                } else {
                    MergeManager.this.runCombineProcessor(rIter, writer);
                }
                writer.close();
                MergeManager.this.additionalBytesWritten.increment(writer.getCompressedLength());
                writer = null;
                outFileLen = MergeManager.this.localFS.getFileStatus(this.outputPath).getLen();
                LOG.info(MergeManager.this.inputContext.getUniqueIdentifier() + " Merge of the " + noInMemorySegments + " files in-memory complete." + " Local file is " + this.outputPath + " of size " + outFileLen);
            }
            finally {
                // A non-null writer here means an exception is already propagating.
                // Close it, but only log a close failure so the original exception is not replaced.
                if (writer != null) {
                    try {
                        writer.close();
                    }
                    catch (IOException closeFailure) {
                        LOG.warn("Failed to close writer for " + this.outputPath + " after a failed merge", closeFailure);
                    }
                }
            }
            MergeManager.this.closeOnDiskFile(new FileChunk(this.outputPath, 0L, outFileLen));
        }

        /**
         * Deletes the output file and the temporary directory, but only when
         * both {@code deleteData} and the manager's {@code cleanup} flag are set.
         * The in-memory inputs themselves are not touched here.
         */
        @Override
        public void cleanup(List<MapOutput> inputs, boolean deleteData) throws IOException, InterruptedException {
            if (deleteData && MergeManager.this.cleanup) {
                LOG.info("Try deleting stale data");
                MergeManager.cleanup(MergeManager.this.localFS, this.outputPath);
                MergeManager.cleanup(MergeManager.this.localFS, this.tmpDir);
            }
        }
    }

    /**
     * Merge thread that combines several in-memory map outputs into one larger
     * in-memory output, which it hands back to the enclosing manager through
     * {@code closeInMemoryMergedFile}.
     */
    private class IntermediateMemoryToMemoryMerger
    extends MergeThread<MapOutput> {
        public IntermediateMemoryToMemoryMerger(MergeManager manager, int mergeFactor) {
            super(manager, mergeFactor, MergeManager.this.exceptionReporter);
            this.setName("MemToMemMerger [" + TezUtilsInternal.cleanVertexName((String)MergeManager.this.inputContext.getSourceVertexName()) + "]");
            this.setDaemon(true);
        }

        /**
         * Selects as many of {@code inputs} as fit under the memory limit, merges
         * them into a newly reserved in-memory output, and returns the inputs
         * that were not selected to {@code inMemoryMapOutputs}.
         *
         * <p>Nothing is merged when fewer than two inputs are selected; a single
         * selected input is put back as well.
         *
         * @param inputs candidate in-memory outputs; {@code null} or empty is a
         *               no-op. Selected entries are removed from this list.
         * @throws IOException          if the merge or the in-memory write fails
         * @throws InterruptedException declared by the merge contract
         */
        @Override
        public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
            if (inputs == null || inputs.isEmpty()) {
                return;
            }
            MergeManager.this.inputContext.notifyProgress();
            InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
            List<TezMerger.Segment> inMemorySegments = new ArrayList<TezMerger.Segment>();
            MapOutput mergedMapOutputs = null;
            long mergeOutputSize = 0L;
            // Selection and reservation happen under the manager's monitor, the same
            // monitor getUsedMemory() synchronizes on.
            synchronized (this.manager) {
                Iterator<MapOutput> it = inputs.iterator();
                MapOutput lastAddedMapOutput = null;
                while (it.hasNext() && !Thread.currentThread().isInterrupted()) {
                    MapOutput mo = it.next();
                    if (mergeOutputSize + mo.getSize() + this.manager.getUsedMemory() > MergeManager.this.memoryLimit) {
                        // Too large to fit; keep scanning in case a later, smaller output fits.
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Size is greater than usedMemory. mergeOutputSize=" + mergeOutputSize + ", moSize=" + mo.getSize() + ", usedMemory=" + this.manager.getUsedMemory() + ", memoryLimit=" + MergeManager.this.memoryLimit);
                        }
                        continue;
                    }
                    mergeOutputSize += mo.getSize();
                    InMemoryReader reader = new InMemoryReader(MergeManager.this, mo.getAttemptIdentifier(), mo.getMemory(), 0, mo.getMemory().length);
                    // Only primary map outputs are given the merged-outputs counter.
                    inMemorySegments.add(new TezMerger.Segment(reader, mo.isPrimaryMapOutput() ? MergeManager.this.mergedMapOutputsCounter : null));
                    lastAddedMapOutput = mo;
                    it.remove();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Added segment for merging. mergeOutputSize=" + mergeOutputSize);
                    }
                }
                // Whatever is still in 'inputs' was not selected; give it back to the manager.
                MergeManager.this.inMemoryMapOutputs.addAll(inputs);
                if (inMemorySegments.size() <= 1) {
                    // Nothing worth merging. A lone selected output was removed from 'inputs'
                    // above, so it has to be returned explicitly.
                    if (lastAddedMapOutput != null) {
                        MergeManager.this.inMemoryMapOutputs.add(lastAddedMapOutput);
                    }
                    return;
                }
                mergedMapOutputs = MergeManager.this.unconditionalReserve(dummyMapId, mergeOutputSize, false);
            }
            int noInMemorySegments = inMemorySegments.size();
            IFile.Writer writer = new InMemoryWriter(mergedMapOutputs.getMemory());
            LOG.info(MergeManager.this.inputContext.getSourceVertexName() + ": " + "Initiating Memory-to-Memory merge with " + noInMemorySegments + " segments of total-size: " + mergeOutputSize);
            if (Thread.currentThread().isInterrupted()) {
                // NOTE(review): on this path neither the reservation made above nor the
                // selected outputs are handed back to the manager - confirm that the
                // interrupt/shutdown path accounts for them.
                return;
            }
            // Counters are passed as null here, unlike the other mergers in this file.
            TezRawKeyValueIterator rIter = TezMerger.merge(MergeManager.this.conf, MergeManager.this.rfs, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), inMemorySegments, inMemorySegments.size(), new Path(MergeManager.this.inputContext.getUniqueIdentifier()), ConfigUtils.getIntermediateInputKeyComparator(MergeManager.this.conf), MergeManager.this.progressable, null, null, null, null);
            TezMerger.writeFile(rIter, writer, MergeManager.this.progressable, 10000L);
            writer.close();
            LOG.info(MergeManager.this.inputContext.getSourceVertexName() + " Memory-to-Memory merge of the " + noInMemorySegments + " files in-memory complete with mergeOutputSize=" + mergeOutputSize);
            MergeManager.this.closeInMemoryMergedFile(mergedMapOutputs);
        }

        /** Intentionally does nothing; this merger performs no cleanup of its own. */
        @Override
        public void cleanup(List<MapOutput> inputs, boolean deleteData) throws IOException, InterruptedException {
        }
    }
}

