package org.apache.rocketmq.broker.longpolling;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCallback;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.MessageFilter;

/* loaded from: input_file:org/apache/rocketmq/broker/longpolling/PopLongPollingService.class */
/**
 * Background service that parks pop/poll requests which could not be answered
 * immediately and re-dispatches them to the request processor when either
 * <ul>
 *   <li>a message-arrival notification is received for their polling key, or</li>
 *   <li>the request reports itself as timed out during a periodic scan.</li>
 * </ul>
 *
 * <p>Parked requests live in {@link #pollingMap}, keyed by
 * {@code KeyBuilder.buildPollingKey(topic, consumerGroup, queueId)}.
 * {@link #topicCidMap} records which consumer groups have polled which topic so
 * that a topic-level notification can be fanned out to every group.
 */
public class PopLongPollingService extends ServiceThread {
    private static final Logger POP_LOGGER = LoggerFactory.getLogger("RocketmqPop");

    /** Time the scan loop waits between two passes over the polling map, in milliseconds. */
    private static final long SCAN_INTERVAL_MILLIS = 20L;
    /** Number of scan passes between two statistics log lines / counter reconciliations. */
    private static final int STATS_EVERY_N_SCANS = 100;
    /** A single queue larger than this is reported individually during the statistics pass. */
    private static final long LARGE_QUEUE_LOG_THRESHOLD = 100L;
    /** Minimum time between two runs of {@link #cleanUnusedResource()}, in milliseconds. */
    private static final long CLEAN_INTERVAL_MILLIS = 300000L;

    private final BrokerController brokerController;
    private final NettyRequestProcessor processor;
    /** topic -> (consumer group -> placeholder byte); used to fan out topic-level notifications. */
    private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
    /** polling key -> parked requests, ordered by {@code PopRequest.COMPARATOR}. */
    private final ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
    /** Timestamp of the last {@link #cleanUnusedResource()} run; 0 means "never". Only touched by the service thread. */
    private long lastCleanTime = 0;
    /** Approximate number of parked requests across all queues; reconciled periodically in {@link #run()}. */
    private final AtomicLong totalPollingNum = new AtomicLong(0);
    /** When true, notifications take the last element of a queue instead of the first. */
    private final boolean notifyLast;

    /**
     * @param brokerController broker facade used for configuration, executors and metadata lookups
     * @param processor        processor that re-handles a parked request once it is woken up
     * @param notifyLast       whether notifications should pick the last (instead of the first) parked request
     */
    public PopLongPollingService(BrokerController brokerController, NettyRequestProcessor processor, boolean notifyLast) {
        this.brokerController = brokerController;
        this.processor = processor;
        // Widen before multiplying so a very large configured size cannot overflow int.
        long pollingMapSize = this.brokerController.getBrokerConfig().getPopPollingMapSize();
        this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
            .maximumWeightedCapacity(pollingMapSize * 2L)
            .build();
        this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>()
            .maximumWeightedCapacity(pollingMapSize)
            .build();
        this.notifyLast = notifyLast;
    }

    /**
     * Returns the simple class name, prefixed with the broker identifier when the
     * broker runs inside a broker container.
     */
    @Override
    public String getServiceName() {
        String simpleName = PopLongPollingService.class.getSimpleName();
        if (this.brokerController.getBrokerConfig().isInBrokerContainer()) {
            return this.brokerController.getBrokerIdentity().getIdentifier() + simpleName;
        }
        return simpleName;
    }

