/*
 * Decompiled with CFR 0.152.
 */
package org.aksw.commons.io.cache;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.LongUnaryOperator;
import org.aksw.commons.io.buffer.array.ArrayOps;
import org.aksw.commons.io.cache.AdvancedRangeCacheImpl;
import org.aksw.commons.io.cache.IdleMode;
import org.aksw.commons.io.input.SequentialReader;
import org.aksw.commons.io.slice.Slice;
import org.aksw.commons.io.slice.SliceAccessor;
import org.aksw.commons.util.closeable.AutoCloseableWithLeakDetectionBase;
import org.aksw.commons.util.lock.LockUtils;
import org.aksw.commons.util.slot.ObservableSlottedValue;
import org.aksw.commons.util.slot.ObservableSlottedValueImpl;
import org.aksw.commons.util.slot.Slot;
import org.aksw.commons.util.slot.SlottedBuilder;
import org.aksw.commons.util.slot.SlottedBuilderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RangeRequestWorkerImpl<A>
extends AutoCloseableWithLeakDetectionBase
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(RangeRequestWorkerImpl.class);
    protected AdvancedRangeCacheImpl<A> cacheSystem;
    protected ArrayOps<A> arrayOps;
    protected ObservableSlottedValue<Long, Long> endpointDemands = ObservableSlottedValueImpl.wrap((SlottedBuilder)SlottedBuilderImpl.create(values -> values.stream().reduce(-1L, Math::max)));
    protected SequentialReader<A> sequentialReader;
    protected Slice<A> slice;
    protected SliceAccessor<A> pageRange;
    protected long currentPageId = -1L;
    protected final long requestOffset;
    protected final long requestLimit;
    protected LongUnaryOperator offsetToMaxAllowedRefetchCount;
    protected int bulkSize;
    protected volatile long offset;
    protected long nextCheckpointOffset;
    protected Duration terminationDelay;
    protected IdleMode idleMode = IdleMode.READ_AHEAD;
    protected transient Stopwatch terminationTimer = Stopwatch.createUnstarted();
    protected Duration firstItemTime = null;
    protected long numItemsProcessed = 0L;
    protected long processingTimeInNanos = 0L;

    public RangeRequestWorkerImpl(AdvancedRangeCacheImpl<A> cacheSystem, long requestOffset, long requestLimit, int bulkSize, Duration terminationDelay) {
        this.cacheSystem = cacheSystem;
        this.arrayOps = cacheSystem.getSlice().getArrayOps();
        this.requestOffset = requestOffset;
        this.offset = requestOffset;
        this.requestLimit = requestLimit;
        this.bulkSize = bulkSize;
        this.terminationDelay = terminationDelay;
        this.slice = cacheSystem.getSlice();
        this.pageRange = this.slice.newSliceAccessor();
        this.offsetToMaxAllowedRefetchCount = offset -> 5000L;
        this.nextCheckpointOffset = this.offset;
        this.endpointDemands.addValueChangeListener(event -> {
            if (logger.isTraceEnabled()) {
                logger.trace("End-offset of data range demand updated to " + this + ": " + event);
            }
            ObservableSlottedValue<Long, Long> observableSlottedValue = this.endpointDemands;
            synchronized (observableSlottedValue) {
                this.endpointDemands.notifyAll();
            }
        });
    }

    public long getMaxAllowedRefetchCount(long offset) {
        return this.offsetToMaxAllowedRefetchCount.applyAsLong(offset);
    }

    public Duration getFirstItemTime() {
        return this.firstItemTime;
    }

    public float getThroughput() {
        return (float)this.numItemsProcessed / (float)((double)this.processingTimeInNanos / 1.0E9);
    }

    public float etaAtIndex(long index) {
        long distance = index - this.offset;
        float throughput = this.getThroughput();
        float result = (float)distance * throughput;
        return result;
    }

    protected void closeActual() {
        if (this.sequentialReader != null) {
            try {
                this.sequentialReader.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        this.pageRange.close();
    }

    protected synchronized void initBackendRequest() {
        if (this.isClosed) {
            return;
        }
        this.sequentialReader = this.cacheSystem.getDataSource().newInputStream((Range<Long>)Range.atLeast((Comparable)Long.valueOf(this.requestOffset)));
    }

    @Override
    public void run() {
        try {
            this.checkpoint();
            if (this.offset != this.nextCheckpointOffset) {
                this.runCore();
            }
            logger.debug("RangeRequestWorker normal termination at offset " + this.offset);
        }
        catch (Exception e) {
            logger.error("RangeRequestWorker exceptional termination at offset " + this.offset, (Throwable)e);
            throw new RuntimeException(e);
        }
        finally {
            this.close();
        }
    }

    protected void checkpoint() {
        long remainingAllowedItems = this.requestLimit - this.numItemsProcessed;
        long maxAllowedRefetchCount = this.getMaxAllowedRefetchCount(this.offset);
        long effectiveLimit = Math.min(maxAllowedRefetchCount, remainingAllowedItems);
        long claimAheadRangeEnd = this.offset + effectiveLimit;
        Range claimAheadRange = Range.closedOpen((Comparable)Long.valueOf(this.offset), (Comparable)Long.valueOf(claimAheadRangeEnd));
        LockUtils.runWithLock((Lock)this.slice.getReadWriteLock().readLock(), () -> {
            RangeSet<Long> gaps = this.slice.getGaps((Range<Long>)claimAheadRange);
            if (gaps.isEmpty()) {
                this.nextCheckpointOffset = claimAheadRangeEnd;
            } else {
                long last;
                Range lastGap = (Range)gaps.asDescendingSetOfRanges().iterator().next();
                this.nextCheckpointOffset = last = LongMath.saturatedAdd((long)((Long)ContiguousSet.create((Range)lastGap, (DiscreteDomain)DiscreteDomain.longs()).last()), (long)1L);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runCore() throws Exception {
        A buffer = this.arrayOps.create(this.bulkSize);
        this.initBackendRequest();
        boolean isFirstRun = true;
        Stopwatch stopwatch = Stopwatch.createStarted();
        this.process(buffer, 0, 1);
        this.firstItemTime = stopwatch.elapsed();
        while (!this.terminationTimer.isRunning() || this.terminationTimer.elapsed(TimeUnit.MILLISECONDS) <= this.terminationDelay.toMillis()) {
            boolean hasNext;
            if (this.offset == this.nextCheckpointOffset) {
                this.checkpoint();
                if (this.offset == this.nextCheckpointOffset) break;
            }
            try {
                if (isFirstRun) {
                    isFirstRun = false;
                    hasNext = this.process(buffer, 1, this.bulkSize - 1) >= 0;
                } else {
                    hasNext = this.process(buffer, 0, this.bulkSize) >= 0;
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            if (!hasNext) break;
            ObservableSlottedValue<Long, Long> e = this.endpointDemands;
            synchronized (e) {
                long maxDemandedEndpoint = (Long)this.endpointDemands.build();
                if (this.offset >= maxDemandedEndpoint) {
                    switch (this.idleMode) {
                        case PAUSE: {
                            try {
                                this.endpointDemands.wait(this.terminationDelay.toMillis());
                            }
                            catch (InterruptedException interruptedException) {}
                            break;
                        }
                        case READ_AHEAD: {
                            if (maxDemandedEndpoint >= 0L || this.terminationTimer.isRunning()) break;
                            logger.debug("No more demand for data - starting termination timer");
                            this.terminationTimer.start();
                        }
                    }
                } else if (this.terminationTimer.isRunning()) {
                    logger.debug("New demand for data - resetting termination timer");
                    this.terminationTimer.reset();
                }
            }
            Lock readLock = this.slice.getReadWriteLock().readLock();
            readLock.lock();
            try {
                long knownSize = this.slice.getKnownSize();
                if (knownSize >= 0L && this.offset >= knownSize) {
                    break;
                }
            }
            finally {
                readLock.unlock();
            }
            if (!IdleMode.PAUSE.equals((Object)this.idleMode)) continue;
            ObservableSlottedValue<Long, Long> observableSlottedValue = this.endpointDemands;
            synchronized (observableSlottedValue) {
                long maxEndpoint = (Long)this.endpointDemands.build();
                if (maxEndpoint < 0L) {
                    break;
                }
            }
        }
    }

    public long getCurrentOffset() {
        return this.offset;
    }

    public long getEndOffset() {
        return LongMath.saturatedAdd((long)this.requestOffset, (long)this.requestLimit);
    }

    public Range<Long> getWorkingRange() {
        return Range.closedOpen((Comparable)Long.valueOf(this.offset), (Comparable)Long.valueOf(this.getEndOffset()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int process(A buffer, int bufferOffset, int n) throws Exception {
        if (n <= 0) {
            throw new IllegalArgumentException("Request to process 0 or fewer items is invalid");
        }
        this.pageRange.claimByOffsetRange(this.offset, this.offset + (long)n);
        long numItemsUntilNextCheckpoint = this.nextCheckpointOffset - this.offset;
        long remainingReads = Math.min((long)n, Math.min((long)this.bulkSize, numItemsUntilNextCheckpoint));
        if (IdleMode.PAUSE.equals((Object)this.idleMode)) {
            long maxEndpointDemand = (Long)this.endpointDemands.build();
            long numItemsUntilEndpoint = maxEndpointDemand - this.offset;
            remainingReads = Math.min(remainingReads, numItemsUntilEndpoint);
        }
        Lock writeLock = this.slice.getReadWriteLock().writeLock();
        writeLock.lock();
        int remainingReadsInt = Ints.checkedCast((long)remainingReads);
        int numItemsOfLastRead = 0;
        int result = 0;
        try {
            while (numItemsOfLastRead >= 0 && remainingReadsInt != 0 && !this.isClosed && !Thread.interrupted() && (numItemsOfLastRead = this.sequentialReader.read(buffer, bufferOffset, remainingReadsInt)) >= 0) {
                this.pageRange.write(this.offset, buffer, bufferOffset, numItemsOfLastRead);
                remainingReadsInt -= numItemsOfLastRead;
                result += numItemsOfLastRead;
                bufferOffset += numItemsOfLastRead;
                this.offset += (long)numItemsOfLastRead;
            }
            this.numItemsProcessed += (long)result;
            result = result == 0 && n != 0 ? -1 : result;
            this.slice.updateMinimumKnownSize(this.offset);
            if (numItemsOfLastRead < 0 && this.numItemsProcessed < this.requestLimit && this.slice.getKnownSize() < 0L) {
                this.slice.setMaximumKnownSize(this.offset);
            }
            if (logger.isTraceEnabled()) {
                logger.trace(String.format("Signalling data condition to clients - offset: %1$d, processed: %2$d, limit:  %3$d, loaded ranges: %4$s", this.offset, this.numItemsProcessed, this.requestLimit, this.slice.getLoadedRanges()));
            }
            this.slice.getHasDataCondition().signalAll();
        }
        finally {
            writeLock.unlock();
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Slot<Long> newDemandSlot() {
        ObservableSlottedValue<Long, Long> observableSlottedValue = this.endpointDemands;
        synchronized (observableSlottedValue) {
            return this.endpointDemands.newSlot();
        }
    }
}

