/*
 * Decompiled with CFR 0.152.
 */
package azkaban.executor;

import azkaban.alert.Alerter;
import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionControllerUtils;
import azkaban.executor.ExecutionFinalizer;
import azkaban.executor.ExecutionReference;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorApiGateway;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.ExecutorManagerUpdaterStage;
import azkaban.executor.RunningExecutions;
import azkaban.executor.Status;
import azkaban.metrics.CommonMetrics;
import azkaban.utils.Pair;
import com.webank.wedatasphere.schedulis.common.executor.ExecutionCycle;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.inject.Inject;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls executors for the state of the executions tracked in {@link RunningExecutions},
 * applies the reported updates to the in-memory flows and hands finished or untrackable
 * executions to the {@link ExecutionFinalizer}.
 */
public class RunningExecutionsUpdater {
    private static final Logger logger = LoggerFactory.getLogger(RunningExecutionsUpdater.class);

    /**
     * Flow type value for which {@link #finalizeCycleFlow} looks up an {@link ExecutionCycle}.
     * NOTE(review): the literal comes from the original code; confirm it matches the
     * project's constant for cycle flows.
     */
    private static final int CYCLE_FLOW_TYPE = 4;

    /** Consecutive error count at which an executor is first treated as unresponsive. */
    final int numErrorsBeforeUnresponsiveEmail = 6;
    /** Delay in milliseconds added to "now" before a flow whose update failed is checked again. */
    final long errorThreshold = 10000L;
    /** After the first notification, the unresponsive handling repeats every this many errors. */
    private final int numErrorsBetweenUnresponsiveEmail = 360;

    private final ExecutorManagerUpdaterStage updaterStage;
    private final AlerterHolder alerterHolder;
    private final CommonMetrics commonMetrics;
    private final ExecutorApiGateway apiGateway;
    private final RunningExecutions runningExecutions;
    private final ExecutionFinalizer executionFinalizer;
    private final ExecutorLoader executorLoader;

    @Inject
    public RunningExecutionsUpdater(ExecutorManagerUpdaterStage updaterStage, AlerterHolder alerterHolder, CommonMetrics commonMetrics, ExecutorApiGateway apiGateway, RunningExecutions runningExecutions, ExecutionFinalizer executionFinalizer, ExecutorLoader executorLoader) {
        this.updaterStage = updaterStage;
        this.alerterHolder = alerterHolder;
        this.commonMetrics = commonMetrics;
        this.apiGateway = apiGateway;
        this.runningExecutions = runningExecutions;
        this.executionFinalizer = executionFinalizer;
        this.executorLoader = executorLoader;
    }

    /**
     * Runs one update round over all running executions that are due for a check.
     *
     * <p>Executions are grouped by executor. Executions without an executor are finalized
     * directly. For every other group the executor is asked for updates; each reported
     * update is applied, and executions that are finished (or whose update reported an
     * error) are finalized at the end of the round.
     */
    public void updateExecutions() {
        this.updaterStage.set("Starting update all flows.");
        Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap = this.getFlowToExecutorMap();
        List<ExecutableFlow> finalizeFlows = new ArrayList<ExecutableFlow>();

        for (Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap.entrySet()) {
            Optional<Executor> executorOption = entry.getKey();
            if (!executorOption.isPresent()) {
                for (ExecutableFlow flow : entry.getValue()) {
                    logger.warn("Finalizing execution {}. Executor id of this execution doesn't exist", flow.getExecutionId());
                    finalizeFlows.add(flow);
                }
                continue;
            }

            Executor executor = executorOption.get();
            this.updaterStage.set("Starting update flows on " + executor.getHost() + ":" + executor.getPort());
            Map<String, Object> results = null;
            try {
                results = this.apiGateway.updateExecutions(executor, entry.getValue());
            }
            catch (ExecutorManagerException e) {
                this.handleException(entry, executor, e, finalizeFlows);
            }
            if (results != null) {
                this.applyUpdates(results, finalizeFlows);
            }
        }

        this.updaterStage.set("Finalizing " + finalizeFlows.size() + " error flows.");
        for (ExecutableFlow flow : finalizeFlows) {
            this.executionFinalizer.finalizeFlow(flow, "Not running on the assigned executor (any more)", null);
        }
        this.updaterStage.set("Updated all active flows. Waiting for next round.");
    }

