package tech.powerjob.server.remote.transporter.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.engine.EngineConfig;
import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
import tech.powerjob.server.remote.transporter.ProtocolInfo;
import tech.powerjob.server.remote.transporter.TransportService;

@Service
/* loaded from: input_file:tech/powerjob/server/remote/transporter/impl/PowerTransportService.class */
public class PowerTransportService implements TransportService, InitializingBean, DisposableBean, ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(PowerTransportService.class);

    @Value("${oms.transporter.active.protocols}")
    private String activeProtocols;

    @Value("${oms.transporter.main.protocol}")
    private String mainProtocol;
    private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port";
    private final Environment environment;
    private ProtocolInfo defaultProtocol;
    private final Map<String, ProtocolInfo> protocolName2Info = Maps.newHashMap();
    private final List<RemoteEngine> engines = Lists.newArrayList();
    private ApplicationContext applicationContext;

    public PowerTransportService(Environment environment) {
        this.environment = environment;
    }

    @Override // tech.powerjob.server.remote.transporter.TransportService
    public ProtocolInfo defaultProtocol() {
        return this.defaultProtocol;
    }

    @Override // tech.powerjob.server.remote.transporter.TransportService
    public Map<String, ProtocolInfo> allProtocols() {
        return this.protocolName2Info;
    }

    private ProtocolInfo fetchProtocolInfo(String str) {
        String compatibleProtocol = compatibleProtocol(str);
        ProtocolInfo protocolInfo = this.protocolName2Info.get(compatibleProtocol);
        if (protocolInfo == null) {
            throw new IllegalArgumentException("can't find Transporter by protocol :" + compatibleProtocol);
        }
        return protocolInfo;
    }

    @Override // tech.powerjob.server.remote.transporter.TransportService
    public void tell(String str, URL url, PowerSerializable powerSerializable) {
        fetchProtocolInfo(str).getTransporter().tell(url, powerSerializable);
    }

    @Override // tech.powerjob.server.remote.transporter.TransportService
    public <T> CompletionStage<T> ask(String str, URL url, PowerSerializable powerSerializable, Class<T> cls) throws RemotingException {
        return fetchProtocolInfo(str).getTransporter().ask(url, powerSerializable, cls);
    }

    private void initRemoteFrameWork(String str, int i) {
        Map beansWithAnnotation = this.applicationContext.getBeansWithAnnotation(Actor.class);
        log.info("[PowerTransportService] find Actor num={},names={}", Integer.valueOf(beansWithAnnotation.size()), beansWithAnnotation.keySet());
        Address port = new Address().setHost(NetUtils.getLocalHost()).setPort(i);
        EngineConfig actorList = new EngineConfig().setServerType(ServerType.SERVER).setType(str.toUpperCase()).setBindAddress(port).setActorList(Lists.newArrayList(beansWithAnnotation.values()));
        log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", str, port);
        RemoteEngine powerJobRemoteEngine = new PowerJobRemoteEngine();
        EngineOutput start = powerJobRemoteEngine.start(actorList);
        log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", str, port);
        this.engines.add(powerJobRemoteEngine);
        this.protocolName2Info.put(str, new ProtocolInfo(str, port.toFullAddress(), start.getTransporter()));
    }

    public void afterPropertiesSet() throws Exception {
        log.info("[PowerTransportService] start to initialize whole PowerTransportService!");
        log.info("[PowerTransportService] activeProtocols: {}", this.activeProtocols);
        if (StringUtils.isEmpty(this.activeProtocols)) {
            throw new IllegalArgumentException("activeProtocols can't be empty!");
        }
        for (String str : this.activeProtocols.split(",")) {
            try {
                initRemoteFrameWork(str, parseProtocolPort(str));
            } catch (Throwable th) {
                log.error("[PowerTransportService] initialize protocol[{}] failed. If you don't need to use this protocol, you can turn it off by 'oms.transporter.active.protocols'", str);
                ExceptionUtils.rethrow(th);
            }
        }
        choseDefault();
        log.info("[PowerTransportService] initialize successfully!");
        log.info("[PowerTransportService] ALL_PROTOCOLS: {}", this.protocolName2Info);
    }

    private int parseProtocolPort(String str) {
        String format = String.format(PROTOCOL_PORT_CONFIG, str.toLowerCase());
        String format2 = String.format(PROTOCOL_PORT_CONFIG, str.toUpperCase());
        String property = this.environment.getProperty(format);
        if (StringUtils.isEmpty(property)) {
            property = this.environment.getProperty(format2);
        }
        log.info("[PowerTransportService] fetch port for protocol[{}], key={}, value={}", new Object[]{str, format, property});
        if (StringUtils.isEmpty(property)) {
            throw new IllegalArgumentException(String.format("can't find protocol config by key: %s, please check your spring config!", format));
        }
        return Integer.parseInt(property);
    }

    private String compatibleProtocol(String str) {
        return str == null ? Protocol.AKKA.name() : str;
    }

    private void choseDefault() {
        this.defaultProtocol = this.protocolName2Info.get(this.mainProtocol);
        log.info("[PowerTransportService] chose [{}] as the default protocol, make sure this protocol can work!", this.mainProtocol);
        if (this.defaultProtocol == null) {
            throw new IllegalArgumentException("can't find default protocol, please check your config!");
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void destroy() throws Exception {
        this.engines.forEach(remoteEngine -> {
            try {
                remoteEngine.close();
            } catch (Exception e) {
            }
        });
    }
}
