package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.NonNull;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.class */
public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy {
    private static final Log log = LogFactory.getLog(AsynchronousGetRecordsRetrievalStrategy.class);
    private static final int TIME_TO_KEEP_ALIVE = 5;
    private static final int CORE_THREAD_POOL_COUNT = 1;
    private final KinesisDataFetcher dataFetcher;
    private final ExecutorService executorService;
    private final int retryGetRecordsInSeconds;
    private final String shardId;
    final Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;

    public AsynchronousGetRecordsRetrievalStrategy(@NonNull KinesisDataFetcher kinesisDataFetcher, int i, int i2, String str) {
        this(kinesisDataFetcher, buildExector(i2, str), i, str);
        if (kinesisDataFetcher == null) {
            throw new NullPointerException("dataFetcher is marked non-null but is null");
        }
    }

    public AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher kinesisDataFetcher, ExecutorService executorService, int i, String str) {
        this(kinesisDataFetcher, executorService, i, () -> {
            return new ExecutorCompletionService(executorService);
        }, str);
    }

    AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher kinesisDataFetcher, ExecutorService executorService, int i, Supplier<CompletionService<DataFetcherResult>> supplier, String str) {
        this.dataFetcher = kinesisDataFetcher;
        this.executorService = executorService;
        this.retryGetRecordsInSeconds = i;
        this.completionServiceSupplier = supplier;
        this.shardId = str;
    }

    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsRetrievalStrategy
    public GetRecordsResult getRecords(int i) {
        Future<DataFetcherResult> poll;
        if (this.executorService.isShutdown()) {
            throw new IllegalStateException("Strategy has been shutdown");
        }
        GetRecordsResult getRecordsResult = null;
        CompletionService<DataFetcherResult> completionService = this.completionServiceSupplier.get();
        HashSet hashSet = new HashSet();
        Callable<DataFetcherResult> createRetrieverCallable = createRetrieverCallable(i);
        while (true) {
            try {
                try {
                    hashSet.add(completionService.submit(createRetrieverCallable));
                } catch (RejectedExecutionException e) {
                    log.warn("Out of resources, unable to start additional requests.");
                }
                try {
                    poll = completionService.poll(this.retryGetRecordsInSeconds, TimeUnit.SECONDS);
                } catch (InterruptedException e2) {
                    log.error("Thread was interrupted", e2);
                } catch (ExecutionException e3) {
                    if (e3.getCause() instanceof ExpiredIteratorException) {
                        throw e3.getCause();
                    }
                    log.error("ExecutionException thrown while trying to get records", e3);
                }
                if (poll != null) {
                    getRecordsResult = poll.get().accept();
                    break;
                }
            } finally {
                hashSet.forEach(future -> {
                    future.cancel(true);
                });
            }
        }
        return getRecordsResult;
    }

    private Callable<DataFetcherResult> createRetrieverCallable(int i) {
        ThreadSafeMetricsDelegatingScope threadSafeMetricsDelegatingScope = new ThreadSafeMetricsDelegatingScope(MetricsHelper.getMetricsScope());
        return () -> {
            try {
                MetricsHelper.setMetricsScope(threadSafeMetricsDelegatingScope);
                DataFetcherResult records = this.dataFetcher.getRecords(i);
                MetricsHelper.unsetMetricsScope();
                return records;
            } catch (Throwable th) {
                MetricsHelper.unsetMetricsScope();
                throw th;
            }
        };
    }

    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsRetrievalStrategy
    public void shutdown() {
        this.executorService.shutdownNow();
    }

    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsRetrievalStrategy
    public boolean isShutdown() {
        return this.executorService.isShutdown();
    }

    private static ExecutorService buildExector(int i, String str) {
        return new ThreadPoolExecutor(1, i, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue(1), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("get-records-worker-" + str + "-%d").build(), new ThreadPoolExecutor.AbortPolicy());
    }

    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsRetrievalStrategy
    public KinesisDataFetcher getDataFetcher() {
        return this.dataFetcher;
    }
}
