package com.jxdinfo.hussar.support.mq.redis.consumer;

import com.jxdinfo.hussar.support.mq.consumer.HussarMQAbstractConsumer;
import com.jxdinfo.hussar.support.mq.consumer.HussarMQMessageListener;
import com.jxdinfo.hussar.support.mq.lifecycle.HussarMQLifecycleManager;
import com.jxdinfo.hussar.support.mq.redis.config.HussarRedisMQProperties;
import com.jxdinfo.hussar.support.mq.redis.constants.HussarRedisMQConstants;
import com.jxdinfo.hussar.support.mq.redis.message.HussarRedisMQMessageCodec;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.exception.UncheckedInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.lang.NonNull;

/* loaded from: input_file:com/jxdinfo/hussar/support/mq/redis/consumer/HussarRedisMQConsumer.class */
public class HussarRedisMQConsumer<T> extends HussarMQAbstractConsumer<T> {
    private static final Logger logger = LoggerFactory.getLogger(HussarRedisMQConsumer.class);
    private final HussarRedisMQProperties properties;
    private final RedisTemplate<String, Object> redisTemplate;
    private final HussarRedisMQMessageCodec messageCodec;
    private final ExecutorService executorService;
    private final String key;
    private final String streamName;
    private final String groupName;
    private final String consumerName;
    private final boolean singleGroup;
    private volatile HussarRedisMQPollWorker<T> worker;
    private volatile boolean stopping;

    /* loaded from: input_file:com/jxdinfo/hussar/support/mq/redis/consumer/HussarRedisMQConsumer$PollWorkerDelayRestart.class */
    private class PollWorkerDelayRestart implements Runnable {
        private PollWorkerDelayRestart() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Thread.sleep(((Duration) Optional.ofNullable(HussarRedisMQConsumer.this.getProperties()).map((v0) -> {
                    return v0.getPollWorkerRestartInterval();
                }).filter(duration -> {
                    return duration.compareTo(Duration.ZERO) > 0;
                }).orElse(HussarRedisMQConstants.DEFAULT_RESTART_INTERVAL)).toMillis());
                HussarRedisMQConsumer.logger.info("Poll worker of consumer {}/{} is restarting", HussarRedisMQConsumer.this.streamName, HussarRedisMQConsumer.this.consumerName);
                HussarRedisMQConsumer.this.restartPollWorker();
            } catch (InterruptedException e) {
                HussarRedisMQConsumer.logger.warn("Poll worker restart task of consumer {}/{} is interrupted", HussarRedisMQConsumer.this.streamName, HussarRedisMQConsumer.this.consumerName);
                throw new UncheckedInterruptedException(e);
            }
        }
    }

    public HussarRedisMQConsumer(HussarMQLifecycleManager hussarMQLifecycleManager, @NonNull HussarRedisMQProperties hussarRedisMQProperties, @NonNull RedisTemplate<String, Object> redisTemplate, @NonNull HussarRedisMQMessageCodec hussarRedisMQMessageCodec, @NonNull ExecutorService executorService, @NonNull Type type, @NonNull HussarMQMessageListener<T> hussarMQMessageListener, @NonNull String str, @NonNull String str2, @NonNull String str3, @NonNull String str4, boolean z) {
        super(hussarMQLifecycleManager, type, hussarMQMessageListener);
        this.stopping = false;
        this.properties = hussarRedisMQProperties;
        this.redisTemplate = redisTemplate;
        this.messageCodec = hussarRedisMQMessageCodec;
        this.executorService = executorService;
        this.key = str;
        this.streamName = str2;
        this.groupName = str3;
        this.consumerName = str4;
        this.singleGroup = z;
    }

    protected void doStart() {
        logger.debug("Start redis-mq consumer {}/{}", this.streamName, this.consumerName);
        restartPollWorker();
    }

    protected void restartPollWorker() {
        if (this.stopping || this.lifecycleState == 2) {
            return;
        }
        if (this.worker == null || !this.worker.isRunning()) {
            this.worker = createPollWorker();
            this.executorService.execute(this.worker);
            this.worker.start();
        }
    }

    protected HussarRedisMQPollWorker<T> createPollWorker() {
        HussarRedisMQPollWorker<T> hussarRedisMQPollWorker = new HussarRedisMQPollWorker<>(this);
        hussarRedisMQPollWorker.setWaitStartTimeoutMillis(((Duration) Optional.ofNullable(this.properties.getPollWorkerStartTimeout()).orElse(HussarRedisMQConstants.DEFAULT_START_TIMEOUT)).toMillis());
        hussarRedisMQPollWorker.setWaitStopTimeoutMillis(((Duration) Optional.ofNullable(this.properties.getPollWorkerStopTimeout()).orElse(HussarRedisMQConstants.DEFAULT_STOP_TIMEOUT)).toMillis());
        hussarRedisMQPollWorker.setFetchConnectionInterval(this.properties.getFetchConnectionInterval());
        hussarRedisMQPollWorker.setPollInitialDelay(this.properties.getPollInitialDelay());
        hussarRedisMQPollWorker.setPollInterval(this.properties.getPollInterval());
        hussarRedisMQPollWorker.setPollCount(this.properties.getPollCount().intValue());
        hussarRedisMQPollWorker.setCheckAlways(this.properties.isCheckAlways());
        hussarRedisMQPollWorker.setCheckLength(this.properties.getCheckLength());
        hussarRedisMQPollWorker.setCheckInterval(this.properties.getCheckInterval());
        hussarRedisMQPollWorker.setRetryTimeout(this.properties.getRetryTimeout());
        hussarRedisMQPollWorker.setRetryCount(this.properties.getRetryCount());
        hussarRedisMQPollWorker.setDeadLetterLimit(this.properties.getDeadLetterLimit());
        hussarRedisMQPollWorker.setAcknowledgeInterval(this.properties.getAcknowledgeInterval());
        hussarRedisMQPollWorker.setAcknowledgeCount(this.properties.getAcknowledgeCount());
        return hussarRedisMQPollWorker;
    }

    protected void doStop() {
        logger.debug("Stop redis-mq consumer {}/{}", this.streamName, this.consumerName);
        if (this.worker != null) {
            this.stopping = true;
            this.worker.stop();
            this.worker = null;
            this.stopping = false;
        }
    }

    public void onPollWorkerStop(HussarRedisMQPollWorker<T> hussarRedisMQPollWorker) {
        logger.debug("Consumer {}/{} received poll worker stop signal", this.streamName, this.consumerName);
        if (this.worker == hussarRedisMQPollWorker) {
            this.worker = null;
            if (this.stopping) {
                return;
            }
            logger.debug("Detected unexpected termination in poll worker of consumer {}/{}", this.streamName, this.consumerName);
            if (this.properties.isPollWorkerAutoRestart()) {
                this.executorService.submit(new PollWorkerDelayRestart());
            }
        }
    }

    public HussarRedisMQProperties getProperties() {
        return this.properties;
    }

    public RedisTemplate<String, Object> getRedisTemplate() {
        return this.redisTemplate;
    }

    public HussarRedisMQMessageCodec getMessageCodec() {
        return this.messageCodec;
    }

    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    public String getKey() {
        return this.key;
    }

    public String getStreamName() {
        return this.streamName;
    }

    public String getGroupName() {
        return this.groupName;
    }

    public String getConsumerName() {
        return this.consumerName;
    }

    public boolean isSingleGroup() {
        return this.singleGroup;
    }

    public String toString() {
        return "HussarRedisMQConsumer<" + getLifecycleStateName() + " @" + this.streamName + "/" + this.consumerName + ">";
    }
}
