package org.apache.rocketmq.controller.impl.manager;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.heartbeat.BrokerIdentityInfo;
import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo;
import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelRequest;
import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelResponse;
import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerRequest;
import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerResponse;
import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoRequest;
import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoResponse;
import org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventRequest;
import org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventResponse;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/controller/impl/manager/RaftReplicasInfoManager.class */
public class RaftReplicasInfoManager extends ReplicasInfoManager {
    private static final Logger log = LoggerFactory.getLogger("RocketmqController");
    private final Map<BrokerIdentityInfo, BrokerLiveInfo> brokerLiveTable;

    /* loaded from: input_file:org/apache/rocketmq/controller/impl/manager/RaftReplicasInfoManager$BrokerValidPredicateWithInvokeTime.class */
    public static class BrokerValidPredicateWithInvokeTime implements BrokerValidPredicate {
        private final long invokeTime;
        private final RaftReplicasInfoManager raftBrokerHeartBeatManager;

        public BrokerValidPredicateWithInvokeTime(long j, RaftReplicasInfoManager raftReplicasInfoManager) {
            this.invokeTime = j;
            this.raftBrokerHeartBeatManager = raftReplicasInfoManager;
        }

        @Override // org.apache.rocketmq.controller.helper.BrokerValidPredicate
        public boolean check(String str, String str2, Long l) {
            return this.raftBrokerHeartBeatManager.isBrokerActive(str, str2, l, this.invokeTime);
        }
    }

    public RaftReplicasInfoManager(ControllerConfig controllerConfig) {
        super(controllerConfig);
        this.brokerLiveTable = new ConcurrentHashMap(256);
    }

    public ControllerResult<GetBrokerLiveInfoResponse> getBrokerLiveInfo(GetBrokerLiveInfoRequest getBrokerLiveInfoRequest) {
        BrokerIdentityInfo brokerIdentity = getBrokerLiveInfoRequest.getBrokerIdentity();
        ControllerResult<GetBrokerLiveInfoResponse> controllerResult = new ControllerResult<>(new GetBrokerLiveInfoResponse());
        HashMap hashMap = new HashMap();
        if (brokerIdentity == null || brokerIdentity.isEmpty()) {
            hashMap.putAll(this.brokerLiveTable);
        } else if (this.brokerLiveTable.containsKey(brokerIdentity)) {
            hashMap.put(brokerIdentity, this.brokerLiveTable.get(brokerIdentity));
        } else {
            log.warn("GetBrokerLiveInfo failed, brokerIdentityInfo: {} not exist", brokerIdentity);
            controllerResult.setCodeAndRemark(2016, "brokerIdentityInfo not exist");
        }
        try {
            controllerResult.setBody(JSON.toJSONBytes(hashMap, new SerializerFeature[0]));
        } catch (Throwable th) {
            log.error("json serialize resBrokerLiveTable {} error", hashMap, th);
            controllerResult.setCodeAndRemark(1, "serialize error");
        }
        return controllerResult;
    }

    public ControllerResult<RaftBrokerHeartBeatEventResponse> onBrokerHeartBeat(RaftBrokerHeartBeatEventRequest raftBrokerHeartBeatEventRequest) {
        BrokerIdentityInfo brokerIdentityInfo = raftBrokerHeartBeatEventRequest.getBrokerIdentityInfo();
        BrokerLiveInfo brokerLiveInfo = raftBrokerHeartBeatEventRequest.getBrokerLiveInfo();
        ControllerResult<RaftBrokerHeartBeatEventResponse> controllerResult = new ControllerResult<>(new RaftBrokerHeartBeatEventResponse());
        BrokerLiveInfo computeIfAbsent = this.brokerLiveTable.computeIfAbsent(brokerIdentityInfo, brokerIdentityInfo2 -> {
            log.info("new broker registered, brokerIdentityInfo: {}", brokerIdentityInfo2);
            return brokerLiveInfo;
        });
        computeIfAbsent.setLastUpdateTimestamp(brokerLiveInfo.getLastUpdateTimestamp());
        computeIfAbsent.setHeartbeatTimeoutMillis(brokerLiveInfo.getHeartbeatTimeoutMillis());
        computeIfAbsent.setElectionPriority(brokerLiveInfo.getElectionPriority());
        if (brokerLiveInfo.getEpoch() > computeIfAbsent.getEpoch() || (brokerLiveInfo.getEpoch() == computeIfAbsent.getEpoch() && brokerLiveInfo.getMaxOffset() > computeIfAbsent.getMaxOffset())) {
            computeIfAbsent.setEpoch(brokerLiveInfo.getEpoch());
            computeIfAbsent.setMaxOffset(brokerLiveInfo.getMaxOffset());
            computeIfAbsent.setConfirmOffset(brokerLiveInfo.getConfirmOffset());
        }
        return controllerResult;
    }

    public ControllerResult<BrokerCloseChannelResponse> onBrokerCloseChannel(BrokerCloseChannelRequest brokerCloseChannelRequest) {
        BrokerIdentityInfo brokerIdentityInfo = brokerCloseChannelRequest.getBrokerIdentityInfo();
        ControllerResult<BrokerCloseChannelResponse> controllerResult = new ControllerResult<>(new BrokerCloseChannelResponse());
        if (brokerIdentityInfo == null || brokerIdentityInfo.isEmpty()) {
            log.warn("onBrokerCloseChannel failed, brokerIdentityInfo is null");
        } else {
            this.brokerLiveTable.remove(brokerIdentityInfo);
            log.info("onBrokerCloseChannel success, brokerIdentityInfo: {}", brokerIdentityInfo);
        }
        return controllerResult;
    }

