/*
 * Decompiled with CFR 0.152.
 */
package com.jxdinfo.hussar.support.mq.standalone.consumer;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jxdinfo.hussar.support.mq.consumer.HussarMQAbstractConsumer;
import com.jxdinfo.hussar.support.mq.consumer.HussarMQMessageListener;
import com.jxdinfo.hussar.support.mq.lifecycle.HussarMQLifecycleManager;
import com.jxdinfo.hussar.support.mq.standalone.queue.HussarMQStandaloneQueue;
import java.io.InterruptedIOException;
import java.lang.reflect.Type;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.exception.UncheckedInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;

public class HussarStandaloneMQConsumer<T>
extends HussarMQAbstractConsumer<T> {
    private static final Logger logger = LoggerFactory.getLogger(HussarStandaloneMQConsumer.class);
    private final ObjectMapper objectMapper;
    private final ExecutorService executorService;
    private final HussarMQStandaloneQueue queue;
    private final String key;
    private final String description;
    private volatile String name;
    private volatile PollWorker pollWorker;

    protected HussarStandaloneMQConsumer(HussarMQLifecycleManager lifecycleManager, @NonNull ObjectMapper objectMapper, @NonNull ExecutorService executorService, @NonNull HussarMQStandaloneQueue queue, @NonNull Type messageType, @NonNull HussarMQMessageListener<T> messageListener, @NonNull String key, String description) {
        super(lifecycleManager, messageType, messageListener);
        this.objectMapper = objectMapper;
        this.executorService = executorService;
        this.queue = queue;
        this.key = key;
        this.description = description;
    }

    protected void doStart() {
        logger.debug("Start standalone-mq consumer {}", (Object)this.key);
        this.name = this.queue.registerConsumer(this.description);
        this.pollWorker = new PollWorker();
        this.executorService.submit(this.pollWorker);
        this.pollWorker.waitStart();
    }

    protected void doStop() {
        logger.debug("Stop standalone-mq consumer {}/{}", (Object)this.key, (Object)this.name);
        try {
            this.pollWorker.stop();
        }
        finally {
            this.queue.unregisterConsumer(this.name);
            this.name = null;
        }
    }

    public String toString() {
        return "HussarStandaloneMQConsumer<" + this.getLifecycleStateName() + " @" + this.key + "/" + this.name + ">";
    }

    private class PollWorker
    implements Runnable {
        private static final int START_TIMEOUT = 3000;
        private static final int STOP_TIMEOUT = 3000;
        private final JavaType messageJavaType;
        private volatile Thread boundedThread;
        private final CountDownLatch startSignal;
        private final CountDownLatch stopSignal;

        private PollWorker() {
            this.messageJavaType = HussarStandaloneMQConsumer.this.objectMapper.getTypeFactory().constructType(HussarStandaloneMQConsumer.this.getMessageType());
            this.startSignal = new CountDownLatch(1);
            this.stopSignal = new CountDownLatch(1);
        }

        public boolean isRunning() {
            return this.boundedThread != null && this.boundedThread.isAlive();
        }

        @Override
        public void run() {
            logger.debug("Poll worker is starting...");
            this.start();
            try {
                logger.debug("Start poll worker loop");
                while (!Thread.interrupted()) {
                    byte[] data = HussarStandaloneMQConsumer.this.queue.poll(HussarStandaloneMQConsumer.this.name);
                    this.handle(data);
                }
            }
            catch (Throwable ex) {
                Throwable throwable;
                if (Thread.interrupted()) {
                    logger.debug("Cleared thread interrupted flag");
                }
                if ((throwable = this.convertInterruption(ex)) instanceof InterruptedException) {
                    logger.debug("Poll worker is terminated by interruption");
                } else {
                    logger.error("Unexpected error occurred in poll worker", ex);
                }
            }
            finally {
                this.boundedThread = null;
                this.stopSignal.countDown();
            }
        }

        private void handle(byte[] data) throws InterruptedException {
            try {
                logger.debug("Deserialize incoming message");
                Object message = HussarStandaloneMQConsumer.this.objectMapper.readValue(data, this.messageJavaType);
                logger.debug("Message listener handle incoming message");
                HussarStandaloneMQConsumer.this.messageListener.onMessage(message);
            }
            catch (Throwable ex) {
                Throwable throwable = this.convertInterruption(ex);
                if (throwable instanceof InterruptedException) {
                    throw (InterruptedException)throwable;
                }
                logger.error("Failed to handle message ({} bytes) from {}/{}", new Object[]{data != null ? data.length : 0, HussarStandaloneMQConsumer.this.key, HussarStandaloneMQConsumer.this.name, ex});
            }
        }

        private synchronized void start() {
            this.boundedThread = Thread.currentThread();
            this.startSignal.countDown();
        }

        public void waitStart() {
            logger.debug("Waiting for poll worker starting...");
            try {
                boolean done = this.startSignal.await(3000L, TimeUnit.MILLISECONDS);
                if (!done) {
                    throw new IllegalStateException("timeout while waiting for poll worker start: " + (Object)((Object)HussarStandaloneMQConsumer.this));
                }
            }
            catch (InterruptedException ex) {
                throw new IllegalStateException("interrupted while waiting for start: " + (Object)((Object)HussarStandaloneMQConsumer.this), ex);
            }
        }

        public synchronized void stop() {
            logger.debug("Poll worker is stopping...");
            if (this.boundedThread != null) {
                this.boundedThread.interrupt();
                logger.debug("Waiting for poll worker stopping...");
                this.waitStop();
            }
        }

        private void waitStop() {
            try {
                boolean done = this.stopSignal.await(3000L, TimeUnit.MILLISECONDS);
                if (!done) {
                    throw new IllegalStateException("timeout while waiting for poll worker stop: " + (Object)((Object)HussarStandaloneMQConsumer.this));
                }
            }
            catch (InterruptedException ex) {
                throw new IllegalStateException("interrupted while waiting for stop: " + (Object)((Object)HussarStandaloneMQConsumer.this), ex);
            }
        }

        private Throwable convertInterruption(Throwable throwable) {
            if (throwable instanceof UncheckedInterruptedException) {
                throwable = throwable.getCause();
            }
            if (throwable instanceof InterruptedException) {
                return throwable;
            }
            if (throwable instanceof InterruptedIOException || throwable instanceof ClosedByInterruptException) {
                InterruptedException ex = new InterruptedException("io interruption");
                ex.initCause(throwable);
                return ex;
            }
            Throwable cause = ExceptionUtils.getRootCause((Throwable)throwable);
            if (cause instanceof InterruptedException || cause instanceof InterruptedIOException || cause instanceof ClosedByInterruptException) {
                InterruptedException ex = new InterruptedException("nested interruption");
                ex.initCause(throwable);
                return ex;
            }
            return throwable;
        }
    }
}

