HDFS-9039. Separate client and server side methods of o.a.h.hdfs.NameNodeProxies. Contributed by Mingliang Liu.

Haohui Mai 2015-09-22 20:52:37 -07:00
parent cc2b473990
commit 63d9f1596c
13 changed files with 398 additions and 293 deletions


@@ -0,0 +1,366 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Create proxy objects with {@link ClientProtocol} to communicate with a remote
* NN. Generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
* Configuration, URI, AtomicBoolean)}, which will create either an HA- or
* non-HA-enabled client proxy as appropriate.
*
* For creating proxy objects with other protocols, please see
* {@link NameNodeProxies#createProxy(Configuration, URI, Class)}.
*/
@InterfaceAudience.Private
public class NameNodeProxiesClient {
private static final Logger LOG = LoggerFactory.getLogger(
NameNodeProxiesClient.class);
/**
* Wrapper for a client proxy as well as its associated service ID.
* This is simply used as a tuple-like return type for the created NN proxy.
*/
public static class ProxyAndInfo<PROXYTYPE> {
private final PROXYTYPE proxy;
private final Text dtService;
private final InetSocketAddress address;
public ProxyAndInfo(PROXYTYPE proxy, Text dtService,
InetSocketAddress address) {
this.proxy = proxy;
this.dtService = dtService;
this.address = address;
}
public PROXYTYPE getProxy() {
return proxy;
}
public Text getDelegationTokenService() {
return dtService;
}
public InetSocketAddress getAddress() {
return address;
}
}
/**
* Creates the namenode proxy with the ClientProtocol. This will handle
* creation of either HA- or non-HA-enabled proxy objects, depending upon
* whether the provided URI is a configured logical URI.
*
* @param conf the configuration containing the required IPC
* properties, client failover configurations, etc.
* @param nameNodeUri the URI pointing either to a specific NameNode
* or to a logical nameservice.
* @param fallbackToSimpleAuth set to true or false during calls to indicate
* whether a secure client falls back to simple auth
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to
* @throws IOException if there is an error creating the proxy
* @see NameNodeProxies#createProxy(Configuration, URI, Class)
*/
public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(
Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider =
createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
true, fallbackToSimpleAuth);
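// createFailoverProxyProvider returns null when the URI does not name an
// HA nameservice; in that case build a direct, non-HA proxy instead.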
if (failoverProxyProvider == null) {
InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
Text dtService = SecurityUtil.buildTokenService(nnAddr);
ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf,
UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
return new ProxyAndInfo<>(proxy, dtService, nnAddr);
} else {
return createHAProxy(conf, nameNodeUri, ClientProtocol.class,
failoverProxyProvider);
}
}
/**
* Generates a dummy namenode proxy instance that utilizes our hacked
* {@link LossyRetryInvocationHandler}. A proxy instance generated using this
* method will proactively drop RPC responses. Currently this method only
* supports an HA setup; null will be returned if the given configuration is
* not for HA.
*
* @param config the configuration containing the required IPC
* properties, client failover configurations, etc.
* @param nameNodeUri the URI pointing either to a specific NameNode
* or to a logical nameservice.
* @param xface the IPC interface which should be created
* @param numResponseToDrop The number of responses to drop for each RPC call
* @param fallbackToSimpleAuth set to true or false during calls to indicate
* whether a secure client falls back to simple auth
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to. Will return null if the
* given configuration does not support HA.
* @throws IOException if there is an error creating the proxy
*/
public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
Configuration config, URI nameNodeUri, Class<T> xface,
int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
Preconditions.checkArgument(numResponseToDrop > 0);
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
createFailoverProxyProvider(config, nameNodeUri, xface, true,
fallbackToSimpleAuth);
if (failoverProxyProvider != null) { // HA case
int delay = config.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
int maxCap = config.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
int maxFailoverAttempts = config.getInt(
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = config.getInt(
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
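// The lossy handler drops numResponseToDrop responses per RPC; the retry
// budget below is kept above the drop count so calls can still succeed.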
InvocationHandler dummyHandler = new LossyRetryInvocationHandler<>(
numResponseToDrop, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
maxCap));
@SuppressWarnings("unchecked")
T proxy = (T) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[]{xface}, dummyHandler);
Text dtService;
if (failoverProxyProvider.useLogicalURI()) {
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
HdfsConstants.HDFS_URI_SCHEME);
} else {
dtService = SecurityUtil.buildTokenService(
DFSUtilClient.getNNAddress(nameNodeUri));
}
return new ProxyAndInfo<>(proxy, dtService,
DFSUtilClient.getNNAddress(nameNodeUri));
} else {
LOG.warn("Currently creating proxy using " +
"LossyRetryInvocationHandler requires NN HA setup");
return null;
}
}
/** Creates the failover proxy provider instance. */
@VisibleForTesting
public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
AbstractNNFailoverProxyProvider<T> providerNN;
try {
// Obtain the class of the proxy provider
failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
nameNodeUri);
if (failoverProxyProviderClass == null) {
return null;
}
// Create a proxy provider instance.
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
.getConstructor(Configuration.class, URI.class, Class.class);
FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
xface);
// If the proxy provider is of an old implementation, wrap it.
if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
providerNN = new WrappedFailoverProxyProvider<>(provider);
} else {
providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
}
} catch (Exception e) {
final String message = "Couldn't create proxy provider " +
failoverProxyProviderClass;
LOG.debug(message, e);
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
throw new IOException(message, e);
}
}
// Check the port in the URI, if it is logical.
if (checkPort && providerNN.useLogicalURI()) {
int port = nameNodeUri.getPort();
if (port > 0 &&
port != HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) {
// Throwing here without any cleanup is fine since we have not
// actually created the underlying proxies yet.
throw new IOException("Port " + port + " specified in URI "
+ nameNodeUri + " but host '" + nameNodeUri.getHost()
+ "' is a logical (HA) namenode"
+ " and does not use port information.");
}
}
providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
return providerNN;
}
/** Gets the configured failover proxy provider's class. */
@VisibleForTesting
public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
Configuration conf, URI nameNodeUri) throws IOException {
if (nameNodeUri == null) {
return null;
}
String host = nameNodeUri.getHost();
String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + host;
try {
@SuppressWarnings("unchecked")
Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>)
conf.getClass(configKey, null, FailoverProxyProvider.class);
return ret;
} catch (RuntimeException e) {
if (e.getCause() instanceof ClassNotFoundException) {
throw new IOException("Could not load failover proxy provider class "
+ conf.get(configKey) + " which is configured for authority "
+ nameNodeUri, e);
} else {
throw e;
}
}
}
/**
* Creates an explicitly HA-enabled proxy object.
*
* @param conf the configuration object
* @param nameNodeUri the URI pointing either to a specific NameNode or to a
* logical nameservice.
* @param xface the IPC interface which should be created
* @param failoverProxyProvider Failover proxy provider
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to
* @throws IOException if there is an error creating the proxy
*/
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createHAProxy(
Configuration conf, URI nameNodeUri, Class<T> xface,
AbstractNNFailoverProxyProvider<T> failoverProxyProvider)
throws IOException {
Preconditions.checkNotNull(failoverProxyProvider);
// HA case
DfsClientConf config = new DfsClientConf(conf);
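// Wrap the provider in a retry proxy that fails over on network
// exceptions, bounded by the client's configured failover/retry limits.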
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
config.getFailoverSleepMaxMillis()));
Text dtService;
if (failoverProxyProvider.useLogicalURI()) {
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
HdfsConstants.HDFS_URI_SCHEME);
} else {
dtService = SecurityUtil.buildTokenService(
DFSUtilClient.getNNAddress(nameNodeUri));
}
return new ProxyAndInfo<>(proxy, dtService,
DFSUtilClient.getNNAddress(nameNodeUri));
}
public static ClientProtocol createNonHAProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine.class);
final RetryPolicy defaultPolicy =
RetryUtils.getDefaultRetryPolicy(
conf,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
SafeModeException.class.getName());
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
fallbackToSimpleAuth).getProxy();
if (withRetries) { // create the proxy with retries
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();
ClientProtocol translatorProxy =
new ClientNamenodeProtocolTranslatorPB(proxy);
return (ClientProtocol) RetryProxy.create(
ClientProtocol.class,
new DefaultFailoverProxyProvider<>(ClientProtocol.class,
translatorProxy),
methodNameToPolicyMap,
defaultPolicy);
} else {
return new ClientNamenodeProtocolTranslatorPB(proxy);
}
}
}
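To orient readers of the new entry point, a minimal usage sketch follows. This is illustrative only: the wrapper class, the "mycluster" nameservice, and the assumption that a failover proxy provider is configured for it are ours, not part of the patch.

import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;

public class ProxyCreationSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Hypothetical logical nameservice URI; with a failover proxy provider
    // configured for "mycluster" this resolves to an HA proxy, otherwise
    // createProxyWithClientProtocol builds a non-HA proxy for the address.
    URI nnUri = URI.create("hdfs://mycluster");
    AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);

    ProxyAndInfo<ClientProtocol> proxyAndInfo =
        NameNodeProxiesClient.createProxyWithClientProtocol(
            conf, nnUri, fallbackToSimpleAuth);
    ClientProtocol namenode = proxyAndInfo.getProxy();
    System.out.println("Delegation token service: "
        + proxyAndInfo.getDelegationTokenService());
  }
}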


@@ -26,7 +26,7 @@ import org.apache.hadoop.io.retry.FailoverProxyProvider;
public abstract class AbstractNNFailoverProxyProvider<T> implements
FailoverProxyProvider <T> {
protected AtomicBoolean fallbackToSimpleAuth;
private AtomicBoolean fallbackToSimpleAuth;
/**
* Inquire whether logical HA URI is used for the implementation. If it is
@@ -48,4 +48,8 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
AtomicBoolean fallbackToSimpleAuth) {
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}
public synchronized AtomicBoolean getFallbackToSimpleAuth() {
return fallbackToSimpleAuth;
}
}
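Since the flag is now private, provider implementations read it through the synchronized getter; the ConfiguredFailoverProxyProvider hunk later in this commit shows the in-tree pattern. Below is a self-contained, hypothetical subclass sketch (not part of the patch) illustrating the same access path.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;

// Sketch only: a trivial provider over a single pre-built proxy.
public class SingleProxyProvider<T> extends AbstractNNFailoverProxyProvider<T> {
  private final Class<T> xface;
  private final T proxy;

  public SingleProxyProvider(Class<T> xface, T proxy) {
    this.xface = xface;
    this.proxy = proxy;
  }

  @Override
  public Class<T> getInterface() {
    return xface;
  }

  @Override
  public ProxyInfo<T> getProxy() {
    // Read the shared flag via the getter; the field itself is no longer
    // visible to subclasses.
    AtomicBoolean fallback = getFallbackToSimpleAuth();
    String info = "single-nn, fallbackToSimpleAuth="
        + (fallback != null && fallback.get());
    return new ProxyInfo<>(proxy, info);
  }

  @Override
  public void performFailover(T currentProxy) {
    // Only one proxy; nothing to fail over to.
  }

  @Override
  public boolean useLogicalURI() {
    return false;
  }

  @Override
  public void close() throws IOException {
    // Nothing to close in this sketch.
  }
}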


@@ -17,18 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.base.Preconditions;
/**
* An NNFailoverProxyProvider implementation which wraps old implementations


@@ -944,6 +944,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12428. Fix inconsistency between log-level guards and statements.
(Jagadesh Kiran N and Jackie Chang via ozawa)
HDFS-9039. Separate client and server side methods of o.a.h.hdfs.
NameNodeProxies. (Mingliang Liu via wheat9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than


@@ -94,6 +94,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
@@ -313,14 +314,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
int numResponseToDrop = conf.getInt(
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
ProxyAndInfo<ClientProtocol> proxyInfo = null;
AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
if (numResponseToDrop > 0) {
// This case is used for testing.
LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+ " is set to " + numResponseToDrop
+ ", this hacked client will proactively drop responses");
proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
nameNodeUri, ClientProtocol.class, numResponseToDrop,
nnFallbackToSimpleAuth);
}
@@ -336,8 +337,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} else {
Preconditions.checkArgument(nameNodeUri != null,
"null URI");
proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
ClientProtocol.class, nnFallbackToSimpleAuth);
proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
nameNodeUri, nnFallbackToSimpleAuth);
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
}
@@ -780,8 +781,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
"a failover proxy provider configured.");
}
NameNodeProxies.ProxyAndInfo<ClientProtocol> info =
NameNodeProxies.createProxy(conf, uri, ClientProtocol.class);
ProxyAndInfo<ClientProtocol> info =
NameNodeProxiesClient.createProxyWithClientProtocol(conf, uri, null);
assert info.getDelegationTokenService().equals(token.getService()) :
"Returned service '" + info.getDelegationTokenService().toString() +
"' doesn't match expected service '" +

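The response-dropping client that DFSClient wires up above is a test-only path driven entirely by configuration. A hedged sketch of enabling it (the class and method names are ours; the value 2 is arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class LossyClientConfSketch {
  /** Conf that makes DFSClient drop NameNode responses, for testing. */
  static Configuration lossyClientConf() {
    Configuration conf = new HdfsConfiguration();
    // Any positive value routes proxy creation through
    // NameNodeProxiesClient.createProxyWithLossyRetryHandler(...), which
    // requires an HA (logical URI) setup.
    conf.setInt(
        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, 2);
    return conf;
  }
}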

@@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -220,9 +220,9 @@ public class HAUtil {
public static boolean useLogicalUri(Configuration conf, URI nameNodeUri)
throws IOException {
// Create the proxy provider. Actual proxy is not created.
AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxies
AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxiesClient
.createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
false, null);
false, null);
// No need to use logical URI since failover is not configured.
if (provider == null) {
@@ -336,8 +336,7 @@ public class HAUtil {
List<ProxyAndInfo<T>> proxies = new ArrayList<ProxyAndInfo<T>>(
nnAddresses.size());
for (InetSocketAddress nnAddress : nnAddresses.values()) {
NameNodeProxies.ProxyAndInfo<T> proxyInfo = null;
proxyInfo = NameNodeProxies.createNonHAProxy(conf,
ProxyAndInfo<T> proxyInfo = NameNodeProxies.createNonHAProxy(conf,
nnAddress, xface,
UserGroupInformation.getCurrentUser(), false);
proxies.add(proxyInfo);


@@ -18,9 +18,6 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
@@ -32,31 +29,19 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
@@ -75,9 +60,6 @@ import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Create proxy objects to communicate with a remote NN. All remote access to an
* NN should be funneled through this class. Most of the time you'll want to use
@@ -89,37 +71,6 @@ public class NameNodeProxies {
private static final Log LOG = LogFactory.getLog(NameNodeProxies.class);
/**
* Wrapper for a client proxy as well as its associated service ID.
* This is simply used as a tuple-like return type for
* {@link NameNodeProxies#createProxy} and
* {@link NameNodeProxies#createNonHAProxy}.
*/
public static class ProxyAndInfo<PROXYTYPE> {
private final PROXYTYPE proxy;
private final Text dtService;
private final InetSocketAddress address;
public ProxyAndInfo(PROXYTYPE proxy, Text dtService,
InetSocketAddress address) {
this.proxy = proxy;
this.dtService = dtService;
this.address = address;
}
public PROXYTYPE getProxy() {
return proxy;
}
public Text getDelegationTokenService() {
return dtService;
}
public InetSocketAddress getAddress() {
return address;
}
}
/**
* Creates the namenode proxy with the passed protocol. This will handle
* creation of either HA- or non-HA-enabled proxy objects, depending upon
@@ -160,103 +111,16 @@ public class NameNodeProxies {
URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
createFailoverProxyProvider(conf, nameNodeUri, xface, true,
fallbackToSimpleAuth);
NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri,
xface, true, fallbackToSimpleAuth);
if (failoverProxyProvider == null) {
// Non-HA case
return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
xface, UserGroupInformation.getCurrentUser(), true,
fallbackToSimpleAuth);
} else {
// HA case
DfsClientConf config = new DfsClientConf(conf);
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
config.getFailoverSleepMaxMillis()));
Text dtService;
if (failoverProxyProvider.useLogicalURI()) {
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
HdfsConstants.HDFS_URI_SCHEME);
} else {
dtService = SecurityUtil.buildTokenService(
DFSUtilClient.getNNAddress(nameNodeUri));
}
return new ProxyAndInfo<T>(proxy, dtService,
DFSUtilClient.getNNAddress(nameNodeUri));
}
}
/**
* Generate a dummy namenode proxy instance that utilizes our hacked
* {@link LossyRetryInvocationHandler}. Proxy instance generated using this
* method will proactively drop RPC responses. Currently this method only
* support HA setup. null will be returned if the given configuration is not
* for HA.
*
* @param config the configuration containing the required IPC
* properties, client failover configurations, etc.
* @param nameNodeUri the URI pointing either to a specific NameNode
* or to a logical nameservice.
* @param xface the IPC interface which should be created
* @param numResponseToDrop The number of responses to drop for each RPC call
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
* a secure client falls back to simple auth
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to. Will return null of the
* given configuration does not support HA.
* @throws IOException if there is an error creating the proxy
*/
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
Configuration config, URI nameNodeUri, Class<T> xface,
int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
Preconditions.checkArgument(numResponseToDrop > 0);
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
createFailoverProxyProvider(config, nameNodeUri, xface, true,
fallbackToSimpleAuth);
if (failoverProxyProvider != null) { // HA case
int delay = config.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
int maxCap = config.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
int maxFailoverAttempts = config.getInt(
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = config.getInt(
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
numResponseToDrop, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
maxCap));
T proxy = (T) Proxy.newProxyInstance(
failoverProxyProvider.getInterface().getClassLoader(),
new Class[] { xface }, dummyHandler);
Text dtService;
if (failoverProxyProvider.useLogicalURI()) {
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
HdfsConstants.HDFS_URI_SCHEME);
} else {
dtService = SecurityUtil.buildTokenService(
DFSUtilClient.getNNAddress(nameNodeUri));
}
return new ProxyAndInfo<T>(proxy, dtService,
DFSUtilClient.getNNAddress(nameNodeUri));
} else {
LOG.warn("Currently creating proxy using " +
"LossyRetryInvocationHandler requires NN HA setup");
return null;
return NameNodeProxiesClient.createHAProxy(conf, nameNodeUri, xface,
failoverProxyProvider);
}
}
@@ -303,8 +167,8 @@ public class NameNodeProxies {
T proxy;
if (xface == ClientProtocol.class) {
proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
withRetries, fallbackToSimpleAuth);
proxy = (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth);
} else if (xface == JournalProtocol.class) {
proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
} else if (xface == NamenodeProtocol.class) {
@@ -390,45 +254,6 @@ public class NameNodeProxies {
return new NamenodeProtocolTranslatorPB(proxy);
}
}
private static ClientProtocol createNNProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
final RetryPolicy defaultPolicy =
RetryUtils.getDefaultRetryPolicy(
conf,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
SafeModeException.class.getName());
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
fallbackToSimpleAuth).getProxy();
if (withRetries) { // create the proxy with retries
Map<String, RetryPolicy> methodNameToPolicyMap
= new HashMap<String, RetryPolicy>();
ClientProtocol translatorProxy =
new ClientNamenodeProtocolTranslatorPB(proxy);
return (ClientProtocol) RetryProxy.create(
ClientProtocol.class,
new DefaultFailoverProxyProvider<ClientProtocol>(
ClientProtocol.class, translatorProxy),
methodNameToPolicyMap,
defaultPolicy);
} else {
return new ClientNamenodeProtocolTranslatorPB(proxy);
}
}
private static Object createNameNodeProxy(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi, Class<?> xface,
@@ -439,88 +264,4 @@ public class NameNodeProxies {
return proxy;
}
/** Gets the configured Failover proxy provider's class */
@VisibleForTesting
public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
Configuration conf, URI nameNodeUri) throws IOException {
if (nameNodeUri == null) {
return null;
}
String host = nameNodeUri.getHost();
String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + host;
try {
@SuppressWarnings("unchecked")
Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf
.getClass(configKey, null, FailoverProxyProvider.class);
return ret;
} catch (RuntimeException e) {
if (e.getCause() instanceof ClassNotFoundException) {
throw new IOException("Could not load failover proxy provider class "
+ conf.get(configKey) + " which is configured for authority "
+ nameNodeUri, e);
} else {
throw e;
}
}
}
/** Creates the Failover proxy provider instance*/
@VisibleForTesting
public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
AbstractNNFailoverProxyProvider<T> providerNN;
Preconditions.checkArgument(
xface.isAssignableFrom(NamenodeProtocols.class),
"Interface %s is not a NameNode protocol", xface);
try {
// Obtain the class of the proxy provider
failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
nameNodeUri);
if (failoverProxyProviderClass == null) {
return null;
}
// Create a proxy provider instance.
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
.getConstructor(Configuration.class, URI.class, Class.class);
FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
xface);
// If the proxy provider is of an old implementation, wrap it.
if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
providerNN = new WrappedFailoverProxyProvider<T>(provider);
} else {
providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
}
} catch (Exception e) {
String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
if (LOG.isDebugEnabled()) {
LOG.debug(message, e);
}
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
throw new IOException(message, e);
}
}
// Check the port in the URI, if it is logical.
if (checkPort && providerNN.useLogicalURI()) {
int port = nameNodeUri.getPort();
if (port > 0 &&
port != HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) {
// Throwing here without any cleanup is fine since we have not
// actually created the underlying proxies yet.
throw new IOException("Port " + port + " specified in URI "
+ nameNodeUri + " but host '" + nameNodeUri.getHost()
+ "' is a logical (HA) namenode"
+ " and does not use port information.");
}
}
providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
return providerNN;
}
}


@@ -348,7 +348,7 @@ public class PBHelper {
new RecoveringBlock(block, locs, PBHelperClient.convert(b.getTruncateBlock())) :
new RecoveringBlock(block, locs, b.getNewGenStamp());
}
public static ReplicaState convert(ReplicaStateProto state) {
switch (state) {
case RBW:


@@ -149,7 +149,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
if (current.namenode == null) {
try {
current.namenode = factory.createProxy(conf,
current.address, xface, ugi, false, fallbackToSimpleAuth);
current.address, xface, ugi, false, getFallbackToSimpleAuth());
} catch (IOException e) {
LOG.error("Failed to create RPC proxy to NameNode", e);
throw new RuntimeException(e);


@@ -60,7 +60,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;


@@ -57,7 +57,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
@@ -196,7 +196,7 @@ public class TestRetryCacheWithHA {
private DFSClient genClientWithDummyHandler() throws IOException {
URI nnUri = dfs.getUri();
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
NameNodeProxies.createFailoverProxyProvider(conf,
NameNodeProxiesClient.createFailoverProxyProvider(conf,
nnUri, ClientProtocol.class, true, null);
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
failoverProxyProvider, RetryPolicies