/*
 * Decompiled with CFR 0.152.
 */
package com.jxdinfo.hussar.support.job.dispatch.remote.server.election;

import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Sets;
import com.jxdinfo.hussar.support.job.core.enums.Protocol;
import com.jxdinfo.hussar.support.job.core.exception.JobRuntimeException;
import com.jxdinfo.hussar.support.job.core.response.AskResponse;
import com.jxdinfo.hussar.support.job.core.serialize.JsonUtils;
import com.jxdinfo.hussar.support.job.core.support.JobAppInfoService;
import com.jxdinfo.hussar.support.job.core.support.entity.JobAppInfoEntity;
import com.jxdinfo.hussar.support.job.dispatch.extension.LockService;
import com.jxdinfo.hussar.support.job.dispatch.remote.config.JobDispatchProperties;
import com.jxdinfo.hussar.support.job.dispatch.remote.server.election.Ping;
import com.jxdinfo.hussar.support.job.dispatch.remote.transport.TransportService;
import com.jxdinfo.hussar.support.job.dispatch.remote.transport.starter.AkkaStarter;
import java.io.Serializable;
import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Decides which dispatch server is responsible for an application.
 *
 * <p>The server recorded on the application's {@link JobAppInfoEntity} is pinged over Akka;
 * if it does not answer, the current node takes a named lock via {@link LockService},
 * re-checks, and then records itself as the application's server.
 */
@Service
public class ServerElectionService {
    private static final Logger log = LoggerFactory.getLogger(ServerElectionService.class);

    /** Maximum number of election rounds tried before giving up. */
    private static final int RETRY_TIMES = 10;
    /** Timeout (ms) applied both to the Akka ask and to waiting on its future. */
    private static final long PING_TIMEOUT_MS = 1000L;
    /** Lock name pattern; {@code %d} is replaced with the app id. */
    private static final String SERVER_ELECT_LOCK = "server_elect_%d";
    /**
     * Millisecond value passed to {@link LockService#tryLock}.
     * NOTE(review): presumably the maximum time the lock may be held - confirm against LockService.
     */
    private static final long ELECT_LOCK_MILLIS = 30000L;
    /** Pause (ms) before the next round when the election lock could not be obtained. */
    private static final long LOCK_RETRY_INTERVAL_MS = 500L;

    @Resource
    private LockService lockService;
    @Resource
    private TransportService transportService;
    @Resource
    private JobAppInfoService jobAppInfoService;
    @Resource
    private JobDispatchProperties jobDispatchProperties;

    /**
     * Returns the address of the server that should serve the given application.
     *
     * <p>When the random "accurate" check is not triggered and {@code currentServer} already
     * equals this node's address for {@code protocol}, {@code currentServer} is returned
     * without touching the database. Otherwise a full lookup/election is performed.
     *
     * @param appId         id of the application
     * @param protocol      protocol name used to pick the address to return
     * @param currentServer server address the caller currently uses; may be {@code null}
     * @return the server address for {@code protocol}
     * @throws JobRuntimeException if the app is not registered or no server could be elected
     */
    public String elect(Long appId, String protocol, String currentServer) {
        if (!this.accurate() && this.getProtocolServerAddress(protocol).equals(currentServer)) {
            return currentServer;
        }
        return this.getServer0(appId, protocol);
    }

