/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider;
import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

public class SpeculativeScheduler
extends AdaptiveBatchScheduler
implements SlowTaskDetectorListener {
    private final int maxConcurrentExecutions;
    private final Duration blockSlowNodeDuration;
    private final BlocklistOperations blocklistOperations;
    private final SlowTaskDetector slowTaskDetector;
    private long numSlowExecutionVertices;
    private final Counter numEffectiveSpeculativeExecutionsCounter;

    public SpeculativeScheduler(Logger log, JobGraph jobGraph, Executor ioExecutor, Configuration jobMasterConfiguration, Consumer<ComponentMainThreadExecutor> startUpAction, ScheduledExecutor delayExecutor, ClassLoader userCodeLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory failoverStrategyFactory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory, ShuffleMaster<?> shuffleMaster, Time rpcTimeout, VertexParallelismDecider vertexParallelismDecider, int defaultMaxParallelism, BlocklistOperations blocklistOperations) throws Exception {
        super(log, jobGraph, ioExecutor, jobMasterConfiguration, startUpAction, delayExecutor, userCodeLoader, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, schedulingStrategyFactory, failoverStrategyFactory, restartBackoffTimeStrategy, executionOperations, executionVertexVersioner, executionSlotAllocatorFactory, initializationTimestamp, mainThreadExecutor, jobStatusListener, executionGraphFactory, shuffleMaster, rpcTimeout, vertexParallelismDecider, defaultMaxParallelism);
        this.maxConcurrentExecutions = jobMasterConfiguration.getInteger(JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
        this.blockSlowNodeDuration = (Duration)jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
        Preconditions.checkArgument((!this.blockSlowNodeDuration.isNegative() ? 1 : 0) != 0, (Object)"The blocking duration should not be negative.");
        this.blocklistOperations = (BlocklistOperations)Preconditions.checkNotNull((Object)blocklistOperations);
        this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
        this.numEffectiveSpeculativeExecutionsCounter = new SimpleCounter();
    }

    @Override
    protected void startSchedulingInternal() {
        this.registerMetrics(this.jobManagerJobMetricGroup);
        super.startSchedulingInternal();
        this.slowTaskDetector.start(this.getExecutionGraph(), this, this.getMainThreadExecutor());
    }

    private void registerMetrics(MetricGroup metricGroup) {
        metricGroup.gauge("numSlowExecutionVertices", () -> this.numSlowExecutionVertices);
        metricGroup.counter("numEffectiveSpeculativeExecutions", this.numEffectiveSpeculativeExecutionsCounter);
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        this.slowTaskDetector.stop();
        return super.closeAsync();
    }

    @Override
    public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) {
        return (SpeculativeExecutionVertex)super.getExecutionVertex(executionVertexId);
    }

    @Override
    protected void onTaskFinished(Execution execution) {
        if (!SpeculativeScheduler.isOriginalAttempt(execution)) {
            this.numEffectiveSpeculativeExecutionsCounter.inc();
        }
        FutureUtils.assertNoException(this.cancelPendingExecutions(execution.getVertex().getID()));
        super.onTaskFinished(execution);
    }

    private static boolean isOriginalAttempt(Execution execution) {
        return ((SpeculativeExecutionVertex)execution.getVertex()).isOriginalAttempt(execution.getAttemptNumber());
    }

    private CompletableFuture<?> cancelPendingExecutions(ExecutionVertexID executionVertexId) {
        List pendingExecutions = this.getExecutionVertex(executionVertexId).getCurrentExecutions().stream().filter(e -> !e.getState().isTerminal() && e.getState() != ExecutionState.CANCELING).collect(Collectors.toList());
        if (pendingExecutions.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        this.log.info("Canceling {} un-finished executions of {} because one of its executions has finished.", (Object)pendingExecutions.size(), (Object)executionVertexId);
        FutureUtils.ConjunctFuture future = FutureUtils.combineAll((Collection)pendingExecutions.stream().map(this::cancelExecution).collect(Collectors.toList()));
        this.cancelAllPendingSlotRequestsForVertex(executionVertexId);
        return future;
    }

    @Override
    protected void onTaskFailed(Execution execution) {
        SpeculativeExecutionVertex executionVertex = this.getExecutionVertex(execution.getVertex().getID());
        executionVertex.archiveFailedExecution(execution.getAttemptId());
        super.onTaskFailed(execution);
    }

    @Override
    protected void handleTaskFailure(Execution failedExecution, @Nullable Throwable error) {
        SpeculativeExecutionVertex executionVertex = this.getExecutionVertex(failedExecution.getVertex().getID());
        if (!SpeculativeScheduler.isExecutionVertexPossibleToFinish(executionVertex) || ExceptionUtils.findThrowable((Throwable)error, PartitionException.class).isPresent()) {
            super.handleTaskFailure(failedExecution, error);
        } else {
            this.handleLocalExecutionAttemptFailure(failedExecution, error);
        }
    }

    private void handleLocalExecutionAttemptFailure(Execution failedExecution, @Nullable Throwable error) {
        this.executionSlotAllocator.cancel(failedExecution.getAttemptId());
        FailureHandlingResult failureHandlingResult = this.recordTaskFailure(failedExecution, error);
        if (failureHandlingResult.canRestart()) {
            this.archiveFromFailureHandlingResult(this.createFailureHandlingResultSnapshot(failureHandlingResult));
        } else {
            this.failJob(error, failureHandlingResult.getTimestamp());
        }
    }

    private static boolean isExecutionVertexPossibleToFinish(SpeculativeExecutionVertex executionVertex) {
        boolean anyExecutionPossibleToFinish = false;
        for (Execution execution : executionVertex.getCurrentExecutions()) {
            Preconditions.checkState((execution.getState() != ExecutionState.FINISHED ? 1 : 0) != 0);
            if (execution.getState() != ExecutionState.CREATED && execution.getState() != ExecutionState.SCHEDULED && execution.getState() != ExecutionState.DEPLOYING && execution.getState() != ExecutionState.INITIALIZING && execution.getState() != ExecutionState.RUNNING) continue;
            anyExecutionPossibleToFinish = true;
        }
        return anyExecutionPossibleToFinish;
    }

    @Override
    protected void resetForNewExecution(ExecutionVertexID executionVertexId) {
        SpeculativeExecutionVertex executionVertex = this.getExecutionVertex(executionVertexId);
        Execution execution = ((ExecutionVertex)executionVertex).getCurrentExecutionAttempt();
        if (execution.getState() == ExecutionState.FINISHED && !SpeculativeScheduler.isOriginalAttempt(execution)) {
            this.numEffectiveSpeculativeExecutionsCounter.dec();
        }
        super.resetForNewExecution(executionVertexId);
    }

    @Override
    public void notifySlowTasks(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
        long currentTimestamp = System.currentTimeMillis();
        this.numSlowExecutionVertices = slowTasks.size();
        this.blockSlowNodes(slowTasks, currentTimestamp);
        ArrayList<Execution> newSpeculativeExecutions = new ArrayList<Execution>();
        HashSet<ExecutionVertexID> verticesToDeploy = new HashSet<ExecutionVertexID>();
        for (ExecutionVertexID executionVertexId : slowTasks.keySet()) {
            int currentConcurrentExecutions;
            int newSpeculativeExecutionsToDeploy;
            SpeculativeExecutionVertex executionVertex = this.getExecutionVertex(executionVertexId);
            if (executionVertex.containsSinks() || (newSpeculativeExecutionsToDeploy = this.maxConcurrentExecutions - (currentConcurrentExecutions = executionVertex.getCurrentExecutions().size())) <= 0) continue;
            this.log.info("{} ({}) is detected as a slow vertex, create and deploy {} new speculative executions for it.", new Object[]{executionVertex.getTaskNameWithSubtaskIndex(), executionVertex.getID(), newSpeculativeExecutionsToDeploy});
            Collection attempts = IntStream.range(0, newSpeculativeExecutionsToDeploy).mapToObj(i -> executionVertex.createNewSpeculativeExecution(currentTimestamp)).collect(Collectors.toList());
            this.setupSubtaskGatewayForAttempts(executionVertex, attempts);
            verticesToDeploy.add(executionVertexId);
            newSpeculativeExecutions.addAll(attempts);
        }
        this.executionDeployer.allocateSlotsAndDeploy(newSpeculativeExecutions, this.executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));
    }

    private void blockSlowNodes(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks, long currentTimestamp) {
        if (!this.blockSlowNodeDuration.isZero()) {
            long blockedEndTimestamp = currentTimestamp + this.blockSlowNodeDuration.toMillis();
            Collection nodesToBlock = this.getSlowNodeIds(slowTasks).stream().map(nodeId -> new BlockedNode((String)nodeId, "Node is detected to be slow.", blockedEndTimestamp)).collect(Collectors.toList());
            this.blocklistOperations.addNewBlockedNodes(nodesToBlock);
        }
    }

    private Set<String> getSlowNodeIds(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
        Set slowExecutions = slowTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
        return slowExecutions.stream().map(id -> this.getExecutionGraph().getRegisteredExecutions().get(id)).map(e -> {
            Preconditions.checkNotNull((Object)e.getAssignedResource(), (String)"The reported slow node have not been assigned a slot. This is unexpected and indicates that there is something wrong with the slow task detector.");
            return e.getAssignedResourceLocation();
        }).map(TaskManagerLocation::getNodeId).collect(Collectors.toSet());
    }

    private void setupSubtaskGatewayForAttempts(SpeculativeExecutionVertex executionVertex, Collection<Execution> attempts) {
        Set attemptNumbers = attempts.stream().map(Execution::getAttemptNumber).collect(Collectors.toSet());
        executionVertex.getJobVertex().getOperatorCoordinators().forEach(operatorCoordinator -> operatorCoordinator.setupSubtaskGatewayForAttempts(executionVertex.getParallelSubtaskIndex(), attemptNumbers));
    }

    @VisibleForTesting
    long getNumSlowExecutionVertices() {
        return this.numSlowExecutionVertices;
    }

    @VisibleForTesting
    long getNumEffectiveSpeculativeExecutions() {
        return this.numEffectiveSpeculativeExecutionsCounter.getCount();
    }
}

