package com.iohao.game.bolt.broker.server.processor;

import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.exception.RemotingException;
import com.iohao.game.action.skeleton.core.exception.ActionErrorEnum;
import com.iohao.game.action.skeleton.protocol.RequestMessage;
import com.iohao.game.action.skeleton.protocol.ResponseMessage;
import com.iohao.game.action.skeleton.protocol.collect.RequestCollectMessage;
import com.iohao.game.action.skeleton.protocol.collect.ResponseCollectItemMessage;
import com.iohao.game.action.skeleton.protocol.collect.ResponseCollectMessage;
import com.iohao.game.bolt.broker.cluster.BrokerRunModeEnum;
import com.iohao.game.bolt.broker.core.common.AbstractAsyncUserProcessor;
import com.iohao.game.bolt.broker.core.common.IoGameGlobalConfig;
import com.iohao.game.bolt.broker.server.BrokerServer;
import com.iohao.game.bolt.broker.server.aware.BrokerServerAware;
import com.iohao.game.bolt.broker.server.balanced.region.BrokerClientRegion;
import com.iohao.game.common.kit.CompletableFutureKit;
import com.iohao.game.core.common.NetCommonKit;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/iohao/game/bolt/broker/server/processor/InnerModuleRequestCollectMessageBrokerProcessor.class */
/**
 * Broker-side processor for {@link RequestCollectMessage}.
 *
 * <p>The wrapped {@link RequestMessage} is sent to every broker client proxy of the
 * {@link BrokerClientRegion} that matches the request's cmdMerge. The usable responses are
 * gathered into one {@link ResponseCollectMessage}, which is sent back through the
 * {@link AsyncContext}.
 */
public final class InnerModuleRequestCollectMessageBrokerProcessor extends AbstractAsyncUserProcessor<RequestCollectMessage> implements BrokerServerAware {
    private static final Logger log = LoggerFactory.getLogger(InnerModuleRequestCollectMessageBrokerProcessor.class);
    private BrokerServer brokerServer;

    /**
     * Fans the wrapped request out to the matching region and replies with the collected result.
     *
     * <p>When no region is registered for the request's cmdMerge, a response carrying
     * {@link ActionErrorEnum#cmdInfoErrorCode} is sent immediately.
     *
     * @param bizContext            business context supplied by the remoting layer (not used here)
     * @param asyncContext          context used to send the response
     * @param requestCollectMessage message wrapping the request to forward
     */
    public void handleRequest(BizContext bizContext, AsyncContext asyncContext, RequestCollectMessage requestCollectMessage) {
        RequestMessage requestMessage = requestCollectMessage.getRequestMessage();
        int cmdMerge = requestMessage.getHeadMetadata().getCmdMerge();
        BrokerClientRegion brokerClientRegion = this.brokerServer.getBalancedManager().getLogicBalanced().getBrokerClientRegion(cmdMerge);

        if (brokerClientRegion == null) {
            ResponseCollectMessage errorResponse = new ResponseCollectMessage();
            errorResponse.setError(ActionErrorEnum.cmdInfoErrorCode);
            asyncContext.sendResponse(errorResponse);
            return;
        }

        List<CompletableFuture<ResponseCollectItemMessage>> futures = listFuture(requestMessage, brokerClientRegion);
        CompletableFutureKit.sequenceAsync(futures)
                .thenAcceptAsync(list -> {
                    ResponseCollectMessage responseCollectMessage = new ResponseCollectMessage();
                    responseCollectMessage.setMessageList(list);
                    asyncContext.sendResponse(responseCollectMessage);
                    print(responseCollectMessage);
                }, NetCommonKit.getVirtualExecutor())
                .exceptionally(throwable -> {
                    // Without this stage a failure anywhere in the pipeline would vanish silently.
                    // No response is sent here: the failure may have come from sendResponse itself.
                    log.error("Failed to aggregate responses for RequestCollectMessage", throwable);
                    return null;
                });
    }

    /**
     * Logs the aggregated response, but only when request/response logging is enabled and the
     * broker runs in {@link BrokerRunModeEnum#CLUSTER} mode.
     *
     * @param responseCollectMessage the response that was sent
     */
    private void print(ResponseCollectMessage responseCollectMessage) {
        if (!IoGameGlobalConfig.requestResponseLog || this.brokerServer.getBrokerRunMode() != BrokerRunModeEnum.CLUSTER) {
            return;
        }

        int port = this.brokerServer.getPort();
        int gossipListenPort = this.brokerServer.getBrokerClusterManager().getGossipListenPort();
        log.info("\n port [{}] gossipListenPort [{}] id [{}] \n responseAggregationMessage : {}",
                port, gossipListenPort, this.brokerServer.getBrokerId(), responseCollectMessage);
    }

    /**
     * Starts one asynchronous synchronous-invoke per broker client proxy in the region.
     *
     * <p>Each future completes with a {@link ResponseCollectItemMessage}, or with {@code null}
     * when the call fails, is interrupted, or the response is missing, has an error, or carries
     * no data.
     *
     * @param requestMessage     the request forwarded to every proxy
     * @param brokerClientRegion the region whose proxies are invoked
     * @return one future per proxy of the region
     */
    private List<CompletableFuture<ResponseCollectItemMessage>> listFuture(RequestMessage requestMessage, BrokerClientRegion brokerClientRegion) {
        return brokerClientRegion.listBrokerClientProxy().stream().map(brokerClientProxy -> {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    ResponseMessage responseMessage = (ResponseMessage) brokerClientProxy.invokeSync(requestMessage);
                    if (responseMessage == null || responseMessage.hasError()) {
                        return null;
                    }

                    byte[] data = responseMessage.getData();
                    if (data == null || data.length == 0) {
                        return null;
                    }

                    return new ResponseCollectItemMessage().setResponseMessage(responseMessage).setLogicServerId(brokerClientProxy.getId());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                    log.error(e.getMessage(), e);
                    return null;
                } catch (RemotingException e) {
                    log.error(e.getMessage(), e);
                    return null;
                }
            }, NetCommonKit.getVirtualExecutor());
        }).toList();
    }

    /**
     * @return the fully qualified class name of {@link RequestCollectMessage}
     */
    public String interest() {
        return RequestCollectMessage.class.getName();
    }

    @Override // com.iohao.game.bolt.broker.server.aware.BrokerServerAware
    public void setBrokerServer(BrokerServer brokerServer) {
        this.brokerServer = brokerServer;
    }
}
