package com.jxdinfo.hussar.support.mq.redis.consumer;

import com.jxdinfo.hussar.support.mq.consumer.HussarMQMessageListener;
import com.jxdinfo.hussar.support.mq.lifecycle.HussarMQAbstractLifecycleResource;
import com.jxdinfo.hussar.support.mq.lifecycle.HussarMQLifecycleManager;
import com.jxdinfo.hussar.support.mq.redis.constants.HussarRedisMQConstants;
import com.jxdinfo.hussar.support.mq.redis.message.HussarRedisMQMessage;
import com.jxdinfo.hussar.support.mq.redis.message.HussarRedisMQMessageCodec;
import com.jxdinfo.hussar.support.mq.redis.utils.HussarRedisMQUtils;
import java.io.InterruptedIOException;
import java.lang.reflect.Type;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.exception.UncheckedException;
import org.apache.commons.lang3.exception.UncheckedInterruptedException;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.lang.NonNull;

/* loaded from: input_file:com/jxdinfo/hussar/support/mq/redis/consumer/HussarRedisMQPollWorker.class */
public class HussarRedisMQPollWorker<T> extends HussarMQAbstractLifecycleResource implements Runnable {
    private static final int DEFAULT_WAIT_START_TIMEOUT_MILLIS = 5000;
    private static final int DEFAULT_WAIT_STOP_TIMEOUT_MILLIS = 5000;
    private static final int DEFAULT_POLL_COUNT = 20;
    private static final boolean DEFAULT_CHECK_ALWAYS = true;
    private static final int DEFAULT_CHECK_LENGTH = 5;
    private static final int DEFAULT_RETRY_COUNT = 3;
    private static final int DEFAULT_DEAD_LETTER_LIMIT = 1000;
    private static final int DEFAULT_COMMIT_COUNT = 5;
    private final HussarRedisMQConsumer<T> consumer;
    private RedisTemplate<String, Object> redisTemplate;
    private HussarRedisMQMessageCodec messageCodec;
    private Type messageType;
    private HussarMQMessageListener<T> messageListener;
    private volatile Thread boundedThread;
    private final CountDownLatch startSignal;
    private final CountDownLatch stopSignal;
    private byte[] streamNameRaw;
    private byte[] deadLetterStreamNameRaw;
    private Consumer readConsumer;
    private StreamReadOptions readOptions;
    private StreamOffset<byte[]> readOffsetRaw;
    private long waitStartTimeoutMillis;
    private long waitStopTimeoutMillis;
    private Duration fetchConnectionInterval;
    private Duration pollInitialDelay;
    private Duration pollInterval;
    private int pollCount;
    private boolean checkAlways;
    private int checkLength;
    private Duration checkInterval;
    private Duration retryTimeout;
    private int retryCount;
    private int deadLetterLimit;
    private Duration acknowledgeInterval;
    private int acknowledgeCount;
    private static final Logger logger = LoggerFactory.getLogger(HussarRedisMQPollWorker.class);
    private static final byte[] SCRIPT_SINGLE_GROUP_ACK_DEL = "return{redis.call('XACK',KEYS[1],'-single-',unpack(ARGV)),redis.call('XDEL',KEYS[1],unpack(ARGV))}".getBytes(StandardCharsets.UTF_8);
    private static final Duration DEFAULT_POLL_INITIAL_DELAY = Duration.ofSeconds(0);
    private static final Duration DEFAULT_INTERVAL = HussarRedisMQConstants.DEFAULT_INTERVAL;
    private static final Duration DEFAULT_RETRY_TIMEOUT = HussarRedisMQConstants.DEFAULT_TIMEOUT;
    private static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofMillis(50);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/jxdinfo/hussar/support/mq/redis/consumer/HussarRedisMQPollWorker$LoopState.class */
    public static final class LoopState {
        private final HussarRedisMQPollWorker<?> worker;
        private Instant lastFetchConnection;
        private Instant lastCheck;

