HDFS-9039. Separate client and server side methods of o.a.h.hdfs.NameNodeProxies. Contributed by Mingliang Liu.
This commit is contained in:
parent
38420754f5
commit
ed7211563c
|
@ -0,0 +1,366 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Constructor;
|
||||||
|
import java.lang.reflect.InvocationHandler;
|
||||||
|
import java.lang.reflect.Proxy;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
|
||||||
|
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||||
|
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
|
import org.apache.hadoop.io.retry.RetryProxy;
|
||||||
|
import org.apache.hadoop.io.retry.RetryUtils;
|
||||||
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create proxy objects with {@link ClientProtocol} to communicate with a remote
|
||||||
|
* NN. Generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
|
||||||
|
* Configuration, URI, AtomicBoolean)}, which will create either an HA- or
|
||||||
|
* non-HA-enabled client proxy as appropriate.
|
||||||
|
*
|
||||||
|
* For creating proxy objects with other protocols, please see
|
||||||
|
* {@link NameNodeProxies#createProxy(Configuration, URI, Class)}.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class NameNodeProxiesClient {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
NameNodeProxiesClient.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrapper for a client proxy as well as its associated service ID.
|
||||||
|
* This is simply used as a tuple-like return type for created NN proxy.
|
||||||
|
*/
|
||||||
|
public static class ProxyAndInfo<PROXYTYPE> {
|
||||||
|
private final PROXYTYPE proxy;
|
||||||
|
private final Text dtService;
|
||||||
|
private final InetSocketAddress address;
|
||||||
|
|
||||||
|
public ProxyAndInfo(PROXYTYPE proxy, Text dtService,
|
||||||
|
InetSocketAddress address) {
|
||||||
|
this.proxy = proxy;
|
||||||
|
this.dtService = dtService;
|
||||||
|
this.address = address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public PROXYTYPE getProxy() {
|
||||||
|
return proxy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Text getDelegationTokenService() {
|
||||||
|
return dtService;
|
||||||
|
}
|
||||||
|
|
||||||
|
public InetSocketAddress getAddress() {
|
||||||
|
return address;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the namenode proxy with the ClientProtocol. This will handle
|
||||||
|
* creation of either HA- or non-HA-enabled proxy objects, depending upon
|
||||||
|
* if the provided URI is a configured logical URI.
|
||||||
|
*
|
||||||
|
* @param conf the configuration containing the required IPC
|
||||||
|
* properties, client failover configurations, etc.
|
||||||
|
* @param nameNodeUri the URI pointing either to a specific NameNode
|
||||||
|
* or to a logical nameservice.
|
||||||
|
* @param fallbackToSimpleAuth set to true or false during calls to indicate
|
||||||
|
* if a secure client falls back to simple auth
|
||||||
|
* @return an object containing both the proxy and the associated
|
||||||
|
* delegation token service it corresponds to
|
||||||
|
* @throws IOException if there is an error creating the proxy
|
||||||
|
* @see {@link NameNodeProxies#createProxy(Configuration, URI, Class)}.
|
||||||
|
*/
|
||||||
|
public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(
|
||||||
|
Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth)
|
||||||
|
throws IOException {
|
||||||
|
AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider =
|
||||||
|
createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
|
||||||
|
true, fallbackToSimpleAuth);
|
||||||
|
|
||||||
|
if (failoverProxyProvider == null) {
|
||||||
|
InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
|
||||||
|
Text dtService = SecurityUtil.buildTokenService(nnAddr);
|
||||||
|
ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf,
|
||||||
|
UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
|
||||||
|
return new ProxyAndInfo<>(proxy, dtService, nnAddr);
|
||||||
|
} else {
|
||||||
|
return createHAProxy(conf, nameNodeUri, ClientProtocol.class,
|
||||||
|
failoverProxyProvider);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate a dummy namenode proxy instance that utilizes our hacked
|
||||||
|
* {@link LossyRetryInvocationHandler}. Proxy instance generated using this
|
||||||
|
* method will proactively drop RPC responses. Currently this method only
|
||||||
|
* support HA setup. null will be returned if the given configuration is not
|
||||||
|
* for HA.
|
||||||
|
*
|
||||||
|
* @param config the configuration containing the required IPC
|
||||||
|
* properties, client failover configurations, etc.
|
||||||
|
* @param nameNodeUri the URI pointing either to a specific NameNode
|
||||||
|
* or to a logical nameservice.
|
||||||
|
* @param xface the IPC interface which should be created
|
||||||
|
* @param numResponseToDrop The number of responses to drop for each RPC call
|
||||||
|
* @param fallbackToSimpleAuth set to true or false during calls to indicate
|
||||||
|
* if a secure client falls back to simple auth
|
||||||
|
* @return an object containing both the proxy and the associated
|
||||||
|
* delegation token service it corresponds to. Will return null of the
|
||||||
|
* given configuration does not support HA.
|
||||||
|
* @throws IOException if there is an error creating the proxy
|
||||||
|
*/
|
||||||
|
public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
|
||||||
|
Configuration config, URI nameNodeUri, Class<T> xface,
|
||||||
|
int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkArgument(numResponseToDrop > 0);
|
||||||
|
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
|
||||||
|
createFailoverProxyProvider(config, nameNodeUri, xface, true,
|
||||||
|
fallbackToSimpleAuth);
|
||||||
|
|
||||||
|
if (failoverProxyProvider != null) { // HA case
|
||||||
|
int delay = config.getInt(
|
||||||
|
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
|
||||||
|
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
|
||||||
|
int maxCap = config.getInt(
|
||||||
|
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
|
||||||
|
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
|
||||||
|
int maxFailoverAttempts = config.getInt(
|
||||||
|
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
|
||||||
|
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
|
||||||
|
int maxRetryAttempts = config.getInt(
|
||||||
|
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
|
||||||
|
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
|
||||||
|
InvocationHandler dummyHandler = new LossyRetryInvocationHandler<>(
|
||||||
|
numResponseToDrop, failoverProxyProvider,
|
||||||
|
RetryPolicies.failoverOnNetworkException(
|
||||||
|
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
|
||||||
|
Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
|
||||||
|
maxCap));
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
T proxy = (T) Proxy.newProxyInstance(
|
||||||
|
failoverProxyProvider.getInterface().getClassLoader(),
|
||||||
|
new Class[]{xface}, dummyHandler);
|
||||||
|
Text dtService;
|
||||||
|
if (failoverProxyProvider.useLogicalURI()) {
|
||||||
|
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
|
||||||
|
HdfsConstants.HDFS_URI_SCHEME);
|
||||||
|
} else {
|
||||||
|
dtService = SecurityUtil.buildTokenService(
|
||||||
|
DFSUtilClient.getNNAddress(nameNodeUri));
|
||||||
|
}
|
||||||
|
return new ProxyAndInfo<>(proxy, dtService,
|
||||||
|
DFSUtilClient.getNNAddress(nameNodeUri));
|
||||||
|
} else {
|
||||||
|
LOG.warn("Currently creating proxy using " +
|
||||||
|
"LossyRetryInvocationHandler requires NN HA setup");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Creates the Failover proxy provider instance*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
|
||||||
|
Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
|
||||||
|
AtomicBoolean fallbackToSimpleAuth) throws IOException {
|
||||||
|
Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
|
||||||
|
AbstractNNFailoverProxyProvider<T> providerNN;
|
||||||
|
try {
|
||||||
|
// Obtain the class of the proxy provider
|
||||||
|
failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
|
||||||
|
nameNodeUri);
|
||||||
|
if (failoverProxyProviderClass == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
// Create a proxy provider instance.
|
||||||
|
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
|
||||||
|
.getConstructor(Configuration.class, URI.class, Class.class);
|
||||||
|
FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
|
||||||
|
xface);
|
||||||
|
|
||||||
|
// If the proxy provider is of an old implementation, wrap it.
|
||||||
|
if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
|
||||||
|
providerNN = new WrappedFailoverProxyProvider<>(provider);
|
||||||
|
} else {
|
||||||
|
providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
final String message = "Couldn't create proxy provider " +
|
||||||
|
failoverProxyProviderClass;
|
||||||
|
LOG.debug(message, e);
|
||||||
|
if (e.getCause() instanceof IOException) {
|
||||||
|
throw (IOException) e.getCause();
|
||||||
|
} else {
|
||||||
|
throw new IOException(message, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the port in the URI, if it is logical.
|
||||||
|
if (checkPort && providerNN.useLogicalURI()) {
|
||||||
|
int port = nameNodeUri.getPort();
|
||||||
|
if (port > 0 &&
|
||||||
|
port != HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) {
|
||||||
|
// Throwing here without any cleanup is fine since we have not
|
||||||
|
// actually created the underlying proxies yet.
|
||||||
|
throw new IOException("Port " + port + " specified in URI "
|
||||||
|
+ nameNodeUri + " but host '" + nameNodeUri.getHost()
|
||||||
|
+ "' is a logical (HA) namenode"
|
||||||
|
+ " and does not use port information.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
|
||||||
|
return providerNN;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Gets the configured Failover proxy provider's class */
|
||||||
|
@VisibleForTesting
|
||||||
|
public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
|
||||||
|
Configuration conf, URI nameNodeUri) throws IOException {
|
||||||
|
if (nameNodeUri == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String host = nameNodeUri.getHost();
|
||||||
|
String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
|
||||||
|
+ "." + host;
|
||||||
|
try {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>)
|
||||||
|
conf.getClass(configKey, null, FailoverProxyProvider.class);
|
||||||
|
return ret;
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
if (e.getCause() instanceof ClassNotFoundException) {
|
||||||
|
throw new IOException("Could not load failover proxy provider class "
|
||||||
|
+ conf.get(configKey) + " which is configured for authority "
|
||||||
|
+ nameNodeUri, e);
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an explicitly HA-enabled proxy object.
|
||||||
|
*
|
||||||
|
* @param conf the configuration object
|
||||||
|
* @param nameNodeUri the URI pointing either to a specific NameNode or to a
|
||||||
|
* logical nameservice.
|
||||||
|
* @param xface the IPC interface which should be created
|
||||||
|
* @param failoverProxyProvider Failover proxy provider
|
||||||
|
* @return an object containing both the proxy and the associated
|
||||||
|
* delegation token service it corresponds to
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static <T> ProxyAndInfo<T> createHAProxy(
|
||||||
|
Configuration conf, URI nameNodeUri, Class<T> xface,
|
||||||
|
AbstractNNFailoverProxyProvider<T> failoverProxyProvider)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkNotNull(failoverProxyProvider);
|
||||||
|
// HA case
|
||||||
|
DfsClientConf config = new DfsClientConf(conf);
|
||||||
|
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
|
||||||
|
RetryPolicies.failoverOnNetworkException(
|
||||||
|
RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
|
||||||
|
config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
|
||||||
|
config.getFailoverSleepMaxMillis()));
|
||||||
|
|
||||||
|
Text dtService;
|
||||||
|
if (failoverProxyProvider.useLogicalURI()) {
|
||||||
|
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
|
||||||
|
HdfsConstants.HDFS_URI_SCHEME);
|
||||||
|
} else {
|
||||||
|
dtService = SecurityUtil.buildTokenService(
|
||||||
|
DFSUtilClient.getNNAddress(nameNodeUri));
|
||||||
|
}
|
||||||
|
return new ProxyAndInfo<>(proxy, dtService,
|
||||||
|
DFSUtilClient.getNNAddress(nameNodeUri));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ClientProtocol createNonHAProxyWithClientProtocol(
|
||||||
|
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
||||||
|
boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
|
||||||
|
throws IOException {
|
||||||
|
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
|
||||||
|
ProtobufRpcEngine.class);
|
||||||
|
|
||||||
|
final RetryPolicy defaultPolicy =
|
||||||
|
RetryUtils.getDefaultRetryPolicy(
|
||||||
|
conf,
|
||||||
|
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
|
||||||
|
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
|
||||||
|
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
|
||||||
|
HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
|
||||||
|
SafeModeException.class.getName());
|
||||||
|
|
||||||
|
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
|
||||||
|
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
|
||||||
|
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
|
||||||
|
NetUtils.getDefaultSocketFactory(conf),
|
||||||
|
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
|
||||||
|
fallbackToSimpleAuth).getProxy();
|
||||||
|
|
||||||
|
if (withRetries) { // create the proxy with retries
|
||||||
|
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();
|
||||||
|
ClientProtocol translatorProxy =
|
||||||
|
new ClientNamenodeProtocolTranslatorPB(proxy);
|
||||||
|
return (ClientProtocol) RetryProxy.create(
|
||||||
|
ClientProtocol.class,
|
||||||
|
new DefaultFailoverProxyProvider<>(ClientProtocol.class,
|
||||||
|
translatorProxy),
|
||||||
|
methodNameToPolicyMap,
|
||||||
|
defaultPolicy);
|
||||||
|
} else {
|
||||||
|
return new ClientNamenodeProtocolTranslatorPB(proxy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -26,7 +26,7 @@ import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||||
public abstract class AbstractNNFailoverProxyProvider<T> implements
|
public abstract class AbstractNNFailoverProxyProvider<T> implements
|
||||||
FailoverProxyProvider <T> {
|
FailoverProxyProvider <T> {
|
||||||
|
|
||||||
protected AtomicBoolean fallbackToSimpleAuth;
|
private AtomicBoolean fallbackToSimpleAuth;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Inquire whether logical HA URI is used for the implementation. If it is
|
* Inquire whether logical HA URI is used for the implementation. If it is
|
||||||
|
@ -48,4 +48,8 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
|
||||||
AtomicBoolean fallbackToSimpleAuth) {
|
AtomicBoolean fallbackToSimpleAuth) {
|
||||||
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
|
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized AtomicBoolean getFallbackToSimpleAuth() {
|
||||||
|
return fallbackToSimpleAuth;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -17,18 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.URI;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
||||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A NNFailoverProxyProvider implementation which wrapps old implementations
|
* A NNFailoverProxyProvider implementation which wrapps old implementations
|
|
@ -596,6 +596,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HADOOP-12428. Fix inconsistency between log-level guards and statements.
|
HADOOP-12428. Fix inconsistency between log-level guards and statements.
|
||||||
(Jagadesh Kiran N and Jackie Chang via ozawa)
|
(Jagadesh Kiran N and Jackie Chang via ozawa)
|
||||||
|
|
||||||
|
HDFS-9039. Separate client and server side methods of o.a.h.hdfs.
|
||||||
|
NameNodeProxies. (Mingliang Liu via wheat9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -98,6 +98,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
|
||||||
import org.apache.hadoop.fs.permission.AclStatus;
|
import org.apache.hadoop.fs.permission.AclStatus;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
|
@ -319,14 +320,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
int numResponseToDrop = conf.getInt(
|
int numResponseToDrop = conf.getInt(
|
||||||
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
|
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
|
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
|
||||||
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
|
ProxyAndInfo<ClientProtocol> proxyInfo = null;
|
||||||
AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
|
AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
|
||||||
if (numResponseToDrop > 0) {
|
if (numResponseToDrop > 0) {
|
||||||
// This case is used for testing.
|
// This case is used for testing.
|
||||||
LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
|
LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
|
||||||
+ " is set to " + numResponseToDrop
|
+ " is set to " + numResponseToDrop
|
||||||
+ ", this hacked client will proactively drop responses");
|
+ ", this hacked client will proactively drop responses");
|
||||||
proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
|
proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
|
||||||
nameNodeUri, ClientProtocol.class, numResponseToDrop,
|
nameNodeUri, ClientProtocol.class, numResponseToDrop,
|
||||||
nnFallbackToSimpleAuth);
|
nnFallbackToSimpleAuth);
|
||||||
}
|
}
|
||||||
|
@ -342,8 +343,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
} else {
|
} else {
|
||||||
Preconditions.checkArgument(nameNodeUri != null,
|
Preconditions.checkArgument(nameNodeUri != null,
|
||||||
"null URI");
|
"null URI");
|
||||||
proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
|
proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
|
||||||
ClientProtocol.class, nnFallbackToSimpleAuth);
|
nameNodeUri, nnFallbackToSimpleAuth);
|
||||||
this.dtService = proxyInfo.getDelegationTokenService();
|
this.dtService = proxyInfo.getDelegationTokenService();
|
||||||
this.namenode = proxyInfo.getProxy();
|
this.namenode = proxyInfo.getProxy();
|
||||||
}
|
}
|
||||||
|
@ -784,8 +785,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
"a failover proxy provider configured.");
|
"a failover proxy provider configured.");
|
||||||
}
|
}
|
||||||
|
|
||||||
NameNodeProxies.ProxyAndInfo<ClientProtocol> info =
|
ProxyAndInfo<ClientProtocol> info =
|
||||||
NameNodeProxies.createProxy(conf, uri, ClientProtocol.class);
|
NameNodeProxiesClient.createProxyWithClientProtocol(conf, uri, null);
|
||||||
assert info.getDelegationTokenService().equals(token.getService()) :
|
assert info.getDelegationTokenService().equals(token.getService()) :
|
||||||
"Returned service '" + info.getDelegationTokenService().toString() +
|
"Returned service '" + info.getDelegationTokenService().toString() +
|
||||||
"' doesn't match expected service '" +
|
"' doesn't match expected service '" +
|
||||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
|
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
@ -216,7 +216,7 @@ public class HAUtil {
|
||||||
public static boolean useLogicalUri(Configuration conf, URI nameNodeUri)
|
public static boolean useLogicalUri(Configuration conf, URI nameNodeUri)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// Create the proxy provider. Actual proxy is not created.
|
// Create the proxy provider. Actual proxy is not created.
|
||||||
AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxies
|
AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxiesClient
|
||||||
.createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
|
.createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
|
||||||
false, null);
|
false, null);
|
||||||
|
|
||||||
|
@ -332,8 +332,7 @@ public class HAUtil {
|
||||||
List<ProxyAndInfo<T>> proxies = new ArrayList<ProxyAndInfo<T>>(
|
List<ProxyAndInfo<T>> proxies = new ArrayList<ProxyAndInfo<T>>(
|
||||||
nnAddresses.size());
|
nnAddresses.size());
|
||||||
for (InetSocketAddress nnAddress : nnAddresses.values()) {
|
for (InetSocketAddress nnAddress : nnAddresses.values()) {
|
||||||
NameNodeProxies.ProxyAndInfo<T> proxyInfo = null;
|
ProxyAndInfo<T> proxyInfo = NameNodeProxies.createNonHAProxy(conf,
|
||||||
proxyInfo = NameNodeProxies.createNonHAProxy(conf,
|
|
||||||
nnAddress, xface,
|
nnAddress, xface,
|
||||||
UserGroupInformation.getCurrentUser(), false);
|
UserGroupInformation.getCurrentUser(), false);
|
||||||
proxies.add(proxyInfo);
|
proxies.add(proxyInfo);
|
||||||
|
|
|
@ -18,9 +18,6 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Constructor;
|
|
||||||
import java.lang.reflect.InvocationHandler;
|
|
||||||
import java.lang.reflect.Proxy;
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -32,31 +29,19 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
||||||
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
|
|
||||||
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
|
|
||||||
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
|
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
|
|
||||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
|
||||||
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
|
|
||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.io.retry.RetryProxy;
|
import org.apache.hadoop.io.retry.RetryProxy;
|
||||||
import org.apache.hadoop.io.retry.RetryUtils;
|
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
|
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
|
||||||
|
@ -75,9 +60,6 @@ import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
||||||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
|
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
|
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create proxy objects to communicate with a remote NN. All remote access to an
|
* Create proxy objects to communicate with a remote NN. All remote access to an
|
||||||
* NN should be funneled through this class. Most of the time you'll want to use
|
* NN should be funneled through this class. Most of the time you'll want to use
|
||||||
|
@ -89,37 +71,6 @@ public class NameNodeProxies {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(NameNodeProxies.class);
|
private static final Log LOG = LogFactory.getLog(NameNodeProxies.class);
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrapper for a client proxy as well as its associated service ID.
|
|
||||||
* This is simply used as a tuple-like return type for
|
|
||||||
* {@link NameNodeProxies#createProxy} and
|
|
||||||
* {@link NameNodeProxies#createNonHAProxy}.
|
|
||||||
*/
|
|
||||||
public static class ProxyAndInfo<PROXYTYPE> {
|
|
||||||
private final PROXYTYPE proxy;
|
|
||||||
private final Text dtService;
|
|
||||||
private final InetSocketAddress address;
|
|
||||||
|
|
||||||
public ProxyAndInfo(PROXYTYPE proxy, Text dtService,
|
|
||||||
InetSocketAddress address) {
|
|
||||||
this.proxy = proxy;
|
|
||||||
this.dtService = dtService;
|
|
||||||
this.address = address;
|
|
||||||
}
|
|
||||||
|
|
||||||
public PROXYTYPE getProxy() {
|
|
||||||
return proxy;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Text getDelegationTokenService() {
|
|
||||||
return dtService;
|
|
||||||
}
|
|
||||||
|
|
||||||
public InetSocketAddress getAddress() {
|
|
||||||
return address;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates the namenode proxy with the passed protocol. This will handle
|
* Creates the namenode proxy with the passed protocol. This will handle
|
||||||
* creation of either HA- or non-HA-enabled proxy objects, depending upon
|
* creation of either HA- or non-HA-enabled proxy objects, depending upon
|
||||||
|
@ -160,103 +111,16 @@ public class NameNodeProxies {
|
||||||
URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
|
URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
|
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
|
||||||
createFailoverProxyProvider(conf, nameNodeUri, xface, true,
|
NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri,
|
||||||
fallbackToSimpleAuth);
|
xface, true, fallbackToSimpleAuth);
|
||||||
|
|
||||||
if (failoverProxyProvider == null) {
|
if (failoverProxyProvider == null) {
|
||||||
// Non-HA case
|
|
||||||
return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
|
return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
|
||||||
xface, UserGroupInformation.getCurrentUser(), true,
|
xface, UserGroupInformation.getCurrentUser(), true,
|
||||||
fallbackToSimpleAuth);
|
fallbackToSimpleAuth);
|
||||||
} else {
|
} else {
|
||||||
// HA case
|
return NameNodeProxiesClient.createHAProxy(conf, nameNodeUri, xface,
|
||||||
DfsClientConf config = new DfsClientConf(conf);
|
failoverProxyProvider);
|
||||||
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
|
|
||||||
RetryPolicies.failoverOnNetworkException(
|
|
||||||
RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
|
|
||||||
config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
|
|
||||||
config.getFailoverSleepMaxMillis()));
|
|
||||||
|
|
||||||
Text dtService;
|
|
||||||
if (failoverProxyProvider.useLogicalURI()) {
|
|
||||||
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
|
|
||||||
HdfsConstants.HDFS_URI_SCHEME);
|
|
||||||
} else {
|
|
||||||
dtService = SecurityUtil.buildTokenService(
|
|
||||||
DFSUtilClient.getNNAddress(nameNodeUri));
|
|
||||||
}
|
|
||||||
return new ProxyAndInfo<T>(proxy, dtService,
|
|
||||||
DFSUtilClient.getNNAddress(nameNodeUri));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Generate a dummy namenode proxy instance that utilizes our hacked
|
|
||||||
* {@link LossyRetryInvocationHandler}. Proxy instance generated using this
|
|
||||||
* method will proactively drop RPC responses. Currently this method only
|
|
||||||
* support HA setup. null will be returned if the given configuration is not
|
|
||||||
* for HA.
|
|
||||||
*
|
|
||||||
* @param config the configuration containing the required IPC
|
|
||||||
* properties, client failover configurations, etc.
|
|
||||||
* @param nameNodeUri the URI pointing either to a specific NameNode
|
|
||||||
* or to a logical nameservice.
|
|
||||||
* @param xface the IPC interface which should be created
|
|
||||||
* @param numResponseToDrop The number of responses to drop for each RPC call
|
|
||||||
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
|
|
||||||
* a secure client falls back to simple auth
|
|
||||||
* @return an object containing both the proxy and the associated
|
|
||||||
* delegation token service it corresponds to. Will return null of the
|
|
||||||
* given configuration does not support HA.
|
|
||||||
* @throws IOException if there is an error creating the proxy
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
|
|
||||||
Configuration config, URI nameNodeUri, Class<T> xface,
|
|
||||||
int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
|
|
||||||
throws IOException {
|
|
||||||
Preconditions.checkArgument(numResponseToDrop > 0);
|
|
||||||
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
|
|
||||||
createFailoverProxyProvider(config, nameNodeUri, xface, true,
|
|
||||||
fallbackToSimpleAuth);
|
|
||||||
|
|
||||||
if (failoverProxyProvider != null) { // HA case
|
|
||||||
int delay = config.getInt(
|
|
||||||
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
|
|
||||||
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
|
|
||||||
int maxCap = config.getInt(
|
|
||||||
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
|
|
||||||
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
|
|
||||||
int maxFailoverAttempts = config.getInt(
|
|
||||||
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
|
|
||||||
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
|
|
||||||
int maxRetryAttempts = config.getInt(
|
|
||||||
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
|
|
||||||
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
|
|
||||||
InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
|
|
||||||
numResponseToDrop, failoverProxyProvider,
|
|
||||||
RetryPolicies.failoverOnNetworkException(
|
|
||||||
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
|
|
||||||
Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
|
|
||||||
maxCap));
|
|
||||||
|
|
||||||
T proxy = (T) Proxy.newProxyInstance(
|
|
||||||
failoverProxyProvider.getInterface().getClassLoader(),
|
|
||||||
new Class[] { xface }, dummyHandler);
|
|
||||||
Text dtService;
|
|
||||||
if (failoverProxyProvider.useLogicalURI()) {
|
|
||||||
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
|
|
||||||
HdfsConstants.HDFS_URI_SCHEME);
|
|
||||||
} else {
|
|
||||||
dtService = SecurityUtil.buildTokenService(
|
|
||||||
DFSUtilClient.getNNAddress(nameNodeUri));
|
|
||||||
}
|
|
||||||
return new ProxyAndInfo<T>(proxy, dtService,
|
|
||||||
DFSUtilClient.getNNAddress(nameNodeUri));
|
|
||||||
} else {
|
|
||||||
LOG.warn("Currently creating proxy using " +
|
|
||||||
"LossyRetryInvocationHandler requires NN HA setup");
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,8 +167,8 @@ public class NameNodeProxies {
|
||||||
|
|
||||||
T proxy;
|
T proxy;
|
||||||
if (xface == ClientProtocol.class) {
|
if (xface == ClientProtocol.class) {
|
||||||
proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
|
proxy = (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
|
||||||
withRetries, fallbackToSimpleAuth);
|
nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth);
|
||||||
} else if (xface == JournalProtocol.class) {
|
} else if (xface == JournalProtocol.class) {
|
||||||
proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
|
proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
|
||||||
} else if (xface == NamenodeProtocol.class) {
|
} else if (xface == NamenodeProtocol.class) {
|
||||||
|
@ -391,45 +255,6 @@ public class NameNodeProxies {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ClientProtocol createNNProxyWithClientProtocol(
|
|
||||||
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
|
||||||
boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
|
|
||||||
throws IOException {
|
|
||||||
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
|
|
||||||
|
|
||||||
final RetryPolicy defaultPolicy =
|
|
||||||
RetryUtils.getDefaultRetryPolicy(
|
|
||||||
conf,
|
|
||||||
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
|
|
||||||
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
|
|
||||||
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
|
|
||||||
HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
|
|
||||||
SafeModeException.class.getName());
|
|
||||||
|
|
||||||
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
|
|
||||||
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
|
|
||||||
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
|
|
||||||
NetUtils.getDefaultSocketFactory(conf),
|
|
||||||
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
|
|
||||||
fallbackToSimpleAuth).getProxy();
|
|
||||||
|
|
||||||
if (withRetries) { // create the proxy with retries
|
|
||||||
|
|
||||||
Map<String, RetryPolicy> methodNameToPolicyMap
|
|
||||||
= new HashMap<String, RetryPolicy>();
|
|
||||||
ClientProtocol translatorProxy =
|
|
||||||
new ClientNamenodeProtocolTranslatorPB(proxy);
|
|
||||||
return (ClientProtocol) RetryProxy.create(
|
|
||||||
ClientProtocol.class,
|
|
||||||
new DefaultFailoverProxyProvider<ClientProtocol>(
|
|
||||||
ClientProtocol.class, translatorProxy),
|
|
||||||
methodNameToPolicyMap,
|
|
||||||
defaultPolicy);
|
|
||||||
} else {
|
|
||||||
return new ClientNamenodeProtocolTranslatorPB(proxy);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Object createNameNodeProxy(InetSocketAddress address,
|
private static Object createNameNodeProxy(InetSocketAddress address,
|
||||||
Configuration conf, UserGroupInformation ugi, Class<?> xface)
|
Configuration conf, UserGroupInformation ugi, Class<?> xface)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -439,88 +264,4 @@ public class NameNodeProxies {
|
||||||
return proxy;
|
return proxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Gets the configured Failover proxy provider's class */
|
|
||||||
@VisibleForTesting
|
|
||||||
public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
|
|
||||||
Configuration conf, URI nameNodeUri) throws IOException {
|
|
||||||
if (nameNodeUri == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
String host = nameNodeUri.getHost();
|
|
||||||
String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
|
|
||||||
+ "." + host;
|
|
||||||
try {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf
|
|
||||||
.getClass(configKey, null, FailoverProxyProvider.class);
|
|
||||||
return ret;
|
|
||||||
} catch (RuntimeException e) {
|
|
||||||
if (e.getCause() instanceof ClassNotFoundException) {
|
|
||||||
throw new IOException("Could not load failover proxy provider class "
|
|
||||||
+ conf.get(configKey) + " which is configured for authority "
|
|
||||||
+ nameNodeUri, e);
|
|
||||||
} else {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Creates the Failover proxy provider instance*/
|
|
||||||
@VisibleForTesting
|
|
||||||
public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
|
|
||||||
Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
|
|
||||||
AtomicBoolean fallbackToSimpleAuth) throws IOException {
|
|
||||||
Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
|
|
||||||
AbstractNNFailoverProxyProvider<T> providerNN;
|
|
||||||
Preconditions.checkArgument(
|
|
||||||
xface.isAssignableFrom(NamenodeProtocols.class),
|
|
||||||
"Interface %s is not a NameNode protocol", xface);
|
|
||||||
try {
|
|
||||||
// Obtain the class of the proxy provider
|
|
||||||
failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
|
|
||||||
nameNodeUri);
|
|
||||||
if (failoverProxyProviderClass == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
// Create a proxy provider instance.
|
|
||||||
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
|
|
||||||
.getConstructor(Configuration.class, URI.class, Class.class);
|
|
||||||
FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
|
|
||||||
xface);
|
|
||||||
|
|
||||||
// If the proxy provider is of an old implementation, wrap it.
|
|
||||||
if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
|
|
||||||
providerNN = new WrappedFailoverProxyProvider<T>(provider);
|
|
||||||
} else {
|
|
||||||
providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(message, e);
|
|
||||||
}
|
|
||||||
if (e.getCause() instanceof IOException) {
|
|
||||||
throw (IOException) e.getCause();
|
|
||||||
} else {
|
|
||||||
throw new IOException(message, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check the port in the URI, if it is logical.
|
|
||||||
if (checkPort && providerNN.useLogicalURI()) {
|
|
||||||
int port = nameNodeUri.getPort();
|
|
||||||
if (port > 0 &&
|
|
||||||
port != HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) {
|
|
||||||
// Throwing here without any cleanup is fine since we have not
|
|
||||||
// actually created the underlying proxies yet.
|
|
||||||
throw new IOException("Port " + port + " specified in URI "
|
|
||||||
+ nameNodeUri + " but host '" + nameNodeUri.getHost()
|
|
||||||
+ "' is a logical (HA) namenode"
|
|
||||||
+ " and does not use port information.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
|
|
||||||
return providerNN;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,7 +149,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
|
||||||
if (current.namenode == null) {
|
if (current.namenode == null) {
|
||||||
try {
|
try {
|
||||||
current.namenode = factory.createProxy(conf,
|
current.namenode = factory.createProxy(conf,
|
||||||
current.address, xface, ugi, false, fallbackToSimpleAuth);
|
current.address, xface, ugi, false, getFallbackToSimpleAuth());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed to create RPC proxy to NameNode", e);
|
LOG.error("Failed to create RPC proxy to NameNode", e);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
|
|
@ -61,7 +61,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HAUtil;
|
import org.apache.hadoop.hdfs.HAUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
|
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
|
|
@ -57,7 +57,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
||||||
|
@ -196,7 +196,7 @@ public class TestRetryCacheWithHA {
|
||||||
private DFSClient genClientWithDummyHandler() throws IOException {
|
private DFSClient genClientWithDummyHandler() throws IOException {
|
||||||
URI nnUri = dfs.getUri();
|
URI nnUri = dfs.getUri();
|
||||||
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
|
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
|
||||||
NameNodeProxies.createFailoverProxyProvider(conf,
|
NameNodeProxiesClient.createFailoverProxyProvider(conf,
|
||||||
nnUri, ClientProtocol.class, true, null);
|
nnUri, ClientProtocol.class, true, null);
|
||||||
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
|
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
|
||||||
failoverProxyProvider, RetryPolicies
|
failoverProxyProvider, RetryPolicies
|
||||||
|
|
Loading…
Reference in New Issue