svn merge -c 1349124 from trunk for HDFS-3504.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1349126 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-06-12 05:33:54 +00:00
parent 8e1730243b
commit 4c831ec671
16 changed files with 676 additions and 121 deletions

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.ipc.RpcInvocationHandler;
class RetryInvocationHandler implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private FailoverProxyProvider proxyProvider;
private final FailoverProxyProvider proxyProvider;
/**
* The number of times the associated proxyProvider has ever been failed over.
@ -42,26 +42,25 @@ class RetryInvocationHandler implements RpcInvocationHandler {
private long proxyProviderFailoverCount = 0;
private volatile boolean hasMadeASuccessfulCall = false;
private RetryPolicy defaultPolicy;
private Map<String,RetryPolicy> methodNameToPolicyMap;
private final RetryPolicy defaultPolicy;
private final Map<String,RetryPolicy> methodNameToPolicyMap;
private Object currentProxy;
public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
RetryPolicy retryPolicy) {
this.proxyProvider = proxyProvider;
this.defaultPolicy = retryPolicy;
this.methodNameToPolicyMap = Collections.emptyMap();
this.currentProxy = proxyProvider.getProxy();
this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
}
public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
RetryPolicy defaultPolicy,
Map<String, RetryPolicy> methodNameToPolicyMap) {
this.proxyProvider = proxyProvider;
this.defaultPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
this.defaultPolicy = defaultPolicy;
this.methodNameToPolicyMap = methodNameToPolicyMap;
this.currentProxy = proxyProvider.getProxy();
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
RetryPolicy policy = methodNameToPolicyMap.get(method.getName());

View File

@ -22,10 +22,13 @@ import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
@ -33,8 +36,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import com.google.common.annotations.VisibleForTesting;
/**
* <p>
* A collection of useful implementations of {@link RetryPolicy}.
@ -44,7 +45,12 @@ public class RetryPolicies {
public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
private static final Random RAND = new Random();
private static ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
@Override
protected Random initialValue() {
return new Random();
}
};
/**
* <p>
@ -157,17 +163,35 @@ public class RetryPolicies {
}
}
/**
* Retry up to maxRetries.
* The actual sleep time of the n-th retry is f(n, sleepTime),
* where f is a function provided by the subclass implementation.
*
* The object of the subclasses should be immutable;
* otherwise, the subclass must override hashCode(), equals(..) and toString().
*/
static abstract class RetryLimited implements RetryPolicy {
int maxRetries;
long sleepTime;
TimeUnit timeUnit;
final int maxRetries;
final long sleepTime;
final TimeUnit timeUnit;
private String myString;
RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) {
if (maxRetries < 0) {
throw new IllegalArgumentException("maxRetries = " + maxRetries+" < 0");
}
if (sleepTime < 0) {
throw new IllegalArgumentException("sleepTime = " + sleepTime + " < 0");
}
public RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) {
this.maxRetries = maxRetries;
this.sleepTime = sleepTime;
this.timeUnit = timeUnit;
}
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isMethodIdempotent) throws Exception {
if (retries >= maxRetries) {
@ -178,6 +202,30 @@ public class RetryPolicies {
}
protected abstract long calculateSleepTime(int retries);
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public boolean equals(final Object that) {
if (this == that) {
return true;
} else if (that == null || this.getClass() != that.getClass()) {
return false;
}
return this.toString().equals(that.toString());
}
@Override
public String toString() {
if (myString == null) {
myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries
+ ", sleepTime=" + sleepTime + " " + timeUnit + ")";
}
return myString;
}
}
static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited {
@ -208,6 +256,169 @@ public class RetryPolicies {
}
}
/**
* Given pairs of number of retries and sleep time (n0, t0), (n1, t1), ...,
* the first n0 retries sleep t0 milliseconds on average,
* the following n1 retries sleep t1 milliseconds on average, and so on.
*
* For all the sleep, the actual sleep time is randomly uniform distributed
* in the close interval [0.5t, 1.5t], where t is the sleep time specified.
*
* The objects of this class are immutable.
*/
public static class MultipleLinearRandomRetry implements RetryPolicy {
/** Pairs of numRetries and sleepSeconds */
public static class Pair {
final int numRetries;
final int sleepMillis;
public Pair(final int numRetries, final int sleepMillis) {
if (numRetries < 0) {
throw new IllegalArgumentException("numRetries = " + numRetries+" < 0");
}
if (sleepMillis < 0) {
throw new IllegalArgumentException("sleepMillis = " + sleepMillis + " < 0");
}
this.numRetries = numRetries;
this.sleepMillis = sleepMillis;
}
@Override
public String toString() {
return numRetries + "x" + sleepMillis + "ms";
}
}
private final List<Pair> pairs;
private String myString;
public MultipleLinearRandomRetry(List<Pair> pairs) {
if (pairs == null || pairs.isEmpty()) {
throw new IllegalArgumentException("pairs must be neither null nor empty.");
}
this.pairs = Collections.unmodifiableList(pairs);
}
@Override
public RetryAction shouldRetry(Exception e, int curRetry, int failovers,
boolean isMethodIdempotent) throws Exception {
final Pair p = searchPair(curRetry);
if (p == null) {
//no more retries.
return RetryAction.FAIL;
}
//calculate sleep time and return.
final double ratio = RANDOM.get().nextDouble() + 0.5;//0.5 <= ratio <=1.5
final long sleepTime = Math.round(p.sleepMillis * ratio);
return new RetryAction(RetryAction.RetryDecision.RETRY, sleepTime);
}
/**
* Given the current number of retry, search the corresponding pair.
* @return the corresponding pair,
* or null if the current number of retry > maximum number of retry.
*/
private Pair searchPair(int curRetry) {
int i = 0;
for(; i < pairs.size() && curRetry > pairs.get(i).numRetries; i++) {
curRetry -= pairs.get(i).numRetries;
}
return i == pairs.size()? null: pairs.get(i);
}
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public boolean equals(final Object that) {
if (this == that) {
return true;
} else if (that == null || this.getClass() != that.getClass()) {
return false;
}
return this.toString().equals(that.toString());
}
@Override
public String toString() {
if (myString == null) {
myString = getClass().getSimpleName() + pairs;
}
return myString;
}
/**
* Parse the given string as a MultipleLinearRandomRetry object.
* The format of the string is "t_1, n_1, t_2, n_2, ...",
* where t_i and n_i are the i-th pair of sleep time and number of retires.
* Note that the white spaces in the string are ignored.
*
* @return the parsed object, or null if the parsing fails.
*/
public static MultipleLinearRandomRetry parseCommaSeparatedString(String s) {
final String[] elements = s.split(",");
if (elements.length == 0) {
LOG.warn("Illegal value: there is no element in \"" + s + "\".");
return null;
}
if (elements.length % 2 != 0) {
LOG.warn("Illegal value: the number of elements in \"" + s + "\" is "
+ elements.length + " but an even number of elements is expected.");
return null;
}
final List<RetryPolicies.MultipleLinearRandomRetry.Pair> pairs
= new ArrayList<RetryPolicies.MultipleLinearRandomRetry.Pair>();
for(int i = 0; i < elements.length; ) {
//parse the i-th sleep-time
final int sleep = parsePositiveInt(elements, i++, s);
if (sleep == -1) {
return null; //parse fails
}
//parse the i-th number-of-retries
final int retries = parsePositiveInt(elements, i++, s);
if (retries == -1) {
return null; //parse fails
}
pairs.add(new RetryPolicies.MultipleLinearRandomRetry.Pair(retries, sleep));
}
return new RetryPolicies.MultipleLinearRandomRetry(pairs);
}
/**
* Parse the i-th element as an integer.
* @return -1 if the parsing fails or the parsed value <= 0;
* otherwise, return the parsed value.
*/
private static int parsePositiveInt(final String[] elements,
final int i, final String originalString) {
final String s = elements[i].trim();
final int n;
try {
n = Integer.parseInt(s);
} catch(NumberFormatException nfe) {
LOG.warn("Failed to parse \"" + s + "\", which is the index " + i
+ " element in \"" + originalString + "\"", nfe);
return -1;
}
if (n <= 0) {
LOG.warn("The value " + n + " <= 0: it is parsed from the string \""
+ s + "\" which is the index " + i + " element in \""
+ originalString + "\"");
return -1;
}
return n;
}
}
static class ExceptionDependentRetry implements RetryPolicy {
RetryPolicy defaultPolicy;
@ -265,6 +476,14 @@ public class RetryPolicies {
public ExponentialBackoffRetry(
int maxRetries, long sleepTime, TimeUnit timeUnit) {
super(maxRetries, sleepTime, timeUnit);
if (maxRetries < 0) {
throw new IllegalArgumentException("maxRetries = " + maxRetries + " < 0");
} else if (maxRetries >= Long.SIZE - 1) {
//calculateSleepTime may overflow.
throw new IllegalArgumentException("maxRetries = " + maxRetries
+ " >= " + (Long.SIZE - 1));
}
}
@Override
@ -353,11 +572,10 @@ public class RetryPolicies {
* @param cap value at which to cap the base sleep time
* @return an amount of time to sleep
*/
@VisibleForTesting
public static long calculateExponentialTime(long time, int retries,
private static long calculateExponentialTime(long time, int retries,
long cap) {
long baseTime = Math.min(time * ((long)1 << retries), cap);
return (long) (baseTime * (RAND.nextFloat() + 0.5));
long baseTime = Math.min(time * (1L << retries), cap);
return (long) (baseTime * (RANDOM.get().nextDouble() + 0.5));
}
private static long calculateExponentialTime(long time, int retries) {

View File

@ -60,6 +60,12 @@ public interface RetryPolicy {
this.reason = reason;
}
@Override
public String toString() {
return getClass().getSimpleName() + "(action=" + action
+ ", delayMillis=" + delayMillis + ", reason=" + reason + ")";
}
public enum RetryDecision {
FAIL,
RETRY,

View File

@ -75,9 +75,10 @@ public class RetryProxy {
*/
public static Object create(Class<?> iface, Object implementation,
Map<String,RetryPolicy> methodNameToPolicyMap) {
return RetryProxy.create(iface,
return create(iface,
new DefaultFailoverProxyProvider(iface, implementation),
methodNameToPolicyMap);
methodNameToPolicyMap,
RetryPolicies.TRY_ONCE_THEN_FAIL);
}
/**
@ -92,11 +93,13 @@ public class RetryProxy {
* @return the retry proxy
*/
public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
Map<String,RetryPolicy> methodNameToPolicyMap) {
Map<String,RetryPolicy> methodNameToPolicyMap,
RetryPolicy defaultPolicy) {
return Proxy.newProxyInstance(
proxyProvider.getInterface().getClassLoader(),
new Class<?>[] { iface },
new RetryInvocationHandler(proxyProvider, methodNameToPolicyMap)
new RetryInvocationHandler(proxyProvider, defaultPolicy,
methodNameToPolicyMap)
);
}
}

View File

@ -18,47 +18,51 @@
package org.apache.hadoop.ipc;
import java.net.InetAddress;
import java.net.Socket;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.io.IOException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SaslRpcClient;
@ -67,8 +71,8 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
@ -80,8 +84,8 @@ import org.apache.hadoop.util.ReflectionUtils;
*/
public class Client {
public static final Log LOG =
LogFactory.getLog(Client.class);
public static final Log LOG = LogFactory.getLog(Client.class);
private Hashtable<ConnectionId, Connection> connections =
new Hashtable<ConnectionId, Connection>();
@ -228,8 +232,7 @@ public class Client {
private int rpcTimeout;
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private int maxRetries; //the max. no. of retries for socket connections
// the max. no. of retries for socket connections on time out exceptions
private final RetryPolicy connectionRetryPolicy;
private int maxRetriesOnSocketTimeouts;
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private boolean doPing; //do we need to send ping message
@ -253,7 +256,7 @@ public class Client {
}
this.rpcTimeout = remoteId.getRpcTimeout();
this.maxIdleTime = remoteId.getMaxIdleTime();
this.maxRetries = remoteId.getMaxRetries();
this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
this.tcpNoDelay = remoteId.getTcpNoDelay();
this.doPing = remoteId.getDoPing();
@ -488,7 +491,7 @@ public class Client {
if (updateAddress()) {
timeoutFailures = ioFailures = 0;
}
handleConnectionFailure(ioFailures++, maxRetries, ie);
handleConnectionFailure(ioFailures++, ie);
}
}
}
@ -680,8 +683,36 @@ public class Client {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
LOG.info("Retrying connect to server: " + server +
". Already tried " + curRetries + " time(s).");
LOG.info("Retrying connect to server: " + server + ". Already tried "
+ curRetries + " time(s); maxRetries=" + maxRetries);
}
private void handleConnectionFailure(int curRetries, IOException ioe
) throws IOException {
closeConnection();
final RetryAction action;
try {
action = connectionRetryPolicy.shouldRetry(ioe, curRetries, 0, true);
} catch(Exception e) {
throw e instanceof IOException? (IOException)e: new IOException(e);
}
if (action.action == RetryAction.RetryDecision.FAIL) {
if (action.reason != null) {
LOG.warn("Failed to connect to server: " + server + ": "
+ action.reason, ioe);
}
throw ioe;
}
try {
Thread.sleep(action.delayMillis);
} catch (InterruptedException e) {
throw (IOException)new InterruptedIOException("Interrupted: action="
+ action + ", retry policy=" + connectionRetryPolicy).initCause(e);
}
LOG.info("Retrying connect to server: " + server + ". Already tried "
+ curRetries + " time(s); retry policy is " + connectionRetryPolicy);
}
/**
@ -849,6 +880,10 @@ public class Client {
try {
RpcResponseHeaderProto response =
RpcResponseHeaderProto.parseDelimitedFrom(in);
if (response == null) {
throw new IOException("Response is null.");
}
int callId = response.getCallId();
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + callId);
@ -1287,7 +1322,7 @@ public class Client {
private final String serverPrincipal;
private final int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private final int maxRetries; //the max. no. of retries for socket connections
private final RetryPolicy connectionRetryPolicy;
// the max. no. of retries for socket connections on time out exceptions
private final int maxRetriesOnSocketTimeouts;
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
@ -1297,7 +1332,7 @@ public class Client {
ConnectionId(InetSocketAddress address, Class<?> protocol,
UserGroupInformation ticket, int rpcTimeout,
String serverPrincipal, int maxIdleTime,
int maxRetries, int maxRetriesOnSocketTimeouts,
RetryPolicy connectionRetryPolicy, int maxRetriesOnSocketTimeouts,
boolean tcpNoDelay, boolean doPing, int pingInterval) {
this.protocol = protocol;
this.address = address;
@ -1305,7 +1340,7 @@ public class Client {
this.rpcTimeout = rpcTimeout;
this.serverPrincipal = serverPrincipal;
this.maxIdleTime = maxIdleTime;
this.maxRetries = maxRetries;
this.connectionRetryPolicy = connectionRetryPolicy;
this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts;
this.tcpNoDelay = tcpNoDelay;
this.doPing = doPing;
@ -1336,10 +1371,6 @@ public class Client {
return maxIdleTime;
}
int getMaxRetries() {
return maxRetries;
}
/** max connection retries on socket time outs */
public int getMaxRetriesOnSocketTimeouts() {
return maxRetriesOnSocketTimeouts;
@ -1357,6 +1388,12 @@ public class Client {
return pingInterval;
}
static ConnectionId getConnectionId(InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
Configuration conf) throws IOException {
return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf);
}
/**
* Returns a ConnectionId object.
* @param addr Remote address for the connection.
@ -1367,9 +1404,18 @@ public class Client {
* @return A ConnectionId instance
* @throws IOException
*/
public static ConnectionId getConnectionId(InetSocketAddress addr,
static ConnectionId getConnectionId(InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
Configuration conf) throws IOException {
RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException {
if (connectionRetryPolicy == null) {
final int max = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
max, 1, TimeUnit.SECONDS);
}
String remotePrincipal = getRemotePrincipal(conf, addr, protocol);
boolean doPing =
conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
@ -1377,8 +1423,7 @@ public class Client {
rpcTimeout, remotePrincipal,
conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT),
conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT),
connectionRetryPolicy,
conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT),
@ -1421,7 +1466,7 @@ public class Client {
return isEqual(this.address, that.address)
&& this.doPing == that.doPing
&& this.maxIdleTime == that.maxIdleTime
&& this.maxRetries == that.maxRetries
&& isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy)
&& this.pingInterval == that.pingInterval
&& isEqual(this.protocol, that.protocol)
&& this.rpcTimeout == that.rpcTimeout
@ -1434,11 +1479,10 @@ public class Client {
@Override
public int hashCode() {
int result = 1;
int result = connectionRetryPolicy.hashCode();
result = PRIME * result + ((address == null) ? 0 : address.hashCode());
result = PRIME * result + (doPing ? 1231 : 1237);
result = PRIME * result + maxIdleTime;
result = PRIME * result + maxRetries;
result = PRIME * result + pingInterval;
result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
result = PRIME * result + rpcTimeout;

View File

@ -36,9 +36,9 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputOutputStream;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
@ -66,15 +66,24 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ClientCache CLIENTS = new ClientCache();
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, null);
}
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
) throws IOException {
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(protocol
.getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
addr, ticket, conf, factory, rpcTimeout)), false);
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
@Override
@ -97,11 +106,12 @@ public class ProtobufRpcEngine implements RpcEngine {
private final long clientProtocolVersion;
private final String protocolName;
public Invoker(Class<?> protocol, InetSocketAddress addr,
private Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout) throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf), conf, factory);
int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
conf, factory);
}
/**

View File

@ -41,6 +41,7 @@ import org.apache.commons.logging.*;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
import org.apache.hadoop.net.NetUtils;
@ -326,7 +327,7 @@ public class RPC {
long clientVersion,
InetSocketAddress addr, Configuration conf,
long connTimeout) throws IOException {
return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, null, connTimeout);
}
/**
@ -347,7 +348,7 @@ public class RPC {
int rpcTimeout,
long timeout) throws IOException {
return waitForProtocolProxy(protocol, clientVersion, addr,
conf, rpcTimeout, timeout).getProxy();
conf, rpcTimeout, null, timeout).getProxy();
}
/**
@ -367,6 +368,7 @@ public class RPC {
long clientVersion,
InetSocketAddress addr, Configuration conf,
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
long timeout) throws IOException {
long startTime = System.currentTimeMillis();
IOException ioe;
@ -374,7 +376,7 @@ public class RPC {
try {
return getProtocolProxy(protocol, clientVersion, addr,
UserGroupInformation.getCurrentUser(), conf, NetUtils
.getDefaultSocketFactory(conf), rpcTimeout);
.getDefaultSocketFactory(conf), rpcTimeout, connectionRetryPolicy);
} catch(ConnectException se) { // namenode has not been started
LOG.info("Server at " + addr + " not available yet, Zzzzz...");
ioe = se;
@ -463,7 +465,7 @@ public class RPC {
Configuration conf,
SocketFactory factory) throws IOException {
return getProtocolProxy(
protocol, clientVersion, addr, ticket, conf, factory, 0);
protocol, clientVersion, addr, ticket, conf, factory, 0, null);
}
/**
@ -489,7 +491,7 @@ public class RPC {
SocketFactory factory,
int rpcTimeout) throws IOException {
return getProtocolProxy(protocol, clientVersion, addr, ticket,
conf, factory, rpcTimeout).getProxy();
conf, factory, rpcTimeout, null).getProxy();
}
/**
@ -512,12 +514,13 @@ public class RPC {
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
int rpcTimeout,
RetryPolicy connectionRetryPolicy) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol,conf).getProxy(protocol,
clientVersion, addr, ticket, conf, factory, rpcTimeout);
return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
}
/**

View File

@ -98,7 +98,8 @@ public class RemoteException extends IOException {
attrs.getValue("message"));
}
@Override
public String toString() {
return className + ": " + getMessage();
return getClass().getName() + "(" + className + "): " + getMessage();
}
}

View File

@ -26,6 +26,7 @@ import javax.net.SocketFactory;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
@ -40,7 +41,8 @@ public interface RpcEngine {
<T> ProtocolProxy<T> getProxy(Class<T> protocol,
long clientVersion, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException;
SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy) throws IOException;
/** Expert: Make multiple, parallel calls to a set of servers. */
Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,

View File

@ -31,6 +31,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.VersionedProtocol;
@ -259,9 +260,14 @@ public class WritableRpcEngine implements RpcEngine {
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout)
int rpcTimeout, RetryPolicy connectionRetryPolicy)
throws IOException {
if (connectionRetryPolicy != null) {
throw new UnsupportedOperationException(
"Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
}
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
factory, rpcTimeout));