        public LoopState(HussarRedisMQPollWorker<?> hussarRedisMQPollWorker) {
            this.worker = hussarRedisMQPollWorker;
        }

        public synchronized boolean shouldReuseConnection() {
            if (this.lastFetchConnection == null) {
                this.lastFetchConnection = Instant.now();
                return true;
            }
            Instant now = Instant.now();
            if (Duration.between(this.lastFetchConnection, now).compareTo(this.worker.getFetchConnectionInterval()) < 0) {
                return true;
            }
            this.lastFetchConnection = now;
            return false;
        }

        public synchronized boolean shouldCheck(RedisConnection redisConnection) {
            Long xLen;
            if (this.worker.isCheckAlways() || this.lastCheck == null) {
                this.lastCheck = Instant.now();
                return true;
            }
            Instant now = Instant.now();
            if (Duration.between(this.lastCheck, now).compareTo(this.worker.getCheckInterval()) >= 0) {
                this.lastCheck = now;
                return true;
            }
            if (this.worker.getCheckLength() <= 0 || (xLen = redisConnection.xLen(((HussarRedisMQPollWorker) this.worker).streamNameRaw)) == null || xLen.longValue() < this.worker.getCheckLength()) {
                return false;
            }
            this.lastCheck = now;
            return true;
        }
    }

    public HussarRedisMQPollWorker(HussarRedisMQConsumer<T> hussarRedisMQConsumer) {
        super((HussarMQLifecycleManager) null);
        this.startSignal = new CountDownLatch(1);
        this.stopSignal = new CountDownLatch(1);
        this.waitStartTimeoutMillis = 5000L;
        this.waitStopTimeoutMillis = 5000L;
        this.fetchConnectionInterval = DEFAULT_INTERVAL;
        this.pollInitialDelay = DEFAULT_POLL_INITIAL_DELAY;
        this.pollInterval = DEFAULT_INTERVAL;
        this.pollCount = DEFAULT_POLL_COUNT;
        this.checkAlways = true;
        this.checkLength = 5;
        this.checkInterval = DEFAULT_INTERVAL;
        this.retryTimeout = DEFAULT_RETRY_TIMEOUT;
        this.retryCount = DEFAULT_RETRY_COUNT;
        this.deadLetterLimit = DEFAULT_DEAD_LETTER_LIMIT;
        this.acknowledgeInterval = DEFAULT_COMMIT_INTERVAL;
        this.acknowledgeCount = 5;
        this.consumer = hussarRedisMQConsumer;
    }

    @Override // java.lang.Runnable
    public void run() {
        logger.debug("Bind poll worker thread");
        if (this.boundedThread != null) {
            throw new IllegalStateException("already bound to thread");
        }
        this.boundedThread = Thread.currentThread();
        start();
        try {
            logger.debug("Start poll worker loop");
            doLoop();
        } catch (Throwable th) {
            if (Thread.interrupted()) {
                logger.debug("Cleared thread interrupted flag");
            }
            if (th instanceof UncheckedInterruptedException) {
                logger.debug("Poll worker is terminated by interruption");
            } else {
                logger.error("Unexpected error occurred in poll worker", th);
            }
        } finally {
            stop();
        }
    }

    public boolean isRunning() {
        return super.isRunning() && this.boundedThread != null && this.boundedThread.isAlive();
    }

