/*
 * Decompiled with CFR 0.152.
 */
package com.jxdinfo.hussar.support.mq.redis.consumer;

import com.jxdinfo.hussar.support.mq.consumer.HussarMQAbstractConsumer;
import com.jxdinfo.hussar.support.mq.consumer.HussarMQMessageListener;
import com.jxdinfo.hussar.support.mq.lifecycle.HussarMQLifecycleManager;
import com.jxdinfo.hussar.support.mq.redis.config.HussarRedisMQProperties;
import com.jxdinfo.hussar.support.mq.redis.constants.HussarRedisMQConstants;
import com.jxdinfo.hussar.support.mq.redis.consumer.HussarRedisMQPollWorker;
import com.jxdinfo.hussar.support.mq.redis.message.HussarRedisMQMessageCodec;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.exception.UncheckedInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.lang.NonNull;

/**
 * Redis-backed message consumer that delegates polling to a {@link HussarRedisMQPollWorker}
 * executed on a shared {@link ExecutorService}.
 *
 * <p>The consumer owns at most one poll worker reference at a time. The worker reports its own
 * termination through {@link #onPollWorkerStop(HussarRedisMQPollWorker)}; when that termination
 * was not requested by {@link #doStop()} and auto-restart is enabled in the properties, a delayed
 * restart task is submitted to the executor.
 *
 * <p>The {@code worker} and {@code stopping} fields are {@code volatile} and may be touched from
 * the lifecycle thread, the worker thread and the restart task. Methods therefore read
 * {@code worker} once into a local before dereferencing it.
 *
 * @param <T> payload type handed to the message listener
 */
public class HussarRedisMQConsumer<T> extends HussarMQAbstractConsumer<T> {
    private static final Logger logger = LoggerFactory.getLogger(HussarRedisMQConsumer.class);
    private final HussarRedisMQProperties properties;
    private final RedisTemplate<String, Object> redisTemplate;
    private final HussarRedisMQMessageCodec messageCodec;
    private final ExecutorService executorService;
    private final String key;
    private final String streamName;
    private final String groupName;
    private final String consumerName;
    private final boolean singleGroup;
    /** Currently attached poll worker, or {@code null} when none is attached. */
    private volatile HussarRedisMQPollWorker<T> worker;
    /** {@code true} only while {@link #doStop()} is stopping the worker on purpose. */
    private volatile boolean stopping = false;

    /**
     * Creates a consumer; no worker is created until the consumer is started.
     *
     * @param lifecycleManager lifecycle manager passed to the superclass
     * @param properties       redis-mq settings used to configure poll workers
     * @param redisTemplate    template exposed through {@link #getRedisTemplate()}
     * @param messageCodec     codec exposed through {@link #getMessageCodec()}
     * @param executorService  executor that runs poll workers and delayed restart tasks
     * @param messageType      message type passed to the superclass
     * @param messageListener  listener passed to the superclass
     * @param key              value exposed through {@link #getKey()}
     * @param streamName       stream name, also used in log output and {@link #toString()}
     * @param groupName        value exposed through {@link #getGroupName()}
     * @param consumerName     consumer name, also used in log output and {@link #toString()}
     * @param singleGroup      flag exposed through {@link #isSingleGroup()}
     */
    public HussarRedisMQConsumer(HussarMQLifecycleManager lifecycleManager, @NonNull HussarRedisMQProperties properties, @NonNull RedisTemplate<String, Object> redisTemplate, @NonNull HussarRedisMQMessageCodec messageCodec, @NonNull ExecutorService executorService, @NonNull Type messageType, @NonNull HussarMQMessageListener<T> messageListener, @NonNull String key, @NonNull String streamName, @NonNull String groupName, @NonNull String consumerName, boolean singleGroup) {
        super(lifecycleManager, messageType, messageListener);
        this.properties = properties;
        this.redisTemplate = redisTemplate;
        this.messageCodec = messageCodec;
        this.executorService = executorService;
        this.key = key;
        this.streamName = streamName;
        this.groupName = groupName;
        this.consumerName = consumerName;
        this.singleGroup = singleGroup;
    }

