From a7e29582c31542427f00a10afa4ba7e0d0bf08b5 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Tue, 13 May 2014 16:21:20 +0000 Subject: [PATCH] svn merge -c 1594263 merging from trunk to branch-2 to fix:HDFS-6334. Client failover proxy provider for IP failover based NN HA. Contributed by Kihwal Lee. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1594264 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../java/org/apache/hadoop/hdfs/HAUtil.java | 43 +++++- .../apache/hadoop/hdfs/NameNodeProxies.java | 89 ++++++++---- .../web/resources/DatanodeWebHdfsMethods.java | 4 +- .../ha/AbstractNNFailoverProxyProvider.java | 35 +++++ .../ha/ConfiguredFailoverProxyProvider.java | 14 +- .../namenode/ha/IPFailoverProxyProvider.java | 133 ++++++++++++++++++ .../ha/WrappedFailoverProxyProvider.java | 80 +++++++++++ .../apache/hadoop/hdfs/tools/DFSAdmin.java | 7 +- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 15 +- .../hadoop/hdfs/TestDFSClientFailover.java | 84 ++++++++++- .../namenode/ha/TestRetryCacheWithHA.java | 5 +- 12 files changed, 456 insertions(+), 56 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/WrappedFailoverProxyProvider.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 10a3fad6e02..1e0440e62f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -16,6 +16,9 @@ Release 2.5.0 - UNRELEASED HDFS-5168. Add cross node dependency support to BlockPlacementPolicy. (Nikola Vujic via szetszwo) + HDFS-6334. Client failover proxy provider for IP failover based NN HA. + (kihwal) + IMPROVEMENTS HDFS-6007. Update documentation about short-circuit local reads (iwasakims diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index d0f43ea2ef8..d2eee25dc09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -37,9 +37,11 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; +import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; @@ -203,17 +205,54 @@ public class HAUtil { /** * @return true if the given nameNodeUri appears to be a logical URI. - * This is the case if there is a failover proxy provider configured - * for it in the given configuration. */ public static boolean isLogicalUri( Configuration conf, URI nameNodeUri) { String host = nameNodeUri.getHost(); + // A logical name must be one of the service IDs. + return DFSUtil.getNameServiceIds(conf).contains(host); + } + + /** + * Check whether the client has a failover proxy provider configured + * for the namenode/nameservice. + * + * @param conf Configuration + * @param nameNodeUri The URI of namenode + * @return true if failover is configured. + */ + public static boolean isClientFailoverConfigured( + Configuration conf, URI nameNodeUri) { + String host = nameNodeUri.getHost(); String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + host; return conf.get(configKey) != null; } + /** + * Check whether logical URI is needed for the namenode and + * the corresponding failover proxy provider in the config. + * + * @param conf Configuration + * @param nameNodeUri The URI of namenode + * @return true if logical URI is needed. false, if not needed. + * @throws IOException most likely due to misconfiguration. + */ + public static boolean useLogicalUri(Configuration conf, URI nameNodeUri) + throws IOException { + // Create the proxy provider. Actual proxy is not created. + AbstractNNFailoverProxyProvider provider = NameNodeProxies + .createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, + false); + + // No need to use logical URI since failover is not configured. + if (provider == null) { + return false; + } + // Check whether the failover proxy provider uses logical URI. + return provider.useLogicalURI(); + } + /** * Parse the file system URI out of the provided token. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java index 65e7cb29f9e..3dc6c952209 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB; import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; @@ -136,26 +138,29 @@ public class NameNodeProxies { @SuppressWarnings("unchecked") public static ProxyAndInfo createProxy(Configuration conf, URI nameNodeUri, Class xface) throws IOException { - Class> failoverProxyProviderClass = - getFailoverProxyProviderClass(conf, nameNodeUri, xface); + AbstractNNFailoverProxyProvider failoverProxyProvider = + createFailoverProxyProvider(conf, nameNodeUri, xface, true); - if (failoverProxyProviderClass == null) { + if (failoverProxyProvider == null) { // Non-HA case return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true); } else { // HA case - FailoverProxyProvider failoverProxyProvider = NameNodeProxies - .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface, - nameNodeUri); Conf config = new Conf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.maxRetryAttempts, config.failoverSleepBaseMillis, config.failoverSleepMaxMillis)); - - Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); + + Text dtService; + if (failoverProxyProvider.useLogicalURI()) { + dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); + } else { + dtService = SecurityUtil.buildTokenService( + NameNode.getAddress(nameNodeUri)); + } return new ProxyAndInfo(proxy, dtService); } } @@ -183,12 +188,10 @@ public class NameNodeProxies { Configuration config, URI nameNodeUri, Class xface, int numResponseToDrop) throws IOException { Preconditions.checkArgument(numResponseToDrop > 0); - Class> failoverProxyProviderClass = - getFailoverProxyProviderClass(config, nameNodeUri, xface); - if (failoverProxyProviderClass != null) { // HA case - FailoverProxyProvider failoverProxyProvider = - createFailoverProxyProvider(config, failoverProxyProviderClass, - xface, nameNodeUri); + AbstractNNFailoverProxyProvider failoverProxyProvider = + createFailoverProxyProvider(config, nameNodeUri, xface, true); + + if (failoverProxyProvider != null) { // HA case int delay = config.getInt( DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY, DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT); @@ -211,7 +214,13 @@ public class NameNodeProxies { T proxy = (T) Proxy.newProxyInstance( failoverProxyProvider.getInterface().getClassLoader(), new Class[] { xface }, dummyHandler); - Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); + Text dtService; + if (failoverProxyProvider.useLogicalURI()) { + dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); + } else { + dtService = SecurityUtil.buildTokenService( + NameNode.getAddress(nameNodeUri)); + } return new ProxyAndInfo(proxy, dtService); } else { LOG.warn("Currently creating proxy using " + @@ -396,7 +405,7 @@ public class NameNodeProxies { /** Gets the configured Failover proxy provider's class */ @VisibleForTesting public static Class> getFailoverProxyProviderClass( - Configuration conf, URI nameNodeUri, Class xface) throws IOException { + Configuration conf, URI nameNodeUri) throws IOException { if (nameNodeUri == null) { return null; } @@ -408,17 +417,6 @@ public class NameNodeProxies { @SuppressWarnings("unchecked") Class> ret = (Class>) conf .getClass(configKey, null, FailoverProxyProvider.class); - if (ret != null) { - // If we found a proxy provider, then this URI should be a logical NN. - // Given that, it shouldn't have a non-default port number. - int port = nameNodeUri.getPort(); - if (port > 0 && port != NameNode.DEFAULT_PORT) { - throw new IOException("Port " + port + " specified in URI " - + nameNodeUri + " but host '" + host - + "' is a logical (HA) namenode" - + " and does not use port information."); - } - } return ret; } catch (RuntimeException e) { if (e.getCause() instanceof ClassNotFoundException) { @@ -433,18 +431,33 @@ public class NameNodeProxies { /** Creates the Failover proxy provider instance*/ @VisibleForTesting - public static FailoverProxyProvider createFailoverProxyProvider( - Configuration conf, Class> failoverProxyProviderClass, - Class xface, URI nameNodeUri) throws IOException { + public static AbstractNNFailoverProxyProvider createFailoverProxyProvider( + Configuration conf, URI nameNodeUri, Class xface, boolean checkPort) + throws IOException { + Class> failoverProxyProviderClass = null; + AbstractNNFailoverProxyProvider providerNN; Preconditions.checkArgument( xface.isAssignableFrom(NamenodeProtocols.class), "Interface %s is not a NameNode protocol", xface); try { + // Obtain the class of the proxy provider + failoverProxyProviderClass = getFailoverProxyProviderClass(conf, + nameNodeUri); + if (failoverProxyProviderClass == null) { + return null; + } + // Create a proxy provider instance. Constructor> ctor = failoverProxyProviderClass .getConstructor(Configuration.class, URI.class, Class.class); FailoverProxyProvider provider = ctor.newInstance(conf, nameNodeUri, xface); - return provider; + + // If the proxy provider is of an old implementation, wrap it. + if (!(provider instanceof AbstractNNFailoverProxyProvider)) { + providerNN = new WrappedFailoverProxyProvider(provider); + } else { + providerNN = (AbstractNNFailoverProxyProvider)provider; + } } catch (Exception e) { String message = "Couldn't create proxy provider " + failoverProxyProviderClass; if (LOG.isDebugEnabled()) { @@ -456,6 +469,20 @@ public class NameNodeProxies { throw new IOException(message, e); } } + + // Check the port in the URI, if it is logical. + if (checkPort && providerNN.useLogicalURI()) { + int port = nameNodeUri.getPort(); + if (port > 0 && port != NameNode.DEFAULT_PORT) { + // Throwing here without any cleanup is fine since we have not + // actually created the underlying proxies yet. + throw new IOException("Port " + port + " specified in URI " + + nameNodeUri + " but host '" + nameNodeUri.getHost() + + "' is a logical (HA) namenode" + + " and does not use port information."); + } + } + return providerNN; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java index 87d766a03f4..fb144d39b82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java @@ -127,8 +127,8 @@ public class DatanodeWebHdfsMethods { token.decodeFromUrlString(delegation); URI nnUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + nnId); - boolean isHA = HAUtil.isLogicalUri(conf, nnUri); - if (isHA) { + boolean isLogical = HAUtil.isLogicalUri(conf, nnUri); + if (isLogical) { token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri)); } else { token.setService(SecurityUtil.buildTokenService(nnUri)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java new file mode 100644 index 00000000000..3c0edfd8f4a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.FailoverProxyProvider; + +public abstract class AbstractNNFailoverProxyProvider implements + FailoverProxyProvider { + + /** + * Inquire whether logical HA URI is used for the implementation. If it is + * used, a special token handling may be needed to make sure a token acquired + * from a node in the HA pair can be used against the other node. + * + * @return true if logical HA URI is used. false, if not used. + */ + public abstract boolean useLogicalURI(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java index 84ed633874d..4d196a2adef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; -import org.apache.hadoop.io.retry.FailoverProxyProvider; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; @@ -46,8 +46,8 @@ import com.google.common.base.Preconditions; * to connect to during fail-over. The first configured address is tried first, * and on a fail-over event the other address is tried. */ -public class ConfiguredFailoverProxyProvider implements - FailoverProxyProvider { +public class ConfiguredFailoverProxyProvider extends + AbstractNNFailoverProxyProvider { private static final Log LOG = LogFactory.getLog(ConfiguredFailoverProxyProvider.class); @@ -165,4 +165,12 @@ public class ConfiguredFailoverProxyProvider implements } } } + + /** + * Logical URI is required for this failover proxy provider. + */ + @Override + public boolean useLogicalURI() { + return true; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java new file mode 100644 index 00000000000..4ccec1677b7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.base.Preconditions; + +/** + * A NNFailoverProxyProvider implementation which works on IP failover setup. + * Only one proxy is used to connect to both servers and switching between + * the servers is done by the environment/infrastructure, which guarantees + * clients can consistently reach only one node at a time. + * + * Clients with a live connection will likely get connection reset after an + * IP failover. This case will be handled by the + * FailoverOnNetworkExceptionRetry retry policy. I.e. if the call is + * not idempotent, it won't get retried. + * + * A connection reset while setting up a connection (i.e. before sending a + * request) will be handled in ipc client. + * + * The namenode URI must contain a resolvable host name. + */ +public class IPFailoverProxyProvider extends + AbstractNNFailoverProxyProvider { + private final Configuration conf; + private final Class xface; + private final URI nameNodeUri; + private ProxyInfo nnProxyInfo = null; + + public IPFailoverProxyProvider(Configuration conf, URI uri, + Class xface) { + Preconditions.checkArgument( + xface.isAssignableFrom(NamenodeProtocols.class), + "Interface class %s is not a valid NameNode protocol!"); + this.xface = xface; + this.nameNodeUri = uri; + + this.conf = new Configuration(conf); + int maxRetries = this.conf.getInt( + DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_KEY, + DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_DEFAULT); + this.conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + maxRetries); + + int maxRetriesOnSocketTimeouts = this.conf.getInt( + DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); + this.conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + maxRetriesOnSocketTimeouts); + } + + @Override + public Class getInterface() { + return xface; + } + + @Override + public synchronized ProxyInfo getProxy() { + // Create a non-ha proxy if not already created. + if (nnProxyInfo == null) { + try { + // Create a proxy that is not wrapped in RetryProxy + InetSocketAddress nnAddr = NameNode.getAddress(nameNodeUri); + nnProxyInfo = new ProxyInfo(NameNodeProxies.createNonHAProxy( + conf, nnAddr, xface, UserGroupInformation.getCurrentUser(), + false).getProxy(), nnAddr.toString()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + return nnProxyInfo; + } + + /** Nothing to do for IP failover */ + @Override + public void performFailover(T currentProxy) { + } + + /** + * Close the proxy, + */ + @Override + public synchronized void close() throws IOException { + if (nnProxyInfo == null) { + return; + } + if (nnProxyInfo.proxy instanceof Closeable) { + ((Closeable)nnProxyInfo.proxy).close(); + } else { + RPC.stopProxy(nnProxyInfo.proxy); + } + } + + /** + * Logical URI is not used for IP failover. + */ + @Override + public boolean useLogicalURI() { + return false; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/WrappedFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/WrappedFailoverProxyProvider.java new file mode 100644 index 00000000000..2842fb96e40 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/WrappedFailoverProxyProvider.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.base.Preconditions; + +/** + * A NNFailoverProxyProvider implementation which wrapps old implementations + * directly implementing the {@link FailoverProxyProvider} interface. + * + * It is assumed that the old impelmentation is using logical URI. + */ +public class WrappedFailoverProxyProvider extends + AbstractNNFailoverProxyProvider { + private final FailoverProxyProvider proxyProvider; + + /** + * Wrap the given instance of an old FailoverProxyProvider. + */ + public WrappedFailoverProxyProvider(FailoverProxyProvider provider) { + proxyProvider = provider; + } + + @Override + public Class getInterface() { + return proxyProvider.getInterface(); + } + + @Override + public synchronized ProxyInfo getProxy() { + return proxyProvider.getProxy(); + } + + @Override + public void performFailover(T currentProxy) { + proxyProvider.performFailover(currentProxy); + } + + /** + * Close the proxy, + */ + @Override + public synchronized void close() throws IOException { + proxyProvider.close(); + } + + /** + * Assume logical URI is used for old proxy provider implementations. + */ + @Override + public boolean useLogicalURI() { + return true; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index 6e5125563bb..cc5bbcf3772 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -913,9 +913,10 @@ public class DFSAdmin extends FsShell { Configuration dfsConf = dfs.getConf(); URI dfsUri = dfs.getUri(); - boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri); - if (isHaEnabled) { - // In the case of HA, run finalizeUpgrade for all NNs in this nameservice + boolean isHaAndLogicalUri = HAUtil.isLogicalUri(dfsConf, dfsUri); + if (isHaAndLogicalUri) { + // In the case of HA and logical URI, run finalizeUpgrade for all + // NNs in this nameservice. String nsId = dfsUri.getHost(); List namenodes = HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, nsId); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 3d9aacc2268..7951253a7fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -176,10 +176,13 @@ public class WebHdfsFileSystem extends FileSystem this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.nnAddrs = resolveNNAddr(); - boolean isHA = HAUtil.isLogicalUri(conf, this.uri); - // In non-HA case, the code needs to call getCanonicalUri() in order to - // handle the case where no port is specified in the URI - this.tokenServiceName = isHA ? HAUtil.buildTokenServiceForLogicalUri(uri) + boolean isHA = HAUtil.isClientFailoverConfigured(conf, this.uri); + boolean isLogicalUri = isHA && HAUtil.isLogicalUri(conf, this.uri); + // In non-HA or non-logical URI case, the code needs to call + // getCanonicalUri() in order to handle the case where no port is + // specified in the URI + this.tokenServiceName = isLogicalUri ? + HAUtil.buildTokenServiceForLogicalUri(uri) : SecurityUtil.buildTokenService(getCanonicalUri()); initializeTokenAspect(); @@ -1095,8 +1098,8 @@ public class WebHdfsFileSystem extends FileSystem /** * Resolve an HDFS URL into real INetSocketAddress. It works like a DNS * resolver when the URL points to an non-HA cluster. When the URL points to - * an HA cluster, the resolver further resolves the logical name (i.e., the - * authority in the URL) into real namenode addresses. + * an HA cluster with its logical name, the resolver further resolves the + * logical name(i.e., the authority in the URL) into real namenode addresses. */ private InetSocketAddress[] resolveNNAddr() throws IOException { Configuration conf = getConf(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java index 7e6975735be..d2a03d698d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -41,12 +42,17 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.IPFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider; +import org.apache.hadoop.io.retry.FailoverProxyProvider; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.StandardSocketFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; import org.hamcrest.BaseMatcher; @@ -172,12 +178,12 @@ public class TestDFSClientFailover { */ @Test public void testLogicalUriShouldNotHavePorts() { - Configuration conf = new HdfsConfiguration(); - conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + ".foo", - ConfiguredFailoverProxyProvider.class.getName()); - Path p = new Path("hdfs://foo:12345/"); + Configuration config = new HdfsConfiguration(conf); + String logicalName = HATestUtil.getLogicalHostname(cluster); + HATestUtil.setFailoverConfigurations(cluster, config, logicalName); + Path p = new Path("hdfs://" + logicalName + ":12345/"); try { - p.getFileSystem(conf).exists(p); + p.getFileSystem(config).exists(p); fail("Did not fail with fake FS"); } catch (IOException ioe) { GenericTestUtils.assertExceptionContains( @@ -278,4 +284,72 @@ public class TestDFSClientFailover { // Ensure that the logical hostname was never resolved. Mockito.verify(spyNS, Mockito.never()).lookupAllHostAddr(Mockito.eq(logicalHost)); } + + /** Dummy implementation of plain FailoverProxyProvider */ + public static class DummyLegacyFailoverProxyProvider + implements FailoverProxyProvider { + private Class xface; + private T proxy; + public DummyLegacyFailoverProxyProvider(Configuration conf, URI uri, + Class xface) { + try { + this.proxy = NameNodeProxies.createNonHAProxy(conf, + NameNode.getAddress(uri), xface, + UserGroupInformation.getCurrentUser(), false).getProxy(); + this.xface = xface; + } catch (IOException ioe) { + } + } + + @Override + public Class getInterface() { + return xface; + } + + @Override + public ProxyInfo getProxy() { + return new ProxyInfo(proxy, "dummy"); + } + + @Override + public void performFailover(T currentProxy) { + } + + @Override + public void close() throws IOException { + } + } + + /** + * Test to verify legacy proxy providers are correctly wrapped. + */ + public void testWrappedFailoverProxyProvider() throws Exception { + // setup the config with the dummy provider class + Configuration config = new HdfsConfiguration(conf); + String logicalName = HATestUtil.getLogicalHostname(cluster); + HATestUtil.setFailoverConfigurations(cluster, config, logicalName); + config.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName, + DummyLegacyFailoverProxyProvider.class.getName()); + Path p = new Path("hdfs://" + logicalName + "/"); + + // Logical URI should be used. + assertTrue("Legacy proxy providers should use logical URI.", + HAUtil.useLogicalUri(config, p.toUri())); + } + + /** + * Test to verify IPFailoverProxyProvider is not requiring logical URI. + */ + public void testIPFailoverProxyProviderLogicalUri() throws Exception { + // setup the config with the IP failover proxy provider class + Configuration config = new HdfsConfiguration(conf); + URI nnUri = cluster.getURI(0); + config.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + + nnUri.getHost(), + IPFailoverProxyProvider.class.getName()); + + assertFalse("IPFailoverProxyProvider should not use logical URI.", + HAUtil.useLogicalUri(config, nnUri)); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java index 16fbd74abbd..95e55cc2b41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java @@ -188,12 +188,9 @@ public class TestRetryCacheWithHA { private DFSClient genClientWithDummyHandler() throws IOException { URI nnUri = dfs.getUri(); - Class> failoverProxyProviderClass = - NameNodeProxies.getFailoverProxyProviderClass(conf, nnUri, - ClientProtocol.class); FailoverProxyProvider failoverProxyProvider = NameNodeProxies.createFailoverProxyProvider(conf, - failoverProxyProviderClass, ClientProtocol.class, nnUri); + nnUri, ClientProtocol.class, true); InvocationHandler dummyHandler = new DummyRetryInvocationHandler( failoverProxyProvider, RetryPolicies .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,