View File

@ -18,50 +18,55 @@
package org.apache.hadoop.ipc;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.TestSaslRPC.TestSaslImpl;
import org.apache.hadoop.ipc.TestSaslRPC.TestSaslProtocol;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.MockitoUtil;
import org.junit.Test;
import static org.junit.Assert.*;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
import static org.apache.hadoop.test.MetricsAsserts.*;
/** Unit tests for RPC. */
@SuppressWarnings("deprecation")
public class TestRPC {
@ -250,7 +255,8 @@ public class TestRPC {
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
) throws IOException {
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new StoppedInvocationHandler());
return new ProtocolProxy<T>(protocol, proxy, false);

View File

@ -70,6 +70,10 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3520. Add transfer rate logging to TransferFsImage. (eli)
HDFS-3504. Support configurable retry policy in DFSClient for RPC
connections and RPC calls, and add MultipleLinearRandomRetry, a new retry
policy. (szetszwo)
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log

View File

@ -38,6 +38,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
public static final String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY = "dfs.client.retry.policy.enabled";
public static final boolean DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false;
public static final String DFS_CLIENT_RETRY_POLICY_SPEC_KEY = "dfs.client.retry.policy.spec";
public static final String DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,...
public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";

View File

@ -47,10 +47,12 @@ import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@ -66,6 +68,7 @@ import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
/**
* Create proxy objects to communicate with a remote NN. All remote access to an
@ -240,12 +243,106 @@ public class NameNodeProxies {
return new NamenodeProtocolTranslatorPB(proxy);
}
/**
* Return the default retry policy used in RPC.
*
* If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
*
* Otherwise, first unwrap ServiceException if possible, and then
* (1) use multipleLinearRandomRetry for
* - SafeModeException, or
* - IOException other than RemoteException, or
* - ServiceException; and
* (2) use TRY_ONCE_THEN_FAIL for
* - non-SafeMode RemoteException, or
* - non-IOException.
*
* Note that dfs.client.retry.max < 0 is not allowed.
*/
private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
if (LOG.isDebugEnabled()) {
LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
}
if (multipleLinearRandomRetry == null) {
//no retry
return RetryPolicies.TRY_ONCE_THEN_FAIL;
} else {
return new RetryPolicy() {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isMethodIdempotent) throws Exception {
if (e instanceof ServiceException) {
//unwrap ServiceException
final Throwable cause = e.getCause();
if (cause != null && cause instanceof Exception) {
e = (Exception)cause;
}
}
//see (1) and (2) in the javadoc of this method.
final RetryPolicy p;
if (e instanceof RemoteException) {
final RemoteException re = (RemoteException)e;
p = SafeModeException.class.getName().equals(re.getClassName())?
multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
} else if (e instanceof IOException || e instanceof ServiceException) {
p = multipleLinearRandomRetry;
} else { //non-IOException
p = RetryPolicies.TRY_ONCE_THEN_FAIL;
}
if (LOG.isDebugEnabled()) {
LOG.debug("RETRY " + retries + ") policy="
+ p.getClass().getSimpleName() + ", exception=" + e);
}
LOG.info("RETRY " + retries + ") policy="
+ p.getClass().getSimpleName() + ", exception=" + e);
return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
}
};
}
}
/**
* Return the MultipleLinearRandomRetry policy specified in the conf,
* or null if the feature is disabled.
* If the policy is specified in the conf but the policy cannot be parsed,
* the default policy is returned.
*
* Conf property: N pairs of sleep-time and number-of-retries
* dfs.client.retry.policy = "s1,n1,s2,n2,..."
*/
private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
final boolean enabled = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
if (!enabled) {
return null;
}
final String policy = conf.get(
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
}
private static ClientProtocol createNNProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException {
ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB) NameNodeProxies
.createNameNodeProxy(address, conf, ugi, ClientNamenodeProtocolPB.class);
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf), 0, defaultPolicy).getProxy();
if (withRetries) { // create the proxy with retries
RetryPolicy createPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(5,
HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
@ -258,17 +355,21 @@ public class NameNodeProxies {
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
= new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
.retryByRemoteException(defaultPolicy,
remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
defaultPolicy, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap
= new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
proxy = (ClientNamenodeProtocolPB) RetryProxy
.create(ClientNamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
ClientNamenodeProtocolPB.class,
new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
ClientNamenodeProtocolPB.class, proxy),
methodNameToPolicyMap,
defaultPolicy);
}
return new ClientNamenodeProtocolTranslatorPB(proxy);
}

View File

@ -25,8 +25,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
@ -39,6 +37,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
@ -66,12 +66,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceProtocolHelper;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -1375,7 +1372,6 @@ public class MiniDFSCluster {
waitClusterUp();
LOG.info("Restarted the namenode");
waitActive();
LOG.info("Cluster is active");
}
}
@ -1751,6 +1747,7 @@ public class MiniDFSCluster {
}
}
}
LOG.info("Cluster is active");
}
private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,

