/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.deployment.executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.SupplierWithException;

@Internal
public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
implements PipelineExecutor {
    private final ClientFactory clusterClientFactory;

    public AbstractSessionClusterExecutor(@Nonnull ClientFactory clusterClientFactory) {
        this.clusterClientFactory = (ClusterClientFactory)Preconditions.checkNotNull(clusterClientFactory);
    }

    public CompletableFuture<JobClient> execute(@Nonnull Pipeline pipeline, @Nonnull Configuration configuration, @Nonnull ClassLoader userCodeClassloader) throws Exception {
        JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
        try (ClusterDescriptor clusterDescriptor = this.clusterClientFactory.createClusterDescriptor(configuration);){
            Object clusterID = this.clusterClientFactory.getClusterId(configuration);
            Preconditions.checkState((clusterID != null ? 1 : 0) != 0);
            ClusterClientProvider clusterClientProvider = clusterDescriptor.retrieve(clusterID);
            ClusterClient clusterClient = clusterClientProvider.getClusterClient();
            CompletionStage completionStage = ((CompletableFuture)((CompletableFuture)clusterClient.submitJob(jobGraph).thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
                ClientUtils.waitUntilJobInitializationFinished((SupplierWithException<JobStatus, Exception>)((SupplierWithException)() -> clusterClient.getJobStatus((JobID)jobId).get()), (SupplierWithException<JobResult, Exception>)((SupplierWithException)() -> clusterClient.requestJobResult((JobID)jobId).get()), userCodeClassloader);
                return jobId;
            }))).thenApplyAsync(jobID -> new ClusterClientJobClientAdapter(clusterClientProvider, (JobID)jobID, userCodeClassloader))).whenComplete((ignored1, ignored2) -> clusterClient.close());
            return completionStage;
        }
    }
}

