diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index d089bdc7052..fe5a9da1f87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -154,6 +154,19 @@ public class DFSUtilClient { return key + "." + suffix; } + /** + * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from + * the configuration. + * + * @param conf configuration + * @return list of InetSocketAddresses + */ + public static Map> getHaNnRpcAddresses( + Configuration conf) { + return DFSUtilClient.getAddresses(conf, null, + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); + } + /** * Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from * the configuration. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java index 9f28cfcde7c..47288f77df8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java @@ -20,15 +20,29 @@ package org.apache.hadoop.hdfs; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; import java.net.URI; +import java.util.Collection; import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX; +import static org.apache.hadoop.security.SecurityUtil.buildTokenService; @InterfaceAudience.Private public class HAUtilClient { + private static final Logger LOG = LoggerFactory.getLogger(HAUtilClient.class); + + private static final DelegationTokenSelector tokenSelector = + new DelegationTokenSelector(); + /** * @return true if the given nameNodeUri appears to be a logical URI. */ @@ -92,4 +106,45 @@ public class HAUtilClient { public static boolean isTokenForLogicalUri(Token token) { return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX); } + + /** + * Locate a delegation token associated with the given HA cluster URI, and if + * one is found, clone it to also represent the underlying namenode address. + * @param ugi the UGI to modify + * @param haUri the logical URI for the cluster + * @param nnAddrs collection of NNs in the cluster to which the token + * applies + */ + public static void cloneDelegationTokenForLogicalUri( + UserGroupInformation ugi, URI haUri, + Collection nnAddrs) { + // this cloning logic is only used by hdfs + Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri, + HdfsConstants.HDFS_URI_SCHEME); + Token haToken = + tokenSelector.selectToken(haService, ugi.getTokens()); + if (haToken != null) { + for (InetSocketAddress singleNNAddr : nnAddrs) { + // this is a minor hack to prevent physical HA tokens from being + // exposed to the user via UGI.getCredentials(), otherwise these + // cloned tokens may be inadvertently propagated to jobs + Token specificToken = + haToken.privateClone(buildTokenService(singleNNAddr)); + Text alias = new Text( + HAUtilClient.buildTokenServicePrefixForLogicalUri( + HdfsConstants.HDFS_URI_SCHEME) + + "//" + specificToken.getService()); + ugi.addToken(alias, specificToken); + if (LOG.isDebugEnabled()) { + LOG.debug("Mapped HA service delegation token for logical URI " + + haUri + " to namenode " + singleNNAddr); + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("No HA service delegation token found for logical URI " + + haUri); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java index 5ca7030acdd..a092f02630d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory; +import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,6 +214,14 @@ public class NameNodeProxiesClient { public static AbstractNNFailoverProxyProvider createFailoverProxyProvider( Configuration conf, URI nameNodeUri, Class xface, boolean checkPort, AtomicBoolean fallbackToSimpleAuth) throws IOException { + return createFailoverProxyProvider(conf, nameNodeUri, xface, checkPort, + fallbackToSimpleAuth, new ClientHAProxyFactory()); + } + + protected static AbstractNNFailoverProxyProvider createFailoverProxyProvider( + Configuration conf, URI nameNodeUri, Class xface, boolean checkPort, + AtomicBoolean fallbackToSimpleAuth, HAProxyFactory proxyFactory) + throws IOException { Class> failoverProxyProviderClass = null; AbstractNNFailoverProxyProvider providerNN; try { @@ -223,9 +233,10 @@ public class NameNodeProxiesClient { } // Create a proxy provider instance. Constructor> ctor = failoverProxyProviderClass - .getConstructor(Configuration.class, URI.class, Class.class); + .getConstructor(Configuration.class, URI.class, + Class.class, HAProxyFactory.class); FailoverProxyProvider provider = ctor.newInstance(conf, nameNodeUri, - xface); + xface, proxyFactory); // If the proxy provider is of an old implementation, wrap it. if (!(provider instanceof AbstractNNFailoverProxyProvider)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index c189ad5c9da..f1a46993a2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -65,6 +65,7 @@ public interface HdfsClientConfigKeys { String PREFIX = "dfs.client."; String DFS_NAMESERVICES = "dfs.nameservices"; + String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070; String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"; int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java new file mode 100644 index 00000000000..b887d87100e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.NameNodeProxiesClient; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ClientHAProxyFactory implements HAProxyFactory { + @Override + @SuppressWarnings("unchecked") + public T createProxy(Configuration conf, InetSocketAddress nnAddr, + Class xface, UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol( + nnAddr, conf, ugi, false, fallbackToSimpleAuth); + } + + @Override + public T createProxy(Configuration conf, InetSocketAddress nnAddr, + Class xface, UserGroupInformation ugi, boolean withRetries) + throws IOException { + return createProxy(conf, nnAddr, xface, ugi, withRetries, null); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java similarity index 76% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java index 0e8fa448806..e9c8791c5c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -26,22 +26,16 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HAUtil; -import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.HAUtilClient; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A FailoverProxyProvider implementation which allows one to configure @@ -51,25 +45,9 @@ import com.google.common.base.Preconditions; */ public class ConfiguredFailoverProxyProvider extends AbstractNNFailoverProxyProvider { - - private static final Log LOG = - LogFactory.getLog(ConfiguredFailoverProxyProvider.class); - - interface ProxyFactory { - T createProxy(Configuration conf, InetSocketAddress nnAddr, Class xface, - UserGroupInformation ugi, boolean withRetries, - AtomicBoolean fallbackToSimpleAuth) throws IOException; - } - static class DefaultProxyFactory implements ProxyFactory { - @Override - public T createProxy(Configuration conf, InetSocketAddress nnAddr, - Class xface, UserGroupInformation ugi, boolean withRetries, - AtomicBoolean fallbackToSimpleAuth) throws IOException { - return NameNodeProxies.createNonHAProxy(conf, - nnAddr, xface, ugi, false, fallbackToSimpleAuth).getProxy(); - } - } + private static final Logger LOG = + LoggerFactory.getLogger(ConfiguredFailoverProxyProvider.class); protected final Configuration conf; protected final List> proxies = @@ -78,22 +56,11 @@ public class ConfiguredFailoverProxyProvider extends protected final Class xface; private int currentProxyIndex = 0; - private final ProxyFactory factory; + private final HAProxyFactory factory; public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, - Class xface) { - this(conf, uri, xface, new DefaultProxyFactory()); - } - - @VisibleForTesting - ConfiguredFailoverProxyProvider(Configuration conf, URI uri, - Class xface, ProxyFactory factory) { - - Preconditions.checkArgument( - xface.isAssignableFrom(NamenodeProtocols.class), - "Interface class %s is not a valid NameNode protocol!"); + Class xface, HAProxyFactory factory) { this.xface = xface; - this.conf = new Configuration(conf); int maxRetries = this.conf.getInt( HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY, @@ -101,7 +68,7 @@ public class ConfiguredFailoverProxyProvider extends this.conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, maxRetries); - + int maxRetriesOnSocketTimeouts = this.conf.getInt( HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); @@ -112,16 +79,16 @@ public class ConfiguredFailoverProxyProvider extends try { ugi = UserGroupInformation.getCurrentUser(); - - Map> map = DFSUtil.getHaNnRpcAddresses( - conf); + + Map> map = + DFSUtilClient.getHaNnRpcAddresses(conf); Map addressesInNN = map.get(uri.getHost()); - + if (addressesInNN == null || addressesInNN.size() == 0) { throw new RuntimeException("Could not find any configured addresses " + "for URI " + uri); } - + Collection addressesOfNns = addressesInNN.values(); for (InetSocketAddress address : addressesOfNns) { proxies.add(new AddressRpcProxyPair(address)); @@ -137,13 +104,13 @@ public class ConfiguredFailoverProxyProvider extends // The client may have a delegation token set for the logical // URI of the cluster. Clone this token to apply to each of the // underlying IPC addresses so that the IPC code can find it. - HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns); + HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns); this.factory = factory; } catch (IOException e) { throw new RuntimeException(e); } } - + @Override public Class getInterface() { return xface; @@ -183,7 +150,7 @@ public class ConfiguredFailoverProxyProvider extends private static class AddressRpcProxyPair { public final InetSocketAddress address; public T namenode; - + public AddressRpcProxyPair(InetSocketAddress address) { this.address = address; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java new file mode 100644 index 00000000000..f92a74ff7ce --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This interface aims to decouple the proxy creation implementation that used + * in {@link AbstractNNFailoverProxyProvider}. Client side can use + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} to initialize the + * proxy while the server side can use NamenodeProtocols + */ +@InterfaceAudience.Private +public interface HAProxyFactory { + + T createProxy(Configuration conf, InetSocketAddress nnAddr, Class xface, + UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException; + + T createProxy(Configuration conf, InetSocketAddress nnAddr, Class xface, + UserGroupInformation ugi, boolean withRetries) throws IOException; + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java similarity index 87% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java index 4e1cb9e68dd..ed250a0f42e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java @@ -25,14 +25,10 @@ import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; -import com.google.common.base.Preconditions; - /** * A NNFailoverProxyProvider implementation which works on IP failover setup. * Only one proxy is used to connect to both servers and switching between @@ -40,7 +36,7 @@ import com.google.common.base.Preconditions; * clients can consistently reach only one node at a time. * * Clients with a live connection will likely get connection reset after an - * IP failover. This case will be handled by the + * IP failover. This case will be handled by the * FailoverOnNetworkExceptionRetry retry policy. I.e. if the call is * not idempotent, it won't get retried. * @@ -54,15 +50,14 @@ public class IPFailoverProxyProvider extends private final Configuration conf; private final Class xface; private final URI nameNodeUri; + private final HAProxyFactory factory; private ProxyInfo nnProxyInfo = null; - + public IPFailoverProxyProvider(Configuration conf, URI uri, - Class xface) { - Preconditions.checkArgument( - xface.isAssignableFrom(NamenodeProtocols.class), - "Interface class %s is not a valid NameNode protocol!"); + Class xface, HAProxyFactory factory) { this.xface = xface; this.nameNodeUri = uri; + this.factory = factory; this.conf = new Configuration(conf); int maxRetries = this.conf.getInt( @@ -71,7 +66,7 @@ public class IPFailoverProxyProvider extends this.conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, maxRetries); - + int maxRetriesOnSocketTimeouts = this.conf.getInt( HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); @@ -79,7 +74,7 @@ public class IPFailoverProxyProvider extends CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, maxRetriesOnSocketTimeouts); } - + @Override public Class getInterface() { return xface; @@ -92,9 +87,8 @@ public class IPFailoverProxyProvider extends try { // Create a proxy that is not wrapped in RetryProxy InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri); - nnProxyInfo = new ProxyInfo(NameNodeProxies.createNonHAProxy( - conf, nnAddr, xface, UserGroupInformation.getCurrentUser(), - false).getProxy(), nnAddr.toString()); + nnProxyInfo = new ProxyInfo(factory.createProxy(conf, nnAddr, xface, + UserGroupInformation.getCurrentUser(), false), nnAddr.toString()); } catch (IOException ioe) { throw new RuntimeException(ioe); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java similarity index 95% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java index a765e95411d..49fe4be5e35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java @@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.io.retry.MultiException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,15 +146,9 @@ public class RequestHedgingProxyProvider extends private volatile ProxyInfo successfulProxy = null; private volatile String toIgnore = null; - public RequestHedgingProxyProvider( - Configuration conf, URI uri, Class xface) { - this(conf, uri, xface, new DefaultProxyFactory()); - } - - @VisibleForTesting - RequestHedgingProxyProvider(Configuration conf, URI uri, - Class xface, ProxyFactory factory) { - super(conf, uri, xface, factory); + public RequestHedgingProxyProvider(Configuration conf, URI uri, + Class xface, HAProxyFactory proxyFactory) { + super(conf, uri, xface, proxyFactory); } @SuppressWarnings("unchecked") diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java similarity index 81% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java rename to hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java index fec139dc304..04e77ad1a8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java @@ -29,9 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.io.retry.MultiException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; @@ -66,20 +65,20 @@ public class TestRequestHedgingProxyProvider { ns = "mycluster-" + Time.monotonicNow(); nnUri = new URI("hdfs://" + ns); conf = new Configuration(); - conf.set(DFSConfigKeys.DFS_NAMESERVICES, ns); + conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns); conf.set( - DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2"); + HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2"); conf.set( - DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1", + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1", "machine1.foo.bar:8020"); conf.set( - DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2", + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2", "machine2.foo.bar:8020"); } @Test public void testHedgingWhenOneFails() throws Exception { - final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); + final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class); Mockito.when(goodMock.getStats()).thenAnswer(new Answer() { @Override public long[] answer(InvocationOnMock invocation) throws Throwable { @@ -87,11 +86,11 @@ public class TestRequestHedgingProxyProvider { return new long[]{1}; } }); - final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); + final ClientProtocol badMock = Mockito.mock(ClientProtocol.class); Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); - RequestHedgingProxyProvider provider = - new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, createFactory(badMock, goodMock)); long[] stats = provider.getProxy().proxy.getStats(); Assert.assertTrue(stats.length == 1); @@ -101,7 +100,7 @@ public class TestRequestHedgingProxyProvider { @Test public void testHedgingWhenOneIsSlow() throws Exception { - final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); + final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class); Mockito.when(goodMock.getStats()).thenAnswer(new Answer() { @Override public long[] answer(InvocationOnMock invocation) throws Throwable { @@ -109,11 +108,11 @@ public class TestRequestHedgingProxyProvider { return new long[]{1}; } }); - final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); + final ClientProtocol badMock = Mockito.mock(ClientProtocol.class); Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); - RequestHedgingProxyProvider provider = - new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, createFactory(goodMock, badMock)); long[] stats = provider.getProxy().proxy.getStats(); Assert.assertTrue(stats.length == 1); @@ -124,14 +123,14 @@ public class TestRequestHedgingProxyProvider { @Test public void testHedgingWhenBothFail() throws Exception { - NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); + ClientProtocol badMock = Mockito.mock(ClientProtocol.class); Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); - NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class); + ClientProtocol worseMock = Mockito.mock(ClientProtocol.class); Mockito.when(worseMock.getStats()).thenThrow( new IOException("Worse mock !!")); - RequestHedgingProxyProvider provider = - new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, createFactory(badMock, worseMock)); try { provider.getProxy().proxy.getStats(); @@ -147,7 +146,7 @@ public class TestRequestHedgingProxyProvider { public void testPerformFailover() throws Exception { final AtomicInteger counter = new AtomicInteger(0); final int[] isGood = {1}; - final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); + final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class); Mockito.when(goodMock.getStats()).thenAnswer(new Answer() { @Override public long[] answer(InvocationOnMock invocation) throws Throwable { @@ -159,7 +158,7 @@ public class TestRequestHedgingProxyProvider { throw new IOException("Was Good mock !!"); } }); - final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); + final ClientProtocol badMock = Mockito.mock(ClientProtocol.class); Mockito.when(badMock.getStats()).thenAnswer(new Answer() { @Override public long[] answer(InvocationOnMock invocation) throws Throwable { @@ -171,8 +170,8 @@ public class TestRequestHedgingProxyProvider { throw new IOException("Bad mock !!"); } }); - RequestHedgingProxyProvider provider = - new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, createFactory(goodMock, badMock)); long[] stats = provider.getProxy().proxy.getStats(); Assert.assertTrue(stats.length == 1); @@ -234,14 +233,14 @@ public class TestRequestHedgingProxyProvider { @Test public void testPerformFailoverWith3Proxies() throws Exception { - conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, + conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2,nn3"); - conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3", + conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3", "machine3.foo.bar:8020"); final AtomicInteger counter = new AtomicInteger(0); final int[] isGood = {1}; - final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); + final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class); Mockito.when(goodMock.getStats()).thenAnswer(new Answer() { @Override public long[] answer(InvocationOnMock invocation) throws Throwable { @@ -253,7 +252,7 @@ public class TestRequestHedgingProxyProvider { throw new IOException("Was Good mock !!"); } }); - final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); + final ClientProtocol badMock = Mockito.mock(ClientProtocol.class); Mockito.when(badMock.getStats()).thenAnswer(new Answer() { @Override public long[] answer(InvocationOnMock invocation) throws Throwable { @@ -265,7 +264,7 @@ public class TestRequestHedgingProxyProvider { throw new IOException("Bad mock !!"); } }); - final NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class); + final ClientProtocol worseMock = Mockito.mock(ClientProtocol.class); Mockito.when(worseMock.getStats()).thenAnswer(new Answer() { @Override public long[] answer(InvocationOnMock invocation) throws Throwable { @@ -278,8 +277,8 @@ public class TestRequestHedgingProxyProvider { } }); - RequestHedgingProxyProvider provider = - new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, createFactory(goodMock, badMock, worseMock)); long[] stats = provider.getProxy().proxy.getStats(); Assert.assertTrue(stats.length == 1); @@ -355,14 +354,14 @@ public class TestRequestHedgingProxyProvider { @Test public void testHedgingWhenFileNotFoundException() throws Exception { - NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); + ClientProtocol active = Mockito.mock(ClientProtocol.class); Mockito .when(active.getBlockLocations(Matchers.anyString(), Matchers.anyLong(), Matchers.anyLong())) .thenThrow(new RemoteException("java.io.FileNotFoundException", "File does not exist!")); - NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); + ClientProtocol standby = Mockito.mock(ClientProtocol.class); Mockito .when(standby.getBlockLocations(Matchers.anyString(), Matchers.anyLong(), Matchers.anyLong())) @@ -370,9 +369,9 @@ public class TestRequestHedgingProxyProvider { new RemoteException("org.apache.hadoop.ipc.StandbyException", "Standby NameNode")); - RequestHedgingProxyProvider provider = + RequestHedgingProxyProvider provider = new RequestHedgingProxyProvider<>(conf, nnUri, - NamenodeProtocols.class, createFactory(active, standby)); + ClientProtocol.class, createFactory(active, standby)); try { provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L); Assert.fail("Should fail since the active namenode throws" @@ -394,18 +393,18 @@ public class TestRequestHedgingProxyProvider { @Test public void testHedgingWhenConnectException() throws Exception { - NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); + ClientProtocol active = Mockito.mock(ClientProtocol.class); Mockito.when(active.getStats()).thenThrow(new ConnectException()); - NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); + ClientProtocol standby = Mockito.mock(ClientProtocol.class); Mockito.when(standby.getStats()) .thenThrow( new RemoteException("org.apache.hadoop.ipc.StandbyException", "Standby NameNode")); - RequestHedgingProxyProvider provider = + RequestHedgingProxyProvider provider = new RequestHedgingProxyProvider<>(conf, nnUri, - NamenodeProtocols.class, createFactory(active, standby)); + ClientProtocol.class, createFactory(active, standby)); try { provider.getProxy().proxy.getStats(); Assert.fail("Should fail since the active namenode throws" @@ -428,15 +427,15 @@ public class TestRequestHedgingProxyProvider { @Test public void testHedgingWhenConnectAndEOFException() throws Exception { - NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); + ClientProtocol active = Mockito.mock(ClientProtocol.class); Mockito.when(active.getStats()).thenThrow(new EOFException()); - NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); + ClientProtocol standby = Mockito.mock(ClientProtocol.class); Mockito.when(standby.getStats()).thenThrow(new ConnectException()); - RequestHedgingProxyProvider provider = + RequestHedgingProxyProvider provider = new RequestHedgingProxyProvider<>(conf, nnUri, - NamenodeProtocols.class, createFactory(active, standby)); + ClientProtocol.class, createFactory(active, standby)); try { provider.getProxy().proxy.getStats(); Assert.fail("Should fail since both active and standby namenodes throw" @@ -453,18 +452,25 @@ public class TestRequestHedgingProxyProvider { Mockito.verify(standby).getStats(); } - private ProxyFactory createFactory( - NamenodeProtocols... protos) { - final Iterator iterator = + private HAProxyFactory createFactory( + ClientProtocol... protos) { + final Iterator iterator = Lists.newArrayList(protos).iterator(); - return new ProxyFactory() { + return new HAProxyFactory() { @Override - public NamenodeProtocols createProxy(Configuration conf, - InetSocketAddress nnAddr, Class xface, + public ClientProtocol createProxy(Configuration conf, + InetSocketAddress nnAddr, Class xface, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException { return iterator.next(); } + + @Override + public ClientProtocol createProxy(Configuration conf, + InetSocketAddress nnAddr, Class xface, + UserGroupInformation ugi, boolean withRetries) throws IOException { + return iterator.next(); + } }; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 47323596617..3051a44ab69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -145,7 +145,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT; public static final String DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host"; - public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; + public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host"; public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address"; public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index e0a4e18f22e..7d1102ffc0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -451,19 +451,6 @@ public class DFSUtil { return principals; } - /** - * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from - * the configuration. - * - * @param conf configuration - * @return list of InetSocketAddresses - */ - public static Map> getHaNnRpcAddresses( - Configuration conf) { - return DFSUtilClient.getAddresses(conf, null, - DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); - } - /** * Returns list of InetSocketAddress corresponding to backup node rpc * addresses from the configuration. @@ -695,7 +682,7 @@ public class DFSUtil { public static String nnAddressesAsString(Configuration conf) { Map> addresses = - getHaNnRpcAddresses(conf); + DFSUtilClient.getHaNnRpcAddresses(conf); return addressMapToString(addresses); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index 72d69588fdf..043e08787c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -29,7 +29,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KE import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY; -import static org.apache.hadoop.security.SecurityUtil.buildTokenService; import java.io.IOException; import java.net.InetSocketAddress; @@ -39,8 +38,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -48,17 +45,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; -import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -66,12 +58,6 @@ import com.google.common.collect.Lists; @InterfaceAudience.Private public class HAUtil { - - private static final Log LOG = - LogFactory.getLog(HAUtil.class); - - private static final DelegationTokenSelector tokenSelector = - new DelegationTokenSelector(); private static final String[] HA_SPECIAL_INDEPENDENT_KEYS = new String []{ DFS_NAMENODE_RPC_ADDRESS_KEY, @@ -97,7 +83,7 @@ public class HAUtil { */ public static boolean isHAEnabled(Configuration conf, String nsId) { Map> addresses = - DFSUtil.getHaNnRpcAddresses(conf); + DFSUtilClient.getHaNnRpcAddresses(conf); if (addresses == null) return false; Map nnMap = addresses.get(nsId); return nnMap != null && nnMap.size() > 1; @@ -254,47 +240,6 @@ public class HAUtil { return provider.useLogicalURI(); } - /** - * Locate a delegation token associated with the given HA cluster URI, and if - * one is found, clone it to also represent the underlying namenode address. - * @param ugi the UGI to modify - * @param haUri the logical URI for the cluster - * @param nnAddrs collection of NNs in the cluster to which the token - * applies - */ - public static void cloneDelegationTokenForLogicalUri( - UserGroupInformation ugi, URI haUri, - Collection nnAddrs) { - // this cloning logic is only used by hdfs - Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri, - HdfsConstants.HDFS_URI_SCHEME); - Token haToken = - tokenSelector.selectToken(haService, ugi.getTokens()); - if (haToken != null) { - for (InetSocketAddress singleNNAddr : nnAddrs) { - // this is a minor hack to prevent physical HA tokens from being - // exposed to the user via UGI.getCredentials(), otherwise these - // cloned tokens may be inadvertently propagated to jobs - Token specificToken = - haToken.privateClone(buildTokenService(singleNNAddr)); - Text alias = new Text( - HAUtilClient.buildTokenServicePrefixForLogicalUri( - HdfsConstants.HDFS_URI_SCHEME) - + "//" + specificToken.getService()); - ugi.addToken(alias, specificToken); - if (LOG.isDebugEnabled()) { - LOG.debug("Mapped HA service delegation token for logical URI " + - haUri + " to namenode " + singleNNAddr); - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("No HA service delegation token found for logical URI " + - haUri); - } - } - } - /** * Get the internet address of the currently-active NN. This should rarely be * used, since callers of this method who connect directly to the NN using the diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java index 9d168191061..668e8e6b11f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.io.Text; @@ -112,7 +113,7 @@ public class NameNodeProxies { throws IOException { AbstractNNFailoverProxyProvider failoverProxyProvider = NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri, - xface, true, fallbackToSimpleAuth); + xface, true, fallbackToSimpleAuth, new NameNodeHAProxyFactory()); if (failoverProxyProvider == null) { return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java new file mode 100644 index 00000000000..036b6eb367d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +public class NameNodeHAProxyFactory implements HAProxyFactory { + + @Override + public T createProxy(Configuration conf, InetSocketAddress nnAddr, + Class xface, UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface, + ugi, withRetries, fallbackToSimpleAuth).getProxy(); + } + + @Override + public T createProxy(Configuration conf, InetSocketAddress nnAddr, + Class xface, UserGroupInformation ugi, boolean withRetries) + throws IOException { + return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface, + ugi, withRetries).getProxy(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java index 75635193eb2..c14ebb41dfa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java @@ -42,10 +42,10 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.IPFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.retry.FailoverProxyProvider; @@ -333,7 +333,7 @@ public class TestDFSClientFailover { private Class xface; private T proxy; public DummyLegacyFailoverProxyProvider(Configuration conf, URI uri, - Class xface) { + Class xface, HAProxyFactory proxyFactory) { try { this.proxy = NameNodeProxies.createNonHAProxy(conf, DFSUtilClient.getNNAddress(uri), xface, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java index 53bad06e363..6c10a674a06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java @@ -514,7 +514,7 @@ public class TestDFSUtil { NS2_NN2_HOST); Map> map = - DFSUtil.getHaNnRpcAddresses(conf); + DFSUtilClient.getHaNnRpcAddresses(conf); assertTrue(HAUtil.isHAEnabled(conf, "ns1")); assertTrue(HAUtil.isHAEnabled(conf, "ns2")); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java index 632bbf688e0..ca44c798010 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java @@ -292,7 +292,7 @@ public class TestDelegationTokensWithHA { nn0.getNameNodeAddress().getPort())); nnAddrs.add(new InetSocketAddress("localhost", nn1.getNameNodeAddress().getPort())); - HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); + HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); Collection> tokens = ugi.getTokens(); assertEquals(3, tokens.size()); @@ -321,7 +321,7 @@ public class TestDelegationTokensWithHA { } // reclone the tokens, and see if they match now - HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); + HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); for (InetSocketAddress addr : nnAddrs) { Text ipcDtService = SecurityUtil.buildTokenService(addr); Token token2 =