HDFS-11538. Move ClientProtocol HA proxies into hadoop-hdfs-client. Contributed by Huafeng Wang.

(cherry picked from commit e154893d3b)
This commit is contained in:
Andrew Wang 2017-04-05 12:52:34 -07:00
parent eff4b2fa07
commit d12a0a25b4
18 changed files with 309 additions and 202 deletions

View File

@ -152,6 +152,19 @@ static String addSuffix(String key, String suffix) {
return key + "." + suffix;
}
/**
* Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
* the configuration.
*
* @param conf configuration
* @return list of InetSocketAddresses
*/
public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
Configuration conf) {
return DFSUtilClient.getAddresses(conf, null,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
/**
* Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
* the configuration.

View File

@ -20,15 +20,29 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
import static org.apache.hadoop.security.SecurityUtil.buildTokenService;
@InterfaceAudience.Private
public class HAUtilClient {
private static final Logger LOG = LoggerFactory.getLogger(HAUtilClient.class);
private static final DelegationTokenSelector tokenSelector =
new DelegationTokenSelector();
/**
* @return true if the given nameNodeUri appears to be a logical URI.
*/
@ -92,4 +106,45 @@ public static URI getServiceUriFromToken(final String scheme, Token<?> token) {
public static boolean isTokenForLogicalUri(Token<?> token) {
return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX);
}
/**
* Locate a delegation token associated with the given HA cluster URI, and if
* one is found, clone it to also represent the underlying namenode address.
* @param ugi the UGI to modify
* @param haUri the logical URI for the cluster
* @param nnAddrs collection of NNs in the cluster to which the token
* applies
*/
public static void cloneDelegationTokenForLogicalUri(
UserGroupInformation ugi, URI haUri,
Collection<InetSocketAddress> nnAddrs) {
// this cloning logic is only used by hdfs
Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
HdfsConstants.HDFS_URI_SCHEME);
Token<DelegationTokenIdentifier> haToken =
tokenSelector.selectToken(haService, ugi.getTokens());
if (haToken != null) {
for (InetSocketAddress singleNNAddr : nnAddrs) {
// this is a minor hack to prevent physical HA tokens from being
// exposed to the user via UGI.getCredentials(), otherwise these
// cloned tokens may be inadvertently propagated to jobs
Token<DelegationTokenIdentifier> specificToken =
haToken.privateClone(buildTokenService(singleNNAddr));
Text alias = new Text(
HAUtilClient.buildTokenServicePrefixForLogicalUri(
HdfsConstants.HDFS_URI_SCHEME)
+ "//" + specificToken.getService());
ugi.addToken(alias, specificToken);
if (LOG.isDebugEnabled()) {
LOG.debug("Mapped HA service delegation token for logical URI " +
haUri + " to namenode " + singleNNAddr);
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No HA service delegation token found for logical URI " +
haUri);
}
}
}
}

View File

@ -28,6 +28,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -212,6 +214,14 @@ public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return createFailoverProxyProvider(conf, nameNodeUri, xface, checkPort,
fallbackToSimpleAuth, new ClientHAProxyFactory<T>());
}
protected static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
AtomicBoolean fallbackToSimpleAuth, HAProxyFactory<T> proxyFactory)
throws IOException {
Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
AbstractNNFailoverProxyProvider<T> providerNN;
try {
@ -223,9 +233,10 @@ public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider
}
// Create a proxy provider instance.
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
.getConstructor(Configuration.class, URI.class, Class.class);
.getConstructor(Configuration.class, URI.class,
Class.class, HAProxyFactory.class);
FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
xface);
xface, proxyFactory);
// If the proxy provider is of an old implementation, wrap it.
if (!(provider instanceof AbstractNNFailoverProxyProvider)) {

View File

@ -65,6 +65,7 @@ public interface HdfsClientConfigKeys {
String PREFIX = "dfs.client.";
String DFS_NAMESERVICES = "dfs.nameservices";
String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470;

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
public class ClientHAProxyFactory<T> implements HAProxyFactory<T> {
@Override
@SuppressWarnings("unchecked")
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
nnAddr, conf, ugi, false, fallbackToSimpleAuth);
}
@Override
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries)
throws IOException {
return createProxy(conf, nnAddr, xface, ugi, withRetries, null);
}
}

