package org.apache.rocketmq.controller.impl;

import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.NodeId;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.option.NodeOptions;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.controller.impl.closure.ControllerClosure;
import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelRequest;
import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerRequest;
import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoRequest;
import org.apache.rocketmq.controller.impl.task.GetSyncStateDataRequest;
import org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventRequest;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/controller/impl/JRaftController.class */
public class JRaftController implements Controller {
    private static final Logger log = LoggerFactory.getLogger("RocketmqController");
    private final RaftGroupService raftGroupService;
    private Node node;
    private final JRaftControllerStateMachine stateMachine;
    private final ControllerConfig controllerConfig;
    private final List<BrokerLifecycleListener> brokerLifecycleListeners = new ArrayList();
    private final Map<PeerId, String> peerIdToAddr;
    private final NettyRemotingServer remotingServer;

    public JRaftController(ControllerConfig controllerConfig, ChannelEventListener channelEventListener) throws IOException {
        this.controllerConfig = controllerConfig;
        NodeOptions nodeOptions = new NodeOptions();
        nodeOptions.setElectionTimeoutMs(controllerConfig.getJraftConfig().getjRaftElectionTimeoutMs());
        nodeOptions.setSnapshotIntervalSecs(controllerConfig.getJraftConfig().getjRaftSnapshotIntervalSecs());
        PeerId peerId = new PeerId();
        if (!peerId.parse(controllerConfig.getJraftConfig().getjRaftServerId())) {
            throw new IllegalArgumentException("Fail to parse serverId:" + controllerConfig.getJraftConfig().getjRaftServerId());
        }
        Configuration configuration = new Configuration();
        if (!configuration.parse(controllerConfig.getJraftConfig().getjRaftInitConf())) {
            throw new IllegalArgumentException("Fail to parse initConf:" + controllerConfig.getJraftConfig().getjRaftInitConf());
        }
        nodeOptions.setInitialConf(configuration);
        FileUtils.forceMkdir(new File(controllerConfig.getControllerStorePath()));
        nodeOptions.setLogUri(controllerConfig.getControllerStorePath() + File.separator + "log");
        nodeOptions.setRaftMetaUri(controllerConfig.getControllerStorePath() + File.separator + "raft_meta");
        nodeOptions.setSnapshotUri(controllerConfig.getControllerStorePath() + File.separator + "snapshot");
        this.stateMachine = new JRaftControllerStateMachine(controllerConfig, new NodeId(controllerConfig.getJraftConfig().getjRaftGroupId(), peerId));
        this.stateMachine.registerOnLeaderStart((v1) -> {
            onLeaderStart(v1);
        });
        this.stateMachine.registerOnLeaderStop(this::onLeaderStop);
        nodeOptions.setFsm(this.stateMachine);
        this.raftGroupService = new RaftGroupService(controllerConfig.getJraftConfig().getjRaftGroupId(), peerId, nodeOptions);
        this.peerIdToAddr = new HashMap();
        initPeerIdMap();
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(Integer.parseInt(this.peerIdToAddr.get(peerId).split(":")[1]));
        this.remotingServer = new NettyRemotingServer(nettyServerConfig, channelEventListener);
    }

    private void initPeerIdMap() {
        String[] split = this.controllerConfig.getJraftConfig().getjRaftInitConf().split(",");
        String[] split2 = this.controllerConfig.getJraftConfig().getjRaftControllerRPCAddr().split(",");
        for (int i = 0; i < split.length; i++) {
            PeerId peerId = new PeerId();
            if (!peerId.parse(split[i])) {
                throw new IllegalArgumentException("Fail to parse peerId:" + split[i]);
            }
            this.peerIdToAddr.put(peerId, split2[i]);
        }
    }

    @Override // org.apache.rocketmq.controller.Controller
    public void startup() {
        this.remotingServer.start();
        this.node = this.raftGroupService.start();
        log.info("Controller {} started.", this.node.getNodeId());
    }

    @Override // org.apache.rocketmq.controller.Controller
    public void shutdown() {
        stopScheduling();
        this.raftGroupService.shutdown();
        this.remotingServer.shutdown();
        log.info("Controller {} stopped.", this.node.getNodeId());
    }

    @Override // org.apache.rocketmq.controller.Controller
    public void startScheduling() {
    }

    @Override // org.apache.rocketmq.controller.Controller
    public void stopScheduling() {
    }

    @Override // org.apache.rocketmq.controller.Controller
    public boolean isLeaderState() {
        return this.node.isLeader();
    }