    /** Logs the start and launches a poll worker via {@link #restartPollWorker()}. */
    protected void doStart() {
        logger.debug("Start redis-mq consumer {}/{}", this.streamName, this.consumerName);
        this.restartPollWorker();
    }

    /**
     * Creates a fresh poll worker, hands it to the executor and starts it.
     *
     * <p>Does nothing when a deliberate stop is in progress, when {@code lifecycleState == 2},
     * or when the current worker still reports itself as running.
     */
    protected void restartPollWorker() {
        // Single read of the volatile field: a concurrent onPollWorkerStop()/doStop() may null
        // it between the null check and isRunning().
        HussarRedisMQPollWorker<T> current = this.worker;
        // NOTE(review): 2 is a compiler-inlined constant from the superclass, presumably the
        // "stopped"/"stopping" lifecycle state - confirm against HussarMQAbstractConsumer.
        if (this.stopping || this.lifecycleState == 2 || current != null && current.isRunning()) {
            return;
        }
        HussarRedisMQPollWorker<T> created = this.createPollWorker();
        this.worker = created;
        // Use the local reference from here on: once the worker is running it may terminate and
        // clear the field through onPollWorkerStop() before start() is reached.
        this.executorService.execute(created);
        created.start();
    }

    /**
     * Builds a poll worker bound to this consumer and copies the polling, checking, retry,
     * dead-letter and acknowledge settings from the properties onto it.
     *
     * <p>Start and stop wait timeouts fall back to the defaults in
     * {@link HussarRedisMQConstants} when the properties return {@code null}.
     *
     * @return a configured worker that has not been submitted or started yet
     */
    protected HussarRedisMQPollWorker<T> createPollWorker() {
        HussarRedisMQPollWorker<T> worker = new HussarRedisMQPollWorker<>(this);
        worker.setWaitStartTimeoutMillis(Optional.ofNullable(this.properties.getPollWorkerStartTimeout()).orElse(HussarRedisMQConstants.DEFAULT_START_TIMEOUT).toMillis());
        worker.setWaitStopTimeoutMillis(Optional.ofNullable(this.properties.getPollWorkerStopTimeout()).orElse(HussarRedisMQConstants.DEFAULT_STOP_TIMEOUT).toMillis());
        worker.setFetchConnectionInterval(this.properties.getFetchConnectionInterval());
        worker.setPollInitialDelay(this.properties.getPollInitialDelay());
        worker.setPollInterval(this.properties.getPollInterval());
        worker.setPollCount(this.properties.getPollCount());
        worker.setCheckAlways(this.properties.isCheckAlways());
        worker.setCheckLength(this.properties.getCheckLength());
        worker.setCheckInterval(this.properties.getCheckInterval());
        worker.setRetryTimeout(this.properties.getRetryTimeout());
        worker.setRetryCount(this.properties.getRetryCount());
        worker.setDeadLetterLimit(this.properties.getDeadLetterLimit());
        worker.setAcknowledgeInterval(this.properties.getAcknowledgeInterval());
        worker.setAcknowledgeCount(this.properties.getAcknowledgeCount());
        return worker;
    }

    /**
     * Stops the attached poll worker, if any, and detaches it.
     *
     * <p>{@code stopping} is raised for the duration of the stop so that
     * {@link #onPollWorkerStop(HussarRedisMQPollWorker)} does not treat the termination as
     * unexpected, and is always lowered again, even if {@code stop()} throws.
     */
    protected void doStop() {
        logger.debug("Stop redis-mq consumer {}/{}", this.streamName, this.consumerName);
        // Snapshot the volatile field so a concurrent onPollWorkerStop() cannot null it between
        // the check and the stop() call.
        HussarRedisMQPollWorker<T> current = this.worker;
        if (current == null) {
            return;
        }
        this.stopping = true;
        try {
            current.stop();
            this.worker = null;
        } finally {
            // Without this, an exception from stop() would leave the flag set forever and
            // restartPollWorker() would silently refuse every later start.
            this.stopping = false;
        }
    }

