/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskexecutor.JobLeaderListener;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobLeaderService {
    /** Logger shared by this service and its nested listener and connection classes. */
    private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class);

    /** Location of the owning task manager; handed to every job manager registration. */
    private final TaskManagerLocation ownLocation;

    /** Leader retrieval service and leader listener of each monitored job, keyed by job id. */
    private final Map<JobID, Tuple2<LeaderRetrievalService, JobManagerLeaderListener>> jobLeaderServices =
            new HashMap<JobID, Tuple2<LeaderRetrievalService, JobManagerLeaderListener>>(4);

    /** Lifecycle state of the service; a new instance starts out as CREATED. */
    private volatile State state = State.CREATED;

    // The collaborators below stay null until start(...) assigns them.

    /** Address passed as the task manager RPC address when registering at a job manager. */
    private String ownerAddress;

    /** RPC service providing the executor for connections and used by the registrations. */
    private RpcService rpcService;

    /** Source of the per-job leader retrieval services. */
    private HighAvailabilityServices highAvailabilityServices;

    /** Callback notified about gained/lost job manager leadership and about errors. */
    private JobLeaderListener jobLeaderListener;

    /**
     * Creates a service in the CREATED state that does not monitor any job yet.
     *
     * @param location location of the owning task manager; must not be null
     */
    public JobLeaderService(TaskManagerLocation location) {
        this.ownLocation = Preconditions.checkNotNull(location);
    }

    /**
     * Starts the service by storing its collaborators and switching to the STARTED state.
     *
     * @param initialOwnerAddress address later used as task manager RPC address for registrations
     * @param initialRpcService RPC service used for connections and registrations
     * @param initialHighAvailabilityServices provider of the per-job leader retrieval services
     * @param initialJobLeaderListener callback notified about leader changes and errors
     * @throws IllegalStateException if the service is not in the CREATED state
     * @throws NullPointerException presumably raised by Preconditions.checkNotNull when an
     *         argument is null (Preconditions is not visible here)
     */
    public void start(String initialOwnerAddress, RpcService initialRpcService, HighAvailabilityServices initialHighAvailabilityServices, JobLeaderListener initialJobLeaderListener) {
        if (this.state != State.CREATED) {
            throw new IllegalStateException("The service has already been started.");
        }

        LOG.info("Start job leader service.");

        this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
        this.rpcService = Preconditions.checkNotNull(initialRpcService);
        this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
        this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);

        // Flip the state last so that it only becomes STARTED once every collaborator is set.
        this.state = State.STARTED;
    }

    /**
     * Stops the service and, if it was started, the leader listener and the leader retrieval
     * service of every monitored job.
     *
     * <p>Shutdown is best effort: a failure while stopping one job does not prevent the
     * remaining jobs from being stopped. The job map is always cleared and the state always
     * ends up as STOPPED.
     *
     * @throws Exception the first failure encountered while stopping; any later failures are
     *         attached to it as suppressed exceptions
     */
    public void stop() throws Exception {
        LOG.info("Stop job leader service.");

        Exception firstFailure = null;

        if (State.STARTED == this.state) {
            try {
                for (Tuple2<LeaderRetrievalService, JobManagerLeaderListener> entry : this.jobLeaderServices.values()) {
                    // Stop the listener first so that notifications arriving while the
                    // retrieval service shuts down are ignored.
                    try {
                        entry.f1.stop();
                    } catch (Exception e) {
                        firstFailure = firstOrSuppressed(e, firstFailure);
                    }
                    try {
                        entry.f0.stop();
                    } catch (Exception e) {
                        firstFailure = firstOrSuppressed(e, firstFailure);
                    }
                }
            } finally {
                this.jobLeaderServices.clear();
                this.state = State.STOPPED;
            }
        } else {
            this.state = State.STOPPED;
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Returns the earlier failure with the new one suppressed, or the new one if it is the first. */
    private static Exception firstOrSuppressed(Exception newFailure, Exception previous) {
        if (previous == null) {
            return newFailure;
        }
        previous.addSuppressed(newFailure);
        return previous;
    }

    /**
     * Tells whether the given job is currently monitored by this service.
     *
     * @param jobId id of the job to look up
     * @return true if an entry for the job exists in the job map
     * @throws IllegalStateException presumably raised by Preconditions.checkState when the
     *         service is not in the STARTED state (Preconditions is not visible here)
     */
    public boolean containsJob(JobID jobId) {
        final boolean running = this.state == State.STARTED;
        Preconditions.checkState(running, "The service is currently not running.");

        return this.jobLeaderServices.containsKey(jobId);
    }

    /**
     * Stops monitoring the given job. Does nothing if the job is not monitored.
     *
     * <p>The leader listener is stopped before the leader retrieval service (the same order
     * as in {@code stop()}), and the retrieval service is stopped in a {@code finally} block,
     * so a failure in one of them never leaves the other one running.
     *
     * @param jobId id of the job to remove
     * @throws IllegalStateException presumably raised by Preconditions.checkState when the
     *         service is not in the STARTED state (Preconditions is not visible here)
     * @throws Exception if stopping the leader retrieval service or the listener fails
     */
    public void removeJob(JobID jobId) throws Exception {
        Preconditions.checkState(State.STARTED == this.state, "The service is currently not running.");

        Tuple2<LeaderRetrievalService, JobManagerLeaderListener> entry = this.jobLeaderServices.remove(jobId);
        if (entry == null) {
            return;
        }

        LOG.info("Remove job {} from job leader monitoring.", jobId);

        try {
            // Marks the listener as stopped and closes its RPC connection, if any.
            entry.f1.stop();
        } finally {
            entry.f0.stop();
        }
    }

    /**
     * Starts monitoring the job manager leader of the given job.
     *
     * <p>If the job is already monitored, the previous leader retrieval service and listener
     * are stopped instead of being silently dropped from the map. The new entry is registered
     * before its retrieval service is started, so that a service whose start fails can still
     * be stopped through {@code removeJob} or {@code stop}.
     *
     * @param jobId id of the job to monitor
     * @param defaultTargetAddress currently unused by this method; kept for interface
     *        compatibility
     * @throws IllegalStateException presumably raised by Preconditions.checkState when the
     *         service is not in the STARTED state (Preconditions is not visible here)
     * @throws Exception if stopping a replaced entry or starting the leader retrieval fails
     */
    public void addJob(JobID jobId, String defaultTargetAddress) throws Exception {
        Preconditions.checkState(State.STARTED == this.state, "The service is currently not running.");
        LOG.info("Add job {} for job leader monitoring.", jobId);

        LeaderRetrievalService leaderRetrievalService = this.highAvailabilityServices.getJobManagerLeaderRetriever(jobId);
        JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);

        Tuple2<LeaderRetrievalService, JobManagerLeaderListener> previousEntry =
                this.jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener));

        if (previousEntry != null) {
            // Release the resources of the entry that has just been replaced.
            try {
                previousEntry.f1.stop();
            } finally {
                previousEntry.f0.stop();
            }
        }

        leaderRetrievalService.start(jobManagerLeaderListener);
    }

    private static enum State {
        CREATED,
        STARTED,
        STOPPED;

    }

    /**
     * Registration of a task manager at a job master. The registration call itself is
     * {@code JobMasterGateway.registerTaskManager}; when and how often it is invoked is
     * decided by the {@code RetryingRegistration} base class, which is not visible here.
     */
    private static final class JobManagerRetryingRegistration
            extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> {

        /** RPC address of the task manager that is sent with the registration. */
        private final String taskManagerRpcAddress;

        /** Location of the task manager that is sent with the registration. */
        private final TaskManagerLocation taskManagerLocation;

        /**
         * Creates the registration.
         *
         * <p>The first six arguments are forwarded unchanged to the base class constructor.
         *
         * @param taskManagerRpcAddress RPC address of the registering task manager (not null-checked)
         * @param taskManagerLocation location of the registering task manager; must not be null
         */
        JobManagerRetryingRegistration(Logger log, RpcService rpcService, String targetName, Class<JobMasterGateway> targetType, String targetAddress, UUID leaderId, String taskManagerRpcAddress, TaskManagerLocation taskManagerLocation) {
            super(log, rpcService, targetName, targetType, targetAddress, leaderId);

            this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
            this.taskManagerRpcAddress = taskManagerRpcAddress;
        }

        /**
         * Registers the task manager at the given job master gateway.
         *
         * @param gateway gateway of the job master to register at
         * @param leaderId leader id passed along with the registration
         * @param timeoutMillis timeout of the call in milliseconds
         * @return future of the registration response
         */
        @Override
        protected Future<RegistrationResponse> invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
            final Time timeout = Time.milliseconds(timeoutMillis);
            return gateway.registerTaskManager(this.taskManagerRpcAddress, this.taskManagerLocation, leaderId, timeout);
        }
    }

    private final class JobManagerLeaderListener
    implements LeaderRetrievalListener {
        /** Id of the job whose job manager leader this listener tracks. */
        private final JobID jobId;

        /** Connection to the most recently announced leader; null until a leader has been announced. */
        private RegisteredRpcConnection<JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;

        /** Set to true by stop(); leader notifications and errors are then only logged. Initially false. */
        private volatile boolean stopped;

        /** Leader id from the most recent leader notification; initially null. */
        private volatile UUID currentLeaderId;

        /**
         * Creates a running listener without a connection and without a known leader.
         *
         * @param jobId id of the job to track; must not be null
         */
        private JobManagerLeaderListener(JobID jobId) {
            this.jobId = Preconditions.checkNotNull(jobId);
        }

        /**
         * Marks this listener as stopped and closes the current RPC connection, if there is one.
         */
        public void stop() {
            this.stopped = true;

            final RegisteredRpcConnection<JobMasterGateway, JMTMRegistrationSuccess> connection = this.rpcConnection;
            if (connection == null) {
                return;
            }
            connection.close();
        }

        /**
         * Handles new leader information for the tracked job.
         *
         * <ul>
         *   <li>If the listener has been stopped, the notification is only logged.</li>
         *   <li>A null or empty address means there is currently no leader: the connection is
         *       closed and discarded, and the job leader listener is told that the previous
         *       leader lost leadership.</li>
         *   <li>If a connection to the announced leader id already exists, the notification
         *       is treated as a duplicate and ignored.</li>
         *   <li>Otherwise any existing connection is closed and a new connection to the
         *       announced leader is created and started.</li>
         * </ul>
         *
         * @param leaderAddress address of the new leader; null or empty if there is none
         * @param leaderId id of the new leader
         */
        @Override
        public void notifyLeaderAddress(String leaderAddress, UUID leaderId) {
            if (this.stopped) {
                LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. However, the service is no longer running.", JobLeaderService.class.getSimpleName(), this.jobId);
                return;
            }

            LOG.debug("New leader information for job {}. Address: {}, leader id: {}.", new Object[]{this.jobId, leaderAddress, leaderId});

            if (leaderAddress == null || leaderAddress.isEmpty()) {
                if (this.rpcConnection != null) {
                    this.rpcConnection.close();
                    // Drop the closed connection so that a later announcement of the same
                    // leader id creates a fresh connection instead of reusing this one.
                    // NOTE(review): presumably start() must not be called on a closed
                    // RegisteredRpcConnection - that class is not visible here, confirm.
                    this.rpcConnection = null;
                }
                JobLeaderService.this.jobLeaderListener.jobManagerLostLeadership(this.jobId, this.currentLeaderId);
                this.currentLeaderId = leaderId;
                return;
            }

            this.currentLeaderId = leaderId;

            if (this.rpcConnection != null) {
                // Compare from the connection's side so that a null leaderId does not throw here.
                if (this.rpcConnection.getTargetLeaderId().equals(leaderId)) {
                    // Every connection kept in this field was started right after it was
                    // created, so starting it again would at best be redundant.
                    LOG.debug("Ongoing attempt to connect to leader of job {}. Ignoring duplicate leader information.", this.jobId);
                    return;
                }
                this.rpcConnection.close();
            }

            this.rpcConnection = new JobManagerRegisteredRpcConnection(LOG, leaderAddress, leaderId, JobLeaderService.this.rpcService.getExecutor());

            // Double check for a stop() that ran while the connection was being set up.
            if (this.stopped) {
                this.rpcConnection.close();
            } else {
                LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, leaderId);
                this.rpcConnection.start();
            }
        }

        /**
         * Forwards an error to the job leader listener while this listener is running; once
         * it has been stopped, the error is only logged at debug level.
         *
         * @param exception the reported error
         */
        @Override
        public void handleError(Exception exception) {
            if (!this.stopped) {
                JobLeaderService.this.jobLeaderListener.handleError(exception);
                return;
            }

            LOG.debug("{}'s leader retrieval listener reported an exception for job {}. However, the service is no longer running.", new Object[]{JobLeaderService.class.getSimpleName(), this.jobId, exception});
        }

        /**
         * Connection to one specific job manager leader. Registration outcomes are passed on
         * to the job leader listener only while the connection's leader id is still the
         * listener's current leader id; outcomes of outdated leaders are logged at debug level.
         */
        private final class JobManagerRegisteredRpcConnection
                extends RegisteredRpcConnection<JobMasterGateway, JMTMRegistrationSuccess> {

            /** Passes all arguments unchanged to the base class constructor. */
            JobManagerRegisteredRpcConnection(Logger log, String targetAddress, UUID targetLeaderId, Executor executor) {
                super(log, targetAddress, targetLeaderId, executor);
            }

            /** Creates the registration targeting this connection's address and leader id. */
            @Override
            protected RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() {
                final String address = getTargetAddress();
                final UUID targetLeaderId = getTargetLeaderId();

                return new JobManagerRetryingRegistration(LOG, JobLeaderService.this.rpcService, "JobManager", JobMasterGateway.class, address, targetLeaderId, JobLeaderService.this.ownerAddress, JobLeaderService.this.ownLocation);
            }

            /** Reports gained leadership unless the registration belongs to an outdated leader. */
            @Override
            protected void onRegistrationSuccess(JMTMRegistrationSuccess success) {
                final UUID targetLeaderId = getTargetLeaderId();

                if (!targetLeaderId.equals(JobManagerLeaderListener.this.currentLeaderId)) {
                    this.log.debug("Encountered obsolete JobManager registration success from {} with leader session ID {}.", getTargetAddress(), targetLeaderId);
                    return;
                }

                this.log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), JobManagerLeaderListener.this.jobId);
                JobLeaderService.this.jobLeaderListener.jobManagerGainedLeadership(JobManagerLeaderListener.this.jobId, getTargetGateway(), targetLeaderId, success);
            }

            /** Reports the failure unless the registration belongs to an outdated leader. */
            @Override
            protected void onRegistrationFailure(Throwable failure) {
                final UUID targetLeaderId = getTargetLeaderId();

                if (!targetLeaderId.equals(JobManagerLeaderListener.this.currentLeaderId)) {
                    this.log.debug("Obsolete JobManager registration failure from {} with leader session ID {}.", new Object[]{getTargetAddress(), targetLeaderId, failure});
                    return;
                }

                this.log.info("Failed to register at job manager {} for job {}.", getTargetAddress(), JobManagerLeaderListener.this.jobId);
                JobLeaderService.this.jobLeaderListener.handleError(failure);
            }
        }
    }
}