    /**
     * Applies every entry of the {@code "updated"} list in an executor response and collects
     * the flows that must be finalized.
     *
     * @param results       response map returned by the executor
     * @param finalizeFlows accumulator for flows to finalize at the end of the round
     */
    private void applyUpdates(Map<String, Object> results, List<ExecutableFlow> finalizeFlows) {
        // The response is loosely typed; each entry is expected to be a per-execution update map.
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> executionUpdates = (List<Map<String, Object>>) results.get("updated");
        if (executionUpdates == null) {
            // A response without an "updated" list carries nothing to apply.
            return;
        }
        for (Map<String, Object> updateMap : executionUpdates) {
            try {
                ExecutableFlow flow = this.updateExecution(updateMap);
                this.updaterStage.set("Updated flow " + flow.getExecutionId());
                if (ExecutionControllerUtils.isFinished(flow)) {
                    finalizeFlows.add(flow);
                }
            }
            catch (ExecutorManagerException e) {
                ExecutableFlow failedFlow = e.getExecutableFlow();
                logger.error("Failed to apply execution update", e);
                // Only exceptions that carry the affected flow can lead to finalization.
                if (failedFlow != null) {
                    logger.warn("Finalizing execution {}", failedFlow.getExecutionId());
                    finalizeFlows.add(failedFlow);
                }
            }
        }
    }

    /**
     * Handles a failed update call to an executor.
     *
     * <p>If the executor no longer exists, all of its flows are scheduled for finalization.
     * Otherwise each flow's next check is postponed by {@link #errorThreshold} and its error
     * counter is incremented; when the counter reaches
     * {@link #numErrorsBeforeUnresponsiveEmail} or a multiple of
     * {@code numErrorsBetweenUnresponsiveEmail}, the flow's cycle record is finalized.
     *
     * @param entry         executor-to-flows entry whose update call failed
     * @param executor      the executor that could not be reached
     * @param e             the failure returned by the gateway
     * @param finalizeFlows accumulator for flows to finalize at the end of the round
     */
    private void handleException(Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry, Executor executor, ExecutorManagerException e, List<ExecutableFlow> finalizeFlows) {
        logger.error("Failed to get update from executor {}", executor.getHost(), e);
        boolean sendUnresponsiveEmail = false;
        boolean executorRemoved = this.isExecutorRemoved(executor.getId());

        for (ExecutableFlow flow : entry.getValue()) {
            Pair<ExecutionReference, ExecutableFlow> pair = this.runningExecutions.get().get(flow.getExecutionId());
            if (pair == null) {
                // The execution left the running map after this round's snapshot was taken;
                // there is no reference left to update, so skip it instead of failing the round.
                logger.warn("Execution {} is no longer tracked as running. Skipping failure handling", flow.getExecutionId());
                continue;
            }
            this.updaterStage.set("Failed to get update for flow " + pair.getSecond().getExecutionId());
            if (executorRemoved) {
                logger.warn("Finalizing execution {}. Executor is removed", flow.getExecutionId());
                finalizeFlows.add(flow);
                this.finalizeCycleFlow(flow);
                continue;
            }

            ExecutionReference ref = pair.getFirst();
            ref.setNextCheckTime(DateTime.now().getMillis() + this.errorThreshold);
            ref.setNumErrors(ref.getNumErrors() + 1);
            boolean firstNotification = ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail;
            boolean repeatedNotification = ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0;
            if (firstNotification || repeatedNotification) {
                sendUnresponsiveEmail = true;
                this.finalizeCycleFlow(flow);
            }
        }

        if (sendUnresponsiveEmail) {
            // NOTE(review): the original code looked up the "email" alerter here but never
            // invoked it. Confirm against the Alerter API whether an alert should be sent.
            logger.warn("Executor {}:{} is unresponsive", executor.getHost(), executor.getPort());
        }
    }

    /**
     * Checks whether the executor with the given id can no longer be loaded.
     *
     * @param id executor id
     * @return {@code true} if the loader returns no executor for the id; {@code false} if it
     *         exists or if the lookup itself failed
     */
    private boolean isExecutorRemoved(int id) {
        try {
            return this.executorLoader.fetchExecutor(id) == null;
        }
        catch (ExecutorManagerException e) {
            logger.error("Couldn't check if executor exists", e);
            return false;
        }
    }

