From 4c831ec67104eb3f9b6e6dea151a1303e9ad7afb Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Tue, 12 Jun 2012 05:33:54 +0000 Subject: [PATCH] svn merge -c 1349124 from trunk for HDFS-3504. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1349126 13f79535-47bb-0310-9956-ffa450edef68 --- .../io/retry/RetryInvocationHandler.java | 17 +- .../apache/hadoop/io/retry/RetryPolicies.java | 242 +++++++++++++++++- .../apache/hadoop/io/retry/RetryPolicy.java | 6 + .../apache/hadoop/io/retry/RetryProxy.java | 11 +- .../java/org/apache/hadoop/ipc/Client.java | 122 ++++++--- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 28 +- .../main/java/org/apache/hadoop/ipc/RPC.java | 19 +- .../apache/hadoop/ipc/RemoteException.java | 5 +- .../java/org/apache/hadoop/ipc/RpcEngine.java | 4 +- .../apache/hadoop/ipc/WritableRpcEngine.java | 8 +- .../java/org/apache/hadoop/ipc/TestRPC.java | 30 ++- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 4 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../apache/hadoop/hdfs/NameNodeProxies.java | 113 +++++++- .../apache/hadoop/hdfs/MiniDFSCluster.java | 13 +- .../hadoop/hdfs/TestDFSClientRetries.java | 171 ++++++++++++- 16 files changed, 676 insertions(+), 121 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index c3c9f4d6a9f..6b997460b49 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -34,7 +34,7 @@ import org.apache.hadoop.ipc.RpcInvocationHandler; class RetryInvocationHandler implements RpcInvocationHandler { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); - private FailoverProxyProvider proxyProvider; + private final FailoverProxyProvider proxyProvider; /** * The number of times the associated proxyProvider has ever been failed over. 
@@ -42,26 +42,25 @@ class RetryInvocationHandler implements RpcInvocationHandler { private long proxyProviderFailoverCount = 0; private volatile boolean hasMadeASuccessfulCall = false; - private RetryPolicy defaultPolicy; - private Map<String,RetryPolicy> methodNameToPolicyMap; + private final RetryPolicy defaultPolicy; + private final Map<String,RetryPolicy> methodNameToPolicyMap; private Object currentProxy; public RetryInvocationHandler(FailoverProxyProvider proxyProvider, RetryPolicy retryPolicy) { - this.proxyProvider = proxyProvider; - this.defaultPolicy = retryPolicy; - this.methodNameToPolicyMap = Collections.emptyMap(); - this.currentProxy = proxyProvider.getProxy(); + this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap()); } - + public RetryInvocationHandler(FailoverProxyProvider proxyProvider, + RetryPolicy defaultPolicy, Map<String,RetryPolicy> methodNameToPolicyMap) { this.proxyProvider = proxyProvider; - this.defaultPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL; + this.defaultPolicy = defaultPolicy; + this.methodNameToPolicyMap = methodNameToPolicyMap; this.currentProxy = proxyProvider.getProxy(); } + @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RetryPolicy policy = methodNameToPolicyMap.get(method.getName()); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java index 2be8b759998..8b8387ce2ce 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java @@ -22,10 +22,13 @@ import java.net.ConnectException; import java.net.NoRouteToHostException; import java.net.SocketException; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Map.Entry; +import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -33,8 +36,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; -import com.google.common.annotations.VisibleForTesting; - /** *

* A collection of useful implementations of {@link RetryPolicy}. @@ -44,7 +45,12 @@ public class RetryPolicies { public static final Log LOG = LogFactory.getLog(RetryPolicies.class); - private static final Random RAND = new Random(); + private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() { + @Override + protected Random initialValue() { + return new Random(); + } + }; /** *

@@ -157,17 +163,35 @@ public class RetryPolicies { } } + /** + * Retry up to maxRetries. + * The actual sleep time of the n-th retry is f(n, sleepTime), + * where f is a function provided by the subclass implementation. + * + * Objects of the subclasses should be immutable; + * otherwise, the subclass must override hashCode(), equals(..) and toString(). + */ static abstract class RetryLimited implements RetryPolicy { - int maxRetries; - long sleepTime; - TimeUnit timeUnit; + final int maxRetries; + final long sleepTime; + final TimeUnit timeUnit; - public RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) { + private String myString; + + RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) { + if (maxRetries < 0) { + throw new IllegalArgumentException("maxRetries = " + maxRetries + " < 0"); + } + if (sleepTime < 0) { + throw new IllegalArgumentException("sleepTime = " + sleepTime + " < 0"); + } + this.maxRetries = maxRetries; this.sleepTime = sleepTime; this.timeUnit = timeUnit; } + @Override public RetryAction shouldRetry(Exception e, int retries, int failovers, boolean isMethodIdempotent) throws Exception { if (retries >= maxRetries) { @@ -178,6 +202,30 @@ public class RetryPolicies { } protected abstract long calculateSleepTime(int retries); + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(final Object that) { + if (this == that) { + return true; + } else if (that == null || this.getClass() != that.getClass()) { + return false; + } + return this.toString().equals(that.toString()); + } + + @Override + public String toString() { + if (myString == null) { + myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries + + ", sleepTime=" + sleepTime + " " + timeUnit + ")"; + } + return myString; + } } static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited { @@ -208,6 +256,169 @@ public class RetryPolicies { } } + /** + * Given pairs of number of retries and sleep time (n0, t0), (n1, t1), ..., + * the first n0 retries sleep t0 milliseconds on average, + * the following n1 retries sleep t1 milliseconds on average, and so on. + * + * For each sleep, the actual sleep time is uniformly distributed + * in the interval [0.5t, 1.5t), where t is the sleep time specified. + * + * The objects of this class are immutable. + */ + public static class MultipleLinearRandomRetry implements RetryPolicy { + /** Pairs of numRetries and sleepMillis */ + public static class Pair { + final int numRetries; + final int sleepMillis; + + public Pair(final int numRetries, final int sleepMillis) { + if (numRetries < 0) { + throw new IllegalArgumentException("numRetries = " + numRetries + " < 0"); + } + if (sleepMillis < 0) { + throw new IllegalArgumentException("sleepMillis = " + sleepMillis + " < 0"); + } + + this.numRetries = numRetries; + this.sleepMillis = sleepMillis; + } + + @Override + public String toString() { + return numRetries + "x" + sleepMillis + "ms"; + } + } + + private final List<Pair> pairs; + private String myString; + + public MultipleLinearRandomRetry(List<Pair> pairs) { + if (pairs == null || pairs.isEmpty()) { + throw new IllegalArgumentException("pairs must be neither null nor empty."); + } + this.pairs = Collections.unmodifiableList(pairs); + } + + @Override + public RetryAction shouldRetry(Exception e, int curRetry, int failovers, + boolean isMethodIdempotent) throws Exception { + final Pair p = searchPair(curRetry); + if (p == null) { + //no more retries.
+ return RetryAction.FAIL; + } + + //calculate sleep time and return. + final double ratio = RANDOM.get().nextDouble() + 0.5;//0.5 <= ratio < 1.5 + final long sleepTime = Math.round(p.sleepMillis * ratio); + return new RetryAction(RetryAction.RetryDecision.RETRY, sleepTime); + } + + /** + * Given the current retry count, search for the corresponding pair. + * @return the corresponding pair, + * or null if the current retry count exceeds the maximum number of retries. + */ + private Pair searchPair(int curRetry) { + int i = 0; + for(; i < pairs.size() && curRetry > pairs.get(i).numRetries; i++) { + curRetry -= pairs.get(i).numRetries; + } + return i == pairs.size()? null: pairs.get(i); + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(final Object that) { + if (this == that) { + return true; + } else if (that == null || this.getClass() != that.getClass()) { + return false; + } + return this.toString().equals(that.toString()); + } + + @Override + public String toString() { + if (myString == null) { + myString = getClass().getSimpleName() + pairs; + } + return myString; + } + + /** + * Parse the given string as a MultipleLinearRandomRetry object. + * The format of the string is "t_1, n_1, t_2, n_2, ...", + * where t_i and n_i are the i-th pair of sleep time and number of retries. + * Note that whitespace in the string is ignored. + * + * @return the parsed object, or null if the parsing fails. + */ + public static MultipleLinearRandomRetry parseCommaSeparatedString(String s) { + final String[] elements = s.split(","); + if (elements.length == 0) { + LOG.warn("Illegal value: there is no element in \"" + s + "\"."); + return null; + } + if (elements.length % 2 != 0) { + LOG.warn("Illegal value: the number of elements in \"" + s + "\" is " + + elements.length + " but an even number of elements is expected."); + return null; + } + + final List<Pair> pairs + = new ArrayList<Pair>(); + + for(int i = 0; i < elements.length; ) { + //parse the i-th sleep-time + final int sleep = parsePositiveInt(elements, i++, s); + if (sleep == -1) { + return null; //parse fails + } + + //parse the i-th number-of-retries + final int retries = parsePositiveInt(elements, i++, s); + if (retries == -1) { + return null; //parse fails + } + + pairs.add(new RetryPolicies.MultipleLinearRandomRetry.Pair(retries, sleep)); + } + return new RetryPolicies.MultipleLinearRandomRetry(pairs); + } + + /** + * Parse the i-th element as an integer. + * @return -1 if the parsing fails or the parsed value <= 0; + * otherwise, return the parsed value.
+ */ + private static int parsePositiveInt(final String[] elements, + final int i, final String originalString) { + final String s = elements[i].trim(); + final int n; + try { + n = Integer.parseInt(s); + } catch(NumberFormatException nfe) { + LOG.warn("Failed to parse \"" + s + "\", which is the index " + i + + " element in \"" + originalString + "\"", nfe); + return -1; + } + + if (n <= 0) { + LOG.warn("The value " + n + " <= 0: it is parsed from the string \"" + + s + "\" which is the index " + i + " element in \"" + + originalString + "\""); + return -1; + } + return n; + } + } + static class ExceptionDependentRetry implements RetryPolicy { RetryPolicy defaultPolicy; @@ -265,6 +476,14 @@ public class RetryPolicies { public ExponentialBackoffRetry( int maxRetries, long sleepTime, TimeUnit timeUnit) { super(maxRetries, sleepTime, timeUnit); + + if (maxRetries < 0) { + throw new IllegalArgumentException("maxRetries = " + maxRetries + " < 0"); + } else if (maxRetries >= Long.SIZE - 1) { + //calculateSleepTime may overflow. + throw new IllegalArgumentException("maxRetries = " + maxRetries + + " >= " + (Long.SIZE - 1)); + } } @Override @@ -353,11 +572,10 @@ public class RetryPolicies { * @param cap value at which to cap the base sleep time * @return an amount of time to sleep */ - @VisibleForTesting - public static long calculateExponentialTime(long time, int retries, + private static long calculateExponentialTime(long time, int retries, long cap) { - long baseTime = Math.min(time * ((long)1 << retries), cap); - return (long) (baseTime * (RAND.nextFloat() + 0.5)); + long baseTime = Math.min(time * (1L << retries), cap); + return (long) (baseTime * (RANDOM.get().nextDouble() + 0.5)); } private static long calculateExponentialTime(long time, int retries) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java index ed673e950f8..e1f38994579 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java @@ -60,6 +60,12 @@ public interface RetryPolicy { this.reason = reason; } + @Override + public String toString() { + return getClass().getSimpleName() + "(action=" + action + + ", delayMillis=" + delayMillis + ", reason=" + reason + ")"; + } + public enum RetryDecision { FAIL, RETRY, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java index 13e8a41eba3..3cc6a2ec2dc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java @@ -75,9 +75,10 @@ public class RetryProxy { */ public static Object create(Class iface, Object implementation, Map methodNameToPolicyMap) { - return RetryProxy.create(iface, + return create(iface, new DefaultFailoverProxyProvider(iface, implementation), - methodNameToPolicyMap); + methodNameToPolicyMap, + RetryPolicies.TRY_ONCE_THEN_FAIL); } /** @@ -92,11 +93,13 @@ public class RetryProxy { * @return the retry proxy */ public static Object create(Class iface, FailoverProxyProvider proxyProvider, - Map methodNameToPolicyMap) { + Map methodNameToPolicyMap, + RetryPolicy defaultPolicy) { return 
Proxy.newProxyInstance( proxyProvider.getInterface().getClassLoader(), new Class[] { iface }, - new RetryInvocationHandler(proxyProvider, methodNameToPolicyMap) + new RetryInvocationHandler(proxyProvider, defaultPolicy, + methodNameToPolicyMap) ); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index ef32cfde3a9..d382c99f616 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -18,47 +18,51 @@ package org.apache.hadoop.ipc; -import java.net.InetAddress; -import java.net.Socket; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.net.UnknownHostException; -import java.io.IOException; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.FilterInputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.OutputStream; - +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; import java.util.Hashtable; import java.util.Iterator; +import java.util.Map.Entry; import java.util.Random; import java.util.Set; -import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.net.SocketFactory; -import org.apache.commons.logging.*; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.SaslRpcClient; @@ -67,8 +71,8 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import 
org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ReflectionUtils; @@ -80,8 +84,8 @@ import org.apache.hadoop.util.ReflectionUtils; */ public class Client { - public static final Log LOG = - LogFactory.getLog(Client.class); + public static final Log LOG = LogFactory.getLog(Client.class); + private Hashtable connections = new Hashtable(); @@ -228,8 +232,7 @@ public class Client { private int rpcTimeout; private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs - private int maxRetries; //the max. no. of retries for socket connections - // the max. no. of retries for socket connections on time out exceptions + private final RetryPolicy connectionRetryPolicy; private int maxRetriesOnSocketTimeouts; private boolean tcpNoDelay; // if T then disable Nagle's Algorithm private boolean doPing; //do we need to send ping message @@ -253,7 +256,7 @@ public class Client { } this.rpcTimeout = remoteId.getRpcTimeout(); this.maxIdleTime = remoteId.getMaxIdleTime(); - this.maxRetries = remoteId.getMaxRetries(); + this.connectionRetryPolicy = remoteId.connectionRetryPolicy; this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts(); this.tcpNoDelay = remoteId.getTcpNoDelay(); this.doPing = remoteId.getDoPing(); @@ -488,7 +491,7 @@ public class Client { if (updateAddress()) { timeoutFailures = ioFailures = 0; } - handleConnectionFailure(ioFailures++, maxRetries, ie); + handleConnectionFailure(ioFailures++, ie); } } } @@ -680,8 +683,36 @@ public class Client { Thread.sleep(1000); } catch (InterruptedException ignored) {} - LOG.info("Retrying connect to server: " + server + - ". Already tried " + curRetries + " time(s)."); + LOG.info("Retrying connect to server: " + server + ". Already tried " + + curRetries + " time(s); maxRetries=" + maxRetries); + } + + private void handleConnectionFailure(int curRetries, IOException ioe + ) throws IOException { + closeConnection(); + + final RetryAction action; + try { + action = connectionRetryPolicy.shouldRetry(ioe, curRetries, 0, true); + } catch(Exception e) { + throw e instanceof IOException? (IOException)e: new IOException(e); + } + if (action.action == RetryAction.RetryDecision.FAIL) { + if (action.reason != null) { + LOG.warn("Failed to connect to server: " + server + ": " + + action.reason, ioe); + } + throw ioe; + } + + try { + Thread.sleep(action.delayMillis); + } catch (InterruptedException e) { + throw (IOException)new InterruptedIOException("Interrupted: action=" + + action + ", retry policy=" + connectionRetryPolicy).initCause(e); + } + LOG.info("Retrying connect to server: " + server + ". Already tried " + + curRetries + " time(s); retry policy is " + connectionRetryPolicy); } /** @@ -849,6 +880,10 @@ public class Client { try { RpcResponseHeaderProto response = RpcResponseHeaderProto.parseDelimitedFrom(in); + if (response == null) { + throw new IOException("Response is null."); + } + int callId = response.getCallId(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + callId); @@ -1287,7 +1322,7 @@ public class Client { private final String serverPrincipal; private final int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs - private final int maxRetries; //the max. no. of retries for socket connections + private final RetryPolicy connectionRetryPolicy; // the max. no. 
of retries for socket connections on time out exceptions private final int maxRetriesOnSocketTimeouts; private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm @@ -1297,7 +1332,7 @@ public class Client { ConnectionId(InetSocketAddress address, Class protocol, UserGroupInformation ticket, int rpcTimeout, String serverPrincipal, int maxIdleTime, - int maxRetries, int maxRetriesOnSocketTimeouts, + RetryPolicy connectionRetryPolicy, int maxRetriesOnSocketTimeouts, boolean tcpNoDelay, boolean doPing, int pingInterval) { this.protocol = protocol; this.address = address; @@ -1305,7 +1340,7 @@ public class Client { this.rpcTimeout = rpcTimeout; this.serverPrincipal = serverPrincipal; this.maxIdleTime = maxIdleTime; - this.maxRetries = maxRetries; + this.connectionRetryPolicy = connectionRetryPolicy; this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts; this.tcpNoDelay = tcpNoDelay; this.doPing = doPing; @@ -1336,10 +1371,6 @@ public class Client { return maxIdleTime; } - int getMaxRetries() { - return maxRetries; - } - /** max connection retries on socket time outs */ public int getMaxRetriesOnSocketTimeouts() { return maxRetriesOnSocketTimeouts; @@ -1357,6 +1388,12 @@ public class Client { return pingInterval; } + static ConnectionId getConnectionId(InetSocketAddress addr, + Class protocol, UserGroupInformation ticket, int rpcTimeout, + Configuration conf) throws IOException { + return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf); + } + /** * Returns a ConnectionId object. * @param addr Remote address for the connection. @@ -1367,9 +1404,18 @@ public class Client { * @return A ConnectionId instance * @throws IOException */ - public static ConnectionId getConnectionId(InetSocketAddress addr, + static ConnectionId getConnectionId(InetSocketAddress addr, Class protocol, UserGroupInformation ticket, int rpcTimeout, - Configuration conf) throws IOException { + RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException { + + if (connectionRetryPolicy == null) { + final int max = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT); + connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( + max, 1, TimeUnit.SECONDS); + } + String remotePrincipal = getRemotePrincipal(conf, addr, protocol); boolean doPing = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); @@ -1377,8 +1423,7 @@ public class Client { rpcTimeout, remotePrincipal, conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT), - conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT), + connectionRetryPolicy, conf.getInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT), @@ -1421,7 +1466,7 @@ public class Client { return isEqual(this.address, that.address) && this.doPing == that.doPing && this.maxIdleTime == that.maxIdleTime - && this.maxRetries == that.maxRetries + && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy) && this.pingInterval == that.pingInterval && isEqual(this.protocol, that.protocol) && this.rpcTimeout == that.rpcTimeout @@ -1434,11 +1479,10 @@ public class Client { @Override public int hashCode() { - int result = 
1; + int result = connectionRetryPolicy.hashCode(); result = PRIME * result + ((address == null) ? 0 : address.hashCode()); result = PRIME * result + (doPing ? 1231 : 1237); result = PRIME * result + maxIdleTime; - result = PRIME * result + maxRetries; result = PRIME * result + pingInterval; result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode()); result = PRIME * result + rpcTimeout; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 2d3f91e5e4c..a63a72d36d7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -36,9 +36,9 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputOutputStream; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; - import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; @@ -66,15 +66,24 @@ public class ProtobufRpcEngine implements RpcEngine { private static final ClientCache CLIENTS = new ClientCache(); + public ProtocolProxy getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout) throws IOException { + return getProxy(protocol, clientVersion, addr, ticket, conf, factory, + rpcTimeout, null); + } + @Override @SuppressWarnings("unchecked") public ProtocolProxy getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException { + SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy + ) throws IOException { - return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance(protocol - .getClassLoader(), new Class[] { protocol }, new Invoker(protocol, - addr, ticket, conf, factory, rpcTimeout)), false); + final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, + rpcTimeout, connectionRetryPolicy); + return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[]{protocol}, invoker), false); } @Override @@ -97,11 +106,12 @@ public class ProtobufRpcEngine implements RpcEngine { private final long clientProtocolVersion; private final String protocolName; - public Invoker(Class protocol, InetSocketAddress addr, + private Invoker(Class protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout) throws IOException { - this(protocol, Client.ConnectionId.getConnectionId(addr, protocol, - ticket, rpcTimeout, conf), conf, factory); + int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { + this(protocol, Client.ConnectionId.getConnectionId( + addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf), + conf, factory); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 56fbd7d5a1b..6a8a71f83ad 
100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -41,6 +41,7 @@ import org.apache.commons.logging.*; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.io.*; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; import org.apache.hadoop.net.NetUtils; @@ -326,7 +327,7 @@ public class RPC { long clientVersion, InetSocketAddress addr, Configuration conf, long connTimeout) throws IOException { - return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, connTimeout); + return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, null, connTimeout); } /** @@ -347,7 +348,7 @@ public class RPC { int rpcTimeout, long timeout) throws IOException { return waitForProtocolProxy(protocol, clientVersion, addr, - conf, rpcTimeout, timeout).getProxy(); + conf, rpcTimeout, null, timeout).getProxy(); } /** @@ -367,6 +368,7 @@ public class RPC { long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout, + RetryPolicy connectionRetryPolicy, long timeout) throws IOException { long startTime = System.currentTimeMillis(); IOException ioe; @@ -374,7 +376,7 @@ public class RPC { try { return getProtocolProxy(protocol, clientVersion, addr, UserGroupInformation.getCurrentUser(), conf, NetUtils - .getDefaultSocketFactory(conf), rpcTimeout); + .getDefaultSocketFactory(conf), rpcTimeout, connectionRetryPolicy); } catch(ConnectException se) { // namenode has not been started LOG.info("Server at " + addr + " not available yet, Zzzzz..."); ioe = se; @@ -463,7 +465,7 @@ public class RPC { Configuration conf, SocketFactory factory) throws IOException { return getProtocolProxy( - protocol, clientVersion, addr, ticket, conf, factory, 0); + protocol, clientVersion, addr, ticket, conf, factory, 0, null); } /** @@ -489,7 +491,7 @@ public class RPC { SocketFactory factory, int rpcTimeout) throws IOException { return getProtocolProxy(protocol, clientVersion, addr, ticket, - conf, factory, rpcTimeout).getProxy(); + conf, factory, rpcTimeout, null).getProxy(); } /** @@ -512,12 +514,13 @@ public class RPC { UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout) throws IOException { + int rpcTimeout, + RetryPolicy connectionRetryPolicy) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { SaslRpcServer.init(conf); } - return getProtocolEngine(protocol,conf).getProxy(protocol, - clientVersion, addr, ticket, conf, factory, rpcTimeout); + return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion, + addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java index d431b4a898c..f74aa881d20 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java @@ -97,8 +97,9 @@ public class RemoteException extends IOException { return new RemoteException(attrs.getValue("class"), attrs.getValue("message")); } - + + @Override public String toString() { - return className + ": " + getMessage(); + return 
getClass().getName() + "(" + className + "): " + getMessage(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index 09980da452c..5dc48adef28 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -26,6 +26,7 @@ import javax.net.SocketFactory; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; @@ -40,7 +41,8 @@ public interface RpcEngine { ProtocolProxy getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException; + SocketFactory factory, int rpcTimeout, + RetryPolicy connectionRetryPolicy) throws IOException; /** Expert: Make multiple, parallel calls to a set of servers. */ Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 6fd800cad6e..45b9cb199a5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -31,6 +31,7 @@ import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.io.*; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.VersionedProtocol; @@ -259,9 +260,14 @@ public class WritableRpcEngine implements RpcEngine { public ProtocolProxy getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout) + int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { + if (connectionRetryPolicy != null) { + throw new UnsupportedOperationException( + "Not supported: connectionRetryPolicy=" + connectionRetryPolicy); + } + T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index cc0c5c9f54c..5d3d335b32d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -18,50 +18,55 @@ package org.apache.hadoop.ipc; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + 
import java.io.Closeable; import java.io.IOException; -import java.net.ConnectException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.net.ConnectException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Arrays; import javax.net.SocketFactory; -import org.apache.commons.logging.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.Client.ConnectionId; -import org.apache.hadoop.ipc.TestSaslRPC.TestSaslImpl; -import org.apache.hadoop.ipc.TestSaslRPC.TestSaslProtocol; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.MockitoUtil; import org.junit.Test; -import static org.junit.Assert.*; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.DescriptorProtos.EnumDescriptorProto; -import static org.apache.hadoop.test.MetricsAsserts.*; - /** Unit tests for RPC. */ @SuppressWarnings("deprecation") public class TestRPC { @@ -250,7 +255,8 @@ public class TestRPC { @Override public ProtocolProxy getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException { + SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy + ) throws IOException { T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new StoppedInvocationHandler()); return new ProtocolProxy(protocol, proxy, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 75d31f83b73..74daf5f7ddf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -70,6 +70,10 @@ Release 2.0.1-alpha - UNRELEASED HDFS-3520. Add transfer rate logging to TransferFsImage. (eli) + HDFS-3504. Support configurable retry policy in DFSClient for RPC + connections and RPC calls, and add MultipleLinearRandomRetry, a new retry + policy. (szetszwo) + OPTIMIZATIONS HDFS-2982. 
Startup performance suffers when there are many edit log diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index e5a62511354..946150fbff8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -38,6 +38,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096; public static final String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum"; public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512; + public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY = "dfs.client.retry.policy.enabled"; + public static final boolean DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false; + public static final String DFS_CLIENT_RETRY_POLICY_SPEC_KEY = "dfs.client.retry.policy.spec"; + public static final String DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,... public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type"; public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C"; public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java index 3a40fe78817..1de42a5c366 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java @@ -47,10 +47,12 @@ import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolPB; import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider; import org.apache.hadoop.io.retry.FailoverProxyProvider; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -66,6 +68,7 @@ import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol; import com.google.common.base.Preconditions; +import com.google.protobuf.ServiceException; /** * Create proxy objects to communicate with a remote NN. All remote access to an @@ -240,12 +243,106 @@ public class NameNodeProxies { return new NamenodeProtocolTranslatorPB(proxy); } + /** + * Return the default retry policy used in RPC. + * + * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL. + * + * Otherwise, first unwrap ServiceException if possible, and then + * (1) use multipleLinearRandomRetry for + * - SafeModeException, or + * - IOException other than RemoteException, or + * - ServiceException; and + * (2) use TRY_ONCE_THEN_FAIL for + * - non-SafeMode RemoteException, or + * - non-IOException. 
+ * + * Note that the sleep times and retry counts in dfs.client.retry.policy.spec must be positive. + */ + private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) { + final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry); + } + if (multipleLinearRandomRetry == null) { + //no retry + return RetryPolicies.TRY_ONCE_THEN_FAIL; + } else { + return new RetryPolicy() { + @Override + public RetryAction shouldRetry(Exception e, int retries, int failovers, + boolean isMethodIdempotent) throws Exception { + if (e instanceof ServiceException) { + //unwrap ServiceException + final Throwable cause = e.getCause(); + if (cause != null && cause instanceof Exception) { + e = (Exception)cause; + } + } + + //see (1) and (2) in the javadoc of this method. + final RetryPolicy p; + if (e instanceof RemoteException) { + final RemoteException re = (RemoteException)e; + p = SafeModeException.class.getName().equals(re.getClassName())? + multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL; + } else if (e instanceof IOException || e instanceof ServiceException) { + p = multipleLinearRandomRetry; + } else { //non-IOException + p = RetryPolicies.TRY_ONCE_THEN_FAIL; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("RETRY " + retries + ") policy=" + + p.getClass().getSimpleName() + ", exception=" + e); + } + return p.shouldRetry(e, retries, failovers, isMethodIdempotent); + } + }; + } + } + + /** + * Return the MultipleLinearRandomRetry policy specified in the conf, + * or null if the feature is disabled. + * If the policy is specified in the conf but the policy cannot be parsed, + * the default policy is returned. + * + * Conf property: N pairs of sleep-time and number-of-retries + * dfs.client.retry.policy.spec = "t1,n1,t2,n2,..." + */ + private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) { + final boolean enabled = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT); + if (!enabled) { + return null; + } + + final String policy = conf.get( + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT); + + final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy); + return r != null?
r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString( + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT); + } + private static ClientProtocol createNNProxyWithClientProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries) throws IOException { - ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB) NameNodeProxies - .createNameNodeProxy(address, conf, ugi, ClientNamenodeProtocolPB.class); + RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); + + final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf); + final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); + ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( + ClientNamenodeProtocolPB.class, version, address, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), 0, defaultPolicy).getProxy(); + if (withRetries) { // create the proxy with retries + RetryPolicy createPolicy = RetryPolicies .retryUpToMaximumCountWithFixedSleep(5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); @@ -258,17 +355,21 @@ public class NameNodeProxies { Map, RetryPolicy> exceptionToPolicyMap = new HashMap, RetryPolicy>(); exceptionToPolicyMap.put(RemoteException.class, RetryPolicies - .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL, + .retryByRemoteException(defaultPolicy, remoteExceptionToPolicyMap)); RetryPolicy methodPolicy = RetryPolicies.retryByException( - RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + defaultPolicy, exceptionToPolicyMap); Map methodNameToPolicyMap = new HashMap(); methodNameToPolicyMap.put("create", methodPolicy); - proxy = (ClientNamenodeProtocolPB) RetryProxy - .create(ClientNamenodeProtocolPB.class, proxy, methodNameToPolicyMap); + proxy = (ClientNamenodeProtocolPB) RetryProxy.create( + ClientNamenodeProtocolPB.class, + new DefaultFailoverProxyProvider( + ClientNamenodeProtocolPB.class, proxy), + methodNameToPolicyMap, + defaultPolicy); } return new ClientNamenodeProtocolTranslatorPB(proxy); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index d57f792472e..746d6ac33a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -25,8 +25,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; @@ -39,6 +37,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY; +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI; @@ -66,12 +66,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.ha.HAServiceProtocol; -import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; -import org.apache.hadoop.ha.HAServiceProtocolHelper; -import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; -import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -1375,7 +1372,6 @@ public class MiniDFSCluster { waitClusterUp(); LOG.info("Restarted the namenode"); waitActive(); - LOG.info("Cluster is active"); } } @@ -1751,6 +1747,7 @@ public class MiniDFSCluster { } } } + LOG.info("Cluster is active"); } private synchronized boolean shouldWait(DatanodeInfo[] dnInfo, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index e04a6496b5b..ddc19843ad3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -25,44 +25,51 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import java.net.SocketTimeoutException; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.LongWritable; +import java.io.FileNotFoundException; import java.io.IOException; -import java.net.InetSocketAddress; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.LocatedBlock; 
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; -import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; +import org.apache.log4j.Level; import org.mockito.Mockito; import org.mockito.internal.stubbing.answers.ThrowsException; import org.mockito.invocation.InvocationOnMock; @@ -339,7 +346,7 @@ public class TestDFSClientRetries extends TestCase { // We shouldn't have gained an extra block by the RPC. assertEquals(blockCount, blockCount2); - return (LocatedBlock) ret2; + return ret2; } }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any()); @@ -744,5 +751,149 @@ public class TestDFSClientRetries extends TestCase { server.stop(); } } -} + /** Test client retry with namenode restarting. */ + public void testNamenodeRestart() throws Exception { + ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); + + final List exceptions = new ArrayList(); + + final Path dir = new Path("/testNamenodeRestart"); + + final Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true); + + final short numDatanodes = 3; + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numDatanodes) + .build(); + try { + cluster.waitActive(); + + //create a file + final DistributedFileSystem dfs = cluster.getFileSystem(); + final long length = 1L << 20; + final Path file1 = new Path(dir, "foo"); + DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L); + + //get file status + final FileStatus s1 = dfs.getFileStatus(file1); + assertEquals(length, s1.getLen()); + + //shutdown namenode + cluster.shutdownNameNode(0); + + //namenode is down, create another file in a thread + final Path file3 = new Path(dir, "file"); + final Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + //it should retry till namenode is up. + final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf); + DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L); + } catch (Exception e) { + exceptions.add(e); + } + } + }); + thread.start(); + + //restart namenode in a new thread + new Thread(new Runnable() { + @Override + public void run() { + try { + //sleep, restart, and then wait active + TimeUnit.SECONDS.sleep(30); + cluster.restartNameNode(0, false); + cluster.waitActive(); + } catch (Exception e) { + exceptions.add(e); + } + } + }).start(); + + //namenode is down, it should retry until namenode is up again. 
+ final FileStatus s2 = dfs.getFileStatus(file1); + assertEquals(s1, s2); + + //check file1 and file3 + thread.join(); + assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3)); + + //enter safe mode + dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + + //leave safe mode in a new thread + new Thread(new Runnable() { + @Override + public void run() { + try { + //sleep and then leave safe mode + TimeUnit.SECONDS.sleep(30); + dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + } catch (Exception e) { + exceptions.add(e); + } + } + }).start(); + + //namenode is in safe mode, create should retry until it leaves safe mode. + final Path file2 = new Path(dir, "bar"); + DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L); + assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2)); + + //make sure it won't retry on exceptions like FileNotFoundException + final Path nonExisting = new Path(dir, "nonExisting"); + LOG.info("setPermission: " + nonExisting); + try { + dfs.setPermission(nonExisting, new FsPermission((short)0)); + fail(); + } catch(FileNotFoundException fnfe) { + LOG.info("GOOD!", fnfe); + } + + if (!exceptions.isEmpty()) { + LOG.error("There are " + exceptions.size() + " exception(s):"); + for(int i = 0; i < exceptions.size(); i++) { + LOG.error("Exception " + i, exceptions.get(i)); + } + fail(); + } + } finally { + cluster.shutdown(); + } + } + + public void testMultipleLinearRandomRetry() { + parseMultipleLinearRandomRetry(null, ""); + parseMultipleLinearRandomRetry(null, "11"); + parseMultipleLinearRandomRetry(null, "11,22,33"); + parseMultipleLinearRandomRetry(null, "11,22,33,44,55"); + parseMultipleLinearRandomRetry(null, "AA"); + parseMultipleLinearRandomRetry(null, "11,AA"); + parseMultipleLinearRandomRetry(null, "11,22,33,FF"); + parseMultipleLinearRandomRetry(null, "11,-22"); + parseMultipleLinearRandomRetry(null, "-11,22"); + + parseMultipleLinearRandomRetry("[22x11ms]", + "11,22"); + parseMultipleLinearRandomRetry("[22x11ms, 44x33ms]", + "11,22,33,44"); + parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]", + "11,22,33,44,55,66"); + parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]", + " 11, 22, 33, 44, 55, 66 "); + } + + static void parseMultipleLinearRandomRetry(String expected, String s) { + final MultipleLinearRandomRetry r = MultipleLinearRandomRetry.parseCommaSeparatedString(s); + LOG.info("input=" + s + ", parsed=" + r + ", expected=" + expected); + if (r == null) { + assertEquals(expected, null); + } else { + assertEquals("MultipleLinearRandomRetry" + expected, r.toString()); + } + } +}
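A few usage sketches for the APIs this patch introduces follow; none of them is part of the patch itself.

Why RetryLimited now caches its toString() and derives equals(..)/hashCode() from it: Client caches connections in a Hashtable keyed by ConnectionId, whose equals(..)/hashCode() now fold in the connection retry policy, so two independently constructed but equivalent policies must compare equal or equivalent ConnectionIds would stop sharing a cached connection. A minimal sketch, using only the factory above:

  import java.util.concurrent.TimeUnit;
  import org.apache.hadoop.io.retry.RetryPolicies;
  import org.apache.hadoop.io.retry.RetryPolicy;

  public class PolicyEqualityDemo {
    public static void main(String[] args) {
      // Two distinct instances with identical parameters...
      RetryPolicy a = RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, 1, TimeUnit.SECONDS);
      RetryPolicy b = RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, 1, TimeUnit.SECONDS);
      // ...are equal and hash alike, so ConnectionIds holding either instance
      // map to the same cached connection.
      System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
    }
  }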
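The spec format accepted by MultipleLinearRandomRetry.parseCommaSeparatedString(..) pairs a sleep time with a retry count, sleep time first. A sketch of the round trip; the printed strings follow from the Pair and policy toString() implementations above, and match the expectations in testMultipleLinearRandomRetry:

  import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry;

  public class RetrySpecDemo {
    public static void main(String[] args) {
      // The DFS default spec "10000,6,60000,10": 6 retries sleeping ~10s each,
      // then 10 retries sleeping ~60s each; the 17th retry fails.
      MultipleLinearRandomRetry r =
          MultipleLinearRandomRetry.parseCommaSeparatedString("10000,6,60000,10");
      System.out.println(r); // MultipleLinearRandomRetry[6x10000ms, 10x60000ms]

      // Malformed specs parse to null: odd element count, or non-positive values.
      System.out.println(MultipleLinearRandomRetry.parseCommaSeparatedString("11,22,33")); // null
      System.out.println(MultipleLinearRandomRetry.parseCommaSeparatedString("-11,22"));   // null
    }
  }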
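The new upper bound in the ExponentialBackoffRetry constructor exists because calculateExponentialTime(..) computes time * (1L << retries), and the shift wraps once retries reaches 63 (Long.SIZE - 1). A two-line illustration in plain Java:

  public class ShiftOverflowDemo {
    public static void main(String[] args) {
      System.out.println(1L << 62); //  4611686018427387904, still positive
      System.out.println(1L << 63); // -9223372036854775808, wraps negative;
                                    // hence maxRetries must be < Long.SIZE - 1
    }
  }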
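How a caller drives any RetryPolicy, mirroring the new Client.handleConnectionFailure(int, IOException): ask shouldRetry(..), then either rethrow or sleep for the returned delay. This is a sketch; connect() is a hypothetical placeholder for the retried operation, and the retry count starts at 0 as in Client.

  import java.io.IOException;
  import java.net.ConnectException;
  import java.util.concurrent.TimeUnit;
  import org.apache.hadoop.io.retry.RetryPolicies;
  import org.apache.hadoop.io.retry.RetryPolicy;
  import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;

  public class RetryLoopSketch {
    void connect() throws IOException { // hypothetical operation under retry
      throw new ConnectException("server down");
    }

    void runWithRetries(RetryPolicy policy) throws Exception {
      for (int retries = 0;; retries++) {
        try {
          connect();
          return;
        } catch (IOException ioe) {
          final RetryAction action = policy.shouldRetry(ioe, retries, 0, true);
          if (action.action == RetryAction.RetryDecision.FAIL) {
            throw ioe; // policy exhausted, or exception deemed non-retriable
          }
          Thread.sleep(action.delayMillis); // RETRY: back off, then try again
        }
      }
    }

    public static void main(String[] args) throws Exception {
      // Sleeps three times (~100ms each), then rethrows the ConnectException.
      new RetryLoopSketch().runWithRetries(
          RetryPolicies.retryUpToMaximumCountWithFixedSleep(3, 100, TimeUnit.MILLISECONDS));
    }
  }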
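The reworked RetryProxy.create(..) now takes the per-method policy map plus an explicit default policy, the combination NameNodeProxies.createNNProxyWithClientProtocol(..) uses for "create". A sketch with hypothetical MyProtocol/backend stand-ins:

  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.TimeUnit;
  import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
  import org.apache.hadoop.io.retry.RetryPolicies;
  import org.apache.hadoop.io.retry.RetryPolicy;
  import org.apache.hadoop.io.retry.RetryProxy;

  public class RetryProxySketch {
    interface MyProtocol { void create(String src); } // hypothetical protocol

    static MyProtocol wrap(MyProtocol backend) {
      RetryPolicy defaultPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
          5, 1, TimeUnit.SECONDS);
      // "create" gets its own policy; every other method falls back to defaultPolicy.
      Map<String, RetryPolicy> methodPolicies = new HashMap<String, RetryPolicy>();
      methodPolicies.put("create", RetryPolicies.retryUpToMaximumCountWithFixedSleep(
          10, 200, TimeUnit.MILLISECONDS));
      return (MyProtocol) RetryProxy.create(MyProtocol.class,
          new DefaultFailoverProxyProvider(MyProtocol.class, backend),
          methodPolicies, defaultPolicy);
    }
  }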
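Finally, how a DFS client turns the feature on, using the two keys added to DFSConfigKeys and read by NameNodeProxies.getMultipleLinearRandomRetry(..). The spec value here is an arbitrary example; an unparsable spec falls back to DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  public class EnableRetryPolicy {
    static Configuration newConf() {
      Configuration conf = new Configuration();
      // Off by default: without this, client RPC uses TRY_ONCE_THEN_FAIL.
      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
      // t1,n1,t2,n2,...: 3 retries sleeping ~500ms each, then 5 sleeping ~2s each.
      conf.set(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, "500,3,2000,5");
      return conf;
    }
  }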