/*
 * Decompiled with CFR 0.152.
 */
package com.iohao.game.bolt.broker.server.processor;

import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.exception.RemotingException;
import com.iohao.game.action.skeleton.core.exception.ActionErrorEnum;
import com.iohao.game.action.skeleton.core.exception.MsgExceptionInfo;
import com.iohao.game.action.skeleton.protocol.HeadMetadata;
import com.iohao.game.action.skeleton.protocol.RequestMessage;
import com.iohao.game.action.skeleton.protocol.ResponseMessage;
import com.iohao.game.action.skeleton.protocol.collect.RequestCollectMessage;
import com.iohao.game.action.skeleton.protocol.collect.ResponseCollectItemMessage;
import com.iohao.game.action.skeleton.protocol.collect.ResponseCollectMessage;
import com.iohao.game.bolt.broker.cluster.BrokerClusterManager;
import com.iohao.game.bolt.broker.cluster.BrokerRunModeEnum;
import com.iohao.game.bolt.broker.core.common.AbstractAsyncUserProcessor;
import com.iohao.game.bolt.broker.core.common.IoGameGlobalConfig;
import com.iohao.game.bolt.broker.server.BrokerServer;
import com.iohao.game.bolt.broker.server.aware.BrokerServerAware;
import com.iohao.game.bolt.broker.server.balanced.BalancedManager;
import com.iohao.game.bolt.broker.server.balanced.LogicBrokerClientLoadBalanced;
import com.iohao.game.bolt.broker.server.balanced.region.BrokerClientProxy;
import com.iohao.game.bolt.broker.server.balanced.region.BrokerClientRegion;
import com.iohao.game.common.kit.CompletableFutureKit;
import com.iohao.game.core.common.NetCommonKit;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class InnerModuleRequestCollectMessageBrokerProcessor
extends AbstractAsyncUserProcessor<RequestCollectMessage>
implements BrokerServerAware {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(InnerModuleRequestCollectMessageBrokerProcessor.class);
    private BrokerServer brokerServer;

    public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, RequestCollectMessage requestCollectMessage) {
        RequestMessage requestMessage = requestCollectMessage.getRequestMessage();
        HeadMetadata headMetadata = requestMessage.getHeadMetadata();
        int cmdMerge = headMetadata.getCmdMerge();
        BalancedManager balancedManager = this.brokerServer.getBalancedManager();
        LogicBrokerClientLoadBalanced logicBalanced = balancedManager.getLogicBalanced();
        BrokerClientRegion brokerClientRegion = logicBalanced.getBrokerClientRegion(cmdMerge);
        if (brokerClientRegion == null) {
            ResponseCollectMessage responseCollectMessage = new ResponseCollectMessage();
            responseCollectMessage.setError((MsgExceptionInfo)ActionErrorEnum.cmdInfoErrorCode);
            asyncCtx.sendResponse((Object)responseCollectMessage);
            return;
        }
        List<CompletableFuture<ResponseCollectItemMessage>> futureList = this.listFuture(requestMessage, brokerClientRegion);
        CompletableFutureKit.sequenceAsync(futureList).thenAcceptAsync(messageList -> {
            ResponseCollectMessage responseCollectMessage = new ResponseCollectMessage();
            responseCollectMessage.setMessageList(messageList);
            asyncCtx.sendResponse((Object)responseCollectMessage);
            this.print(responseCollectMessage);
        }, NetCommonKit.getVirtualExecutor());
    }

    private void print(ResponseCollectMessage responseCollectMessage) {
        if (IoGameGlobalConfig.requestResponseLog) {
            if (this.brokerServer.getBrokerRunMode() != BrokerRunModeEnum.CLUSTER) {
                return;
            }
            int port = this.brokerServer.getPort();
            String brokerId = this.brokerServer.getBrokerId();
            BrokerClusterManager brokerClusterManager = this.brokerServer.getBrokerClusterManager();
            int gossipListenPort = brokerClusterManager.getGossipListenPort();
            log.info("\n port [{}] gossipListenPort [{}] id [{}] \n responseAggregationMessage : {}", new Object[]{port, gossipListenPort, brokerId, responseCollectMessage});
        }
    }

    private List<CompletableFuture<ResponseCollectItemMessage>> listFuture(RequestMessage requestMessage, BrokerClientRegion brokerClientRegion) {
        Stream<BrokerClientProxy> stream = brokerClientRegion.listBrokerClientProxy().stream();
        return stream.map(brokerClientProxy -> CompletableFuture.supplyAsync(() -> {
            byte[] data;
            ResponseMessage responseMessage;
            try {
                responseMessage = (ResponseMessage)brokerClientProxy.invokeSync(requestMessage);
            }
            catch (RemotingException | InterruptedException e) {
                log.error(e.getMessage(), e);
                return null;
            }
            if (responseMessage == null || responseMessage.hasError() || (data = responseMessage.getData()) == null || data.length == 0) {
                return null;
            }
            String logicServerId = brokerClientProxy.getId();
            return new ResponseCollectItemMessage().setResponseMessage(responseMessage).setLogicServerId(logicServerId);
        }, NetCommonKit.getVirtualExecutor())).toList();
    }

    public String interest() {
        return RequestCollectMessage.class.getName();
    }

    @Override
    @Generated
    public void setBrokerServer(BrokerServer brokerServer) {
        this.brokerServer = brokerServer;
    }
}

