package org.apache.rocketmq.broker.metrics;

import java.util.Iterator;
import java.util.function.Consumer;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.longpolling.PopCommandCallback;
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.processor.PopBufferMergeService;
import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SimpleSubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.DefaultMessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.exception.ConsumeQueueException;

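/**
 * Computes, per consumer group and topic, the consumer lag, the in-flight message count and the
 * available message count, using offsets from the consumer offset manager and the message store.
 *
 * <p>Decompiled from: input_file:org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.class
 */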
public class ConsumerLagCalculator {
    // All collaborators below are taken from the BrokerController in the constructor.

    // Broker switches read by this class: isEstimateAccumulation, isUseStaticSubscription,
    // isPopConsumerKVServiceEnable and isEnableNotifyBeforePopCalculateLag.
    private final BrokerConfig brokerConfig;
    // Looked up per topic to obtain the write-queue count the aggregate methods iterate over.
    private final TopicConfigManager topicConfigManager;
    // Source of the live ConsumerGroupInfo (and its subscription data) when static subscription is off.
    private final ConsumerManager consumerManager;
    // Provides queryOffset / queryPullOffset per (group, topic, queue).
    private final ConsumerOffsetManager offsetManager;
    // Supplies the per-(topic, group) filter data passed to ExpressionMessageFilter for SQL92 subscriptions.
    private final ConsumerFilterManager consumerFilterManager;
    // Source of the SubscriptionGroupConfig when static subscription is on.
    private final SubscriptionGroupManager subscriptionGroupManager;
    // Provides max queue offsets, message store timestamps and filtered message-count estimates.
    private final MessageStore messageStore;
    // Provides getLatestOffset for pop consumers when the pop KV service is not enabled.
    private final PopBufferMergeService popBufferMergeService;
    // Used by calculateLag to call notifyMessageArriving before computing lag for pop groups.
    private final PopLongPollingService popLongPollingService;
    // Provides the per-(topic, group, queue) pop in-flight message number.
    private final PopInflightMessageCounter popInflightMessageCounter;
    // Shared logger, obtained by the literal logger name "RocketmqBroker".
    private static final Logger LOGGER = LoggerFactory.getLogger("RocketmqBroker");

    /* loaded from: input_file:org/apache/rocketmq/broker/metrics/ConsumerLagCalculator$BaseCalculateResult.class */
    /**
     * Identity shared by every calculation result: which consumer group and topic the figures
     * belong to, and whether they were computed against the group's retry topic.
     */
    public static class BaseCalculateResult {
        /** Consumer group the result was computed for. */
        public String group;
        /** Topic the result is reported under. */
        public String topic;
        /** {@code true} when the figures come from the retry-topic pass. */
        public boolean isRetry;

        /**
         * @param group   consumer group name
         * @param topic   topic name the result is reported under
         * @param isRetry whether the result describes the retry topic
         */
        public BaseCalculateResult(String group, String topic, boolean isRetry) {
            this.group = group;
            this.topic = topic;
            this.isRetry = isRetry;
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/broker/metrics/ConsumerLagCalculator$CalculateAvailableResult.class */
    /** Result of an available-message calculation for one group/topic pair. */
    public static class CalculateAvailableResult extends BaseCalculateResult {
        /** Message count produced by {@code getAvailableMsgCount}; stays 0 until assigned. */
        public long available;

        /**
         * @param group   consumer group name
         * @param topic   topic name the result is reported under
         * @param isRetry whether the result describes the retry topic
         */
        public CalculateAvailableResult(String group, String topic, boolean isRetry) {
            super(group, topic, isRetry);
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/broker/metrics/ConsumerLagCalculator$CalculateInflightResult.class */
    /** Result of an in-flight calculation for one group/topic pair. */
    public static class CalculateInflightResult extends BaseCalculateResult {
        /** First component of the pair returned by {@code getInFlightMsgStats}. */
        public long inFlight;
        /** Second component of the pair returned by {@code getInFlightMsgStats}. */
        public long earliestUnPulledTimestamp;

        /**
         * @param group   consumer group name
         * @param topic   topic name the result is reported under
         * @param isRetry whether the result describes the retry topic
         */
        public CalculateInflightResult(String group, String topic, boolean isRetry) {
            super(group, topic, isRetry);
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/broker/metrics/ConsumerLagCalculator$CalculateLagResult.class */
    /** Result of a lag calculation for one group/topic pair. */
    public static class CalculateLagResult extends BaseCalculateResult {
        /** First component of the pair returned by {@code getConsumerLagStats}. */
        public long lag;
        /** Second component of the pair returned by {@code getConsumerLagStats}. */
        public long earliestUnconsumedTimestamp;

        /**
         * @param group   consumer group name
         * @param topic   topic name the result is reported under
         * @param isRetry whether the result describes the retry topic
         */
        public CalculateLagResult(String group, String topic, boolean isRetry) {
            super(group, topic, isRetry);
        }
    }

    /* loaded from: input_file:org/apache/rocketmq/broker/metrics/ConsumerLagCalculator$ProcessGroupInfo.class */
    /**
     * One unit of work handed to the callbacks of {@code processAllGroup}: a consumer group,
     * one of its topics, whether the group is treated as a pop consumer, and the retry topic
     * that is queried in addition when {@code isPop} is set.
     */
    public static class ProcessGroupInfo {
        /** Consumer group name. */
        public String group;
        /** Topic name. */
        public String topic;
        /** Whether the pop-specific calculation path is used for this group. */
        public boolean isPop;
        /** Retry topic queried for pop groups; callers in this class tolerate {@code null}. */
        public String retryTopic;

        /**
         * @param group      consumer group name
         * @param topic      topic name
         * @param isPop      whether the group is handled as a pop consumer
         * @param retryTopic retry topic associated with the group/topic pair
         */
        public ProcessGroupInfo(String group, String topic, boolean isPop, String retryTopic) {
            this.group = group;
            this.topic = topic;
            this.isPop = isPop;
            this.retryTopic = retryTopic;
        }
    }

    /**
     * Creates a calculator wired to the collaborators exposed by the given broker controller.
     *
     * <p>Every field is read once from {@code brokerController} at construction time; the pop
     * buffer-merge and long-polling services are obtained through
     * {@code brokerController.getPopMessageProcessor()}.
     *
     * @param brokerController source of all collaborators; dereferenced immediately, so it must
     *                         not be {@code null}
     */
    public ConsumerLagCalculator(BrokerController brokerController) {
        this.brokerConfig = brokerController.getBrokerConfig();
        this.topicConfigManager = brokerController.getTopicConfigManager();
        this.consumerManager = brokerController.getConsumerManager();
        this.offsetManager = brokerController.getConsumerOffsetManager();
        this.consumerFilterManager = brokerController.getConsumerFilterManager();
        this.subscriptionGroupManager = brokerController.getSubscriptionGroupManager();
        this.messageStore = brokerController.getMessageStore();
        // Both pop services hang off the pop message processor rather than the controller itself.
        this.popBufferMergeService = brokerController.getPopMessageProcessor().getPopBufferMergeService();
        this.popLongPollingService = brokerController.getPopMessageProcessor().getPopLongPollingService();
        this.popInflightMessageCounter = brokerController.getPopInflightMessageCounter();
    }

    /*
     * Feeds ProcessGroupInfo items to the supplied callback; calculateLag, calculateInflight and
     * calculateAvailable all drive their work through this method.
     *
     * NOTE(review): the decompiler could not recover this method's body. As written it
     * unconditionally throws UnsupportedOperationException, so every public calculate* entry
     * point in this class fails at runtime. How groups and topics are enumerated, and when
     * isPop / retryTopic are set, cannot be determined from this file -- restore the body from
     * the original source (or the instruction dump) before relying on this class.
     */
    /* JADX WARN: Removed duplicated region for block: B:27:0x00dc  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void processAllGroup(java.util.function.Consumer<org.apache.rocketmq.broker.metrics.ConsumerLagCalculator.ProcessGroupInfo> r9) {
        /*
            Method dump skipped, instructions count: 536
            To view this dump add '--comments-level debug' option
        */
        // Decompiler placeholder: no real logic survives here.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.broker.metrics.ConsumerLagCalculator.processAllGroup(java.util.function.Consumer):void");
    }

    /**
     * Runs the lag calculation for every item supplied by {@code processAllGroup} and passes
     * each result to {@code consumer}.
     *
     * <p>Items without a group or topic are skipped. For pop groups, when
     * {@code isEnableNotifyBeforePopCalculateLag} is on, the long-polling service is notified
     * first with a {@link PopCommandCallback} wrapping {@link #calculate}; if that call returns
     * {@code true} the calculation is not performed directly here (presumably the callback
     * performs it later -- confirm against PopLongPollingService).
     *
     * @param consumer receiver of each computed {@link CalculateLagResult}
     */
    public void calculateLag(Consumer<CalculateLagResult> consumer) {
        processAllGroup(info -> {
            boolean missingIdentity = info.group == null || info.topic == null;
            if (missingIdentity) {
                return;
            }

            if (info.isPop && brokerConfig.isEnableNotifyBeforePopCalculateLag()) {
                PopCommandCallback callback = new PopCommandCallback(this::calculate, info, consumer);
                boolean handedOff = popLongPollingService.notifyMessageArriving(
                    info.topic, -1, info.group, true, null, 0L, null, null, callback);
                if (handedOff) {
                    return;
                }
            }

            calculate(info, consumer);
        });
    }

    /**
     * Computes the lag for one group/topic pair and reports it to {@code consumer}; for pop
     * groups a second result, flagged as retry, is computed from the retry topic.
     *
     * <p>A {@link ConsumeQueueException} in either pass is logged and that pass reports
     * nothing; the other pass is unaffected.
     *
     * @param processGroupInfo the group/topic pair to evaluate
     * @param consumer         receiver of each computed {@link CalculateLagResult}
     */
    public void calculate(ProcessGroupInfo processGroupInfo, Consumer<CalculateLagResult> consumer) {
        emitLag(processGroupInfo, processGroupInfo.topic, processGroupInfo.isPop, false, consumer);
        if (processGroupInfo.isPop) {
            // The retry pass is reported under the original topic name, with isRetry = true.
            emitLag(processGroupInfo, processGroupInfo.retryTopic, true, true, consumer);
        }
    }

    /** Queries lag stats for {@code statsTopic} and reports them as one result under {@code info.topic}. */
    private void emitLag(ProcessGroupInfo info, String statsTopic, boolean isPop, boolean isRetry,
        Consumer<CalculateLagResult> recorder) {
        CalculateLagResult result = new CalculateLagResult(info.group, info.topic, isRetry);
        try {
            Pair<Long, Long> stats = getConsumerLagStats(info.group, statsTopic, isPop);
            if (stats != null) {
                result.lag = stats.getObject1();
                result.earliestUnconsumedTimestamp = stats.getObject2();
            }
            recorder.accept(result);
        } catch (ConsumeQueueException e) {
            LOGGER.error("Failed to get lag stats", e);
        }
    }

    /**
     * Runs the in-flight calculation for every item supplied by {@code processAllGroup} and
     * passes each result to {@code consumer}; pop groups additionally get a retry-flagged
     * result computed from their retry topic.
     *
     * <p>A {@link ConsumeQueueException} in either pass is logged and that pass reports nothing.
     *
     * @param consumer receiver of each computed {@link CalculateInflightResult}
     */
    public void calculateInflight(Consumer<CalculateInflightResult> consumer) {
        processAllGroup(info -> {
            emitInflight(info, info.topic, info.isPop, false, consumer);
            if (info.isPop) {
                // The retry pass is reported under the original topic name, with isRetry = true.
                emitInflight(info, info.retryTopic, true, true, consumer);
            }
        });
    }

    /** Queries in-flight stats for {@code statsTopic} and reports them as one result under {@code info.topic}. */
    private void emitInflight(ProcessGroupInfo info, String statsTopic, boolean isPop, boolean isRetry,
        Consumer<CalculateInflightResult> recorder) {
        CalculateInflightResult result = new CalculateInflightResult(info.group, info.topic, isRetry);
        try {
            Pair<Long, Long> stats = getInFlightMsgStats(info.group, statsTopic, isPop);
            if (stats != null) {
                result.inFlight = stats.getObject1();
                result.earliestUnPulledTimestamp = stats.getObject2();
            }
            recorder.accept(result);
        } catch (ConsumeQueueException e) {
            LOGGER.error("Failed to get inflight message stats", e);
        }
    }

    /**
     * Runs the available-message calculation for every item supplied by {@code processAllGroup}
     * and passes each result to {@code consumer}; pop groups additionally get a retry-flagged
     * result computed from their retry topic.
     *
     * <p>A {@link ConsumeQueueException} in either pass is logged and that pass reports nothing.
     *
     * @param consumer receiver of each computed {@link CalculateAvailableResult}
     */
    public void calculateAvailable(Consumer<CalculateAvailableResult> consumer) {
        processAllGroup(info -> {
            emitAvailable(info, info.topic, info.isPop, false, consumer);
            if (info.isPop) {
                // The retry pass is reported under the original topic name, with isRetry = true.
                emitAvailable(info, info.retryTopic, true, true, consumer);
            }
        });
    }

    /** Queries the available count for {@code statsTopic} and reports it as one result under {@code info.topic}. */
    private void emitAvailable(ProcessGroupInfo info, String statsTopic, boolean isPop, boolean isRetry,
        Consumer<CalculateAvailableResult> recorder) {
        CalculateAvailableResult result = new CalculateAvailableResult(info.group, info.topic, isRetry);
        try {
            result.available = getAvailableMsgCount(info.group, statsTopic, isPop);
            recorder.accept(result);
        } catch (ConsumeQueueException e) {
            LOGGER.error("Failed to get available message count", e);
        }
    }

    /**
     * Aggregates lag statistics for a group over all write queues of a topic.
     *
     * <p>The first component is the sum of the per-queue lags; the second is the smallest
     * per-queue timestamp, normalised to 0 when it is negative or no queue produced one.
     * If the topic has no configuration a warning is logged and (0, 0) is returned.
     *
     * <p>Note that the null-argument shortcut returns {@code Long.MAX_VALUE} as the timestamp,
     * whereas every other "nothing found" outcome returns 0.
     *
     * @param group consumer group name; may be {@code null}
     * @param topic topic name; may be {@code null}
     * @param isPop whether the pop-specific per-queue calculation should be used
     * @return pair of (total lag, earliest unconsumed timestamp)
     * @throws ConsumeQueueException propagated from the per-queue calculation
     */
    public Pair<Long, Long> getConsumerLagStats(String group, String topic, boolean isPop) throws ConsumeQueueException {
        if (group == null || topic == null) {
            return new Pair<>(0L, Long.MAX_VALUE);
        }

        long totalLag = 0L;
        long earliestTimestamp = Long.MAX_VALUE;

        TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
        if (topicConfig == null) {
            LOGGER.warn("failed to get config of topic {}", topic);
        } else {
            for (int queueId = 0; queueId < topicConfig.getWriteQueueNums(); queueId++) {
                Pair<Long, Long> queueStats = getConsumerLagStats(group, topic, queueId, isPop);
                totalLag += queueStats.getObject1();
                earliestTimestamp = Math.min(earliestTimestamp, queueStats.getObject2());
            }
        }

        // Long.MAX_VALUE is the "no timestamp" sentinel of the per-queue methods; report it as 0.
        if (earliestTimestamp < 0 || earliestTimestamp == Long.MAX_VALUE) {
            earliestTimestamp = 0L;
        }

        long latency = earliestTimestamp > 0 ? System.currentTimeMillis() - earliestTimestamp : 0L;
        LOGGER.debug("GetConsumerLagStats, topic={}, group={}, lag={}, latency={}", topic, group, totalLag, latency);
        return new Pair<>(totalLag, earliestTimestamp);
    }

    /**
     * Computes lag statistics for a single queue.
     *
     * <p>For non-pop consumers, or when the pop KV service is enabled, the lag is the message
     * count between the offset from {@code offsetManager.queryOffset} (falling back to the
     * queue's max offset when negative) and the max offset, and the timestamp is looked up at
     * that offset.
     *
     * <p>Otherwise the starting offset comes from {@code popBufferMergeService.getLatestOffset},
     * falling back to {@code queryOffset} and then to the max offset. The pop in-flight number
     * is added to the lag, and the timestamp is looked up at the offset minus that number.
     *
     * @param group   consumer group name
     * @param topic   topic name
     * @param queueId queue index within the topic
     * @param isPop   whether the group is handled as a pop consumer
     * @return pair of (lag, store timestamp; {@code Long.MAX_VALUE} when none is available)
     * @throws ConsumeQueueException propagated from the message store
     */
    public Pair<Long, Long> getConsumerLagStats(String group, String topic, int queueId, boolean isPop) throws ConsumeQueueException {
        long maxOffset = Math.max(0L, messageStore.getMaxOffsetInQueue(topic, queueId));

        boolean usePopBuffer = isPop && !brokerConfig.isPopConsumerKVServiceEnable();
        if (usePopBuffer) {
            long popOffset = popBufferMergeService.getLatestOffset(topic, group, queueId);
            if (popOffset < 0) {
                popOffset = offsetManager.queryOffset(group, topic, queueId);
            }
            if (popOffset < 0) {
                popOffset = maxOffset;
            }
            long inFlight = popInflightMessageCounter.getGroupPopInFlightMessageNum(topic, group, queueId);
            long lag = calculateMessageCount(group, topic, queueId, popOffset, maxOffset) + inFlight;
            long timestamp = getStoreTimeStamp(topic, queueId, popOffset - inFlight);
            return new Pair<>(lag, timestamp);
        }

        long consumeOffset = offsetManager.queryOffset(group, topic, queueId);
        if (consumeOffset < 0) {
            consumeOffset = maxOffset;
        }
        long lag = calculateMessageCount(group, topic, queueId, consumeOffset, maxOffset);
        long timestamp = getStoreTimeStamp(topic, queueId, consumeOffset);
        return new Pair<>(lag, timestamp);
    }

    /**
     * Aggregates in-flight statistics for a group over all write queues of a topic.
     *
     * <p>The first component is the sum of the per-queue in-flight counts; the second is the
     * smallest per-queue timestamp, normalised to 0 when it is negative or no queue produced
     * one. If the topic has no configuration a warning is logged and (0, 0) is returned.
     *
     * <p>Note that the null-argument shortcut returns {@code Long.MAX_VALUE} as the timestamp,
     * whereas every other "nothing found" outcome returns 0.
     *
     * @param group consumer group name; may be {@code null}
     * @param topic topic name; may be {@code null}
     * @param isPop whether the pop-specific per-queue calculation should be used
     * @return pair of (total in-flight count, earliest un-pulled timestamp)
     * @throws ConsumeQueueException propagated from the per-queue calculation
     */
    public Pair<Long, Long> getInFlightMsgStats(String group, String topic, boolean isPop) throws ConsumeQueueException {
        if (group == null || topic == null) {
            return new Pair<>(0L, Long.MAX_VALUE);
        }

        long totalInFlight = 0L;
        long earliestTimestamp = Long.MAX_VALUE;

        TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
        if (topicConfig == null) {
            LOGGER.warn("failed to get config of topic {}", topic);
        } else {
            for (int queueId = 0; queueId < topicConfig.getWriteQueueNums(); queueId++) {
                Pair<Long, Long> queueStats = getInFlightMsgStats(group, topic, queueId, isPop);
                totalInFlight += queueStats.getObject1();
                earliestTimestamp = Math.min(earliestTimestamp, queueStats.getObject2());
            }
        }

        // Long.MAX_VALUE is the "no timestamp" sentinel of the per-queue methods; report it as 0.
        if (earliestTimestamp < 0 || earliestTimestamp == Long.MAX_VALUE) {
            earliestTimestamp = 0L;
        }
        return new Pair<>(totalInFlight, earliestTimestamp);
    }

    /**
     * Computes in-flight statistics for a single queue.
     *
     * <p>For non-pop consumers, or when the pop KV service is enabled, the in-flight count is
     * the message count between the offset from {@code queryOffset} (falling back to the pull
     * offset when negative) and the pull offset from {@code queryPullOffset} (clamped to 0),
     * and the timestamp is looked up at the pull offset.
     *
     * <p>Otherwise the count is the pop in-flight number, and the timestamp is looked up at the
     * offset from {@code popBufferMergeService.getLatestOffset}, falling back to
     * {@code queryOffset} and then to the queue's max offset.
     *
     * @param group   consumer group name
     * @param topic   topic name
     * @param queueId queue index within the topic
     * @param isPop   whether the group is handled as a pop consumer
     * @return pair of (in-flight count, store timestamp; {@code Long.MAX_VALUE} when none is available)
     * @throws ConsumeQueueException propagated from the message store
     */
    public Pair<Long, Long> getInFlightMsgStats(String group, String topic, int queueId, boolean isPop) throws ConsumeQueueException {
        if (!isPop || brokerConfig.isPopConsumerKVServiceEnable()) {
            long pullOffset = Math.max(0L, offsetManager.queryPullOffset(group, topic, queueId));
            long consumeOffset = offsetManager.queryOffset(group, topic, queueId);
            if (consumeOffset < 0) {
                consumeOffset = pullOffset;
            }
            long inFlight = calculateMessageCount(group, topic, queueId, consumeOffset, pullOffset);
            long pullTimestamp = getStoreTimeStamp(topic, queueId, pullOffset);
            return new Pair<>(inFlight, pullTimestamp);
        }

        long popInFlight = popInflightMessageCounter.getGroupPopInFlightMessageNum(topic, group, queueId);
        long popOffset = popBufferMergeService.getLatestOffset(topic, group, queueId);
        if (popOffset < 0) {
            popOffset = offsetManager.queryOffset(group, topic, queueId);
        }
        if (popOffset < 0) {
            popOffset = messageStore.getMaxOffsetInQueue(topic, queueId);
        }
        long popTimestamp = getStoreTimeStamp(topic, queueId, popOffset);
        return new Pair<>(popInFlight, popTimestamp);
    }

    /**
     * Sums the available message count for a group over all write queues of a topic.
     *
     * <p>Returns 0 when either argument is {@code null}, or (after logging a warning) when the
     * topic has no configuration.
     *
     * @param group consumer group name; may be {@code null}
     * @param topic topic name; may be {@code null}
     * @param isPop whether the pop-specific per-queue calculation should be used
     * @return total available message count across the topic's write queues
     * @throws ConsumeQueueException propagated from the per-queue calculation
     */
    public long getAvailableMsgCount(String group, String topic, boolean isPop) throws ConsumeQueueException {
        if (group == null || topic == null) {
            return 0L;
        }

        TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
        if (topicConfig == null) {
            LOGGER.warn("failed to get config of topic {}", topic);
            return 0L;
        }

        long total = 0L;
        for (int queueId = 0; queueId < topicConfig.getWriteQueueNums(); queueId++) {
            total += getAvailableMsgCount(group, topic, queueId, isPop);
        }
        return total;
    }

    /**
     * Computes the available message count for a single queue: the message count between a
     * starting offset and the queue's max offset (clamped to 0).
     *
     * <p>The starting offset is {@code queryPullOffset} for non-pop consumers or when the pop
     * KV service is enabled; otherwise it is {@code popBufferMergeService.getLatestOffset},
     * falling back to {@code queryOffset}. A starting offset that is still negative is replaced
     * by the max offset.
     *
     * @param group   consumer group name
     * @param topic   topic name
     * @param queueId queue index within the topic
     * @param isPop   whether the group is handled as a pop consumer
     * @return available message count for the queue
     * @throws ConsumeQueueException propagated from the message store
     */
    public long getAvailableMsgCount(String group, String topic, int queueId, boolean isPop) throws ConsumeQueueException {
        long maxOffset = Math.max(0L, messageStore.getMaxOffsetInQueue(topic, queueId));

        long startOffset;
        if (isPop && !brokerConfig.isPopConsumerKVServiceEnable()) {
            startOffset = popBufferMergeService.getLatestOffset(topic, group, queueId);
            if (startOffset < 0) {
                startOffset = offsetManager.queryOffset(group, topic, queueId);
            }
        } else {
            startOffset = offsetManager.queryPullOffset(group, topic, queueId);
        }

        if (startOffset < 0) {
            startOffset = maxOffset;
        }
        return calculateMessageCount(group, topic, queueId, startOffset, maxOffset);
    }

    /**
     * Looks up the store timestamp of the message at the given queue offset.
     *
     * @param topic   topic name
     * @param queueId queue index within the topic
     * @param offset  queue offset to look up
     * @return the timestamp reported by the message store when it is positive; otherwise
     *         {@code Long.MAX_VALUE}, which is also returned without consulting the store when
     *         {@code offset} is negative
     */
    public long getStoreTimeStamp(String topic, int queueId, long offset) {
        if (offset < 0) {
            return Long.MAX_VALUE;
        }
        long storeTimestamp = messageStore.getMessageStoreTimeStamp(topic, queueId, offset);
        return storeTimestamp > 0 ? storeTimestamp : Long.MAX_VALUE;
    }

    /**
     * Counts the messages between two offsets of a queue, never returning a negative value.
     *
     * <p>The default is the plain difference {@code to - from}. When
     * {@code isEstimateAccumulation} is on and the range is non-empty, the group's subscription
     * for the topic is resolved; for a TAG subscription other than {@code "*"}, or for an SQL92
     * subscription, the count is replaced by the message store's filtered estimate.
     *
     * @param group   consumer group name
     * @param topic   topic name
     * @param queueId queue index within the topic
     * @param from    starting offset of the range
     * @param to      ending offset of the range
     * @return the (possibly estimated) message count, clamped to 0
     */
    public long calculateMessageCount(String group, String topic, int queueId, long from, long to) {
        long count = to - from;

        if (brokerConfig.isEstimateAccumulation() && to > from) {
            SubscriptionData subscriptionData = resolveSubscriptionData(group, topic);
            if (subscriptionData != null) {
                if ("TAG".equalsIgnoreCase(subscriptionData.getExpressionType()) && !"*".equals(subscriptionData.getSubString())) {
                    DefaultMessageFilter tagFilter = new DefaultMessageFilter(subscriptionData);
                    count = messageStore.estimateMessageCount(topic, queueId, from, to, tagFilter);
                } else if ("SQL92".equalsIgnoreCase(subscriptionData.getExpressionType())) {
                    ExpressionMessageFilter sqlFilter = new ExpressionMessageFilter(
                        subscriptionData, consumerFilterManager.get(topic, group), consumerFilterManager);
                    count = messageStore.estimateMessageCount(topic, queueId, from, to, sqlFilter);
                }
            }
        }

        return Math.max(count, 0L);
    }

    /**
     * Finds the group's subscription for the topic: built from the static subscription group
     * config when {@code isUseStaticSubscription} is on, otherwise taken from the live consumer
     * group info. Returns {@code null} when nothing matches.
     */
    private SubscriptionData resolveSubscriptionData(String group, String topic) {
        if (!brokerConfig.isUseStaticSubscription()) {
            ConsumerGroupInfo groupInfo = consumerManager.getConsumerGroupInfo(group, true);
            return groupInfo == null ? null : groupInfo.findSubscriptionData(topic);
        }

        SubscriptionGroupConfig groupConfig = subscriptionGroupManager.findSubscriptionGroupConfig(group);
        if (groupConfig == null) {
            return null;
        }
        for (SimpleSubscriptionData candidate : groupConfig.getSubscriptionDataSet()) {
            if (!topic.equals(candidate.getTopic())) {
                continue;
            }
            try {
                return FilterAPI.buildSubscriptionData(
                    candidate.getTopic(), candidate.getExpression(), candidate.getExpressionType());
            } catch (Exception e) {
                // A failed build is logged and the scan continues with the remaining entries.
                LOGGER.error("Try to build subscription for group:{}, topic:{} exception.", group, topic, e);
            }
        }
        return null;
    }
}
