/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeReturnType;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageOrderlyByGroupService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeThreadExecutor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MessageQueueGroupLock;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.QueuePair;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.Pair;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueueGroup;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.aliyun.openservices.shade.com.google.common.base.Objects;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Runnable unit of orderly consumption for one {@link MessageQueueGroup} and one sharding-key index.
 *
 * <p>Work items are handed to this request through {@link #getQueueToConsume()}: each entry pairs a
 * {@link QueuePair} (message queue + process queue) with the number of batch iterations that should be
 * attempted for it. {@link #run()} drains those entries while holding the sharding-key lock object for
 * this group/index.
 *
 * <p>Two requests are equal when they share the same message queue group and sharding-key index.
 */
public class ConsumeRequest implements Runnable {
    /**
     * Maximum time, in milliseconds, that a single {@link #run()} invocation keeps consuming before it
     * resubmits itself to the executor. Read from the system property
     * {@code rocketmq.client.maxTimeConsumeContinuously} (default {@code 5000}).
     */
    private static final long MAX_TIME_CONSUME_CONTINUOUSLY = Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "5000"));

    private final InternalLogger log = ClientLogger.getLog();
    private final MessageQueueGroup messageQueueGroup;
    private final ConsumeThreadExecutor consumeThreadExecutor;
    private final ConsumeMessageOrderlyByGroupService cs;
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private final MessageListenerOrderly messageListener;
    private final String consumerGroup;
    /** Source of the lock object guarding the "queue is empty, remove this request" decision. */
    private final MessageQueueGroupLock consumeRequestLock;
    /** Source of the lock object held for the whole duration of {@link #run()}. */
    private final MessageQueueGroupLock shardingKeyLock;
    /** Pending work: (queue pair, number of batch iterations still to attempt for that pair). */
    private final LinkedBlockingDeque<Pair<QueuePair, Integer>> queueToConsume = new LinkedBlockingDeque<Pair<QueuePair, Integer>>();
    /** Part of this object's identity (see {@link #equals(Object)}), hence immutable. */
    private final int shardingKeyIndex;

    /**
     * Creates a request for sharding-key index {@code 0}.
     *
     * @param messageQueueGroup     group of message queues this request consumes from
     * @param consumeThreadExecutor executor that schedules this request and supplies its lock objects
     * @param cs                    owning service; supplies the consumer, listener and result processing
     */
    public ConsumeRequest(MessageQueueGroup messageQueueGroup, ConsumeThreadExecutor consumeThreadExecutor, ConsumeMessageOrderlyByGroupService cs) {
        this(messageQueueGroup, consumeThreadExecutor, cs, 0);
    }

    /**
     * Creates a request for the given sharding-key index.
     *
     * @param messageQueueGroup     group of message queues this request consumes from
     * @param consumeThreadExecutor executor that schedules this request and supplies its lock objects
     * @param cs                    owning service; supplies the consumer, listener and result processing
     * @param shardingKeyIndex      sharding-key index whose messages this request takes from the process queue
     */
    public ConsumeRequest(MessageQueueGroup messageQueueGroup, ConsumeThreadExecutor consumeThreadExecutor, ConsumeMessageOrderlyByGroupService cs, int shardingKeyIndex) {
        this.messageQueueGroup = messageQueueGroup;
        this.consumeThreadExecutor = consumeThreadExecutor;
        this.cs = cs;
        this.defaultMQPushConsumer = cs.getDefaultMQPushConsumer();
        this.defaultMQPushConsumerImpl = cs.getDefaultMQPushConsumerImpl();
        this.messageListener = cs.getMessageListener();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.shardingKeyIndex = shardingKeyIndex;
        this.consumeRequestLock = consumeThreadExecutor.getConsumeRequestLock();
        this.shardingKeyLock = consumeThreadExecutor.getShardingKeyLock();
    }

    /**
     * @return the live deque of pending (queue pair, batch count) entries; callers add work to it directly
     */
    public LinkedBlockingDeque<Pair<QueuePair, Integer>> getQueueToConsume() {
        return this.queueToConsume;
    }

    /**
     * @return the sharding-key index this request consumes
     */
    public int getShardingKeyIndex() {
        return this.shardingKeyIndex;
    }

    /**
     * @return the message queue group this request consumes from
     */
    public MessageQueueGroup getMessageQueueGroup() {
        return this.messageQueueGroup;
    }

    /**
     * Drains {@link #queueToConsume} while holding the sharding-key lock object for this group/index.
     *
     * <p>For every polled entry, up to {@code consumeBatchSize} batches are taken from the process queue
     * and passed to the listener. The loop ends when
     * <ul>
     *   <li>the deque is empty (the request removes itself from the executor),</li>
     *   <li>more than {@link #MAX_TIME_CONSUME_CONTINUOUSLY} ms have elapsed after at least one entry
     *       (the request resubmits itself),</li>
     *   <li>the process queue is dropped, not locked, or its lock has expired, or</li>
     *   <li>{@code processConsumeResult} returns {@code false}.</li>
     * </ul>
     *
     * <p>Any {@link Exception} escaping the loop is logged and swallowed so the worker thread survives.
     *
     * <p>NOTE: this source was recovered by a decompiler, which reported that a self-catching try block
     * was removed from this method; behaviour may differ subtly from the original bytecode.
     */
    @Override
    public void run() {
        try {
            Object shardingLock = this.shardingKeyLock.fetchLockObject(this.messageQueueGroup, this.shardingKeyIndex);
            synchronized (shardingLock) {
                boolean continueConsume = true;
                long beginTime = System.currentTimeMillis();
                int invokeCnt = 0;
                while (continueConsume) {
                    // Yield the thread once we have done some work and exceeded the time budget.
                    long interval = System.currentTimeMillis() - beginTime;
                    if (invokeCnt > 0 && interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        this.consumeThreadExecutor.submit(this, null, true);
                        break;
                    }
                    Pair<QueuePair, Integer> queuePair = this.queueToConsume.poll();
                    Object requestLock = this.consumeRequestLock.fetchLockObject(this.messageQueueGroup, this.shardingKeyIndex);
                    synchronized (requestLock) {
                        // Re-check under the request lock before deciding that there is nothing left to do.
                        if (queuePair == null && (queuePair = this.queueToConsume.poll()) == null) {
                            this.consumeThreadExecutor.remove(this);
                            return;
                        }
                    }
                    MessageQueue messageQueue = queuePair.getObject1().getMessageQueue();
                    ProcessQueue processQueue = queuePair.getObject1().getProcessQueue();
                    int consumeBatchSize = queuePair.getObject2();
                    for (int i = 0; i < consumeBatchSize && continueConsume; ++i) {
                        if (processQueue.isDropped()) {
                            this.log.warn("the message queue not be able to consume, because it's dropped. {}", messageQueue);
                            this.consumeThreadExecutor.remove(this);
                            continueConsume = false;
                            break;
                        }
                        boolean clustering = MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel());
                        if (clustering && !processQueue.isLocked()) {
                            this.log.warn("the message queue not locked, so consume later, {}", messageQueue);
                            this.requeueAndRetryLater(queuePair, consumeBatchSize - i, messageQueue);
                            continueConsume = false;
                            break;
                        }
                        if (clustering && processQueue.isLockExpired()) {
                            this.log.warn("the message queue lock expired, so consume later, {}", messageQueue);
                            this.requeueAndRetryLater(queuePair, consumeBatchSize - i, messageQueue);
                            continueConsume = false;
                            break;
                        }
                        int consumeBatchMaxSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                        List<MessageExt> msgs = processQueue.takeMessagesByShardingKeyIndex(this.shardingKeyIndex, consumeBatchMaxSize);
                        this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.defaultMQPushConsumer.getConsumerGroup());
                        ConsumeOrderlyStatus status = null;
                        // Stays null when no hook is registered at this point; the "after" phase keys off that.
                        ConsumeMessageContext consumeMessageContext = null;
                        if (this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext = this.newHookContext(messageQueue, msgs);
                            this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                        }
                        boolean hasException = false;
                        long beginTimestamp = System.currentTimeMillis();
                        ConsumeOrderlyContext context = new ConsumeOrderlyContext(messageQueue);
                        try {
                            processQueue.getLockConsume().readLock().lock();
                            try {
                                if (processQueue.isDropped()) {
                                    this.log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", messageQueue);
                                    this.consumeThreadExecutor.remove(this);
                                    // NOTE(review): this only leaves the batch loop; continueConsume stays true, so the
                                    // outer loop goes on to poll the next entry. The earlier dropped-queue check also
                                    // clears continueConsume -- confirm whether the difference is intentional.
                                    break;
                                }
                                status = this.messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            }
                            catch (Throwable e) {
                                this.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), this.consumerGroup, msgs, messageQueue);
                                hasException = true;
                            }
                            finally {
                                processQueue.getLockConsume().readLock().unlock();
                            }
                        }
                        catch (Throwable e) {
                            // Reached when acquiring/releasing the consume lock (or the logging above) fails.
                            // Record it instead of dropping it silently; the batch is then treated as failed.
                            this.log.warn("consumeMessage lock handling exception: {} Group: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), this.consumerGroup, messageQueue);
                            hasException = true;
                        }
                        if (null == status || ConsumeOrderlyStatus.ROLLBACK == status || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            this.log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", this.consumerGroup, msgs, messageQueue);
                        }
                        long consumeRT = System.currentTimeMillis() - beginTimestamp;
                        // Must be classified before a null status is replaced below.
                        ConsumeReturnType returnType = this.classifyReturnType(status, hasException, consumeRT);
                        if (null == status) {
                            status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }
                        // Guard on the context as well as hasHook(): a hook registered after the "before" phase
                        // would otherwise dereference a null context and abort the whole run via the outer catch.
                        if (consumeMessageContext != null && this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.getProps().put("ConsumeContextType", returnType.name());
                            consumeMessageContext.setStatus(status.toString());
                            consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                            this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                        }
                        this.cs.getConsumerStatsManager().incConsumeRT(this.consumerGroup, messageQueue.getTopic(), consumeRT);
                        continueConsume = this.cs.processConsumeResult(msgs, status, context, this, messageQueue, processQueue, queuePair, i);
                    }
                    ++invokeCnt;
                }
            }
        }
        catch (Exception e) {
            this.log.error("Run consume request exception", e);
        }
    }

    /**
     * Puts an entry back at the head of the deque with its remaining batch count and asks the executor
     * to retry this request later for the given queue.
     */
    private void requeueAndRetryLater(Pair<QueuePair, Integer> queuePair, int remaining, MessageQueue messageQueue) {
        queuePair.setObject2(remaining);
        this.queueToConsume.addFirst(queuePair);
        this.consumeThreadExecutor.tryLockLaterAndConsumeAgain(this, messageQueue, 10L);
    }

    /**
     * Builds the hook context for one batch, initially marked as not successful and with empty props.
     */
    private ConsumeMessageContext newHookContext(MessageQueue messageQueue, List<MessageExt> msgs) {
        ConsumeMessageContext hookContext = new ConsumeMessageContext();
        hookContext.setConsumerGroup(this.consumerGroup);
        hookContext.setNamespace(this.defaultMQPushConsumer.getNamespace());
        hookContext.setMq(messageQueue);
        hookContext.setMsgList(msgs);
        hookContext.setSuccess(false);
        hookContext.setProps(new HashMap<String, String>());
        return hookContext;
    }

    /**
     * Maps the raw listener outcome to the {@link ConsumeReturnType} reported to hooks.
     *
     * @param status       listener result, {@code null} if it returned nothing or threw
     * @param hasException whether consumption raised a throwable
     * @param consumeRT    elapsed consumption time in milliseconds
     */
    private ConsumeReturnType classifyReturnType(ConsumeOrderlyStatus status, boolean hasException, long consumeRT) {
        if (null == status) {
            return hasException ? ConsumeReturnType.EXCEPTION : ConsumeReturnType.RETURNNULL;
        }
        // The configured timeout is scaled by 60 * 1000 before being compared with elapsed milliseconds.
        if (consumeRT >= this.defaultMQPushConsumer.getConsumeTimeout() * 60L * 1000L) {
            return ConsumeReturnType.TIME_OUT;
        }
        if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
            return ConsumeReturnType.FAILED;
        }
        // Every remaining status is reported as SUCCESS.
        return ConsumeReturnType.SUCCESS;
    }

    /**
     * Two requests are equal when they have the same runtime class, sharding-key index and message queue group.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        ConsumeRequest other = (ConsumeRequest)o;
        return this.shardingKeyIndex == other.shardingKeyIndex && Objects.equal(this.messageQueueGroup, other.messageQueueGroup);
    }

    /**
     * Hash over the same fields used by {@link #equals(Object)}.
     */
    @Override
    public int hashCode() {
        return Objects.hashCode(this.messageQueueGroup, this.shardingKeyIndex);
    }
}

