package com.jxdinfo.hussar.support.mq.standalone.consumer;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jxdinfo.hussar.support.mq.consumer.HussarMQAbstractConsumer;
import com.jxdinfo.hussar.support.mq.consumer.HussarMQMessageListener;
import com.jxdinfo.hussar.support.mq.lifecycle.HussarMQLifecycleManager;
import com.jxdinfo.hussar.support.mq.standalone.queue.HussarMQStandaloneQueue;
import java.io.InterruptedIOException;
import java.lang.reflect.Type;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.exception.UncheckedInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;

/* loaded from: input_file:com/jxdinfo/hussar/support/mq/standalone/consumer/HussarStandaloneMQConsumer.class */
/**
 * Consumer for the standalone (in-process) message queue.
 *
 * <p>On start the consumer registers itself on the {@link HussarMQStandaloneQueue}, submits a
 * {@code PollWorker} to the supplied {@link ExecutorService} and waits until that worker has bound
 * itself to a thread. The worker then repeatedly polls the queue, deserialises each payload with
 * the supplied {@link ObjectMapper} and hands the result to the message listener. On stop the
 * worker thread is interrupted and the consumer is unregistered from the queue.
 *
 * @param <T> type of the messages handed to the listener
 */
public class HussarStandaloneMQConsumer<T> extends HussarMQAbstractConsumer<T> {
    private static final Logger logger = LoggerFactory.getLogger(HussarStandaloneMQConsumer.class);

    private final ObjectMapper objectMapper;
    private final ExecutorService executorService;
    private final HussarMQStandaloneQueue queue;
    /** Identifier used in log output and in {@link #toString()}. */
    private final String key;
    /** Value passed to the queue when registering this consumer. */
    private final String description;
    /** Name returned by the queue on registration; {@code null} while not registered. */
    private volatile String name;
    /** Worker created by the most recent start; {@code null} if none was created or start was aborted. */
    private volatile PollWorker pollWorker;

    /**
     * Runnable that polls the queue in a loop on whichever thread the executor assigns to it.
     *
     * <p>Two latches report its progress: {@code startSignal} is released once the worker has
     * recorded its thread, {@code stopSignal} once {@link #run()} has finished.
     */
    private class PollWorker implements Runnable {
        /** Maximum time, in milliseconds, to wait for the worker to start. */
        private static final int START_TIMEOUT = 3000;
        /** Maximum time, in milliseconds, to wait for the worker to finish after being interrupted. */
        private static final int STOP_TIMEOUT = 3000;

        private final JavaType messageJavaType;
        /** Thread currently executing {@link #run()}, or {@code null} before start / after exit. */
        private volatile Thread boundedThread;
        private final CountDownLatch startSignal;
        private final CountDownLatch stopSignal;

        private PollWorker() {
            this.messageJavaType = HussarStandaloneMQConsumer.this.objectMapper
                    .getTypeFactory()
                    .constructType(HussarStandaloneMQConsumer.this.getMessageType());
            this.startSignal = new CountDownLatch(1);
            this.stopSignal = new CountDownLatch(1);
        }

        /**
         * @return {@code true} while a thread is bound to this worker and that thread is alive
         */
        public boolean isRunning() {
            // Snapshot the volatile field: run() may null it between the check and the call.
            Thread worker = this.boundedThread;
            return worker != null && worker.isAlive();
        }

        /**
         * Polls the queue and dispatches every payload until the thread is interrupted or polling
         * fails. Always releases {@code stopSignal} on exit.
         */
        @Override // java.lang.Runnable
        public void run() {
            logger.debug("Poll worker is starting...");
            start();
            try {
                logger.debug("Start poll worker loop");
                // Thread.interrupted() also clears the flag, so a normal exit leaves the thread clean.
                while (!Thread.interrupted()) {
                    handle(HussarStandaloneMQConsumer.this.queue.poll(HussarStandaloneMQConsumer.this.name));
                }
            } catch (Throwable th) {
                // Clear the flag so the executor's thread is not handed back in an interrupted state.
                if (Thread.interrupted()) {
                    logger.debug("Cleared thread interrupted flag");
                }
                if (convertInterruption(th) instanceof InterruptedException) {
                    logger.debug("Poll worker is terminated by interruption");
                } else {
                    logger.error("Unexpected error occurred in poll worker", th);
                }
            } finally {
                this.boundedThread = null;
                this.stopSignal.countDown();
            }
        }

