/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Status;
import akka.actor.Terminated;
import akka.dispatch.OnSuccess;
import java.util.Objects;
import java.util.UUID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.Preconditions;
import scala.PartialFunction;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

public abstract class JobClientActor
extends FlinkUntypedActor
implements LeaderRetrievalListener {
    private final LeaderRetrievalService leaderRetrievalService;
    protected final FiniteDuration timeout;
    private final boolean sysoutUpdates;
    private boolean toBeTerminated = false;
    protected ActorRef jobManager;
    protected UUID leaderSessionID;
    protected ActorRef client;
    private Cancellable connectionTimeout;
    private UUID connectionTimeoutId;

    public JobClientActor(LeaderRetrievalService leaderRetrievalService, FiniteDuration timeout, boolean sysoutUpdates) {
        this.leaderRetrievalService = (LeaderRetrievalService)Preconditions.checkNotNull((Object)leaderRetrievalService);
        this.timeout = (FiniteDuration)Preconditions.checkNotNull((Object)timeout);
        this.sysoutUpdates = sysoutUpdates;
        this.jobManager = ActorRef.noSender();
        this.leaderSessionID = null;
        this.connectionTimeout = null;
        this.connectionTimeoutId = null;
    }

    public void preStart() {
        try {
            this.leaderRetrievalService.start(this);
        }
        catch (Exception e) {
            this.LOG.error("Could not start the leader retrieval service.");
            throw new RuntimeException("Could not start the leader retrieval service.", e);
        }
    }

    public void postStop() {
        try {
            this.leaderRetrievalService.stop();
        }
        catch (Exception e) {
            this.LOG.warn("Could not properly stop the leader retrieval service.");
        }
    }

    protected abstract void connectedToJobManager();

    protected abstract void handleCustomMessage(Object var1);

    protected abstract Class getClientMessageClass();

    @Override
    protected void handleMessage(Object message) {
        if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
            this.logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged)message);
        } else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
            this.logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged)message);
        } else if (message instanceof JobClientMessages.JobManagerLeaderAddress) {
            JobClientMessages.JobManagerLeaderAddress msg = (JobClientMessages.JobManagerLeaderAddress)message;
            if (this.jobManager != null) {
                this.logAndPrintMessage("New JobManager elected. Connecting to " + msg.address());
            }
            this.disconnectFromJobManager();
            this.leaderSessionID = msg.leaderSessionID();
            if (msg.address() != null) {
                AkkaUtils.getActorRefFuture(msg.address(), this.getContext().system(), this.timeout).onSuccess((PartialFunction)new OnSuccess<ActorRef>(){

                    public void onSuccess(ActorRef result) throws Throwable {
                        JobClientActor.this.getSelf().tell(JobClientActor.this.decorateMessage(new JobClientMessages.JobManagerActorRef(result)), ActorRef.noSender());
                    }
                }, (ExecutionContext)this.getContext().dispatcher());
            } else if (this.isClientConnected() && this.connectionTimeoutId == null) {
                this.registerConnectionTimeout();
            }
        } else if (message instanceof JobClientMessages.JobManagerActorRef) {
            JobClientMessages.JobManagerActorRef msg = (JobClientMessages.JobManagerActorRef)message;
            this.connectToJobManager(msg.jobManager());
            this.logAndPrintMessage("Connected to JobManager at " + msg.jobManager() + " with leader session id " + this.leaderSessionID + '.');
            this.connectedToJobManager();
        } else if (message instanceof JobManagerMessages.JobResultMessage) {
            if (this.LOG.isDebugEnabled()) {
                this.LOG.debug("Received {} message from JobManager", (Object)message.getClass().getSimpleName());
            }
            if (this.isClientConnected()) {
                this.client.tell(this.decorateMessage(message), this.getSelf());
            }
            this.terminate();
        } else if (message instanceof Terminated) {
            ActorRef target = ((Terminated)message).getActor();
            if (this.jobManager.equals((Object)target)) {
                this.LOG.info("Lost connection to JobManager {}. Triggering connection timeout.", (Object)this.jobManager.path());
                this.disconnectFromJobManager();
                if (this.isClientConnected() && this.connectionTimeoutId == null) {
                    this.registerConnectionTimeout();
                }
            } else {
                this.LOG.warn("Received 'Terminated' for unknown actor " + target);
            }
        } else if (message instanceof JobClientMessages.ConnectionTimeout) {
            JobClientMessages.ConnectionTimeout timeoutMessage = (JobClientMessages.ConnectionTimeout)message;
            if (Objects.equals(this.connectionTimeoutId, timeoutMessage.id())) {
                if (!this.isJobManagerConnected()) {
                    JobClientActorConnectionTimeoutException errorMessage = new JobClientActorConnectionTimeoutException("Lost connection to the JobManager.");
                    Object replyMessage = this.decorateMessage(new Status.Failure((Throwable)errorMessage));
                    if (this.isClientConnected()) {
                        this.client.tell(replyMessage, this.getSelf());
                    }
                    this.terminate();
                }
            } else {
                this.LOG.debug("Received outdated connection timeout.");
            }
        } else if (!this.isJobManagerConnected() && this.getClientMessageClass().equals(message.getClass())) {
            this.LOG.info("Received {} but there is no connection to a JobManager yet.", message);
            if (this.connectionTimeoutId == null) {
                this.registerConnectionTimeout();
            }
            this.handleCustomMessage(message);
        } else if (!this.toBeTerminated) {
            this.handleCustomMessage(message);
        } else {
            String msg = this.getClass().getName() + " is about to be terminated. Therefore, the job submission cannot be executed.";
            this.LOG.error(msg);
            this.getSender().tell(this.decorateMessage(new Status.Failure((Throwable)new Exception(msg))), ActorRef.noSender());
        }
    }

    @Override
    protected UUID getLeaderSessionID() {
        return this.leaderSessionID;
    }

    protected void logAndPrintMessage(String message) {
        this.LOG.info(message);
        if (this.sysoutUpdates) {
            System.out.println(message);
        }
    }

    private void logAndPrintMessage(ExecutionGraphMessages.ExecutionStateChanged message) {
        this.LOG.info(message.toString());
        if (this.sysoutUpdates) {
            System.out.println(message.toString());
        }
    }

    private void logAndPrintMessage(ExecutionGraphMessages.JobStatusChanged message) {
        if (message.newJobStatus() != JobStatus.FAILING || message.error() == null) {
            this.LOG.info(message.toString());
            if (this.sysoutUpdates) {
                System.out.println(message.toString());
            }
        } else {
            Throwable error = SerializedThrowable.get(message.error(), this.getClass().getClassLoader());
            this.LOG.info(message.toString(), error);
            if (this.sysoutUpdates) {
                System.out.println(message.toString());
                message.error().printStackTrace(System.out);
            }
        }
    }

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        this.getSelf().tell(this.decorateMessage(new JobClientMessages.JobManagerLeaderAddress(leaderAddress, leaderSessionID)), this.getSelf());
    }

    @Override
    public void handleError(Exception exception) {
        this.LOG.error("Error occurred in the LeaderRetrievalService.", (Throwable)exception);
        this.getSelf().tell(this.decorateMessage(PoisonPill.getInstance()), this.getSelf());
    }

    private void disconnectFromJobManager() {
        this.LOG.info("Disconnect from JobManager {}.", (Object)this.jobManager);
        if (this.jobManager != ActorRef.noSender()) {
            this.getContext().unwatch(this.jobManager);
            this.jobManager = ActorRef.noSender();
        }
        this.leaderSessionID = null;
    }

    private void connectToJobManager(ActorRef jobManager) {
        this.LOG.info("Connect to JobManager {}.", (Object)jobManager);
        if (jobManager != ActorRef.noSender()) {
            this.getContext().unwatch(jobManager);
        }
        this.jobManager = jobManager;
        this.getContext().watch(jobManager);
        this.unregisterConnectionTimeout();
    }

    protected void terminate() {
        this.LOG.info("Terminate JobClientActor.");
        this.toBeTerminated = true;
        this.disconnectFromJobManager();
        this.getSelf().tell(this.decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
    }

    private boolean isJobManagerConnected() {
        return this.jobManager != ActorRef.noSender();
    }

    protected boolean isClientConnected() {
        return this.client != ActorRef.noSender();
    }

    private void registerConnectionTimeout() {
        if (this.connectionTimeout != null) {
            this.connectionTimeout.cancel();
        }
        this.connectionTimeoutId = UUID.randomUUID();
        this.connectionTimeout = this.getContext().system().scheduler().scheduleOnce(this.timeout, this.getSelf(), this.decorateMessage(new JobClientMessages.ConnectionTimeout(this.connectionTimeoutId)), (ExecutionContext)this.getContext().dispatcher(), ActorRef.noSender());
    }

    private void unregisterConnectionTimeout() {
        if (this.connectionTimeout != null) {
            this.connectionTimeout.cancel();
            this.connectionTimeout = null;
            this.connectionTimeoutId = null;
        }
    }
}