    private <T extends CommandCustomHeader> CompletableFuture<RemotingCommand> applyToJRaft(RemotingCommand remotingCommand) {
        if (!isLeaderState()) {
            RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(2007, "The controller is not in leader state");
            CompletableFuture<RemotingCommand> completableFuture = new CompletableFuture<>();
            completableFuture.complete(createResponseCommand);
            log.warn("Apply to none leader controller, controller state is {}", this.node.getNodeState());
            return completableFuture;
        }
        ControllerClosure controllerClosure = new ControllerClosure(remotingCommand);
        Task taskWithThisClosure = controllerClosure.taskWithThisClosure();
        if (taskWithThisClosure != null) {
            this.node.apply(taskWithThisClosure);
            return controllerClosure.getFuture();
        }
        log.error("Apply task failed, task is null.");
        return CompletableFuture.completedFuture(RemotingCommand.createResponseCommand(2015, "Apply task failed, Please see the server log."));
    }

    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> alterSyncStateSet(AlterSyncStateSetRequestHeader alterSyncStateSetRequestHeader, SyncStateSet syncStateSet) {
        RemotingCommand createRequestCommand = RemotingCommand.createRequestCommand(1001, alterSyncStateSetRequestHeader);
        createRequestCommand.setBody(syncStateSet.encode());
        return applyToJRaft(createRequestCommand);
    }

    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> electMaster(ElectMasterRequestHeader electMasterRequestHeader) {
        return applyToJRaft(RemotingCommand.createRequestCommand(1002, electMasterRequestHeader));
    }

    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> getNextBrokerId(GetNextBrokerIdRequestHeader getNextBrokerIdRequestHeader) {
        return applyToJRaft(RemotingCommand.createRequestCommand(1012, getNextBrokerIdRequestHeader));
    }

    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> applyBrokerId(ApplyBrokerIdRequestHeader applyBrokerIdRequestHeader) {
        return applyToJRaft(RemotingCommand.createRequestCommand(1013, applyBrokerIdRequestHeader));
    }

    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> registerBroker(RegisterBrokerToControllerRequestHeader registerBrokerToControllerRequestHeader) {
        return applyToJRaft(RemotingCommand.createRequestCommand(1003, registerBrokerToControllerRequestHeader));
    }

    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> getReplicaInfo(GetReplicaInfoRequestHeader getReplicaInfoRequestHeader) {
        return applyToJRaft(RemotingCommand.createRequestCommand(1004, getReplicaInfoRequestHeader));
    }

    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> getSyncStateData(List<String> list) {
        RemotingCommand createRequestCommand = RemotingCommand.createRequestCommand(1006, new GetSyncStateDataRequest());
        createRequestCommand.setBody(RemotingSerializable.encode(list));
        return applyToJRaft(createRequestCommand);
    }

    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> cleanBrokerData(CleanControllerBrokerDataRequestHeader cleanControllerBrokerDataRequestHeader) {
        return applyToJRaft(RemotingCommand.createRequestCommand(1011, cleanControllerBrokerDataRequestHeader));
    }

    @Override // org.apache.rocketmq.controller.Controller
    public void registerBrokerLifecycleListener(BrokerLifecycleListener brokerLifecycleListener) {
        this.brokerLifecycleListeners.add(brokerLifecycleListener);
    }

    @Override // org.apache.rocketmq.controller.Controller
    public RemotingCommand getControllerMetadata() {
        List peers = this.node.getOptions().getInitialConf().getPeers();
        StringBuilder sb = new StringBuilder();
        Iterator it = peers.iterator();
        while (it.hasNext()) {
            sb.append(this.peerIdToAddr.get((PeerId) it.next())).append(";");
        }
        return RemotingCommand.createResponseCommandWithHeader(0, new GetMetaDataResponseHeader(this.node.getGroupId(), this.node.getLeaderId() == null ? "" : this.node.getLeaderId().toString(), this.peerIdToAddr.get(this.node.getLeaderId()), this.node.isLeader(), sb.toString()));
    }

    @Override // org.apache.rocketmq.controller.Controller
    public RemotingServer getRemotingServer() {
        return this.remotingServer;
    }

    public void onLeaderStart(long j) {
        log.info("Controller start leadership, term: {}.", Long.valueOf(j));
    }

    public void onLeaderStop(Status status) {
        log.info("Controller {} stop leadership, status: {}.", this.node.getNodeId(), status);
        stopScheduling();
    }

    public CompletableFuture<RemotingCommand> getBrokerLiveInfo(GetBrokerLiveInfoRequest getBrokerLiveInfoRequest) {
        return applyToJRaft(RemotingCommand.createRequestCommand(1016, getBrokerLiveInfoRequest));
    }

    public CompletableFuture<RemotingCommand> onBrokerHeartBeat(RaftBrokerHeartBeatEventRequest raftBrokerHeartBeatEventRequest) {
        return applyToJRaft(RemotingCommand.createRequestCommand(1018, raftBrokerHeartBeatEventRequest));
    }

    public CompletableFuture<RemotingCommand> onBrokerCloseChannel(BrokerCloseChannelRequest brokerCloseChannelRequest) {
        return applyToJRaft(RemotingCommand.createRequestCommand(1014, brokerCloseChannelRequest));
    }

    public CompletableFuture<RemotingCommand> checkNotActiveBroker(CheckNotActiveBrokerRequest checkNotActiveBrokerRequest) {
        return applyToJRaft(RemotingCommand.createRequestCommand(1015, checkNotActiveBrokerRequest));
    }
}
