/*
 * Decompiled with CFR 0.152.
 */
package com.webank.wedatasphere.schedulis.common.executor;

import azkaban.db.DatabaseOperator;
import azkaban.event.EventHandler;
import azkaban.executor.ActiveExecutors;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableJobInfo;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionFinalizer;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionReference;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorApiGateway;
import azkaban.executor.ExecutorInfo;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.ExecutorManagerUpdaterStage;
import azkaban.executor.QueuedExecutions;
import azkaban.executor.RunningExecutions;
import azkaban.executor.RunningExecutionsUpdaterThread;
import azkaban.executor.Status;
import azkaban.executor.selector.ExecutorComparator;
import azkaban.executor.selector.ExecutorFilter;
import azkaban.executor.selector.ExecutorSelector;
import azkaban.flow.Flow;
import azkaban.flow.FlowUtils;
import azkaban.history.ExecutionRecover;
import azkaban.metrics.CommonMetrics;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.project.ProjectWhitelist;
import azkaban.user.User;
import azkaban.utils.AuthenticationUtils;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.Utils;
import azkaban.utils.WebUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.webank.wedatasphere.schedulis.common.executor.ExecutionCycle;
import com.webank.wedatasphere.schedulis.common.executor.ExecutorQueueLoader;
import com.webank.wedatasphere.schedulis.common.jobExecutor.utils.SystemBuiltInParamJodeTimeUtils;
import com.webank.wedatasphere.schedulis.common.log.LogFilterEntity;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class ExecutorManagerHA
extends EventHandler
implements ExecutorManagerAdapter {
    /** Property key for the recover thread's interval; read in {@code createRecoverThread()}. */
    public static final String HISTORY_RECOVER_INTERVAL_MS = "history.recover.interval.ms";
    private static final String SPARK_JOB_TYPE = "spark";
    /** Placeholder token that {@code getJobLinkUrl} substitutes with a concrete application id. */
    private static final String APPLICATION_ID = "${application.id}";
    /** Matches an id of the form {@code application_<digits>_<digits>}; used by {@code findApplicationIdFromLog}. */
    private static final Pattern APPLICATION_ID_PATTERN = Pattern.compile("application_\\d+_\\d+");
    /** Matches a non-empty string consisting only of ASCII digits. */
    private static final Pattern IS_NUMBER = Pattern.compile("^[0-9]+$");
    private static final Pattern FAILED_TO_READ_APPLICATION_PATTERN = Pattern.compile("Failed to read the application");
    private static final Pattern INVALID_APPLICATION_ID_PATTERN = Pattern.compile("Invalid Application ID");
    /** Same value as the fallback used for {@code azkaban.max.concurrent.runs.oneflow}. */
    private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
    /** 7,257,600,000 ms = 84 days; same value as the fallback used for {@code execution.logs.retention.ms}. */
    private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 7257600000L;
    /** Age window passed to {@code ExecutorLoader.fetchRecentlyFinishedFlows}. */
    private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10L);
    private static final Logger logger = LoggerFactory.getLogger(ExecutorManagerHA.class);
    // Collaborators supplied through the @Inject constructor.
    private final RunningExecutions runningExecutions;
    private final Props azkProps;
    private final CommonMetrics commonMetrics;
    private final ExecutorLoader executorLoader;
    private final ExecutorQueueLoader executorQueueLoader;
    private ProjectLoader projectLoader;
    // Background threads created in the constructor and started in start().
    private final CleanerThread cleanerThread;
    private final RecoverThread recoverThread;
    // NOTE(review): not referenced in the visible part of this class; the methods shown use runningExecutions instead.
    private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap();
    private final RunningExecutionsUpdaterThread updaterThread;
    private final ExecutorApiGateway apiGateway;
    private final int maxConcurrentRunsOneFlow;
    private final ExecutorManagerUpdaterStage updaterStage;
    private final ExecutionFinalizer executionFinalizer;
    private final ActiveExecutors activeExecutors;
    /** Pool used by {@code refreshExecutors()} to query executors in parallel. */
    private final ExecutorService executorInfoRefresherService;
    // The following three fields are assigned in initialize(), not in the constructor.
    QueuedExecutions queuedFlows;
    File cacheDir;
    private QueueProcessorThread queueProcessor;
    /** Volatile: read by {@code getRunningFlows(int, String)}; the writer is outside the visible part of the file. */
    private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
    private long lastCleanerThreadCheckTime = -1L;
    private long lastThreadCheckTime = -1L;
    // Executor-selector configuration, populated by setupExecutorFilterList() / setupExecutotrComparatorWeightsMap().
    private List<String> filterList;
    private Map<String, Integer> comparatorWeightsMap;
    /** Wall-clock millis written by {@code refreshExecutors()}. */
    private long lastSuccessfulExecutorInfoRefresh;
    /** Passed to the queue processor thread constructor in {@code setupQueueProcessor()}. */
    private Duration sleepAfterDispatchFailure = Duration.ofSeconds(1L);
    /** Guards {@code initialize()} so its body runs at most once. */
    private boolean initialized = false;
    // NOTE(review): name differs from executorInfoRefresherService by a typo and is never assigned in the
    // visible code -- looks like a leftover duplicate; confirm before removing.
    private ExecutorService executorInforRefresherService;
    private DatabaseOperator dbOperator;

    /**
     * Stores the injected collaborators and builds the background workers owned by this manager
     * (cleaner thread, recover thread and the executor-info refresh pool). Nothing is started here.
     *
     * @throws ExecutorManagerException declared for callers; not thrown by the statements below
     *     unless one of the factory helpers does so
     */
    @Inject
    public ExecutorManagerHA(
            Props azkProps,
            ExecutorLoader executorLoader,
            ExecutorQueueLoader executorQueueLoader,
            CommonMetrics commonMetrics,
            ExecutorApiGateway apiGateway,
            RunningExecutions runningExecutions,
            ActiveExecutors activeExecutors,
            ExecutorManagerUpdaterStage updaterStage,
            ExecutionFinalizer executionFinalizer,
            RunningExecutionsUpdaterThread updaterThread,
            ProjectLoader projectLoader,
            DatabaseOperator dbOperator) throws ExecutorManagerException {
        // Configuration and metrics.
        this.azkProps = azkProps;
        this.commonMetrics = commonMetrics;

        // Persistence layer.
        this.executorLoader = executorLoader;
        this.executorQueueLoader = executorQueueLoader;
        this.projectLoader = projectLoader;

        // Executor communication and bookkeeping.
        this.apiGateway = apiGateway;
        this.activeExecutors = activeExecutors;
        this.runningExecutions = runningExecutions;
        this.updaterThread = updaterThread;
        this.updaterStage = updaterStage;
        this.executionFinalizer = executionFinalizer;

        // Derived state: the helpers below read this.azkProps, so they must run after it is set.
        this.maxConcurrentRunsOneFlow = getMaxConcurrentRunsOneFlow(azkProps);
        this.cleanerThread = createCleanerThread();
        this.recoverThread = createRecoverThread();
        this.executorInfoRefresherService = createExecutorInfoRefresherService();
        this.dbOperator = dbOperator;
    }

    /**
     * Looks up {@code azkaban.max.concurrent.runs.oneflow} in the given properties.
     *
     * @param azkProps properties to query
     * @return the value returned by {@code Props.getInt}, with
     *     {@link #DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW} supplied as the fallback argument
     */
    private int getMaxConcurrentRunsOneFlow(Props azkProps) {
        final int maxRuns = azkProps.getInt("azkaban.max.concurrent.runs.oneflow", DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
        return maxRuns;
    }

    /**
     * Builds (but does not start) the cleaner thread, configured with the value of
     * {@code execution.logs.retention.ms}; {@link #DEFAULT_EXECUTION_LOGS_RETENTION_MS} is the fallback.
     */
    private CleanerThread createCleanerThread() {
        final long retentionMs =
                azkProps.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
        return new CleanerThread(retentionMs);
    }

    /**
     * @return the properties object this manager was constructed with
     */
    @Override
    public Props getAzkabanProps() {
        return azkProps;
    }

    /**
     * Builds (but does not start) the recover thread, configured with the value of
     * {@link #HISTORY_RECOVER_INTERVAL_MS}; 10000 is passed as the fallback.
     */
    private RecoverThread createRecoverThread() {
        final long recoverIntervalMs = azkProps.getLong(HISTORY_RECOVER_INTERVAL_MS, 10000L);
        return new RecoverThread(recoverIntervalMs);
    }

    /**
     * One-time setup: registers executors, loads running executions, creates the in-memory queue,
     * resolves the cache directory, reads executor-selector configuration and builds the queue
     * processor thread. Subsequent calls return immediately.
     *
     * @throws ExecutorManagerException if executor setup or loading running executions fails
     */
    void initialize() throws ExecutorManagerException {
        if (this.initialized) {
            return;
        }
        // NOTE(review): the flag is set before the work runs, so if a step below throws, later calls
        // are skipped and fields such as queuedFlows / queueProcessor stay null.
        this.initialized = true;
        this.setupExecutors();
        this.loadRunningExecutions();
        this.queuedFlows = new QueuedExecutions(this.azkProps.getLong("azkaban.webserver.queue.size", 100000L));
        this.cacheDir = new File(this.azkProps.getString("cache.directory", "cache"));
        this.setupExecutotrComparatorWeightsMap();
        this.setupExecutorFilterList();
        // Must come after setupExecutors(): setupQueueProcessor() reads activeExecutors.getAll().size().
        this.queueProcessor = this.setupQueueProcessor();
    }

    /**
     * Initializes the manager (no-op if already initialized) and then starts the updater, cleaner,
     * queue-processor and recover threads, in that order.
     *
     * @throws ExecutorManagerException if {@code initialize()} fails
     */
    @Override
    public void start() throws ExecutorManagerException {
        // initialize() creates queueProcessor, so it has to run before the threads are started.
        this.initialize();
        this.updaterThread.start();
        this.cleanerThread.start();
        this.queueProcessor.start();
        this.recoverThread.start();
    }

    /**
     * Extracts the first application id occurring in the given log text.
     *
     * @param logData log text to scan
     * @return the part of the first {@code application_<digits>_<digits>} match that follows the
     *     {@code application_} prefix, or {@code null} when there is no match
     */
    private String findApplicationIdFromLog(String logData) {
        final Matcher idMatcher = APPLICATION_ID_PATTERN.matcher(logData);
        // "application_" is 12 characters long; substring(12) keeps only "<digits>_<digits>".
        final String appId = idMatcher.find() ? idMatcher.group().substring(12) : null;
        logger.info("Application ID is " + appId);
        return appId;
    }

    /**
     * Builds (but does not start) the queue processor thread from configuration.
     * The fallback for {@code azkaban.maxDispatchingErrors} is the current number of active executors.
     */
    private QueueProcessorThread setupQueueProcessor() {
        final boolean processingEnabled = azkProps.getBoolean("azkaban.queueprocessing.enabled", true);
        final long refreshIntervalMs = azkProps.getLong("azkaban.activeexecutor.refresh.milisecinterval", 50000L);
        final int refreshFlowInterval = azkProps.getInt("azkaban.activeexecutor.refresh.flowinterval", 5);
        final int activeExecutorCount = activeExecutors.getAll().size();
        final int maxDispatchingErrors = azkProps.getInt("azkaban.maxDispatchingErrors", activeExecutorCount);
        return new QueueProcessorThread(
                processingEnabled,
                refreshIntervalMs,
                refreshFlowInterval,
                maxDispatchingErrors,
                sleepAfterDispatchFailure);
    }

    /**
     * Reads all properties under the {@code azkaban.executorselector.comparator.} prefix and stores
     * them in {@code comparatorWeightsMap} with their values parsed as integers.
     * The field is left untouched when the lookup returns {@code null}.
     *
     * @throws NumberFormatException if a configured value is not an integer
     */
    private void setupExecutotrComparatorWeightsMap() {
        final Map<String, String> configuredWeights = azkProps.getMapByPrefix("azkaban.executorselector.comparator.");
        if (configuredWeights == null) {
            return;
        }
        // Publish the (sorted) map first, then fill it entry by entry.
        this.comparatorWeightsMap = new TreeMap<String, Integer>();
        for (final Map.Entry<String, String> weight : configuredWeights.entrySet()) {
            final Integer parsed = Integer.valueOf(weight.getValue());
            this.comparatorWeightsMap.put(weight.getKey(), parsed);
        }
    }

    /**
     * Splits the comma-separated {@code azkaban.executorselector.filters} property (empty string is
     * the fallback) and stores the resulting names in {@code filterList}.
     */
    private void setupExecutorFilterList() {
        final String configuredFilters = azkProps.getString("azkaban.executorselector.filters", "");
        if (configuredFilters == null) {
            return;
        }
        final String[] filterNames = StringUtils.split(configuredFilters, ",");
        this.filterList = Arrays.asList(filterNames);
    }

    /**
     * Creates the fixed-size thread pool used for refreshing executor info; its size comes from
     * {@code azkaban.executorinfo.refresh.maxThreads}, with 5 as the fallback.
     */
    private ExecutorService createExecutorInfoRefresherService() {
        final int poolSize = azkProps.getInt("azkaban.executorinfo.refresh.maxThreads", 5);
        return Executors.newFixedThreadPool(poolSize);
    }

    /**
     * Verifies that multi-executor mode is enabled and then delegates to
     * {@code ActiveExecutors.setupExecutors()}.
     *
     * @throws ExecutorManagerException propagated from the delegate
     * @throws IllegalArgumentException if multi-executor mode is not enabled
     */
    @Override
    public void setupExecutors() throws ExecutorManagerException {
        checkMultiExecutorMode();
        activeExecutors.setupExecutors();
    }

    /**
     * Rejects configurations where {@code azkaban.use.multiple.executors} is not {@code true}
     * ({@code false} is the fallback when the property is absent).
     *
     * @throws IllegalArgumentException if the property does not evaluate to {@code true}
     */
    @Deprecated
    private void checkMultiExecutorMode() {
        final boolean multiExecutorMode = azkProps.getBoolean("azkaban.use.multiple.executors", false);
        if (multiExecutorMode) {
            return;
        }
        throw new IllegalArgumentException("azkaban.use.multiple.executors must be true. Single executor mode is not supported any more.");
    }

    /**
     * Queries every active executor's {@code /serverStatistics} endpoint in parallel and stores the
     * result on the executor. An executor whose query fails or does not answer within 5 seconds is
     * left with {@code null} executor info.
     *
     * <p>{@code lastSuccessfulExecutorInfoRefresh} is updated only when at least one executor was
     * queried and every one of them was refreshed successfully. Previously the timestamp was
     * written inside the per-executor loop, so a round in which an early executor succeeded and a
     * later one failed was still recorded as successful.
     */
    private void refreshExecutors() {
        final List<Pair<Executor, Future<ExecutorInfo>>> pendingRefreshes = new ArrayList<Pair<Executor, Future<ExecutorInfo>>>();
        for (final Executor executor : this.activeExecutors.getAll()) {
            final Future<ExecutorInfo> future = this.executorInfoRefresherService.submit(() -> this.apiGateway.callForJsonType(executor.getHost(), executor.getPort(), "/serverStatistics", null, ExecutorInfo.class));
            pendingRefreshes.add(new Pair<Executor, Future<ExecutorInfo>>(executor, future));
        }

        // With no executors there is nothing to refresh, so the timestamp must stay untouched.
        boolean allRefreshed = !pendingRefreshes.isEmpty();
        for (final Pair<Executor, Future<ExecutorInfo>> pending : pendingRefreshes) {
            final Executor executor = pending.getFirst();
            // Clear stale info first so a failed refresh never leaves outdated statistics behind.
            executor.setExecutorInfo(null);
            try {
                final ExecutorInfo executorInfo = pending.getSecond().get(5L, TimeUnit.SECONDS);
                executor.setExecutorInfo(executorInfo);
                logger.info(String.format("Successfully refreshed executor: %s with executor info : %s", executor, executorInfo));
            } catch (TimeoutException e) {
                allRefreshed = false;
                logger.error("Timed out while waiting for ExecutorInfo refresh" + executor, e);
            } catch (Exception e) {
                allRefreshed = false;
                logger.error("Failed to update ExecutorInfo for executor : " + executor, e);
            }
        }

        if (allRefreshed) {
            this.lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
        }
    }

    /** Marks the queue processor thread as inactive. Requires {@code initialize()} to have run. */
    @Override
    public void disableQueueProcessorThread() {
        queueProcessor.setActive(false);
    }

    /** Marks the queue processor thread as active. Requires {@code initialize()} to have run. */
    @Override
    public void enableQueueProcessorThread() {
        queueProcessor.setActive(true);
    }

    /**
     * @return the {@link Thread.State} reported by the queue processor thread
     */
    public Thread.State getQueueProcessorThreadState() {
        final Thread.State state = queueProcessor.getState();
        return state;
    }

    /**
     * @return the queue processor thread's own "active" flag
     */
    public boolean isQueueProcessorThreadActive() {
        final boolean active = queueProcessor.isActive();
        return active;
    }

    /**
     * @return the {@code System.currentTimeMillis()} value last recorded by
     *     {@code refreshExecutors()}
     */
    public long getLastSuccessfulExecutorInfoRefresh() {
        return lastSuccessfulExecutorInfoRefresh;
    }

    /**
     * @return the comparator names reported by {@code ExecutorComparator.getAvailableComparatorNames()}
     */
    public Set<String> getAvailableExecutorComparatorNames() {
        final Set<String> comparatorNames = ExecutorComparator.getAvailableComparatorNames();
        return comparatorNames;
    }

    /**
     * @return the filter names reported by {@code ExecutorFilter.getAvailableFilterNames()}
     */
    public Set<String> getAvailableExecutorFilterNames() {
        final Set<String> filterNames = ExecutorFilter.getAvailableFilterNames();
        return filterNames;
    }

    /**
     * @return the {@link Thread.State} of the running-executions updater thread
     */
    @Override
    public Thread.State getExecutorManagerThreadState() {
        final Thread.State state = updaterThread.getState();
        return state;
    }

    /**
     * @return the current value held by the injected {@code ExecutorManagerUpdaterStage}
     */
    public String getExecutorThreadStage() {
        final String stage = updaterStage.get();
        return stage;
    }

    /**
     * @return whether the running-executions updater thread is alive
     */
    @Override
    public boolean isExecutorManagerThreadActive() {
        final boolean alive = updaterThread.isAlive();
        return alive;
    }

    /**
     * @return the last check time reported by the running-executions updater thread
     */
    @Override
    public long getLastExecutorManagerThreadCheckTime() {
        final long lastCheck = updaterThread.getLastThreadCheckTime();
        return lastCheck;
    }

    /**
     * @return a read-only view of the executors currently held by {@code ActiveExecutors}
     */
    @Override
    public Collection<Executor> getAllActiveExecutors() {
        final Collection<Executor> executors = activeExecutors.getAll();
        return Collections.unmodifiableCollection(executors);
    }

    /**
     * Resolves an executor by id, preferring the in-memory active executors and falling back to
     * the executor loader when none of them matches.
     *
     * @param executorId id to look for
     * @return the matching active executor, otherwise whatever the loader returns
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
        for (final Executor candidate : activeExecutors.getAll()) {
            if (candidate.getId() == executorId) {
                return candidate;
            }
        }
        return executorLoader.fetchExecutor(executorId);
    }

    /**
     * @return a new mutable set with one {@code host:port} string per active executor
     */
    public Set<String> getPrimaryServerHosts() {
        return activeExecutors.getAll().stream()
                .map(executor -> executor.getHost() + ":" + executor.getPort())
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Collects {@code host:port} strings for all active executors plus every executor referenced by
     * a running execution. Running executions are reloaded from the database first; a failure to
     * reload is logged and the data already in memory is used.
     *
     * @return a new mutable set of {@code host:port} strings
     */
    @Override
    public Set<String> getAllActiveExecutorServerHosts() {
        final HashSet<String> hosts = new HashSet<String>();
        for (final Executor active : activeExecutors.getAll()) {
            hosts.add(active.getHost() + ":" + active.getPort());
        }

        try {
            loadRunningExecutions();
        } catch (ExecutorManagerException e) {
            logger.error("getAllActiveExecutorServerHosts loadQueuedFlows failed", (Throwable) e);
        }

        for (final Pair<ExecutionReference, ExecutableFlow> running : runningExecutions.get().values()) {
            // Executions without an assigned executor contribute nothing.
            running.getFirst().getExecutor()
                    .ifPresent(assigned -> hosts.add(assigned.getHost() + ":" + assigned.getPort()));
        }
        return hosts;
    }

    /**
     * Fetches the active flows from the executor loader and merges them into the in-memory
     * running-executions map. Entries already in the map are overwritten by key, never removed.
     *
     * @throws ExecutorManagerException propagated from the loader
     */
    private void loadRunningExecutions() throws ExecutorManagerException {
        logger.info("Loading running flows from database..");
        final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetched = executorLoader.fetchActiveFlows();
        final int fetchedCount = fetched.size();
        logger.info("Loaded " + fetchedCount + " running flows");
        runningExecutions.get().putAll(fetched);
    }

    /**
     * Fetches the queued flows from the executor loader and enqueues each of them into
     * {@code queuedFlows}. A loader or enqueue failure is logged and stops further enqueuing.
     *
     * @return the (same) {@code queuedFlows} instance
     */
    private QueuedExecutions loadQueuedFlows() {
        try {
            final List<Pair<ExecutionReference, ExecutableFlow>> fetched = executorLoader.fetchQueuedFlows();
            // Treat a null result like an empty one.
            final List<Pair<ExecutionReference, ExecutableFlow>> toEnqueue = fetched == null
                    ? Collections.<Pair<ExecutionReference, ExecutableFlow>>emptyList()
                    : fetched;
            for (final Pair<ExecutionReference, ExecutableFlow> queued : toEnqueue) {
                queuedFlows.enqueue(queued.getSecond(), queued.getFirst());
            }
        } catch (ExecutorManagerException e) {
            logger.error("getActiveFlowsWithExecutor loadQueuedFlows failed", (Throwable) e);
        }
        return queuedFlows;
    }

    /**
     * Returns the execution ids of all queued, about-to-run and running executions of the given
     * flow, sorted ascending.
     *
     * <p>Queued flows and running executions are reloaded before being inspected. Any
     * {@link ExecutorManagerException} or {@link RuntimeException} raised on the way is logged and
     * the ids collected so far are returned (unsorted in that case).
     *
     * @param projectId id of the project owning the flow
     * @param flowId id of the flow
     * @return a new mutable list of matching execution ids
     */
    @Override
    public List<Integer> getRunningFlows(int projectId, String flowId) {
        final List<Integer> executionIds = new ArrayList<Integer>();
        try {
            this.queuedFlows = this.loadQueuedFlows();
            executionIds.addAll(this.getRunningFlowsHelper(projectId, flowId, this.queuedFlows.getAllEntries()));

            // runningCandidate is volatile and may be cleared concurrently: read it exactly once so
            // the null check and the use see the same value.
            final Pair<ExecutionReference, ExecutableFlow> candidate = this.runningCandidate;
            if (candidate != null) {
                executionIds.addAll(this.getRunningFlowsHelper(projectId, flowId, Collections.singletonList(candidate)));
            }

            this.loadRunningExecutions();
            executionIds.addAll(this.getRunningFlowsHelper(projectId, flowId, this.runningExecutions.get().values()));
            Collections.sort(executionIds);
        } catch (ExecutorManagerException | RuntimeException e) {
            logger.error("getRunningFlows loadQueuedFlows failed", (Throwable) e);
        }
        return executionIds;
    }

    /**
     * Selects the execution ids of the entries whose flow matches both {@code flowId} and
     * {@code projectId}, in the iteration order of {@code collection}.
     *
     * @return a new mutable list of execution ids
     */
    private List<Integer> getRunningFlowsHelper(int projectId, String flowId, Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
        return collection.stream()
                .filter(entry -> entry.getSecond().getFlowId().equals(flowId)
                        && entry.getSecond().getProjectId() == projectId)
                .map(entry -> entry.getFirst().getExecId())
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Lists every queued and running flow together with its (optional) executor, after reloading
     * both from the database. A failure to reload running executions is logged and ignored.
     *
     * <p>As a side effect each returned flow gets its update time overwritten:
     * <ul>
     *   <li>if its repeat option map is non-empty: the {@code startTimeLong} entry minus one day;</li>
     *   <li>otherwise: the flow's start time minus one day, or {@code -1} when the start time is
     *       {@code -1}.</li>
     * </ul>
     *
     * @return a new mutable list of (flow, executor) pairs, queued flows first
     * @throws NumberFormatException if a non-empty repeat option map lacks a numeric
     *     {@code startTimeLong}
     */
    @Override
    public List<Pair<ExecutableFlow, Optional<Executor>>> getActiveFlowsWithExecutor() throws IOException {
        final ArrayList<Pair<ExecutableFlow, Optional<Executor>>> activeFlows = new ArrayList<Pair<ExecutableFlow, Optional<Executor>>>();

        this.queuedFlows = loadQueuedFlows();
        getActiveFlowsWithExecutorHelper(activeFlows, queuedFlows.getAllEntries());
        try {
            loadRunningExecutions();
        } catch (ExecutorManagerException e) {
            logger.error("loadRunningExecutions getActiveFlowsWithExecutor failed " + e);
        }
        getActiveFlowsWithExecutorHelper(activeFlows, runningExecutions.get().values());

        for (final Pair<ExecutableFlow, Optional<Executor>> entry : activeFlows) {
            final ExecutableFlow flow = entry.getFirst();
            final Map<String, String> repeatOption = flow.getRepeatOption();
            if (repeatOption.isEmpty()) {
                final long startTime = flow.getStartTime();
                if (startTime == -1L) {
                    // Not started yet: keep the -1 marker instead of shifting it.
                    flow.setUpdateTime(startTime);
                } else {
                    flow.setUpdateTime(new LocalDateTime(new Date(startTime)).minusDays(1).toDate().getTime());
                }
            } else {
                final long recoverRunTime = Long.parseLong(String.valueOf(repeatOption.get("startTimeLong")));
                flow.setUpdateTime(new LocalDateTime(new Date(recoverRunTime)).minusDays(1).toDate().getTime());
            }
        }
        return activeFlows;
    }

    /**
     * Appends one (flow, optional executor) pair to {@code flows} for every entry of
     * {@code collection}, preserving its iteration order.
     */
    private void getActiveFlowsWithExecutorHelper(List<Pair<ExecutableFlow, Optional<Executor>>> flows, Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
        collection.forEach(entry -> {
            final ExecutableFlow flow = entry.getSecond();
            final Optional<Executor> executor = entry.getFirst().getExecutor();
            flows.add(new Pair<ExecutableFlow, Optional<Executor>>(flow, executor));
        });
    }

    /**
     * Tells whether the given flow is currently queued or running, after reloading both sets.
     *
     * @throws ExecutorManagerException if reloading the running executions fails
     */
    @Override
    public boolean isFlowRunning(int projectId, String flowId) throws ExecutorManagerException {
        this.queuedFlows = loadQueuedFlows();
        loadRunningExecutions();
        // Running executions are only inspected when the flow is not found in the queue.
        return isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries())
                || isFlowRunningHelper(projectId, flowId, runningExecutions.get().values());
    }

    /**
     * @return {@code true} if any entry's flow has the given project id and flow id
     */
    private boolean isFlowRunningHelper(int projectId, String flowId, Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
        return collection.stream()
                .map(Pair::getSecond)
                .anyMatch(flow -> flow.getProjectId() == projectId && flow.getFlowId().equals(flowId));
    }

    /**
     * Loads a single execution by id straight from the executor loader.
     *
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public ExecutableFlow getExecutableFlow(int execId) throws ExecutorManagerException {
        final ExecutableFlow flow = executorLoader.fetchExecutableFlow(execId);
        return flow;
    }

    /**
     * Loads the executions associated with the given repeat id from the executor loader.
     *
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getExecutableFlowByRepeatId(int repeatId) throws ExecutorManagerException {
        final List<ExecutableFlow> flows = executorLoader.fetchExecutableFlowByRepeatId(repeatId);
        return flows;
    }

    /**
     * Lists all queued flows followed by all running flows, after reloading both. A failure to
     * reload running executions is logged and the in-memory state is used.
     *
     * @return a new mutable list of flows
     */
    @Override
    public List<ExecutableFlow> getRunningFlows() {
        final ArrayList<ExecutableFlow> result = new ArrayList<ExecutableFlow>();
        this.queuedFlows = loadQueuedFlows();
        try {
            loadRunningExecutions();
        } catch (ExecutorManagerException e) {
            logger.error("loadRunningExecutions getRunningFlows failed " + e);
        }
        getActiveFlowHelper(result, queuedFlows.getAllEntries());
        getActiveFlowHelper(result, runningExecutions.get().values());
        return result;
    }

    /** Appends the flow of every entry in {@code collection} to {@code flows}. */
    private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows, Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
        collection.forEach(entry -> flows.add(entry.getSecond()));
    }

    /**
     * @return the sorted execution ids of all queued and running flows, rendered with
     *     {@code List.toString()} (e.g. {@code [1, 2, 3]})
     * @throws ExecutorManagerException if reloading the running executions fails
     */
    public String getRunningFlowIds() throws ExecutorManagerException {
        final List<Integer> ids = new ArrayList<Integer>();
        this.queuedFlows = loadQueuedFlows();
        loadRunningExecutions();
        getRunningFlowsIdsHelper(ids, queuedFlows.getAllEntries());
        getRunningFlowsIdsHelper(ids, runningExecutions.get().values());
        Collections.sort(ids);
        return ids.toString();
    }

    /**
     * @return the sorted execution ids of all queued flows, rendered with {@code List.toString()}
     */
    public String getQueuedFlowIds() {
        final List<Integer> ids = new ArrayList<Integer>();
        this.queuedFlows = loadQueuedFlows();
        getRunningFlowsIdsHelper(ids, queuedFlows.getAllEntries());
        Collections.sort(ids);
        return ids.toString();
    }

    /**
     * @return the size of the queue after reloading queued flows from the loader
     */
    @Override
    public long getQueuedFlowSize() {
        final QueuedExecutions reloaded = loadQueuedFlows();
        this.queuedFlows = reloaded;
        return this.queuedFlows.size();
    }

    /** Appends the execution id of every entry's flow to {@code allIds}. */
    private void getRunningFlowsIdsHelper(List<Integer> allIds, Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
        collection.forEach(entry -> allIds.add(entry.getSecond().getExecutionId()));
    }

    /**
     * Returns the flows that finished within {@code RECENTLY_FINISHED_LIFETIME}. A loader failure
     * is logged and an empty list is returned instead.
     *
     * <p>As a side effect each returned flow gets its update time overwritten:
     * <ul>
     *   <li>if its repeat option map is non-empty: the {@code startTimeLong} entry minus one day;</li>
     *   <li>otherwise: the flow's start time minus one day, or {@code -1} when the start time is
     *       {@code -1}.</li>
     * </ul>
     *
     * <p>The local list is typed as {@code List<ExecutableFlow>}; the decompiled
     * {@code List<Object>} neither allowed the flow accessors to be called nor matched the
     * declared return type.
     *
     * @return the list obtained from the loader (possibly {@code null} if the loader returns
     *     {@code null}), or a new empty list when loading failed
     * @throws NumberFormatException if a non-empty repeat option map lacks a numeric
     *     {@code startTimeLong}
     */
    @Override
    public List<ExecutableFlow> getRecentlyFinishedFlows() {
        List<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
        try {
            flows = this.executorLoader.fetchRecentlyFinishedFlows(RECENTLY_FINISHED_LIFETIME);
        } catch (ExecutorManagerException e) {
            logger.error("Failed to fetch recently finished flows.", (Throwable) e);
        }
        if (flows == null || flows.isEmpty()) {
            return flows;
        }

        for (final ExecutableFlow executableFlow : flows) {
            final Map<String, String> repeatMap = executableFlow.getRepeatOption();
            if (!repeatMap.isEmpty()) {
                final long recoverRunDate = Long.parseLong(String.valueOf(repeatMap.get("startTimeLong")));
                executableFlow.setUpdateTime(new LocalDateTime(new Date(recoverRunDate)).minusDays(1).toDate().getTime());
                continue;
            }
            final long runDate = executableFlow.getStartTime();
            if (runDate != -1L) {
                executableFlow.setUpdateTime(new LocalDateTime(new Date(runDate)).minusDays(1).toDate().getTime());
            } else {
                // Never started: keep the -1 marker rather than shifting it by a day.
                executableFlow.setUpdateTime(runDate);
            }
        }
        return flows;
    }

    /**
     * Pages through the execution history of one flow of a project.
     *
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getExecutableFlows(Project project, String flowId, int skip, int size) throws ExecutorManagerException {
        final int projectId = project.getId();
        return executorLoader.fetchFlowHistory(projectId, flowId, skip, size);
    }

    /**
     * Pages through the execution history across all flows.
     *
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getExecutableFlows(int skip, int size) throws ExecutorManagerException {
        return executorLoader.fetchFlowHistory(skip, size);
    }

    /**
     * Pages through the "maintained" execution history for a user and a set of project ids;
     * a direct delegate to {@code ExecutorLoader.fetchMaintainedFlowHistory}.
     *
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getMaintainedExecutableFlows(String username, List<Integer> projectIds, int skip, int size) throws ExecutorManagerException {
        final List<ExecutableFlow> maintained =
                executorLoader.fetchMaintainedFlowHistory(username, projectIds, skip, size);
        return maintained;
    }

    /**
     * Quick-searches the execution history; the search term is wrapped in {@code %} on both sides
     * before being handed to the loader, and {@code null} is passed as the second argument.
     *
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getExecutableFlowsQuickSearch(String flowIdContains, int skip, int size) throws ExecutorManagerException {
        final String likePattern = "%" + flowIdContains + "%";
        return executorLoader.fetchFlowHistoryQuickSearch(likePattern, null, skip, size);
    }

    /**
     * Pages through the execution history using the advanced filter criteria; all arguments are
     * forwarded unchanged to {@code ExecutorLoader.fetchFlowHistory}.
     *
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getExecutableFlows(String projContain, String flowContain, String execIdContain, String userContain, String status, long begin, long end, int skip, int size, int flowType) throws ExecutorManagerException {
        return executorLoader.fetchFlowHistory(
                projContain, flowContain, execIdContain, userContain,
                status, begin, end, skip, size, flowType);
    }

    /**
     * Pages through the "maintained" execution history using the advanced filter criteria; all
     * arguments are forwarded unchanged to {@code ExecutorLoader.fetchMaintainedFlowHistory}.
     *
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getMaintainedExecutableFlows(String projContain, String flowContain, String execIdContain, String userContain, String status, long begin, long end, int skip, int size, int flowType, String username, List<Integer> projectIds) throws ExecutorManagerException {
        final List<ExecutableFlow> maintained = executorLoader.fetchMaintainedFlowHistory(
                projContain, flowContain, execIdContain, userContain,
                status, begin, end, skip, size, flowType,
                username, projectIds);
        return maintained;
    }

    /**
     * Pages through the execution history of one job of a project.
     *
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableJobInfo> getExecutableJobs(Project project, String jobId, int skip, int size) throws ExecutorManagerException {
        final int projectId = project.getId();
        return executorLoader.fetchJobHistory(projectId, jobId, skip, size);
    }

    /**
     * Computes the mean run time ("moyenne") of a job over all of its executions whose status is
     * {@code SUCCEEDED}, using integer division of total (end - start) by the number of such runs.
     *
     * @return the mean run time, or {@code 0} when there is no history, no succeeded run, or the
     *     summed run time is zero
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public long getExecutableJobsMoyenneRunTime(Project project, String jobId) throws ExecutorManagerException {
        final List<ExecutableJobInfo> history = executorLoader.fetchJobAllHistory(project.getId(), jobId);
        if (history == null) {
            return 0L;
        }

        long totalRunTime = 0L;
        int succeededRuns = 0;
        for (final ExecutableJobInfo run : history) {
            if (Status.SUCCEEDED.equals(run.getStatus())) {
                succeededRuns++;
                totalRunTime += run.getEndTime() - run.getStartTime();
            }
        }

        final boolean hasAverage = totalRunTime != 0L && succeededRuns != 0;
        return hasAverage ? totalRunTime / (long) succeededRuns : 0L;
    }

    /**
     * @return the number of executable nodes the loader reports for the given project and job
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public int getNumberOfJobExecutions(Project project, String jobId) throws ExecutorManagerException {
        final int projectId = project.getId();
        return executorLoader.fetchNumExecutableNodes(projectId, jobId);
    }

    /**
     * @return the number of executable flows the loader reports for the given project and flow
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public int getNumberOfExecutions(Project project, String flowId) throws ExecutorManagerException {
        final int projectId = project.getId();
        return executorLoader.fetchNumExecutableFlows(projectId, flowId);
    }

    /**
     * Reads a slice of a flow-level log. After reloading running executions, the log is requested
     * from the executor if the execution is in the running set; otherwise it is read through the
     * executor loader.
     *
     * @param exFlow execution whose log is wanted
     * @param offset start offset of the slice
     * @param length maximum length of the slice
     * @throws ExecutorManagerException propagated from the loader or the executor call
     */
    @Override
    public FileIOUtils.LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException {
        loadRunningExecutions();
        final Pair<ExecutionReference, ExecutableFlow> running = runningExecutions.get().get(exFlow.getExecutionId());
        if (running == null) {
            // Not running: an empty job name and attempt 0 select the flow-level log.
            return executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset, length);
        }

        final Map<String, Object> response = apiGateway.callWithReference(
                running.getFirst(),
                "log",
                new Pair<String, String>("type", "flow"),
                new Pair<String, String>("offset", String.valueOf(offset)),
                new Pair<String, String>("length", String.valueOf(length)));
        return FileIOUtils.LogData.createLogDataFromObject(response);
    }

    /**
     * Reads a slice of a job-level log. After reloading running executions, the log is requested
     * from the executor if the execution is in the running set; otherwise it is read through the
     * executor loader.
     *
     * @param exFlow execution the job belongs to
     * @param jobId id of the job
     * @param offset start offset of the slice
     * @param length maximum length of the slice
     * @param attempt attempt number of the job
     * @throws ExecutorManagerException propagated from the loader or the executor call
     */
    @Override
    public FileIOUtils.LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
        loadRunningExecutions();
        final Pair<ExecutionReference, ExecutableFlow> running = runningExecutions.get().get(exFlow.getExecutionId());
        if (running == null) {
            return executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
        }

        final Map<String, Object> response = apiGateway.callWithReference(
                running.getFirst(),
                "log",
                new Pair<String, String>("type", "job"),
                new Pair<String, String>("jobId", jobId),
                new Pair<String, String>("offset", String.valueOf(offset)),
                new Pair<String, String>("length", String.valueOf(length)),
                new Pair<String, String>("attempt", String.valueOf(attempt)));
        return FileIOUtils.LogData.createLogDataFromObject(response);
    }

    /**
     * Resolves a log offset for a job attempt. After reloading running executions, the offset is
     * requested from the executor (on behalf of {@code user}) if the execution is in the running
     * set; otherwise it is read through the executor loader.
     *
     * @return the {@code offset} entry of the executor's response parsed as a {@code Long}, or the
     *     value returned by the loader
     * @throws ExecutorManagerException propagated from the loader or the executor call
     */
    @Override
    public Long getLatestLogOffset(ExecutableFlow exFlow, String jobId, Long length, int attempt, User user) throws ExecutorManagerException {
        loadRunningExecutions();
        final Pair<ExecutionReference, ExecutableFlow> running = runningExecutions.get().get(exFlow.getExecutionId());
        if (running == null) {
            logger.info("get offset from db.");
            return executorLoader.getJobLogOffset(exFlow.getExecutionId(), jobId, attempt, length);
        }

        logger.info("get offset from local file.");
        final Map<String, Object> response = apiGateway.callWithReferenceByUser(
                running.getFirst(),
                "offset",
                user.getUserId(),
                new Pair<String, String>("jobId", jobId),
                new Pair<String, String>("len", String.valueOf(length)),
                new Pair<String, String>("attempt", String.valueOf(attempt)));
        final String rawOffset = response.get("offset").toString();
        return Long.valueOf(rawOffset);
    }

    /**
     * Returns the attachments ("job stats") of one job attempt.
     *
     * <p>For an execution found in the running-executions map the attachments are requested
     * through the API gateway; otherwise they are fetched through the executor loader.
     *
     * @param exFlow  execution that owns the job
     * @param jobId   id of the job
     * @param attempt attempt number of the job
     * @return the value stored under {@code "attachments"} in the gateway response, or the
     *         loader result
     * @throws ExecutorManagerException propagated from the gateway or the loader
     */
    @Override
    public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId, int attempt) throws ExecutorManagerException {
        this.loadRunningExecutions();
        Pair<ExecutionReference, ExecutableFlow> running = this.runningExecutions.get().get(exFlow.getExecutionId());
        if (running != null) {
            Pair<String, String> jobIdArg = new Pair<String, String>("jobId", jobId);
            Pair<String, String> attemptArg = new Pair<String, String>("attempt", String.valueOf(attempt));
            Map<String, Object> response = this.apiGateway.callWithReference(running.getFirst(), "attachments", jobIdArg, attemptArg);
            return (List)response.get("attachments");
        }
        return this.executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId, attempt);
    }

    /**
     * Builds an external link for a job based on the application id found in its log.
     *
     * <p>Returns {@code null} when any of the three external URL properties is missing, when
     * no application id can be found, or when probing the resource-manager URL throws.
     * The resource-manager page is read line by line; if a line matches
     * {@code FAILED_TO_READ_APPLICATION_PATTERN} or {@code INVALID_APPLICATION_ID_PATTERN},
     * the link falls back to the Spark history server URL (job type equals
     * {@code SPARK_JOB_TYPE}) or to the generic history server URL.
     *
     * @param exFlow  execution that owns the job
     * @param jobId   id of the job
     * @param attempt attempt number of the job
     * @return the link URL, or {@code null} as described above
     */
    @Override
    public String getJobLinkUrl(ExecutableFlow exFlow, String jobId, int attempt) {
        boolean linksConfigured = this.azkProps.containsKey((Object)"azkaban.server.external.resource_manager_job_url")
                && this.azkProps.containsKey((Object)"azkaban.server.external.history_server_job_url")
                && this.azkProps.containsKey((Object)"azkaban.server.external.spark_history_server_job_url");
        if (!linksConfigured) {
            return null;
        }
        String applicationId = this.getApplicationId(exFlow, jobId, attempt);
        if (applicationId == null) {
            return null;
        }
        URL resourceManagerUrl;
        boolean rmLinkUsable = true;
        try {
            resourceManagerUrl = new URL(this.azkProps.getString("azkaban.server.external.resource_manager_job_url").replace(APPLICATION_ID, applicationId));
            String principal = Objects.requireNonNull(this.azkProps.getString("azkaban.kerberos.principal"));
            String keytab = Objects.requireNonNull(this.azkProps.getString("azkaban.keytab.path"));
            HttpURLConnection connection = AuthenticationUtils.loginAuthenticatedURL(resourceManagerUrl, principal, keytab);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (FAILED_TO_READ_APPLICATION_PATTERN.matcher(line).find() || INVALID_APPLICATION_ID_PATTERN.matcher(line).find()) {
                        logger.info("RM job link is invalid or has expired for application_" + applicationId);
                        rmLinkUsable = false;
                        break;
                    }
                }
            }
        }
        catch (Exception e) {
            logger.error("Failed to get job link for application_" + applicationId, (Throwable)e);
            return null;
        }
        String jobLinkUrl;
        if (rmLinkUsable) {
            jobLinkUrl = resourceManagerUrl.toString();
        } else if (exFlow.getExecutableNode(jobId).getType().equals(SPARK_JOB_TYPE)) {
            jobLinkUrl = this.azkProps.get((Object)"azkaban.server.external.spark_history_server_job_url").replace(APPLICATION_ID, applicationId);
        } else {
            jobLinkUrl = this.azkProps.get((Object)"azkaban.server.external.history_server_job_url").replace(APPLICATION_ID, applicationId);
        }
        logger.info("Job link url is " + jobLinkUrl + " for execution " + exFlow.getExecutionId() + ", job " + jobId);
        return jobLinkUrl;
    }

    /**
     * Scans the job log in chunks of 50000 until an application id is found.
     *
     * <p>The scan stops and returns {@code null} when no more log data is returned, when a
     * chunk does not advance the read offset, or when fetching the log fails (the failure is
     * logged). The "no progress" check prevents an endless loop on a non-null chunk of
     * length zero, which would otherwise be requested again and again at the same offset.
     *
     * @param exFlow  execution that owns the job
     * @param jobId   id of the job
     * @param attempt attempt number of the job
     * @return the application id extracted by {@code findApplicationIdFromLog}, or
     *         {@code null} if none was found
     */
    private String getApplicationId(ExecutableFlow exFlow, String jobId, int attempt) {
        int offset = 0;
        try {
            while (true) {
                FileIOUtils.LogData data = this.getExecutionJobLog(exFlow, jobId, offset, 50000, attempt);
                if (data == null) {
                    return null;
                }
                String applicationId = this.findApplicationIdFromLog(data.getData());
                if (applicationId != null) {
                    return applicationId;
                }
                int nextOffset = data.getOffset() + data.getLength();
                if (nextOffset <= offset) {
                    // Nothing new was read; asking again at the same offset would spin forever.
                    return null;
                }
                offset = nextOffset;
                logger.info("Get application ID for execution " + exFlow.getExecutionId() + ", job " + jobId + ", attempt " + attempt + ", data offset " + offset);
            }
        }
        catch (ExecutorManagerException e) {
            logger.error("Failed to get application ID for execution " + exFlow.getExecutionId() + ", job " + jobId + ", attempt " + attempt + ", data offset " + offset, (Throwable)e);
        }
        return null;
    }

    /**
     * Requests job metadata through the API gateway for an execution present in
     * {@code runningFlows}.
     *
     * @param exFlow  execution that owns the job
     * @param jobId   id of the job
     * @param offset  forwarded as the {@code offset} request parameter
     * @param length  forwarded as the {@code length} request parameter
     * @param attempt attempt number of the job
     * @return the metadata built from the gateway response, or {@code null} when the
     *         execution is not in {@code runningFlows}
     * @throws ExecutorManagerException propagated from the gateway call
     */
    @Override
    public FileIOUtils.JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
        Pair<ExecutionReference, ExecutableFlow> running = this.runningFlows.get(exFlow.getExecutionId());
        if (running == null) {
            return null;
        }
        Pair<String, String> typeArg = new Pair<String, String>("type", "job");
        Pair<String, String> jobIdArg = new Pair<String, String>("jobId", jobId);
        Pair<String, String> offsetArg = new Pair<String, String>("offset", String.valueOf(offset));
        Pair<String, String> lengthArg = new Pair<String, String>("length", String.valueOf(length));
        Pair<String, String> attemptArg = new Pair<String, String>("attempt", String.valueOf(attempt));
        Map<String, Object> response = this.apiGateway.callWithReference(running.getFirst(), "metadata", typeArg, jobIdArg, offsetArg, lengthArg, attemptArg);
        return FileIOUtils.JobMetaData.createJobMetaDataFromObject(response);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Cancels an execution.
     *
     * <p>After reloading queued flows and running executions, and while holding the monitor
     * of {@code exFlow}: a running execution gets a "cancel" gateway call on behalf of
     * {@code userId}; a queued execution is dequeued and finalized; anything else is
     * rejected with an exception.
     *
     * @param exFlow execution to cancel
     * @param userId user on whose behalf the cancel is issued
     * @throws ExecutorManagerException if the execution is neither running nor queued, or
     *         propagated from the calls made here
     */
    @Override
    public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
        this.queuedFlows = this.loadQueuedFlows();
        this.loadRunningExecutions();
        synchronized (exFlow) {
            if (this.runningExecutions.get().containsKey(exFlow.getExecutionId())) {
                Pair<ExecutionReference, ExecutableFlow> running = this.runningExecutions.get().get(exFlow.getExecutionId());
                this.apiGateway.callWithReferenceByUser(running.getFirst(), "cancel", userId, new Pair[0]);
                return;
            }
            if (this.queuedFlows.hasExecution(exFlow.getExecutionId())) {
                this.queuedFlows.dequeue(exFlow.getExecutionId());
                this.executionFinalizer.finalizeFlow(exFlow, "Cancelled before dispatching to executor", null);
                return;
            }
            throw new ExecutorManagerException("Executor Id is[" + exFlow.getExecutionId() + "] and its Flow[" + exFlow.getFlowId() + "] has stop working.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends a "resume" gateway call for a running execution on behalf of {@code userId}.
     *
     * @param exFlow execution to resume
     * @param userId user on whose behalf the call is made
     * @throws ExecutorManagerException if the execution is not in the running-executions
     *         map, or propagated from the gateway call
     */
    @Override
    public void resumeFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
        this.loadRunningExecutions();
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = this.runningExecutions.get().get(exFlow.getExecutionId());
            if (running != null) {
                this.apiGateway.callWithReferenceByUser(running.getFirst(), "resume", userId, new Pair[0]);
                return;
            }
            throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Posts {@code param} to the {@code /executor} endpoint of the executor that runs
     * {@code exFlow}.
     *
     * @param exFlow execution to act on
     * @param userId not used by this method
     * @param param  request parameters forwarded unchanged to the gateway
     * @throws Exception if the execution is not in the running-executions map, or
     *         propagated from the gateway call
     */
    @Override
    public void setFlowFailed(ExecutableFlow exFlow, String userId, List<Pair<String, String>> param) throws Exception {
        this.loadRunningExecutions();
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = this.runningExecutions.get().get(exFlow.getExecutionId());
            if (running == null) {
                throw new Exception("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
            }
            Executor executor = running.getFirst().getExecutor().get();
            this.apiGateway.callForJsonObjectMap(executor.getHost(), executor.getPort(), "/executor", param);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Posts {@code request} to the {@code disable_job} action of the executor that runs
     * {@code exFlow}.
     *
     * @param exFlow  execution to act on
     * @param userId  appended to the URL as the {@code user} query parameter
     * @param request request body handed to {@code apiGateway.httpPost}
     * @return the value returned by {@code apiGateway.httpPost}
     * @throws Exception if the execution is not in the running-executions map, or
     *         propagated from the HTTP call
     */
    @Override
    public String setJobDisabled(ExecutableFlow exFlow, String userId, String request) throws Exception {
        this.loadRunningExecutions();
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = this.runningExecutions.get().get(exFlow.getExecutionId());
            if (running == null) {
                throw new Exception("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
            }
            Executor executor = running.getFirst().getExecutor().get();
            String endpoint = "http://" + executor.getHost() + ":" + executor.getPort() + "/executor?action=" + "disable_job";
            String target = endpoint + "&execid=" + exFlow.getExecutionId() + "&user=" + userId;
            return this.apiGateway.httpPost(target, request);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Posts {@code request} to the {@code retry_failed_jobs} action of the executor that
     * runs {@code exFlow}.
     *
     * @param exFlow  execution to act on
     * @param userId  appended to the URL as the {@code user} query parameter
     * @param request request body handed to {@code apiGateway.httpPost}
     * @return the value returned by {@code apiGateway.httpPost}
     * @throws Exception if the execution is not in the running-executions map, or
     *         propagated from the HTTP call
     */
    @Override
    public String retryFailedJobs(ExecutableFlow exFlow, String userId, String request) throws Exception {
        this.loadRunningExecutions();
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = this.runningExecutions.get().get(exFlow.getExecutionId());
            if (running == null) {
                throw new Exception("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
            }
            Executor executor = running.getFirst().getExecutor().get();
            String endpoint = "http://" + executor.getHost() + ":" + executor.getPort() + "/executor?action=" + "retry_failed_jobs";
            String target = endpoint + "&execid=" + exFlow.getExecutionId() + "&user=" + userId;
            return this.apiGateway.httpPost(target, request);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Posts {@code request} to the {@code skip_failed_jobs} action of the executor that
     * runs {@code exFlow}.
     *
     * @param exFlow  execution to act on
     * @param userId  appended to the URL as the {@code user} query parameter
     * @param request request body handed to {@code apiGateway.httpPost}
     * @return the value returned by {@code apiGateway.httpPost}
     * @throws Exception if the execution is not in the running-executions map, or
     *         propagated from the HTTP call
     */
    @Override
    public String skipFailedJobs(ExecutableFlow exFlow, String userId, String request) throws Exception {
        this.loadRunningExecutions();
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = this.runningExecutions.get().get(exFlow.getExecutionId());
            if (running == null) {
                throw new Exception("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
            }
            Executor executor = running.getFirst().getExecutor().get();
            String endpoint = "http://" + executor.getHost() + ":" + executor.getPort() + "/executor?action=" + "skip_failed_jobs";
            String target = endpoint + "&execid=" + exFlow.getExecutionId() + "&user=" + userId;
            return this.apiGateway.httpPost(target, request);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends a "pause" gateway call for a running execution on behalf of {@code userId}.
     *
     * @param exFlow execution to pause
     * @param userId user on whose behalf the call is made
     * @throws ExecutorManagerException if the execution is not in the running-executions
     *         map, or propagated from the gateway call
     */
    @Override
    public void pauseFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
        this.loadRunningExecutions();
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = this.runningExecutions.get().get(exFlow.getExecutionId());
            if (running != null) {
                this.apiGateway.callWithReferenceByUser(running.getFirst(), "pause", userId, new Pair[0]);
                return;
            }
            throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
        }
    }

    /**
     * Issues the {@code pauseJobs} modification for the given jobs of a running execution.
     *
     * @param exFlow execution to modify
     * @param userId user on whose behalf the modification is made
     * @param jobIds ids of the jobs to pause
     * @throws ExecutorManagerException propagated from {@code modifyExecutingJobs}
     */
    @Override
    public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        final String command = "pauseJobs";
        this.modifyExecutingJobs(exFlow, command, userId, jobIds);
    }

    /**
     * Issues the {@code resumeJobs} modification for the given jobs of a running execution.
     *
     * @param exFlow execution to modify
     * @param userId user on whose behalf the modification is made
     * @param jobIds ids of the jobs to resume
     * @throws ExecutorManagerException propagated from {@code modifyExecutingJobs}
     */
    @Override
    public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        final String command = "resumeJobs";
        this.modifyExecutingJobs(exFlow, command, userId, jobIds);
    }

    /**
     * Issues the {@code retryFailures} modification for a running execution, without any
     * job ids.
     *
     * @param exFlow execution to modify
     * @param userId user on whose behalf the modification is made
     * @throws ExecutorManagerException propagated from {@code modifyExecutingJobs}
     */
    @Override
    public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
        // An empty varargs list is passed as a zero-length array.
        this.modifyExecutingJobs(exFlow, "retryFailures", userId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends a "skippedAllFailedJobs" gateway call for a running execution on behalf of
     * {@code userId}.
     *
     * @param exFlow execution to act on
     * @param userId user on whose behalf the call is made
     * @throws ExecutorManagerException if the execution is not in the running-executions
     *         map, or propagated from the gateway call
     */
    @Override
    public void skipAllFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
        this.loadRunningExecutions();
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = this.runningExecutions.get().get(exFlow.getExecutionId());
            if (running != null) {
                this.apiGateway.callWithReferenceByUser(running.getFirst(), "skippedAllFailedJobs", userId, new Pair[0]);
                return;
            }
            throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
        }
    }

    /**
     * Issues the {@code retryJobs} modification for the given jobs of a running execution.
     *
     * @param exFlow execution to modify
     * @param userId user on whose behalf the modification is made
     * @param jobIds ids of the jobs to retry
     * @throws ExecutorManagerException propagated from {@code modifyExecutingJobs}
     */
    @Override
    public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        final String command = "retryJobs";
        this.modifyExecutingJobs(exFlow, command, userId, jobIds);
    }

    /**
     * Issues the {@code skipJobs} modification for the given jobs of a running execution.
     *
     * @param exFlow execution to modify
     * @param userId user on whose behalf the modification is made
     * @param jobIds ids of the jobs to disable
     * @throws ExecutorManagerException propagated from {@code modifyExecutingJobs}
     */
    @Override
    public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        final String command = "skipJobs";
        this.modifyExecutingJobs(exFlow, command, userId, jobIds);
    }

    /**
     * Issues the {@code enableJobs} modification for the given jobs of a running execution.
     *
     * @param exFlow execution to modify
     * @param userId user on whose behalf the modification is made
     * @param jobIds ids of the jobs to enable
     * @throws ExecutorManagerException propagated from {@code modifyExecutingJobs}
     */
    @Override
    public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        final String command = "enableJobs";
        this.modifyExecutingJobs(exFlow, command, userId, jobIds);
    }

    /**
     * Issues the {@code cancelJobs} modification for the given jobs of a running execution.
     *
     * @param exFlow execution to modify
     * @param userId user on whose behalf the modification is made
     * @param jobIds ids of the jobs to cancel
     * @throws ExecutorManagerException propagated from {@code modifyExecutingJobs}
     */
    @Override
    public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
        final String command = "cancelJobs";
        this.modifyExecutingJobs(exFlow, command, userId, jobIds);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends a "modifyExecution" gateway call for a running execution.
     *
     * <p>When job ids are supplied, every non-empty id must resolve to a node of
     * {@code exFlow}; the ids are then joined with {@code ','} and sent as {@code jobIds}
     * together with {@code modifyType}. Without job ids only {@code modifyType} is sent.
     *
     * @param exFlow  execution to modify
     * @param command value of the {@code modifyType} parameter
     * @param userId  user on whose behalf the call is made
     * @param jobIds  optional job ids; may be {@code null} or empty
     * @return the gateway response
     * @throws ExecutorManagerException if the execution is not running, if a job id does
     *         not exist in the execution, or propagated from the gateway call
     */
    private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow, String command, String userId, String ... jobIds) throws ExecutorManagerException {
        this.loadRunningExecutions();
        synchronized (exFlow) {
            Pair<ExecutionReference, ExecutableFlow> running = this.runningExecutions.get().get(exFlow.getExecutionId());
            if (running == null) {
                throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
            }
            boolean hasJobIds = jobIds != null && jobIds.length > 0;
            if (!hasJobIds) {
                return this.apiGateway.callWithReferenceByUser(running.getFirst(), "modifyExecution", userId, new Pair<String, String>("modifyType", command));
            }
            for (String jobId : jobIds) {
                // Empty ids are tolerated; any other id must name a node of this execution.
                if (!jobId.isEmpty() && exFlow.getExecutableNode(jobId) == null) {
                    throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
                }
            }
            String joinedIds = StringUtils.join((Object[])jobIds, (char)',');
            return this.apiGateway.callWithReferenceByUser(running.getFirst(), "modifyExecution", userId, new Pair<String, String>("modifyType", command), new Pair<String, String>("jobIds", joinedIds));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Validates a flow submission and places the execution in the queue.
     *
     * <p>Submissions are serialized on the interned string
     * {@code <projectName>.<id>.submitFlow}. If the queue is full, the failure is logged,
     * the fail metric is marked and the error text is returned (no exception is thrown).
     * Otherwise the submit user's executor ids are resolved, concurrent-run rules are
     * applied, the flow is uploaded, an active reference is stored and the flow is enqueued.
     *
     * @param exflow flow to submit; submit user, executor ids and submit time are set on it
     * @param userId id of the submitting user
     * @return a status message describing the outcome
     * @throws ExecutorManagerException if executor ids cannot be loaded or none are assigned
     *         to the user, if the concurrent-run limit is exceeded, if the concurrent option
     *         is "skip" while the flow is already running, or propagated from the loader
     */
    @Override
    public String submitExecutableFlow(ExecutableFlow exflow, String userId) throws ExecutorManagerException {
        String exFlowKey = exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
        // The interned key is used as the monitor, so equal keys share one lock object.
        String string = exFlowKey.intern();
        synchronized (string) {
            String flowId = exflow.getFlowId();
            logger.info("Submitting execution flow " + flowId + " by " + userId);
            String message = "";
            if (this.queuedFlows.isFull()) {
                // Queue at capacity: report through the returned message instead of throwing.
                message = String.format("Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity", flowId, exflow.getProjectName());
                logger.error(message);
                this.commonMetrics.markSubmitFlowFail();
            } else {
                int projectId = exflow.getProjectId();
                exflow.setSubmitUser(userId);
                List<Integer> executorIds = null;
                try {
                    executorIds = this.executorLoader.getExecutorIdsBySubmitUser(exflow.getSubmitUser());
                }
                catch (ExecutorManagerException em) {
                    logger.error("get executorId by " + exflow.getSubmitUser() + ", failed", (Throwable)em);
                    throw new ExecutorManagerException("get executorId by " + exflow.getSubmitUser() + ", failed", em);
                }
                if (executorIds == null || executorIds.size() == 0) {
                    logger.error("can not found executorId by " + exflow.getSubmitUser());
                    // The message below is Chinese for "User: <name>, no executor assigned".
                    throw new ExecutorManagerException("\u7528\u6237:" + exflow.getSubmitUser() + "\uff0c\u6ca1\u6709\u5206\u914dexecutor");
                }
                exflow.setExecutorIds(executorIds);
                exflow.setSubmitTime(System.currentTimeMillis());
                List<Integer> running = this.getRunningFlows(projectId, flowId);
                ExecutionOptions options = exflow.getExecutionOptions();
                if (options == null) {
                    // NOTE(review): this fresh options object is only used locally; nothing in
                    // this method attaches it to exflow. Confirm that is intended.
                    options = new ExecutionOptions();
                }
                if (options.getDisabledJobs() != null) {
                    FlowUtils.applyDisabledJobs(options.getDisabledJobs(), exflow);
                }
                if (!running.isEmpty()) {
                    if (running.size() > this.maxConcurrentRunsOneFlow) {
                        this.commonMetrics.markSubmitFlowSkip();
                        throw new ExecutorManagerException("Flow " + flowId + " has more than " + this.maxConcurrentRunsOneFlow + " concurrent runs. Skipping", ExecutorManagerException.Reason.SkippedExecution);
                    }
                    if (options.getConcurrentOption().equals("pipeline")) {
                        // Pipeline behind the running execution with the highest id.
                        Collections.sort(running);
                        Integer runningExecId = running.get(running.size() - 1);
                        options.setPipelineExecutionId(runningExecId);
                        message = "Flow " + flowId + " is already running with exec id " + runningExecId + ". Pipelining level " + options.getPipelineLevel() + ". \n";
                    } else {
                        if (options.getConcurrentOption().equals("skip")) {
                            this.commonMetrics.markSubmitFlowSkip();
                            throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.", ExecutorManagerException.Reason.SkippedExecution);
                        }
                        message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, (String)",") + ". Will execute concurrently. \n";
                    }
                }
                // Memory check is enabled unless the project is whitelisted for MemoryCheck.
                boolean memoryCheck = !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(), ProjectWhitelist.WhitelistType.MemoryCheck);
                options.setMemoryCheck(memoryCheck);
                // Upload first, then build the reference from the flow's execution id.
                this.executorLoader.uploadExecutableFlow(exflow);
                ExecutionReference reference = new ExecutionReference(exflow.getExecutionId());
                this.executorLoader.addActiveExecutableReference(reference);
                this.queuedFlows.enqueue(exflow, reference);
                this.commonMetrics.markSubmitFlowSuccess();
                message = message + "Execution DB queued successfully with exec id " + exflow.getExecutionId();
                logger.info(message);
            }
            return message;
        }
    }

    /**
     * Removes execution logs via {@code executorLoader.removeExecutionLogsByTime} and logs
     * the number of removed entries and the elapsed time in seconds. A loader failure is
     * logged and not rethrown.
     *
     * @param millis time value handed to the loader
     */
    private void cleanOldExecutionLogs(long millis) {
        final long startedAt = System.currentTimeMillis();
        try {
            int removed = this.executorLoader.removeExecutionLogsByTime(millis);
            logger.info("Cleaned up " + removed + " log entries.");
        }
        catch (ExecutorManagerException e) {
            logger.error("log clean up failed. ", (Throwable)e);
        }
        long elapsedSeconds = (System.currentTimeMillis() - startedAt) / 1000L;
        logger.info("log clean up time: " + elapsedSeconds + " seconds.");
    }

    /**
     * Calls the {@code /stats} endpoint of the executor with the given id.
     *
     * <p>The supplied parameters, if any, are sent first, followed by an {@code action}
     * parameter.
     *
     * @param executorId id used to fetch the executor
     * @param action     value of the {@code action} parameter
     * @param params     optional extra parameters; may be {@code null}
     * @return the gateway response
     * @throws IOException declared by the interface; propagated from the gateway call
     * @throws ExecutorManagerException propagated from {@code fetchExecutor}
     */
    @Override
    public Map<String, Object> callExecutorStats(int executorId, String action, Pair<String, String> ... params) throws IOException, ExecutorManagerException {
        Executor target = this.fetchExecutor(executorId);
        ArrayList<Pair<String, String>> query = new ArrayList<Pair<String, String>>();
        if (params != null) {
            for (Pair<String, String> param : params) {
                query.add(param);
            }
        }
        query.add(new Pair<String, String>("action", action));
        return this.apiGateway.callForJsonObjectMap(target.getHost(), target.getPort(), "/stats", query);
    }

    /**
     * Calls the {@code /jmx} endpoint of the executor addressed by {@code hostPort}.
     *
     * <p>{@code action} is sent as a parameter name with an empty value; {@code mBean} is
     * added only when it is not {@code null}.
     *
     * @param hostPort executor address in {@code host:port} form
     * @param action   name of the parameter that selects the action
     * @param mBean    optional {@code mBean} parameter value
     * @return the gateway response
     * @throws IOException propagated from the gateway call
     */
    @Override
    public Map<String, Object> callExecutorJMX(String hostPort, String action, String mBean) throws IOException {
        ArrayList<Pair<String, String>> query = new ArrayList<Pair<String, String>>();
        query.add(new Pair<String, String>(action, ""));
        if (mBean != null) {
            query.add(new Pair<String, String>("mBean", mBean));
        }
        String[] parts = hostPort.split(":");
        String host = parts[0];
        int port = Integer.parseInt(parts[1]);
        return this.apiGateway.callForJsonObjectMap(host, port, "/jmx", query);
    }

    /**
     * Shuts down the queue processor, the updater thread and the recover thread, in that
     * order.
     */
    @Override
    public void shutdown() {
        this.queueProcessor.shutdown();
        this.updaterThread.shutdown();
        this.recoverThread.shutdown();
    }

    /**
     * Marks a flow and its unfinished nodes as failed.
     *
     * <p>Nodes in SUCCEEDED, FAILED, KILLED, SKIPPED, DISABLED or FAILED_SKIPPED are left
     * untouched. READY nodes become KILLING, every other node becomes FAILED; start and end
     * times still at {@code -1} are set to the current time. The flow's end time is filled
     * in the same way and its status is set to FAILED.
     *
     * @param exFlow flow to mark as failed
     */
    private void failEverything(ExecutableFlow exFlow) {
        final long now = System.currentTimeMillis();
        for (ExecutableNode node : exFlow.getExecutableNodes()) {
            switch (node.getStatus()) {
                case SUCCEEDED:
                case FAILED:
                case KILLED:
                case SKIPPED:
                case DISABLED:
                case FAILED_SKIPPED:
                    // Already in a final state: leave status and timestamps alone.
                    continue;
                case READY:
                    node.setStatus(Status.KILLING);
                    break;
                default:
                    node.setStatus(Status.FAILED);
                    break;
            }
            if (node.getStartTime() == -1L) {
                node.setStartTime(now);
            }
            if (node.getEndTime() == -1L) {
                node.setEndTime(now);
            }
        }
        if (exFlow.getEndTime() == -1L) {
            exFlow.setEndTime(now);
        }
        exFlow.setStatus(Status.FAILED);
    }

    /**
     * Tells whether the flow's status is SUCCEEDED, FAILED or KILLED.
     *
     * @param flow flow to inspect
     * @return {@code true} for one of the three statuses above, {@code false} otherwise
     */
    public boolean isFinished(ExecutableFlow flow) {
        final Status status = flow.getStatus();
        switch (status) {
            case SUCCEEDED:
            case FAILED:
            case KILLED:
                return true;
            default:
                return false;
        }
    }

    /**
     * Tells whether the flow's status is FAILED_FINISHING.
     *
     * @param flow flow to inspect
     * @return {@code true} only for FAILED_FINISHING
     */
    public boolean isFailedFinishing(ExecutableFlow flow) {
        final Status status = flow.getStatus();
        // A switch is kept so a null status fails the same way as before.
        switch (status) {
            case FAILED_FINISHING:
                return true;
            default:
                return false;
        }
    }

    /**
     * Appends each flow's execution id and update time to the two output lists, in
     * iteration order.
     *
     * @param flows        flows to read
     * @param executionIds receives one execution id per flow
     * @param updateTimes  receives one update time per flow
     */
    private void fillUpdateTimeAndExecId(List<ExecutableFlow> flows, List<Integer> executionIds, List<Long> updateTimes) {
        for (ExecutableFlow current : flows) {
            executionIds.add(current.getExecutionId());
            updateTimes.add(current.getUpdateTime());
        }
    }

    /**
     * Appends one page of flow history to {@code outputList} and returns the value of
     * {@code executorLoader.fetchNumExecutableFlows} for the same project and flow.
     *
     * @param projectId  project id
     * @param flowId     flow id
     * @param from       forwarded to {@code fetchFlowHistory}
     * @param length     forwarded to {@code fetchFlowHistory}
     * @param outputList list that receives the fetched flows
     * @return the count reported by the loader
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException {
        outputList.addAll(this.executorLoader.fetchFlowHistory(projectId, flowId, from, length));
        return this.executorLoader.fetchNumExecutableFlows(projectId, flowId);
    }

    /**
     * Returns flow history filtered by status; delegates to
     * {@code executorLoader.fetchFlowHistory}.
     *
     * @param projectId project id
     * @param flowId    flow id
     * @param from      forwarded to the loader
     * @param length    forwarded to the loader
     * @param status    status filter
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
        List<ExecutableFlow> history = this.executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
        return history;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Assigns an execution to the chosen executor and starts it there.
     *
     * <p>The assignment is persisted first; if the "execute" call then fails, the
     * assignment is removed again and the failure is rethrown wrapped in a new
     * {@link ExecutorManagerException}. On success the reference is bound to the executor,
     * the execution is added to the running-executions map, and waiters on both that map
     * and this manager are notified.
     *
     * @param reference       reference of the execution being dispatched
     * @param exflow          flow to execute; its update time is set to the current time
     * @param choosenExecutor executor that should run the flow
     * @throws ExecutorManagerException if assigning or calling the executor fails
     */
    private void dispatch(ExecutionReference reference, ExecutableFlow exflow, Executor choosenExecutor) throws ExecutorManagerException {
        exflow.setUpdateTime(System.currentTimeMillis());
        this.executorLoader.assignExecutor(choosenExecutor.getId(), exflow.getExecutionId());
        try {
            this.apiGateway.callWithExecutable(exflow, choosenExecutor, "execute");
        }
        catch (ExecutorManagerException ex) {
            // Undo the persisted assignment before reporting the failure.
            logger.error("Rolling back executor assignment for execution id:" + exflow.getExecutionId(), (Throwable)ex);
            this.executorLoader.unassignExecutor(exflow.getExecutionId());
            throw new ExecutorManagerException(ex);
        }
        reference.setExecutor(choosenExecutor);
        this.runningExecutions.get().put(exflow.getExecutionId(), new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
        // NOTE(review): the monitor is taken on one get() result and notifyAll() is called on
        // a second get(); this is only safe if get() returns the same object both times.
        Object object = this.runningExecutions.get();
        synchronized (object) {
            this.runningExecutions.get().notifyAll();
        }
        // Also wake any thread waiting on this manager instance.
        object = this;
        synchronized (object) {
            this.notifyAll();
        }
        logger.info(String.format("Successfully dispatched exec %d with error count %d", exflow.getExecutionId(), reference.getNumErrors()));
    }

    /**
     * Replaces the {@code sleepAfterDispatchFailure} duration; exposed for tests.
     *
     * @param sleepAfterDispatchFailure new value of the field
     */
    @VisibleForTesting
    void setSleepAfterDispatchFailure(Duration sleepAfterDispatchFailure) {
        this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
    }

    /**
     * Returns a page of a user's flow history; delegates to
     * {@code executorLoader.fetchUserFlowHistory}.
     *
     * @param skip forwarded to the loader
     * @param size forwarded to the loader
     * @param user user name forwarded to the loader
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getUserExecutableFlows(int skip, int size, String user) throws ExecutorManagerException {
        return this.executorLoader.fetchUserFlowHistory(skip, size, user);
    }

    /**
     * Returns flow history matching the advanced filter; all arguments are forwarded
     * unchanged to {@code executorLoader.fetchUserFlowHistoryByAdvanceFilter}.
     *
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getUserExecutableFlowsByAdvanceFilter(String projContain, String flowContain, String execIdContain, String userContain, String status, long begin, long end, int skip, int size, int flowType) throws ExecutorManagerException {
        return this.executorLoader.fetchUserFlowHistoryByAdvanceFilter(projContain, flowContain, execIdContain, userContain, status, begin, end, skip, size, flowType);
    }

    /**
     * Quick search over flow history: wraps {@code flowIdContains} in {@code %} on both
     * sides and delegates to {@code executorLoader.fetchFlowHistoryQuickSearch}.
     *
     * @param flowIdContains fragment to search for
     * @param user           user name forwarded to the loader
     * @param skip           forwarded to the loader
     * @param size           forwarded to the loader
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getUserExecutableFlowsQuickSearch(String flowIdContains, String user, int skip, int size) throws ExecutorManagerException {
        String pattern = "%" + flowIdContains + "%";
        return this.executorLoader.fetchFlowHistoryQuickSearch(pattern, user, skip, size);
    }

    /**
     * Delegates to {@code executorLoader.fetchHistoryRecoverFlows}.
     *
     * @param userNameContains value forwarded to the loader
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutableFlow> getHistoryRecoverExecutableFlows(String userNameContains) throws ExecutorManagerException {
        return this.executorLoader.fetchHistoryRecoverFlows(userNameContains);
    }

    /**
     * Returns the first flow found by {@code executorLoader.fetchHistoryRecoverFlowByRepeatId}.
     *
     * @param repeatId repeat id to look up
     * @return the first matching flow
     * @throws ExecutorManagerException if no flow matches, or propagated from the loader
     */
    @Override
    public ExecutableFlow getHistoryRecoverExecutableFlowsByRepeatId(String repeatId) throws ExecutorManagerException {
        // No placeholder flow is allocated up front; it was never used.
        List<ExecutableFlow> flows = this.executorLoader.fetchHistoryRecoverFlowByRepeatId(repeatId);
        if (flows.isEmpty()) {
            throw new ExecutorManagerException("Failed to search current job flow by RepeatId[" + repeatId + "]");
        }
        return flows.get(0);
    }

    /**
     * Updates the flow type of the history-recover flow with the given repeat id and
     * persists the flow.
     *
     * <p>Only a flow whose type is {@code 2} is changed: to {@code 5} when its status is
     * FAILED, otherwise to {@code 3}. The flow is persisted in every case.
     * NOTE(review): the meaning of the numeric flow types is not visible here; confirm
     * against their definition.
     *
     * <p>Lookup and update failures are logged and not rethrown, so this stays best-effort.
     *
     * @param repeatId repeat id of the flow to stop
     * @throws ExecutorManagerException declared by the interface; failures raised inside
     *         this method are caught and logged
     */
    @Override
    public void stopHistoryRecoverExecutableFlowByRepeatId(String repeatId) throws ExecutorManagerException {
        try {
            ExecutableFlow exFlow = this.getHistoryRecoverExecutableFlowsByRepeatId(repeatId);
            if (2 == exFlow.getFlowType()) {
                boolean failed = Status.FAILED.equals((Object)exFlow.getStatus());
                exFlow.setFlowType(failed ? 5 : 3);
            }
            this.executorLoader.updateExecutableFlow(exFlow);
        }
        catch (ExecutorManagerException e) {
            // Report through the class logger instead of printing the stack trace to stderr.
            logger.error("Failed to stop history recover flow by RepeatId[" + repeatId + "]", (Throwable)e);
        }
    }

    /**
     * Returns the first flow found by {@code executorLoader.fetchHistoryRecoverFlowByFlowId}.
     *
     * @param flowId    flow id forwarded to the loader
     * @param projectId project id forwarded to the loader
     * @return the first matching flow, or {@code null} when the loader returns an empty list
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public ExecutableFlow getHistoryRecoverExecutableFlowsByFlowId(String flowId, String projectId) throws ExecutorManagerException {
        List<ExecutableFlow> matches = this.executorLoader.fetchHistoryRecoverFlowByFlowId(flowId, projectId);
        return matches.isEmpty() ? null : matches.get(0);
    }

    /**
     * Delegates to {@code executorLoader.listHistoryRecoverFlows}.
     *
     * @param paramMap filter parameters forwarded to the loader
     * @param skip     forwarded to the loader
     * @param size     forwarded to the loader
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutionRecover> listHistoryRecoverFlows(Map paramMap, int skip, int size) throws ExecutorManagerException {
        return this.executorLoader.listHistoryRecoverFlows(paramMap, skip, size);
    }

    /**
     * Delegates to {@code executorLoader.listMaintainedHistoryRecoverFlows}.
     *
     * @param username   user name forwarded to the loader
     * @param projectIds project ids forwarded to the loader
     * @param skip       forwarded to the loader
     * @param size       forwarded to the loader
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutionRecover> listMaintainedHistoryRecoverFlows(String username, List<Integer> projectIds, int skip, int size) throws ExecutorManagerException {
        List<ExecutionRecover> recovers = this.executorLoader.listMaintainedHistoryRecoverFlows(username, projectIds, skip, size);
        return recovers;
    }

    /**
     * Delegates to {@code executorLoader.saveHistoryRecoverFlow}.
     *
     * @param executionRecover record to save
     * @return the value returned by the loader
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public Integer saveHistoryRecoverFlow(ExecutionRecover executionRecover) throws ExecutorManagerException {
        Integer saved = this.executorLoader.saveHistoryRecoverFlow(executionRecover);
        return saved;
    }

    /**
     * Stamps the record with the current time and persists it through
     * {@code executorLoader.updateHistoryRecover}.
     *
     * <p>A loader failure is logged and not rethrown, so this stays best-effort.
     *
     * @param executionRecover record to update; its update time is overwritten
     * @throws ExecutorManagerException declared by the interface; failures raised inside
     *         this method are caught and logged
     */
    @Override
    public void updateHistoryRecover(ExecutionRecover executionRecover) throws ExecutorManagerException {
        try {
            executionRecover.setUpdateTime(System.currentTimeMillis());
            this.executorLoader.updateHistoryRecover(executionRecover);
        }
        catch (ExecutorManagerException e) {
            // Report through the class logger instead of printing the stack trace to stderr.
            logger.error("Failed to update history recover record", (Throwable)e);
        }
    }

    /**
     * Delegates to {@code executorLoader.getHistoryRecoverFlow}.
     *
     * @param recoverId id forwarded to the loader
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public ExecutionRecover getHistoryRecoverFlow(Integer recoverId) throws ExecutorManagerException {
        return this.executorLoader.getHistoryRecoverFlow(recoverId);
    }

    /**
     * Delegates to {@code executorLoader.getHistoryRecoverFlowByPidAndFid}.
     *
     * @param projectId project id forwarded to the loader
     * @param flowId    flow id forwarded to the loader
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public ExecutionRecover getHistoryRecoverFlowByPidAndFid(String projectId, String flowId) throws ExecutorManagerException {
        return this.executorLoader.getHistoryRecoverFlowByPidAndFid(projectId, flowId);
    }

    /**
     * Delegates to {@code executorLoader.listHistoryRecoverRunnning}.
     *
     * @param loadSize value forwarded to the loader
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public List<ExecutionRecover> listHistoryRecoverRunnning(Integer loadSize) throws ExecutorManagerException {
        return this.executorLoader.listHistoryRecoverRunnning(loadSize);
    }

    /**
     * Delegates to {@code executorLoader.getHistoryRecoverTotal}.
     *
     * @return the total reported by the loader
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public int getHistoryRecoverTotal() throws ExecutorManagerException {
        int total = this.executorLoader.getHistoryRecoverTotal();
        return total;
    }

    /**
     * Delegates to {@code executorLoader.getProjectLastExecutableFlow}.
     *
     * @param projectId project id forwarded to the loader
     * @param flowId    flow id forwarded to the loader
     * @return the loader result
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public ExecutableFlow getProjectLastExecutableFlow(int projectId, String flowId) throws ExecutorManagerException {
        return this.executorLoader.getProjectLastExecutableFlow(projectId, flowId);
    }

    /**
     * Delegates to {@code executorLoader.getMaintainedHistoryRecoverTotal}.
     *
     * @param username             user name forwarded to the loader
     * @param maintainedProjectIds project ids forwarded to the loader
     * @return the total reported by the loader
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public int getMaintainedHistoryRecoverTotal(String username, List<Integer> maintainedProjectIds) throws ExecutorManagerException {
        int total = this.executorLoader.getMaintainedHistoryRecoverTotal(username, maintainedProjectIds);
        return total;
    }

    /**
     * Delegates to {@code executorLoader.getUserRecoverHistoryTotal}.
     *
     * @param userName user name forwarded to the loader
     * @return the total reported by the loader
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public int getUserHistoryRecoverTotal(String userName) throws ExecutorManagerException {
        int total = this.executorLoader.getUserRecoverHistoryTotal(userName);
        return total;
    }

    /**
     * Delegates to {@code executorLoader.getExecutionCycleTotal} with an optional user name.
     *
     * @param usernameOp optional user name forwarded to the loader
     * @return the total reported by the loader
     * @throws ExecutorManagerException propagated from the loader
     */
    @Override
    public int getExecutionCycleTotal(Optional<String> usernameOp) throws ExecutorManagerException {
        int total = this.executorLoader.getExecutionCycleTotal(usernameOp);
        return total;
    }

    /**
     * Returns the execution-cycle total reported by the loader for a user and project ids.
     *
     * @param username   user name handed to the loader unchanged
     * @param projectIds project ids handed to the loader unchanged
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int getExecutionCycleTotal(String username, List<Integer> projectIds) throws ExecutorManagerException {
        final int total = this.executorLoader.getExecutionCycleTotal(username, projectIds);
        return total;
    }

    /**
     * Lists execution-cycle flows by delegating to the executor loader.
     *
     * @param username optional user name handed to the loader unchanged
     * @param offset   offset handed to the loader unchanged
     * @param length   length handed to the loader unchanged
     * @return the list produced by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public List<ExecutionCycle> listExecutionCycleFlows(Optional<String> username, int offset, int length) throws ExecutorManagerException {
        final List<ExecutionCycle> cycles = this.executorLoader.listExecutionCycleFlows(username, offset, length);
        return cycles;
    }

    /**
     * Lists execution-cycle flows for a user and project ids by delegating to the executor loader.
     *
     * @param username   user name handed to the loader unchanged
     * @param projectIds project ids handed to the loader unchanged
     * @param offset     offset handed to the loader unchanged
     * @param length     length handed to the loader unchanged
     * @return the list produced by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public List<ExecutionCycle> listExecutionCycleFlows(String username, List<Integer> projectIds, int offset, int length) throws ExecutorManagerException {
        final List<ExecutionCycle> cycles = this.executorLoader.listExecutionCycleFlows(username, projectIds, offset, length);
        return cycles;
    }

    /**
     * Persists an execution-cycle record through the executor loader.
     *
     * @param cycleFlow record handed to the loader unchanged
     * @return the integer returned by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int saveExecutionCycleFlow(ExecutionCycle cycleFlow) throws ExecutorManagerException {
        final int result = this.executorLoader.saveExecutionCycleFlow(cycleFlow);
        return result;
    }

    /**
     * Fetches an execution-cycle record for a project/flow pair via the executor loader.
     *
     * @param projectId project identifier handed to the loader unchanged
     * @param flowId    flow identifier handed to the loader unchanged
     * @return the loader's result
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public ExecutionCycle getExecutionCycleFlow(String projectId, String flowId) throws ExecutorManagerException {
        final ExecutionCycle cycle = this.executorLoader.getExecutionCycleFlow(projectId, flowId);
        return cycle;
    }

    /**
     * Fetches an execution-cycle record by id via the executor loader.
     *
     * @param id identifier handed to the loader unchanged
     * @return the loader's result
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public ExecutionCycle getExecutionCycleFlow(int id) throws ExecutorManagerException {
        final ExecutionCycle cycle = this.executorLoader.getExecutionCycleFlow(id);
        return cycle;
    }

    /**
     * Updates an execution-cycle record through the executor loader.
     *
     * @param executionCycle record handed to the loader unchanged
     * @return the integer returned by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int updateExecutionFlow(ExecutionCycle executionCycle) throws ExecutorManagerException {
        final int result = this.executorLoader.updateExecutionFlow(executionCycle);
        return result;
    }

    /**
     * Delegates to the executor loader's {@code stopAllCycleFlows}.
     *
     * @return the integer returned by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int stopAllCycleFlows() throws ExecutorManagerException {
        final int result = this.executorLoader.stopAllCycleFlows();
        return result;
    }

    /**
     * Delegates to the executor loader's {@code getAllRunningCycleFlows}.
     *
     * @return the list produced by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public List<ExecutionCycle> getAllRunningCycleFlows() throws ExecutorManagerException {
        final List<ExecutionCycle> cycles = this.executorLoader.getAllRunningCycleFlows();
        return cycles;
    }

    /**
     * Writes the log of every nested node of a flow to its own {@code .log} file inside a
     * fresh directory under {@code temp/}, then zips that directory.
     *
     * <p>Any exception raised along the way is logged and swallowed; the method then
     * returns whatever path had been computed so far (the empty string if zipping was
     * never reached).
     *
     * @param executableFlow flow whose node logs are collected
     * @return path returned by {@code fileToZip}, or {@code ""} on failure
     * @throws ExecutorManagerException declared by the interface; not thrown by this body
     */
    @Override
    public String getDownLoadAllExecutionLog(ExecutableFlow executableFlow) throws ExecutorManagerException {
        String logZipFilePath = "";
        try {
            List<String> nameList = new ArrayList<String>();
            this.getExecutableNodeInfo(executableFlow, nameList);
            File flowLogDir = new File("temp" + File.separator + executableFlow.getId() + System.currentTimeMillis());
            if (flowLogDir.exists()) {
                flowLogDir.delete();
            }
            // mkdirs() rather than mkdir(): the parent "temp" directory may not exist yet,
            // in which case mkdir() fails silently and every later write fails with it.
            if (!flowLogDir.mkdirs() && !flowLogDir.isDirectory()) {
                logger.warn("could not create log directory " + flowLogDir.getPath());
            }
            for (String jobName : nameList) {
                FileIOUtils.LogData value = this.executorLoader.fetchAllLogs(executableFlow.getExecutionId(), jobName, 0);
                if (null == value) {
                    continue;
                }
                File file = new File(flowLogDir + File.separator + jobName + ".log");
                ExecutorManagerHA.FileWrite(file.getPath(), file.getName(), value.getData());
            }
            logZipFilePath = ExecutorManagerHA.fileToZip(flowLogDir.getPath(), new File("temp").getPath(), flowLogDir.getName());
        }
        catch (Exception e) {
            // Pass the throwable too so the stack trace is not lost.
            logger.error("\u4e0b\u8f7d\u6240\u6709\u65e5\u5fd7\u6570\u636e\u5931\u8d25, \u539f\u56e0\u4e3a:" + e, (Throwable)e);
        }
        return logZipFilePath;
    }

    /**
     * Zips the direct children of {@code sourceFilePath} into
     * {@code zipFilePath + "/" + fileName + ".zip"}.
     *
     * <p>Nothing is written (only a warning is logged) when the source directory does not
     * exist, when the target zip already exists, or when the directory has no children.
     * The target path is returned in every non-exceptional case, whether or not a zip was
     * actually produced.
     *
     * @param sourceFilePath directory whose children are added as zip entries
     * @param zipFilePath    directory in which the zip file is created
     * @param fileName       zip file name without the {@code .zip} suffix
     * @return path of the (possibly not created) zip file
     * @throws RuntimeException wrapping any {@link IOException} raised while zipping
     */
    public static String fileToZip(String sourceFilePath, String zipFilePath, String fileName) {
        File sourceFile = new File(sourceFilePath);
        File zipFile = new File(zipFilePath + "/" + fileName + ".zip");
        if (!sourceFile.exists()) {
            logger.warn("\u5f85\u538b\u7f29\u7684\u6587\u4ef6\u76ee\u5f55\uff1a" + sourceFilePath + "\u4e0d\u5b58\u5728.");
            return zipFile.getPath();
        }
        if (zipFile.exists()) {
            logger.warn(zipFilePath + "\u76ee\u5f55\u4e0b\u5b58\u5728\u540d\u5b57\u4e3a:" + fileName + ".zip\u6253\u5305\u6587\u4ef6.");
            return zipFile.getPath();
        }
        File[] sourceFiles = sourceFile.listFiles();
        if (null == sourceFiles || sourceFiles.length < 1) {
            logger.warn("\u5f85\u538b\u7f29\u7684\u6587\u4ef6\u76ee\u5f55\uff1a" + sourceFilePath + "\u91cc\u9762\u4e0d\u5b58\u5728\u6587\u4ef6\uff0c\u65e0\u9700\u538b\u7f29.");
            return zipFile.getPath();
        }
        byte[] bufs = new byte[10240];
        // try-with-resources closes the zip stream and EVERY per-file input stream;
        // previously only the last opened input stream was closed.
        try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zipFile)))) {
            for (File child : sourceFiles) {
                zos.putNextEntry(new ZipEntry(child.getName()));
                try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(child), bufs.length)) {
                    int read;
                    while ((read = bis.read(bufs, 0, bufs.length)) != -1) {
                        zos.write(bufs, 0, read);
                    }
                }
                zos.closeEntry();
            }
        }
        catch (FileNotFoundException e) {
            logger.error("FileNotFoundException , caused by:" + e);
            throw new RuntimeException(e);
        }
        catch (IOException e) {
            logger.error("IOException , caused by:" + e);
            throw new RuntimeException(e);
        }
        return zipFile.getPath();
    }

    /**
     * Recursively describes a node as a map and, as a side effect, collects the nested id
     * of every node that has a parent flow into {@code nameList}.
     *
     * <p>The map always holds {@code "id"}; for flow nodes it additionally holds
     * {@code "flow"}, {@code "flowId"} and {@code "nodes"} (the child descriptions).
     *
     * @param node     node to describe
     * @param nameList accumulator receiving nested ids; mutated in place
     * @return the description map for {@code node}
     */
    private Map<String, Object> getExecutableNodeInfo(ExecutableNode node, List<String> nameList) {
        Map<String, Object> info = new HashMap<String, Object>();
        info.put("id", node.getId());
        if (node.getParentFlow() != null) {
            nameList.add(node.getNestedId());
        }
        if (!(node instanceof ExecutableFlowBase)) {
            return info;
        }
        ExecutableFlowBase flowNode = (ExecutableFlowBase)node;
        List<Map<String, Object>> children = new ArrayList<Map<String, Object>>();
        for (ExecutableNode child : flowNode.getExecutableNodes()) {
            Map<String, Object> childInfo = this.getExecutableNodeInfo(child, nameList);
            if (!childInfo.isEmpty()) {
                children.add(childInfo);
            }
        }
        info.put("flow", flowNode.getFlowId());
        info.put("nodes", children);
        info.put("flowId", flowNode.getFlowId());
        return info;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes {@code fileStr} to the file at {@code allFilePath}, replacing existing content.
     *
     * <p>I/O failures are logged and swallowed, so callers cannot tell from the outside
     * whether the write succeeded.
     *
     * @param allFilePath full path of the file to write
     * @param fileName    unused by this method; kept for caller compatibility
     * @param fileStr     text to write
     */
    public static void FileWrite(String allFilePath, String fileName, String fileStr) {
        // try-with-resources guarantees the writer is closed on every path.
        try (FileWriter fw = new FileWriter(allFilePath)) {
            fw.write(fileStr);
        }
        catch (IOException e) {
            // Goes to the application log with its stack trace instead of stderr.
            logger.error("write file " + allFilePath + " failed, caused by:" + e, (Throwable)e);
        }
    }

    /**
     * Dumps the complete log of one job into {@code <jobName>.txt} inside a fresh directory
     * under {@code temp/} and returns the file's path.
     *
     * <p>Any exception is logged and swallowed; in that case, or when the loader returns
     * no log data, the empty string is returned.
     *
     * @param execId  execution id handed to the loader
     * @param jobName job name handed to the loader and used in the directory and file name
     * @return path of the written log file, or {@code ""}
     * @throws ExecutorManagerException declared by the interface; not thrown by this body
     */
    @Override
    public String getJobLogByJobId(int execId, String jobName) throws ExecutorManagerException {
        String logZipFilePath = "";
        try {
            File flowLogDir = new File("temp" + File.separator + jobName + System.currentTimeMillis());
            if (flowLogDir.exists()) {
                flowLogDir.delete();
            }
            // mkdirs() rather than mkdir(): the parent "temp" directory may not exist yet,
            // in which case mkdir() fails silently and the later write fails with it.
            if (!flowLogDir.mkdirs() && !flowLogDir.isDirectory()) {
                logger.warn("could not create log directory " + flowLogDir.getPath());
            }
            FileIOUtils.LogData value = this.executorLoader.fetchAllLogs(execId, jobName, 0);
            if (null != value) {
                File file = new File(flowLogDir + File.separator + jobName + ".txt");
                ExecutorManagerHA.FileWrite(file.getPath(), file.getName(), value.getData());
                logZipFilePath = file.getPath();
            }
        }
        catch (Exception e) {
            // Pass the throwable too so the stack trace is not lost.
            logger.error("getJobLogByJobId execute failed, caused by:" + e, (Throwable)e);
        }
        return logZipFilePath;
    }

    /**
     * Concatenates the whole log of one job attempt by fetching it from the executor loader
     * in fixed-size chunks until the loader returns {@code null}.
     *
     * @param exFlow  flow whose execution id identifies the log
     * @param jobId   job identifier handed to the loader
     * @param attempt attempt number handed to the loader
     * @return all fetched chunks joined in order (empty if the first fetch returns {@code null})
     * @throws ExecutorManagerException if a loader call fails
     */
    @Override
    public String getAllExecutionJobLog(ExecutableFlow exFlow, String jobId, int attempt) throws ExecutorManagerException {
        // The former lookup in runningFlows was never read and has been dropped.
        final int execId = exFlow.getExecutionId();
        final int chunkSize = 50000;
        StringBuilder allLogData = new StringBuilder();
        int offset = 0;
        FileIOUtils.LogData chunk;
        // NOTE(review): termination relies on fetchLogs returning null once the offset is
        // past the end of the log; confirm against the loader implementation.
        while ((chunk = this.executorLoader.fetchLogs(execId, jobId, attempt, offset, chunkSize)) != null) {
            allLogData.append(chunk.getData());
            offset += chunkSize;
        }
        return allLogData.toString();
    }

    /**
     * Delegates to the executor loader's {@code listAllLogFilter}.
     *
     * @return the list produced by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public List<LogFilterEntity> listAllLogFilter() throws ExecutorManagerException {
        final List<LogFilterEntity> filters = this.executorLoader.listAllLogFilter();
        return filters;
    }

    /**
     * Returns the execution-history total the loader reports for the given filter map.
     *
     * @param filterMap filter entries handed to the loader unchanged
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int getExecHistoryTotal(Map<String, String> filterMap) throws ExecutorManagerException {
        final int total = this.executorLoader.getExecHistoryTotal(filterMap);
        return total;
    }

    /**
     * Returns the execution-history total the loader reports for a user, filter map and project ids.
     *
     * @param username   user name handed to the loader unchanged
     * @param filterMap  filter entries handed to the loader unchanged
     * @param projectIds project ids handed to the loader unchanged
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int getExecHistoryTotal(String username, Map<String, String> filterMap, List<Integer> projectIds) throws ExecutorManagerException {
        final int total = this.executorLoader.getExecHistoryTotal(username, filterMap, projectIds);
        return total;
    }

    /**
     * Delegates to the executor loader's {@code getMaintainedExecHistoryTotal}.
     *
     * @param username   user name handed to the loader unchanged
     * @param projectIds project ids handed to the loader unchanged
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int getMaintainedExecHistoryTotal(String username, List<Integer> projectIds) throws ExecutorManagerException {
        final int total = this.executorLoader.getMaintainedExecHistoryTotal(username, projectIds);
        return total;
    }

    /**
     * Delegates to the executor loader's {@code getExecHistoryQuickSerachTotal}.
     *
     * @param filterMap filter entries handed to the loader unchanged
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int getExecHistoryQuickSerachTotal(Map<String, String> filterMap) throws ExecutorManagerException {
        final int total = this.executorLoader.getExecHistoryQuickSerachTotal(filterMap);
        return total;
    }

    /**
     * Delegates to the executor loader's {@code getMaintainedFlowsQuickSearchTotal}.
     *
     * @param username   user name handed to the loader unchanged
     * @param filterMap  filter entries handed to the loader unchanged
     * @param projectIds project ids handed to the loader unchanged
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int getMaintainedFlowsQuickSearchTotal(String username, Map<String, String> filterMap, List<Integer> projectIds) throws ExecutorManagerException {
        final int total = this.executorLoader.getMaintainedFlowsQuickSearchTotal(username, filterMap, projectIds);
        return total;
    }

    /**
     * Always returns an empty list; none of the arguments are consulted.
     *
     * @throws ExecutorManagerException declared by the interface; not thrown by this body
     */
    @Override
    public List<ExecutableFlow> getMaintainedFlowsQuickSearch(String flowIdContains, int skip, int size, String username, List<Integer> projectIds) throws ExecutorManagerException {
        final List<ExecutableFlow> none = Collections.emptyList();
        return none;
    }

    /**
     * Appends one page of a user's flow history to {@code outputList} and returns the
     * count the loader reports for the same project, flow and user.
     *
     * @param projectId  project identifier handed to the loader
     * @param flowId     flow identifier handed to the loader
     * @param from       paging start handed to the loader
     * @param length     page length handed to the loader
     * @param outputList receives the fetched flows; mutated in place
     * @param userName   user name handed to the loader
     * @return the count reported by the loader
     * @throws ExecutorManagerException if a loader call fails
     */
    @Override
    public int getUserExecutableFlowsTotalByProjectIdAndFlowId(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList, String userName) throws ExecutorManagerException {
        outputList.addAll(this.executorLoader.fetchUserFlowHistoryByProjectIdAndFlowId(projectId, flowId, from, length, userName));
        final int total = this.executorLoader.fetchNumUserExecutableFlowsByProjectIdAndFlowId(projectId, flowId, userName);
        return total;
    }

    /**
     * Computes the mean run time (end time minus start time, integer division) over the
     * flows in the fetched history whose status is {@code SUCCEEDED}.
     *
     * @param projectId project identifier handed to the loader
     * @param flowId    flow identifier handed to the loader
     * @param user      user handed to the loader
     * @return the mean run time, or {@code 0} when there is no succeeded run or the summed
     *         run time is zero
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public long getExecutableFlowsMoyenneRunTime(int projectId, String flowId, String user) throws ExecutorManagerException {
        long totalRunTime = 0L;
        int succeededCount = 0;
        for (ExecutableFlow run : this.executorLoader.fetchFlowAllHistory(projectId, flowId, user)) {
            if (Status.SUCCEEDED.equals((Object)run.getStatus())) {
                succeededCount++;
                totalRunTime += run.getEndTime() - run.getStartTime();
            }
        }
        if (totalRunTime == 0L || succeededCount == 0) {
            return 0L;
        }
        return totalRunTime / (long)succeededCount;
    }

    /**
     * Delegates to the executor loader's {@code getUserExecHistoryTotal}.
     *
     * @param filterMap filter entries handed to the loader unchanged
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int getUserExecHistoryTotal(Map<String, String> filterMap) throws ExecutorManagerException {
        final int total = this.executorLoader.getUserExecHistoryTotal(filterMap);
        return total;
    }

    /**
     * Delegates to the executor loader's {@code getUserExecHistoryQuickSerachTotal}.
     *
     * @param filterMap filter entries handed to the loader unchanged
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public int getUserExecHistoryQuickSerachTotal(Map<String, String> filterMap) throws ExecutorManagerException {
        final int total = this.executorLoader.getUserExecHistoryQuickSerachTotal(filterMap);
        return total;
    }

    /**
     * Fetches a page of a user's flow history; every argument is handed to the loader's
     * {@code fetchUserFlowHistory} unchanged and in the same order.
     *
     * @return the list produced by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public List<ExecutableFlow> getUserExecutableFlows(String loginUser, String projContain, String flowContain, String execIdContain, String userContain, String status, long begin, long end, int skip, int size, int flowType) throws ExecutorManagerException {
        return this.executorLoader.fetchUserFlowHistory(loginUser, projContain, flowContain, execIdContain, userContain, status, begin, end, skip, size, flowType);
    }

    /**
     * Delegates to the executor loader's {@code getTodayExecutableFlowData}.
     *
     * @param userName user name handed to the loader unchanged
     * @return the list produced by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public List<ExecutableFlow> getTodayExecutableFlowData(String userName) throws ExecutorManagerException {
        return this.executorLoader.getTodayExecutableFlowData(userName);
    }

    /**
     * Delegates to the executor loader's {@code getTodayExecutableFlowDataNew}.
     *
     * @param userName user name handed to the loader unchanged
     * @return the list produced by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public List<ExecutableFlow> getTodayExecutableFlowDataNew(String userName) throws ExecutorManagerException {
        final List<ExecutableFlow> todayFlows = this.executorLoader.getTodayExecutableFlowDataNew(userName);
        return todayFlows;
    }

    /**
     * Delegates to the executor loader's {@code getTodayFlowRunTimesByFlowId}.
     *
     * @param projectId project identifier handed to the loader unchanged
     * @param flowId    flow identifier handed to the loader unchanged
     * @param usename   user name handed to the loader unchanged
     * @return the value produced by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public Integer getTodayFlowRunTimesByFlowId(String projectId, String flowId, String usename) throws ExecutorManagerException {
        final Integer runTimes = this.executorLoader.getTodayFlowRunTimesByFlowId(projectId, flowId, usename);
        return runTimes;
    }

    /**
     * Delegates to the executor loader's {@code getRealTimeExecFlowData}.
     *
     * @param userName user name handed to the loader unchanged
     * @return the list produced by the loader
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public List<ExecutableFlow> getRealTimeExecFlowData(String userName) throws ExecutorManagerException {
        return this.executorLoader.getRealTimeExecFlowData(userName);
    }

    /**
     * Returns the loader's {@code getProjectLastExecutableFlow} result for the given
     * project and flow id.
     *
     * @param projectId project identifier handed to the loader unchanged
     * @param flowId    flow identifier handed to the loader unchanged
     * @return the loader's result
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override
    public ExecutableFlow getRecentExecutableFlow(int projectId, String flowId) throws ExecutorManagerException {
        return this.executorLoader.getProjectLastExecutableFlow(projectId, flowId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds one display row (a string-to-string map) for every unfinished flow returned by
     * the executor loader.
     *
     * <p>A loader failure is logged and treated as "no flows". For each flow the run date to
     * display is first resolved (see {@code resolveDisplayRunDate}), which mutates the flow's
     * update time and possibly its {@code run_date} other-option, and then the row is built.
     *
     * @return one row per unfinished flow; empty when there are none
     * @throws IOException declared by the interface; not thrown by this body
     * @throws RuntimeException if building a row fails (the original exception is the cause)
     */
    @Override
    public List<Map<String, String>> getExectingFlowsData() throws IOException {
        List<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
        try {
            flows = this.executorLoader.fetchAllUnfinishedFlows();
        }
        catch (ExecutorManagerException e) {
            logger.error("Failed to get active flows with executor.", (Throwable)e);
        }
        List<Map<String, String>> exectingFlowList = new ArrayList<Map<String, String>>();
        WebUtils webUtils = new WebUtils();
        if (null == flows || flows.isEmpty()) {
            return exectingFlowList;
        }
        for (ExecutableFlow executableFlow : flows) {
            this.resolveDisplayRunDate(executableFlow);
            exectingFlowList.add(this.buildExecutingFlowRow(executableFlow, webUtils));
        }
        return exectingFlowList;
    }

    /** Returns the epoch-millisecond value one day before {@code epochMillis} (local-time arithmetic). */
    private static long dayBeforeMillis(long epochMillis) {
        return new LocalDateTime((Object)new Date(epochMillis)).minusDays(1).toDate().getTime();
    }

    /**
     * Stores the run date to display on the flow: either as its update time, or, when a
     * textual {@code run_date} should be shown verbatim, as update time 0 plus the
     * {@code run_date} other-option.
     */
    private void resolveDisplayRunDate(ExecutableFlow executableFlow) {
        Map<String, String> repeatMap = executableFlow.getRepeatOption();
        if (!repeatMap.isEmpty()) {
            long recoverRunDate = Long.parseLong(String.valueOf(repeatMap.get("startTimeLong")));
            executableFlow.setUpdateTime(ExecutorManagerHA.dayBeforeMillis(recoverRunDate));
            return;
        }
        String runDatestr = executableFlow.getExecutionOptions().getFlowParameters().get("run_date");
        Object runDateOther = executableFlow.getOtherOption().get("run_date");
        if (runDatestr != null && !runDatestr.isEmpty()) {
            // NOTE(review): the finally block always resets the update time to 0, so the
            // parse only serves to log non-numeric values and the raw string is what gets
            // displayed. Kept as-is; confirm this is intended.
            try {
                executableFlow.setUpdateTime(Long.parseLong(runDatestr));
            }
            catch (Exception e) {
                logger.error("rundate convert failed (String to long)" + runDatestr + "{}" + e);
            }
            finally {
                executableFlow.setUpdateTime(0L);
                executableFlow.getOtherOption().put("run_date", runDatestr);
            }
            return;
        }
        if (runDateOther != null && !runDateOther.toString().isEmpty()) {
            // toString() instead of a (String) cast: the guard above already relies on
            // toString(), and a non-String value would otherwise abort the whole listing.
            String runDateTime = runDateOther.toString().replaceAll("'", "").replaceAll("\"", "");
            if (SystemBuiltInParamJodeTimeUtils.dateFormatCheck(runDateTime)) {
                executableFlow.setUpdateTime(0L);
                executableFlow.getOtherOption().put("run_date", runDateTime.replaceAll("[./-]", ""));
                return;
            }
        }
        // Fallback: the day before the flow's start time, when a start time is set.
        if (-1L != executableFlow.getStartTime()) {
            executableFlow.setUpdateTime(ExecutorManagerHA.dayBeforeMillis(executableFlow.getStartTime()));
        }
    }

    /** Builds the display row for one flow; any failure is rethrown as a RuntimeException with its cause. */
    private Map<String, String> buildExecutingFlowRow(ExecutableFlow executableFlow, WebUtils webUtils) {
        Map<String, String> exectingMap = new HashMap<String, String>();
        try {
            exectingMap.put("execId", executableFlow.getExecutionId() + "");
            String executorId = (String)executableFlow.getOtherOption().get("currentExecutorId") != null ? (String)executableFlow.getOtherOption().get("currentExecutorId") : "";
            exectingMap.put("exectorId", executorId);
            exectingMap.put("flowName", executableFlow.getFlowId());
            exectingMap.put("projectName", executableFlow.getProjectName());
            exectingMap.put("submitUser", executableFlow.getSubmitUser());
            exectingMap.put("proxyUsers", executableFlow.getProxyUsers().toString());
            exectingMap.put("startTime", webUtils.formatHistoryDateTime(executableFlow.getStartTime()));
            Object runDateOption = executableFlow.getOtherOption().get("run_date");
            if (runDateOption != null && executableFlow.getUpdateTime() == 0L) {
                // Update time 0 marks "show the textual run_date verbatim".
                exectingMap.put("runDate", runDateOption.toString());
            } else {
                exectingMap.put("runDate", webUtils.formatRunDate(executableFlow.getUpdateTime()));
            }
            exectingMap.put("duration", Utils.formatDuration((long)executableFlow.getStartTime(), (long)executableFlow.getEndTime()));
            exectingMap.put("status", executableFlow.getStatus().toString());
            exectingMap.put("flowType", String.valueOf(executableFlow.getFlowType()));
            exectingMap.put("projectId", String.valueOf(executableFlow.getProjectId()));
        }
        catch (Exception e) {
            // Keep the original exception as the cause so the stack trace survives.
            throw new RuntimeException("generate executingMap failed" + e, e);
        }
        return exectingMap;
    }

    /**
     * Delegates to the project loader's {@code fetchPermissionsProjectId}.
     *
     * @param user user name handed to the project loader unchanged
     * @return the list of project ids produced by the project loader
     */
    @Override
    public List<Integer> fetchPermissionsProjectId(String user) {
        final List<Integer> projectIds = this.projectLoader.fetchPermissionsProjectId(user);
        return projectIds;
    }

    /**
     * Background thread that repeatedly takes flows from the head of the queue and tries to
     * dispatch each one to an executor, retrying on dispatch errors up to
     * {@code maxDispatchingErrors} before finalizing the flow as failed.
     */
    private class QueueProcessorThread
    extends Thread {
        /** Pause between two queue-processing rounds. */
        private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000L;
        private final int maxDispatchingErrors;
        private final long activeExecutorRefreshWindowInMillisec;
        private final int activeExecutorRefreshWindowInFlows;
        private final Duration sleepAfterDispatchFailure;
        private volatile boolean shutdown = false;
        private volatile boolean isActive = true;

        /**
         * @param isActive                           whether the thread processes the queue when started
         * @param activeExecutorRefreshWindowInTime  max milliseconds between executor refreshes
         * @param activeExecutorRefreshWindowInFlows max flows processed between executor refreshes
         * @param maxDispatchingErrors               error count at which dispatching a flow is given up
         * @param sleepAfterDispatchFailure          pause once every candidate executor has failed
         */
        public QueueProcessorThread(boolean isActive, long activeExecutorRefreshWindowInTime, int activeExecutorRefreshWindowInFlows, int maxDispatchingErrors, Duration sleepAfterDispatchFailure) {
            this.setActive(isActive);
            this.maxDispatchingErrors = maxDispatchingErrors;
            this.activeExecutorRefreshWindowInFlows = activeExecutorRefreshWindowInFlows;
            this.activeExecutorRefreshWindowInMillisec = activeExecutorRefreshWindowInTime;
            this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
            this.setName("AzkabanWebServer-QueueProcessor-Thread-HA");
        }

        public boolean isActive() {
            return this.isActive;
        }

        public void setActive(boolean isActive) {
            this.isActive = isActive;
            logger.info("QueueProcessorThreadHA active turned " + this.isActive);
        }

        /** Requests termination: sets the shutdown flag and interrupts any wait/sleep in progress. */
        public void shutdown() {
            this.shutdown = true;
            this.interrupt();
        }

        /**
         * Main loop: while not shut down, processes the queue (when active) and then waits
         * {@link #QUEUE_PROCESSOR_WAIT_IN_MS}. Every exception, including interruption, is
         * logged and the loop continues until the shutdown flag is seen.
         */
        @Override
        public void run() {
            while (!this.shutdown) {
                synchronized (this) {
                    try {
                        if (this.isActive) {
                            this.processQueuedFlows(this.activeExecutorRefreshWindowInMillisec, this.activeExecutorRefreshWindowInFlows);
                        }
                        this.wait(QUEUE_PROCESSOR_WAIT_IN_MS);
                    }
                    catch (Exception e) {
                        logger.error("QueueProcessorThread Interrupted. Probably to shut down.", (Throwable)e);
                    }
                }
            }
        }

        /**
         * Drains the queue while the thread is active. Executor information is refreshed when
         * it is older than {@code activeExecutorsRefreshWindow} milliseconds or after
         * {@code maxContinuousFlowProcessed} flows have been processed since the last refresh.
         */
        private void processQueuedFlows(long activeExecutorsRefreshWindow, int maxContinuousFlowProcessed) throws InterruptedException, ExecutorManagerException {
            long lastExecutorRefreshTime = 0L;
            int currentContinuousFlowProcessed = 0;
            while (this.isActive() && (ExecutorManagerHA.this.runningCandidate = ExecutorManagerHA.this.queuedFlows.fetchHead()) != null) {
                ExecutionReference reference = (ExecutionReference)ExecutorManagerHA.this.runningCandidate.getFirst();
                ExecutableFlow exflow = (ExecutableFlow)ExecutorManagerHA.this.runningCandidate.getSecond();
                long currentTime = System.currentTimeMillis();
                if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
                    ExecutorManagerHA.this.refreshExecutors();
                    logger.debug("refreshExecutors success");
                    lastExecutorRefreshTime = currentTime;
                    currentContinuousFlowProcessed = 0;
                }
                if (exflow.getUpdateTime() > lastExecutorRefreshTime) {
                    // The flow was touched after the last refresh: put it back and wait out
                    // the rest of the refresh window before looking at the queue again.
                    ExecutorManagerHA.this.queuedFlows.enqueue(exflow, reference);
                    ExecutorManagerHA.this.runningCandidate = null;
                    long sleepInterval = activeExecutorsRefreshWindow - (currentTime - lastExecutorRefreshTime);
                    Thread.sleep(sleepInterval);
                    logger.debug("the flow updatetime is greater than lastExecutorRefreshTime");
                } else {
                    exflow.setUpdateTime(currentTime);
                    this.selectExecutorAndDispatchFlow(reference, exflow);
                    ExecutorManagerHA.this.runningCandidate = null;
                    logger.debug("selectExecutorAndDispatchFlow success");
                }
                // Only flows that actually left the queue count towards the refresh threshold.
                if (ExecutorManagerHA.this.queuedFlows.getFlow(exflow.getExecutionId()) == null) {
                    ++currentContinuousFlowProcessed;
                }
            }
        }

        /**
         * Tries executors one after another until a dispatch succeeds, no executor can be
         * selected (the flow is re-queued), or the reference's error count reaches
         * {@code maxDispatchingErrors} (the flow is finalized with the last error).
         */
        private void selectExecutorAndDispatchFlow(ExecutionReference reference, ExecutableFlow exflow) throws ExecutorManagerException {
            Set<Executor> flowExecutors = ExecutorManagerHA.this.activeExecutors.getAll().stream().filter(executor -> exflow.getExecutorIds().contains(executor.getId())).collect(Collectors.toSet());
            Set<Executor> remainingExecutors = new HashSet<Executor>(flowExecutors);
            logger.info("execId: " + exflow.getExecutionId() + ", executors: " + remainingExecutors);
            synchronized (exflow) {
                ExecutorManagerException lastError = null;
                do {
                    Executor selectedExecutor = this.selectExecutor(exflow, remainingExecutors);
                    if (selectedExecutor == null) {
                        ExecutorManagerHA.this.commonMetrics.markDispatchFail();
                        this.handleNoExecutorSelectedCase(reference, exflow);
                        return;
                    }
                    try {
                        ExecutorManagerHA.this.dispatch(reference, exflow, selectedExecutor);
                        ExecutorManagerHA.this.commonMetrics.markDispatchSuccess();
                        return;
                    }
                    catch (ExecutorManagerException e) {
                        lastError = e;
                        this.logFailedDispatchAttempt(reference, exflow, selectedExecutor, e);
                        ExecutorManagerHA.this.commonMetrics.markDispatchFail();
                        reference.setNumErrors(reference.getNumErrors() + 1);
                        this.updateRemainingExecutorsAndSleep(remainingExecutors, selectedExecutor, flowExecutors);
                    }
                } while (reference.getNumErrors() < this.maxDispatchingErrors);
                // Gave up dispatching.
                String message = "Failed to dispatch queued execution " + exflow.getId() + " because reached " + "azkaban.maxDispatchingErrors" + " (tried " + reference.getNumErrors() + " executors)";
                logger.error(message);
                ExecutorManagerHA.this.executionFinalizer.finalizeFlow(exflow, message, lastError);
            }
        }

        /**
         * Drops the executor that just failed; once every candidate has failed, restores the
         * full candidate set and pauses before the next round of attempts.
         */
        private void updateRemainingExecutorsAndSleep(Set<Executor> remainingExecutors, Executor selectedExecutor, Set<Executor> flowExecutors) {
            remainingExecutors.remove(selectedExecutor);
            if (remainingExecutors.isEmpty()) {
                remainingExecutors.addAll(flowExecutors);
                this.sleepAfterDispatchFailure();
            }
        }

        private void sleepAfterDispatchFailure() {
            try {
                Thread.sleep(this.sleepAfterDispatchFailure.toMillis());
            }
            catch (InterruptedException e1) {
                logger.warn("Sleep after dispatch failure was interrupted - ignoring");
            }
        }

        private void logFailedDispatchAttempt(ExecutionReference reference, ExecutableFlow exflow, Executor selectedExecutor, ExecutorManagerException e) {
            logger.warn(String.format("Executor %s responded with exception for exec: %d", selectedExecutor, exflow.getExecutionId()), (Throwable)e);
            logger.info(String.format("Failed dispatch attempt for exec %d with error count %d", exflow.getExecutionId(), reference.getNumErrors()));
        }

        /**
         * Resolves the executor named by the {@code useExecutor} flow parameter, first among
         * the executors known to the manager and then in the database.
         *
         * @return the requested executor, or {@code null} when none is requested, the value
         *         is not a usable number, the lookup fails, or the executor is unknown
         */
        private Executor getUserSpecifiedExecutor(ExecutionOptions options, int executionId) {
            Executor executor = null;
            if (options != null && options.getFlowParameters() != null && options.getFlowParameters().containsKey("useExecutor") && IS_NUMBER.matcher(options.getFlowParameters().get("useExecutor")).matches()) {
                try {
                    int executorId = Integer.parseInt(options.getFlowParameters().get("useExecutor"));
                    executor = ExecutorManagerHA.this.fetchExecutor(executorId);
                    if (executor == null) {
                        logger.warn(String.format("User specified executor id: %d for execution id: %d is not active, Looking up db.", executorId, executionId));
                        executor = ExecutorManagerHA.this.executorLoader.fetchExecutor(executorId);
                        if (executor == null) {
                            logger.warn(String.format("User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors", executorId, executionId));
                        }
                    }
                }
                catch (NumberFormatException ex) {
                    // A digit-only value can still overflow int. Without this catch the
                    // exception escapes after the flow has already been taken off the queue.
                    logger.error("Invalid useExecutor value for exec_id = " + executionId, (Throwable)ex);
                }
                catch (ExecutorManagerException ex) {
                    logger.error("Failed to fetch user specified executor for exec_id = " + executionId, (Throwable)ex);
                }
            }
            return executor;
        }

        /** Picks the user-specified executor if there is one, otherwise the selector's best candidate. */
        private Executor selectExecutor(ExecutableFlow exflow, Set<Executor> availableExecutors) {
            Executor choosenExecutor = this.getUserSpecifiedExecutor(exflow.getExecutionOptions(), exflow.getExecutionId());
            if (choosenExecutor == null) {
                ExecutorSelector selector = new ExecutorSelector(ExecutorManagerHA.this.filterList, ExecutorManagerHA.this.comparatorWeightsMap);
                choosenExecutor = selector.getBest(availableExecutors, exflow);
                logger.info("Using dispatcher for execution id :" + exflow.getExecutionId() + ", use executor: " + choosenExecutor);
            }
            return choosenExecutor;
        }

        /** Puts the flow back on the queue when no executor could be selected for it. */
        private void handleNoExecutorSelectedCase(ExecutionReference reference, ExecutableFlow exflow) throws ExecutorManagerException {
            logger.info(String.format("Reached handleNoExecutorSelectedCase stage for exec %d with error count %d", exflow.getExecutionId(), reference.getNumErrors()));
            ExecutorManagerHA.this.queuedFlows.enqueue(exflow, reference);
        }
    }

    /**
     * Background thread that periodically asks the manager to clean execution logs older
     * than the configured retention.
     */
    private class CleanerThread
    extends Thread {
        /** Both the wait between loop rounds and the minimum gap between two cleanups. */
        private static final long CLEANER_THREAD_WAIT_INTERVAL_MS = 3600000L;
        private final long executionLogsRetentionMs;
        // volatile: written by the thread calling shutdown(), read by this thread's loop.
        private volatile boolean shutdown = false;
        private long lastLogCleanTime = -1L;

        /**
         * @param executionLogsRetentionMs age in milliseconds beyond which logs are cleaned
         */
        public CleanerThread(long executionLogsRetentionMs) {
            this.executionLogsRetentionMs = executionLogsRetentionMs;
            this.setName("AzkabanWebServer-Cleaner-Thread");
        }

        /** Requests termination: sets the shutdown flag and interrupts any wait in progress. */
        public void shutdown() {
            this.shutdown = true;
            this.interrupt();
        }

        /**
         * Main loop: cleans logs when the last cleanup is more than one interval old, then
         * waits one interval. Interruption is logged and the shutdown flag re-checked.
         */
        @Override
        public void run() {
            while (!this.shutdown) {
                synchronized (this) {
                    try {
                        long currentTime = System.currentTimeMillis();
                        if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > this.lastLogCleanTime) {
                            this.cleanExecutionLogs();
                            this.lastLogCleanTime = currentTime;
                        }
                        this.wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
                    }
                    catch (InterruptedException e) {
                        logger.info("Interrupted. Probably to shut down.");
                    }
                }
            }
        }

        /** Computes the cutoff once, logs it and hands the same value to the manager. */
        private void cleanExecutionLogs() {
            logger.info("Cleaning old logs from execution_logs");
            long cutoff = System.currentTimeMillis() - this.executionLogsRetentionMs;
            logger.info("Cleaning old log files before " + new DateTime(cutoff).toString());
            ExecutorManagerHA.this.cleanOldExecutionLogs(cutoff);
        }
    }

    private class RecoverThread
    extends Thread {
        private boolean shutdown = false;
        private long waitTime;

        public RecoverThread(long waitTime) {
            this.waitTime = waitTime;
            this.setName("AzkabanWebServer-Recover-Thread");
        }

        public void shutdown() {
            this.shutdown = true;
            this.interrupt();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (!this.shutdown) {
                RecoverThread recoverThread = this;
                synchronized (recoverThread) {
                    try {
                        this.pullAndSubmitRecoverTask();
                        this.wait(this.waitTime);
                    }
                    catch (InterruptedException e) {
                        logger.info("Recover-Thread interrupted. Probably to shut down." + e);
                    }
                }
            }
        }

        private void pullAndSubmitRecoverTask() {
            List<ExecutionRecover> executionRecoverList = null;
            try {
                executionRecoverList = ExecutorManagerHA.this.executorLoader.fetchHistoryRecoverFlows();
            }
            catch (ExecutorManagerException e) {
                logger.error("\u83b7\u53d6\u5386\u53f2\u91cd\u8dd1\u6570\u636e\u5f02\u5e38, " + e);
            }
            if (executionRecoverList == null) {
                logger.error("\u6ca1\u6709\u83b7\u53d6\u5230\u5386\u53f2\u91cd\u8dd1\u6570\u636e");
                return;
            }
            block10: for (ExecutionRecover recover : executionRecoverList) {
                logger.info("recover Id: " + recover.getRecoverId());
                List repeatTimeList = (List)recover.getRepeatOption().get("repeatTimeList");
                int index = 0;
                for (Map item : repeatTimeList) {
                    ExecutableFlow exflow;
                    if (item.containsKey("isSubmit")) {
                        if (index == repeatTimeList.size() - 1) {
                            int excId = Integer.valueOf((String)((Map)repeatTimeList.get(index)).get("exeId"));
                            ExecutableFlow executableFlow = null;
                            try {
                                executableFlow = ExecutorManagerHA.this.getExecutableFlow(excId);
                            }
                            catch (ExecutorManagerException em) {
                                logger.error("\u83b7\u53d6\u524d\u4e00\u6b21\u6267\u884c\u7684\u5386\u53f2\u91cd\u8dd1\u4efb\u52a1\u5931\u8d25, " + em);
                                this.updateRecoverFlow(recover);
                                continue block10;
                            }
                            item.put("recoverStatus", String.valueOf(executableFlow.getStatus().getNumVal()));
                            if (Status.isStatusFinished(executableFlow.getStatus())) {
                                List flowsStatus;
                                if (executableFlow != null && executableFlow.getStatus().equals((Object)Status.SUCCEEDED)) {
                                    if (!recover.getRecoverErrorOption().equals("errorCountion")) {
                                        recover.setRecoverStatus(Status.SUCCEEDED);
                                    } else {
                                        flowsStatus = repeatTimeList.stream().filter(x -> !((String)x.get("recoverStatus")).equals("50")).collect(Collectors.toList());
                                        if (flowsStatus.size() == 0) {
                                            recover.setRecoverStatus(Status.SUCCEEDED);
                                        } else if (flowsStatus.size() != repeatTimeList.size()) {
                                            recover.setRecoverStatus(Status.FAILED_SUCCEEDED);
                                        } else {
                                            recover.setRecoverStatus(Status.FAILED);
                                        }
                                    }
                                } else if (!recover.getRecoverErrorOption().equals("errorCountion")) {
                                    recover.setRecoverStatus(Status.FAILED);
                                } else {
                                    flowsStatus = repeatTimeList.stream().filter(x -> !((String)x.get("recoverStatus")).equals("50")).collect(Collectors.toList());
                                    if (flowsStatus.size() != repeatTimeList.size()) {
                                        recover.setRecoverStatus(Status.FAILED_SUCCEEDED);
                                    } else {
                                        recover.setRecoverStatus(Status.FAILED);
                                    }
                                }
                                recover.setEndTime(System.currentTimeMillis());
                                try {
                                    ExecutorManagerHA.this.updateHistoryRecover(recover);
                                }
                                catch (ExecutorManagerException e) {
                                    logger.error("\u66f4\u65b0\u5386\u53f2\u91cd\u8dd1\u4fe1\u606f\u5931\u8d25\uff0c" + e);
                                }
                            }
                        }
                        ++index;
                        continue;
                    }
                    Project project = ExecutorManagerHA.this.projectLoader.fetchProjectById(recover.getProjectId());
                    this.loadAllProjectFlows(project);
                    Flow flow = project.getFlow(recover.getFlowId());
                    if (flow == null) {
                        logger.error("recover Id: " + recover.getRecoverId() + ", flow :" + recover.getFlowId() + "\u4e0d\u5b58\u5728.");
                        this.updateRecoverFlow(recover);
                        continue block10;
                    }
                    if (repeatTimeList.indexOf(item) == 0 || !((String)item.get("exeId")).equals("") && Integer.valueOf((String)item.get("exeId")) == -1) {
                        ExecutableFlow exflow2 = new ExecutableFlow(project, flow);
                        this.submitRecoverFlow(exflow2, project, recover, item);
                        recover.setStartTime(System.currentTimeMillis());
                        this.updateRecoverFlow(exflow2, recover, item);
                        continue block10;
                    }
                    int excId = Integer.valueOf((String)((Map)repeatTimeList.get(index - 1)).get("exeId"));
                    ExecutableFlow executableFlow = null;
                    try {
                        executableFlow = ExecutorManagerHA.this.getExecutableFlow(excId);
                    }
                    catch (ExecutorManagerException em) {
                        logger.error("\u83b7\u53d6\u524d\u4e00\u6b21\u6267\u884c\u7684\u5386\u53f2\u91cd\u8dd1\u4efb\u52a1\u5931\u8d25, " + em);
                        this.updateRecoverFlow(recover);
                        continue block10;
                    }
                    logger.info("\u524d\u4e00\u4e2a\u5386\u53f2\u91cd\u8dd1\u6267\u884c\u7ed3\u679c: " + (Object)((Object)executableFlow.getStatus()));
                    if (!Status.isStatusFinished(executableFlow.getStatus())) continue block10;
                    ((Map)repeatTimeList.get(index - 1)).put("recoverStatus", String.valueOf(executableFlow.getStatus().getNumVal()));
                    if (executableFlow.getStatus().equals((Object)Status.SUCCEEDED)) {
                        exflow = new ExecutableFlow(project, flow);
                        this.submitRecoverFlow(exflow, project, recover, item);
                        this.updateRecoverFlow(exflow, recover, item);
                        continue block10;
                    }
                    if (recover.getRecoverErrorOption().equals("errorCountion")) {
                        exflow = new ExecutableFlow(project, flow);
                        this.submitRecoverFlow(exflow, project, recover, item);
                        this.updateRecoverFlow(exflow, recover, item);
                        continue block10;
                    }
                    recover.setRecoverStatus(Status.FAILED);
                    recover.setEndTime(System.currentTimeMillis());
                    try {
                        ExecutorManagerHA.this.updateHistoryRecover(recover);
                    }
                    catch (ExecutorManagerException em) {
                        logger.error("\u66f4\u65b0\u5386\u53f2\u91cd\u8dd1\u4efb\u52a1\u4fe1\u606f\u5931\u8d25, " + em);
                    }
                    continue block10;
                }
            }
        }

        private String submitRecoverFlow(ExecutableFlow exflow, Project project, ExecutionRecover recover, Map<String, String> item) {
            exflow.setSubmitUser(recover.getSubmitUser());
            Set<String> proxyUserSet = project.getProxyUsers();
            proxyUserSet.add(recover.getSubmitUser());
            if (!recover.getProxyUsers().equals("[]")) {
                List<String> proxyUsers = Arrays.asList(recover.getProxyUsers().replace("[", "").replace("]", "").split(","));
                proxyUserSet.addAll(proxyUsers);
            }
            exflow.addAllProxyUsers(proxyUserSet);
            exflow.setExecutionOptions(recover.getExecutionOptions());
            exflow.setOtherOption(recover.getOtherOption());
            if (recover.getOtherOption().get("flowFailedRetryOption") != null) {
                exflow.setFlowFailedRetry((Map)recover.getOtherOption().get("flowFailedRetryOption"));
            }
            exflow.setFailedSkipedAllJobs((Boolean)recover.getOtherOption().getOrDefault("flowFailedSkiped", false));
            if (recover.getSlaOptions() != null) {
                exflow.setSlaOptions(recover.getSlaOptions());
            }
            exflow.setRepeatOption(item);
            exflow.setFlowType(2);
            String message = "";
            try {
                message = ExecutorManagerHA.this.submitExecutableFlow(exflow, recover.getSubmitUser());
            }
            catch (ExecutorManagerException ex) {
                logger.error("\u63d0\u4ea4\u5386\u53f2\u91cd\u8dd1\u4efb\u52a1\u5931\u8d25, " + ex);
            }
            return message;
        }

        private void updateRecoverFlow(ExecutableFlow exflow, ExecutionRecover recover, Map<String, String> item) {
            if (exflow.getExecutionId() != -1) {
                recover.setRecoverStatus(Status.RUNNING);
                recover.setNowExecutionId(exflow.getExecutionId());
                item.put("isSubmit", "true");
                item.put("exeId", String.valueOf(exflow.getExecutionId()));
            } else {
                recover.setRecoverStatus(Status.RUNNING);
                item.put("exeId", "-1");
            }
            try {
                ExecutorManagerHA.this.updateHistoryRecover(recover);
            }
            catch (ExecutorManagerException ExecutorManagerHA2) {
                logger.error("\u66f4\u65b0\u5386\u53f2\u91cd\u8dd1\u4efb\u52a1\u4fe1\u606f\u5931\u8d25, " + ExecutorManagerHA2);
            }
        }

        private void updateRecoverFlow(ExecutionRecover recover) {
            recover.setEndTime(System.currentTimeMillis());
            recover.setRecoverStatus(Status.FAILED);
            try {
                ExecutorManagerHA.this.updateHistoryRecover(recover);
            }
            catch (ExecutorManagerException ExecutorManagerHA2) {
                logger.error("\u66f4\u65b0\u5386\u53f2\u91cd\u8dd1\u4efb\u52a1\u4fe1\u606f\u5931\u8d25, " + ExecutorManagerHA2);
            }
        }

        private void loadAllProjectFlows(Project project) {
            try {
                List<Flow> flows = ExecutorManagerHA.this.projectLoader.fetchAllProjectFlows(project);
                HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
                for (Flow flow : flows) {
                    flowMap.put(flow.getId(), flow);
                }
                project.setFlows(flowMap);
            }
            catch (ProjectManagerException e) {
                throw new RuntimeException("Could not load projects flows from store.", (Throwable)((Object)e));
            }
        }
    }
}

