/*
 * Decompiled with CFR 0.152.
 */
package com.jxdinfo.hussar.support.mq.redis.consumer;

import com.jxdinfo.hussar.support.mq.consumer.HussarMQMessageListener;
import com.jxdinfo.hussar.support.mq.lifecycle.HussarMQAbstractLifecycleResource;
import com.jxdinfo.hussar.support.mq.redis.constants.HussarRedisMQConstants;
import com.jxdinfo.hussar.support.mq.redis.consumer.HussarRedisMQConsumer;
import com.jxdinfo.hussar.support.mq.redis.message.HussarRedisMQMessage;
import com.jxdinfo.hussar.support.mq.redis.message.HussarRedisMQMessageCodec;
import com.jxdinfo.hussar.support.mq.redis.utils.HussarRedisMQUtils;
import java.io.InterruptedIOException;
import java.lang.reflect.Type;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.exception.UncheckedException;
import org.apache.commons.lang3.exception.UncheckedInterruptedException;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.lang.NonNull;

public class HussarRedisMQPollWorker<T>
extends HussarMQAbstractLifecycleResource
implements Runnable {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(HussarRedisMQPollWorker.class);
    /**
     * Lua script used by {@code doAcknowledge} in single-group mode: it XACKs the given ids against the
     * group literally named {@code -single-} and then XDELs them, returning both counts as a two-element array.
     */
    private static final byte[] SCRIPT_SINGLE_GROUP_ACK_DEL = "return{redis.call('XACK',KEYS[1],'-single-',unpack(ARGV)),redis.call('XDEL',KEYS[1],unpack(ARGV))}".getBytes(StandardCharsets.UTF_8);

    // Defaults applied when a setting is left unset (or set to an invalid value, see the getters).
    private static final int DEFAULT_WAIT_START_TIMEOUT_MILLIS = 5000;
    private static final int DEFAULT_WAIT_STOP_TIMEOUT_MILLIS = 5000;
    private static final Duration DEFAULT_POLL_INITIAL_DELAY = Duration.ofSeconds(0L);
    private static final Duration DEFAULT_INTERVAL = HussarRedisMQConstants.DEFAULT_INTERVAL;
    private static final int DEFAULT_POLL_COUNT = 20;
    private static final boolean DEFAULT_CHECK_ALWAYS = true;
    private static final int DEFAULT_CHECK_LENGTH = 5;
    private static final Duration DEFAULT_RETRY_TIMEOUT = HussarRedisMQConstants.DEFAULT_TIMEOUT;
    private static final int DEFAULT_RETRY_COUNT = 3;
    private static final int DEFAULT_DEAD_LETTER_LIMIT = 1000;
    private static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofMillis(50L);
    private static final int DEFAULT_COMMIT_COUNT = 5;

    /** Owning consumer; source of the Redis template, codec, listener and stream/group/consumer names. */
    private final HussarRedisMQConsumer<T> consumer;

    // Collaborators resolved from the consumer in doStart().
    private RedisTemplate<String, Object> redisTemplate;
    private HussarRedisMQMessageCodec messageCodec;
    private Type messageType;
    private HussarMQMessageListener<T> messageListener;

    /** Thread currently executing {@link #run()}; null before binding and after {@code doStop()}. */
    private volatile Thread boundedThread;
    /** Released at the end of {@code doStart()}, whether it succeeded or not. */
    private final CountDownLatch startSignal = new CountDownLatch(1);
    /** Released at the end of {@code doStop()}. */
    private final CountDownLatch stopSignal = new CountDownLatch(1);

    // Pre-computed Redis arguments, built once in doStart().
    private byte[] streamNameRaw;
    private byte[] deadLetterStreamNameRaw;
    private Consumer readConsumer;
    private StreamReadOptions readOptions;
    private StreamOffset<byte[]> readOffsetRaw;

    // Tunable settings; initial values come from the DEFAULT_* constants above instead of repeated literals.
    private long waitStartTimeoutMillis = DEFAULT_WAIT_START_TIMEOUT_MILLIS;
    private long waitStopTimeoutMillis = DEFAULT_WAIT_STOP_TIMEOUT_MILLIS;
    private Duration fetchConnectionInterval = DEFAULT_INTERVAL;
    private Duration pollInitialDelay = DEFAULT_POLL_INITIAL_DELAY;
    private Duration pollInterval = DEFAULT_INTERVAL;
    private int pollCount = DEFAULT_POLL_COUNT;
    private boolean checkAlways = DEFAULT_CHECK_ALWAYS;
    private int checkLength = DEFAULT_CHECK_LENGTH;
    private Duration checkInterval = DEFAULT_INTERVAL;
    private Duration retryTimeout = DEFAULT_RETRY_TIMEOUT;
    private int retryCount = DEFAULT_RETRY_COUNT;
    private int deadLetterLimit = DEFAULT_DEAD_LETTER_LIMIT;
    private Duration acknowledgeInterval = DEFAULT_COMMIT_INTERVAL;
    private int acknowledgeCount = DEFAULT_COMMIT_COUNT;

    /**
     * Creates a poll worker owned by the given consumer.
     *
     * <p>Only the consumer reference is stored here; the Redis template, codec, listener and
     * stream names are read from it later in {@code doStart()}.
     *
     * @param consumer the consumer this worker polls for
     */
    public HussarRedisMQPollWorker(HussarRedisMQConsumer<T> consumer) {
        // NOTE(review): the meaning of the null superclass argument is defined by
        // HussarMQAbstractLifecycleResource, which is not visible here — confirm.
        super(null);
        this.consumer = consumer;
    }

    /**
     * Thread entry point: binds this worker to the calling thread, starts the lifecycle,
     * runs the poll loop until it ends or fails, and always stops the lifecycle afterwards.
     *
     * @throws IllegalStateException if the worker is already bound to a thread
     */
    @Override
    public void run() {
        logger.debug("Bind poll worker thread");
        final boolean alreadyBound = this.boundedThread != null;
        if (alreadyBound) {
            throw new IllegalStateException("already bound to thread");
        }
        this.boundedThread = Thread.currentThread();
        this.start();
        try {
            logger.debug("Start poll worker loop");
            this.doLoop();
        } catch (Throwable failure) {
            // Clear the interrupt flag so that the cleanup in stop() is not itself interrupted.
            final boolean flagWasSet = Thread.interrupted();
            if (flagWasSet) {
                logger.debug("Cleared thread interrupted flag");
            }
            if (!(failure instanceof UncheckedInterruptedException)) {
                logger.error("Unexpected error occurred in poll worker", failure);
            } else {
                logger.debug("Poll worker is terminated by interruption");
            }
        } finally {
            this.stop();
        }
    }

    /**
     * Reports whether the lifecycle is running and the bound worker thread is still alive.
     *
     * @return {@code true} only if the superclass reports running and a live thread is bound
     */
    public boolean isRunning() {
        if (!super.isRunning()) {
            return false;
        }
        // Read the volatile field once: doStop() nulls it on the worker thread, so a second
        // read after the null check could otherwise throw a NullPointerException.
        final Thread worker = this.boundedThread;
        return worker != null && worker.isAlive();
    }

    /**
     * Starts the worker when called on the bound worker thread; on any other thread, blocks
     * until the worker thread has finished {@code doStart()} or the start timeout elapses.
     *
     * @throws IllegalStateException if waiting times out or the waiting thread is interrupted
     */
    @Override
    public void start() {
        if (this.boundedThread == Thread.currentThread()) {
            logger.debug("Poll worker is starting...");
            super.start();
            return;
        }
        logger.debug("Waiting for poll worker starting...");
        final boolean started;
        try {
            // Use the getter so a non-positive configured value falls back to the default
            // instead of making await() report a timeout immediately.
            started = this.startSignal.await(this.getWaitStartTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for start: " + this, ex);
        }
        if (!started) {
            throw new IllegalStateException("timeout while waiting for start: " + this);
        }
    }

    /**
     * Start hook: renames the worker thread, validates the settings, resolves collaborators from the
     * consumer, pre-computes the Redis read arguments and makes sure the stream and group exist.
     * The start latch is released in all cases so that threads waiting in {@code start()} wake up.
     *
     * @throws IllegalArgumentException if the retry timeout is not greater than the acknowledge interval
     */
    protected void doStart() {
        try {
            this.setThreadName(true);
            final Duration retry = this.getRetryTimeout();
            final Duration ackInterval = this.getAcknowledgeInterval();
            if (retry.compareTo(ackInterval) <= 0) {
                // Report the value that was actually compared (the acknowledge interval, not the poll interval).
                throw new IllegalArgumentException("message expire time (" + retry + ") must be greater than batch commit interval (" + ackInterval + ")");
            }
            this.redisTemplate = this.consumer.getRedisTemplate();
            this.messageCodec = this.consumer.getMessageCodec();
            this.messageType = this.consumer.getMessageType();
            this.messageListener = this.consumer.getMessageListener();
            this.streamNameRaw = this.messageCodec.toRawKey(this.consumer.getStreamName());
            this.deadLetterStreamNameRaw = this.messageCodec.toRawKey(HussarRedisMQUtils.generateStreamName("hussar_mq", "mpmc_dead", this.consumer.getKey() + ":" + this.consumer.getGroupName()));
            this.readConsumer = Consumer.from(this.consumer.getGroupName(), this.consumer.getConsumerName());
            // Blocking read: wait up to the poll interval for at most pollCount records.
            this.readOptions = StreamReadOptions.empty().block(this.getPollInterval()).count((long) this.getPollCount());
            this.readOffsetRaw = StreamOffset.create(this.messageCodec.toRawKey(this.consumer.getStreamName()), ReadOffset.lastConsumed());
            HussarRedisMQUtils.createStreamGroupIfAbsent(this.redisTemplate, this.consumer.getStreamName(), this.consumer.getGroupName());
        } finally {
            this.startSignal.countDown();
        }
    }

    /**
     * Stops the worker.
     *
     * <ul>
     *   <li>Not bound to a thread: only the lifecycle state is adjusted (0 becomes 2, 2 is a no-op).</li>
     *   <li>Called on the worker thread: delegates to the superclass stop.</li>
     *   <li>Called on another thread while the state is 1: interrupts the worker thread and waits
     *       for {@code doStop()} to finish, up to the stop timeout.</li>
     * </ul>
     *
     * @throws IllegalStateException if the unbound worker is in an unexpected state, if waiting times
     *         out (except with the Jedis client) or if the waiting thread is interrupted
     */
    @Override
    public void stop() {
        // Read the volatile field once: doStop() nulls it concurrently on the worker thread.
        final Thread worker = this.boundedThread;
        if (worker == null) {
            logger.debug("Poll worker is not bind to thread");
            // NOTE(review): 0/1/2 are lifecycle constants inlined by the decompiler,
            // presumably new/running/stopped — confirm against the superclass.
            switch (this.lifecycleState) {
                case 0:
                    this.lifecycleState = 2;
                    return;
                case 2:
                    return;
                default:
                    throw new IllegalStateException("failed to stop: " + this);
            }
        }
        if (worker == Thread.currentThread()) {
            logger.debug("Poll worker is stopping...");
            super.stop();
            return;
        }
        if (this.lifecycleState != 1) {
            return;
        }
        logger.debug("Waiting for poll worker stopping...");
        worker.interrupt();
        final boolean stopped;
        try {
            // Use the getter so a non-positive configured value falls back to the default
            // instead of making await() report a timeout immediately.
            stopped = this.stopSignal.await(this.getWaitStopTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for stop: " + this, ex);
        }
        if (stopped) {
            return;
        }
        final String clientType = HussarRedisMQUtils.getClientType(this.redisTemplate);
        if ("jedis".equals(clientType)) {
            logger.warn("Jedis client is not interruptible, just ignore timeout of poll worker thread termination");
            return;
        }
        throw new IllegalStateException("timeout while waiting for stop: " + this);
    }

    /**
     * Stop hook: hands every message still pending for this consumer over to the idle consumer,
     * deletes this consumer from the group, and finally unbinds the thread, notifies the owning
     * consumer and releases the stop latch (the last three happen even when Redis calls fail).
     */
    protected void doStop() {
        try {
            this.redisTemplate.execute(connection -> {
                this.rejectUnfinishedMessages(connection);
                logger.debug("Delete consumer {}/{}", this.consumer.getStreamName(), this.consumer.getConsumerName());
                connection.xGroupDelConsumer(this.streamNameRaw, this.readConsumer);
                return null;
            }, true);
        } finally {
            this.setThreadName(false);
            this.boundedThread = null;
            this.consumer.onPollWorkerStop(this);
            this.stopSignal.countDown();
        }
    }

    /** Reads this consumer's pending list and rejects every entry found in it. */
    private void rejectUnfinishedMessages(RedisConnection connection) {
        logger.debug("Read pending list of {}/{} before close", this.consumer.getStreamName(), this.consumer.getConsumerName());
        final PendingMessages pending = connection.xPending(this.streamNameRaw, this.readConsumer.getGroup(), RedisStreamCommands.XPendingOptions.unbounded(Long.MAX_VALUE).consumer(this.readConsumer.getName()));
        if (pending == null || pending.isEmpty()) {
            return;
        }
        final List<RecordId> unfinishedIds = new ArrayList<>(pending.size());
        pending.forEach(entry -> unfinishedIds.add(entry.getId()));
        this.doReject(connection, unfinishedIds);
    }

    /**
     * Renames the bound thread to {@code <prefix>#<key>/<consumer>} while working, or
     * {@code <prefix>#idle} once stopped. The prefix is everything up to and including the
     * first {@code #} of the current name, or the current name plus {@code " #"} if there is none.
     *
     * @param start {@code true} to apply the working name, {@code false} to apply the idle name
     */
    protected void setThreadName(boolean start) {
        final String current = this.boundedThread.getName();
        if (current == null) {
            return;
        }
        final int hashIndex = current.indexOf('#');
        final StringBuilder renamed = new StringBuilder();
        if (hashIndex < 0) {
            renamed.append(current).append(" #");
        } else {
            renamed.append(current, 0, hashIndex + 1);
        }
        if (!start) {
            renamed.append("idle");
        } else {
            // Keep the thread name short: both parts are abbreviated to at most five characters.
            final String shortKey = StringUtils.abbreviateMiddle(this.consumer.getKey(), "*", 5);
            final String shortConsumer = StringUtils.abbreviateMiddle(StringUtils.removeStart(this.consumer.getConsumerName(), "c-"), "*", 5);
            renamed.append(shortKey).append('/').append(shortConsumer);
        }
        this.boundedThread.setName(renamed.toString());
    }

    /**
     * Main poll loop. After the optional initial delay it repeatedly borrows a Redis connection and
     * polls on it until {@code LoopState} asks for a fresh connection or the worker stops running.
     * A "no group" Redis error triggers re-creation of the stream and group, after which the loop
     * continues; interruptions and every other error end the loop.
     *
     * @throws UncheckedInterruptedException if the worker thread is interrupted
     */
    protected void doLoop() throws UncheckedInterruptedException {
        this.sleepInitialDelay();
        final LoopState loopState = new LoopState(this);
        while (this.isRunning()) {
            try {
                this.interruptibleCheckpoint();
                logger.debug("Fetch new connection for polling {}/{}", this.consumer.getStreamName(), this.consumer.getConsumerName());
                this.redisTemplate.execute(connection -> {
                    this.pollWhileConnectionReusable(connection, loopState);
                    return null;
                }, true);
            } catch (Throwable ex) {
                // Interruptions, in whatever wrapping, terminate the loop immediately.
                this.checkInterruption(ex);
                if (!HussarRedisMQUtils.isRedisNoGroup(ex)) {
                    throw ex;
                }
                try {
                    logger.warn("Stream '{}' or consumer group '{}' not exists, recreate them", this.consumer.getStreamName(), this.consumer.getGroupName());
                    HussarRedisMQUtils.createStreamGroupIfAbsent(this.redisTemplate, this.consumer.getStreamName(), this.consumer.getGroupName());
                } catch (Throwable recreateFailure) {
                    logger.error("Failed to create stream '{}' and consumer group '{}'", new Object[]{this.consumer.getStreamName(), this.consumer.getGroupName(), recreateFailure});
                    ex.addSuppressed(recreateFailure);
                    throw ex;
                }
            }
        }
    }

    /** Sleeps for the configured initial poll delay, if it is positive. */
    private void sleepInitialDelay() throws UncheckedInterruptedException {
        final Duration initialDelay = this.getPollInitialDelay();
        if (initialDelay.compareTo(Duration.ZERO) <= 0) {
            return;
        }
        logger.debug("Delay before poll worker loop: {}", initialDelay);
        try {
            Thread.sleep(initialDelay.toMillis());
        } catch (InterruptedException ex) {
            throw new UncheckedInterruptedException(ex);
        }
    }

    /** Runs check/read/handle rounds on one connection until it should be replaced or the worker stops. */
    private void pollWhileConnectionReusable(RedisConnection connection, LoopState loopState) {
        while (this.isRunning() && loopState.shouldReuseConnection()) {
            this.interruptibleCheckpoint();
            if (loopState.shouldCheck(connection)) {
                logger.debug("Check peer messages timeout: {}/{}", this.consumer.getStreamName(), this.consumer.getConsumerName());
                this.doCheckTimeout(connection);
                logger.debug("Check idle messages: {}/{}", this.consumer.getStreamName(), this.consumer.getConsumerName());
                final List<ByteRecord> retryRecords = this.doCheckIdle(connection);
                this.doHandle(connection, retryRecords);
            }
            logger.debug("Read messages: {}/{}", this.consumer.getStreamName(), this.consumer.getConsumerName());
            final List<ByteRecord> newRecords = this.doRead(connection);
            this.doHandle(connection, newRecords);
        }
    }

    /**
     * Reads new records for this consumer using the read options and offset prepared in {@code doStart()}.
     *
     * @param connection the Redis connection to read on
     * @return whatever {@code xReadGroup} returns for the configured stream offset
     */
    private List<ByteRecord> doRead(RedisConnection connection) {
        final StreamOffset<byte[]> offset = this.readOffsetRaw;
        return connection.xReadGroup(this.readConsumer, this.readOptions, offset);
    }

    /**
     * Delivers each record to the message listener and batches the resulting acknowledgements and rejections.
     *
     * <p>Successfully handled ids are collected and committed once the batch reaches
     * {@code getAcknowledgeCount()} entries or {@code getAcknowledgeInterval()} has elapsed since the
     * last commit; failed ids are batched and rejected by the same rule. Remaining ids are flushed
     * after the loop.
     *
     * @param connection the Redis connection used for acknowledge/reject commands
     * @param rawRecords the records to handle; {@code null} or empty is a no-op
     * @throws UncheckedInterruptedException if an interruption is detected; already handled ids are
     *         committed on a best-effort basis first
     * @throws UncheckedException if the failure has a Redis root cause
     */
    private void doHandle(RedisConnection connection, List<ByteRecord> rawRecords) throws UncheckedException {
        if (rawRecords == null || rawRecords.isEmpty()) {
            return;
        }
        // Timing is only measured when debug logging is enabled.
        StopWatch perf = null;
        if (logger.isDebugEnabled()) {
            perf = new StopWatch();
            perf.start();
        }
        long lastCommit = System.currentTimeMillis();
        ArrayList<RecordId> commitList = new ArrayList<RecordId>();
        long lastReject = System.currentTimeMillis();
        ArrayList<RecordId> rejectList = new ArrayList<RecordId>();
        logger.debug("Handling {} message records", (Object)rawRecords.size());
        for (ByteRecord rawRecord : rawRecords) {
            this.interruptibleCheckpoint();
            try {
                logger.debug("Deserialize message: {}", (Object)rawRecord.getId());
                HussarRedisMQMessage message = HussarRedisMQMessage.fromByteRecord(this.messageCodec, this.messageType, rawRecord);
                logger.debug("Message listener: {}", (Object)rawRecord.getId());
                this.messageListener.onMessage(message.getData());
                commitList.add(rawRecord.getId());
                // Keep batching until either the count or the time threshold is reached.
                if (commitList.size() < this.getAcknowledgeCount() && Duration.ofMillis(System.currentTimeMillis() - lastCommit).compareTo(this.getAcknowledgeInterval()) < 0) continue;
                this.doCommit(connection, commitList);
                lastCommit = System.currentTimeMillis();
                commitList.clear();
            }
            catch (Throwable ex) {
                Throwable converted = this.convertInterruption(ex);
                if (converted instanceof InterruptedException) {
                    // Interrupted: try to commit what was already handled, then abort the whole batch.
                    if (!commitList.isEmpty()) {
                        logger.debug("Commit uncommited messages before interruption: {}", commitList);
                        try {
                            this.doCommit(connection, commitList);
                        }
                        catch (Throwable throwable) {
                            logger.warn("Ignore commit error during interruption", throwable);
                            converted.addSuppressed(throwable);
                        }
                    }
                    throw new UncheckedInterruptedException(converted);
                }
                // Redis-level failures are not attributed to the message; they are propagated to the caller.
                if (HussarRedisMQUtils.getRootRedisException(ex) != null) {
                    throw new UncheckedException(ex);
                }
                // Any other failure is treated as a handling failure of this record: queue it for rejection.
                logger.error("Failed to handle message {} from {}/{}", new Object[]{rawRecord.getId(), this.consumer.getStreamName(), this.consumer.getConsumerName(), ex});
                rejectList.add(rawRecord.getId());
                if (rejectList.size() < this.getAcknowledgeCount() && Duration.ofMillis(System.currentTimeMillis() - lastReject).compareTo(this.getAcknowledgeInterval()) < 0) continue;
                this.doReject(connection, rejectList);
                lastReject = System.currentTimeMillis();
                rejectList.clear();
            }
        }
        // Flush whatever is left in the partial batches.
        if (!commitList.isEmpty()) {
            this.doCommit(connection, commitList);
        }
        if (!rejectList.isEmpty()) {
            this.doReject(connection, rejectList);
        }
        if (logger.isDebugEnabled() && perf != null) {
            perf.stop();
            logger.debug("Handled {} messages in {}", (Object)rawRecords.size(), (Object)perf.formatTime());
        }
    }

    /**
     * Acknowledges successfully handled records, with count checking enabled.
     *
     * @param connection the Redis connection to use
     * @param recordIds  ids to acknowledge; {@code null} or empty is a no-op
     */
    private void doCommit(RedisConnection connection, List<RecordId> recordIds) {
        final boolean hasIds = recordIds != null && !recordIds.isEmpty();
        if (hasIds) {
            logger.debug("Acknowledge messages: {}", recordIds);
            this.doAcknowledge(connection, recordIds, true);
        }
    }

    /**
     * Acknowledges the given records. In single-group mode the records are acknowledged and deleted
     * atomically through {@code SCRIPT_SINGLE_GROUP_ACK_DEL}; otherwise a plain XACK against the
     * consumer's group is issued.
     *
     * @param connection  the Redis connection to use
     * @param recordIds   ids to acknowledge; {@code null} or empty is a no-op
     * @param checkCounts whether to log a warning when the counts reported by Redis differ from the
     *                    number of ids sent
     */
    private void doAcknowledge(RedisConnection connection, List<RecordId> recordIds, boolean checkCounts) {
        if (recordIds == null || recordIds.isEmpty()) {
            return;
        }
        final int expected = recordIds.size();
        final boolean warn = checkCounts && logger.isWarnEnabled();
        if (!this.consumer.isSingleGroup()) {
            final Long ackCount = connection.xAck(this.streamNameRaw, this.readConsumer.getGroup(), recordIds.toArray(new RecordId[0]));
            if (warn) {
                if (ackCount == null) {
                    logger.warn("Illegal consumer acknowledged count: {} ({} records: {})", new Object[]{ackCount, expected, recordIds});
                } else if (ackCount != (long) expected) {
                    logger.warn("Consumer acknowledged message count {} != {}: {}", new Object[]{ackCount, expected, recordIds});
                }
            }
            return;
        }
        // Script arguments: KEYS[1] is the stream name, ARGV holds the record ids.
        final List<byte[]> rawArgs = new ArrayList<>(1 + expected);
        rawArgs.add(this.streamNameRaw);
        for (RecordId recordId : recordIds) {
            rawArgs.add(recordId.getValue().getBytes(StandardCharsets.UTF_8));
        }
        final List<?> counts = connection.eval(SCRIPT_SINGLE_GROUP_ACK_DEL, ReturnType.MULTI, 1, rawArgs.toArray(new byte[0][]));
        if (!warn) {
            return;
        }
        if (counts == null || counts.size() != 2) {
            logger.warn("Illegal consumer acknowledged/deleted counts: {} ({} records: {})", new Object[]{counts, expected, recordIds});
            return;
        }
        final Long ackCount = toLongOrNull(counts.get(0));
        final Long delCount = toLongOrNull(counts.get(1));
        if (ackCount == null || ackCount != (long) expected) {
            logger.warn("Consumer acknowledged message count {} != {}: {}", new Object[]{ackCount, expected, recordIds});
        }
        if (delCount == null || delCount != (long) expected) {
            logger.warn("Consumer deleted message count {} != {}: {}", new Object[]{delCount, expected, recordIds});
        }
    }

    /** Converts a script reply element to a {@code Long}, or {@code null} if it is not numeric. */
    private static Long toLongOrNull(Object value) {
        return value instanceof Number ? Long.valueOf(((Number) value).longValue()) : null;
    }

    /**
     * Rejects records by transferring their ownership to the {@code "idle"} consumer, with the idle
     * time preset to the retry timeout. A warning is logged when Redis reports a different number of
     * claimed ids than requested.
     *
     * @param connection the Redis connection to use
     * @param recordIds  ids to reject; {@code null} or empty is a no-op
     */
    private void doReject(RedisConnection connection, List<RecordId> recordIds) {
        if (recordIds == null || recordIds.isEmpty()) {
            return;
        }
        logger.debug("Transfer rejected messages from {}/{} to the idle consumer: {}", new Object[]{this.consumer.getStreamName(), this.consumer.getConsumerName(), recordIds});
        // minIdle is zero so the claim always applies, even to records delivered a moment ago.
        final List<RecordId> rejected = connection.xClaimJustId(this.streamNameRaw, this.readConsumer.getGroup(), "idle", RedisStreamCommands.XClaimOptions.minIdle(Duration.ZERO).ids(recordIds.toArray(new RecordId[0])).idle(this.getRetryTimeout()));
        final int rejectedCount = rejected != null ? rejected.size() : 0;
        if (rejectedCount != recordIds.size()) {
            logger.warn("Consumer rejected message count {} != {}: {} <=> {}", new Object[]{rejectedCount, recordIds.size(), rejected, recordIds});
        }
    }

    /**
     * Scans the group's whole pending list and transfers every entry whose last delivery is older
     * than the retry timeout to the {@code "idle"} consumer. Entries already owned by the
     * {@code "idle"} or {@code "dead"} consumers are skipped.
     *
     * @param connection the Redis connection to use
     */
    private void doCheckTimeout(RedisConnection connection) {
        final Duration timeout = this.getRetryTimeout();
        logger.debug("Read pending list of {} for checking timeout messages", this.consumer.getStreamName());
        final PendingMessages pending = connection.xPending(this.streamNameRaw, this.readConsumer.getGroup(), RedisStreamCommands.XPendingOptions.unbounded(Long.MAX_VALUE));
        if (pending == null || pending.isEmpty()) {
            return;
        }
        final List<RecordId> timeoutIds = new ArrayList<>();
        for (PendingMessage entry : pending) {
            final String owner = entry.getConsumerName();
            final boolean parked = "idle".equals(owner) || "dead".equals(owner);
            if (!parked && entry.getElapsedTimeSinceLastDelivery().compareTo(timeout) > 0) {
                timeoutIds.add(entry.getId());
            }
        }
        logger.debug("Transfer timeout messages to the idle consumer (checked by consumer {}/{}): {}", new Object[]{this.consumer.getStreamName(), this.consumer.getConsumerName(), timeoutIds});
        if (timeoutIds.isEmpty()) {
            return;
        }
        connection.xClaimJustId(this.streamNameRaw, this.readConsumer.getGroup(), "idle", RedisStreamCommands.XClaimOptions.minIdle(timeout).ids(timeoutIds.toArray(new RecordId[0])).idle(timeout));
    }

    /**
     * Inspects up to {@code getPollCount()} entries pending on the {@code "idle"} consumer. Entries
     * delivered fewer than {@code getRetryCount()} times are claimed by this consumer for another
     * attempt; the rest are handed to the dead-letter handling.
     *
     * @param connection the Redis connection to use
     * @return the records claimed for retry, or an empty list if there are none
     */
    private List<ByteRecord> doCheckIdle(RedisConnection connection) {
        logger.debug("Read pending list of {}/{} for retry delivery", this.consumer.getStreamName(), "idle");
        final PendingMessages idlePending = connection.xPending(this.streamNameRaw, this.readConsumer.getGroup(), RedisStreamCommands.XPendingOptions.unbounded(Long.valueOf(this.getPollCount())).consumer("idle"));
        if (idlePending == null || idlePending.isEmpty()) {
            return Collections.emptyList();
        }
        final List<RecordId> retryIds = new ArrayList<>();
        final List<RecordId> deadLetterIds = new ArrayList<>();
        for (PendingMessage entry : idlePending) {
            final boolean retriable = entry.getTotalDeliveryCount() < (long) this.getRetryCount();
            (retriable ? retryIds : deadLetterIds).add(entry.getId());
        }
        if (!deadLetterIds.isEmpty()) {
            logger.debug("Detected dead letters in {}: {}", this.consumer.getStreamName(), deadLetterIds);
            this.doHandleDeadLetter(connection, deadLetterIds);
        }
        logger.debug("Claim idle messages, retry delivery to consumer {}/{}: {}", new Object[]{this.consumer.getStreamName(), this.consumer.getConsumerName(), retryIds});
        if (retryIds.isEmpty()) {
            return Collections.emptyList();
        }
        return connection.xClaim(this.streamNameRaw, this.readConsumer.getGroup(), this.readConsumer.getName(), RedisStreamCommands.XClaimOptions.minIdle(this.getRetryTimeout()).ids(retryIds));
    }

    /**
     * Moves records that exhausted their retries to the dead-letter stream.
     *
     * <p>The records are first claimed by the {@code "dead"} consumer. Ids that could not be claimed
     * are passed to {@code doHandleDanglingRecordIds}. Each claimed record is then copied to the
     * dead-letter stream under a new auto-generated id and acknowledged on the source stream.
     *
     * @param connection the Redis connection to use
     * @param recordIds  ids of the dead letters; {@code null} or empty is a no-op
     */
    private void doHandleDeadLetter(RedisConnection connection, List<RecordId> recordIds) {
        if (recordIds == null || recordIds.isEmpty()) {
            return;
        }
        final List<ByteRecord> claimedRecords = connection.xClaim(this.streamNameRaw, this.readConsumer.getGroup(), "dead", RedisStreamCommands.XClaimOptions.minIdle(this.getRetryTimeout()).ids(recordIds));
        final List<RecordId> claimedIds = Optional.ofNullable(claimedRecords).orElseGet(Collections::emptyList).stream().map(Record::getId).collect(Collectors.toList());
        this.doHandleDanglingRecordIds(connection, recordIds, claimedIds);
        if (claimedRecords == null || claimedRecords.isEmpty()) {
            return;
        }
        // A positive limit caps the dead-letter stream length; otherwise it grows unbounded.
        final int deadLetterLimit = this.getDeadLetterLimit();
        final RedisStreamCommands.XAddOptions options = deadLetterLimit > 0 ? RedisStreamCommands.XAddOptions.maxlen((long) deadLetterLimit) : RedisStreamCommands.XAddOptions.none();
        for (ByteRecord claimedRecord : claimedRecords) {
            final ByteRecord deadLetter = claimedRecord.withId(RecordId.autoGenerate()).withStreamKey(this.deadLetterStreamNameRaw);
            connection.xAdd(deadLetter, options);
            // Acknowledge one by one, right after the copy, without count warnings.
            this.doAcknowledge(connection, Collections.singletonList(claimedRecord.getId()), false);
        }
    }

    /**
     * Force-acknowledges ids that were requested in a claim but not returned by it, so that they do
     * not stay in the pending list.
     *
     * @param connection       the Redis connection to use
     * @param claimRequiredIds ids that were requested to be claimed
     * @param claimedIds       ids that were actually claimed; may be {@code null}
     */
    private void doHandleDanglingRecordIds(RedisConnection connection, @NonNull List<RecordId> claimRequiredIds, List<RecordId> claimedIds) {
        final int claimedCount = claimedIds != null ? claimedIds.size() : 0;
        if (claimRequiredIds.size() == claimedCount) {
            return;
        }
        final LinkedHashSet<RecordId> claimedIdSet = new LinkedHashSet<>(Optional.ofNullable(claimedIds).orElseGet(Collections::emptyList));
        final LinkedHashSet<RecordId> danglingIdSet = new LinkedHashSet<>();
        for (RecordId claimRequiredId : claimRequiredIds) {
            if (!claimedIdSet.contains(claimRequiredId)) {
                danglingIdSet.add(claimRequiredId);
            }
        }
        if (danglingIdSet.isEmpty()) {
            // Sizes can differ without anything dangling (e.g. duplicate ids); XACK with no ids is an error.
            return;
        }
        logger.warn("Force to acknowledge dangling messages detected in {}/{}: {}", new Object[]{this.consumer.getStreamName(), this.consumer.getConsumerName(), danglingIdSet});
        connection.xAck(this.streamNameRaw, this.readConsumer.getGroup(), danglingIdSet.toArray(new RecordId[0]));
    }

    /**
     * Cooperative cancellation point: converts a pending thread interruption into an
     * {@code UncheckedInterruptedException}.
     *
     * @throws UncheckedInterruptedException if the current thread has been interrupted
     */
    private void interruptibleCheckpoint() throws UncheckedInterruptedException {
        try {
            // A zero-length sleep does not wait, but still throws if the thread is interrupted.
            Thread.sleep(0L);
        }
        catch (InterruptedException ex) {
            throw new UncheckedInterruptedException((Throwable)ex);
        }
    }

    /**
     * Rethrows the given failure as an {@code UncheckedInterruptedException} when
     * {@code convertInterruption} classifies it as an interruption; otherwise does nothing.
     *
     * @param throwable the failure to inspect
     * @throws UncheckedInterruptedException if the failure represents an interruption
     */
    private void checkInterruption(Throwable throwable) throws UncheckedInterruptedException {
        final Throwable converted = this.convertInterruption(throwable);
        final boolean interrupted = converted instanceof InterruptedException;
        if (interrupted) {
            throw new UncheckedInterruptedException(converted);
        }
    }

    /**
     * Normalizes the various shapes of an interruption into an {@code InterruptedException}.
     *
     * <p>An {@code UncheckedInterruptedException} is unwrapped to its cause first. The result is then
     * returned as is if it already is an {@code InterruptedException}; wrapped into a new
     * {@code InterruptedException} if it, or its root cause, is an I/O interruption or an
     * {@code InterruptedException}; and returned unchanged otherwise.
     *
     * @param throwable the failure to classify
     * @return an {@code InterruptedException} for interruptions, otherwise the (unwrapped) input
     */
    private Throwable convertInterruption(Throwable throwable) {
        final Throwable candidate = throwable instanceof UncheckedInterruptedException ? throwable.getCause() : throwable;
        if (candidate instanceof InterruptedException) {
            return candidate;
        }
        if (isIoInterruption(candidate)) {
            return newInterruption("io interruption", candidate);
        }
        final Throwable root = ExceptionUtils.getRootCause(candidate);
        if (root instanceof InterruptedException || isIoInterruption(root)) {
            return newInterruption("nested interruption", candidate);
        }
        return candidate;
    }

    /** Tells whether the throwable is one of the I/O exception types that signal an interruption. */
    private static boolean isIoInterruption(Throwable throwable) {
        return throwable instanceof InterruptedIOException || throwable instanceof ClosedByInterruptException;
    }

    /** Builds an {@code InterruptedException} with the given message and cause. */
    private static InterruptedException newInterruption(String message, Throwable cause) {
        final InterruptedException interruption = new InterruptedException(message);
        interruption.initCause(cause);
        return interruption;
    }

    /**
     * Returns the start-wait timeout in milliseconds.
     *
     * @return the configured value if positive, otherwise the default of 5000
     */
    public long getWaitStartTimeoutMillis() {
        if (this.waitStartTimeoutMillis > 0L) {
            return this.waitStartTimeoutMillis;
        }
        return DEFAULT_WAIT_START_TIMEOUT_MILLIS;
    }

    /**
     * Sets the start-wait timeout in milliseconds.
     *
     * @param waitStartTimeoutMillis the timeout; for non-positive values
     *        {@code getWaitStartTimeoutMillis()} returns the default instead
     */
    public void setWaitStartTimeoutMillis(long waitStartTimeoutMillis) {
        this.waitStartTimeoutMillis = waitStartTimeoutMillis;
    }

    /**
     * Returns the stop-wait timeout in milliseconds.
     *
     * @return the configured value if positive, otherwise the default of 5000
     */
    public long getWaitStopTimeoutMillis() {
        if (this.waitStopTimeoutMillis > 0L) {
            return this.waitStopTimeoutMillis;
        }
        return DEFAULT_WAIT_STOP_TIMEOUT_MILLIS;
    }

    /**
     * Sets the stop-wait timeout in milliseconds.
     *
     * @param waitStopTimeoutMillis the timeout; for non-positive values
     *        {@code getWaitStopTimeoutMillis()} returns the default instead
     */
    public void setWaitStopTimeoutMillis(long waitStopTimeoutMillis) {
        this.waitStopTimeoutMillis = waitStopTimeoutMillis;
    }

    /**
     * Returns how long a borrowed Redis connection is reused by the poll loop before a new one is fetched.
     *
     * @return the configured interval, or the default interval if none is set
     */
    public Duration getFetchConnectionInterval() {
        final Duration configured = this.fetchConnectionInterval;
        if (configured == null) {
            return DEFAULT_INTERVAL;
        }
        return configured;
    }

    /**
     * Sets how long a borrowed Redis connection is reused before a new one is fetched.
     *
     * @param fetchConnectionInterval the interval; {@code null} makes the getter return the default
     */
    public void setFetchConnectionInterval(Duration fetchConnectionInterval) {
        this.fetchConnectionInterval = fetchConnectionInterval;
    }

    /**
     * Returns the delay applied once before the poll loop begins.
     *
     * @return the configured delay, or the default (zero seconds) if none is set
     */
    public Duration getPollInitialDelay() {
        final Duration configured = this.pollInitialDelay;
        if (configured == null) {
            return DEFAULT_POLL_INITIAL_DELAY;
        }
        return configured;
    }

    /**
     * Sets the delay applied once before the poll loop begins.
     *
     * @param pollInitialDelay the delay; {@code null} makes the getter return the default
     */
    public void setPollInitialDelay(Duration pollInitialDelay) {
        this.pollInitialDelay = pollInitialDelay;
    }

    /**
     * Returns the blocking duration used for stream reads.
     *
     * @return the configured interval, or the default interval if none is set
     */
    public Duration getPollInterval() {
        final Duration configured = this.pollInterval;
        if (configured == null) {
            return DEFAULT_INTERVAL;
        }
        return configured;
    }

    /**
     * Sets the blocking duration used for stream reads. The value is read when {@code doStart()}
     * builds the read options.
     *
     * @param pollInterval the interval; {@code null} makes the getter return the default
     */
    public void setPollInterval(Duration pollInterval) {
        this.pollInterval = pollInterval;
    }

    /**
     * Returns the maximum number of records requested per read and per idle-list check.
     *
     * @return the configured count if positive, otherwise the default of 20
     */
    public int getPollCount() {
        if (this.pollCount > 0) {
            return this.pollCount;
        }
        return DEFAULT_POLL_COUNT;
    }

    /**
     * Sets the maximum number of records requested per read.
     *
     * @param pollCount the count; for non-positive values {@code getPollCount()} returns the default instead
     */
    public void setPollCount(int pollCount) {
        this.pollCount = pollCount;
    }

    /**
     * Tells whether the timeout/idle check runs on every poll iteration.
     *
     * @return {@code true} if {@code LoopState.shouldCheck} should always answer yes
     */
    public boolean isCheckAlways() {
        return this.checkAlways;
    }

    /**
     * Sets whether the timeout/idle check runs on every poll iteration.
     *
     * @param checkAlways {@code true} to check on every iteration; {@code false} to check only by
     *        interval or stream length
     */
    public void setCheckAlways(boolean checkAlways) {
        this.checkAlways = checkAlways;
    }

    /**
     * Returns the stream length at or above which a timeout/idle check is triggered.
     *
     * @return the configured length; values that are not positive disable the length-based trigger
     */
    public int getCheckLength() {
        return this.checkLength;
    }

    /**
     * Sets the stream length at or above which a timeout/idle check is triggered.
     *
     * @param checkLength the length; a non-positive value disables the length-based trigger
     */
    public void setCheckLength(int checkLength) {
        this.checkLength = checkLength;
    }

    /**
     * Returns the interval after which a timeout/idle check is due again.
     *
     * @return the configured interval, or the default interval if none is set
     */
    public Duration getCheckInterval() {
        final Duration configured = this.checkInterval;
        if (configured == null) {
            return DEFAULT_INTERVAL;
        }
        return configured;
    }

    /**
     * Sets the interval after which a timeout/idle check is due again.
     *
     * @param checkInterval the interval; {@code null} makes the getter return the default
     */
    public void setCheckInterval(Duration checkInterval) {
        this.checkInterval = checkInterval;
    }

    /**
     * Returns the time after which a delivered but unacknowledged message counts as timed out.
     *
     * @return the configured timeout, or the default timeout if none is set
     */
    public Duration getRetryTimeout() {
        final Duration configured = this.retryTimeout;
        if (configured == null) {
            return DEFAULT_RETRY_TIMEOUT;
        }
        return configured;
    }

    /**
     * Sets the time after which a delivered but unacknowledged message counts as timed out.
     * {@code doStart()} requires it to be greater than the acknowledge interval.
     *
     * @param retryTimeout the timeout; {@code null} makes the getter return the default
     */
    public void setRetryTimeout(Duration retryTimeout) {
        this.retryTimeout = retryTimeout;
    }

    /**
     * Returns the delivery count at which an idle message is treated as a dead letter instead of retried.
     *
     * @return the configured count if it is zero or positive, otherwise the default of 3
     */
    public int getRetryCount() {
        if (this.retryCount >= 0) {
            return this.retryCount;
        }
        return DEFAULT_RETRY_COUNT;
    }

    /**
     * Sets the delivery count at which an idle message is treated as a dead letter.
     *
     * @param retryCount the count; for negative values {@code getRetryCount()} returns the default instead
     */
    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    /**
     * Returns the maximum length applied to the dead-letter stream when records are added to it.
     *
     * @return the configured limit; a non-positive value means no length limit is applied
     */
    public int getDeadLetterLimit() {
        return this.deadLetterLimit;
    }

    /**
     * Sets the maximum length applied to the dead-letter stream.
     *
     * @param deadLetterLimit the limit; a non-positive value disables the length limit
     */
    public void setDeadLetterLimit(int deadLetterLimit) {
        this.deadLetterLimit = deadLetterLimit;
    }

    /**
     * Returns the maximum time handled records are batched before being acknowledged or rejected.
     *
     * @return the configured interval, or the default of 50 milliseconds if none is set
     */
    public Duration getAcknowledgeInterval() {
        final Duration configured = this.acknowledgeInterval;
        if (configured == null) {
            return DEFAULT_COMMIT_INTERVAL;
        }
        return configured;
    }

    /**
     * Sets the maximum time handled records are batched before being acknowledged or rejected.
     *
     * @param acknowledgeInterval the interval; {@code null} makes the getter return the default
     */
    public void setAcknowledgeInterval(Duration acknowledgeInterval) {
        this.acknowledgeInterval = acknowledgeInterval;
    }

    /**
     * Returns the batch size at which handled records are acknowledged or rejected.
     *
     * @return the configured count if positive, otherwise the default of 5
     */
    public int getAcknowledgeCount() {
        if (this.acknowledgeCount > 0) {
            return this.acknowledgeCount;
        }
        return DEFAULT_COMMIT_COUNT;
    }

    /**
     * Sets the batch size at which handled records are acknowledged or rejected.
     *
     * @param acknowledgeCount the count; for non-positive values {@code getAcknowledgeCount()}
     *        returns the default instead
     */
    public void setAcknowledgeCount(int acknowledgeCount) {
        this.acknowledgeCount = acknowledgeCount;
    }

    /**
     * Per-loop bookkeeping for the poll worker: remembers when the current connection was fetched
     * and when the last timeout/idle check ran, and answers whether each is due again.
     */
    protected static final class LoopState {
        private final HussarRedisMQPollWorker<?> worker;
        private Instant lastFetchConnection;
        private Instant lastCheck;

        /**
         * @param worker the worker whose settings drive the decisions
         */
        public LoopState(HussarRedisMQPollWorker<?> worker) {
            this.worker = worker;
        }

        /**
         * Decides whether the current connection may be used for another iteration.
         *
         * @return {@code true} on the first call and while the fetch-connection interval has not
         *         elapsed; {@code false} once it has, which also restarts the interval
         */
        public synchronized boolean shouldReuseConnection() {
            final Instant now = Instant.now();
            if (this.lastFetchConnection == null) {
                this.lastFetchConnection = now;
                return true;
            }
            final Duration age = Duration.between(this.lastFetchConnection, now);
            final boolean expired = age.compareTo(this.worker.getFetchConnectionInterval()) >= 0;
            if (expired) {
                this.lastFetchConnection = now;
            }
            return !expired;
        }

        /**
         * Decides whether the timeout/idle check should run in this iteration.
         *
         * @param connection connection used to query the stream length when needed
         * @return {@code true} if checking is always on, no check has run yet, the check interval
         *         has elapsed, or the stream length reached the configured check length
         */
        public synchronized boolean shouldCheck(RedisConnection connection) {
            final boolean unconditional = this.worker.isCheckAlways() || this.lastCheck == null;
            if (unconditional) {
                this.lastCheck = Instant.now();
                return true;
            }
            final Instant now = Instant.now();
            final boolean intervalElapsed = Duration.between(this.lastCheck, now).compareTo(this.worker.getCheckInterval()) >= 0;
            if (intervalElapsed) {
                this.lastCheck = now;
                return true;
            }
            if (this.worker.getCheckLength() <= 0) {
                return false;
            }
            // The stream length is only queried when the cheaper triggers did not fire.
            final Long length = connection.xLen(((HussarRedisMQPollWorker)this.worker).streamNameRaw);
            if (length != null && length >= (long) this.worker.getCheckLength()) {
                this.lastCheck = now;
                return true;
            }
            return false;
        }
    }
}