    /**
     * Looks up the live server of an application, electing this node if the recorded one is
     * unreachable. Tries at most {@link #RETRY_TIMES} rounds.
     *
     * @param appId    id of the application
     * @param protocol protocol name used to pick the address to return
     * @return address of the live (or newly elected) server for {@code protocol}
     * @throws JobRuntimeException if the app is not registered, if all rounds fail, or if the
     *                             thread is interrupted while waiting for the election lock
     */
    private String getServer0(Long appId, String protocol) {
        // Servers that failed a ping during this call; avoids pinging them again.
        Set<String> downServerCache = new HashSet<>();
        for (int attempt = 0; attempt < RETRY_TIMES; attempt++) {
            // First look without holding the lock.
            JobAppInfoEntity registered = (JobAppInfoEntity) this.jobAppInfoService.getById((Serializable) appId);
            if (registered == null) {
                throw new JobRuntimeException(appId + " is not registered!");
            }
            String appName = registered.getAppName();
            String activeAddress = this.activeAddress(registered.getCurrentServer(), downServerCache, protocol);
            if (StringUtils.isNotEmpty(activeAddress)) {
                return activeAddress;
            }

            // Recorded server is not usable: an election is needed, guarded by a per-app lock.
            String lockName = String.format(SERVER_ELECT_LOCK, appId);
            if (!this.lockService.tryLock(lockName, ELECT_LOCK_MILLIS)) {
                try {
                    Thread.sleep(LOCK_RETRY_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop retrying;
                    // the method then fails with the usual "server elect failed" exception.
                    Thread.currentThread().interrupt();
                    break;
                }
                continue;
            }
            try {
                // Re-read under the lock: the record may have changed since the first look.
                JobAppInfoEntity appInfo = (JobAppInfoEntity) this.jobAppInfoService.getById((Serializable) appId);
                if (appInfo == null) {
                    throw new RuntimeException("impossible, unless we just lost our database.");
                }
                String address = this.activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);
                if (StringUtils.isNotEmpty(address)) {
                    return address;
                }
                // Take over: the AKKA transporter address is what gets persisted,
                // while the protocol-specific address is what gets returned.
                appInfo.setCurrentServer(this.transportService.getTransporter(Protocol.AKKA).getAddress());
                this.jobAppInfoService.saveOrUpdate(appInfo);
                log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
                return this.getProtocolServerAddress(protocol);
            } catch (Exception e) {
                // Failure inside the locked section: log it and go on to the next round.
                log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
            } finally {
                this.lockService.unlock(lockName);
            }
        }
        throw new JobRuntimeException("server elect failed for app " + appId);
    }

    /**
     * Pings {@code serverAddress} and, if it answers successfully, extracts the address it
     * reports for {@code protocol}.
     *
     * @param serverAddress   address of the server to ping; may be {@code null} or empty
     * @param downServerCache addresses already found unreachable; a failed ping adds to it
     * @param protocol        key looked up in the JSON payload of the ping response
     * @return the address reported for {@code protocol}, or {@code null} when the server is
     *         unknown, cached as down, unreachable, or answered unsuccessfully
     */
    private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {
        if (downServerCache.contains(serverAddress)) {
            return null;
        }
        if (StringUtils.isEmpty(serverAddress)) {
            return null;
        }
        Ping ping = new Ping();
        ping.setCurrentTime(System.currentTimeMillis());
        ActorSelection serverActor = AkkaStarter.getFriendActor(serverAddress);
        try {
            CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
            AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (response.isSuccess()) {
                JSONObject addresses = (JSONObject) JsonUtils.parseObject((byte[]) response.getData(), JSONObject.class);
                return addresses.getString(protocol);
            }
        } catch (InterruptedException e) {
            // Restore the flag so callers can see the interruption; the server is still
            // treated as unavailable for this call, as any other ping failure is.
            Thread.currentThread().interrupt();
            log.warn("[ServerElection] server({}) was down.", serverAddress);
        } catch (Exception e) {
            log.warn("[ServerElection] server({}) was down.", serverAddress);
        }
        downServerCache.add(serverAddress);
        return null;
    }

    /**
     * Randomly decides whether {@link #elect} must do the full lookup.
     *
     * @return {@code true} when a random value in {@code [0, 100)} is below the configured
     *         accurate-select-server percentage
     */
    private boolean accurate() {
        return ThreadLocalRandom.current().nextInt(100) < this.jobDispatchProperties.getAccurateSelectServerPercentage();
    }

    /**
     * Returns this node's address for the given protocol.
     *
     * @param protocol protocol name, resolved through {@link Protocol#of}
     * @return address of the transporter registered for that protocol
     */
    private String getProtocolServerAddress(String protocol) {
        Protocol pt = Protocol.of(protocol);
        return TransportService.getProtocol2Transporter().get(pt).getAddress();
    }
}