    /**
     * Scan loop: every {@value #SCAN_INTERVAL_MILLIS} ms wakes up timed-out requests,
     * every {@value #STATS_EVERY_N_SCANS} passes logs statistics and reconciles
     * {@link #totalPollingNum}, and periodically cleans entries of deleted topics/groups.
     * When the service is stopped, every request that is still parked is woken up once.
     */
    @Override
    public void run() {
        int scanCount = 0;
        while (!isStopped()) {
            try {
                waitForRunning(SCAN_INTERVAL_MILLIS);
                scanCount++;
                if (this.pollingMap.isEmpty()) {
                    continue;
                }
                boolean collectStats = scanCount >= STATS_EVERY_N_SCANS;
                long scannedTotal = 0;
                for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : this.pollingMap.entrySet()) {
                    ConcurrentSkipListSet<PopRequest> queue = entry.getValue();
                    if (queue == null) {
                        continue;
                    }
                    wakeUpTimedOutRequests(queue);
                    if (collectStats) {
                        long size = queue.size();
                        scannedTotal += size;
                        if (size > LARGE_QUEUE_LOG_THRESHOLD) {
                            POP_LOGGER.info("polling queue {} , size={} ", entry.getKey(), size);
                        }
                    }
                }
                if (collectStats) {
                    long trackedTotal = this.totalPollingNum.get();
                    POP_LOGGER.info("pollingMapSize={},tmpTotalSize={},atomicTotalSize={},diffSize={}",
                        this.pollingMap.size(), scannedTotal, trackedTotal, Math.abs(trackedTotal - scannedTotal));
                    // The atomic counter is only an approximation; reset it to what was actually counted.
                    this.totalPollingNum.set(scannedTotal);
                    scanCount = 0;
                }
                if (this.lastCleanTime == 0 || System.currentTimeMillis() - this.lastCleanTime > CLEAN_INTERVAL_MILLIS) {
                    cleanUnusedResource();
                }
            } catch (Throwable th) {
                POP_LOGGER.error("checkPolling error", th);
            }
        }
        wakeUpAllPendingRequests();
    }

    /**
     * Removes requests from the head of {@code queue} and wakes them up for as long as
     * the head reports {@code isTimeout()}. The first head that is not timed out is put
     * back and the scan of this queue stops.
     * NOTE(review): stopping at the first live head presumably relies on
     * {@code PopRequest.COMPARATOR} ordering by expiry time - confirm against PopRequest.
     */
    private void wakeUpTimedOutRequests(ConcurrentSkipListSet<PopRequest> queue) {
        PopRequest head;
        while ((head = queue.pollFirst()) != null) {
            if (!head.isTimeout()) {
                if (queue.add(head)) {
                    return;
                }
                // Could not put it back; fall through and wake it up rather than losing it.
                POP_LOGGER.info("polling, add fail again: {}", head);
            }
            if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("timeout , wakeUp polling : {}", head);
            }
            this.totalPollingNum.decrementAndGet();
            wakeUp(head);
        }
    }

    /**
     * Drains every queue and wakes up each parked request; called once when the service stops.
     * The inner loop terminates as soon as a queue is empty.
     */
    private void wakeUpAllPendingRequests() {
        try {
            for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : this.pollingMap.entrySet()) {
                ConcurrentSkipListSet<PopRequest> queue = entry.getValue();
                if (queue == null) {
                    continue;
                }
                PopRequest pending;
                while ((pending = queue.pollFirst()) != null) {
                    wakeUp(pending);
                }
            }
        } catch (Throwable th) {
            // Best effort during shutdown: never propagate, but do not hide the failure either.
            POP_LOGGER.error("wakeUp pending polling requests on shutdown error", th);
        }
    }

    /**
     * Same as the seven-argument overload with offset -1 and no filter information.
     */
    public void notifyMessageArrivingWithRetryTopic(String topic, int queueId) {
        notifyMessageArrivingWithRetryTopic(topic, queueId, -1L, null, 0L, null, null);
    }

    /**
     * Notifies pollers of {@code topic}; when {@code topic} is a V2 pop-retry topic
     * (per {@code KeyBuilder.isPopRetryTopicV2}) the notification is issued for the
     * normal topic parsed from it instead.
     */
    public void notifyMessageArrivingWithRetryTopic(String topic, int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String notifyTopic = KeyBuilder.isPopRetryTopicV2(topic) ? KeyBuilder.parseNormalTopic(topic) : topic;
        notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
    }

    /**
     * Fans a message-arrival notification out to every consumer group that has polled
     * {@code topic}. For a concrete queue ({@code queueId >= 0}) the requests parked under
     * queue id -1 are notified first, then those parked under {@code queueId}.
     *
     * @param offset used only to decide whether filtering is bypassed: when
     *               {@code popLongPollingForceNotifyInterval > 0} and {@code offset} is a
     *               multiple of it, the notification is forced
     */
    public void notifyMessageArriving(String topic, int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        ConcurrentHashMap<String, Byte> groups = this.topicCidMap.get(topic);
        if (groups == null) {
            return;
        }
        long forceInterval = this.brokerController.getBrokerConfig().getPopLongPollingForceNotifyInterval();
        boolean force = forceInterval > 0 && offset % forceInterval == 0;
        for (Map.Entry<String, Byte> groupEntry : groups.entrySet()) {
            String group = groupEntry.getKey();
            if (queueId >= 0) {
                notifyMessageArriving(topic, -1, group, force, tagsCode, msgStoreTime, filterBitMap, properties);
            }
            notifyMessageArriving(topic, queueId, group, force, tagsCode, msgStoreTime, filterBitMap, properties);
        }
    }

    /**
     * Non-forced notification without a callback.
     *
     * @return true if a parked request was handed to the executor
     */
    public boolean notifyMessageArriving(String topic, int queueId, String cid, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        return notifyMessageArriving(topic, queueId, cid, false, tagsCode, msgStoreTime, filterBitMap, properties, null);
    }

    /**
     * Notification without a callback.
     *
     * @return true if a parked request was handed to the executor
     */
    public boolean notifyMessageArriving(String topic, int queueId, String cid, boolean force, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        return notifyMessageArriving(topic, queueId, cid, force, tagsCode, msgStoreTime, filterBitMap, properties, null);
    }

    /**
     * Takes one parked request for {@code (topic, cid, queueId)} and wakes it up.
     * Unless {@code force} is set, a request that carries both a message filter and
     * subscription data is first checked against the filter; on a mismatch it is put
     * back into its queue and nothing is woken up.
     *
     * @param callback optional callback appended to the request's command before re-processing
     * @return true if a request was handed to the executor, false otherwise
     */
    public boolean notifyMessageArriving(String topic, int queueId, String cid, boolean force, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties, CommandCallback callback) {
        ConcurrentSkipListSet<PopRequest> queue = this.pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
        if (queue == null || queue.isEmpty()) {
            return false;
        }
        PopRequest popRequest = pollRemotingCommands(queue);
        if (popRequest == null) {
            return false;
        }
        if (!force && popRequest.getMessageFilter() != null && popRequest.getSubscriptionData() != null) {
            boolean matched = popRequest.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
            if (matched && properties != null) {
                matched = popRequest.getMessageFilter().isMatchedByCommitLog((ByteBuffer) null, properties);
            }
            if (!matched) {
                // Not interested in this message: park the request again and restore the counter.
                queue.add(popRequest);
                this.totalPollingNum.incrementAndGet();
                return false;
            }
        }
        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("lock release, new msg arrive, wakeUp: {}", popRequest);
        }
        return wakeUp(popRequest, callback);
    }

    /**
     * Wakes up {@code request} without attaching a callback.
     *
     * @return true if the request was submitted for re-processing
     */
    public boolean wakeUp(PopRequest request) {
        return wakeUp(request, null);
    }

    /**
     * Marks {@code request} as completed and, if its channel is still active, submits a
     * task to the pull-message executor that re-runs the request through the processor
     * and writes any non-null response back to the channel.
     *
     * @param request  request to wake up; {@code null} is tolerated
     * @param callback optional callback added to the request command's callback list
     * @return false when {@code request} is null, {@code request.complete()} returns false,
     *         or the channel is inactive; true once the task has been submitted
     */
    public boolean wakeUp(PopRequest request, CommandCallback callback) {
        if (request == null || !request.complete()) {
            return false;
        }
        if (callback != null && request.getRemotingCommand() != null) {
            if (request.getRemotingCommand().getCallbackList() == null) {
                request.getRemotingCommand().setCallbackList(new ArrayList<>());
            }
            request.getRemotingCommand().getCallbackList().add(callback);
        }
        if (!request.getCtx().channel().isActive()) {
            return false;
        }
        Runnable reprocess = () -> {
            try {
                RemotingCommand response = this.processor.processRequest(request.getCtx(), request.getRemotingCommand());
                if (response != null) {
                    response.setOpaque(request.getRemotingCommand().getOpaque());
                    response.markResponseType();
                    NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future -> {
                        if (future.isSuccess()) {
                            return;
                        }
                        POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause());
                        POP_LOGGER.error(request.toString());
                        POP_LOGGER.error(response.toString());
                    });
                }
            } catch (Exception e) {
                POP_LOGGER.error("ExecuteRequestWhenWakeup run", e);
            }
        };
        this.brokerController.getPullMessageExecutor().submit(
            (Runnable) new RequestTask(reprocess, request.getChannel(), request.getRemotingCommand()));
        return true;
    }

    /**
     * Parks a request without subscription data or message filter.
     */
    public PollingResult polling(ChannelHandlerContext ctx, RemotingCommand remotingCommand, PollingHeader requestHeader) {
        return polling(ctx, remotingCommand, requestHeader, null, null);
    }

    /**
     * Tries to park {@code remotingCommand} until a message arrives or
     * {@code bornTime + pollTime} of the header is reached.
     *
     * @return {@code NOT_POLLING} if the header's poll time is not positive or the service is stopped;
     *         {@code POLLING_FULL} if the global or per-key limit is reached or the request
     *         could not be added to its queue;
     *         {@code POLLING_TIMEOUT} if the request is already timed out;
     *         {@code POLLING_SUC} if the request was parked (the command is then marked suspended)
     */
    public PollingResult polling(ChannelHandlerContext ctx, RemotingCommand remotingCommand, PollingHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter) {
        if (requestHeader.getPollTime() <= 0 || isStopped()) {
            return PollingResult.NOT_POLLING;
        }

        // Remember that this consumer group polls this topic, even if parking fails below.
        ConcurrentHashMap<String, Byte> groups = this.topicCidMap.get(requestHeader.getTopic());
        if (groups == null) {
            groups = new ConcurrentHashMap<>();
            ConcurrentHashMap<String, Byte> existing = this.topicCidMap.putIfAbsent(requestHeader.getTopic(), groups);
            if (existing != null) {
                groups = existing;
            }
        }
        groups.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);

        long expired = requestHeader.getBornTime() + requestHeader.getPollTime();
        PopRequest request = new PopRequest(remotingCommand, ctx, expired, subscriptionData, messageFilter);

        long parkedTotal = this.totalPollingNum.get();
        if (parkedTotal >= this.brokerController.getBrokerConfig().getMaxPopPollingSize()) {
            POP_LOGGER.info("polling {}, result POLLING_FULL, total:{}", remotingCommand, parkedTotal);
            return PollingResult.POLLING_FULL;
        }
        if (request.isTimeout()) {
            if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("polling {}, result POLLING_TIMEOUT", remotingCommand);
            }
            return PollingResult.POLLING_TIMEOUT;
        }

        String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId());
        ConcurrentSkipListSet<PopRequest> queue = this.pollingMap.get(key);
        if (queue == null) {
            queue = new ConcurrentSkipListSet<>(PopRequest.COMPARATOR);
            ConcurrentSkipListSet<PopRequest> existing = this.pollingMap.putIfAbsent(key, queue);
            if (existing != null) {
                queue = existing;
            }
        } else {
            int size = queue.size();
            if (size > this.brokerController.getBrokerConfig().getPopPollingSize()) {
                POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", remotingCommand, size);
                return PollingResult.POLLING_FULL;
            }
        }

        if (!queue.add(request)) {
            POP_LOGGER.info("polling {}, result POLLING_FULL, add fail, {}", request, queue);
            return PollingResult.POLLING_FULL;
        }
        remotingCommand.setSuspended(true);
        this.totalPollingNum.incrementAndGet();
        if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("polling {}, result POLLING_SUC", remotingCommand);
        }
        return PollingResult.POLLING_SUC;
    }

    /**
     * Exposes the live polling map (not a copy).
     */
    public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
        return this.pollingMap;
    }

    /**
     * Drops entries of {@link #topicCidMap} and {@link #pollingMap} whose topic has no
     * topic config or whose consumer group is not a known subscription group, then
     * records the current time in {@link #lastCleanTime}. Failures are logged, not thrown.
     */
    private void cleanUnusedResource() {
        try {
            Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>> topicIt = this.topicCidMap.entrySet().iterator();
            while (topicIt.hasNext()) {
                Map.Entry<String, ConcurrentHashMap<String, Byte>> topicEntry = topicIt.next();
                String topic = topicEntry.getKey();
                if (this.brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
                    POP_LOGGER.info("remove nonexistent topic {} in topicCidMap!", topic);
                    topicIt.remove();
                    continue;
                }
                Iterator<Map.Entry<String, Byte>> groupIt = topicEntry.getValue().entrySet().iterator();
                while (groupIt.hasNext()) {
                    String group = groupIt.next().getKey();
                    if (!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group)) {
                        POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in topicCidMap!", group, topic);
                        groupIt.remove();
                    }
                }
            }

            Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>> pollingIt = this.pollingMap.entrySet().iterator();
            while (pollingIt.hasNext()) {
                Map.Entry<String, ConcurrentSkipListSet<PopRequest>> pollingEntry = pollingIt.next();
                if (pollingEntry.getKey() == null) {
                    continue;
                }
                // Only keys that split into exactly three parts are inspected; the first two
                // parts are treated as topic and consumer group.
                String[] keyParts = pollingEntry.getKey().split(ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR);
                if (keyParts.length != 3) {
                    continue;
                }
                String topic = keyParts[0];
                String group = keyParts[1];
                if (this.brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
                    POP_LOGGER.info("remove nonexistent topic {} in pollingMap!", topic);
                    pollingIt.remove();
                } else if (!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group)) {
                    POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in pollingMap!", group, topic);
                    pollingIt.remove();
                }
            }
        } catch (Throwable th) {
            POP_LOGGER.error("cleanUnusedResource", th);
        }
        this.lastCleanTime = System.currentTimeMillis();
    }

    /**
     * Removes and returns one request from {@code queue} - the last element when
     * {@link #notifyLast} is set, otherwise the first - skipping (and discarding) requests
     * whose channel is no longer active. {@link #totalPollingNum} is decremented once per
     * request actually removed.
     *
     * @return a request with an active channel, or {@code null} if the queue ran empty
     */
    private PopRequest pollRemotingCommands(ConcurrentSkipListSet<PopRequest> queue) {
        if (queue == null || queue.isEmpty()) {
            return null;
        }
        PopRequest candidate;
        do {
            candidate = this.notifyLast ? queue.pollLast() : queue.pollFirst();
            if (candidate != null) {
                // Only count requests that were really taken out; a null poll removed nothing.
                this.totalPollingNum.decrementAndGet();
            }
        } while (candidate != null && !candidate.getChannel().isActive());
        return candidate;
    }
}