View File

@ -25,44 +25,51 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.net.SocketTimeoutException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.log4j.Level;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.ThrowsException;
import org.mockito.invocation.InvocationOnMock;
@ -339,7 +346,7 @@ public class TestDFSClientRetries extends TestCase {
// We shouldn't have gained an extra block by the RPC.
assertEquals(blockCount, blockCount2);
return (LocatedBlock) ret2;
return ret2;
}
}).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
@ -744,5 +751,149 @@ public class TestDFSClientRetries extends TestCase {
server.stop();
}
}
/** Test client retry with namenode restarting. */
public void testNamenodeRestart() throws Exception {
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
final List<Exception> exceptions = new ArrayList<Exception>();
final Path dir = new Path("/testNamenodeRestart");
final Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
final short numDatanodes = 3;
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes)
.build();
try {
cluster.waitActive();
//create a file
final DistributedFileSystem dfs = cluster.getFileSystem();
final long length = 1L << 20;
final Path file1 = new Path(dir, "foo");
DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L);
//get file status
final FileStatus s1 = dfs.getFileStatus(file1);
assertEquals(length, s1.getLen());
//shutdown namenode
cluster.shutdownNameNode(0);
//namenode is down, create another file in a thread
final Path file3 = new Path(dir, "file");
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
//it should retry till namenode is up.
final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf);
DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L);
} catch (Exception e) {
exceptions.add(e);
}
}
});
thread.start();
//restart namenode in a new thread
new Thread(new Runnable() {
@Override
public void run() {
try {
//sleep, restart, and then wait active
TimeUnit.SECONDS.sleep(30);
cluster.restartNameNode(0, false);
cluster.waitActive();
} catch (Exception e) {
exceptions.add(e);
}
}
}).start();
//namenode is down, it should retry until namenode is up again.
final FileStatus s2 = dfs.getFileStatus(file1);
assertEquals(s1, s2);
//check file1 and file3
thread.join();
assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3));
//enter safe mode
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
//leave safe mode in a new thread
new Thread(new Runnable() {
@Override
public void run() {
try {
//sleep and then leave safe mode
TimeUnit.SECONDS.sleep(30);
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
} catch (Exception e) {
exceptions.add(e);
}
}
}).start();
//namenode is in safe mode, create should retry until it leaves safe mode.
final Path file2 = new Path(dir, "bar");
DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L);
assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2));
//make sure it won't retry on exceptions like FileNotFoundException
final Path nonExisting = new Path(dir, "nonExisting");
LOG.info("setPermission: " + nonExisting);
try {
dfs.setPermission(nonExisting, new FsPermission((short)0));
fail();
} catch(FileNotFoundException fnfe) {
LOG.info("GOOD!", fnfe);
}
if (!exceptions.isEmpty()) {
LOG.error("There are " + exceptions.size() + " exception(s):");
for(int i = 0; i < exceptions.size(); i++) {
LOG.error("Exception " + i, exceptions.get(i));
}
fail();
}
} finally {
cluster.shutdown();
}
}
public void testMultipleLinearRandomRetry() {
parseMultipleLinearRandomRetry(null, "");
parseMultipleLinearRandomRetry(null, "11");
parseMultipleLinearRandomRetry(null, "11,22,33");
parseMultipleLinearRandomRetry(null, "11,22,33,44,55");
parseMultipleLinearRandomRetry(null, "AA");
parseMultipleLinearRandomRetry(null, "11,AA");
parseMultipleLinearRandomRetry(null, "11,22,33,FF");
parseMultipleLinearRandomRetry(null, "11,-22");
parseMultipleLinearRandomRetry(null, "-11,22");
parseMultipleLinearRandomRetry("[22x11ms]",
"11,22");
parseMultipleLinearRandomRetry("[22x11ms, 44x33ms]",
"11,22,33,44");
parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]",
"11,22,33,44,55,66");
parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]",
" 11, 22, 33, 44, 55, 66 ");
}
static void parseMultipleLinearRandomRetry(String expected, String s) {
final MultipleLinearRandomRetry r = MultipleLinearRandomRetry.parseCommaSeparatedString(s);
LOG.info("input=" + s + ", parsed=" + r + ", expected=" + expected);
if (r == null) {
assertEquals(expected, null);
} else {
assertEquals("MultipleLinearRandomRetry" + expected, r.toString());
}
}
}