    public void start() {
        if (this.boundedThread != null && this.boundedThread == Thread.currentThread()) {
            logger.debug("Poll worker is starting...");
            super.start();
            return;
        }
        logger.debug("Waiting for poll worker starting...");
        try {
            if (this.startSignal.await(this.waitStartTimeoutMillis, TimeUnit.MILLISECONDS)) {
            } else {
                throw new IllegalStateException("timeout while waiting for start: " + this);
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted while waiting for start: " + this, e);
        }
    }

    protected void doStart() {
        try {
            setThreadName(true);
            if (getRetryTimeout().compareTo(getAcknowledgeInterval()) <= 0) {
                throw new IllegalArgumentException("message expire time (" + getRetryTimeout() + ") must be greater than batch commit interval (" + getPollInterval() + ")");
            }
            this.redisTemplate = this.consumer.getRedisTemplate();
            this.messageCodec = this.consumer.getMessageCodec();
            this.messageType = this.consumer.getMessageType();
            this.messageListener = this.consumer.getMessageListener();
            this.streamNameRaw = this.messageCodec.toRawKey(this.consumer.getStreamName());
            this.deadLetterStreamNameRaw = this.messageCodec.toRawKey(HussarRedisMQUtils.generateStreamName(HussarRedisMQConstants.DEFAULT_KEY_PREFIX, HussarRedisMQConstants.MPMC_DEAD_LETTER_NAMESPACE, this.consumer.getKey() + ":" + this.consumer.getGroupName()));
            this.readConsumer = Consumer.from(this.consumer.getGroupName(), this.consumer.getConsumerName());
            this.readOptions = StreamReadOptions.empty().block(getPollInterval()).count(getPollCount());
            this.readOffsetRaw = StreamOffset.create(this.messageCodec.toRawKey(this.consumer.getStreamName()), ReadOffset.lastConsumed());
            HussarRedisMQUtils.createStreamGroupIfAbsent(this.redisTemplate, this.consumer.getStreamName(), this.consumer.getGroupName());
        } finally {
            this.startSignal.countDown();
        }
    }

    public void stop() {
        if (this.boundedThread == null) {
            logger.debug("Poll worker is not bind to thread");
            switch (this.lifecycleState) {
                case 0:
                    this.lifecycleState = 2;
                    return;
                case 2:
                    return;
                default:
                    throw new IllegalStateException("failed to stop: " + this);
            }
        }
        if (this.boundedThread == Thread.currentThread()) {
            logger.debug("Poll worker is stopping...");
            super.stop();
        } else if (this.lifecycleState == 1) {
            logger.debug("Waiting for poll worker stopping...");
            this.boundedThread.interrupt();
            try {
                if (!this.stopSignal.await(this.waitStopTimeoutMillis, TimeUnit.MILLISECONDS)) {
                    if (!HussarRedisMQConstants.CLIENT_TYPE_JEDIS.equals(HussarRedisMQUtils.getClientType(this.redisTemplate))) {
                        throw new IllegalStateException("timeout while waiting for stop: " + this);
                    }
                    logger.warn("Jedis client is not interruptible, just ignore timeout of poll worker thread termination");
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException("interrupted while waiting for stop: " + this, e);
            }
        }
    }

    protected void doStop() {
        try {
            this.redisTemplate.execute(redisConnection -> {
                logger.debug("Read pending list of {}/{} before close", this.consumer.getStreamName(), this.consumer.getConsumerName());
                PendingMessages xPending = redisConnection.xPending(this.streamNameRaw, this.readConsumer.getGroup(), RedisStreamCommands.XPendingOptions.unbounded(Long.MAX_VALUE).consumer(this.readConsumer.getName()));
                if (xPending != null && !xPending.isEmpty()) {
                    ArrayList arrayList = new ArrayList(xPending.size());
                    Iterator it = xPending.iterator();
                    while (it.hasNext()) {
                        arrayList.add(((PendingMessage) it.next()).getId());
                    }
                    doReject(redisConnection, arrayList);
                }
                logger.debug("Delete consumer {}/{}", this.consumer.getStreamName(), this.consumer.getConsumerName());
                redisConnection.xGroupDelConsumer(this.streamNameRaw, this.readConsumer);
                return null;
            }, true);
        } finally {
            setThreadName(false);
            this.boundedThread = null;
            this.consumer.onPollWorkerStop(this);
            this.stopSignal.countDown();
        }
    }

    protected void setThreadName(boolean z) {
        String str;
        String name = this.boundedThread.getName();
        if (name == null) {
            return;
        }
        int indexOf = name.indexOf(35);
        String substring = indexOf >= 0 ? name.substring(0, indexOf + 1) : name + " #";
        if (z) {
            str = substring + StringUtils.abbreviateMiddle(this.consumer.getKey(), "*", 5) + '/' + StringUtils.abbreviateMiddle(StringUtils.removeStart(this.consumer.getConsumerName(), HussarRedisMQConstants.MPMC_CONSUMER_PREFIX), "*", 5);
        } else {
            str = substring + HussarRedisMQConstants.MPMC_IDLE_CONSUMER;
        }
        this.boundedThread.setName(str);
    }

    protected void doLoop() throws UncheckedInterruptedException {
        Duration pollInitialDelay = getPollInitialDelay();
        if (pollInitialDelay.compareTo(Duration.ZERO) > 0) {
            logger.debug("Delay before poll worker loop: {}", pollInitialDelay);
            try {
                Thread.sleep(pollInitialDelay.toMillis());
            } catch (InterruptedException e) {
                throw new UncheckedInterruptedException(e);
            }
        }
        LoopState loopState = new LoopState(this);
        while (isRunning()) {
            try {
                interruptibleCheckpoint();
                logger.debug("Fetch new connection for polling {}/{}", this.consumer.getStreamName(), this.consumer.getConsumerName());
                this.redisTemplate.execute(redisConnection -> {
                    while (isRunning() && loopState.shouldReuseConnection()) {
                        interruptibleCheckpoint();
                        if (loopState.shouldCheck(redisConnection)) {
                            logger.debug("Check peer messages timeout: {}/{}", this.consumer.getStreamName(), this.consumer.getConsumerName());
                            doCheckTimeout(redisConnection);
                            logger.debug("Check idle messages: {}/{}", this.consumer.getStreamName(), this.consumer.getConsumerName());
                            doHandle(redisConnection, doCheckIdle(redisConnection));
                        }
                        logger.debug("Read messages: {}/{}", this.consumer.getStreamName(), this.consumer.getConsumerName());
                        doHandle(redisConnection, doRead(redisConnection));
                    }
                    return null;
                }, true);
            } catch (Throwable th) {
                checkInterruption(th);
                if (!HussarRedisMQUtils.isRedisNoGroup(th)) {
                    throw th;
                }
                try {
                    logger.warn("Stream '{}' or consumer group '{}' not exists, recreate them", this.consumer.getStreamName(), this.consumer.getGroupName());
                    HussarRedisMQUtils.createStreamGroupIfAbsent(this.redisTemplate, this.consumer.getStreamName(), this.consumer.getGroupName());
                } catch (Throwable th2) {
                    logger.error("Failed to create stream '{}' and consumer group '{}'", new Object[]{this.consumer.getStreamName(), this.consumer.getGroupName(), th2});
                    th.addSuppressed(th2);
                    throw th;
                }
            }
        }
    }

    private List<ByteRecord> doRead(RedisConnection redisConnection) {
        return redisConnection.xReadGroup(this.readConsumer, this.readOptions, new StreamOffset[]{this.readOffsetRaw});
    }

    private void doHandle(RedisConnection redisConnection, List<ByteRecord> list) throws UncheckedException {
        if (list == null || list.isEmpty()) {
            return;
        }
        StopWatch stopWatch = null;
        if (logger.isDebugEnabled()) {
            stopWatch = new StopWatch();
            stopWatch.start();
        }
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        long currentTimeMillis2 = System.currentTimeMillis();
        ArrayList arrayList2 = new ArrayList();
        logger.debug("Handling {} message records", Integer.valueOf(list.size()));
        for (ByteRecord byteRecord : list) {
            interruptibleCheckpoint();
            try {
                logger.debug("Deserialize message: {}", byteRecord.getId());
                HussarRedisMQMessage fromByteRecord = HussarRedisMQMessage.fromByteRecord(this.messageCodec, this.messageType, byteRecord);
                logger.debug("Message listener: {}", byteRecord.getId());
                this.messageListener.onMessage(fromByteRecord.getData());
                arrayList.add(byteRecord.getId());
                if (arrayList.size() >= getAcknowledgeCount() || Duration.ofMillis(System.currentTimeMillis() - currentTimeMillis).compareTo(getAcknowledgeInterval()) >= 0) {
                    doCommit(redisConnection, arrayList);
                    currentTimeMillis = System.currentTimeMillis();
                    arrayList.clear();
                }
            } catch (Throwable th) {
                Throwable convertInterruption = convertInterruption(th);
                if (convertInterruption instanceof InterruptedException) {
                    if (!arrayList.isEmpty()) {
                        logger.debug("Commit uncommited messages before interruption: {}", arrayList);
                        try {
                            doCommit(redisConnection, arrayList);
                        } catch (Throwable th2) {
                            logger.warn("Ignore commit error during interruption", th2);
                            convertInterruption.addSuppressed(th2);
                        }
                    }
                    throw new UncheckedInterruptedException(convertInterruption);
                }
                if (HussarRedisMQUtils.getRootRedisException(th) != null) {
                    throw new UncheckedException(th);
                }
                logger.error("Failed to handle message {} from {}/{}", new Object[]{byteRecord.getId(), this.consumer.getStreamName(), this.consumer.getConsumerName(), th});
                arrayList2.add(byteRecord.getId());
                if (arrayList2.size() >= getAcknowledgeCount() || Duration.ofMillis(System.currentTimeMillis() - currentTimeMillis2).compareTo(getAcknowledgeInterval()) >= 0) {
                    doReject(redisConnection, arrayList2);
                    currentTimeMillis2 = System.currentTimeMillis();
                    arrayList2.clear();
                }
            }
        }
        if (!arrayList.isEmpty()) {
            doCommit(redisConnection, arrayList);
        }
        if (!arrayList2.isEmpty()) {
            doReject(redisConnection, arrayList2);
        }
        if (!logger.isDebugEnabled() || stopWatch == null) {
            return;
        }
        stopWatch.stop();
        logger.debug("Handled {} messages in {}", Integer.valueOf(list.size()), stopWatch.formatTime());
    }

    private void doCommit(RedisConnection redisConnection, List<RecordId> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        logger.debug("Acknowledge messages: {}", list);
        doAcknowledge(redisConnection, list, true);
    }

    private void doAcknowledge(RedisConnection redisConnection, List<RecordId> list, boolean z) {
        if (list == null || list.isEmpty()) {
            return;
        }
        if (!this.consumer.isSingleGroup()) {
            Long xAck = redisConnection.xAck(this.streamNameRaw, this.readConsumer.getGroup(), (RecordId[]) list.toArray(new RecordId[0]));
            if (z && logger.isWarnEnabled()) {
                if (xAck == null) {
                    logger.warn("Illegal consumer acknowledged count: {} ({} records: {})", new Object[]{xAck, Integer.valueOf(list.size()), list});
                    return;
                } else {
                    if (xAck.longValue() != list.size()) {
                        logger.warn("Consumer acknowledged message count {} != {}: {}", new Object[]{xAck, Integer.valueOf(list.size()), list});
                        return;
                    }
                    return;
                }
            }
            return;
        }
        ArrayList arrayList = new ArrayList(1 + list.size());
        arrayList.add(this.streamNameRaw);
        Iterator<RecordId> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getValue().getBytes(StandardCharsets.UTF_8));
        }
        List list2 = (List) redisConnection.eval(SCRIPT_SINGLE_GROUP_ACK_DEL, ReturnType.MULTI, 1, (byte[][]) arrayList.toArray((Object[]) new byte[0]));
        if (z && logger.isWarnEnabled()) {
            if (list2 == null || list2.size() != 2) {
                logger.warn("Illegal consumer acknowledged/deleted counts: {} ({} records: {})", new Object[]{list2, Integer.valueOf(list.size()), list});
                return;
            }
            Long valueOf = list2.get(0) instanceof Number ? Long.valueOf(((Number) list2.get(0)).longValue()) : null;
            Long valueOf2 = list2.get(1) instanceof Number ? Long.valueOf(((Number) list2.get(1)).longValue()) : null;
            if (valueOf == null || valueOf.longValue() != list.size()) {
                logger.warn("Consumer acknowledged message count {} != {}: {}", new Object[]{valueOf, Integer.valueOf(list.size()), list});
            }
            if (valueOf2 == null || valueOf2.longValue() != list.size()) {
                logger.warn("Consumer deleted message count {} != {}: {}", new Object[]{valueOf2, Integer.valueOf(list.size()), list});
            }
        }
    }

