/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.SelfGateway;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.AkkaGateway;
import org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler;
import org.apache.flink.runtime.rpc.akka.AkkaRpcActor;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Option;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

@ThreadSafe
public class AkkaRpcService
implements RpcService {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class);
    // Config path read in the constructor to determine the maximum frame size.
    static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";
    // Monitor held while mutating `actors` and while flipping `stopped`
    // (see startServer, stopServer and stopService).
    private final Object lock = new Object();
    private final ActorSystem actorSystem;
    // Used as the ask timeout in connect() and handed to every invocation handler.
    private final Time timeout;
    // Actor refs created by startServer() that have not been removed by stopServer()/stopService().
    private final Set<ActorRef> actors = new HashSet<ActorRef>(4);
    // Value found at MAXIMUM_FRAME_SIZE_PATH, or Long.MAX_VALUE when the path is not configured.
    private final long maximumFramesize;
    // Host of the actor system's address, or the empty string when the address has no host.
    private final String address;
    private final ScheduledExecutor internalScheduledExecutor;
    // Set to true by stopService(); volatile because connect() reads it without holding `lock`.
    private volatile boolean stopped;

    /**
     * Creates an RPC service on top of the given actor system.
     *
     * <p>The maximum frame size is read from {@link #MAXIMUM_FRAME_SIZE_PATH} when that path
     * is configured and falls back to {@code Long.MAX_VALUE} otherwise. The service address is
     * the host of the actor system's address, or the empty string if no host is defined.
     *
     * @param actorSystem actor system backing this service; must not be null
     * @param timeout timeout used for asks and handed to invocation handlers; must not be null
     */
    public AkkaRpcService(ActorSystem actorSystem, Time timeout) {
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "actor system");
        this.timeout = Preconditions.checkNotNull(timeout, "timeout");

        if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
            this.maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
        } else {
            this.maximumFramesize = Long.MAX_VALUE;
        }

        final Option<String> systemHost = AkkaUtils.getAddress(actorSystem).host();
        if (systemHost.isDefined()) {
            this.address = systemHost.get();
        } else {
            this.address = "";
        }

        this.internalScheduledExecutor = new InternalScheduledExecutorImpl(actorSystem);
    }

    /**
     * Returns the address computed at construction time: the actor system's host, or the
     * empty string when the actor system address had no host.
     *
     * @return the address of this RPC service
     */
    @Override
    public String getAddress() {
        return address;
    }

    /**
     * Connects to the RPC endpoint at {@code address} and returns a future gateway proxy.
     *
     * <p>An {@link Identify} message is sent to the actor selection for the address. Once the
     * identity reply arrives, a dynamic proxy implementing {@code clazz} is created around an
     * {@link AkkaInvocationHandler} for the resolved actor.
     *
     * @param address address of the remote RPC endpoint
     * @param clazz gateway interface the returned proxy implements
     * @param <C> type of the gateway
     * @return future that completes with the gateway, or fails with an
     *         {@link RpcConnectionException} if the identity reply carries no actor reference
     * @throws IllegalStateException if this service has already been stopped
     */
    @Override
    public <C extends RpcGateway> Future<C> connect(final String address, final Class<C> clazz) {
        Preconditions.checkState(!stopped, "RpcService is stopped");

        LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", address, clazz.getName());

        final ActorSelection selection = actorSystem.actorSelection(address);
        final scala.concurrent.Future<Object> identifyFuture =
            Patterns.ask(selection, new Identify(42), timeout.toMilliseconds());

        // Turns the identity reply into a gateway proxy.
        final Mapper<Object, C> toGateway = new Mapper<Object, C>() {

            @Override
            public C checkedApply(Object obj) throws Exception {
                final ActorRef actorRef = ((ActorIdentity) obj).getRef();
                if (actorRef == null) {
                    throw new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.');
                }

                final String endpointUrl = AkkaUtils.getAkkaURL(actorSystem, actorRef);

                // An actor path without a host is reported as "localhost".
                final Option<String> host = actorRef.path().address().host();
                final String hostname;
                if (host.isEmpty()) {
                    hostname = "localhost";
                } else {
                    hostname = host.get();
                }

                final InvocationHandler handler =
                    new AkkaInvocationHandler(endpointUrl, hostname, actorRef, timeout, maximumFramesize, null);

                // The proxy is defined in the class loader that loaded this service class.
                final ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader();

                @SuppressWarnings("unchecked")
                final C proxy = (C) Proxy.newProxyInstance(classLoader, new Class<?>[] {clazz}, handler);
                return proxy;
            }
        };

        final scala.concurrent.Future<C> gatewayFuture = identifyFuture.map(toGateway, actorSystem.dispatcher());
        return new FlinkFuture<C>(gatewayFuture);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts an {@link AkkaRpcActor} for the given endpoint and returns its self gateway.
     *
     * <p>The actor is created under the endpoint's id and recorded in the set of running
     * actors while holding the service lock. The returned proxy implements the endpoint's
     * self gateway type plus {@link SelfGateway}, {@link MainThreadExecutable},
     * {@link StartStoppable} and {@link AkkaGateway}.
     *
     * @param rpcEndpoint endpoint to start a server for; must not be null
     * @param <C> type of the self gateway
     * @param <S> type of the RPC endpoint
     * @return self gateway proxy for the started endpoint
     * @throws IllegalStateException if this service has already been stopped
     */
    @Override
    public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
        Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint");

        final FlinkCompletableFuture<Void> terminationFuture = new FlinkCompletableFuture<Void>();
        final Props actorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);

        final ActorRef actorRef;
        synchronized (lock) {
            Preconditions.checkState(!stopped, "RpcService is stopped");
            actorRef = actorSystem.actorOf(actorProps, rpcEndpoint.getEndpointId());
            actors.add(actorRef);
        }

        LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

        final String endpointUrl = AkkaUtils.getAkkaURL(actorSystem, actorRef);

        // An actor path without a host is reported as "localhost".
        final Option<String> host = actorRef.path().address().host();
        final String hostname;
        if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }

        final InvocationHandler handler =
            new AkkaInvocationHandler(endpointUrl, hostname, actorRef, timeout, maximumFramesize, terminationFuture);

        // The proxy is defined in the class loader that loaded this service class.
        final ClassLoader classLoader = getClass().getClassLoader();
        final Class<?>[] gatewayInterfaces = new Class<?>[] {
            rpcEndpoint.getSelfGatewayType(),
            SelfGateway.class,
            MainThreadExecutable.class,
            StartStoppable.class,
            AkkaGateway.class
        };

        @SuppressWarnings("unchecked")
        final C self = (C) Proxy.newProxyInstance(classLoader, gatewayInterfaces, handler);
        return self;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops the RPC endpoint behind the given self gateway by sending it a {@link PoisonPill}.
     *
     * <p>Gateways that are not {@link AkkaGateway}s are ignored. Nothing happens if this
     * service is already stopped. If the gateway's actor is not registered with this service
     * (already stopped, or started by a different service), only a debug message is logged.
     *
     * @param selfGateway self gateway of the endpoint to stop
     */
    public void stopServer(RpcGateway selfGateway) {
        if (!(selfGateway instanceof AkkaGateway)) {
            return;
        }

        final AkkaGateway akkaClient = (AkkaGateway) selfGateway;
        final ActorRef selfActorRef = akkaClient.getRpcEndpoint();

        final boolean fromThisService;
        synchronized (lock) {
            if (stopped) {
                return;
            }
            fromThisService = actors.remove(selfActorRef);
        }

        if (fromThisService) {
            LOG.info("Stopping RPC endpoint {}.", selfActorRef.path());
            selfActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
        } else {
            // The message has a {} placeholder, so the endpoint must be supplied as an argument;
            // without it the literal "{}" is logged.
            LOG.debug("RPC endpoint {} already stopped or from different RPC service", selfActorRef.path());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops this RPC service.
     *
     * <p>The first call marks the service as stopped, calls {@code shutdown()} on the actor
     * system and clears the set of running actors, all under the service lock. It then waits
     * on {@code awaitTermination()} outside the lock. Calls made after the service has been
     * stopped return right after logging.
     */
    @Override
    public void stopService() {
        LOG.info("Stopping Akka RPC service.");

        final boolean alreadyStopped;
        synchronized (lock) {
            alreadyStopped = stopped;
            if (!alreadyStopped) {
                stopped = true;
                actorSystem.shutdown();
                actors.clear();
            }
        }

        if (alreadyStopped) {
            return;
        }

        actorSystem.awaitTermination();
    }

    /**
     * Returns a future that completes once {@code awaitTermination()} on the actor system
     * returns. The waiting task is submitted to {@link #getExecutor()}.
     *
     * @return future completed with {@code null} when the wait returns
     */
    @Override
    public Future<Void> getTerminationFuture() {
        final Callable<Void> awaitTermination = new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                actorSystem.awaitTermination();
                return null;
            }
        };

        return FlinkFuture.supplyAsync(awaitTermination, getExecutor());
    }

    /**
     * Returns the actor system's dispatcher as a plain {@link Executor}.
     *
     * @return executor backed by the actor system dispatcher
     */
    @Override
    public Executor getExecutor() {
        return actorSystem.dispatcher();
    }

    /**
     * Returns the scheduled executor created in the constructor for this service's actor system.
     *
     * @return the internal scheduled executor
     */
    @Override
    public ScheduledExecutor getScheduledExecutor() {
        return internalScheduledExecutor;
    }

    /**
     * Schedules the runnable for a single execution after the given delay, delegating to the
     * internal scheduled executor.
     *
     * @param runnable task to run; must not be null
     * @param delay delay before execution; must be zero or larger
     * @param unit time unit of {@code delay}; must not be null
     * @return future representing the scheduled task
     * @throws IllegalArgumentException if {@code delay} is negative
     */
    @Override
    public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
        Preconditions.checkNotNull(runnable, "runnable");
        Preconditions.checkNotNull(unit, "unit");
        Preconditions.checkArgument(delay >= 0L, "delay must be zero or larger");

        return internalScheduledExecutor.schedule(runnable, delay, unit);
    }

    /**
     * Hands the runnable to the actor system's dispatcher for execution.
     *
     * @param runnable task to execute
     */
    @Override
    public void execute(Runnable runnable) {
        actorSystem.dispatcher().execute(runnable);
    }

    /**
     * Runs the callable on the actor system's dispatcher and exposes its outcome as a Flink future.
     *
     * @param callable computation to run
     * @param <T> result type of the callable
     * @return future wrapping the Scala future produced for the callable
     */
    @Override
    public <T> Future<T> execute(Callable<T> callable) {
        final scala.concurrent.Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher());
        return new FlinkFuture<T>(scalaFuture);
    }

    private static final class InternalScheduledExecutorImpl
    implements ScheduledExecutor {
        private final ActorSystem actorSystem;

        /**
         * Creates a scheduled executor that schedules on the given actor system.
         *
         * @param actorSystem actor system whose scheduler and dispatcher are used; must not be null
         */
        private InternalScheduledExecutorImpl(ActorSystem actorSystem) {
            // The checked argument is the actor system, so the failure message names it
            // (the previous text "rpcService" referred to a different object).
            this.actorSystem = Preconditions.checkNotNull(actorSystem, "actor system");
        }

        /**
         * Schedules the command for a single execution after the given delay.
         *
         * <p>The task records an absolute trigger time ({@code now() + delay} in nanoseconds),
         * because {@code ScheduledFutureTask.getDelay} computes {@code time - now()}. Storing
         * the raw relative delay would make {@code getDelay} and {@code compareTo} report
         * wrong values for one-shot tasks.
         *
         * @param command task to run
         * @param delay delay before execution
         * @param unit time unit of {@code delay}
         * @return future representing the scheduled task
         */
        @Override
        @Nonnull
        public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
            ScheduledFutureTask<Void> scheduledFutureTask =
                new ScheduledFutureTask<Void>(command, this.triggerTime(unit.toNanos(delay)), 0L);

            Cancellable cancellable = this.internalSchedule(scheduledFutureTask, delay, unit);
            scheduledFutureTask.setCancellable(cancellable);

            return scheduledFutureTask;
        }

        /**
         * Schedules the callable for a single execution after the given delay.
         *
         * <p>The task records an absolute trigger time ({@code now() + delay} in nanoseconds),
         * because {@code ScheduledFutureTask.getDelay} computes {@code time - now()}. Storing
         * the raw relative delay would make {@code getDelay} and {@code compareTo} report
         * wrong values for one-shot tasks.
         *
         * @param callable computation to run
         * @param delay delay before execution
         * @param unit time unit of {@code delay}
         * @param <V> result type of the callable
         * @return future representing the scheduled computation
         */
        @Override
        @Nonnull
        public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
            ScheduledFutureTask<V> scheduledFutureTask =
                new ScheduledFutureTask<V>(callable, this.triggerTime(unit.toNanos(delay)), 0L);

            Cancellable cancellable = this.internalSchedule(scheduledFutureTask, delay, unit);
            scheduledFutureTask.setCancellable(cancellable);

            return scheduledFutureTask;
        }

        /**
         * Schedules the command for periodic execution using the actor system scheduler's
         * periodic {@code schedule} call.
         *
         * <p>The task is created with a positive period, which {@code ScheduledFutureTask.run}
         * treats as fixed-rate.
         *
         * @param command task to run
         * @param initialDelay delay before the first execution
         * @param period period between executions
         * @param unit time unit of {@code initialDelay} and {@code period}
         * @return future representing the periodic task
         */
        @Override
        @Nonnull
        public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
            final long firstTrigger = triggerTime(unit.toNanos(initialDelay));
            final ScheduledFutureTask<Void> task =
                new ScheduledFutureTask<Void>(command, firstTrigger, unit.toNanos(period));

            final Cancellable handle = actorSystem.scheduler().schedule(
                new FiniteDuration(initialDelay, unit),
                new FiniteDuration(period, unit),
                task,
                actorSystem.dispatcher());
            task.setCancellable(handle);

            return task;
        }

        /**
         * Schedules the command for repeated execution with a fixed delay between runs.
         *
         * <p>The delay is stored as a negative period. {@code ScheduledFutureTask.run} treats a
         * negative period as fixed-delay and re-schedules the task itself after each successful
         * run. Only the first execution is scheduled here.
         *
         * @param command task to run
         * @param initialDelay delay before the first execution
         * @param delay delay between the end of one run and the start of the next
         * @param unit time unit of {@code initialDelay} and {@code delay}
         * @return future representing the repeating task
         */
        @Override
        @Nonnull
        public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
            final long firstTrigger = triggerTime(unit.toNanos(initialDelay));
            final long negatedDelayNanos = unit.toNanos(-delay);
            final ScheduledFutureTask<Void> task =
                new ScheduledFutureTask<Void>(command, firstTrigger, negatedDelayNanos);

            final Cancellable handle = internalSchedule(task, initialDelay, unit);
            task.setCancellable(handle);

            return task;
        }

        /**
         * Hands the command to the actor system's dispatcher for execution.
         *
         * @param command task to execute
         */
        @Override
        public void execute(@Nonnull Runnable command) {
            actorSystem.dispatcher().execute(command);
        }

        /**
         * Schedules a single execution of the runnable on the actor system's scheduler,
         * running it on the actor system's dispatcher.
         *
         * @param runnable task to run once
         * @param delay delay before execution
         * @param unit time unit of {@code delay}
         * @return handle returned by the scheduler for the scheduled execution
         */
        private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
            final FiniteDuration waitTime = new FiniteDuration(delay, unit);
            return actorSystem.scheduler().scheduleOnce(waitTime, runnable, actorSystem.dispatcher());
        }

        /**
         * Returns the current value of {@link System#nanoTime()}, the time base for trigger times.
         *
         * @return current nano time
         */
        private long now() {
            final long currentNanos = System.nanoTime();
            return currentNanos;
        }

        /**
         * Converts a relative delay into an absolute trigger time on the {@link #now()} time base.
         *
         * @param delay delay in nanoseconds
         * @return {@code now() + delay}
         */
        private long triggerTime(long delay) {
            final long base = now();
            return base + delay;
        }

        /**
         * Future task that also carries scheduling metadata and the scheduler handle used to
         * cancel the underlying scheduled execution.
         *
         * <p>The sign of {@code period} selects the mode: zero means one-shot, a positive value
         * means fixed-rate, and a negative value means fixed-delay with delay {@code -period}.
         *
         * @param <V> result type of the task
         */
        private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

            /** Trigger time in nanoseconds on the enclosing executor's {@code now()} time base. */
            private long time;

            /** Period in nanoseconds: 0 = one-shot, &gt; 0 = fixed rate, &lt; 0 = fixed delay. */
            private final long period;

            /** Scheduler handle for the currently scheduled execution. */
            private volatile Cancellable cancellable;

            ScheduledFutureTask(Callable<V> callable, long time, long period) {
                super(callable);
                this.time = time;
                this.period = period;
            }

            ScheduledFutureTask(Runnable runnable, long time, long period) {
                super(runnable, null);
                this.time = time;
                this.period = period;
            }

            /**
             * Stores the scheduler handle for the currently scheduled execution.
             *
             * @param newCancellable handle returned by the scheduler
             */
            public void setCancellable(Cancellable newCancellable) {
                cancellable = newCancellable;
            }

            /**
             * Runs the task once. Periodic tasks are run via {@code runAndReset()} and, when
             * that succeeds, have their next trigger time updated. Fixed-delay tasks are also
             * re-scheduled here.
             */
            @Override
            public void run() {
                if (!isPeriodic()) {
                    super.run();
                    return;
                }

                if (!runAndReset()) {
                    // The run did not complete normally or the task was cancelled: no bookkeeping.
                    return;
                }

                if (period > 0L) {
                    // Fixed rate: only the bookkeeping time advances here.
                    time += period;
                    return;
                }

                // Fixed delay: schedule the next run ourselves, measured from now.
                cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);

                if (isCancelled()) {
                    // Cancelled while re-scheduling: withdraw the execution just scheduled.
                    cancellable.cancel();
                } else {
                    time = triggerTime(-period);
                }
            }

            /**
             * Cancels the future and, if that succeeded, the scheduled execution as well.
             *
             * @return {@code true} only if both the future and the scheduler handle report
             *         a successful cancellation
             */
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (!super.cancel(mayInterruptIfRunning)) {
                    return false;
                }
                return cancellable.cancel();
            }

            /**
             * Returns the difference between the stored trigger time and the current time,
             * converted to the requested unit.
             */
            @Override
            public long getDelay(@Nonnull TimeUnit unit) {
                final long remainingNanos = time - now();
                return unit.convert(remainingNanos, TimeUnit.NANOSECONDS);
            }

            /** Orders tasks by their remaining delay in nanoseconds. */
            @Override
            public int compareTo(@Nonnull Delayed o) {
                if (o == this) {
                    return 0;
                }

                final long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
                if (diff < 0L) {
                    return -1;
                }
                if (diff > 0L) {
                    return 1;
                }
                return 0;
            }

            /** A task is periodic when its period is non-zero. */
            @Override
            public boolean isPeriodic() {
                return period != 0L;
            }
        }
    }
}

