/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.StreamChunkParser;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CircularBuffer;
import org.apache.druid.utils.CollectionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.ReadablePeriod;

public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
implements ChatHandler {
    private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskRunner.class);
    // Key under which the committer metadata stores the next offsets to read; the same key is used to look
    // those offsets up in metadata restored by the driver.
    static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
    static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
    // Per-partition end offsets; seeded in the constructor from the IO config's end sequence numbers.
    private final Map<PartitionIdType, SequenceOffsetType> endOffsets;
    // Sequence number of the last record read per partition; updated after each processed record.
    private final Map<PartitionIdType, SequenceOffsetType> lastReadOffsets = new HashMap<PartitionIdType, SequenceOffsetType>();
    // Next sequence number to read per partition; advanced via getNextStartOffset() after each processed record.
    private final ConcurrentMap<PartitionIdType, SequenceOffsetType> currOffsets = new ConcurrentHashMap<PartitionIdType, SequenceOffsetType>();
    // Copy of currOffsets captured each time a Committer is created.
    private final ConcurrentMap<PartitionIdType, SequenceOffsetType> lastPersistedOffsets = new ConcurrentHashMap<PartitionIdType, SequenceOffsetType>();
    // NOTE(review): pauseLock and its two conditions presumably coordinate pause/resume requests; their usage
    // is not visible in this part of the file - confirm.
    private final Lock pauseLock = new ReentrantLock();
    private final Condition hasPaused = this.pauseLock.newCondition();
    private final Condition shouldResume = this.pauseLock.newCondition();
    // Checked on every iteration of the read loop; once set, the loop exits.
    protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
    // When a stop has been requested, publishing only proceeds if this flag is also set.
    private final AtomicBoolean publishOnStop = new AtomicBoolean(false);
    // Monitor held while the status is moved to PUBLISHING after the read loop finishes.
    private final Object statusLock = new Object();
    protected final Lock pollRetryLock = new ReentrantLock();
    protected final Condition isAwaitingRetry = this.pollRetryLock.newCondition();
    private final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task;
    private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
    private final SeekableStreamIndexTaskTuningConfig tuningConfig;
    private final InputRowSchema inputRowSchema;
    @Nullable
    private final InputFormat inputFormat;
    @Nullable
    private final InputRowParser<ByteBuffer> parser;
    // Stream name taken from the IO config's start sequence numbers.
    private final String stream;
    // Names of sequences for which publishing has already been kicked off.
    private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
    // Futures awaited at the end of the run for publish and handoff completion, respectively.
    private final List<ListenableFuture<SegmentsAndCommitMetadata>> publishWaitList = new ArrayList<ListenableFuture<SegmentsAndCommitMetadata>>();
    private final List<ListenableFuture<SegmentsAndCommitMetadata>> handOffWaitList = new ArrayList<ListenableFuture<SegmentsAndCommitMetadata>>();
    private final LockGranularity lockGranularityToUse;
    private @MonotonicNonNull RowIngestionMeters rowIngestionMeters;
    private @MonotonicNonNull ParseExceptionHandler parseExceptionHandler;
    private @MonotonicNonNull FireDepartmentMetrics fireDepartmentMetrics;
    private @MonotonicNonNull AuthorizerMapper authorizerMapper;
    private volatile DateTime startTime;
    private volatile Status status = Status.NOT_STARTED;
    private volatile TaskToolbox toolbox;
    // Thread that entered runInternal(); recorded once the chat handler has been registered.
    private volatile Thread runThread;
    private volatile Appenderator appenderator;
    private volatile StreamAppenderatorDriver driver;
    private volatile IngestionState ingestionState;
    protected volatile boolean pauseRequested = false;
    private volatile long nextCheckpointTime;
    private volatile CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequences;
    // Failure reported by an asynchronous persist callback; the read loop rethrows it when non-null.
    private volatile Throwable backgroundThreadException;

    public SeekableStreamIndexTaskRunner(SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task, @Nullable InputRowParser<ByteBuffer> parser, AuthorizerMapper authorizerMapper, LockGranularity lockGranularityToUse) {
        Preconditions.checkNotNull(task);
        this.task = task;
        this.ioConfig = task.getIOConfig();
        this.tuningConfig = task.getTuningConfig();
        this.inputRowSchema = InputRowSchemas.fromDataSchema(task.getDataSchema());
        this.inputFormat = this.ioConfig.getInputFormat();
        this.parser = parser;
        this.authorizerMapper = authorizerMapper;
        this.stream = this.ioConfig.getStartSequenceNumbers().getStream();
        this.endOffsets = new ConcurrentHashMap<PartitionIdType, SequenceOffsetType>(this.ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
        this.sequences = new CopyOnWriteArrayList();
        this.ingestionState = IngestionState.NOT_STARTED;
        this.lockGranularityToUse = lockGranularityToUse;
        this.resetNextCheckpointTime();
    }

    /**
     * Runs the task by delegating to {@code runInternal(toolbox)}.
     *
     * <p>Any {@link Exception} escaping {@code runInternal} is logged, its stack trace is written into the
     * task completion report through the toolbox's report file writer, and a failure status carrying the
     * same stack trace is returned instead of the exception being propagated.
     *
     * @param toolbox toolbox handed to {@code runInternal} and used to write the failure report
     * @return the status produced by {@code runInternal}, or a failure status if it threw
     */
    public TaskStatus run(TaskToolbox toolbox) {
        try {
            return this.runInternal(toolbox);
        } catch (Exception e) {
            log.error(e, "Encountered exception while running task.");
            // The full stack trace doubles as the error message in both the report and the status.
            final String stackTrace = Throwables.getStackTraceAsString(e);
            toolbox.getTaskReportFileWriter().write(
                    this.task.getId(),
                    this.getTaskCompletionReports(stackTrace, 0L)
            );
            return TaskStatus.failure(this.task.getId(), stackTrace);
        }
    }

    /**
     * Determines which partitions of a sequence have an exclusive start offset.
     *
     * <ul>
     *   <li>If the sequence starts exactly at the task's configured start offsets, the exclusive partitions
     *       configured on the task are returned.</li>
     *   <li>Otherwise, if {@code isEndOffsetExclusive()} is true, no partition is exclusive.</li>
     *   <li>Otherwise every partition of the sequence is exclusive.</li>
     * </ul>
     *
     * @param sequenceStartOffsets start offsets of the sequence, keyed by partition
     * @return the partitions whose start offset is exclusive
     */
    private Set<PartitionIdType> computeExclusiveStartPartitionsForSequence(Map<PartitionIdType, SequenceOffsetType> sequenceStartOffsets) {
        final boolean startsAtTaskStart = sequenceStartOffsets.equals(
                this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap()
        );
        if (startsAtTaskStart) {
            return this.ioConfig.getStartSequenceNumbers().getExclusivePartitions();
        }
        if (this.isEndOffsetExclusive()) {
            return Collections.emptySet();
        }
        return sequenceStartOffsets.keySet();
    }

    /**
     * Stores the toolbox on this runner. Called at the start of {@code runInternal}; public so tests can
     * inject a toolbox without running the task.
     *
     * @param toolbox toolbox to store
     */
    @VisibleForTesting
    public void setToolbox(TaskToolbox toolbox) {
        this.toolbox = toolbox;
    }

    /**
     * Populates {@code sequences} for this run unless {@code restoreSequences()} already did so.
     *
     * <p>When nothing was restored, the checkpoints stored under the {@code "checkpoints"} task context key
     * are consulted:
     * <ul>
     *   <li>No checkpoints ({@code null}): a single sequence with id 0 is added, running from the task's
     *       configured start offsets to {@code endOffsets}.</li>
     *   <li>One or more checkpoints: each pair of consecutive checkpoints becomes a sequence running from
     *       the earlier checkpoint's offsets to the later one's, and the final checkpoint becomes a
     *       sequence running to {@code endOffsets}.</li>
     * </ul>
     *
     * @throws IOException declared because the helpers this method calls may throw it
     * @throws ISE         if the task context holds a checkpoint map with no entries; previously this
     *                     surfaced as a bare {@link java.util.NoSuchElementException}
     */
    @VisibleForTesting
    public void initializeSequences() throws IOException {
        if (!this.restoreSequences()) {
            final TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> checkpoints = this.getCheckPointsFromContext(this.toolbox, (String)this.task.getContextValue("checkpoints"));
            if (checkpoints == null) {
                // Nothing to replay: one sequence covering the task's whole configured range.
                this.addSequence(new SequenceMetadata<PartitionIdType, SequenceOffsetType>(
                        0,
                        StringUtils.format("%s_%s", this.ioConfig.getBaseSequenceName(), 0),
                        this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
                        this.endOffsets,
                        false,
                        this.ioConfig.getStartSequenceNumbers().getExclusivePartitions()
                ));
            } else {
                // The loop below unconditionally reads the first entry, so an empty map must be rejected
                // up front with a message that names the actual problem.
                if (checkpoints.isEmpty()) {
                    throw new ISE("Task[%s] context contains an empty checkpoint map; at least one checkpoint is required", this.task.getId());
                }
                final Iterator<Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>>> remaining = checkpoints.entrySet().iterator();
                Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous = remaining.next();
                while (remaining.hasNext()) {
                    final Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current = remaining.next();
                    // Sequence bounded by two consecutive checkpoints; created with the flag set to true
                    // (presumably "checkpointed" - confirm against SequenceMetadata).
                    this.addSequence(new SequenceMetadata<PartitionIdType, SequenceOffsetType>(
                            previous.getKey(),
                            StringUtils.format("%s_%s", this.ioConfig.getBaseSequenceName(), previous.getKey()),
                            previous.getValue(),
                            current.getValue(),
                            true,
                            this.computeExclusiveStartPartitionsForSequence(previous.getValue())
                    ));
                    previous = current;
                }
                // The last checkpoint opens the sequence that runs to the task's end offsets.
                this.addSequence(new SequenceMetadata<PartitionIdType, SequenceOffsetType>(
                        previous.getKey(),
                        StringUtils.format("%s_%s", this.ioConfig.getBaseSequenceName(), previous.getKey()),
                        previous.getValue(),
                        this.endOffsets,
                        false,
                        this.computeExclusiveStartPartitionsForSequence(previous.getValue())
                ));
            }
        }
        log.info("Starting with sequences: %s", this.sequences);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    private TaskStatus runInternal(TaskToolbox toolbox) throws Exception {
        this.startTime = DateTimes.nowUtc();
        this.status = Status.STARTING;
        this.setToolbox(toolbox);
        this.authorizerMapper = toolbox.getAuthorizerMapper();
        this.rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
        this.parseExceptionHandler = new ParseExceptionHandler(this.rowIngestionMeters, this.tuningConfig.isLogParseExceptions(), this.tuningConfig.getMaxParseExceptions(), this.tuningConfig.getMaxSavedParseExceptions());
        StreamChunkParser<ByteEntity> parser = new StreamChunkParser<ByteEntity>(this.parser, this.inputFormat, this.inputRowSchema, this.task.getDataSchema().getTransformSpec(), toolbox.getIndexingTmpDir(), row -> row != null && this.task.withinMinMaxRecordTime((InputRow)row), this.rowIngestionMeters, this.parseExceptionHandler);
        this.initializeSequences();
        log.debug("Found chat handler of class[%s]", new Object[]{toolbox.getChatHandlerProvider().getClass().getName()});
        toolbox.getChatHandlerProvider().register(this.task.getId(), (ChatHandler)this, false);
        this.runThread = Thread.currentThread();
        FireDepartment fireDepartmentForMetrics = new FireDepartment(this.task.getDataSchema(), new RealtimeIOConfig(null, null), null);
        this.fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
        TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(this.task, fireDepartmentForMetrics, this.rowIngestionMeters);
        toolbox.addMonitor((Monitor)metricsMonitor);
        String lookupTier = (String)this.task.getContextValue("lookupTier");
        LookupNodeService lookupNodeService = lookupTier == null ? toolbox.getLookupNodeService() : new LookupNodeService(lookupTier);
        DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(toolbox.getDruidNode(), NodeRole.PEON, (Map)ImmutableMap.of((Object)toolbox.getDataNodeService().getName(), (Object)toolbox.getDataNodeService(), (Object)lookupNodeService.getName(), (Object)lookupNodeService));
        Exception caughtExceptionOuter = null;
        long handoffWaitMs = 0L;
        try (RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier = this.task.newTaskRecordSupplier();){
            if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
                toolbox.getDataSegmentServerAnnouncer().announce();
                toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
            }
            this.appenderator = this.task.newAppenderator(toolbox, this.fireDepartmentMetrics, this.rowIngestionMeters, this.parseExceptionHandler);
            this.driver = this.task.newDriver(this.appenderator, toolbox, this.fireDepartmentMetrics);
            Object restoredMetadata = this.driver.startJob(segmentId -> {
                try {
                    if (this.lockGranularityToUse == LockGranularity.SEGMENT) {
                        return toolbox.getTaskActionClient().submit(new SegmentLockAcquireAction(TaskLockType.EXCLUSIVE, segmentId.getInterval(), segmentId.getVersion(), segmentId.getShardSpec().getPartitionNum(), 1000L)).isOk();
                    }
                    TaskLock lock = toolbox.getTaskActionClient().submit(new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, segmentId.getInterval(), 1000L));
                    if (lock == null) {
                        return false;
                    }
                    if (lock.isRevoked()) {
                        throw new ISE(StringUtils.format((String)"Lock for interval [%s] was revoked.", (Object[])new Object[]{segmentId.getInterval()}), new Object[0]);
                    }
                    return true;
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            if (restoredMetadata == null) {
                Preconditions.checkState((boolean)this.sequences.get((int)0).startOffsets.entrySet().stream().allMatch(partitionOffsetEntry -> this.createSequenceNumber(partitionOffsetEntry.getValue()).compareTo(this.createSequenceNumber(this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(partitionOffsetEntry.getKey()))) >= 0), (Object)"Sequence sequences are not compatible with start sequences of task");
                this.currOffsets.putAll(this.sequences.get((int)0).startOffsets);
            } else {
                Map restoredMetadataMap = (Map)restoredMetadata;
                SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> restoredNextPartitions = this.deserializePartitionsFromMetadata(toolbox.getJsonMapper(), restoredMetadataMap.get(METADATA_NEXT_PARTITIONS));
                this.currOffsets.putAll(restoredNextPartitions.getPartitionSequenceNumberMap());
                if (!restoredNextPartitions.getStream().equals(this.ioConfig.getStartSequenceNumbers().getStream())) {
                    throw new ISE("Restored stream[%s] but expected stream[%s]", new Object[]{restoredNextPartitions.getStream(), this.ioConfig.getStartSequenceNumbers().getStream()});
                }
                if (!this.currOffsets.keySet().equals(this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet())) {
                    throw new ISE("Restored partitions[%s] but expected partitions[%s]", new Object[]{this.currOffsets.keySet(), this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet()});
                }
                if (this.sequences.size() == 0 || this.getLastSequenceMetadata().isCheckpointed()) {
                    this.endOffsets.putAll(this.sequences.size() == 0 ? this.currOffsets : this.getLastSequenceMetadata().getEndOffsets());
                }
            }
            log.info("Initialized sequences: %s", new Object[]{this.sequences.stream().map(SequenceMetadata::toString).collect(Collectors.joining(", "))});
            int numPreFilterPartitions = this.currOffsets.size();
            if (this.currOffsets.entrySet().removeIf(x -> this.isEndOfShard(x.getValue()))) {
                log.info("Removed [%d] partitions from assignment which have already been closed.", new Object[]{numPreFilterPartitions - this.currOffsets.size()});
            }
            if (!this.isEndOffsetExclusive()) {
                for (Map.Entry entry : this.currOffsets.entrySet()) {
                    boolean isAtStart = entry.getValue().equals(this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(entry.getKey()));
                    if (isAtStart && !this.ioConfig.getStartSequenceNumbers().getExclusivePartitions().contains(entry.getKey())) continue;
                    this.lastReadOffsets.put(entry.getKey(), entry.getValue());
                }
            }
            Supplier committerSupplier = () -> {
                ImmutableMap snapshot = ImmutableMap.copyOf(this.currOffsets);
                this.lastPersistedOffsets.clear();
                this.lastPersistedOffsets.putAll((Map<PartitionIdType, SequenceOffsetType>)snapshot);
                return new Committer((Map)snapshot){
                    final /* synthetic */ Map val$snapshot;
                    {
                        this.val$snapshot = map;
                    }

                    public Object getMetadata() {
                        return ImmutableMap.of((Object)SeekableStreamIndexTaskRunner.METADATA_NEXT_PARTITIONS, new SeekableStreamEndSequenceNumbers(SeekableStreamIndexTaskRunner.this.stream, this.val$snapshot));
                    }

                    public void run() {
                    }
                };
            };
            this.maybePersistAndPublishSequences((Supplier<Committer>)committerSupplier);
            Set<StreamPartition<PartitionIdType>> set = this.assignPartitions(recordSupplier);
            this.possiblyResetDataSourceMetadata(toolbox, recordSupplier, set);
            this.seekToStartingSequence(recordSupplier, set);
            this.ingestionState = IngestionState.BUILD_SEGMENTS;
            boolean stillReading = !set.isEmpty();
            this.status = Status.READING;
            Exception caughtExceptionInner = null;
            try {
                while (stillReading) {
                    void var16_26;
                    if (this.possiblyPause()) {
                        Set<StreamPartition<PartitionIdType>> set2 = this.assignPartitions(recordSupplier);
                        this.possiblyResetDataSourceMetadata(toolbox, recordSupplier, set2);
                        if (set2.isEmpty()) {
                            log.debug("All partitions have been fully read.", new Object[0]);
                            this.publishOnStop.set(true);
                            this.stopRequested.set(true);
                        }
                    }
                    if (this.stopRequested.get() || this.sequences.size() == 0 || this.getLastSequenceMetadata().isCheckpointed()) {
                        this.status = Status.PUBLISHING;
                    }
                    if (this.stopRequested.get()) break;
                    if (this.backgroundThreadException != null) {
                        throw new RuntimeException(this.backgroundThreadException);
                    }
                    this.checkPublishAndHandoffFailure();
                    this.maybePersistAndPublishSequences((Supplier<Committer>)committerSupplier);
                    List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> records = this.getRecords(recordSupplier, toolbox);
                    stillReading = !var16_26.isEmpty();
                    SequenceMetadata sequenceToCheckpoint = null;
                    for (OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType> record : records) {
                        boolean moreToReadAfterThisRecord;
                        boolean shouldProcess = this.verifyRecordInRange(record.getPartitionId(), record.getSequenceNumber());
                        log.trace("Got stream[%s] partition[%s] sequenceNumber[%s], shouldProcess[%s].", new Object[]{record.getStream(), record.getPartitionId(), record.getSequenceNumber(), shouldProcess});
                        if (shouldProcess) {
                            List<InputRow> rows = parser.parse(record.getData(), this.isEndOfShard(record.getSequenceNumber()));
                            boolean isPersistRequired = false;
                            SequenceMetadata sequenceToUse = this.sequences.stream().filter(sequenceMetadata -> sequenceMetadata.canHandle(this, record)).findFirst().orElse(null);
                            if (sequenceToUse == null) {
                                throw new ISE("Cannot find any valid sequence for record with partition [%s] and sequenceNumber [%s]. Current sequences: %s", new Object[]{record.getPartitionId(), record.getSequenceNumber(), this.sequences});
                            }
                            for (InputRow row2 : rows) {
                                AppenderatorDriverAddResult addResult = this.driver.add(row2, sequenceToUse.getSequenceName(), committerSupplier, true, false);
                                if (addResult.isOk()) {
                                    boolean isPushRequired = addResult.isPushRequired(this.tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(), Long.valueOf(this.tuningConfig.getPartitionsSpec().getMaxTotalRowsOr(20000000L)));
                                    if (isPushRequired && !sequenceToUse.isCheckpointed()) {
                                        sequenceToCheckpoint = sequenceToUse;
                                    }
                                    isPersistRequired |= addResult.isPersistRequired();
                                    continue;
                                }
                                throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{row2.getTimestamp()});
                            }
                            if (isPersistRequired) {
                                Futures.addCallback((ListenableFuture)this.driver.persistAsync((Committer)committerSupplier.get()), (FutureCallback)new FutureCallback<Object>(){

                                    public void onSuccess(@Nullable Object result) {
                                        log.debug("Persist completed with metadata: %s", new Object[]{result});
                                    }

                                    public void onFailure(Throwable t) {
                                        log.error("Persist failed, dying", new Object[0]);
                                        SeekableStreamIndexTaskRunner.this.backgroundThreadException = t;
                                    }
                                });
                            }
                            this.lastReadOffsets.put(record.getPartitionId(), record.getSequenceNumber());
                            this.currOffsets.put(record.getPartitionId(), this.getNextStartOffset(record.getSequenceNumber()));
                        }
                        if ((moreToReadAfterThisRecord = this.isMoreToReadAfterReadingRecord(record.getSequenceNumber(), this.endOffsets.get(record.getPartitionId()))) || !var16_26.remove(record.getStreamPartition())) continue;
                        log.info("Finished reading stream[%s], partition[%s].", new Object[]{record.getStream(), record.getPartitionId()});
                        recordSupplier.assign((Set<StreamPartition<PartitionIdType>>)var16_26);
                        stillReading = !var16_26.isEmpty();
                    }
                    if (!stillReading) {
                        this.fireDepartmentMetrics.markProcessingDone();
                    }
                    if (System.currentTimeMillis() > this.nextCheckpointTime) {
                        sequenceToCheckpoint = this.getLastSequenceMetadata();
                    }
                    if (sequenceToCheckpoint == null || !stillReading) continue;
                    Preconditions.checkArgument((boolean)this.getLastSequenceMetadata().getSequenceName().equals(sequenceToCheckpoint.getSequenceName()), (String)"Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s", (Object[])new Object[]{sequenceToCheckpoint, this.sequences});
                    this.requestPause();
                    CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(this.task.getDataSource(), this.ioConfig.getTaskGroupId(), null, this.createDataSourceMetadata(new SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>(this.stream, sequenceToCheckpoint.getStartOffsets(), sequenceToCheckpoint.getExclusiveStartPartitions())));
                    if (toolbox.getTaskActionClient().submit(checkpointAction).booleanValue()) continue;
                    throw new ISE("Checkpoint request with sequences [%s] failed, dying", new Object[]{this.currOffsets});
                }
                this.ingestionState = IngestionState.COMPLETED;
            }
            catch (Exception e) {
                caughtExceptionInner = e;
                log.error((Throwable)e, "Encountered exception in run() before persisting.", new Object[0]);
                throw e;
            }
            finally {
                try {
                    this.driver.persist((Committer)committerSupplier.get());
                }
                catch (Exception e) {
                    if (caughtExceptionInner != null) {
                        caughtExceptionInner.addSuppressed(e);
                    }
                    throw e;
                }
            }
            Object e = this.statusLock;
            synchronized (e) {
                if (this.stopRequested.get() && !this.publishOnStop.get()) {
                    throw new InterruptedException("Stopping without publishing");
                }
                this.status = Status.PUBLISHING;
            }
            ArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequencesSnapshot = new ArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>>(this.sequences);
            for (int i = 0; i < sequencesSnapshot.size(); ++i) {
                boolean isLast;
                SequenceMetadata sequenceMetadata2 = (SequenceMetadata)sequencesSnapshot.get(i);
                if (this.publishingSequences.contains(sequenceMetadata2.getSequenceName())) continue;
                boolean bl = isLast = i == sequencesSnapshot.size() - 1;
                if (isLast) {
                    sequenceMetadata2.setEndOffsets(this.currOffsets);
                }
                sequenceMetadata2.updateAssignments(this.currOffsets, this::isMoreToReadAfterReadingRecord);
                this.publishingSequences.add(sequenceMetadata2.getSequenceName());
                this.publishAndRegisterHandoff(sequenceMetadata2);
            }
            if (this.backgroundThreadException != null) {
                throw new RuntimeException(this.backgroundThreadException);
            }
            Futures.allAsList(this.publishWaitList).get();
            List handedOffList = Collections.emptyList();
            if (this.tuningConfig.getHandoffConditionTimeout() == 0L) {
                handedOffList = (List)Futures.allAsList(this.handOffWaitList).get();
            } else {
                long start = System.nanoTime();
                try {
                    handedOffList = (List)Futures.allAsList(this.handOffWaitList).get(this.tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException e2) {
                    log.makeAlert("Timeout waiting for handoff", new Object[0]).addData("taskId", (Object)this.task.getId()).addData("handoffConditionTimeout", (Object)this.tuningConfig.getHandoffConditionTimeout()).emit();
                }
                finally {
                    handoffWaitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                }
            }
            for (SegmentsAndCommitMetadata handedOff : handedOffList) {
                log.info("Handoff complete for segments: %s", new Object[]{String.join((CharSequence)", ", Lists.transform((List)handedOff.getSegments(), DataSegment::toString))});
            }
            this.appenderator.close();
        }
        catch (InterruptedException | RejectedExecutionException e) {
            caughtExceptionOuter = e;
            try {
                Futures.allAsList(this.publishWaitList).cancel(true);
                Futures.allAsList(this.handOffWaitList).cancel(true);
                if (this.appenderator != null) {
                    this.appenderator.closeNow();
                }
            }
            catch (Exception e2) {
                e.addSuppressed(e2);
            }
            if (e instanceof RejectedExecutionException && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
                throw e;
            }
            if (!this.stopRequested.get()) {
                Thread.currentThread().interrupt();
                throw e;
            }
        }
        catch (Exception e) {
            caughtExceptionOuter = e;
            try {
                Futures.allAsList(this.publishWaitList).cancel(true);
                Futures.allAsList(this.handOffWaitList).cancel(true);
                if (this.appenderator != null) {
                    this.appenderator.closeNow();
                }
            }
            catch (Exception e2) {
                e.addSuppressed(e2);
            }
            throw e;
        }
        finally {
            try {
                if (this.driver != null) {
                    this.driver.close();
                }
                toolbox.getChatHandlerProvider().unregister(this.task.getId());
                toolbox.removeMonitor((Monitor)metricsMonitor);
                if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
                    toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                    toolbox.getDataSegmentServerAnnouncer().unannounce();
                }
            }
            catch (Throwable e) {
                if (caughtExceptionOuter != null) {
                    caughtExceptionOuter.addSuppressed(e);
                }
                throw e;
            }
        }
        toolbox.getTaskReportFileWriter().write(this.task.getId(), this.getTaskCompletionReports(null, handoffWaitMs));
        return TaskStatus.success((String)this.task.getId());
    }

    /**
     * Surfaces failures from publish and handoff futures that have already completed.
     *
     * <p>Completed publish futures are inspected first: each one is resolved with {@code get()} so that a
     * failed future rethrows here, and the completed futures are then dropped from the wait list. The same
     * is then done for handoff futures. Futures that are still running are left untouched.
     *
     * @throws ExecutionException if any completed future finished exceptionally
     * @throws InterruptedException if the calling thread is interrupted while resolving a future
     */
    private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException {
        final List<ListenableFuture<SegmentsAndCommitMetadata>> completedPublishes = new ArrayList<>();
        for (ListenableFuture<SegmentsAndCommitMetadata> pending : publishWaitList) {
            if (pending.isDone()) {
                completedPublishes.add(pending);
            }
        }
        for (ListenableFuture<SegmentsAndCommitMetadata> completed : completedPublishes) {
            completed.get();
        }
        publishWaitList.removeAll(completedPublishes);

        final List<ListenableFuture<SegmentsAndCommitMetadata>> completedHandoffs = new ArrayList<>();
        for (ListenableFuture<SegmentsAndCommitMetadata> pending : handOffWaitList) {
            if (pending.isDone()) {
                completedHandoffs.add(pending);
            }
        }
        for (ListenableFuture<SegmentsAndCommitMetadata> completed : completedHandoffs) {
            completed.get();
        }
        handOffWaitList.removeAll(completedHandoffs);
    }

    /**
     * Starts an asynchronous publish of the given sequence and chains handoff registration onto it.
     *
     * <p>Two futures are tracked per call: the publish future is appended to {@code publishWaitList} and a
     * fresh {@link SettableFuture} is appended to {@code handOffWaitList}. The handoff future is completed
     * from the callbacks registered below.
     *
     * @param sequenceMetadata the sequence whose segments should be published
     */
    private void publishAndRegisterHandoff(final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata) {
        log.debug("Publishing segments for sequence [%s].", new Object[]{sequenceMetadata});
        // A null publish result is converted into a failed future by throwing from the transform function.
        ListenableFuture publishFuture = Futures.transform((ListenableFuture)this.driver.publish(sequenceMetadata.createPublisher(this, this.toolbox, this.ioConfig.isUseTransaction()), (Committer)sequenceMetadata.getCommitterSupplier(this, this.stream, this.lastPersistedOffsets).get(), Collections.singletonList(sequenceMetadata.getSequenceName())), publishedSegmentsAndMetadata -> {
            if (publishedSegmentsAndMetadata == null) {
                throw new ISE("Transaction failure publishing segments for sequence [%s]", new Object[]{sequenceMetadata});
            }
            return publishedSegmentsAndMetadata;
        });
        this.publishWaitList.add((ListenableFuture<SegmentsAndCommitMetadata>)publishFuture);
        final SettableFuture handoffFuture = SettableFuture.create();
        this.handOffWaitList.add((ListenableFuture<SegmentsAndCommitMetadata>)handoffFuture);
        Futures.addCallback((ListenableFuture)publishFuture, (FutureCallback)new FutureCallback<SegmentsAndCommitMetadata>(){

            public void onSuccess(final SegmentsAndCommitMetadata publishedSegmentsAndCommitMetadata) {
                log.info("Published %s segments for sequence [%s] with metadata [%s].", new Object[]{publishedSegmentsAndCommitMetadata.getSegments().size(), sequenceMetadata.getSequenceName(), Preconditions.checkNotNull((Object)publishedSegmentsAndCommitMetadata.getCommitMetadata(), (Object)"commitMetadata")});
                log.infoSegments((Collection)publishedSegmentsAndCommitMetadata.getSegments(), "Published segments");
                // The sequence is published: drop it from the tracked sequences and persist the new state.
                SeekableStreamIndexTaskRunner.this.sequences.remove(sequenceMetadata);
                SeekableStreamIndexTaskRunner.this.publishingSequences.remove(sequenceMetadata.getSequenceName());
                try {
                    SeekableStreamIndexTaskRunner.this.persistSequences();
                }
                catch (IOException e) {
                    log.error((Throwable)e, "Unable to persist state, dying", new Object[0]);
                    // Fail the handoff future so that anything waiting on handOffWaitList observes the error.
                    handoffFuture.setException((Throwable)e);
                    throw new RuntimeException(e);
                }
                // NOTE(review): the future returned by this transform is discarded, and the function below only
                // runs when registerHandoff succeeds. If the registerHandoff future can fail, handoffFuture
                // would never be completed — confirm whether that can happen.
                Futures.transform((ListenableFuture)SeekableStreamIndexTaskRunner.this.driver.registerHandoff(publishedSegmentsAndCommitMetadata), (Function)new Function<SegmentsAndCommitMetadata, Void>(){

                    @Nullable
                    public Void apply(@Nullable SegmentsAndCommitMetadata handoffSegmentsAndCommitMetadata) {
                        if (handoffSegmentsAndCommitMetadata == null) {
                            log.warn("Failed to hand off %s segments", new Object[]{publishedSegmentsAndCommitMetadata.getSegments().size()});
                            log.warnSegments((Collection)publishedSegmentsAndCommitMetadata.getSegments(), "Failed to hand off segments");
                        }
                        // The handoff future is completed even when the result is null.
                        handoffFuture.set((Object)handoffSegmentsAndCommitMetadata);
                        return null;
                    }
                });
                int segmentCount = 0;
                // These null checks are defensive only: the same values were already dereferenced at the top of onSuccess.
                if (publishedSegmentsAndCommitMetadata != null && publishedSegmentsAndCommitMetadata.getSegments() != null) {
                    segmentCount = publishedSegmentsAndCommitMetadata.getSegments().size();
                }
                SeekableStreamIndexTaskRunner.this.task.emitMetric(SeekableStreamIndexTaskRunner.this.toolbox.getEmitter(), "ingest/segment/count", segmentCount);
            }

            public void onFailure(Throwable t) {
                log.error(t, "Error while publishing segments for sequenceNumber[%s]", new Object[]{sequenceMetadata});
                // Propagate the publish failure to the handoff future as well.
                handoffFuture.setException(t);
            }
        });
    }

    /**
     * Resolves the location of the persisted sequence metadata file.
     *
     * @param toolbox toolbox whose persist directory holds the file
     * @return the {@code sequences.json} file inside the toolbox's persist directory
     */
    private static File getSequencesPersistFile(TaskToolbox toolbox) {
        final File persistDir = toolbox.getPersistDir();
        return new File(persistDir, "sequences.json");
    }

    /**
     * Reloads the sequence list from the persisted sequences file, if that file exists.
     *
     * @return {@code true} if the file existed and {@code sequences} was replaced with its contents,
     *         {@code false} if there was nothing to restore
     * @throws IOException if the file cannot be read or deserialized
     */
    private boolean restoreSequences() throws IOException {
        final File persisted = getSequencesPersistFile(toolbox);
        if (!persisted.exists()) {
            return false;
        }
        sequences = new CopyOnWriteArrayList((Collection)toolbox.getJsonMapper().readValue(persisted, getSequenceMetadataTypeReference()));
        return true;
    }

    /**
     * Serializes the current sequence list to the sequences persist file and logs what was written.
     *
     * @throws IOException if the file cannot be written
     */
    private synchronized void persistSequences() throws IOException {
        toolbox.getJsonMapper()
               .writerFor(getSequenceMetadataTypeReference())
               .writeValue(getSequencesPersistFile(toolbox), sequences);
        log.info("Saved sequence metadata to disk: %s", sequences);
    }

    /**
     * Builds the task completion report map from the current ingestion state, saved unparseable events
     * and row statistics.
     *
     * @param errorMsg error message to record, or {@code null}; the report's boolean flag is
     *                 {@code errorMsg == null}
     * @param handoffWaitMs handoff wait time in milliseconds to record in the report
     * @return the reports produced by {@code TaskReport.buildTaskReports}
     */
    private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorMsg, long handoffWaitMs) {
        final String taskId = task.getId();
        final IngestionStatsAndErrorsTaskReportData reportData = new IngestionStatsAndErrorsTaskReportData(
            ingestionState,
            getTaskCompletionUnparseableEvents(),
            getTaskCompletionRowStats(),
            errorMsg,
            errorMsg == null,
            handoffWaitMs
        );
        return TaskReport.buildTaskReports(new IngestionStatsAndErrorsTaskReport(taskId, reportData));
    }

    /**
     * Collects the saved parse-exception reports for inclusion in task reports.
     *
     * @return a mutable map that contains a {@code "buildSegments"} entry when a report list is available,
     *         and is empty otherwise
     */
    private Map<String, Object> getTaskCompletionUnparseableEvents() {
        final Map<String, Object> unparseableEvents = new HashMap<>();
        final List<ParseExceptionReport> savedReports = IndexTaskUtils.getReportListFromSavedParseExceptions(
            (CircularBuffer<ParseExceptionReport>)parseExceptionHandler.getSavedParseExceptionReports()
        );
        if (savedReports != null) {
            unparseableEvents.put("buildSegments", savedReports);
        }
        return unparseableEvents;
    }

    /**
     * Wraps the row ingestion totals for inclusion in task reports.
     *
     * @return a mutable map with a single {@code "buildSegments"} entry holding the current totals
     */
    private Map<String, Object> getTaskCompletionRowStats() {
        final Map<String, Object> rowStats = new HashMap<>();
        rowStats.put("buildSegments", rowIngestionMeters.getTotals());
        return rowStats;
    }

    /**
     * Refreshes the assignments of every tracked sequence and kicks off publishing for each sequence that
     * is no longer open and is not already being published.
     *
     * <p>For each such sequence, its name is recorded in {@code publishingSequences}, the driver persists
     * using a committer obtained from {@code committerSupplier}, and the sequence is handed to
     * {@link #publishAndRegisterHandoff}.
     *
     * @param committerSupplier source of the committer passed to the driver's persist call
     * @throws InterruptedException if the persist call is interrupted; logged and rethrown
     */
    private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier) throws InterruptedException {
        for (SequenceMetadata<PartitionIdType, SequenceOffsetType> candidate : sequences) {
            candidate.updateAssignments(currOffsets, this::isMoreToReadBeforeReadingRecord);
            if (candidate.isOpen()) {
                continue;
            }
            if (publishingSequences.contains(candidate.getSequenceName())) {
                continue;
            }
            publishingSequences.add(candidate.getSequenceName());
            try {
                final Object persistResult = driver.persist(committerSupplier.get());
                log.debug(
                    "Persist completed with metadata [%s], adding sequence [%s] to publish queue.",
                    persistResult,
                    candidate.getSequenceName()
                );
                publishAndRegisterHandoff(candidate);
            }
            catch (InterruptedException e) {
                log.warn("Interrupted while persisting metadata for sequence [%s].", candidate.getSequenceName());
                throw e;
            }
        }
    }

    /**
     * Computes which partitions still have records to read and assigns them to the record supplier.
     *
     * <p>A partition is included when its end offset has not already been read and there is more to read
     * between its current offset and its end offset. Every partition is logged as either added or finished.
     *
     * @param recordSupplier supplier that receives the computed assignment
     * @return the set of partitions that was assigned (possibly empty)
     */
    private Set<StreamPartition<PartitionIdType>> assignPartitions(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier) {
        final Set<StreamPartition<PartitionIdType>> assigned = new HashSet<>();
        for (Map.Entry<PartitionIdType, SequenceOffsetType> offsetEntry : currOffsets.entrySet()) {
            final PartitionIdType partitionId = offsetEntry.getKey();
            final SequenceOffsetType current = offsetEntry.getValue();
            final SequenceOffsetType end = endOffsets.get(partitionId);

            final boolean finished = isRecordAlreadyRead(partitionId, end) || !isMoreToReadBeforeReadingRecord(current, end);
            if (finished) {
                log.info("Finished reading partition[%s], up to[%s].", partitionId, current);
                continue;
            }
            log.info("Adding partition[%s], start[%s] -> end[%s] to assignment.", partitionId, current, end);
            assigned.add(StreamPartition.of(stream, partitionId));
        }
        recordSupplier.assign(assigned);
        return assigned;
    }

    /**
     * Appends a new sequence after checking that it lines up with the most recent existing sequence.
     *
     * <p>When sequences already exist, every start offset of the new sequence must equal the end offset the
     * last sequence holds for the same partition. In addition, when end offsets are not exclusive, the
     * partitions of the last sequence's end offsets must equal the new sequence's exclusive start partitions.
     *
     * @param sequenceMetadata the sequence to append
     * @throws ISE if either consistency check fails
     */
    private void addSequence(SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata) {
        for (Map.Entry<PartitionIdType, SequenceOffsetType> startEntry : sequenceMetadata.getStartOffsets().entrySet()) {
            final PartitionIdType partition = startEntry.getKey();
            final SequenceOffsetType startOffset = startEntry.getValue();
            if (!sequences.isEmpty()) {
                final Object priorOffset = getLastSequenceMetadata().endOffsets.get(partition);
                if (!startOffset.equals(priorOffset)) {
                    throw new ISE("New sequence startOffset[%s] does not equal expected prior offset[%s]", startOffset, priorOffset);
                }
            }
        }

        if (!isEndOffsetExclusive() && !sequences.isEmpty()) {
            final SequenceMetadata<PartitionIdType, SequenceOffsetType> lastMetadata = getLastSequenceMetadata();
            final boolean partitionsMatch = lastMetadata.endOffsets.keySet().equals(sequenceMetadata.getExclusiveStartPartitions());
            if (!partitionsMatch) {
                throw new ISE(
                    "Exclusive start partitions[%s] for new sequence don't match to the prior offset[%s]",
                    sequenceMetadata.getExclusiveStartPartitions(),
                    lastMetadata
                );
            }
        }

        sequences.add(sequenceMetadata);
    }

    /**
     * Returns the most recently added sequence.
     *
     * @return the last element of {@code sequences}
     * @throws IllegalStateException if there are no sequences
     */
    private SequenceMetadata<PartitionIdType, SequenceOffsetType> getLastSequenceMetadata() {
        Preconditions.checkState(!sequences.isEmpty(), "Empty sequences");
        final int lastIndex = sequences.size() - 1;
        return sequences.get(lastIndex);
    }

    /**
     * Tells whether a record at the given sequence number has already been read for its partition.
     *
     * @param recordPartition partition of the record
     * @param recordSequenceNumber sequence number of the record
     * @return {@code true} if a last-read offset is known for the partition and the record's sequence
     *         number is less than or equal to it; {@code false} otherwise
     */
    private boolean isRecordAlreadyRead(PartitionIdType recordPartition, SequenceOffsetType recordSequenceNumber) {
        final SequenceOffsetType lastRead = lastReadOffsets.get(recordPartition);
        return lastRead != null
               && createSequenceNumber(recordSequenceNumber).compareTo(createSequenceNumber(lastRead)) <= 0;
    }

    /**
     * Delegates to the record's sequence-number object to decide whether there is more to read before
     * reading the record, passing along whether end offsets are exclusive.
     *
     * @param recordSequenceNumber sequence number of the record about to be read
     * @param endSequenceNumber end sequence number for the partition
     * @return the result of the sequence number's own {@code isMoreToReadBeforeReadingRecord} check
     */
    private boolean isMoreToReadBeforeReadingRecord(SequenceOffsetType recordSequenceNumber, SequenceOffsetType endSequenceNumber) {
        final OrderedSequenceNumber<SequenceOffsetType> record = createSequenceNumber(recordSequenceNumber);
        final OrderedSequenceNumber<SequenceOffsetType> end = createSequenceNumber(endSequenceNumber);
        return record.isMoreToReadBeforeReadingRecord(end, isEndOffsetExclusive());
    }

    /**
     * Decides whether there is more to read once the given record has been read.
     *
     * @param recordSequenceNumber sequence number of the record that was read
     * @param endSequenceNumber end sequence number for the partition
     * @return {@code true} if the next start offset after the record is strictly before the end
     */
    private boolean isMoreToReadAfterReadingRecord(SequenceOffsetType recordSequenceNumber, SequenceOffsetType endSequenceNumber) {
        return createSequenceNumber(getNextStartOffset(recordSequenceNumber))
                   .compareTo(createSequenceNumber(endSequenceNumber)) < 0;
    }

    /**
     * Seeks each given partition on the record supplier to the current offset tracked for it.
     *
     * @param recordSupplier supplier to seek
     * @param partitions partitions to position
     * @throws InterruptedException if the supplier's seek is interrupted
     */
    private void seekToStartingSequence(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier, Set<StreamPartition<PartitionIdType>> partitions) throws InterruptedException {
        for (StreamPartition<PartitionIdType> target : partitions) {
            final SequenceOffsetType startingOffset = currOffsets.get(target.getPartitionId());
            log.info("Seeking partition[%s] to[%s].", target.getPartitionId(), startingOffset);
            recordSupplier.seek(target, startingOffset);
        }
    }

    /**
     * Blocks the caller while a pause is requested.
     *
     * <p>Under {@code pauseLock}: if a pause has been requested, the status is set to {@code PAUSED},
     * waiters on {@code hasPaused} are signalled, and the caller waits on {@code shouldResume} until the
     * request is cleared. The status is then set back to {@code READING} and {@code shouldResume} is
     * signalled again.
     *
     * @return {@code true} if the caller actually paused and has now resumed, {@code false} if no pause
     *         was requested
     * @throws InterruptedException if interrupted while acquiring the lock or waiting to resume
     */
    private boolean possiblyPause() throws InterruptedException {
        pauseLock.lockInterruptibly();
        try {
            if (!pauseRequested) {
                return false;
            }

            status = Status.PAUSED;
            hasPaused.signalAll();
            log.debug("Received pause command, pausing ingestion until resumed.");

            while (pauseRequested) {
                shouldResume.await();
            }

            status = Status.READING;
            shouldResume.signalAll();
            log.debug("Received resume command, resuming ingestion.");
            return true;
        }
        finally {
            pauseLock.unlock();
        }
    }

    /**
     * @return {@code true} if the current status is {@code PAUSED}
     */
    private boolean isPaused() {
        return Status.PAUSED == status;
    }

    /**
     * Sets the pause-requested flag. This method does no locking or signalling of its own.
     */
    private void requestPause() {
        pauseRequested = true;
    }

    /**
     * Submits a reset-metadata action for the given out-of-range partitions and emits an alert with the outcome.
     *
     * <p>If the action reports success, an alert is emitted and a pause is requested. Otherwise only a
     * failure alert is emitted.
     *
     * @param outOfRangePartitions partitions (with offsets) to include in the reset request
     * @param taskToolbox toolbox providing the task action client
     * @throws IOException if submitting the action fails
     */
    protected void sendResetRequestAndWait(Map<StreamPartition<PartitionIdType>, SequenceOffsetType> outOfRangePartitions, TaskToolbox taskToolbox) throws IOException {
        final Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap = CollectionUtils.mapKeys(outOfRangePartitions, StreamPartition::getPartitionId);
        final boolean result = taskToolbox.getTaskActionClient().submit(
            new ResetDataSourceMetadataAction(
                task.getDataSource(),
                createDataSourceMetadata(
                    new SeekableStreamEndSequenceNumbers<>(ioConfig.getStartSequenceNumbers().getStream(), partitionOffsetMap)
                )
            )
        );

        if (!result) {
            log.makeAlert("Failed to send offset reset request")
               .addData("task", task.getId())
               .addData("dataSource", task.getDataSource())
               .addData("partitions", ImmutableSet.copyOf(partitionOffsetMap.keySet()))
               .emit();
            return;
        }

        log.makeAlert("Offsets were reset automatically, potential data duplication or loss")
           .addData("task", task.getId())
           .addData("dataSource", task.getDataSource())
           .addData("partitions", partitionOffsetMap.keySet())
           .emit();
        requestPause();
    }

    /**
     * Runs the datasource authorization check for this task's datasource.
     *
     * @param req the incoming HTTP request
     * @param action the action being attempted
     * @return the access result from {@code IndexTaskUtils.datasourceAuthorizationCheck}
     */
    private Access authorizationCheck(HttpServletRequest req, Action action) {
        final String dataSource = task.getDataSource();
        return IndexTaskUtils.datasourceAuthorizationCheck(req, action, dataSource, authorizerMapper);
    }

    /**
     * @return the appenderator currently held by this runner
     */
    public Appenderator getAppenderator() {
        return appenderator;
    }

    /**
     * @return the row ingestion meters used by this runner
     */
    @VisibleForTesting
    public RowIngestionMeters getRowIngestionMeters() {
        return rowIngestionMeters;
    }

    /**
     * @return the fire department metrics held by this runner
     */
    @VisibleForTesting
    public FireDepartmentMetrics getFireDepartmentMetrics() {
        return fireDepartmentMetrics;
    }

    /**
     * Flags the runner as stop-requested and interrupts the run thread.
     */
    public void stopForcefully() {
        log.info("Stopping forcefully (status: [%s])", status);
        stopRequested.set(true);
        runThread.interrupt();
    }

    /**
     * Requests a stop, interrupting the run thread only when that is needed.
     *
     * <p>If the task is already {@code PUBLISHING}, the run thread is interrupted right away. Otherwise
     * any pending pause is cleared under {@code pauseLock} and waiters on {@code isAwaitingRetry} are
     * signalled under {@code pollRetryLock}. If either lock cannot be acquired within 15 seconds, the run
     * thread is interrupted instead.
     *
     * @throws RuntimeException wrapping any exception raised while acquiring the locks
     */
    public void stopGracefully() {
        log.info("Stopping gracefully (status: [%s])", status);
        stopRequested.set(true);

        synchronized (statusLock) {
            if (status == Status.PUBLISHING) {
                runThread.interrupt();
                return;
            }
        }

        try {
            if (pauseLock.tryLock(15L, TimeUnit.SECONDS)) {
                try {
                    if (pauseRequested) {
                        pauseRequested = false;
                        shouldResume.signalAll();
                    }
                }
                finally {
                    pauseLock.unlock();
                }
            } else {
                log.warn("While stopping: failed to acquire pauseLock before timeout, interrupting run thread");
                runThread.interrupt();
                return;
            }

            if (pollRetryLock.tryLock(15L, TimeUnit.SECONDS)) {
                try {
                    isAwaitingRetry.signalAll();
                }
                finally {
                    pollRetryLock.unlock();
                }
            } else {
                log.warn("While stopping: failed to acquire pollRetryLock before timeout, interrupting run thread");
                runThread.interrupt();
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * HTTP endpoint that stops the task gracefully after a WRITE authorization check.
     *
     * @param req the incoming HTTP request
     * @return a 200 OK response
     */
    @POST
    @Path(value="/stop")
    public Response stop(@Context HttpServletRequest req) {
        authorizationCheck(req, Action.WRITE);
        stopGracefully();
        return Response.status(Response.Status.OK).build();
    }

    /**
     * HTTP endpoint that returns the runner's current status after a READ authorization check.
     *
     * @param req the incoming HTTP request
     * @return the current status
     */
    @GET
    @Path(value="/status")
    @Produces(value={"application/json"})
    public Status getStatusHTTP(@Context HttpServletRequest req) {
        authorizationCheck(req, Action.READ);
        return status;
    }

    /**
     * @return the runner's current status
     */
    @VisibleForTesting
    public Status getStatus() {
        return status;
    }

    /**
     * HTTP endpoint that returns the current offsets after a READ authorization check.
     *
     * @param req the incoming HTTP request
     * @return the same map as {@link #getCurrentOffsets()}
     */
    @GET
    @Path(value="/offsets/current")
    @Produces(value={"application/json"})
    public Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets(@Context HttpServletRequest req) {
        authorizationCheck(req, Action.READ);
        return getCurrentOffsets();
    }

    /**
     * @return the live current-offsets map held by this runner (not a copy)
     */
    public ConcurrentMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets() {
        return currOffsets;
    }

    /**
     * HTTP endpoint that returns the end offsets after a READ authorization check.
     *
     * @param req the incoming HTTP request
     * @return the same map as {@link #getEndOffsets()}
     */
    @GET
    @Path(value="/offsets/end")
    @Produces(value={"application/json"})
    public Map<PartitionIdType, SequenceOffsetType> getEndOffsetsHTTP(@Context HttpServletRequest req) {
        authorizationCheck(req, Action.READ);
        return getEndOffsets();
    }

    /**
     * @return the live end-offsets map held by this runner (not a copy)
     */
    public Map<PartitionIdType, SequenceOffsetType> getEndOffsets() {
        return endOffsets;
    }

    /**
     * HTTP endpoint that sets end offsets after a WRITE authorization check.
     *
     * @param sequences request body: partition to end offset
     * @param finish query parameter {@code finish}, defaulting to {@code true}
     * @param req the incoming HTTP request
     * @return the response produced by {@link #setEndOffsets}
     * @throws InterruptedException if {@link #setEndOffsets} is interrupted
     */
    @POST
    @Path(value="/offsets/end")
    @Consumes(value={"application/json"})
    @Produces(value={"application/json"})
    public Response setEndOffsetsHTTP(Map<PartitionIdType, SequenceOffsetType> sequences, @QueryParam(value="finish") @DefaultValue(value="true") boolean finish, @Context HttpServletRequest req) throws InterruptedException {
        authorizationCheck(req, Action.WRITE);
        final Response outcome = setEndOffsets(sequences, finish);
        return outcome;
    }

    /**
     * Builds a snapshot of row statistics.
     *
     * @return a mutable map with {@code "movingAverages"} and {@code "totals"} entries, each itself a map
     *         with a single {@code "buildSegments"} entry
     */
    public Map<String, Object> doGetRowStats() {
        final Map<String, Object> totals = new HashMap<>();
        totals.put("buildSegments", rowIngestionMeters.getTotals());

        final Map<String, Object> movingAverages = new HashMap<>();
        movingAverages.put("buildSegments", rowIngestionMeters.getMovingAverages());

        final Map<String, Object> rowStats = new HashMap<>();
        rowStats.put("movingAverages", movingAverages);
        rowStats.put("totals", totals);
        return rowStats;
    }

    /**
     * Builds the live report structure for this task.
     *
     * @return a mutable map with a single {@code "ingestionStatsAndErrors"} entry that carries the task id,
     *         the report type and a payload (ingestion state, unparseable events, row stats)
     */
    public Map<String, Object> doGetLiveReports() {
        final Map<String, Object> unparseableEvents = getTaskCompletionUnparseableEvents();

        final Map<String, Object> payload = new HashMap<>();
        payload.put("ingestionState", ingestionState);
        payload.put("unparseableEvents", unparseableEvents);
        payload.put("rowStats", doGetRowStats());

        final Map<String, Object> statsAndErrors = new HashMap<>();
        statsAndErrors.put("taskId", task.getId());
        statsAndErrors.put("payload", payload);
        statsAndErrors.put("type", "ingestionStatsAndErrors");

        final Map<String, Object> liveReports = new HashMap<>();
        liveReports.put("ingestionStatsAndErrors", statsAndErrors);
        return liveReports;
    }

    /**
     * HTTP endpoint that returns row statistics after a READ authorization check.
     *
     * @param req the incoming HTTP request
     * @return a 200 OK response whose entity is the result of {@link #doGetRowStats()}
     */
    @GET
    @Path(value="/rowStats")
    @Produces(value={"application/json"})
    public Response getRowStats(@Context HttpServletRequest req) {
        authorizationCheck(req, Action.READ);
        final Map<String, Object> rowStats = doGetRowStats();
        return Response.ok(rowStats).build();
    }

    /**
     * HTTP endpoint that returns the live reports after a READ authorization check.
     *
     * @param req the incoming HTTP request
     * @return a 200 OK response whose entity is the result of {@link #doGetLiveReports()}
     */
    @GET
    @Path(value="/liveReports")
    @Produces(value={"application/json"})
    public Response getLiveReport(@Context HttpServletRequest req) {
        authorizationCheck(req, Action.READ);
        final Map<String, Object> liveReports = doGetLiveReports();
        return Response.ok(liveReports).build();
    }

    /**
     * HTTP endpoint that returns the saved parse-exception reports after a READ authorization check.
     *
     * @param req the incoming HTTP request
     * @return a 200 OK response whose entity is the list of saved parse-exception reports
     */
    @GET
    @Path(value="/unparseableEvents")
    @Produces(value={"application/json"})
    public Response getUnparseableEvents(@Context HttpServletRequest req) {
        authorizationCheck(req, Action.READ);
        final List<ParseExceptionReport> savedReports = IndexTaskUtils.getReportListFromSavedParseExceptions(
            (CircularBuffer<ParseExceptionReport>)parseExceptionHandler.getSavedParseExceptionReports()
        );
        return Response.ok(savedReports).build();
    }

    /**
     * Sets the end offsets of the latest sequence and either finishes it or opens a follow-up sequence.
     *
     * <p>Validation that happens before taking {@code pauseLock}: the request map must be non-null and
     * must only mention partitions present in {@code endOffsets}. Under the lock, the request is:
     * <ul>
     *   <li>acknowledged with 200 (after resuming) if it duplicates what the latest sequence already holds;</li>
     *   <li>rejected with 400 if the latest sequence is already checkpointed, if the task is not paused,
     *       or if any requested end offset is before the current offset of its partition;</li>
     *   <li>otherwise applied: the latest sequence's end offsets are set, and either {@code endOffsets} is
     *       updated ({@code finish == true}) or a new sequence starting at the given offsets is appended
     *       ({@code finish == false}); the sequences are then persisted.</li>
     * </ul>
     * Any exception raised while holding the lock is logged, stored in {@code backgroundThreadException},
     * and reported as a 500 response after attempting to resume.
     *
     * @param sequenceNumbers partition to end offset
     * @param finish whether the latest sequence is the final one
     * @return the HTTP response describing the outcome
     * @throws InterruptedException if interrupted while acquiring {@code pauseLock} or while resuming
     */
    @VisibleForTesting
    public Response setEndOffsets(Map<PartitionIdType, SequenceOffsetType> sequenceNumbers, boolean finish) throws InterruptedException {
        if (sequenceNumbers == null) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)"Request body must contain a map of { partition:endOffset }").build();
        }
        if (!this.endOffsets.keySet().containsAll(sequenceNumbers.keySet())) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)StringUtils.format((String)"Request contains partitions not being handled by this task, my partitions: %s", (Object[])new Object[]{this.endOffsets.keySet()})).build();
        }
        // Acquire the lock BEFORE entering the try block. If acquisition is interrupted, the finally block
        // must not run: unlocking a lock this thread does not hold throws IllegalMonitorStateException and
        // would mask the InterruptedException.
        this.pauseLock.lockInterruptibly();
        try {
            // All sequence-related checks happen under the lock so that duplicate requests are detected reliably.
            Preconditions.checkState((sequenceNumbers.size() > 0 ? 1 : 0) != 0, (Object)"No sequences found to set end sequences");
            SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = this.getLastSequenceMetadata();
            Set<PartitionIdType> exclusiveStartPartitions = this.isEndOffsetExclusive() ? Collections.emptySet() : sequenceNumbers.keySet();
            if (latestSequence.getStartOffsets().equals(sequenceNumbers) && latestSequence.getExclusiveStartPartitions().equals(exclusiveStartPartitions) && !finish || latestSequence.getEndOffsets().equals(sequenceNumbers) && finish) {
                log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", new Object[]{sequenceNumbers});
                this.resume();
                return Response.ok(sequenceNumbers).build();
            }
            if (latestSequence.isCheckpointed()) {
                return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("text/plain").entity((Object)StringUtils.format((String)"Sequence [%s] has already endOffsets set, cannot set to [%s]", (Object[])new Object[]{latestSequence, sequenceNumbers})).build();
            }
            if (!this.isPaused()) {
                return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)"Task must be paused before changing the end offsets").build();
            }
            for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : sequenceNumbers.entrySet()) {
                if (this.createSequenceNumber(entry.getValue()).compareTo(this.createSequenceNumber(this.currOffsets.get(entry.getKey()))) >= 0) continue;
                return Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)StringUtils.format((String)"End sequence must be >= current sequence for partition [%s] (current: %s)", (Object[])new Object[]{entry.getKey(), this.currOffsets.get(entry.getKey())})).build();
            }
            this.resetNextCheckpointTime();
            latestSequence.setEndOffsets(sequenceNumbers);
            if (finish) {
                log.info("Sequence[%s] end offsets updated from [%s] to [%s].", new Object[]{latestSequence.getSequenceName(), this.endOffsets, sequenceNumbers});
                this.endOffsets.putAll(sequenceNumbers);
            } else {
                // Open a follow-up sequence that starts where the latest one now ends.
                SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence = new SequenceMetadata<PartitionIdType, SequenceOffsetType>(latestSequence.getSequenceId() + 1, StringUtils.format((String)"%s_%d", (Object[])new Object[]{this.ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1}), sequenceNumbers, this.endOffsets, false, exclusiveStartPartitions);
                log.info("Sequence[%s] created with start offsets [%s] and end offsets [%s].", new Object[]{newSequence.getSequenceName(), sequenceNumbers, this.endOffsets});
                this.addSequence(newSequence);
            }
            this.persistSequences();
        }
        catch (Exception e) {
            log.error((Throwable)e, "Failed to set end offsets.", new Object[0]);
            // Record the failure so the run loop can observe it, then let ingestion continue.
            this.backgroundThreadException = e;
            this.resume();
            return Response.status((Response.Status)Response.Status.INTERNAL_SERVER_ERROR).entity((Object)Throwables.getStackTraceAsString((Throwable)e)).build();
        }
        finally {
            this.pauseLock.unlock();
        }
        this.resume();
        return Response.ok(sequenceNumbers).build();
    }

    /**
     * Moves the next checkpoint time to "now (UTC) plus the configured intermediate handoff period",
     * stored as epoch milliseconds.
     */
    private void resetNextCheckpointTime() {
        final ReadablePeriod handoffPeriod = (ReadablePeriod)tuningConfig.getIntermediateHandoffPeriod();
        nextCheckpointTime = DateTimes.nowUtc().plus(handoffPeriod).getMillis();
    }

    /**
     * @return the live list of tracked sequences (not a copy)
     */
    @VisibleForTesting
    public CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> getSequences() {
        return sequences;
    }

    /**
     * HTTP endpoint that returns the checkpoints after a READ authorization check.
     *
     * @param req the incoming HTTP request
     * @return sequence id to start offsets, as produced by {@code getCheckpoints()}
     */
    @GET
    @Path(value="/checkpoints")
    @Produces(value={"application/json"})
    public Map<Integer, Map<PartitionIdType, SequenceOffsetType>> getCheckpointsHTTP(@Context HttpServletRequest req) {
        authorizationCheck(req, Action.READ);
        return getCheckpoints();
    }

    /**
     * Maps each tracked sequence's id to its start offsets.
     *
     * @return a new {@link TreeMap} keyed by sequence id
     */
    private Map<Integer, Map<PartitionIdType, SequenceOffsetType>> getCheckpoints() {
        final Map<Integer, Map<PartitionIdType, SequenceOffsetType>> startOffsetsById = sequences
            .stream()
            .collect(Collectors.toMap(SequenceMetadata::getSequenceId, SequenceMetadata::getStartOffsets));
        return new TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>(startOffsetsById);
    }

    /**
     * HTTP endpoint that pauses the task after a WRITE authorization check.
     *
     * @param req the incoming HTTP request
     * @return the response produced by {@link #pause()}
     * @throws InterruptedException if {@link #pause()} is interrupted
     */
    @POST
    @Path(value="/pause")
    @Produces(value={"application/json"})
    public Response pauseHTTP(@Context HttpServletRequest req) throws InterruptedException {
        authorizationCheck(req, Action.WRITE);
        final Response outcome = pause();
        return outcome;
    }

    /**
     * Requests a pause and waits briefly for the task to reach the paused state.
     *
     * <p>Returns 400 if the status is neither {@code PAUSED} nor {@code READING}. Otherwise, under
     * {@code pauseLock}, the pause flag is set, waiters on {@code isAwaitingRetry} are signalled, and the
     * caller waits up to 2 seconds on {@code hasPaused}. If the task has not paused by then, 202 is
     * returned. Once paused, the response is 200 with the current offsets serialized as JSON.
     *
     * @return the HTTP response describing the outcome
     * @throws InterruptedException if interrupted while acquiring a lock or waiting for the pause
     * @throws RuntimeException if the current offsets cannot be serialized
     */
    @VisibleForTesting
    public Response pause() throws InterruptedException {
        if (!(status == Status.PAUSED || status == Status.READING)) {
            return Response.status(Response.Status.BAD_REQUEST)
                           .type("text/plain")
                           .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status))
                           .build();
        }

        pauseLock.lockInterruptibly();
        try {
            pauseRequested = true;

            pollRetryLock.lockInterruptibly();
            try {
                isAwaitingRetry.signalAll();
            }
            finally {
                pollRetryLock.unlock();
            }

            if (isPaused()) {
                shouldResume.signalAll();
            }

            long remainingNanos = TimeUnit.SECONDS.toNanos(2L);
            while (!isPaused()) {
                if (remainingNanos <= 0L) {
                    return Response.status(Response.Status.ACCEPTED)
                                   .entity("Request accepted but task has not yet paused")
                                   .build();
                }
                remainingNanos = hasPaused.awaitNanos(remainingNanos);
            }
        }
        finally {
            pauseLock.unlock();
        }

        try {
            final String offsetsJson = toolbox.getJsonMapper().writeValueAsString(getCurrentOffsets());
            return Response.ok().entity(offsetsJson).build();
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * HTTP endpoint that resumes the task after a WRITE authorization check.
     *
     * @param req the incoming HTTP request
     * @return a 200 OK response
     * @throws InterruptedException if {@link #resume()} is interrupted
     */
    @POST
    @Path(value="/resume")
    public Response resumeHTTP(@Context HttpServletRequest req) throws InterruptedException {
        authorizationCheck(req, Action.WRITE);
        resume();
        return Response.status(Response.Status.OK).build();
    }

    /**
     * Clears the pause request and waits for the task to leave the paused state.
     *
     * <p>Under {@code pauseLock}, the pause flag is cleared and {@code shouldResume} is signalled. The
     * caller then waits on {@code shouldResume} for up to 5 seconds while the status is still {@code PAUSED}.
     *
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     * @throws RuntimeException if the task is still paused after 5 seconds
     */
    @VisibleForTesting
    public void resume() throws InterruptedException {
        pauseLock.lockInterruptibly();
        try {
            pauseRequested = false;
            shouldResume.signalAll();

            long remainingNanos = TimeUnit.SECONDS.toNanos(5L);
            while (isPaused()) {
                if (remainingNanos <= 0L) {
                    throw new RuntimeException("Resume command was not accepted within 5 seconds");
                }
                remainingNanos = shouldResume.awaitNanos(remainingNanos);
            }
        }
        finally {
            pauseLock.unlock();
        }
    }

    /**
     * HTTP endpoint ({@code GET /time/start}, produces JSON) that returns the {@code startTime} field.
     *
     * <p>NOTE(review): the authorization check uses {@link Action#WRITE} even though this handler
     * only reads a field; confirm that requiring write permission here is intentional.
     *
     * @param req the incoming servlet request, handed to the authorization check
     * @return the current value of {@code startTime}
     */
    @GET
    @Path("/time/start")
    @Produces("application/json")
    public DateTime getStartTime(@Context HttpServletRequest req) {
        authorizationCheck(req, Action.WRITE);
        return startTime;
    }

    /**
     * Decides whether a record read from {@code partition} at {@code recordOffset} should be processed.
     *
     * <p>The record's sequence number must not compare below the partition's entry in
     * {@code currOffsets}. A record for which {@code isRecordAlreadyRead} answers {@code true} is
     * rejected; otherwise the verdict is whatever {@code isMoreToReadBeforeReadingRecord} returns for
     * the record's sequence number and the partition's entry in {@code endOffsets}.
     *
     * @param partition the partition the record was read from
     * @param recordOffset the sequence offset of the record
     * @return {@code false} if the record was already read, otherwise the result of
     *         {@code isMoreToReadBeforeReadingRecord}
     * @throws NullPointerException if {@code currOffsets} has no entry for {@code partition}
     * @throws ISE if the record's sequence number compares below the current one
     */
    private boolean verifyRecordInRange(PartitionIdType partition, SequenceOffsetType recordOffset) {
        // One labelled placeholder per argument, so the record offset and the partition each show
        // up under their own name in the failure message.
        SequenceOffsetType currOffset = Preconditions.checkNotNull(
            this.currOffsets.get(partition),
            "Current offset is null for sequenceNumber[%s] and partition[%s]",
            recordOffset,
            partition
        );
        OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = this.createSequenceNumber(recordOffset);
        OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = this.createSequenceNumber(currOffset);
        if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
            throw new ISE(
                "Record sequenceNumber[%s] is smaller than current sequenceNumber[%s] for partition[%s]",
                recordOffset,
                currOffset,
                partition
            );
        }
        if (this.isRecordAlreadyRead(partition, recordOffset)) {
            return false;
        }
        return this.isMoreToReadBeforeReadingRecord(recordSequenceNumber.get(), this.endOffsets.get(partition));
    }

    /**
     * Subclass hook that takes one sequence offset and answers with a boolean.
     * NOTE(review): by its name, presumably {@code true} when the offset marks the end of a shard;
     * confirm against the implementations.
     */
    protected abstract boolean isEndOfShard(SequenceOffsetType var1);

    /**
     * Subclass hook that yields a sorted map from an integer key to a per-partition offset map.
     * NOTE(review): by its name, presumably extracts checkpoints from the task context, with the
     * {@code String} argument carrying their serialized form; confirm against callers.
     *
     * @return the checkpoint map; may be {@code null}, as the {@code @Nullable} annotation indicates
     * @throws IOException declared by the signature; the triggering conditions are not visible here
     */
    @Nullable
    protected abstract TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> getCheckPointsFromContext(TaskToolbox var1, String var2) throws IOException;

    /**
     * Subclass hook that maps one sequence offset to another.
     * NOTE(review): by its name, presumably the offset from which reading should start next;
     * confirm against the implementations.
     */
    protected abstract SequenceOffsetType getNextStartOffset(SequenceOffsetType var1);

    /**
     * Subclass hook that builds end sequence numbers from an {@link ObjectMapper} and an untyped object.
     * NOTE(review): by its name, the {@code Object} is presumably partition data taken from stored
     * metadata; confirm against callers.
     */
    protected abstract SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> deserializePartitionsFromMetadata(ObjectMapper var1, Object var2);

    /**
     * Subclass hook that obtains a list of records, given a record supplier and the task toolbox.
     * The {@code @NotNull} annotation indicates the returned list itself is never {@code null}.
     *
     * @throws Exception declared by the signature; the concrete failure modes depend on the subclass
     */
    @NotNull
    protected abstract List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> getRecords(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> var1, TaskToolbox var2) throws Exception;

    /**
     * Subclass hook that wraps the given sequence numbers in a data source metadata object.
     */
    protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadata(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> var1);

    /**
     * Subclass hook that wraps a raw sequence offset in an {@link OrderedSequenceNumber}.
     * {@code verifyRecordInRange} compares two such wrappers with {@code compareTo} to order offsets.
     */
    protected abstract OrderedSequenceNumber<SequenceOffsetType> createSequenceNumber(SequenceOffsetType var1);

    /**
     * Subclass hook that receives the toolbox, a record supplier and a set of stream partitions.
     * NOTE(review): by its name, it may reset the stored data source metadata for those partitions
     * under some condition; the condition is not visible here, so confirm against the implementations.
     */
    protected abstract void possiblyResetDataSourceMetadata(TaskToolbox var1, RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> var2, Set<StreamPartition<PartitionIdType>> var3);

    /**
     * Subclass hook that answers with a boolean.
     * NOTE(review): by its name, presumably {@code true} when end offsets are exclusive bounds;
     * confirm against the implementations.
     */
    protected abstract boolean isEndOffsetExclusive();

    /**
     * Subclass hook that supplies the Jackson {@link TypeReference} for a list of
     * {@code SequenceMetadata} with this runner's partition and offset type parameters.
     */
    protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();

    /**
     * Named states of the runner, declared in the order {@code NOT_STARTED}, {@code STARTING},
     * {@code READING}, {@code PAUSED}, {@code PUBLISHING}.
     *
     * <p>NOTE(review): the names suggest lifecycle phases of the ingestion task; where each state
     * is entered is not visible in this part of the file.
     */
    public enum Status {
        NOT_STARTED,
        STARTING,
        READING,
        PAUSED,
        PUBLISHING
    }
}