    /**
     * Groups the running flows that are due for a check by their (optional) executor.
     *
     * @return map from executor to the flows assigned to it; flows whose next check time has
     *         not passed yet are left out
     */
    private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
        Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap = new HashMap<Optional<Executor>, List<ExecutableFlow>>();
        for (Pair<ExecutionReference, ExecutableFlow> runningFlow : this.runningExecutions.get().values()) {
            ExecutionReference ref = runningFlow.getFirst();
            if (ref.getNextCheckTime() >= DateTime.now().getMillis()) {
                continue;
            }
            exFlowMap.computeIfAbsent(ref.getExecutor(), key -> new ArrayList<ExecutableFlow>())
                    .add(runningFlow.getSecond());
        }
        return exFlowMap;
    }

    /**
     * Applies a single update map from an executor to the matching running flow.
     *
     * <p>On success the reference's next check time and error counter are reset. A transition
     * to {@code FAILED} is recorded in the metrics, and a transition to
     * {@code FAILED_FINISHING} triggers the first-error alert.
     *
     * @param updateData update map; must contain {@code "executionId"}
     * @return the updated flow
     * @throws ExecutorManagerException if the id is missing, the execution is not tracked any
     *         more, or the update contains an {@code "error"} entry (in which case the
     *         exception carries the flow)
     */
    private ExecutableFlow updateExecution(Map<String, Object> updateData) throws ExecutorManagerException {
        Integer execId = (Integer) updateData.get("executionId");
        if (execId == null) {
            throw new ExecutorManagerException("Response is malformed. Need exec id to update.");
        }
        Pair<ExecutionReference, ExecutableFlow> refPair = this.runningExecutions.get().get(execId);
        if (refPair == null) {
            throw new ExecutorManagerException("No execution found in the map with the execution id any more. Removing " + execId);
        }
        ExecutionReference ref = refPair.getFirst();
        ExecutableFlow flow = refPair.getSecond();
        if (updateData.containsKey("error")) {
            throw new ExecutorManagerException((String) updateData.get("error"), flow);
        }

        // A successful update clears any back-off caused by earlier failures.
        ref.setNextCheckTime(0L);
        ref.setNumErrors(0);

        Status oldStatus = flow.getStatus();
        flow.applyUpdateObject(updateData);
        Status newStatus = flow.getStatus();
        if (oldStatus != newStatus) {
            if (newStatus == Status.FAILED) {
                this.commonMetrics.markFlowFail();
            }
            if (newStatus == Status.FAILED_FINISHING) {
                ExecutionControllerUtils.alertUserOnFirstError(flow, this.alerterHolder);
            }
        }
        return flow;
    }

    /**
     * Returns the alerter holder this updater was constructed with.
     */
    public AlerterHolder getAlerterHolder() {
        return this.alerterHolder;
    }

    /**
     * Marks the cycle record of the given flow as failed if the flow has the cycle flow type
     * and its cycle record is still {@code RUNNING}, then raises the cycle-interrupt alert.
     * Loader failures are logged and not propagated.
     *
     * @param flow the flow whose cycle record should be closed
     */
    private void finalizeCycleFlow(ExecutableFlow flow) {
        try {
            if (flow.getFlowType() != CYCLE_FLOW_TYPE) {
                return;
            }
            ExecutionCycle cycleFlow = this.executorLoader.getExecutionCycleFlow(String.valueOf(flow.getProjectId()), flow.getId());
            if (cycleFlow == null || cycleFlow.getStatus() != Status.RUNNING) {
                return;
            }
            cycleFlow.setStatus(Status.FAILED);
            cycleFlow.setEndTime(System.currentTimeMillis());
            this.executorLoader.updateExecutionFlow(cycleFlow);
            ExecutionControllerUtils.alertOnCycleFlowInterrupt(flow, cycleFlow, this.alerterHolder);
        }
        catch (ExecutorManagerException e) {
            logger.error("finalize cycle flow error execId:" + flow.getExecutionId(), e);
        }
    }
}

