/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.BitSet;
import java.util.concurrent.Callable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.SelfGateway;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.AkkaGateway;
import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;

/**
 * {@link InvocationHandler} that backs a proxied gateway with an Akka {@link ActorRef}.
 *
 * <p>Methods declared by the infrastructure interfaces ({@link AkkaGateway},
 * {@link MainThreadExecutable}, {@link StartStoppable}, {@link RpcGateway}, {@link SelfGateway})
 * or by {@link Object} are executed directly on this handler. Every other method is turned into
 * an {@link RpcInvocation} message and sent to {@link #rpcEndpoint}.
 */
class AkkaInvocationHandler
        implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable, SelfGateway {

    private static final Logger LOG = LoggerFactory.getLogger(AkkaInvocationHandler.class);

    /** Value returned by {@link #getAddress()}. */
    private final String address;

    /** Value returned by {@link #getHostname()}. */
    private final String hostname;

    /** Actor that receives all messages produced by this handler. */
    private final ActorRef rpcEndpoint;

    /** Whether the address of {@link #rpcEndpoint} has local scope; computed once in the constructor. */
    private final boolean isLocal;

    /** Timeout used for ask-style calls when the invoked method carries no {@link RpcTimeout} parameter. */
    private final Time timeout;

    /** Upper bound that {@link RemoteRpcInvocation#getSize()} is compared against before sending. */
    private final long maximumFramesize;

    /** Future handed out by {@link #getTerminationFuture()}; stored as given (not null-checked). */
    private final org.apache.flink.runtime.concurrent.Future<Void> terminationFuture;

    /**
     * Creates a handler for the given endpoint.
     *
     * @param address value exposed through {@link #getAddress()}; must not be null
     * @param hostname value exposed through {@link #getHostname()}; must not be null
     * @param rpcEndpoint actor to which messages are sent; must not be null
     * @param timeout default timeout for ask-style calls; must not be null
     * @param maximumFramesize maximum accepted size of a {@link RemoteRpcInvocation}
     * @param terminationFuture future exposed through {@link #getTerminationFuture()}
     */
    AkkaInvocationHandler(
            String address,
            String hostname,
            ActorRef rpcEndpoint,
            Time timeout,
            long maximumFramesize,
            org.apache.flink.runtime.concurrent.Future<Void> terminationFuture) {
        this.address = Preconditions.checkNotNull(address);
        this.hostname = Preconditions.checkNotNull(hostname);
        this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
        this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
        this.timeout = Preconditions.checkNotNull(timeout);
        this.maximumFramesize = maximumFramesize;
        this.terminationFuture = terminationFuture;
    }

    /**
     * Dispatches a call made on the proxy.
     *
     * <p>Infrastructure methods are run on this handler; all other methods are sent to the
     * endpoint as an {@link RpcInvocation}.
     *
     * @param proxy the proxy instance the method was invoked on (unused)
     * @param method the invoked interface method
     * @param args the call arguments, or {@code null} if the method takes none
     * @return the method's result: {@code null} for {@code void} rpc methods, a
     *     {@link FlinkFuture} for rpc methods returning a Flink future, otherwise the value
     *     obtained by blocking on the response
     * @throws Throwable whatever the target method or the rpc machinery throws
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (isHandledLocally(method.getDeclaringClass())) {
            return invokeOnHandler(method, args);
        }
        return invokeRpc(method, args);
    }

    /** Returns whether methods declared by {@code declaringClass} are implemented by this handler itself. */
    private static boolean isHandledLocally(Class<?> declaringClass) {
        return declaringClass.equals(AkkaGateway.class)
                || declaringClass.equals(MainThreadExecutable.class)
                || declaringClass.equals(Object.class)
                || declaringClass.equals(StartStoppable.class)
                || declaringClass.equals(RpcGateway.class)
                || declaringClass.equals(SelfGateway.class);
    }

    /**
     * Reflectively runs {@code method} on this handler.
     *
     * <p>{@link Method#invoke} wraps anything the target throws in an
     * {@link InvocationTargetException}. Letting that checked wrapper escape from
     * {@link #invoke} would make the proxy surface it as an
     * {@code UndeclaredThrowableException}, so the original cause is rethrown instead.
     */
    private Object invokeOnHandler(Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(this, args);
        } catch (InvocationTargetException e) {
            Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        }
    }

    /**
     * Sends {@code method} with {@code args} to the endpoint as an {@link RpcInvocation}.
     *
     * <p>The return type of the method selects the messaging pattern: {@code void} methods are
     * sent with {@code tell}, methods returning a Flink future get the ask-future back, and any
     * other method blocks on the ask-future for at most the effective timeout.
     */
    private Object invokeRpc(Method method, Object[] args) throws Exception {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();

        Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, this.timeout);
        Tuple2<Class<?>[], Object[]> filteredArguments =
                filterArguments(parameterTypes, parameterAnnotations, args);
        RpcInvocation rpcInvocation =
                createRpcInvocation(methodName, filteredArguments.f0, filteredArguments.f1);

        Class<?> returnType = method.getReturnType();
        if (returnType.equals(Void.TYPE)) {
            // Fire and forget: no response is requested.
            this.rpcEndpoint.tell(rpcInvocation, ActorRef.noSender());
            return null;
        }

        Future<Object> scalaFuture =
                Patterns.ask(this.rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds());
        FlinkFuture<Object> futureResult = new FlinkFuture<>(scalaFuture);
        if (returnType.equals(org.apache.flink.runtime.concurrent.Future.class)) {
            // Asynchronous call: hand the future to the caller.
            return futureResult;
        }
        // Synchronous call: wait for the response within the effective timeout.
        return futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
    }

    /**
     * Builds the message describing the call.
     *
     * <p>A {@link LocalRpcInvocation} is used when the endpoint is local. Otherwise a
     * {@link RemoteRpcInvocation} is created and rejected if its size exceeds
     * {@link #maximumFramesize}.
     *
     * @throws IOException if the remote invocation cannot be created or is too large; the
     *     failure is logged before being rethrown
     */
    private RpcInvocation createRpcInvocation(
            String methodName, Class<?>[] parameterTypes, Object[] args) throws IOException {
        if (this.isLocal) {
            return new LocalRpcInvocation(methodName, parameterTypes, args);
        }
        try {
            RemoteRpcInvocation remoteRpcInvocation =
                    new RemoteRpcInvocation(methodName, parameterTypes, args);
            if (remoteRpcInvocation.getSize() > this.maximumFramesize) {
                throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");
            }
            return remoteRpcInvocation;
        } catch (IOException e) {
            LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", e);
            throw e;
        }
    }

    /** Returns the actor that receives the messages produced by this handler. */
    @Override
    public ActorRef getRpcEndpoint() {
        return this.rpcEndpoint;
    }

    /** Sends {@code runnable} to the endpoint without delay; see {@link #scheduleRunAsync}. */
    @Override
    public void runAsync(Runnable runnable) {
        this.scheduleRunAsync(runnable, 0L);
    }

    /**
     * Sends {@code runnable} to the endpoint wrapped in a {@link RunAsync} message.
     *
     * <p>The message carries a time stamp of {@code 0} when {@code delayMillis} is zero, and
     * {@code System.nanoTime()} plus the delay in nanoseconds otherwise.
     *
     * @param runnable the code to send; must not be null
     * @param delayMillis delay in milliseconds; must be zero or greater
     * @throws RuntimeException if the endpoint is not local
     */
    @Override
    public void scheduleRunAsync(Runnable runnable, long delayMillis) {
        Preconditions.checkNotNull(runnable, "runnable");
        Preconditions.checkArgument(delayMillis >= 0L, "delay must be zero or greater");
        if (!this.isLocal) {
            throw new RuntimeException("Trying to send a Runnable to a remote actor at " + this.rpcEndpoint.path() + ". This is not supported.");
        }
        long atTimeNanos = delayMillis == 0L ? 0L : System.nanoTime() + delayMillis * 1000000L;
        this.rpcEndpoint.tell(new RunAsync(runnable, atTimeNanos), ActorRef.noSender());
    }

    /**
     * Asks the endpoint to run {@code callable}, wrapped in a {@link CallAsync} message.
     *
     * @param callable the code to send
     * @param callTimeout timeout of the ask
     * @param <V> result type of the callable
     * @return a future wrapping the response of the ask
     * @throws RuntimeException if the endpoint is not local
     */
    @Override
    public <V> org.apache.flink.runtime.concurrent.Future<V> callAsync(Callable<V> callable, Time callTimeout) {
        if (!this.isLocal) {
            throw new RuntimeException("Trying to send a Callable to a remote actor at " + this.rpcEndpoint.path() + ". This is not supported.");
        }
        // The ask API is untyped; the response is presumably the callable's result (type V).
        @SuppressWarnings("unchecked")
        Future<V> result = (Future<V>) Patterns.ask(
                this.rpcEndpoint, new CallAsync(callable), callTimeout.toMilliseconds());
        return new FlinkFuture<>(result);
    }

    /** Sends {@link Processing#START} to the endpoint. */
    @Override
    public void start() {
        this.rpcEndpoint.tell(Processing.START, ActorRef.noSender());
    }

    /** Sends {@link Processing#STOP} to the endpoint. */
    @Override
    public void stop() {
        this.rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
    }

    /**
     * Determines the timeout for a call: the argument of the first parameter annotated with
     * {@link RpcTimeout}, or {@code defaultTimeout} if there is no such parameter (or no
     * arguments at all).
     *
     * @param parameterAnnotations annotations of each method parameter
     * @param args call arguments, may be {@code null}
     * @param defaultTimeout fallback timeout
     * @return the effective timeout
     * @throws RuntimeException if the annotated argument is {@code null} or not a {@link Time}
     */
    private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Time defaultTimeout) {
        if (args == null) {
            return defaultTimeout;
        }
        Preconditions.checkArgument(parameterAnnotations.length == args.length);
        for (int i = 0; i < parameterAnnotations.length; ++i) {
            if (!isRpcTimeout(parameterAnnotations[i])) {
                continue;
            }
            Object candidate = args[i];
            if (candidate instanceof Time) {
                return (Time) candidate;
            }
            if (candidate == null) {
                // Avoid a bare NullPointerException while building the error message below.
                throw new RuntimeException("The rpc timeout parameter must be of type " + Time.class.getName() + " but was null.");
            }
            throw new RuntimeException("The rpc timeout parameter must be of type " + Time.class.getName() + ". The type " + candidate.getClass().getName() + " is not supported.");
        }
        return defaultTimeout;
    }

    /**
     * Removes every parameter annotated with {@link RpcTimeout} from the parameter types and
     * arguments.
     *
     * <p>When {@code args} is {@code null} the parameter types are returned unchanged together
     * with {@code null} arguments. When nothing is filtered, the input arrays themselves are
     * returned rather than copies.
     *
     * @param parameterTypes declared parameter types of the method
     * @param parameterAnnotations annotations of each method parameter
     * @param args call arguments, may be {@code null}
     * @return tuple of the remaining parameter types and arguments
     */
    private static Tuple2<Class<?>[], Object[]> filterArguments(Class<?>[] parameterTypes, Annotation[][] parameterAnnotations, Object[] args) {
        if (args == null) {
            return Tuple2.<Class<?>[], Object[]>of(parameterTypes, null);
        }

        Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length);
        Preconditions.checkArgument(parameterAnnotations.length == args.length);

        // First pass: mark the timeout parameters and count what remains.
        BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length);
        int numberRpcParameters = parameterTypes.length;
        for (int i = 0; i < parameterTypes.length; ++i) {
            if (isRpcTimeout(parameterAnnotations[i])) {
                isRpcTimeoutParameter.set(i);
                --numberRpcParameters;
            }
        }

        if (numberRpcParameters == parameterTypes.length) {
            // Nothing to strip.
            return Tuple2.<Class<?>[], Object[]>of(parameterTypes, args);
        }

        // Second pass: copy everything that is not a timeout parameter, preserving order.
        Class<?>[] filteredParameterTypes = new Class<?>[numberRpcParameters];
        Object[] filteredArgs = new Object[numberRpcParameters];
        int counter = 0;
        for (int i = 0; i < parameterTypes.length; ++i) {
            if (isRpcTimeoutParameter.get(i)) {
                continue;
            }
            filteredParameterTypes[counter] = parameterTypes[i];
            filteredArgs[counter] = args[i];
            ++counter;
        }
        return Tuple2.<Class<?>[], Object[]>of(filteredParameterTypes, filteredArgs);
    }

    /** Returns whether {@code annotations} contains an {@link RpcTimeout} annotation. */
    private static boolean isRpcTimeout(Annotation[] annotations) {
        for (Annotation annotation : annotations) {
            if (annotation.annotationType().equals(RpcTimeout.class)) {
                return true;
            }
        }
        return false;
    }

    /** Returns the address given at construction time. */
    @Override
    public String getAddress() {
        return this.address;
    }

    /** Returns the hostname given at construction time. */
    @Override
    public String getHostname() {
        return this.hostname;
    }

    /** Returns the termination future given at construction time. */
    @Override
    public org.apache.flink.runtime.concurrent.Future<Void> getTerminationFuture() {
        return this.terminationFuture;
    }
}