    private void doReject(RedisConnection redisConnection, List<RecordId> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        logger.debug("Transfer rejected messages from {}/{} to the idle consumer: {}", new Object[]{this.consumer.getStreamName(), this.consumer.getConsumerName(), list});
        List emptyList = list.isEmpty() ? Collections.emptyList() : redisConnection.xClaimJustId(this.streamNameRaw, this.readConsumer.getGroup(), HussarRedisMQConstants.MPMC_IDLE_CONSUMER, RedisStreamCommands.XClaimOptions.minIdle(Duration.ZERO).ids((RecordId[]) list.toArray(new RecordId[0])).idle(getRetryTimeout()));
        if (emptyList == null || emptyList.size() != list.size()) {
            Logger logger2 = logger;
            Object[] objArr = new Object[4];
            objArr[0] = Integer.valueOf(emptyList != null ? emptyList.size() : 0);
            objArr[1] = Integer.valueOf(list.size());
            objArr[2] = emptyList;
            objArr[DEFAULT_RETRY_COUNT] = list;
            logger2.warn("Consumer rejected message count {} != {}: {} <=> {}", objArr);
        }
    }

    private void doCheckTimeout(RedisConnection redisConnection) {
        Duration retryTimeout = getRetryTimeout();
        logger.debug("Read pending list of {} for checking timeout messages", this.consumer.getStreamName());
        PendingMessages xPending = redisConnection.xPending(this.streamNameRaw, this.readConsumer.getGroup(), RedisStreamCommands.XPendingOptions.unbounded(Long.MAX_VALUE));
        if (xPending == null || xPending.isEmpty()) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = xPending.iterator();
        while (it.hasNext()) {
            PendingMessage pendingMessage = (PendingMessage) it.next();
            String consumerName = pendingMessage.getConsumerName();
            if (!HussarRedisMQConstants.MPMC_IDLE_CONSUMER.equals(consumerName) && !HussarRedisMQConstants.MPMC_DEAD_CONSUMER.equals(consumerName) && pendingMessage.getElapsedTimeSinceLastDelivery().compareTo(retryTimeout) > 0) {
                arrayList.add(pendingMessage.getId());
            }
        }
        logger.debug("Transfer timeout messages to the idle consumer (checked by consumer {}/{}): {}", new Object[]{this.consumer.getStreamName(), this.consumer.getConsumerName(), arrayList});
        if (arrayList.isEmpty()) {
            return;
        }
        redisConnection.xClaimJustId(this.streamNameRaw, this.readConsumer.getGroup(), HussarRedisMQConstants.MPMC_IDLE_CONSUMER, RedisStreamCommands.XClaimOptions.minIdle(retryTimeout).ids((RecordId[]) arrayList.toArray(new RecordId[0])).idle(retryTimeout));
    }

