diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
new file mode 100644
index 00000000000..223c40d3d21
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Create proxy objects with {@link ClientProtocol} to communicate with a remote
+ * NN. Generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
+ * Configuration, URI, AtomicBoolean)}, which will create either an HA- or
+ * non-HA-enabled client proxy as appropriate.
+ *
+ * For creating proxy objects with other protocols, please see
+ * {@link NameNodeProxies#createProxy(Configuration, URI, Class)}.
+ */
+@InterfaceAudience.Private
+public class NameNodeProxiesClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      NameNodeProxiesClient.class);
+
+  /**
+   * Wrapper for a client proxy as well as its associated service ID.
+   * This is simply used as a tuple-like return type for the created NN proxy.
+   */
+  public static class ProxyAndInfo<PROXYTYPE> {
+    private final PROXYTYPE proxy;
+    private final Text dtService;
+    private final InetSocketAddress address;
+
+    public ProxyAndInfo(PROXYTYPE proxy, Text dtService,
+        InetSocketAddress address) {
+      this.proxy = proxy;
+      this.dtService = dtService;
+      this.address = address;
+    }
+
+    public PROXYTYPE getProxy() {
+      return proxy;
+    }
+
+    public Text getDelegationTokenService() {
+      return dtService;
+    }
+
+    public InetSocketAddress getAddress() {
+      return address;
+    }
+  }
+
+  /**
+   * Creates the namenode proxy with the ClientProtocol. This will handle
+   * creation of either HA- or non-HA-enabled proxy objects, depending on
+   * whether the provided URI is a configured logical URI.
+   *
+   * @param conf the configuration containing the required IPC
+   *        properties, client failover configurations, etc.
+   * @param nameNodeUri the URI pointing either to a specific NameNode
+   *        or to a logical nameservice.
+   * @param fallbackToSimpleAuth set to true or false during calls to indicate
+   *        if a secure client falls back to simple auth
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to
+   * @throws IOException if there is an error creating the proxy
+   * @see NameNodeProxies#createProxy(Configuration, URI, Class)
+   */
+  public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(
+      Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
+    AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider =
+        createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
+            true, fallbackToSimpleAuth);
+
+    if (failoverProxyProvider == null) {
+      InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
+      Text dtService = SecurityUtil.buildTokenService(nnAddr);
+      ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf,
+          UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
+      return new ProxyAndInfo<>(proxy, dtService, nnAddr);
+    } else {
+      return createHAProxy(conf, nameNodeUri, ClientProtocol.class,
+          failoverProxyProvider);
+    }
+  }
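As background for reviewers, a minimal sketch of how a caller might use the new entry point (illustrative only, not part of the patch; the logical URI hdfs://mycluster and the ProxyExample class are hypothetical):

    import java.net.URI;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.NameNodeProxiesClient;
    import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;

    public class ProxyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Set to true during calls if a secure client falls back to simple auth.
        AtomicBoolean fallback = new AtomicBoolean(false);
        // Handles both a physical NN address and a logical (HA) nameservice.
        ProxyAndInfo<ClientProtocol> info =
            NameNodeProxiesClient.createProxyWithClientProtocol(
                conf, URI.create("hdfs://mycluster"), fallback);
        ClientProtocol namenode = info.getProxy();
        System.out.println("Token service: " + info.getDelegationTokenService());
      }
    }

The ProxyAndInfo tuple also carries the delegation token service and the resolved NameNode address, so callers such as DFSClient can keep all three together.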
+
+  /**
+   * Generate a dummy namenode proxy instance that utilizes our hacked
+   * {@link LossyRetryInvocationHandler}. Proxy instances generated using this
+   * method will proactively drop RPC responses. Currently this method only
+   * supports an HA setup. null will be returned if the given configuration is
+   * not for HA.
+   *
+   * @param config the configuration containing the required IPC
+   *        properties, client failover configurations, etc.
+   * @param nameNodeUri the URI pointing either to a specific NameNode
+   *        or to a logical nameservice.
+   * @param xface the IPC interface which should be created
+   * @param numResponseToDrop The number of responses to drop for each RPC call
+   * @param fallbackToSimpleAuth set to true or false during calls to indicate
+   *        if a secure client falls back to simple auth
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to. Will return null if
+   *         the given configuration does not support HA.
+   * @throws IOException if there is an error creating the proxy
+   */
+  public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
+      Configuration config, URI nameNodeUri, Class<T> xface,
+      int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
+    Preconditions.checkArgument(numResponseToDrop > 0);
+    AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
+        createFailoverProxyProvider(config, nameNodeUri, xface, true,
+            fallbackToSimpleAuth);
+
+    if (failoverProxyProvider != null) { // HA case
+      int delay = config.getInt(
+          HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
+          HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
+      int maxCap = config.getInt(
+          HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
+          HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
+      int maxFailoverAttempts = config.getInt(
+          HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
+          HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
+      int maxRetryAttempts = config.getInt(
+          HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
+          HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
+      InvocationHandler dummyHandler = new LossyRetryInvocationHandler<>(
+          numResponseToDrop, failoverProxyProvider,
+          RetryPolicies.failoverOnNetworkException(
+              RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
+              Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
+              maxCap));
+
+      @SuppressWarnings("unchecked")
+      T proxy = (T) Proxy.newProxyInstance(
+          failoverProxyProvider.getInterface().getClassLoader(),
+          new Class[]{xface}, dummyHandler);
+      Text dtService;
+      if (failoverProxyProvider.useLogicalURI()) {
+        dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
+            HdfsConstants.HDFS_URI_SCHEME);
+      } else {
+        dtService = SecurityUtil.buildTokenService(
+            DFSUtilClient.getNNAddress(nameNodeUri));
+      }
+      return new ProxyAndInfo<>(proxy, dtService,
+          DFSUtilClient.getNNAddress(nameNodeUri));
+    } else {
+      LOG.warn("Currently creating proxy using " +
+          "LossyRetryInvocationHandler requires NN HA setup");
+      return null;
+    }
+  }
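For testing fault handling, a caller might wire the lossy proxy like this (a hedged sketch; assumes an HA-configured Configuration and again uses the hypothetical hdfs://mycluster URI):

    import java.net.URI;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.NameNodeProxiesClient;
    import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;

    class LossyProxyExample {
      static ClientProtocol lossyNamenode(Configuration conf) throws Exception {
        // Drop the first 2 responses of every RPC; the handler sizes the retry
        // policy to at least numResponseToDrop + 1 attempts so calls can still
        // eventually succeed.
        ProxyAndInfo<ClientProtocol> info =
            NameNodeProxiesClient.createProxyWithLossyRetryHandler(
                conf, URI.create("hdfs://mycluster"), ClientProtocol.class,
                2, new AtomicBoolean(false));
        if (info == null) {
          // Returned for non-HA configurations (a warning is logged as well).
          throw new IllegalStateException("requires an NN HA setup");
        }
        return info.getProxy();
      }
    }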
+
+  /** Creates the Failover proxy provider instance. */
+  @VisibleForTesting
+  public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
+      Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
+    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
+    AbstractNNFailoverProxyProvider<T> providerNN;
+    try {
+      // Obtain the class of the proxy provider
+      failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
+          nameNodeUri);
+      if (failoverProxyProviderClass == null) {
+        return null;
+      }
+      // Create a proxy provider instance.
+      Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
+          .getConstructor(Configuration.class, URI.class, Class.class);
+      FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
+          xface);
+
+      // If the proxy provider is of an old implementation, wrap it.
+      if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
+        providerNN = new WrappedFailoverProxyProvider<>(provider);
+      } else {
+        providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
+      }
+    } catch (Exception e) {
+      final String message = "Couldn't create proxy provider " +
+          failoverProxyProviderClass;
+      LOG.debug(message, e);
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else {
+        throw new IOException(message, e);
+      }
+    }
+
+    // Check the port in the URI, if it is logical.
+    if (checkPort && providerNN.useLogicalURI()) {
+      int port = nameNodeUri.getPort();
+      if (port > 0 &&
+          port != HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) {
+        // Throwing here without any cleanup is fine since we have not
+        // actually created the underlying proxies yet.
+        throw new IOException("Port " + port + " specified in URI "
+            + nameNodeUri + " but host '" + nameNodeUri.getHost()
+            + "' is a logical (HA) namenode"
+            + " and does not use port information.");
+      }
+    }
+    providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
+    return providerNN;
+  }
+
+  /** Gets the configured Failover proxy provider's class. */
+  @VisibleForTesting
+  public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
+      Configuration conf, URI nameNodeUri) throws IOException {
+    if (nameNodeUri == null) {
+      return null;
+    }
+    String host = nameNodeUri.getHost();
+    String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+        + "." + host;
+    try {
+      @SuppressWarnings("unchecked")
+      Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>)
+          conf.getClass(configKey, null, FailoverProxyProvider.class);
+      return ret;
+    } catch (RuntimeException e) {
+      if (e.getCause() instanceof ClassNotFoundException) {
+        throw new IOException("Could not load failover proxy provider class "
+            + conf.get(configKey) + " which is configured for authority "
+            + nameNodeUri, e);
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Creates an explicitly HA-enabled proxy object.
+   *
+   * @param conf the configuration object
+   * @param nameNodeUri the URI pointing either to a specific NameNode or to a
+   *        logical nameservice.
+   * @param xface the IPC interface which should be created
+   * @param failoverProxyProvider Failover proxy provider
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to
+   * @throws IOException if there is an error creating the proxy
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> ProxyAndInfo<T> createHAProxy(
+      Configuration conf, URI nameNodeUri, Class<T> xface,
+      AbstractNNFailoverProxyProvider<T> failoverProxyProvider)
+      throws IOException {
+    Preconditions.checkNotNull(failoverProxyProvider);
+    // HA case
+    DfsClientConf config = new DfsClientConf(conf);
+    T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
+        RetryPolicies.failoverOnNetworkException(
+            RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
+            config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
+            config.getFailoverSleepMaxMillis()));
+
+    Text dtService;
+    if (failoverProxyProvider.useLogicalURI()) {
+      dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
+          HdfsConstants.HDFS_URI_SCHEME);
+    } else {
+      dtService = SecurityUtil.buildTokenService(
+          DFSUtilClient.getNNAddress(nameNodeUri));
+    }
+    return new ProxyAndInfo<>(proxy, dtService,
+        DFSUtilClient.getNNAddress(nameNodeUri));
+  }
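Whether the HA path is taken hinges entirely on getFailoverProxyProviderClass() resolving a provider for the URI's authority. A configuration sketch (the nameservice name mycluster is hypothetical; the key prefix is the one read via HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX):

    import org.apache.hadoop.conf.Configuration;

    class ProviderConfigExample {
      static Configuration haConf() {
        Configuration conf = new Configuration();
        // With this key present, createFailoverProxyProvider() returns
        // non-null and createProxyWithClientProtocol() takes the
        // createHAProxy() path instead of the non-HA path.
        conf.set("dfs.client.failover.proxy.provider.mycluster",
            "org.apache.hadoop.hdfs.server.namenode.ha."
                + "ConfiguredFailoverProxyProvider");
        return conf;
      }
    }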
+
+  public static ClientProtocol createNonHAProxyWithClientProtocol(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+      boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
+    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    final RetryPolicy defaultPolicy =
+        RetryUtils.getDefaultRetryPolicy(
+            conf,
+            HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
+            HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
+            HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
+            HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
+            SafeModeException.class.getName());
+
+    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
+    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
+        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf),
+        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
+        fallbackToSimpleAuth).getProxy();
+
+    if (withRetries) { // create the proxy with retries
+      Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();
+      ClientProtocol translatorProxy =
+          new ClientNamenodeProtocolTranslatorPB(proxy);
+      return (ClientProtocol) RetryProxy.create(
+          ClientProtocol.class,
+          new DefaultFailoverProxyProvider<>(ClientProtocol.class,
+              translatorProxy),
+          methodNameToPolicyMap,
+          defaultPolicy);
+    } else {
+      return new ClientNamenodeProtocolTranslatorPB(proxy);
+    }
+  }
+
+}
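When no failover provider is configured, the same translator-wrapped proxy can be built directly against a concrete address; a minimal sketch, assuming a reachable NameNode RPC endpoint (nn.example.com:8020 is a placeholder):

    import java.net.InetSocketAddress;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.NameNodeProxiesClient;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.security.UserGroupInformation;

    class NonHAProxyExample {
      static ClientProtocol connect(Configuration conf) throws Exception {
        InetSocketAddress nnAddr = new InetSocketAddress("nn.example.com", 8020);
        // withRetries=true wraps the translator in RetryProxy using the
        // default retry policy built from the dfs.client retry keys above.
        return NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
            nnAddr, conf, UserGroupInformation.getCurrentUser(),
            true, new AtomicBoolean(false));
      }
    }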
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java
similarity index 100%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
similarity index 100%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
similarity index 92%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index a0aa10bf68f..78cd16047ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.retry.FailoverProxyProvider;
 public abstract class AbstractNNFailoverProxyProvider<T> implements
     FailoverProxyProvider<T> {
 
-  protected AtomicBoolean fallbackToSimpleAuth;
+  private AtomicBoolean fallbackToSimpleAuth;
 
   /**
    * Inquire whether logical HA URI is used for the implementation. If it is
@@ -48,4 +48,8 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
       AtomicBoolean fallbackToSimpleAuth) {
     this.fallbackToSimpleAuth = fallbackToSimpleAuth;
   }
+
+  public synchronized AtomicBoolean getFallbackToSimpleAuth() {
+    return fallbackToSimpleAuth;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/WrappedFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/WrappedFailoverProxyProvider.java
similarity index 86%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/WrappedFailoverProxyProvider.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/WrappedFailoverProxyProvider.java
index 2842fb96e40..0b387b7fb23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/WrappedFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/WrappedFailoverProxyProvider.java
@@ -17,18 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import com.google.common.base.Preconditions;
 
 /**
  * A NNFailoverProxyProvider implementation which wrapps old implementations
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c9408de442f..19087e8cc2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -596,6 +596,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12428. Fix inconsistency between log-level guards and statements.
     (Jagadesh Kiran N and Jackie Chang via ozawa)
 
+    HDFS-9039. Separate client and server side methods of o.a.h.hdfs.
+    NameNodeProxies. (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index ef9bf4886ae..f6f159fadbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -98,6 +98,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
@@ -319,14 +320,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     int numResponseToDrop = conf.getInt(
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
-    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
+    ProxyAndInfo<ClientProtocol> proxyInfo = null;
     AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
     if (numResponseToDrop > 0) {
       // This case is used for testing.
       LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
           + " is set to " + numResponseToDrop
           + ", this hacked client will proactively drop responses");
-      proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
+      proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
           nameNodeUri, ClientProtocol.class, numResponseToDrop,
           nnFallbackToSimpleAuth);
     }
@@ -342,8 +343,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     } else {
       Preconditions.checkArgument(nameNodeUri != null,
           "null URI");
-      proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
-          ClientProtocol.class, nnFallbackToSimpleAuth);
+      proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
+          nameNodeUri, nnFallbackToSimpleAuth);
       this.dtService = proxyInfo.getDelegationTokenService();
       this.namenode = proxyInfo.getProxy();
     }
@@ -784,8 +785,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
           "a failover proxy provider configured.");
     }
 
-    NameNodeProxies.ProxyAndInfo<ClientProtocol> info =
-        NameNodeProxies.createProxy(conf, uri, ClientProtocol.class);
+    ProxyAndInfo<ClientProtocol> info =
+        NameNodeProxiesClient.createProxyWithClientProtocol(conf, uri, null);
     assert info.getDelegationTokenService().equals(token.getService()) :
         "Returned service '" + info.getDelegationTokenService().toString() +
         "' doesn't match expected service '" +
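The drop-responses branch in DFSClient is driven purely by configuration; a sketch of how a test might enable it (the key string mirrors DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY referenced in the hunk above):

    import org.apache.hadoop.conf.Configuration;

    class DropResponsesExample {
      static Configuration testConf() {
        Configuration conf = new Configuration();
        // Any value > 0 makes DFSClient construct its namenode proxy through
        // NameNodeProxiesClient.createProxyWithLossyRetryHandler().
        conf.setInt("dfs.client.test.drop.namenode.response.number", 3);
        return conf;
      }
    }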
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index c967c6912ed..7800596f0a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -216,9 +216,9 @@ public class HAUtil {
   public static boolean useLogicalUri(Configuration conf, URI nameNodeUri)
       throws IOException {
     // Create the proxy provider. Actual proxy is not created.
-    AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxies
+    AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxiesClient
         .createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
-        false, null);
+            false, null);
 
     // No need to use logical URI since failover is not configured.
     if (provider == null) {
@@ -332,8 +332,7 @@ public class HAUtil {
     List<ProxyAndInfo<T>> proxies = new ArrayList<ProxyAndInfo<T>>(
         nnAddresses.size());
     for (InetSocketAddress nnAddress : nnAddresses.values()) {
-      NameNodeProxies.ProxyAndInfo<T> proxyInfo = null;
-      proxyInfo = NameNodeProxies.createNonHAProxy(conf,
+      ProxyAndInfo<T> proxyInfo = NameNodeProxies.createNonHAProxy(conf,
           nnAddress, xface, UserGroupInformation.getCurrentUser(), false);
       proxies.add(proxyInfo);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index a5039a181ba..9d168191061 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -18,9 +18,6 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
@@ -32,31 +29,19 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
-import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
@@ -75,9 +60,6 @@ import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
 /**
  * Create proxy objects to communicate with a remote NN. All remote access to an
  * NN should be funneled through this class. Most of the time you'll want to use
@@ -89,37 +71,6 @@ public class NameNodeProxies {
 
   private static final Log LOG = LogFactory.getLog(NameNodeProxies.class);
 
-  /**
-   * Wrapper for a client proxy as well as its associated service ID.
-   * This is simply used as a tuple-like return type for
-   * {@link NameNodeProxies#createProxy} and
-   * {@link NameNodeProxies#createNonHAProxy}.
-   */
-  public static class ProxyAndInfo<PROXYTYPE> {
-    private final PROXYTYPE proxy;
-    private final Text dtService;
-    private final InetSocketAddress address;
-
-    public ProxyAndInfo(PROXYTYPE proxy, Text dtService,
-        InetSocketAddress address) {
-      this.proxy = proxy;
-      this.dtService = dtService;
-      this.address = address;
-    }
-
-    public PROXYTYPE getProxy() {
-      return proxy;
-    }
-
-    public Text getDelegationTokenService() {
-      return dtService;
-    }
-
-    public InetSocketAddress getAddress() {
-      return address;
-    }
-  }
-
   /**
    * Creates the namenode proxy with the passed protocol. This will handle
    * creation of either HA- or non-HA-enabled proxy objects, depending upon
@@ -160,103 +111,16 @@ public class NameNodeProxies {
       URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
       throws IOException {
     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
-        createFailoverProxyProvider(conf, nameNodeUri, xface, true,
-            fallbackToSimpleAuth);
-
+        NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri,
+            xface, true, fallbackToSimpleAuth);
+
     if (failoverProxyProvider == null) {
-      // Non-HA case
       return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
           xface, UserGroupInformation.getCurrentUser(), true,
           fallbackToSimpleAuth);
     } else {
-      // HA case
-      DfsClientConf config = new DfsClientConf(conf);
-      T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
-          RetryPolicies.failoverOnNetworkException(
-              RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
-              config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
-              config.getFailoverSleepMaxMillis()));
-
-      Text dtService;
-      if (failoverProxyProvider.useLogicalURI()) {
-        dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
-            HdfsConstants.HDFS_URI_SCHEME);
-      } else {
-        dtService = SecurityUtil.buildTokenService(
-            DFSUtilClient.getNNAddress(nameNodeUri));
-      }
-      return new ProxyAndInfo<T>(proxy, dtService,
-          DFSUtilClient.getNNAddress(nameNodeUri));
-    }
-  }
-
-  /**
-   * Generate a dummy namenode proxy instance that utilizes our hacked
-   * {@link LossyRetryInvocationHandler}. Proxy instance generated using this
-   * method will proactively drop RPC responses. Currently this method only
-   * support HA setup. null will be returned if the given configuration is not
-   * for HA.
-   *
-   * @param config the configuration containing the required IPC
-   *        properties, client failover configurations, etc.
-   * @param nameNodeUri the URI pointing either to a specific NameNode
-   *        or to a logical nameservice.
-   * @param xface the IPC interface which should be created
-   * @param numResponseToDrop The number of responses to drop for each RPC call
-   * @param fallbackToSimpleAuth set to true or false during calls to indicate if
-   *        a secure client falls back to simple auth
-   * @return an object containing both the proxy and the associated
-   *         delegation token service it corresponds to. Will return null of the
-   *         given configuration does not support HA.
-   * @throws IOException if there is an error creating the proxy
-   */
-  @SuppressWarnings("unchecked")
-  public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
-      Configuration config, URI nameNodeUri, Class<T> xface,
-      int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
-      throws IOException {
-    Preconditions.checkArgument(numResponseToDrop > 0);
-    AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
-        createFailoverProxyProvider(config, nameNodeUri, xface, true,
-            fallbackToSimpleAuth);
-
-    if (failoverProxyProvider != null) { // HA case
-      int delay = config.getInt(
-          HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
-          HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
-      int maxCap = config.getInt(
-          HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
-          HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
-      int maxFailoverAttempts = config.getInt(
-          HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
-          HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
-      int maxRetryAttempts = config.getInt(
-          HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
-          HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
-      InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
-          numResponseToDrop, failoverProxyProvider,
-          RetryPolicies.failoverOnNetworkException(
-              RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
-              Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
-              maxCap));
-
-      T proxy = (T) Proxy.newProxyInstance(
-          failoverProxyProvider.getInterface().getClassLoader(),
-          new Class[] { xface }, dummyHandler);
-      Text dtService;
-      if (failoverProxyProvider.useLogicalURI()) {
-        dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
-            HdfsConstants.HDFS_URI_SCHEME);
-      } else {
-        dtService = SecurityUtil.buildTokenService(
-            DFSUtilClient.getNNAddress(nameNodeUri));
-      }
-      return new ProxyAndInfo<T>(proxy, dtService,
-          DFSUtilClient.getNNAddress(nameNodeUri));
-    } else {
-      LOG.warn("Currently creating proxy using " +
-          "LossyRetryInvocationHandler requires NN HA setup");
-      return null;
+      return NameNodeProxiesClient.createHAProxy(conf, nameNodeUri, xface,
+          failoverProxyProvider);
     }
   }
@@ -303,8 +167,8 @@ public class NameNodeProxies {
     T proxy;
     if (xface == ClientProtocol.class) {
-      proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
-          withRetries, fallbackToSimpleAuth);
+      proxy = (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
+          nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth);
     } else if (xface == JournalProtocol.class) {
       proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
     } else if (xface == NamenodeProtocol.class) {
@@ -390,45 +254,6 @@ public class NameNodeProxies {
       return new NamenodeProtocolTranslatorPB(proxy);
     }
   }
-
-  private static ClientProtocol createNNProxyWithClientProtocol(
-      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
-      boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
-      throws IOException {
-    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
-        ProtobufRpcEngine.class);
-
-    final RetryPolicy defaultPolicy =
-        RetryUtils.getDefaultRetryPolicy(
-            conf,
-            HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
-            HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
-            HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
-            HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
-            SafeModeException.class.getName());
-
-    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
-    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
-        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
-        NetUtils.getDefaultSocketFactory(conf),
-        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
-        fallbackToSimpleAuth).getProxy();
-
-    if (withRetries) { // create the proxy with retries
-
-      Map<String, RetryPolicy> methodNameToPolicyMap
-          = new HashMap<String, RetryPolicy>();
-      ClientProtocol translatorProxy =
-          new ClientNamenodeProtocolTranslatorPB(proxy);
-      return (ClientProtocol) RetryProxy.create(
-          ClientProtocol.class,
-          new DefaultFailoverProxyProvider<ClientProtocol>(
-              ClientProtocol.class, translatorProxy),
-          methodNameToPolicyMap,
-          defaultPolicy);
-    } else {
-      return new ClientNamenodeProtocolTranslatorPB(proxy);
-    }
-  }
 
   private static Object createNameNodeProxy(InetSocketAddress address,
       Configuration conf, UserGroupInformation ugi, Class<?> xface)
@@ -439,88 +264,4 @@ public class NameNodeProxies {
     return proxy;
   }
 
-  /** Gets the configured Failover proxy provider's class */
-  @VisibleForTesting
-  public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
-      Configuration conf, URI nameNodeUri) throws IOException {
-    if (nameNodeUri == null) {
-      return null;
-    }
-    String host = nameNodeUri.getHost();
-    String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
-        + "." + host;
-    try {
-      @SuppressWarnings("unchecked")
-      Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf
-          .getClass(configKey, null, FailoverProxyProvider.class);
-      return ret;
-    } catch (RuntimeException e) {
-      if (e.getCause() instanceof ClassNotFoundException) {
-        throw new IOException("Could not load failover proxy provider class "
-            + conf.get(configKey) + " which is configured for authority "
-            + nameNodeUri, e);
-      } else {
-        throw e;
-      }
-    }
-  }
-
-  /** Creates the Failover proxy provider instance*/
-  @VisibleForTesting
-  public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
-      Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
-      AtomicBoolean fallbackToSimpleAuth) throws IOException {
-    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
-    AbstractNNFailoverProxyProvider<T> providerNN;
-    Preconditions.checkArgument(
-        xface.isAssignableFrom(NamenodeProtocols.class),
-        "Interface %s is not a NameNode protocol", xface);
-    try {
-      // Obtain the class of the proxy provider
-      failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
-          nameNodeUri);
-      if (failoverProxyProviderClass == null) {
-        return null;
-      }
-      // Create a proxy provider instance.
-      Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
-          .getConstructor(Configuration.class, URI.class, Class.class);
-      FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
-          xface);
-
-      // If the proxy provider is of an old implementation, wrap it.
-      if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
-        providerNN = new WrappedFailoverProxyProvider<T>(provider);
-      } else {
-        providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
-      }
-    } catch (Exception e) {
-      String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(message, e);
-      }
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      } else {
-        throw new IOException(message, e);
-      }
-    }
-
-    // Check the port in the URI, if it is logical.
-    if (checkPort && providerNN.useLogicalURI()) {
-      int port = nameNodeUri.getPort();
-      if (port > 0 &&
-          port != HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) {
-        // Throwing here without any cleanup is fine since we have not
-        // actually created the underlying proxies yet.
-        throw new IOException("Port " + port + " specified in URI "
-            + nameNodeUri + " but host '" + nameNodeUri.getHost()
-            + "' is a logical (HA) namenode"
-            + " and does not use port information.");
-      }
-    }
-    providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
-    return providerNN;
-  }
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 93019789096..a826e4f3479 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -349,7 +349,7 @@ public class PBHelper {
         new RecoveringBlock(block, locs, PBHelperClient.convert(b.getTruncateBlock())) :
         new RecoveringBlock(block, locs, b.getNewGenStamp());
   }
-  
+
   public static ReplicaState convert(ReplicaStateProto state) {
     switch (state) {
     case RBW:
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index ccce7362b1f..c2d4d916261 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -149,7 +149,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
     if (current.namenode == null) {
       try {
         current.namenode = factory.createProxy(conf,
-            current.address, xface, ugi, false, fallbackToSimpleAuth);
+            current.address, xface, ugi, false, getFallbackToSimpleAuth());
       } catch (IOException e) {
        LOG.error("Failed to create RPC proxy to NameNode", e);
         throw new RuntimeException(e);
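Since fallbackToSimpleAuth is now private, provider implementations read it through the synchronized accessor, as the ConfiguredFailoverProxyProvider hunk above shows. A hedged sketch of a custom provider under the new contract (SingleNNProxyProvider is hypothetical and not part of the patch):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
    import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;

    class SingleNNProxyProvider<T> extends AbstractNNFailoverProxyProvider<T> {
      private final Class<T> xface;
      private final T proxy;

      SingleNNProxyProvider(Class<T> xface, T proxy) {
        this.xface = xface;
        this.proxy = proxy;
      }

      @Override
      public Class<T> getInterface() {
        return xface;
      }

      @Override
      public ProxyInfo<T> getProxy() {
        // A real provider would pass this to its RPC proxy factory, as
        // ConfiguredFailoverProxyProvider does with factory.createProxy().
        AtomicBoolean fallback = getFallbackToSimpleAuth();
        return new ProxyInfo<>(proxy, "single-nn(fallback=" + fallback + ")");
      }

      @Override
      public void performFailover(T currentProxy) {
        // No-op: this sketch has a single NameNode and never fails over.
      }

      @Override
      public boolean useLogicalURI() {
        return false;
      }

      @Override
      public void close() throws IOException {
      }
    }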
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 549a296a13d..046fc6bd640 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -61,7 +61,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index c27ead5f659..4af9c75f776 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
@@ -196,7 +196,7 @@ public class TestRetryCacheWithHA {
   private DFSClient genClientWithDummyHandler() throws IOException {
     URI nnUri = dfs.getUri();
     FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
-        NameNodeProxies.createFailoverProxyProvider(conf,
+        NameNodeProxiesClient.createFailoverProxyProvider(conf,
             nnUri, ClientProtocol.class, true, null);
     InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
         failoverProxyProvider, RetryPolicies