    /**
     * Callback for a poll worker that has stopped.
     *
     * <p>Only acts when {@code worker} is the one currently attached. The reference is cleared,
     * and if the stop was not requested through {@link #doStop()} and
     * {@code properties.isPollWorkerAutoRestart()} is {@code true}, a delayed restart task is
     * submitted to the executor.
     *
     * @param worker the worker reporting its termination
     */
    public void onPollWorkerStop(HussarRedisMQPollWorker<T> worker) {
        logger.debug("Consumer {}/{} received poll worker stop signal", this.streamName, this.consumerName);
        if (this.worker == worker) {
            this.worker = null;
            if (!this.stopping) {
                logger.debug("Detected unexpected termination in poll worker of consumer {}/{}", this.streamName, this.consumerName);
                if (this.properties.isPollWorkerAutoRestart()) {
                    this.executorService.submit(new PollWorkerDelayRestart());
                }
            }
        }
    }

    /** @return the redis-mq properties supplied at construction */
    public HussarRedisMQProperties getProperties() {
        return this.properties;
    }

    /** @return the Redis template supplied at construction */
    public RedisTemplate<String, Object> getRedisTemplate() {
        return this.redisTemplate;
    }

    /** @return the message codec supplied at construction */
    public HussarRedisMQMessageCodec getMessageCodec() {
        return this.messageCodec;
    }

    /** @return the executor that runs poll workers and restart tasks */
    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    /** @return the key supplied at construction */
    public String getKey() {
        return this.key;
    }

    /** @return the stream name supplied at construction */
    public String getStreamName() {
        return this.streamName;
    }

    /** @return the group name supplied at construction */
    public String getGroupName() {
        return this.groupName;
    }

    /** @return the consumer name supplied at construction */
    public String getConsumerName() {
        return this.consumerName;
    }

    /** @return the single-group flag supplied at construction */
    public boolean isSingleGroup() {
        return this.singleGroup;
    }

    /** @return a description containing the lifecycle state name, stream name and consumer name */
    @Override
    public String toString() {
        return "HussarRedisMQConsumer<" + this.getLifecycleStateName() + " @" + this.streamName + "/" + this.consumerName + ">";
    }

    /**
     * Task that sleeps for the configured restart interval and then asks the enclosing consumer
     * to restart its poll worker.
     */
    private class PollWorkerDelayRestart
    implements Runnable {
        private PollWorkerDelayRestart() {
        }

        /**
         * Sleeps, then calls {@link HussarRedisMQConsumer#restartPollWorker()}.
         *
         * <p>The interval comes from {@code getPollWorkerRestartInterval()}; a missing or
         * non-positive value falls back to {@link HussarRedisMQConstants#DEFAULT_RESTART_INTERVAL}.
         *
         * @throws UncheckedInterruptedException if the sleep is interrupted; the thread's
         *         interrupt flag is restored first and no restart is attempted
         */
        @Override
        public void run() {
            long interval = Optional.ofNullable(HussarRedisMQConsumer.this.getProperties()).map(HussarRedisMQProperties::getPollWorkerRestartInterval).filter(duration -> duration.compareTo(Duration.ZERO) > 0).orElse(HussarRedisMQConstants.DEFAULT_RESTART_INTERVAL).toMillis();
            try {
                Thread.sleep(interval);
            }
            catch (InterruptedException ex) {
                logger.warn("Poll worker restart task of consumer {}/{} is interrupted", HussarRedisMQConsumer.this.streamName, HussarRedisMQConsumer.this.consumerName);
                // The task is submitted via submit(), so the exception below ends up in a
                // discarded Future; restore the flag so the interruption itself is not lost.
                Thread.currentThread().interrupt();
                throw new UncheckedInterruptedException(ex);
            }
            logger.info("Poll worker of consumer {}/{} is restarting", HussarRedisMQConsumer.this.streamName, HussarRedisMQConsumer.this.consumerName);
            HussarRedisMQConsumer.this.restartPollWorker();
        }
    }
}

