/*
 * Decompiled with CFR 0.152.
 */
package com.iohao.game.bolt.broker.server.processor;

import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.exception.RemotingException;
import com.iohao.game.action.skeleton.protocol.external.RequestCollectExternalMessage;
import com.iohao.game.action.skeleton.protocol.external.ResponseCollectExternalItemMessage;
import com.iohao.game.action.skeleton.protocol.external.ResponseCollectExternalMessage;
import com.iohao.game.bolt.broker.core.common.AbstractAsyncUserProcessor;
import com.iohao.game.bolt.broker.server.BrokerServer;
import com.iohao.game.bolt.broker.server.aware.BrokerServerAware;
import com.iohao.game.bolt.broker.server.balanced.BalancedManager;
import com.iohao.game.bolt.broker.server.balanced.ExternalBrokerClientLoadBalanced;
import com.iohao.game.bolt.broker.server.balanced.region.BrokerClientProxy;
import com.iohao.game.bolt.broker.server.processor.BrokerExternalKit;
import com.iohao.game.common.kit.CompletableFutureKit;
import com.iohao.game.core.common.NetCommonKit;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class InnerModuleRequestCollectExternalMessageBrokerProcessor
extends AbstractAsyncUserProcessor<RequestCollectExternalMessage>
implements BrokerServerAware {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(InnerModuleRequestCollectExternalMessageBrokerProcessor.class);
    private BrokerServer brokerServer;

    public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, RequestCollectExternalMessage requestCollectMessage) {
        BalancedManager balancedManager = this.brokerServer.getBalancedManager();
        ExternalBrokerClientLoadBalanced balanced = balancedManager.getExternalLoadBalanced();
        List<CompletableFuture<ResponseCollectExternalItemMessage>> futureList = this.listFuture(requestCollectMessage, balanced);
        CompletableFutureKit.sequenceAsync(futureList).thenAccept(messageList -> {
            ResponseCollectExternalMessage responseCollectMessage = new ResponseCollectExternalMessage();
            responseCollectMessage.setMessageList(messageList);
            asyncCtx.sendResponse((Object)responseCollectMessage);
        });
    }

    private List<CompletableFuture<ResponseCollectExternalItemMessage>> listFuture(RequestCollectExternalMessage requestCollectMessage, ExternalBrokerClientLoadBalanced externalLoadBalanced) {
        int sourceClientId = requestCollectMessage.getSourceClientId();
        Stream<BrokerClientProxy> stream = BrokerExternalKit.streamToggle(sourceClientId, externalLoadBalanced);
        return stream.map(brokerClientProxy -> CompletableFuture.supplyAsync(() -> {
            ResponseCollectExternalItemMessage itemMessage;
            try {
                itemMessage = (ResponseCollectExternalItemMessage)brokerClientProxy.invokeSync(requestCollectMessage);
            }
            catch (RemotingException | InterruptedException e) {
                log.error(e.getMessage(), e);
                return null;
            }
            if (itemMessage == null) {
                return null;
            }
            String logicServerId = brokerClientProxy.getId();
            return itemMessage.setLogicServerId(logicServerId);
        }, NetCommonKit.getVirtualExecutor())).toList();
    }

    public String interest() {
        return RequestCollectExternalMessage.class.getName();
    }

    @Override
    @Generated
    public void setBrokerServer(BrokerServer brokerServer) {
        this.brokerServer = brokerServer;
    }
}

