/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.hadoop2.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.hadoop2.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB;
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;

public class NameNodeProxies {
    private static final Log LOG = LogFactory.getLog(NameNodeProxies.class);

    public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
        return NameNodeProxies.createProxy(conf, nameNodeUri, xface, null);
    }

    public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        AbstractNNFailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies.createFailoverProxyProvider(conf, nameNodeUri, xface, true, fallbackToSimpleAuth);
        if (failoverProxyProvider == null) {
            return NameNodeProxies.createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
        }
        DFSClient.Conf config = new DFSClient.Conf(conf);
        Object proxy = RetryProxy.create(xface, failoverProxyProvider, RetryPolicies.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.maxRetryAttempts, config.failoverSleepBaseMillis, config.failoverSleepMaxMillis));
        Text dtService = failoverProxyProvider.useLogicalURI() ? HAUtil.buildTokenServiceForLogicalUri(nameNodeUri, "hdfs") : SecurityUtil.buildTokenService(NameNode.getAddress(nameNodeUri));
        InetSocketAddress nnAddress = HAUtil.isLogicalUri(conf, nameNodeUri) ? InetSocketAddress.createUnresolved(nameNodeUri.getHost(), 8020) : NameNode.getAddress(nameNodeUri);
        return new ProxyAndInfo<Object>(proxy, dtService, nnAddress);
    }

    public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(Configuration config, URI nameNodeUri, Class<T> xface, int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        Preconditions.checkArgument(numResponseToDrop > 0);
        AbstractNNFailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies.createFailoverProxyProvider(config, nameNodeUri, xface, true, fallbackToSimpleAuth);
        if (failoverProxyProvider != null) {
            int delay = config.getInt("dfs.client.failover.sleep.base.millis", 500);
            int maxCap = config.getInt("dfs.client.failover.sleep.max.millis", 15000);
            int maxFailoverAttempts = config.getInt("dfs.client.failover.max.attempts", 15);
            int maxRetryAttempts = config.getInt("dfs.client.retry.max.attempts", 10);
            LossyRetryInvocationHandler<T> dummyHandler = new LossyRetryInvocationHandler<T>(numResponseToDrop, failoverProxyProvider, RetryPolicies.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, Math.max(numResponseToDrop + 1, maxRetryAttempts), delay, maxCap));
            Object proxy = Proxy.newProxyInstance(failoverProxyProvider.getInterface().getClassLoader(), new Class[]{xface}, dummyHandler);
            Text dtService = failoverProxyProvider.useLogicalURI() ? HAUtil.buildTokenServiceForLogicalUri(nameNodeUri, "hdfs") : SecurityUtil.buildTokenService(NameNode.getAddress(nameNodeUri));
            return new ProxyAndInfo<Object>(proxy, dtService, NameNode.getAddress(nameNodeUri));
        }
        LOG.warn((Object)"Currently creating proxy using LossyRetryInvocationHandler requires NN HA setup");
        return null;
    }

    public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi, boolean withRetries) throws IOException {
        return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
    }

    public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        Object proxy;
        Text dtService = SecurityUtil.buildTokenService(nnAddr);
        if (xface == ClientProtocol.class) {
            proxy = NameNodeProxies.createNNProxyWithClientProtocol(nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth);
        } else if (xface == JournalProtocol.class) {
            proxy = NameNodeProxies.createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
        } else if (xface == NamenodeProtocol.class) {
            proxy = NameNodeProxies.createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi, withRetries);
        } else if (xface == GetUserMappingsProtocol.class) {
            proxy = NameNodeProxies.createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi);
        } else if (xface == RefreshUserMappingsProtocol.class) {
            proxy = NameNodeProxies.createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi);
        } else if (xface == RefreshAuthorizationPolicyProtocol.class) {
            proxy = NameNodeProxies.createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr, conf, ugi);
        } else if (xface == RefreshCallQueueProtocol.class) {
            proxy = NameNodeProxies.createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
        } else {
            String message = "Unsupported protocol found when creating the proxy connection to NameNode: " + (xface != null ? xface.getClass().getName() : "null");
            LOG.error((Object)message);
            throw new IllegalStateException(message);
        }
        return new ProxyAndInfo<Object>(proxy, dtService, nnAddr);
    }

    private static JournalProtocol createNNProxyWithJournalProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi) throws IOException {
        JournalProtocolPB proxy = (JournalProtocolPB)NameNodeProxies.createNameNodeProxy(address, conf, ugi, JournalProtocolPB.class);
        return new JournalProtocolTranslatorPB(proxy);
    }

    private static RefreshAuthorizationPolicyProtocol createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi) throws IOException {
        RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB)NameNodeProxies.createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class);
        return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy);
    }

    private static RefreshUserMappingsProtocol createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi) throws IOException {
        RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB)NameNodeProxies.createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class);
        return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
    }

    private static RefreshCallQueueProtocol createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi) throws IOException {
        RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)NameNodeProxies.createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class);
        return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy);
    }

    private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi) throws IOException {
        GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB)NameNodeProxies.createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class);
        return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
    }

    private static NamenodeProtocol createNNProxyWithNamenodeProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries) throws IOException {
        NamenodeProtocolPB proxy = (NamenodeProtocolPB)NameNodeProxies.createNameNodeProxy(address, conf, ugi, NamenodeProtocolPB.class);
        if (withRetries) {
            RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200L, TimeUnit.MILLISECONDS);
            HashMap<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
            methodNameToPolicyMap.put("getBlocks", timeoutPolicy);
            methodNameToPolicyMap.put("getAccessKeys", timeoutPolicy);
            NamenodeProtocolTranslatorPB translatorProxy = new NamenodeProtocolTranslatorPB(proxy);
            return (NamenodeProtocol)RetryProxy.create(NamenodeProtocol.class, translatorProxy, methodNameToPolicyMap);
        }
        return new NamenodeProtocolTranslatorPB(proxy);
    }

    private static ClientProtocol createNNProxyWithClientProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
        RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf, "dfs.client.retry.policy.enabled", false, "dfs.client.retry.policy.spec", "10000,6,60000,10", SafeModeException.class);
        long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
        ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf), defaultPolicy, fallbackToSimpleAuth).getProxy();
        if (withRetries) {
            HashMap<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
            ClientNamenodeProtocolTranslatorPB translatorProxy = new ClientNamenodeProtocolTranslatorPB(proxy);
            return (ClientProtocol)RetryProxy.create(ClientProtocol.class, new DefaultFailoverProxyProvider<ClientNamenodeProtocolTranslatorPB>(ClientProtocol.class, translatorProxy), methodNameToPolicyMap, defaultPolicy);
        }
        return new ClientNamenodeProtocolTranslatorPB(proxy);
    }

    private static Object createNameNodeProxy(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, Class<?> xface) throws IOException {
        RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
        Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address, ugi, conf, NetUtils.getDefaultSocketFactory(conf));
        return proxy;
    }

    @VisibleForTesting
    public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(Configuration conf, URI nameNodeUri) throws IOException {
        if (nameNodeUri == null) {
            return null;
        }
        String host = nameNodeUri.getHost();
        String configKey = "dfs.client.failover.proxy.provider." + host;
        try {
            Class<FailoverProxyProvider<T>> ret = conf.getClass(configKey, null, FailoverProxyProvider.class);
            return ret;
        }
        catch (RuntimeException e) {
            if (e.getCause() instanceof ClassNotFoundException) {
                throw new IOException("Could not load failover proxy provider class " + conf.get(configKey) + " which is configured for authority " + nameNodeUri, e);
            }
            throw e;
        }
    }

    @VisibleForTesting
    public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        int port;
        WrappedFailoverProxyProvider<T> providerNN;
        Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
        Preconditions.checkArgument(xface.isAssignableFrom(NamenodeProtocols.class), "Interface %s is not a NameNode protocol", xface);
        try {
            failoverProxyProviderClass = NameNodeProxies.getFailoverProxyProviderClass(conf, nameNodeUri);
            if (failoverProxyProviderClass == null) {
                return null;
            }
            Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass.getConstructor(Configuration.class, URI.class, Class.class);
            FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, xface);
            providerNN = !(provider instanceof AbstractNNFailoverProxyProvider) ? new WrappedFailoverProxyProvider<T>(provider) : (WrappedFailoverProxyProvider<T>)provider;
        }
        catch (Exception e) {
            String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)message, (Throwable)e);
            }
            if (e.getCause() instanceof IOException) {
                throw (IOException)e.getCause();
            }
            throw new IOException(message, e);
        }
        if (checkPort && ((AbstractNNFailoverProxyProvider)providerNN).useLogicalURI() && (port = nameNodeUri.getPort()) > 0 && port != 8020) {
            throw new IOException("Port " + port + " specified in URI " + nameNodeUri + " but host '" + nameNodeUri.getHost() + "' is a logical (HA) namenode" + " and does not use port information.");
        }
        providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
        return providerNN;
    }

    public static class ProxyAndInfo<PROXYTYPE> {
        private final PROXYTYPE proxy;
        private final Text dtService;
        private final InetSocketAddress address;

        public ProxyAndInfo(PROXYTYPE proxy, Text dtService, InetSocketAddress address) {
            this.proxy = proxy;
            this.dtService = dtService;
            this.address = address;
        }

        public PROXYTYPE getProxy() {
            return this.proxy;
        }

        public Text getDelegationTokenService() {
            return this.dtService;
        }

        public InetSocketAddress getAddress() {
            return this.address;
        }
    }
}