    private List<ByteRecord> doCheckIdle(RedisConnection redisConnection) {
        logger.debug("Read pending list of {}/{} for retry delivery", this.consumer.getStreamName(), HussarRedisMQConstants.MPMC_IDLE_CONSUMER);
        PendingMessages xPending = redisConnection.xPending(this.streamNameRaw, this.readConsumer.getGroup(), RedisStreamCommands.XPendingOptions.unbounded(Long.valueOf(getPollCount())).consumer(HussarRedisMQConstants.MPMC_IDLE_CONSUMER));
        if (xPending == null || xPending.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        Iterator it = xPending.iterator();
        while (it.hasNext()) {
            PendingMessage pendingMessage = (PendingMessage) it.next();
            if (pendingMessage.getTotalDeliveryCount() < getRetryCount()) {
                arrayList.add(pendingMessage.getId());
            } else {
                arrayList2.add(pendingMessage.getId());
            }
        }
        if (!arrayList2.isEmpty()) {
            logger.debug("Detected dead letters in {}: {}", this.consumer.getStreamName(), arrayList2);
            doHandleDeadLetter(redisConnection, arrayList2);
        }
        logger.debug("Claim idle messages, retry delivery to consumer {}/{}: {}", new Object[]{this.consumer.getStreamName(), this.consumer.getConsumerName(), arrayList});
        return arrayList.isEmpty() ? Collections.emptyList() : redisConnection.xClaim(this.streamNameRaw, this.readConsumer.getGroup(), this.readConsumer.getName(), RedisStreamCommands.XClaimOptions.minIdle(getRetryTimeout()).ids(arrayList));
    }

    private void doHandleDeadLetter(RedisConnection redisConnection, List<RecordId> list) {
        List<ByteRecord> emptyList = list.isEmpty() ? Collections.emptyList() : redisConnection.xClaim(this.streamNameRaw, this.readConsumer.getGroup(), HussarRedisMQConstants.MPMC_DEAD_CONSUMER, RedisStreamCommands.XClaimOptions.minIdle(getRetryTimeout()).ids(list));
        doHandleDanglingRecordIds(redisConnection, list, (List) ((List) Optional.ofNullable(emptyList).orElseGet(Collections::emptyList)).stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        if (emptyList == null || emptyList.isEmpty()) {
            return;
        }
        int deadLetterLimit = getDeadLetterLimit();
        RedisStreamCommands.XAddOptions maxlen = deadLetterLimit > 0 ? RedisStreamCommands.XAddOptions.maxlen(deadLetterLimit) : RedisStreamCommands.XAddOptions.none();
        for (ByteRecord byteRecord : emptyList) {
            redisConnection.xAdd(byteRecord.withId(RecordId.autoGenerate()).withStreamKey(this.deadLetterStreamNameRaw), maxlen);
            doAcknowledge(redisConnection, Collections.singletonList(byteRecord.getId()), false);
        }
    }

    private void doHandleDanglingRecordIds(RedisConnection redisConnection, @NonNull List<RecordId> list, List<RecordId> list2) {
        if (list.size() != (list2 != null ? list2.size() : 0)) {
            LinkedHashSet linkedHashSet = new LinkedHashSet((Collection) Optional.ofNullable(list2).orElseGet(Collections::emptyList));
            LinkedHashSet linkedHashSet2 = new LinkedHashSet();
            for (RecordId recordId : list) {
                if (!linkedHashSet.contains(recordId)) {
                    linkedHashSet2.add(recordId);
                }
            }
            logger.warn("Force to acknowledge dangling messages detected in {}/{}: {}", new Object[]{this.consumer.getStreamName(), this.consumer.getConsumerName(), linkedHashSet2});
            redisConnection.xAck(this.streamNameRaw, this.readConsumer.getGroup(), (RecordId[]) linkedHashSet2.toArray(new RecordId[0]));
        }
    }

    private void interruptibleCheckpoint() throws UncheckedInterruptedException {
        try {
            Thread.sleep(0L);
        } catch (InterruptedException e) {
            throw new UncheckedInterruptedException(e);
        }
    }

    private void checkInterruption(Throwable th) throws UncheckedInterruptedException {
        Throwable convertInterruption = convertInterruption(th);
        if (convertInterruption instanceof InterruptedException) {
            throw new UncheckedInterruptedException(convertInterruption);
        }
    }

    private Throwable convertInterruption(Throwable th) {
        if (th instanceof UncheckedInterruptedException) {
            th = th.getCause();
        }
        if (th instanceof InterruptedException) {
            return th;
        }
        if ((th instanceof InterruptedIOException) || (th instanceof ClosedByInterruptException)) {
            InterruptedException interruptedException = new InterruptedException("io interruption");
            interruptedException.initCause(th);
            return interruptedException;
        }
        Throwable rootCause = ExceptionUtils.getRootCause(th);
        if (!(rootCause instanceof InterruptedException) && !(rootCause instanceof InterruptedIOException) && !(rootCause instanceof ClosedByInterruptException)) {
            return th;
        }
        InterruptedException interruptedException2 = new InterruptedException("nested interruption");
        interruptedException2.initCause(th);
        return interruptedException2;
    }

    public long getWaitStartTimeoutMillis() {
        if (this.waitStartTimeoutMillis > 0) {
            return this.waitStartTimeoutMillis;
        }
        return 5000L;
    }

    public void setWaitStartTimeoutMillis(long j) {
        this.waitStartTimeoutMillis = j;
    }

    public long getWaitStopTimeoutMillis() {
        if (this.waitStopTimeoutMillis > 0) {
            return this.waitStopTimeoutMillis;
        }
        return 5000L;
    }

    public void setWaitStopTimeoutMillis(long j) {
        this.waitStopTimeoutMillis = j;
    }

    public Duration getFetchConnectionInterval() {
        return this.fetchConnectionInterval != null ? this.fetchConnectionInterval : DEFAULT_INTERVAL;
    }

    public void setFetchConnectionInterval(Duration duration) {
        this.fetchConnectionInterval = duration;
    }

    public Duration getPollInitialDelay() {
        return this.pollInitialDelay != null ? this.pollInitialDelay : DEFAULT_POLL_INITIAL_DELAY;
    }

    public void setPollInitialDelay(Duration duration) {
        this.pollInitialDelay = duration;
    }

    public Duration getPollInterval() {
        return this.pollInterval != null ? this.pollInterval : DEFAULT_INTERVAL;
    }

    public void setPollInterval(Duration duration) {
        this.pollInterval = duration;
    }

    public int getPollCount() {
        return this.pollCount > 0 ? this.pollCount : DEFAULT_POLL_COUNT;
    }

    public void setPollCount(int i) {
        this.pollCount = i;
    }

    public boolean isCheckAlways() {
        return this.checkAlways;
    }

    public void setCheckAlways(boolean z) {
        this.checkAlways = z;
    }

    public int getCheckLength() {
        return this.checkLength;
    }

    public void setCheckLength(int i) {
        this.checkLength = i;
    }

    public Duration getCheckInterval() {
        return this.checkInterval != null ? this.checkInterval : DEFAULT_INTERVAL;
    }

    public void setCheckInterval(Duration duration) {
        this.checkInterval = duration;
    }

    public Duration getRetryTimeout() {
        return this.retryTimeout != null ? this.retryTimeout : DEFAULT_RETRY_TIMEOUT;
    }

    public void setRetryTimeout(Duration duration) {
        this.retryTimeout = duration;
    }

    public int getRetryCount() {
        return this.retryCount >= 0 ? this.retryCount : DEFAULT_RETRY_COUNT;
    }

    public void setRetryCount(int i) {
        this.retryCount = i;
    }

    public int getDeadLetterLimit() {
        return this.deadLetterLimit;
    }

    public void setDeadLetterLimit(int i) {
        this.deadLetterLimit = i;
    }

    public Duration getAcknowledgeInterval() {
        return this.acknowledgeInterval != null ? this.acknowledgeInterval : DEFAULT_COMMIT_INTERVAL;
    }

    public void setAcknowledgeInterval(Duration duration) {
        this.acknowledgeInterval = duration;
    }

    public int getAcknowledgeCount() {
        if (this.acknowledgeCount > 0) {
            return this.acknowledgeCount;
        }
        return 5;
    }

    public void setAcknowledgeCount(int i) {
        this.acknowledgeCount = i;
    }
}