    public ControllerResult<CheckNotActiveBrokerResponse> checkNotActiveBroker(CheckNotActiveBrokerRequest checkNotActiveBrokerRequest) {
        ArrayList arrayList = new ArrayList();
        final long longValue = checkNotActiveBrokerRequest.getCheckTimeMillis().longValue();
        Iterator<Map.Entry<BrokerIdentityInfo, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<BrokerIdentityInfo, BrokerLiveInfo> next = it.next();
            long lastUpdateTimestamp = next.getValue().getLastUpdateTimestamp();
            long heartbeatTimeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
            if (longValue - lastUpdateTimestamp > heartbeatTimeoutMillis) {
                arrayList.add(next.getKey());
                it.remove();
                log.warn("Broker expired, brokerInfo {}, expired {}ms", next.getKey(), Long.valueOf(heartbeatTimeoutMillis));
            }
        }
        List<String> scanNeedReelectBrokerSets = scanNeedReelectBrokerSets(new BrokerValidPredicate() { // from class: org.apache.rocketmq.controller.impl.manager.RaftReplicasInfoManager.1
            @Override // org.apache.rocketmq.controller.helper.BrokerValidPredicate
            public boolean check(String str, String str2, Long l) {
                return !RaftReplicasInfoManager.this.isBrokerActive(str, str2, l, longValue);
            }
        });
        Set set = (Set) arrayList.stream().map((v0) -> {
            return v0.getBrokerName();
        }).collect(Collectors.toSet());
        arrayList.addAll((Collection) scanNeedReelectBrokerSets.stream().filter(str -> {
            return !set.contains(str);
        }).map(str2 -> {
            return new BrokerIdentityInfo(null, str2, null);
        }).collect(Collectors.toList()));
        ControllerResult<CheckNotActiveBrokerResponse> controllerResult = new ControllerResult<>(new CheckNotActiveBrokerResponse());
        try {
            controllerResult.setBody(JSON.toJSONBytes(arrayList, new SerializerFeature[0]));
        } catch (Throwable th) {
            log.error("json serialize notActiveBrokerIdentityInfoList {} error", arrayList, th);
            controllerResult.setCodeAndRemark(1, "serialize error");
        }
        return controllerResult;
    }

    public boolean isBrokerActive(String str, String str2, Long l, long j) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(new BrokerIdentityInfo(str, str2, l));
        return brokerLiveInfo != null && brokerLiveInfo.getLastUpdateTimestamp() + brokerLiveInfo.getHeartbeatTimeoutMillis() >= j;
    }

    public BrokerLiveInfo getBrokerLiveInfo(String str, String str2, Long l) {
        return this.brokerLiveTable.get(new BrokerIdentityInfo(str, str2, l));
    }

    @Override // org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager
    public byte[] serialize() throws Throwable {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Throwable th = null;
            try {
                byte[] serialize = super.serialize();
                putInt(byteArrayOutputStream, serialize.length);
                byteArrayOutputStream.write(serialize);
                putInt(byteArrayOutputStream, this.brokerLiveTable.size());
                for (Map.Entry<BrokerIdentityInfo, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) {
                    byte[] hessianSerialize = hessianSerialize(entry.getKey());
                    byte[] hessianSerialize2 = hessianSerialize(entry.getValue());
                    putInt(byteArrayOutputStream, hessianSerialize.length);
                    byteArrayOutputStream.write(hessianSerialize);
                    putInt(byteArrayOutputStream, hessianSerialize2.length);
                    byteArrayOutputStream.write(hessianSerialize2);
                }
                byte[] byteArray = byteArrayOutputStream.toByteArray();
                if (byteArrayOutputStream != null) {
                    if (0 != 0) {
                        try {
                            byteArrayOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        byteArrayOutputStream.close();
                    }
                }
                return byteArray;
            } finally {
            }
        } catch (Throwable th3) {
            log.error("serialize replicaInfoTable or syncStateSetInfoTable error", th3);
            throw th3;
        }
    }

    @Override // org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager
    public void deserializeFrom(byte[] bArr) throws Throwable {
        this.brokerLiveTable.clear();
        try {
            int i = getInt(bArr, 0);
            int i2 = 0 + 4;
            byte[] bArr2 = new byte[i];
            System.arraycopy(bArr, i2, bArr2, 0, i);
            super.deserializeFrom(bArr2);
            int i3 = i2 + i;
            int i4 = getInt(bArr, i3);
            int i5 = i3 + 4;
            for (int i6 = 0; i6 < i4; i6++) {
                int i7 = getInt(bArr, i5);
                int i8 = i5 + 4;
                byte[] bArr3 = new byte[i7];
                System.arraycopy(bArr, i8, bArr3, 0, i7);
                BrokerIdentityInfo brokerIdentityInfo = (BrokerIdentityInfo) hessianDeserialize(bArr3);
                int i9 = i8 + i7;
                int i10 = getInt(bArr, i9);
                int i11 = i9 + 4;
                byte[] bArr4 = new byte[i10];
                System.arraycopy(bArr, i11, bArr4, 0, i10);
                i5 = i11 + i10;
                this.brokerLiveTable.put(brokerIdentityInfo, (BrokerLiveInfo) hessianDeserialize(bArr4));
            }
        } catch (Throwable th) {
            log.error("deserialize replicaInfoTable or syncStateSetInfoTable error", th);
            throw th;
        }
    }
}