        /**
         * Deserialises one payload and passes it to the message listener.
         *
         * <p>Failures are logged and swallowed so that a single bad message does not end the poll
         * loop; only interruptions are propagated.
         *
         * @param payload raw message bytes returned by the queue
         * @throws InterruptedException if handling failed because of an interruption
         */
        private void handle(byte[] payload) throws InterruptedException {
            try {
                logger.debug("Deserialize incoming message");
                T message = HussarStandaloneMQConsumer.this.objectMapper.readValue(payload, this.messageJavaType);
                logger.debug("Message listener handle incoming message");
                HussarStandaloneMQConsumer.this.messageListener.onMessage(message);
            } catch (Throwable th) {
                Throwable converted = convertInterruption(th);
                if (converted instanceof InterruptedException) {
                    throw (InterruptedException) converted;
                }
                // The trailing Throwable has no placeholder, so SLF4J logs it as the exception.
                logger.error("Failed to handle message ({} bytes) from {}/{}",
                        payload != null ? payload.length : 0,
                        HussarStandaloneMQConsumer.this.key,
                        HussarStandaloneMQConsumer.this.name,
                        th);
            }
        }

        /** Binds the worker to the calling thread and releases {@code startSignal}. */
        private synchronized void start() {
            this.boundedThread = Thread.currentThread();
            this.startSignal.countDown();
        }

