package org.apache.rocketmq.controller.impl.heartbeat;

import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/* loaded from: input_file:org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.class */
public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
    private static final Logger log = LoggerFactory.getLogger("RocketmqController");
    private ScheduledExecutorService scheduledService;
    private ExecutorService executor;
    private final ControllerConfig controllerConfig;
    private final Map<BrokerIdentityInfo, BrokerLiveInfo> brokerLiveTable = new ConcurrentHashMap(256);
    private final List<BrokerLifecycleListener> brokerLifecycleListeners = new ArrayList();

    public DefaultBrokerHeartbeatManager(ControllerConfig controllerConfig) {
        this.controllerConfig = controllerConfig;
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void start() {
        this.scheduledService.scheduleAtFixedRate(this::scanNotActiveBroker, 2000L, this.controllerConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void shutdown() {
        this.scheduledService.shutdown();
        this.executor.shutdown();
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void initialize() {
        this.scheduledService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
        this.executor = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
    }

    public void scanNotActiveBroker() {
        try {
            log.info("start scanNotActiveBroker");
            Iterator<Map.Entry<BrokerIdentityInfo, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<BrokerIdentityInfo, BrokerLiveInfo> next = it.next();
                long lastUpdateTimestamp = next.getValue().getLastUpdateTimestamp();
                long heartbeatTimeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
                if (System.currentTimeMillis() - lastUpdateTimestamp > heartbeatTimeoutMillis) {
                    Channel channel = next.getValue().getChannel();
                    it.remove();
                    if (channel != null) {
                        RemotingHelper.closeChannel(channel);
                    }
                    this.executor.submit(() -> {
                        notifyBrokerInActive(((BrokerIdentityInfo) next.getKey()).getClusterName(), ((BrokerLiveInfo) next.getValue()).getBrokerName(), Long.valueOf(((BrokerLiveInfo) next.getValue()).getBrokerId()));
                    });
                    log.warn("The broker channel {} expired, brokerInfo {}, expired {}ms", new Object[]{next.getValue().getChannel(), next.getKey(), Long.valueOf(heartbeatTimeoutMillis)});
                }
            }
        } catch (Exception e) {
            log.error("scanNotActiveBroker exception", e);
        }
    }

    private void notifyBrokerInActive(String str, String str2, Long l) {
        Iterator<BrokerLifecycleListener> it = this.brokerLifecycleListeners.iterator();
        while (it.hasNext()) {
            it.next().onBrokerInactive(str, str2, l);
        }
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void registerBrokerLifecycleListener(BrokerLifecycleListener brokerLifecycleListener) {
        this.brokerLifecycleListeners.add(brokerLifecycleListener);
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void onBrokerHeartbeat(String str, String str2, String str3, Long l, Long l2, Channel channel, Integer num, Long l3, Long l4, Integer num2) {
        BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(str, str2, l);
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(brokerIdentityInfo);
        int intValue = ((Integer) Optional.ofNullable(num).orElse(-1)).intValue();
        long longValue = ((Long) Optional.ofNullable(l).orElse(-1L)).longValue();
        long longValue2 = ((Long) Optional.ofNullable(l3).orElse(-1L)).longValue();
        long longValue3 = ((Long) Optional.ofNullable(l4).orElse(-1L)).longValue();
        long longValue4 = ((Long) Optional.ofNullable(l2).orElse(Long.valueOf(BrokerHeartbeatManager.DEFAULT_BROKER_CHANNEL_EXPIRED_TIME))).longValue();
        int intValue2 = ((Integer) Optional.ofNullable(num2).orElse(Integer.MAX_VALUE)).intValue();
        if (null == brokerLiveInfo) {
            this.brokerLiveTable.put(brokerIdentityInfo, new BrokerLiveInfo(str2, str3, longValue, System.currentTimeMillis(), longValue4, channel, intValue, longValue2, Integer.valueOf(intValue2)));
            log.info("new broker registered, {}, brokerId:{}", brokerIdentityInfo, Long.valueOf(longValue));
            return;
        }
        brokerLiveInfo.setLastUpdateTimestamp(System.currentTimeMillis());
        brokerLiveInfo.setHeartbeatTimeoutMillis(longValue4);
        brokerLiveInfo.setElectionPriority(Integer.valueOf(intValue2));
        if (intValue > brokerLiveInfo.getEpoch() || (intValue == brokerLiveInfo.getEpoch() && longValue2 > brokerLiveInfo.getMaxOffset())) {
            brokerLiveInfo.setEpoch(intValue);
            brokerLiveInfo.setMaxOffset(longValue2);
            brokerLiveInfo.setConfirmOffset(longValue3);
        }
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public void onBrokerChannelClose(Channel channel) {
        BrokerIdentityInfo brokerIdentityInfo = null;
        Iterator<Map.Entry<BrokerIdentityInfo, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<BrokerIdentityInfo, BrokerLiveInfo> next = it.next();
            if (next.getValue().getChannel() == channel) {
                log.info("Channel {} inactive, broker {}, addr:{}, id:{}", new Object[]{next.getValue().getChannel(), next.getValue().getBrokerName(), next.getValue().getBrokerAddr(), Long.valueOf(next.getValue().getBrokerId())});
                brokerIdentityInfo = next.getKey();
                this.executor.submit(() -> {
                    notifyBrokerInActive(((BrokerIdentityInfo) next.getKey()).getClusterName(), ((BrokerLiveInfo) next.getValue()).getBrokerName(), Long.valueOf(((BrokerLiveInfo) next.getValue()).getBrokerId()));
                });
                break;
            }
        }
        if (brokerIdentityInfo != null) {
            this.brokerLiveTable.remove(brokerIdentityInfo);
        }
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public BrokerLiveInfo getBrokerLiveInfo(String str, String str2, Long l) {
        return this.brokerLiveTable.get(new BrokerIdentityInfo(str, str2, l));
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public boolean isBrokerActive(String str, String str2, Long l) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(new BrokerIdentityInfo(str, str2, l));
        return brokerLiveInfo != null && brokerLiveInfo.getLastUpdateTimestamp() + brokerLiveInfo.getHeartbeatTimeoutMillis() >= System.currentTimeMillis();
    }

    @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager
    public Map<String, Map<String, Integer>> getActiveBrokersNum() {
        HashMap hashMap = new HashMap();
        this.brokerLiveTable.keySet().stream().filter(brokerIdentityInfo -> {
            return isBrokerActive(brokerIdentityInfo.getClusterName(), brokerIdentityInfo.getBrokerName(), brokerIdentityInfo.getBrokerId());
        }).forEach(brokerIdentityInfo2 -> {
            hashMap.computeIfAbsent(brokerIdentityInfo2.getClusterName(), str -> {
                return new HashMap();
            });
            ((Map) hashMap.get(brokerIdentityInfo2.getClusterName())).compute(brokerIdentityInfo2.getBrokerName(), (str2, num) -> {
                return Integer.valueOf(num == null ? 1 : num.intValue() + 1);
            });
        });
        return hashMap;
    }
}
