HDFS-5118. Merge change r1520637 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1520639 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
461587e36a
commit
f059931fed
|
@ -0,0 +1,62 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.io.retry;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A dummy invocation handler extending RetryInvocationHandler. It drops the
|
||||
* first N number of responses. This invocation handler is only used for testing.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class LossyRetryInvocationHandler<T> extends RetryInvocationHandler<T> {
|
||||
private final int numToDrop;
|
||||
private static final ThreadLocal<Integer> RetryCount =
|
||||
new ThreadLocal<Integer>();
|
||||
|
||||
public LossyRetryInvocationHandler(int numToDrop,
|
||||
FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
|
||||
super(proxyProvider, retryPolicy);
|
||||
this.numToDrop = numToDrop;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object invoke(Object proxy, Method method, Object[] args)
|
||||
throws Throwable {
|
||||
RetryCount.set(0);
|
||||
return super.invoke(proxy, method, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object invokeMethod(Method method, Object[] args) throws Throwable {
|
||||
Object result = super.invokeMethod(method, args);
|
||||
int retryCount = RetryCount.get();
|
||||
if (retryCount < this.numToDrop) {
|
||||
RetryCount.set(++retryCount);
|
||||
LOG.info("Drop the response. Current retryCount == " + retryCount);
|
||||
throw new UnknownHostException("Fake Exception");
|
||||
} else {
|
||||
LOG.info("retryCount == " + retryCount
|
||||
+ ". It's time to normally process the response");
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -64,7 +64,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|||
this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
|
||||
}
|
||||
|
||||
RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
|
||||
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
|
||||
RetryPolicy defaultPolicy,
|
||||
Map<String, RetryPolicy> methodNameToPolicyMap) {
|
||||
this.proxyProvider = proxyProvider;
|
||||
|
|
|
@ -6,6 +6,9 @@ Release 2.3.0 - UNRELEASED
|
|||
|
||||
NEW FEATURES
|
||||
|
||||
HDFS-5118. Provide testing support for DFSClient to drop RPC responses.
|
||||
(jing9)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-4657. Limit the number of blocks logged by the NN after a block
|
||||
|
|
|
@ -27,6 +27,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIE
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
|
||||
|
@ -44,9 +47,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIR
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||
|
@ -102,6 +102,7 @@ import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
|
|||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.VolumeId;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
@ -115,13 +116,13 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
|||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
|
||||
|
@ -146,6 +147,7 @@ import org.apache.hadoop.io.EnumSetWritable;
|
|||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
|
||||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
@ -453,7 +455,11 @@ public class DFSClient implements java.io.Closeable {
|
|||
|
||||
/**
|
||||
* Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
|
||||
* Exactly one of nameNodeUri or rpcNamenode must be null.
|
||||
* If HA is enabled and a positive value is set for
|
||||
* {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
|
||||
* configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
|
||||
* as its RetryInvocationHandler. Otherwise one of nameNodeUri or rpcNamenode
|
||||
* must be null.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
|
||||
|
@ -477,7 +483,20 @@ public class DFSClient implements java.io.Closeable {
|
|||
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
|
||||
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
|
||||
|
||||
if (rpcNamenode != null) {
|
||||
int numResponseToDrop = conf.getInt(
|
||||
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
|
||||
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
|
||||
if (numResponseToDrop > 0) {
|
||||
// This case is used for testing.
|
||||
LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
|
||||
+ " is set to " + numResponseToDrop
|
||||
+ ", this hacked client will proactively drop responses");
|
||||
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies
|
||||
.createProxyWithLossyRetryHandler(conf, nameNodeUri,
|
||||
ClientProtocol.class, numResponseToDrop);
|
||||
this.dtService = proxyInfo.getDelegationTokenService();
|
||||
this.namenode = proxyInfo.getProxy();
|
||||
} else if (rpcNamenode != null) {
|
||||
// This case is used for testing.
|
||||
Preconditions.checkArgument(nameNodeUri == null);
|
||||
this.namenode = rpcNamenode;
|
||||
|
@ -516,7 +535,7 @@ public class DFSClient implements java.io.Closeable {
|
|||
this.defaultWriteCachingStrategy =
|
||||
new CachingStrategy(writeDropBehind, readahead);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return the socket addresses to use with each configured
|
||||
* local interface. Local interfaces may be specified by IP
|
||||
|
|
|
@ -497,6 +497,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
|
||||
public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
|
||||
public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
|
||||
|
||||
// The number of NN response dropped by client proactively in each RPC call.
|
||||
// For testing NN retry cache, we can set this property with positive value.
|
||||
public static final String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY = "dfs.client.test.drop.namenode.response.number";
|
||||
public static final int DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT = 0;
|
||||
|
||||
|
||||
// Hidden configuration undocumented in hdfs-site. xml
|
||||
// Timeout to wait for block receiver and responder thread to stop
|
||||
|
|
|
@ -17,10 +17,18 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
|
@ -48,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
|
||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryProxy;
|
||||
|
@ -144,6 +153,61 @@ public class NameNodeProxies {
|
|||
return new ProxyAndInfo<T>(proxy, dtService);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a dummy namenode proxy instance that utilizes our hacked
|
||||
* {@link LossyRetryInvocationHandler}. Proxy instance generated using this
|
||||
* method will proactively drop RPC responses. Currently this method only
|
||||
* support HA setup. IllegalStateException will be thrown if the given
|
||||
* configuration is not for HA.
|
||||
*
|
||||
* @param config the configuration containing the required IPC
|
||||
* properties, client failover configurations, etc.
|
||||
* @param nameNodeUri the URI pointing either to a specific NameNode
|
||||
* or to a logical nameservice.
|
||||
* @param xface the IPC interface which should be created
|
||||
* @param numResponseToDrop The number of responses to drop for each RPC call
|
||||
* @return an object containing both the proxy and the associated
|
||||
* delegation token service it corresponds to
|
||||
* @throws IOException if there is an error creating the proxy
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
|
||||
Configuration config, URI nameNodeUri, Class<T> xface,
|
||||
int numResponseToDrop) throws IOException {
|
||||
Preconditions.checkArgument(numResponseToDrop > 0);
|
||||
Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
|
||||
getFailoverProxyProviderClass(config, nameNodeUri, xface);
|
||||
if (failoverProxyProviderClass != null) { // HA case
|
||||
FailoverProxyProvider<T> failoverProxyProvider =
|
||||
createFailoverProxyProvider(config, failoverProxyProviderClass,
|
||||
xface, nameNodeUri);
|
||||
int delay = config.getInt(
|
||||
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
|
||||
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
|
||||
int maxCap = config.getInt(
|
||||
DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
|
||||
DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
|
||||
int maxFailoverAttempts = config.getInt(
|
||||
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
|
||||
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
|
||||
InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
|
||||
numResponseToDrop, failoverProxyProvider,
|
||||
RetryPolicies.failoverOnNetworkException(
|
||||
RetryPolicies.TRY_ONCE_THEN_FAIL,
|
||||
Math.max(numResponseToDrop + 1, maxFailoverAttempts), delay,
|
||||
maxCap));
|
||||
|
||||
T proxy = (T) Proxy.newProxyInstance(
|
||||
failoverProxyProvider.getInterface().getClassLoader(),
|
||||
new Class[] { xface }, dummyHandler);
|
||||
Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
|
||||
return new ProxyAndInfo<T>(proxy, dtService);
|
||||
} else {
|
||||
throw new IllegalStateException("Currently creating proxy using " +
|
||||
"LossyRetryInvocationHandler requires NN HA setup");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an explicitly non-HA-enabled proxy object. Most of the time you
|
||||
|
|
Loading…
Reference in New Issue