        /**
         * Blocks until the worker has started.
         *
         * @throws IllegalStateException if the worker has not started within {@code START_TIMEOUT}
         *         milliseconds, or if the calling thread is interrupted while waiting (the
         *         interrupt flag is restored in that case)
         */
        public void waitStart() {
            logger.debug("Waiting for poll worker starting...");
            final boolean started;
            try {
                started = this.startSignal.await(START_TIMEOUT, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller after it is wrapped.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for start: " + HussarStandaloneMQConsumer.this, e);
            }
            if (!started) {
                throw new IllegalStateException("timeout while waiting for poll worker start: " + HussarStandaloneMQConsumer.this);
            }
        }

        /**
         * Interrupts the worker thread, if one is bound, and waits for {@link #run()} to finish.
         *
         * @throws IllegalStateException if the worker does not finish within {@code STOP_TIMEOUT}
         *         milliseconds, or if the calling thread is interrupted while waiting
         */
        public synchronized void stop() {
            logger.debug("Poll worker is stopping...");
            // run() clears boundedThread without holding this monitor, so read the field once;
            // checking and dereferencing it separately could hit null in between.
            Thread worker = this.boundedThread;
            if (worker != null) {
                worker.interrupt();
                logger.debug("Waiting for poll worker stopping...");
                waitStop();
            }
        }

        /** Blocks until {@code stopSignal} is released; see {@link #stop()} for failure modes. */
        private void waitStop() {
            final boolean stopped;
            try {
                stopped = this.stopSignal.await(STOP_TIMEOUT, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller after it is wrapped.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for stop: " + HussarStandaloneMQConsumer.this, e);
            }
            if (!stopped) {
                throw new IllegalStateException("timeout while waiting for poll worker stop: " + HussarStandaloneMQConsumer.this);
            }
        }

        /**
         * Normalises the different ways an interruption can surface into an
         * {@link InterruptedException}.
         *
         * <p>An {@link UncheckedInterruptedException} is unwrapped to its cause when it has one. An
         * {@link InterruptedIOException} or {@link ClosedByInterruptException}, either directly or
         * as the root cause, is wrapped in a new {@link InterruptedException}.
         *
         * @param th the throwable to inspect
         * @return an {@link InterruptedException} if {@code th} represents an interruption,
         *         otherwise the (possibly unwrapped) throwable
         */
        private Throwable convertInterruption(Throwable th) {
            Throwable candidate = th;
            // Only unwrap when there is a cause, so this method never returns null for non-null input.
            if (candidate instanceof UncheckedInterruptedException && candidate.getCause() != null) {
                candidate = candidate.getCause();
            }
            if (candidate instanceof InterruptedException) {
                return candidate;
            }
            if (isIoInterruption(candidate)) {
                InterruptedException ioInterruption = new InterruptedException("io interruption");
                ioInterruption.initCause(candidate);
                return ioInterruption;
            }
            Throwable rootCause = ExceptionUtils.getRootCause(candidate);
            if (rootCause instanceof InterruptedException || isIoInterruption(rootCause)) {
                InterruptedException nestedInterruption = new InterruptedException("nested interruption");
                nestedInterruption.initCause(candidate);
                return nestedInterruption;
            }
            return candidate;
        }

        /** Tells whether the throwable is one of the I/O exception types that signal an interruption. */
        private boolean isIoInterruption(Throwable th) {
            return th instanceof InterruptedIOException || th instanceof ClosedByInterruptException;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a consumer; nothing is registered or submitted until the consumer is started.
     *
     * @param hussarMQLifecycleManager lifecycle manager forwarded to the superclass
     * @param objectMapper mapper used to deserialise message payloads
     * @param executorService executor that runs the poll worker
     * @param hussarMQStandaloneQueue queue to register on and poll from
     * @param type message type forwarded to the superclass
     * @param hussarMQMessageListener listener forwarded to the superclass
     * @param str key identifying this consumer in logs and in {@link #toString()}
     * @param str2 description passed to the queue on registration
     */
    public HussarStandaloneMQConsumer(HussarMQLifecycleManager hussarMQLifecycleManager, @NonNull ObjectMapper objectMapper, @NonNull ExecutorService executorService, @NonNull HussarMQStandaloneQueue hussarMQStandaloneQueue, @NonNull Type type, @NonNull HussarMQMessageListener<T> hussarMQMessageListener, @NonNull String str, String str2) {
        super(hussarMQLifecycleManager, type, hussarMQMessageListener);
        this.objectMapper = objectMapper;
        this.executorService = executorService;
        this.queue = hussarMQStandaloneQueue;
        this.key = str;
        this.description = str2;
    }

    /**
     * Registers the consumer on the queue, submits a new poll worker and waits for it to start.
     *
     * <p>If the worker cannot be created, submitted or started in time, the submitted task is
     * cancelled and the consumer is unregistered again before the failure is rethrown, so a
     * failed start leaves neither a registration nor a pending worker behind.
     *
     * @throws IllegalStateException if the worker does not start in time or the wait is interrupted
     */
    protected void doStart() {
        logger.debug("Start standalone-mq consumer {}", this.key);
        this.name = this.queue.registerConsumer(this.description);
        Future<?> task = null;
        try {
            PollWorker worker = new PollWorker();
            this.pollWorker = worker;
            task = this.executorService.submit(worker);
            worker.waitStart();
        } catch (RuntimeException | Error startFailure) {
            abortStart(task, startFailure);
            throw startFailure;
        }
    }

    /**
     * Rolls back a failed start: cancels the worker task and removes the queue registration.
     *
     * @param task future of the submitted worker, or {@code null} if submission did not happen
     * @param startFailure the failure being propagated; cleanup errors are attached as suppressed
     */
    private void abortStart(Future<?> task, Throwable startFailure) {
        if (task != null) {
            // Prevents a still-queued worker from running later, and interrupts one that is running.
            task.cancel(true);
        }
        this.pollWorker = null;
        String registeredName = this.name;
        try {
            if (registeredName != null) {
                this.queue.unregisterConsumer(registeredName);
            }
        } catch (RuntimeException cleanupFailure) {
            startFailure.addSuppressed(cleanupFailure);
        } finally {
            this.name = null;
        }
    }

    /**
     * Stops the poll worker and unregisters the consumer from the queue.
     *
     * <p>Unregistration is attempted even if stopping the worker fails. The method does nothing
     * beyond logging when there is no worker and no registration (for example after a failed start).
     */
    protected void doStop() {
        logger.debug("Stop standalone-mq consumer {}/{}", this.key, this.name);
        PollWorker worker = this.pollWorker;
        try {
            if (worker != null) {
                worker.stop();
            }
        } finally {
            String registeredName = this.name;
            if (registeredName != null) {
                this.queue.unregisterConsumer(registeredName);
            }
            this.name = null;
        }
    }

    /**
     * @return a description containing the lifecycle state name, the key and the registered name
     */
    @Override
    public String toString() {
        return "HussarStandaloneMQConsumer<" + getLifecycleStateName() + " @" + this.key + "/" + this.name + ">";
    }
}
