/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageOrderlyByGroupService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MergeRequest;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MessageQueueGroupLock;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueueGroup;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.QueueGroup;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.com.google.common.base.Objects;
import com.aliyun.openservices.shade.io.netty.util.internal.ConcurrentSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Runs {@link MergeRequest}s for an orderly-by-group consumer on a dedicated thread pool.
 *
 * <p>Each submitted request is wrapped in a {@code ContinuouslyMergeRequest} and tracked in a
 * de-duplication set, so that a non-forced {@link #submit(MergeRequest, boolean)} of a request
 * that is already tracked is ignored. While a wrapper runs, every call to
 * {@code MergeRequest.run()} is made while holding the lock object that
 * {@link MessageQueueGroupLock} hands out for the request's message queue group.
 *
 * <p>Delayed re-submissions are scheduled on the scheduled executor owned by the
 * {@link ConsumeMessageOrderlyByGroupService} passed to the constructor.
 */
public class MergeThreadExecutor {
    private static final InternalLogger LOG = ClientLogger.getLog();

    /**
     * Upper bound, in milliseconds, for how long one pool task keeps re-running the same merge
     * request before it yields by re-submitting itself. Read once from the system property
     * {@code rocketmq.client.maxTimeConsumeContinuously}; defaults to 5000.
     */
    private static final long MAX_TIME_CONSUME_CONTINUOUSLY = Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "5000"));

    private final ThreadPoolExecutor threadPoolExecutor;
    private final MessageQueueGroupLock queueGroupLock = new MessageQueueGroupLock();
    private final ConsumeMessageOrderlyByGroupService cs;

    /** Wrappers that are currently queued, running, or waiting for a scheduled re-submission. */
    private final ConcurrentSet<ContinuouslyMergeRequest> mergeRequestSet = new ConcurrentSet<ContinuouslyMergeRequest>();

    /** Initial core size of the pool and the floor enforced by {@link #setCorePoolSize(int)}. */
    private final int mergeThreadMin = 10;

    /** Initial maximum size of the pool. */
    private final int mergeThreadMax = 32;

    /**
     * Creates the executor and its backing thread pool (core {@code 10}, max {@code 32},
     * 60 s keep-alive, unbounded work queue).
     *
     * @param cs the owning consume service; used for its scheduled executor, its push consumer
     *           settings, queue-group locking and the queue-group map
     */
    public MergeThreadExecutor(ConsumeMessageOrderlyByGroupService cs) {
        this.cs = cs;
        this.threadPoolExecutor = new ThreadPoolExecutor(
            this.mergeThreadMin,
            this.mergeThreadMax,
            60000L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadFactoryImpl("MergeMessageThread_"));
    }

    /** Initiates an orderly shutdown of the merge thread pool; queued tasks still run. */
    public void shutdown() {
        this.threadPoolExecutor.shutdown();
    }

    /**
     * Changes the core size of the merge thread pool.
     *
     * <p>Values below the built-in minimum are raised to that minimum. If the resulting size
     * exceeds the pool's current maximum, the maximum is raised to match first: since JDK 9
     * {@link ThreadPoolExecutor#setCorePoolSize(int)} rejects a core size greater than the
     * maximum with an {@link IllegalArgumentException}, and on older JDKs the pool would be left
     * in an inconsistent core &gt; max configuration.
     *
     * @param corePoolSize requested core pool size
     */
    public void setCorePoolSize(int corePoolSize) {
        int target = Math.max(this.mergeThreadMin, corePoolSize);
        // Order matters: the maximum must be widened before the core size is raised past it.
        if (target > this.threadPoolExecutor.getMaximumPoolSize()) {
            this.threadPoolExecutor.setMaximumPoolSize(target);
        }
        this.threadPoolExecutor.setCorePoolSize(target);
    }

    /**
     * @return the current core size of the merge thread pool
     */
    public int getCorePoolSize() {
        return this.threadPoolExecutor.getCorePoolSize();
    }

    /**
     * @return the lock registry used to serialize merge runs per message queue group
     */
    public MessageQueueGroupLock getQueueGroupLock() {
        return this.queueGroupLock;
    }

    /**
     * Stops tracking the given wrapper so that a later non-forced submit of an equal request is
     * accepted again.
     *
     * @param continuouslyMergeRequest the wrapper to forget
     */
    public void remove(ContinuouslyMergeRequest continuouslyMergeRequest) {
        this.mergeRequestSet.remove(continuouslyMergeRequest);
    }

    /**
     * Hands a merge request to the thread pool.
     *
     * <p>The request's interrupted flag and interrupt code are reset first. The request is then
     * queued if it was not tracked yet, or unconditionally when {@code force} is {@code true}.
     * A failure to enqueue is logged and not propagated.
     *
     * @param mergeRequest the request to run
     * @param force        {@code true} to enqueue even when an equal request is already tracked
     */
    public void submit(MergeRequest mergeRequest, boolean force) {
        mergeRequest.setInterrupted(false);
        mergeRequest.setInterruptCode(null);
        ContinuouslyMergeRequest continuouslyMergeRequest = new ContinuouslyMergeRequest(mergeRequest);
        boolean isNewReq = this.mergeRequestSet.add(continuouslyMergeRequest);
        if (!force && !isNewReq) {
            // Already tracked and the caller did not insist: nothing to do.
            return;
        }
        try {
            this.threadPoolExecutor.submit(continuouslyMergeRequest);
        }
        catch (Exception e) {
            LOG.error("error submit merge request: {}, mq: {}", (Object)e.toString(), (Object)mergeRequest.getQueueGroup().getMessageQueueGroup());
        }
    }

    /**
     * Schedules a forced {@link #submit(MergeRequest, boolean)} after a delay.
     *
     * @param mergeRequest      the request to re-submit
     * @param suspendTimeMillis delay in milliseconds; {@code -1} means "use the push consumer's
     *                          {@code suspendCurrentQueueTimeMillis}". The effective delay is
     *                          capped at 30000 ms.
     */
    public void submitLater(final MergeRequest mergeRequest, long suspendTimeMillis) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1L) {
            timeMillis = this.cs.getDefaultMQPushConsumer().getSuspendCurrentQueueTimeMillis();
        }
        timeMillis = Math.min(timeMillis, 30000L);
        this.cs.getScheduledExecutorService().schedule(new Runnable(){

            @Override
            public void run() {
                MergeThreadExecutor.this.submit(mergeRequest, true);
            }
        }, timeMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * After {@code delayMillis}, tries to lock the request's message queue group through the
     * consume service and then re-submits the request: after 10 ms when the lock was obtained,
     * after 3000 ms otherwise.
     *
     * @param mergeRequest the request to re-submit
     * @param delayMillis  delay in milliseconds before the lock attempt
     */
    public void tryLockLaterAndMergeAgain(final MergeRequest mergeRequest, long delayMillis) {
        this.cs.getScheduledExecutorService().schedule(new Runnable(){

            @Override
            public void run() {
                // NOTE(review): if lockMQGroup throws, the scheduler swallows the exception and
                // the request is never re-submitted - confirm whether that can happen.
                boolean lockOK = MergeThreadExecutor.this.cs.lockMQGroup(mergeRequest.getQueueGroup().getMessageQueueGroup());
                MergeThreadExecutor.this.submitLater(mergeRequest, lockOK ? 10L : 3000L);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * After {@code delayMillis}, refreshes the topic route from the name server, triggers a
     * rebalance, points the request at the queue group currently registered under the same topic
     * and group id, and re-submits it: after 10 ms when {@code doRebalance()} returned
     * {@code true}, after 3000 ms otherwise.
     *
     * @param mergeRequest the request to refresh and re-submit
     * @param delayMillis  delay in milliseconds before the refresh
     */
    public void updateLaterAndMergeAgain(final MergeRequest mergeRequest, long delayMillis) {
        this.cs.getScheduledExecutorService().schedule(new Runnable(){

            @Override
            public void run() {
                MergeThreadExecutor.this.cs.defaultMQPushConsumerImpl.getmQClientFactory().updateTopicRouteInfoFromNameServer(mergeRequest.getQueueGroup().getTopic());
                boolean rebalanced = MergeThreadExecutor.this.cs.defaultMQPushConsumerImpl.doRebalance();
                QueueGroup oldQueueGroup = mergeRequest.getQueueGroup();
                // NOTE(review): the per-topic lookup is dereferenced without a null check; if the
                // topic is no longer in the map this throws inside the scheduler and the request
                // is never re-submitted. The inner lookup may also yield null, which is then
                // stored on the request - confirm the intended handling.
                mergeRequest.setQueueGroup(MergeThreadExecutor.this.cs.getQueueGroupMap().get(oldQueueGroup.getTopic()).get(oldQueueGroup.getQueueGroupId()));
                MergeThreadExecutor.this.submitLater(mergeRequest, rebalanced ? 10L : 3000L);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Pool task that keeps re-running one {@link MergeRequest} until the request reports that it
     * was interrupted, then acts on the request's interrupt code. Two wrappers are equal when
     * their merge requests are equal, which is what makes the de-duplication set work.
     */
    private class ContinuouslyMergeRequest
    implements Runnable {
        private MergeRequest mergeRequest;

        public ContinuouslyMergeRequest(MergeRequest mergeRequest) {
            this.mergeRequest = mergeRequest;
        }

        /**
         * Executes the wrapped request.
         *
         * <ol>
         *   <li>Untracks and exits when the queue group has no queue pairs or is fully dropped;
         *       schedules a route refresh when it is partially dropped.</li>
         *   <li>Otherwise runs the request repeatedly under the queue-group lock. After at least
         *       one run, once more than {@code MAX_TIME_CONSUME_CONTINUOUSLY} ms have elapsed,
         *       the task re-submits itself (forced) and returns so other tasks get the
         *       thread.</li>
         *   <li>When the request is interrupted, dispatches on its interrupt code.</li>
         * </ol>
         *
         * Any exception is logged, and the wrapper is untracked so that later submits are not
         * rejected as duplicates of a task that no longer exists.
         */
        @Override
        public void run() {
            try {
                if (this.mergeRequest.getQueueGroup().getQueuePairs() == null) {
                    MergeThreadExecutor.this.remove(this);
                    return;
                }
                ProcessQueueGroup.ProcessQueueGroupStatus pqGroupStatus = this.mergeRequest.getQueueGroup().getProcessQueueGroup().getProcessQueueStatus();
                if (pqGroupStatus == ProcessQueueGroup.ProcessQueueGroupStatus.ALL_DROPPED) {
                    LOG.warn("run, the message queue group not be able to consume, because it's dropped. {}", (Object)this.mergeRequest.getQueueGroup().getMessageQueueGroup());
                    MergeThreadExecutor.this.remove(this);
                    return;
                }
                if (pqGroupStatus == ProcessQueueGroup.ProcessQueueGroupStatus.PARTIALLY_DROPPED) {
                    LOG.warn("run, the message queue group not be able to consume, because some of its message queues are dropped. {}", (Object)this.mergeRequest.getQueueGroup().getMessageQueueGroup());
                    MergeThreadExecutor.this.updateLaterAndMergeAgain(this.mergeRequest, 1000L);
                    return;
                }

                Object objLock = MergeThreadExecutor.this.queueGroupLock.fetchLockObject(this.mergeRequest.getQueueGroup().getMessageQueueGroup());
                long beginTime = System.currentTimeMillis();
                int invokeCnt = 0;
                while (!this.mergeRequest.isInterrupted()) {
                    long interval = System.currentTimeMillis() - beginTime;
                    // Always run at least once; afterwards yield the thread once the budget is spent.
                    if (invokeCnt > 0 && interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        MergeThreadExecutor.this.submit(this.mergeRequest, true);
                        return;
                    }
                    synchronized (objLock) {
                        this.mergeRequest.run();
                    }
                    ++invokeCnt;
                }

                if (this.mergeRequest.getInterruptCode() == MergeRequest.InterruptCode.REMOVE_REQUEST) {
                    // Untrack first, then give the request one final run under the lock.
                    MergeThreadExecutor.this.remove(this);
                    synchronized (objLock) {
                        this.mergeRequest.run();
                    }
                    return;
                }
                if (this.mergeRequest.getInterruptCode() == MergeRequest.InterruptCode.LOCK_LATER) {
                    MergeThreadExecutor.this.tryLockLaterAndMergeAgain(this.mergeRequest, 10L);
                } else if (this.mergeRequest.getInterruptCode() == MergeRequest.InterruptCode.UPDATE_LATER) {
                    MergeThreadExecutor.this.updateLaterAndMergeAgain(this.mergeRequest, 10L);
                } else if (this.mergeRequest.getInterruptCode() == MergeRequest.InterruptCode.MERGE_LATER) {
                    MergeThreadExecutor.this.submitLater(this.mergeRequest, 1L);
                } else {
                    LOG.error("unexpected interrupt code in mergeRequest {}", (Object)this.mergeRequest.getInterruptCode());
                    MergeThreadExecutor.this.remove(this);
                }
            }
            catch (Exception e) {
                LOG.error("Run merge request exception", e);
                // No re-submission is pending on this path, so the tracking entry is stale.
                // Leaving it in place would make every future non-forced submit of this
                // request a silent no-op and stall the queue group.
                MergeThreadExecutor.this.remove(this);
            }
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ContinuouslyMergeRequest request = (ContinuouslyMergeRequest)o;
            return Objects.equal(this.mergeRequest, request.mergeRequest);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(this.mergeRequest);
        }
    }
}

