/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms;

import jakarta.jms.IllegalStateException;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageFormatException;
import jakarta.jms.MessageListener;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpid.jms.JmsAcknowledgeCallback;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.JmsMessageAvailableConsumer;
import org.apache.qpid.jms.JmsMessageAvailableListener;
import org.apache.qpid.jms.JmsMessageDispatcher;
import org.apache.qpid.jms.JmsSession;
import org.apache.qpid.jms.JmsTemporaryDestination;
import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.JmsMessageSupport;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.meta.JmsResource;
import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderException;
import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.provider.ProviderSynchronization;
import org.apache.qpid.jms.tracing.JmsTracer;
import org.apache.qpid.jms.util.FifoMessageQueue;
import org.apache.qpid.jms.util.MessageQueue;
import org.apache.qpid.jms.util.PriorityMessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsMessageConsumer
implements AutoCloseable,
MessageConsumer,
JmsMessageAvailableConsumer,
JmsMessageDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumer.class);
    protected final JmsSession session;
    protected final JmsConnection connection;
    protected JmsConsumerInfo consumerInfo;
    protected final int acknowledgementMode;
    protected final AtomicBoolean closed = new AtomicBoolean();
    protected volatile MessageListener messageListener;
    protected volatile JmsMessageAvailableListener availableListener;
    protected final MessageQueue messageQueue;
    protected final Lock lock = new ReentrantLock();
    protected final ReentrantLock dispatchLock = new ReentrantLock();
    protected final AtomicReference<Throwable> failureCause = new AtomicReference();
    protected final MessageDeliverTask deliveryTask = new MessageDeliverTask();
    protected final JmsTracer tracer;
    protected final String address;

    protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination, String selector, boolean noLocal) throws JMSException {
        this(consumerId, session, destination, null, selector, noLocal);
    }

    protected JmsMessageConsumer(JmsConsumerId consumerId, final JmsSession session, JmsDestination destination, String name, String selector, boolean noLocal) throws JMSException {
        this.session = session;
        this.connection = session.getConnection();
        this.tracer = this.connection.getTracer();
        this.address = destination.getAddress();
        int n = this.acknowledgementMode = this.isBrowser() ? 1 : session.acknowledgementMode();
        if (destination.isTemporary()) {
            this.connection.checkConsumeFromTemporaryDestination((JmsTemporaryDestination)destination);
        }
        JmsPrefetchPolicy prefetchPolicy = session.getPrefetchPolicy();
        JmsRedeliveryPolicy redeliveryPolicy = session.getRedeliveryPolicy().copy();
        JmsDeserializationPolicy deserializationPolicy = session.getDeserializationPolicy().copy();
        int configuredPrefetch = prefetchPolicy.getConfiguredPrefetch(session, destination, this.isDurableSubscription(), this.isBrowser());
        this.messageQueue = this.connection.isLocalMessagePriority() ? new PriorityMessageQueue() : new FifoMessageQueue(configuredPrefetch);
        this.consumerInfo = new JmsConsumerInfo(consumerId, this);
        this.consumerInfo.setExplicitClientID(this.connection.isExplicitClientID());
        this.consumerInfo.setSelector(selector);
        this.consumerInfo.setDurable(this.isDurableSubscription());
        this.consumerInfo.setSubscriptionName(name);
        this.consumerInfo.setShared(this.isSharedSubscription());
        this.consumerInfo.setDestination(destination);
        this.consumerInfo.setAcknowledgementMode(this.acknowledgementMode);
        this.consumerInfo.setNoLocal(noLocal);
        this.consumerInfo.setBrowser(this.isBrowser());
        this.consumerInfo.setPrefetchSize(configuredPrefetch);
        this.consumerInfo.setRedeliveryPolicy(redeliveryPolicy);
        this.consumerInfo.setLocalMessageExpiry(this.connection.isLocalMessageExpiry());
        this.consumerInfo.setPresettle(session.getPresettlePolicy().isConsumerPresttled(session, destination));
        this.consumerInfo.setDeserializationPolicy(deserializationPolicy);
        session.getConnection().createResource(this.consumerInfo, new ProviderSynchronization(){

            @Override
            public void onPendingSuccess() {
                session.add(JmsMessageConsumer.this);
            }

            @Override
            public void onPendingFailure(ProviderException cause) {
            }
        });
        if (session.isStarted()) {
            this.start();
        }
    }

    public void init() throws JMSException {
        if (!this.isPullConsumer()) {
            this.startConsumerResource();
        }
    }

    private void startConsumerResource() throws JMSException {
        try {
            this.session.getConnection().startResource(this.consumerInfo);
        }
        catch (JMSException ex) {
            this.session.remove(this);
            throw ex;
        }
    }

    @Override
    public void close() throws JMSException {
        if (!this.closed.get()) {
            this.doClose();
        }
    }

    protected void doClose() throws JMSException {
        this.shutdown();
        try {
            this.connection.destroyResource(this.consumerInfo);
        }
        catch (JmsConnectionFailedException jmsConnectionFailedException) {
            // empty catch block
        }
    }

    protected void shutdown() throws JMSException {
        this.shutdown(null);
    }

    protected void shutdown(Throwable cause) throws JMSException {
        if (this.closed.compareAndSet(false, true)) {
            this.consumerInfo.setState(JmsResource.ResourceState.CLOSED);
            this.setFailureCause(cause);
            this.session.remove(this);
            this.stop(true);
        }
    }

    public Message receive() throws JMSException {
        return this.receive(0L);
    }

    public Message receive(long timeout) throws JMSException {
        this.checkClosed();
        this.checkMessageListener();
        if (timeout == 0L) {
            timeout = -1L;
        }
        return this.copy(this.ackFromReceive(this.dequeue(timeout, this.connection.isReceiveLocalOnly())));
    }

    public Message receiveNoWait() throws JMSException {
        this.checkClosed();
        this.checkMessageListener();
        return this.copy(this.ackFromReceive(this.dequeue(0L, this.connection.isReceiveNoWaitLocalOnly())));
    }

    public <T> T receiveBody(Class<T> desired, long timeout) throws JMSException {
        this.checkClosed();
        this.checkMessageListener();
        T messageBody = null;
        JmsInboundMessageDispatch envelope = null;
        try {
            envelope = this.dequeue(timeout, this.connection.isReceiveLocalOnly());
            if (envelope != null) {
                messageBody = envelope.getMessage().getBody(desired);
            }
        }
        catch (MessageFormatException mfe) {
            if (this.acknowledgementMode == 1 || this.acknowledgementMode == 3) {
                envelope.setEnqueueFirst(true);
                this.onInboundMessage(envelope);
                envelope = null;
            }
            throw mfe;
        }
        finally {
            if (envelope != null) {
                this.ackFromReceive(envelope);
            }
        }
        return messageBody;
    }

    private JmsInboundMessageDispatch dequeue(long timeout, boolean localCheckOnly) throws JMSException {
        boolean pullConsumer;
        boolean pullForced = pullConsumer = this.isPullConsumer();
        try {
            JmsMessageFacade facade;
            JmsInboundMessageDispatch envelope;
            long deadline = 0L;
            if (timeout > 0L) {
                deadline = System.currentTimeMillis() + timeout;
            }
            this.performPullIfRequired(timeout, false);
            while (true) {
                envelope = null;
                envelope = pullForced || pullConsumer ? this.messageQueue.dequeue(0L) : this.messageQueue.dequeue(timeout);
                if (this.getFailureCause() != null) {
                    LOG.debug("{} receive failed: {}", (Object)this.getConsumerId(), (Object)this.getFailureCause().getMessage());
                    throw JmsExceptionSupport.create(this.getFailureCause());
                }
                if (envelope == null) {
                    if (timeout == 0L && (pullForced || localCheckOnly) || pullConsumer || this.messageQueue.isClosed()) {
                        return null;
                    }
                    if (timeout > 0L) {
                        timeout = Math.max(deadline - System.currentTimeMillis(), 0L);
                    }
                    if (timeout < 0L || localCheckOnly) continue;
                    pullForced = true;
                    if (!this.performPullIfRequired(timeout, true)) continue;
                    this.startConsumerResource();
                    continue;
                }
                facade = envelope.getMessage().getFacade();
                if (this.consumeExpiredMessage(envelope)) {
                    LOG.trace("{} filtered expired message: {}", (Object)this.getConsumerId(), (Object)envelope);
                    this.doAckExpired(envelope);
                    this.tracer.syncReceive(facade, this.address, JmsTracer.DeliveryOutcome.EXPIRED);
                    if (timeout > 0L) {
                        timeout = Math.max(deadline - System.currentTimeMillis(), 0L);
                    }
                    this.performPullIfRequired(timeout, false);
                    continue;
                }
                if (!this.session.redeliveryExceeded(envelope)) break;
                LOG.debug("{} filtered message with excessive redelivery count: {}", (Object)this.getConsumerId(), (Object)envelope);
                this.applyRedeliveryPolicyOutcome(envelope);
                this.tracer.syncReceive(facade, this.address, JmsTracer.DeliveryOutcome.REDELIVERIES_EXCEEDED);
                if (timeout > 0L) {
                    timeout = Math.max(deadline - System.currentTimeMillis(), 0L);
                }
                this.performPullIfRequired(timeout, false);
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace(this.getConsumerId() + " received message: " + envelope);
            }
            this.tracer.syncReceive(facade, this.address, JmsTracer.DeliveryOutcome.DELIVERED);
            return envelope;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw JmsExceptionSupport.create(e);
        }
    }

    private boolean consumeExpiredMessage(JmsInboundMessageDispatch dispatch) {
        return !this.isBrowser() && this.consumerInfo.isLocalMessageExpiry() && dispatch.getMessage().isExpired();
    }

    protected void checkClosed() throws IllegalStateException {
        if (this.closed.get()) {
            IllegalStateException jmsEx = null;
            if (this.getFailureCause() == null) {
                jmsEx = new IllegalStateException("The MessageConsumer is closed");
            } else {
                jmsEx = new IllegalStateException("The MessageConsumer was closed due to an unrecoverable error.");
                jmsEx.initCause(this.getFailureCause());
            }
            throw jmsEx;
        }
    }

    void setFailureCause(Throwable failureCause) {
        this.failureCause.set(failureCause);
    }

    Throwable getFailureCause() {
        if (this.failureCause.get() == null) {
            return this.session.getFailureCause();
        }
        return this.failureCause.get();
    }

    JmsMessage copy(JmsInboundMessageDispatch envelope) throws JMSException {
        if (envelope == null || envelope.getMessage() == null) {
            return null;
        }
        try {
            return envelope.getMessage().copy();
        }
        catch (Exception ex) {
            this.session.acknowledge(envelope, ProviderConstants.ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
            throw ex;
        }
    }

    JmsInboundMessageDispatch ackFromReceive(JmsInboundMessageDispatch envelope) throws JMSException {
        if (envelope != null && envelope.getMessage() != null) {
            JmsMessage message = envelope.getMessage();
            if (message.getAcknowledgeCallback() != null) {
                this.doAckDelivered(envelope);
            } else {
                this.doAckConsumed(envelope);
            }
        }
        return envelope;
    }

    private JmsInboundMessageDispatch doAckConsumed(JmsInboundMessageDispatch envelope) throws JMSException {
        try {
            this.session.acknowledge(envelope, ProviderConstants.ACK_TYPE.ACCEPTED);
        }
        catch (JMSException ex) {
            this.signalExceptionListener((Exception)((Object)ex));
            throw ex;
        }
        return envelope;
    }

    private JmsInboundMessageDispatch doAckDelivered(JmsInboundMessageDispatch envelope) throws JMSException {
        try {
            this.session.acknowledge(envelope, ProviderConstants.ACK_TYPE.DELIVERED);
        }
        catch (JMSException ex) {
            this.signalExceptionListener((Exception)((Object)ex));
            throw ex;
        }
        return envelope;
    }

    private void doAckExpired(JmsInboundMessageDispatch envelope) throws JMSException {
        try {
            this.session.acknowledge(envelope, ProviderConstants.ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
        }
        catch (JMSException ex) {
            this.signalExceptionListener((Exception)((Object)ex));
            throw ex;
        }
    }

    private void applyRedeliveryPolicyOutcome(JmsInboundMessageDispatch envelope) throws JMSException {
        try {
            JmsRedeliveryPolicy redeliveryPolicy = this.consumerInfo.getRedeliveryPolicy();
            this.session.acknowledge(envelope, JmsMessageSupport.lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(this.getDestination())));
        }
        catch (JMSException ex) {
            this.signalExceptionListener((Exception)((Object)ex));
            throw ex;
        }
    }

    private void doAckReleased(JmsInboundMessageDispatch envelope) throws JMSException {
        try {
            this.session.acknowledge(envelope, ProviderConstants.ACK_TYPE.RELEASED);
        }
        catch (JMSException ex) {
            this.signalExceptionListener((Exception)((Object)ex));
            throw ex;
        }
    }

    @Override
    public void onInboundMessage(JmsInboundMessageDispatch envelope) {
        envelope.setConsumerInfo(this.consumerInfo);
        this.lock.lock();
        try {
            if (this.acknowledgementMode == 2) {
                envelope.getMessage().setAcknowledgeCallback(new JmsAcknowledgeCallback(this.session));
            } else if (this.session.isIndividualAcknowledge()) {
                envelope.getMessage().setAcknowledgeCallback(new JmsAcknowledgeCallback(this.session, envelope));
            }
            if (envelope.isEnqueueFirst()) {
                this.messageQueue.enqueueFirst(envelope);
            } else {
                this.messageQueue.enqueue(envelope);
            }
            if (this.session.isStarted() && this.messageQueue.isRunning()) {
                if (this.messageListener != null) {
                    this.session.getDispatcherExecutor().execute(this.deliveryTask);
                } else if (this.availableListener != null) {
                    this.session.getDispatcherExecutor().execute(new Runnable(){

                        @Override
                        public void run() {
                            if (JmsMessageConsumer.this.messageQueue.isRunning()) {
                                JmsMessageConsumer.this.availableListener.onMessageAvailable(JmsMessageConsumer.this);
                            }
                        }
                    });
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    public void start() {
        this.lock.lock();
        try {
            if (!this.messageQueue.isRunning()) {
                this.messageQueue.start();
                this.drainMessageQueueToListener();
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    public void stop() {
        this.stop(false);
    }

    private void stop(boolean closeMessageQueue) {
        this.dispatchLock.lock();
        this.lock.lock();
        try {
            if (closeMessageQueue) {
                this.messageQueue.close();
            } else {
                this.messageQueue.stop();
            }
        }
        finally {
            this.lock.unlock();
            this.dispatchLock.unlock();
        }
    }

    void suspendForRollback() throws JMSException {
        this.stop();
        try {
            this.session.getConnection().stopResource(this.consumerInfo);
        }
        finally {
            if (this.session.getTransactionContext().isActiveInThisContext(this.getConsumerId())) {
                this.messageQueue.clear();
            }
        }
    }

    void resumeAfterRollback() throws JMSException {
        this.start();
        this.startConsumerResource();
    }

    public JmsConsumerId getConsumerId() {
        return this.consumerInfo.getId();
    }

    public JmsDestination getDestination() {
        return this.consumerInfo.getDestination();
    }

    public MessageListener getMessageListener() throws JMSException {
        this.checkClosed();
        return this.messageListener;
    }

    public void setMessageListener(MessageListener listener) throws JMSException {
        this.checkClosed();
        this.dispatchLock.lock();
        try {
            this.messageListener = listener;
            this.consumerInfo.setListener(listener != null);
            if (listener != null) {
                if (this.isPullConsumer()) {
                    this.startConsumerResource();
                }
                this.drainMessageQueueToListener();
            }
        }
        finally {
            this.dispatchLock.unlock();
        }
    }

    public String getMessageSelector() throws JMSException {
        this.checkClosed();
        return this.consumerInfo.getSelector();
    }

    public int getPrefetchSize() {
        return this.consumerInfo.getPrefetchSize();
    }

    protected void checkMessageListener() throws JMSException {
        this.session.checkMessageListener();
    }

    boolean hasMessageListener() {
        return this.messageListener != null;
    }

    boolean isUsingDestination(JmsDestination destination) {
        return this.consumerInfo.getDestination().equals(destination);
    }

    protected int getMessageQueueSize() {
        return this.messageQueue.size();
    }

    protected boolean isNoLocal() {
        return this.consumerInfo.isNoLocal();
    }

    public boolean isDurableSubscription() {
        return false;
    }

    public boolean isSharedSubscription() {
        return false;
    }

    public boolean isBrowser() {
        return false;
    }

    public boolean isPullConsumer() {
        return this.getPrefetchSize() == 0;
    }

    @Override
    public void setAvailableListener(JmsMessageAvailableListener availableListener) {
        this.availableListener = availableListener;
    }

    @Override
    public JmsMessageAvailableListener getAvailableListener() {
        return this.availableListener;
    }

    protected void onConnectionInterrupted() {
        this.messageQueue.clear();
    }

    protected void onConnectionRecovery(Provider provider) throws Exception {
        if (!this.consumerInfo.isClosed()) {
            ProviderFuture request = provider.newProviderFuture();
            try {
                provider.create(this.consumerInfo, request);
                request.sync();
            }
            catch (ProviderException poe) {
                if (this.connection.isCloseLinksThatFailOnReconnect()) {
                    this.session.consumerClosed(this.consumerInfo, poe);
                }
                throw poe;
            }
        }
    }

    protected void onConnectionRecovered(Provider provider) throws Exception {
        if (!this.consumerInfo.isClosed()) {
            ProviderFuture request = provider.newProviderFuture();
            provider.start(this.consumerInfo, request);
            request.sync();
        }
    }

    protected void onConnectionRestored() {
    }

    protected boolean performPullIfRequired(long timeout, boolean treatAsPullConsumer) throws JMSException {
        if ((this.isPullConsumer() || treatAsPullConsumer) && this.messageQueue.isRunning() && this.messageQueue.isEmpty()) {
            this.connection.pull(this.getConsumerId(), timeout);
            return true;
        }
        return false;
    }

    private final void signalExceptionListener(Exception ex) {
        boolean reclaimLock = false;
        if (this.dispatchLock.isHeldByCurrentThread()) {
            reclaimLock = true;
            this.session.setDeliveryThreadCheckEnabled(false);
            this.dispatchLock.unlock();
        }
        try {
            this.session.onException(ex);
        }
        finally {
            if (reclaimLock) {
                this.dispatchLock.lock();
                this.session.setDeliveryThreadCheckEnabled(true);
            }
        }
    }

    private void drainMessageQueueToListener() {
        if (this.messageListener != null && this.session.isStarted() && this.messageQueue.isRunning()) {
            this.session.getDispatcherExecutor().execute(new BoundedMessageDeliverTask(this.messageQueue.size()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean deliverNextPending() {
        block28: {
            if (this.session.isStarted() && this.messageQueue.isRunning() && this.messageListener != null) {
                this.dispatchLock.lock();
                try {
                    JmsInboundMessageDispatch envelope = this.messageQueue.dequeueNoWait();
                    if (envelope == null) {
                        boolean bl = false;
                        return bl;
                    }
                    JmsMessageFacade facade = envelope.getMessage().getFacade();
                    if (this.consumeExpiredMessage(envelope)) {
                        LOG.trace("{} filtered expired message: {}", (Object)this.getConsumerId(), (Object)envelope);
                        this.doAckExpired(envelope);
                        this.tracer.asyncDeliveryInit(facade, this.address);
                        this.tracer.asyncDeliveryComplete(facade, JmsTracer.DeliveryOutcome.EXPIRED, null);
                        break block28;
                    }
                    if (this.session.redeliveryExceeded(envelope)) {
                        LOG.trace("{} filtered message with excessive redelivery count: {}", (Object)this.getConsumerId(), (Object)envelope);
                        this.applyRedeliveryPolicyOutcome(envelope);
                        this.tracer.asyncDeliveryInit(facade, this.address);
                        this.tracer.asyncDeliveryComplete(facade, JmsTracer.DeliveryOutcome.REDELIVERIES_EXCEEDED, null);
                        break block28;
                    }
                    boolean deliveryFailed = false;
                    boolean autoAckOrDupsOk = this.acknowledgementMode == 1 || this.acknowledgementMode == 3;
                    JmsMessage copy = autoAckOrDupsOk ? this.copy(this.doAckDelivered(envelope)) : this.copy(this.ackFromReceive(envelope));
                    this.session.clearSessionRecovered();
                    try {
                        this.tracer.asyncDeliveryInit(facade, this.address);
                        this.messageListener.onMessage((Message)copy);
                    }
                    catch (RuntimeException rte) {
                        deliveryFailed = true;
                        this.tracer.asyncDeliveryComplete(facade, JmsTracer.DeliveryOutcome.APPLICATION_ERROR, rte);
                    }
                    finally {
                        if (!deliveryFailed) {
                            this.tracer.asyncDeliveryComplete(facade, JmsTracer.DeliveryOutcome.DELIVERED, null);
                        }
                    }
                    if (autoAckOrDupsOk && !this.session.isSessionRecovered()) {
                        if (!deliveryFailed) {
                            this.doAckConsumed(envelope);
                        } else {
                            this.doAckReleased(envelope);
                        }
                    }
                }
                catch (Exception e) {
                    this.signalExceptionListener(e);
                }
                finally {
                    this.dispatchLock.unlock();
                    if (this.isPullConsumer()) {
                        try {
                            this.startConsumerResource();
                        }
                        catch (JMSException e) {
                            LOG.error("Exception during credit replenishment for consumer listener {}", (Object)this.getConsumerId(), (Object)e);
                        }
                    }
                }
            }
        }
        return !this.messageQueue.isEmpty();
    }

    private final class MessageDeliverTask
    implements Runnable {
        private MessageDeliverTask() {
        }

        @Override
        public void run() {
            JmsMessageConsumer.this.deliverNextPending();
        }
    }

    private final class BoundedMessageDeliverTask
    implements Runnable {
        private final int deliveryCount;

        public BoundedMessageDeliverTask(int deliveryCount) {
            this.deliveryCount = deliveryCount;
        }

        @Override
        public void run() {
            int current = 0;
            while (JmsMessageConsumer.this.session.isStarted() && JmsMessageConsumer.this.messageQueue.isRunning() && current++ < this.deliveryCount) {
                if (JmsMessageConsumer.this.deliverNextPending()) continue;
                return;
            }
        }
    }
}