View File

@ -25,22 +25,16 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A FailoverProxyProvider implementation which allows one to configure two URIs
@ -50,24 +44,8 @@
public class ConfiguredFailoverProxyProvider<T> extends
AbstractNNFailoverProxyProvider<T> {
private static final Log LOG =
LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
interface ProxyFactory<T> {
T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException;
}
static class DefaultProxyFactory<T> implements ProxyFactory<T> {
@Override
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return NameNodeProxies.createNonHAProxy(conf,
nnAddr, xface, ugi, false, fallbackToSimpleAuth).getProxy();
}
}
private static final Logger LOG =
LoggerFactory.getLogger(ConfiguredFailoverProxyProvider.class);
protected final Configuration conf;
protected final List<AddressRpcProxyPair<T>> proxies =
@ -76,22 +54,11 @@ public T createProxy(Configuration conf, InetSocketAddress nnAddr,
protected final Class<T> xface;
private int currentProxyIndex = 0;
private final ProxyFactory<T> factory;
private final HAProxyFactory<T> factory;
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface) {
this(conf, uri, xface, new DefaultProxyFactory<T>());
}
@VisibleForTesting
ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, ProxyFactory<T> factory) {
Preconditions.checkArgument(
xface.isAssignableFrom(NamenodeProtocols.class),
"Interface class %s is not a valid NameNode protocol!");
Class<T> xface, HAProxyFactory<T> factory) {
this.xface = xface;
this.conf = new Configuration(conf);
int maxRetries = this.conf.getInt(
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
@ -111,8 +78,8 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
try {
ugi = UserGroupInformation.getCurrentUser();
Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
conf);
Map<String, Map<String, InetSocketAddress>> map =
DFSUtilClient.getHaNnRpcAddresses(conf);
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
if (addressesInNN == null || addressesInNN.size() == 0) {
@ -128,7 +95,7 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
// The client may have a delegation token set for the logical
// URI of the cluster. Clone this token to apply to each of the
// underlying IPC addresses so that the IPC code can find it.
HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
this.factory = factory;
} catch (IOException e) {
throw new RuntimeException(e);

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This interface aims to decouple the proxy creation implementation that used
* in {@link AbstractNNFailoverProxyProvider}. Client side can use
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} to initialize the
* proxy while the server side can use NamenodeProtocols
*/
@InterfaceAudience.Private
public interface HAProxyFactory<T> {
T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException;
T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries) throws IOException;
}

View File

@ -25,14 +25,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.base.Preconditions;
/**
* A NNFailoverProxyProvider implementation which works on IP failover setup.
* Only one proxy is used to connect to both servers and switching between
@ -54,15 +50,14 @@ public class IPFailoverProxyProvider<T> extends
private final Configuration conf;
private final Class<T> xface;
private final URI nameNodeUri;
private final HAProxyFactory<T> factory;
private ProxyInfo<T> nnProxyInfo = null;
public IPFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface) {
Preconditions.checkArgument(
xface.isAssignableFrom(NamenodeProtocols.class),
"Interface class %s is not a valid NameNode protocol!");
Class<T> xface, HAProxyFactory<T> factory) {
this.xface = xface;
this.nameNodeUri = uri;
this.factory = factory;
this.conf = new Configuration(conf);
int maxRetries = this.conf.getInt(
@ -92,9 +87,8 @@ public synchronized ProxyInfo<T> getProxy() {
try {
// Create a proxy that is not wrapped in RetryProxy
InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
nnProxyInfo = new ProxyInfo<T>(NameNodeProxies.createNonHAProxy(
conf, nnAddr, xface, UserGroupInformation.getCurrentUser(),
false).getProxy(), nnAddr.toString());
nnProxyInfo = new ProxyInfo<T>(factory.createProxy(conf, nnAddr, xface,
UserGroupInformation.getCurrentUser(), false), nnAddr.toString());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}

View File

@ -34,7 +34,6 @@
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.io.retry.MultiException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -147,15 +146,9 @@ public Object call() throws Exception {
private volatile ProxyInfo<T> successfulProxy = null;
private volatile String toIgnore = null;
public RequestHedgingProxyProvider(
Configuration conf, URI uri, Class<T> xface) {
this(conf, uri, xface, new DefaultProxyFactory<T>());
}
@VisibleForTesting
RequestHedgingProxyProvider(Configuration conf, URI uri,
Class<T> xface, ProxyFactory<T> factory) {
super(conf, uri, xface, factory);
public RequestHedgingProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> proxyFactory) {
super(conf, uri, xface, proxyFactory);
}
@SuppressWarnings("unchecked")

View File

@ -29,9 +29,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
@ -66,20 +65,20 @@ public void setup() throws URISyntaxException {
ns = "mycluster-" + Time.monotonicNow();
nnUri = new URI("hdfs://" + ns);
conf = new Configuration();
conf.set(DFSConfigKeys.DFS_NAMESERVICES, ns);
conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
conf.set(
DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2");
HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2");
conf.set(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1",
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1",
"machine1.foo.bar:8020");
conf.set(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2",
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2",
"machine2.foo.bar:8020");
}
@Test
public void testHedgingWhenOneFails() throws Exception {
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -87,11 +86,11 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
return new long[]{1};
}
});
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(badMock, goodMock));
long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
@ -101,7 +100,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
@Test
public void testHedgingWhenOneIsSlow() throws Exception {
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -109,11 +108,11 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
return new long[]{1};
}
});
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(goodMock, badMock));
long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
@ -124,14 +123,14 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
@Test
public void testHedgingWhenBothFail() throws Exception {
NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
ClientProtocol worseMock = Mockito.mock(ClientProtocol.class);
Mockito.when(worseMock.getStats()).thenThrow(
new IOException("Worse mock !!"));
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(badMock, worseMock));
try {
provider.getProxy().proxy.getStats();
@ -147,7 +146,7 @@ public void testHedgingWhenBothFail() throws Exception {
public void testPerformFailover() throws Exception {
final AtomicInteger counter = new AtomicInteger(0);
final int[] isGood = {1};
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -159,7 +158,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Was Good mock !!");
}
});
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -171,8 +170,8 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Bad mock !!");
}
});
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(goodMock, badMock));
long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
@ -234,14 +233,14 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
@Test
public void testPerformFailoverWith3Proxies() throws Exception {
conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
"nn1,nn2,nn3");
conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3",
conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3",
"machine3.foo.bar:8020");
final AtomicInteger counter = new AtomicInteger(0);
final int[] isGood = {1};
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -253,7 +252,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Was Good mock !!");
}
});
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -265,7 +264,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Bad mock !!");
}
});
final NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
final ClientProtocol worseMock = Mockito.mock(ClientProtocol.class);
Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
@ -278,8 +277,8 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
}
});
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(goodMock, badMock, worseMock));
long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
@ -355,14 +354,14 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
@Test
public void testHedgingWhenFileNotFoundException() throws Exception {
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
ClientProtocol active = Mockito.mock(ClientProtocol.class);
Mockito
.when(active.getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong()))
.thenThrow(new RemoteException("java.io.FileNotFoundException",
"File does not exist!"));
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
ClientProtocol standby = Mockito.mock(ClientProtocol.class);
Mockito
.when(standby.getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong()))
@ -370,9 +369,9 @@ public void testHedgingWhenFileNotFoundException() throws Exception {
new RemoteException("org.apache.hadoop.ipc.StandbyException",
"Standby NameNode"));
RequestHedgingProxyProvider<NamenodeProtocols> provider =
RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri,
NamenodeProtocols.class, createFactory(active, standby));
ClientProtocol.class, createFactory(active, standby));
try {
provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
Assert.fail("Should fail since the active namenode throws"
@ -394,18 +393,18 @@ public void testHedgingWhenFileNotFoundException() throws Exception {
@Test
public void testHedgingWhenConnectException() throws Exception {
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
ClientProtocol active = Mockito.mock(ClientProtocol.class);
Mockito.when(active.getStats()).thenThrow(new ConnectException());
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
ClientProtocol standby = Mockito.mock(ClientProtocol.class);
Mockito.when(standby.getStats())
.thenThrow(
new RemoteException("org.apache.hadoop.ipc.StandbyException",
"Standby NameNode"));
RequestHedgingProxyProvider<NamenodeProtocols> provider =
RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri,
NamenodeProtocols.class, createFactory(active, standby));
ClientProtocol.class, createFactory(active, standby));
try {
provider.getProxy().proxy.getStats();
Assert.fail("Should fail since the active namenode throws"
@ -428,15 +427,15 @@ public void testHedgingWhenConnectException() throws Exception {
@Test
public void testHedgingWhenConnectAndEOFException() throws Exception {
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
ClientProtocol active = Mockito.mock(ClientProtocol.class);
Mockito.when(active.getStats()).thenThrow(new EOFException());
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
ClientProtocol standby = Mockito.mock(ClientProtocol.class);
Mockito.when(standby.getStats()).thenThrow(new ConnectException());
RequestHedgingProxyProvider<NamenodeProtocols> provider =
RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri,
NamenodeProtocols.class, createFactory(active, standby));
ClientProtocol.class, createFactory(active, standby));
try {
provider.getProxy().proxy.getStats();
Assert.fail("Should fail since both active and standby namenodes throw"
@ -453,18 +452,25 @@ public void testHedgingWhenConnectAndEOFException() throws Exception {
Mockito.verify(standby).getStats();
}
private ProxyFactory<NamenodeProtocols> createFactory(
NamenodeProtocols... protos) {
final Iterator<NamenodeProtocols> iterator =
private HAProxyFactory<ClientProtocol> createFactory(
ClientProtocol... protos) {
final Iterator<ClientProtocol> iterator =
Lists.newArrayList(protos).iterator();
return new ProxyFactory<NamenodeProtocols>() {
return new HAProxyFactory<ClientProtocol>() {
@Override
public NamenodeProtocols createProxy(Configuration conf,
InetSocketAddress nnAddr, Class<NamenodeProtocols> xface,
public ClientProtocol createProxy(Configuration conf,
InetSocketAddress nnAddr, Class<ClientProtocol> xface,
UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return iterator.next();
}
@Override
public ClientProtocol createProxy(Configuration conf,
InetSocketAddress nnAddr, Class<ClientProtocol> xface,
UserGroupInformation ugi, boolean withRetries) throws IOException {
return iterator.next();
}
};
}
}

View File

@ -140,7 +140,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
public static final String DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host";
public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
public static final String DFS_NAMENODE_RPC_ADDRESS_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host";
public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address";
public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host";

View File

@ -441,19 +441,6 @@ public static Set<String> getAllNnPrincipals(Configuration conf) throws IOExcept
return principals;
}
/**
* Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
* the configuration.
*
* @param conf configuration
* @return list of InetSocketAddresses
*/
public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
Configuration conf) {
return DFSUtilClient.getAddresses(conf, null,
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
/**
* Returns list of InetSocketAddress corresponding to backup node rpc
* addresses from the configuration.
@ -685,7 +672,7 @@ public static String addressMapToString(
public static String nnAddressesAsString(Configuration conf) {
Map<String, Map<String, InetSocketAddress>> addresses =
getHaNnRpcAddresses(conf);
DFSUtilClient.getHaNnRpcAddresses(conf);
return addressMapToString(addresses);
}

View File

@ -29,7 +29,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import static org.apache.hadoop.security.SecurityUtil.buildTokenService;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -39,8 +38,6 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -48,17 +45,12 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@ -67,12 +59,6 @@
@InterfaceAudience.Private
public class HAUtil {
private static final Log LOG =
LogFactory.getLog(HAUtil.class);
private static final DelegationTokenSelector tokenSelector =
new DelegationTokenSelector();
private static final String[] HA_SPECIAL_INDEPENDENT_KEYS = new String []{
DFS_NAMENODE_RPC_ADDRESS_KEY,
DFS_NAMENODE_RPC_BIND_HOST_KEY,
@ -97,7 +83,7 @@ private HAUtil() { /* Hidden constructor */ }
*/
public static boolean isHAEnabled(Configuration conf, String nsId) {
Map<String, Map<String, InetSocketAddress>> addresses =
DFSUtil.getHaNnRpcAddresses(conf);
DFSUtilClient.getHaNnRpcAddresses(conf);
if (addresses == null) return false;
Map<String, InetSocketAddress> nnMap = addresses.get(nsId);
return nnMap != null && nnMap.size() > 1;
@ -254,47 +240,6 @@ public static boolean useLogicalUri(Configuration conf, URI nameNodeUri)
return provider.useLogicalURI();
}
/**
* Locate a delegation token associated with the given HA cluster URI, and if
* one is found, clone it to also represent the underlying namenode address.
* @param ugi the UGI to modify
* @param haUri the logical URI for the cluster
* @param nnAddrs collection of NNs in the cluster to which the token
* applies
*/
public static void cloneDelegationTokenForLogicalUri(
UserGroupInformation ugi, URI haUri,
Collection<InetSocketAddress> nnAddrs) {
// this cloning logic is only used by hdfs
Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
HdfsConstants.HDFS_URI_SCHEME);
Token<DelegationTokenIdentifier> haToken =
tokenSelector.selectToken(haService, ugi.getTokens());
if (haToken != null) {
for (InetSocketAddress singleNNAddr : nnAddrs) {
// this is a minor hack to prevent physical HA tokens from being
// exposed to the user via UGI.getCredentials(), otherwise these
// cloned tokens may be inadvertently propagated to jobs
Token<DelegationTokenIdentifier> specificToken =
haToken.privateClone(buildTokenService(singleNNAddr));
Text alias = new Text(
HAUtilClient.buildTokenServicePrefixForLogicalUri(
HdfsConstants.HDFS_URI_SCHEME)
+ "//" + specificToken.getService());
ugi.addToken(alias, specificToken);
if (LOG.isDebugEnabled()) {
LOG.debug("Mapped HA service delegation token for logical URI " +
haUri + " to namenode " + singleNNAddr);
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No HA service delegation token found for logical URI " +
haUri);
}
}
}
/**
* Get the internet address of the currently-active NN. This should rarely be
* used, since callers of this method who connect directly to the NN using the

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.Text;
@ -112,7 +113,7 @@ public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
throws IOException {
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri,
xface, true, fallbackToSimpleAuth);
xface, true, fallbackToSimpleAuth, new NameNodeHAProxyFactory<T>());
if (failoverProxyProvider == null) {
return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
public class NameNodeHAProxyFactory<T> implements HAProxyFactory<T> {
@Override
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
ugi, withRetries, fallbackToSimpleAuth).getProxy();
}
@Override
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class<T> xface, UserGroupInformation ugi, boolean withRetries)
throws IOException {
return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
ugi, withRetries).getProxy();
}
}

View File

@ -42,10 +42,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.IPFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
@ -333,7 +333,7 @@ public static class DummyLegacyFailoverProxyProvider<T>
private Class<T> xface;
private T proxy;
public DummyLegacyFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface) {
Class<T> xface, HAProxyFactory<T> proxyFactory) {
try {
this.proxy = NameNodeProxies.createNonHAProxy(conf,
DFSUtilClient.getNNAddress(uri), xface,

View File

@ -514,7 +514,7 @@ public void testHANameNodesWithFederation() throws URISyntaxException {
NS2_NN2_HOST);
Map<String, Map<String, InetSocketAddress>> map =
DFSUtil.getHaNnRpcAddresses(conf);
DFSUtilClient.getHaNnRpcAddresses(conf);
assertTrue(HAUtil.isHAEnabled(conf, "ns1"));
assertTrue(HAUtil.isHAEnabled(conf, "ns2"));

View File

@ -292,7 +292,7 @@ public void testHAUtilClonesDelegationTokens() throws Exception {
nn0.getNameNodeAddress().getPort()));
nnAddrs.add(new InetSocketAddress("localhost",
nn1.getNameNodeAddress().getPort()));
HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
assertEquals(3, tokens.size());
@ -321,7 +321,7 @@ public void testHAUtilClonesDelegationTokens() throws Exception {
}
// reclone the tokens, and see if they match now
HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
for (InetSocketAddress addr : nnAddrs) {
Text ipcDtService = SecurityUtil.buildTokenService(addr);
Token<DelegationTokenIdentifier> token2 =