package org.apache.rocketmq.broker.client;

import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

/* loaded from: input_file:org/apache/rocketmq/broker/client/ConsumerManager.class */
/**
 * Broker-side registry of consumer groups and the client channels that belong to them.
 *
 * <p>Two tables are kept, both keyed by consumer group name:
 * <ul>
 *   <li>{@code consumerTable} - groups populated through {@link #registerConsumer} and
 *       {@link #registerConsumerWithoutSub};</li>
 *   <li>{@code consumerCompensationTable} - groups populated through the {@code compensate*}
 *       methods, consulted only as a fallback when a lookup explicitly asks for it.</li>
 * </ul>
 *
 * <p>Registered {@link ConsumerIdsChangeListener}s are notified through
 * {@link #callConsumerIdsChangeListener} whenever clients or groups are added or removed.
 * State is held in {@link ConcurrentHashMap} / {@link CopyOnWriteArrayList} instances; compound
 * operations (for example "table is empty, then remove the group") are not atomic.
 */
public class ConsumerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger("RocketmqBroker");

    /** Consumer group name -> group info for groups with registered clients. */
    private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<>(1024);
    /** Consumer group name -> group info filled in by the {@code compensate*} methods. */
    private final ConcurrentMap<String, ConsumerGroupInfo> consumerCompensationTable = new ConcurrentHashMap<>(1024);
    private final List<ConsumerIdsChangeListener> consumerIdsChangeListenerList = new CopyOnWriteArrayList<>();
    /** May be {@code null}; in that case no register-time statistics are recorded. */
    protected final BrokerStatsManager brokerStatsManager;
    /** Maximum age in milliseconds of a channel's last update before the scan drops it. */
    private final long channelExpiredTimeout;
    /** Maximum age in milliseconds of a compensated subscription before it is dropped. */
    private final long subscriptionExpiredTimeout;

    /**
     * Creates a manager without statistics reporting.
     *
     * @param consumerIdsChangeListener first listener to notify about consumer changes
     * @param j timeout in milliseconds, used for both channel and subscription expiry
     */
    public ConsumerManager(ConsumerIdsChangeListener consumerIdsChangeListener, long j) {
        this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
        this.brokerStatsManager = null;
        this.channelExpiredTimeout = j;
        this.subscriptionExpiredTimeout = j;
    }

    /**
     * Creates a manager whose expiry timeouts are read from the broker configuration.
     *
     * @param consumerIdsChangeListener first listener to notify about consumer changes
     * @param brokerStatsManager receives the time spent in register calls
     * @param brokerConfig source of the channel and subscription expiry timeouts
     */
    public ConsumerManager(ConsumerIdsChangeListener consumerIdsChangeListener, BrokerStatsManager brokerStatsManager, BrokerConfig brokerConfig) {
        this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
        this.brokerStatsManager = brokerStatsManager;
        this.channelExpiredTimeout = brokerConfig.getChannelExpiredTimeout();
        this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout();
    }

    /**
     * Looks up a client of a registered group by a string identifier.
     *
     * @param str consumer group name
     * @param str2 identifier handed to {@code ConsumerGroupInfo.findChannel(String)}
     *             (presumably the client id - confirm against ConsumerGroupInfo)
     * @return the matching client, or {@code null} if the group is not registered or has no match
     */
    public ClientChannelInfo findChannel(String str, String str2) {
        ConsumerGroupInfo groupInfo = this.consumerTable.get(str);
        return groupInfo == null ? null : groupInfo.findChannel(str2);
    }

    /**
     * Looks up a client of a registered group by its channel.
     *
     * @param str consumer group name
     * @param channel channel of the client
     * @return the matching client, or {@code null} if the group is not registered or has no match
     */
    public ClientChannelInfo findChannel(String str, Channel channel) {
        ConsumerGroupInfo groupInfo = this.consumerTable.get(str);
        return groupInfo == null ? null : groupInfo.findChannel(channel);
    }

    /**
     * Same as {@link #findSubscriptionData(String, String, boolean)} with the compensation
     * fallback enabled.
     *
     * @param str consumer group name
     * @param str2 subscription table key (the topic)
     * @return the subscription, or {@code null} if none is known
     */
    public SubscriptionData findSubscriptionData(String str, String str2) {
        return findSubscriptionData(str, str2, true);
    }

    /**
     * Finds the subscription of a group, preferring the registered group over compensated data.
     *
     * @param str consumer group name
     * @param str2 subscription table key (the topic)
     * @param z whether to fall back to the compensation table when the registered group is
     *          missing or has no entry for {@code str2}
     * @return the subscription, or {@code null} if none is known
     */
    public SubscriptionData findSubscriptionData(String str, String str2, boolean z) {
        ConsumerGroupInfo registered = getConsumerGroupInfo(str, false);
        if (registered != null) {
            SubscriptionData subscription = registered.findSubscriptionData(str2);
            if (subscription != null) {
                return subscription;
            }
        }
        if (z) {
            ConsumerGroupInfo compensated = this.consumerCompensationTable.get(str);
            if (compensated != null) {
                return compensated.findSubscriptionData(str2);
            }
        }
        return null;
    }

    /**
     * @return the live (not copied) table of registered consumer groups
     */
    public ConcurrentMap<String, ConsumerGroupInfo> getConsumerTable() {
        return this.consumerTable;
    }

    /**
     * Returns the registered group, without consulting the compensation table.
     *
     * @param str consumer group name
     * @return the group info, or {@code null} if the group is not registered
     */
    public ConsumerGroupInfo getConsumerGroupInfo(String str) {
        return getConsumerGroupInfo(str, false);
    }

    /**
     * Returns the group info for a consumer group.
     *
     * @param str consumer group name
     * @param z whether to fall back to the compensation table when the group is not registered
     * @return the group info, or {@code null} if none is found
     */
    public ConsumerGroupInfo getConsumerGroupInfo(String str, boolean z) {
        ConsumerGroupInfo groupInfo = this.consumerTable.get(str);
        if (groupInfo == null && z) {
            groupInfo = this.consumerCompensationTable.get(str);
        }
        return groupInfo;
    }

    /**
     * @param str consumer group name
     * @return number of entries in the registered group's subscription table, or {@code 0} if
     *         the group is not registered
     */
    public int findSubscriptionDataCount(String str) {
        ConsumerGroupInfo groupInfo = getConsumerGroupInfo(str);
        return groupInfo == null ? 0 : groupInfo.getSubscriptionTable().size();
    }

    /**
     * Propagates a channel-close event to every registered group and notifies listeners for each
     * group that reports a client for that channel. A group left without channels is removed
     * from the table.
     *
     * @param str value forwarded to {@code ConsumerGroupInfo.doChannelCloseEvent}
     *            (presumably the remote address of the channel - confirm against callers)
     * @param channel the channel that was closed
     * @return {@code true} if at least one group reported a client for the closed channel,
     *         {@code false} otherwise
     */
    public boolean doChannelCloseEvent(String str, Channel channel) {
        boolean removed = false;
        for (Map.Entry<String, ConsumerGroupInfo> entry : this.consumerTable.entrySet()) {
            String group = entry.getKey();
            ConsumerGroupInfo groupInfo = entry.getValue();
            ClientChannelInfo closedClient = groupInfo.doChannelCloseEvent(str, channel);
            if (closedClient == null) {
                continue;
            }
            // Previously this method returned false unconditionally, which made the result useless.
            removed = true;
            callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, closedClient, groupInfo.getSubscribeTopics());
            if (groupInfo.getChannelInfoTable().isEmpty() && this.consumerTable.remove(group) != null) {
                LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
                callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
            }
            if (!isBroadcastMode(groupInfo.getMessageModel())) {
                callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, groupInfo.getAllChannel());
            }
        }
        return removed;
    }

    /**
     * Records the consume type and message model of a group in the compensation table, creating
     * the entry if it does not exist yet.
     *
     * @param str consumer group name
     * @param consumeType consume type to store
     * @param messageModel message model to store
     */
    public void compensateBasicConsumerInfo(String str, ConsumeType consumeType, MessageModel messageModel) {
        ConsumerGroupInfo compensated = this.consumerCompensationTable.computeIfAbsent(str, ConsumerGroupInfo::new);
        compensated.setConsumeType(consumeType);
        compensated.setMessageModel(messageModel);
    }

    /**
     * Stores a subscription for a group in the compensation table, creating the group entry if
     * it does not exist yet.
     *
     * @param str consumer group name
     * @param str2 subscription table key (the topic)
     * @param subscriptionData subscription to store under {@code str2}
     */
    public void compensateSubscribeData(String str, String str2, SubscriptionData subscriptionData) {
        ConsumerGroupInfo compensated = this.consumerCompensationTable.computeIfAbsent(str, ConsumerGroupInfo::new);
        compensated.getSubscriptionTable().put(str2, subscriptionData);
    }

    /**
     * Same as the eight-argument {@code registerConsumer} with subscription updating enabled.
     *
     * @param str consumer group name
     * @param clientChannelInfo client being registered
     * @param consumeType consume type of the client
     * @param messageModel message model of the client
     * @param consumeFromWhere initial consume position policy of the client
     * @param set subscriptions announced by the client
     * @param z whether listeners should receive a {@code CHANGE} event when something changed
     * @return {@code true} if the channel or the subscriptions changed
     */
    public boolean registerConsumer(String str, ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, Set<SubscriptionData> set, boolean z) {
        return registerConsumer(str, clientChannelInfo, consumeType, messageModel, consumeFromWhere, set, z, true);
    }

    /**
     * Registers (or refreshes) a client in a consumer group, creating the group if needed, and
     * notifies listeners.
     *
     * <p>Events are emitted in this order: {@code CLIENT_REGISTER} (only if the channel update
     * reported a change), {@code CHANGE} (only if something changed, {@code z} is set and the
     * group is not in broadcast mode), and finally {@code REGISTER} (always).
     *
     * @param str consumer group name
     * @param clientChannelInfo client being registered
     * @param consumeType consume type of the client
     * @param messageModel message model of the client
     * @param consumeFromWhere initial consume position policy of the client
     * @param set subscriptions announced by the client; must not be {@code null}
     * @param z whether listeners should receive a {@code CHANGE} event when something changed
     * @param z2 whether the group's subscriptions should be updated from {@code set}
     * @return {@code true} if the channel or the subscriptions changed
     */
    public boolean registerConsumer(String str, ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, Set<SubscriptionData> set, boolean z, boolean z2) {
        long startMillis = System.currentTimeMillis();
        ConsumerGroupInfo groupInfo = getOrCreateConsumerGroupInfo(str, consumeType, messageModel, consumeFromWhere);

        boolean channelChanged = groupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
        if (channelChanged) {
            Set<String> topics = set.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet());
            callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, str, clientChannelInfo, topics);
        }

        boolean subscriptionChanged = z2 && groupInfo.updateSubscription(set);
        boolean changed = channelChanged || subscriptionChanged;
        if (changed && z && !isBroadcastMode(groupInfo.getMessageModel())) {
            callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, str, groupInfo.getAllChannel());
        }

        recordRegisterTime(startMillis);
        callConsumerIdsChangeListener(ConsumerGroupEvent.REGISTER, str, set, clientChannelInfo);
        return changed;
    }

    /**
     * Registers (or refreshes) a client in a consumer group without touching subscriptions.
     * Only a {@code CHANGE} event may be emitted; no {@code CLIENT_REGISTER} or {@code REGISTER}
     * event is sent.
     *
     * @param str consumer group name
     * @param clientChannelInfo client being registered
     * @param consumeType consume type of the client
     * @param messageModel message model of the client
     * @param consumeFromWhere initial consume position policy of the client
     * @param z whether listeners should receive a {@code CHANGE} event when the channel changed
     * @return {@code true} if the channel update reported a change
     */
    public boolean registerConsumerWithoutSub(String str, ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, boolean z) {
        long startMillis = System.currentTimeMillis();
        ConsumerGroupInfo groupInfo = getOrCreateConsumerGroupInfo(str, consumeType, messageModel, consumeFromWhere);

        boolean channelChanged = groupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
        if (channelChanged && z && !isBroadcastMode(groupInfo.getMessageModel())) {
            callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, str, groupInfo.getAllChannel());
        }

        recordRegisterTime(startMillis);
        return channelChanged;
    }

    /**
     * Removes a client from a registered group and notifies listeners. A group left without
     * channels is removed from the table. Does nothing if the group is not registered.
     *
     * @param str consumer group name
     * @param clientChannelInfo client being unregistered
     * @param z whether listeners should receive a {@code CHANGE} event (skipped for broadcast
     *          groups)
     */
    public void unregisterConsumer(String str, ClientChannelInfo clientChannelInfo, boolean z) {
        ConsumerGroupInfo groupInfo = this.consumerTable.get(str);
        if (groupInfo == null) {
            return;
        }
        if (groupInfo.unregisterChannel(clientChannelInfo)) {
            callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, str, clientChannelInfo, groupInfo.getSubscribeTopics());
        }
        if (groupInfo.getChannelInfoTable().isEmpty() && this.consumerTable.remove(str) != null) {
            LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", str);
            callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, str);
        }
        if (z && !isBroadcastMode(groupInfo.getMessageModel())) {
            callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, str, groupInfo.getAllChannel());
        }
    }

    /**
     * Drops compensated subscriptions whose {@code subVersion} is older than the subscription
     * expiry timeout, and removes compensation entries whose subscription table became empty as
     * a result. Entries that had nothing expired are left alone, even if their table is empty.
     */
    public void removeExpireConsumerGroupInfo() {
        List<String> emptiedGroups = new ArrayList<>();
        this.consumerCompensationTable.forEach((group, groupInfo) -> {
            ConcurrentMap<String, SubscriptionData> subscriptionTable = groupInfo.getSubscriptionTable();
            List<String> expiredTopics = new ArrayList<>();
            // Inner lambda parameters need names distinct from the outer ones to compile.
            subscriptionTable.forEach((topic, subscriptionData) -> {
                if (System.currentTimeMillis() - subscriptionData.getSubVersion() > this.subscriptionExpiredTimeout) {
                    expiredTopics.add(topic);
                }
            });
            for (String topic : expiredTopics) {
                subscriptionTable.remove(topic);
            }
            if (!expiredTopics.isEmpty() && subscriptionTable.isEmpty()) {
                emptiedGroups.add(group);
            }
        });
        for (String group : emptiedGroups) {
            this.consumerCompensationTable.remove(group);
        }
    }

    /**
     * Closes and removes every client channel whose last update is older than the channel expiry
     * timeout, removes groups left without channels, and finally expires compensated
     * subscriptions. Listeners receive {@code CLIENT_UNREGISTER} for each expired client; no
     * event is sent for the removal of the group itself.
     */
    public void scanNotActiveChannel() {
        Iterator<Map.Entry<String, ConsumerGroupInfo>> groupIterator = this.consumerTable.entrySet().iterator();
        while (groupIterator.hasNext()) {
            Map.Entry<String, ConsumerGroupInfo> groupEntry = groupIterator.next();
            String group = groupEntry.getKey();
            ConsumerGroupInfo groupInfo = groupEntry.getValue();
            ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = groupInfo.getChannelInfoTable();

            Iterator<Map.Entry<Channel, ClientChannelInfo>> channelIterator = channelInfoTable.entrySet().iterator();
            while (channelIterator.hasNext()) {
                ClientChannelInfo clientInfo = channelIterator.next().getValue();
                long idleMillis = System.currentTimeMillis() - clientInfo.getLastUpdateTimestamp();
                if (idleMillis > this.channelExpiredTimeout) {
                    LOGGER.warn("SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}", RemotingHelper.parseChannelRemoteAddr(clientInfo.getChannel()), group);
                    callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, clientInfo, groupInfo.getSubscribeTopics());
                    RemotingHelper.closeChannel(clientInfo.getChannel());
                    channelIterator.remove();
                }
            }

            if (channelInfoTable.isEmpty()) {
                LOGGER.warn("SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}", group);
                groupIterator.remove();
            }
        }
        removeExpireConsumerGroupInfo();
    }

    /**
     * Lists the registered consumer groups whose subscription table contains the given key.
     *
     * @param str subscription table key (the topic)
     * @return a new set of consumer group names; empty if no group matches
     */
    public HashSet<String> queryTopicConsumeByWho(String str) {
        HashSet<String> groups = new HashSet<>();
        for (Map.Entry<String, ConsumerGroupInfo> entry : this.consumerTable.entrySet()) {
            if (entry.getValue().getSubscriptionTable().containsKey(str)) {
                groups.add(entry.getKey());
            }
        }
        return groups;
    }

    /**
     * Adds another listener to be notified about consumer changes.
     *
     * @param consumerIdsChangeListener listener to append
     */
    public void appendConsumerIdsChangeListener(ConsumerIdsChangeListener consumerIdsChangeListener) {
        this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
    }

    /**
     * Invokes every registered listener with the given event. A failure in one listener is
     * logged and does not prevent the remaining listeners from being called.
     *
     * @param consumerGroupEvent kind of event
     * @param str consumer group name
     * @param objArr event-specific arguments, passed through unchanged
     */
    protected void callConsumerIdsChangeListener(ConsumerGroupEvent consumerGroupEvent, String str, Object... objArr) {
        for (ConsumerIdsChangeListener listener : this.consumerIdsChangeListenerList) {
            try {
                listener.handle(consumerGroupEvent, str, objArr);
            } catch (Throwable th) {
                // Deliberately broad: one faulty listener must not break the others.
                LOGGER.error("err when call consumerIdsChangeListener", th);
            }
        }
    }

    /** Returns the registered group info for {@code group}, inserting a new one if absent. */
    private ConsumerGroupInfo getOrCreateConsumerGroupInfo(String group, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
        ConsumerGroupInfo existing = this.consumerTable.get(group);
        if (existing != null) {
            return existing;
        }
        ConsumerGroupInfo created = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        // Another thread may have inserted the group in the meantime; keep the winner.
        ConsumerGroupInfo raced = this.consumerTable.putIfAbsent(group, created);
        return raced != null ? raced : created;
    }

    /** Reports the elapsed time since {@code startMillis} to the stats manager, if one is set. */
    private void recordRegisterTime(long startMillis) {
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - startMillis));
        }
    }

    /** Null-safe check for {@link MessageModel#BROADCASTING}. */
    private boolean isBroadcastMode(MessageModel messageModel) {
        return MessageModel.BROADCASTING.equals(messageModel);
    }
}
