From 94ac5ba669941a59b61369c634e7c5548ac017b4 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Wed, 5 Mar 2014 23:12:22 +0000 Subject: [PATCH] Merge r1574716 from trunk. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1574717 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 +++ .../retry/DefaultFailoverProxyProvider.java | 4 +-- .../io/retry/FailoverProxyProvider.java | 18 ++++++++++--- .../io/retry/RetryInvocationHandler.java | 26 +++++++++++-------- .../hadoop/io/retry/TestFailoverProxy.java | 4 +-- .../io/retry/UnreliableImplementation.java | 7 ++++- .../ha/ConfiguredFailoverProxyProvider.java | 7 +++-- .../ConfiguredRMFailoverProxyProvider.java | 4 +-- 8 files changed, 48 insertions(+), 25 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index bae3e015ac5..59effb62992 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -64,6 +64,9 @@ Release 2.4.0 - UNRELEASED HADOOP-10211. Enable RPC protocol to negotiate SASL-QOP values between clients and servers. (Benoy Antony via Arpit Agarwal) + HADOOP-10386. Log proxy hostname in various exceptions being thrown in a HA + setup. (wheat9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java index ae37d0bed4a..4e97314931f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java @@ -43,8 +43,8 @@ public Class getInterface() { } @Override - public T getProxy() { - return proxy; + public ProxyInfo getProxy() { + return new ProxyInfo(proxy, null); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java index c267d028798..5acb936aad5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java @@ -30,6 +30,18 @@ */ @InterfaceStability.Evolving public interface FailoverProxyProvider extends Closeable { + public static final class ProxyInfo { + public final T proxy; + /* + * The information (e.g., the IP address) of the current proxy object. It + * provides information for debugging purposes. + */ + public final String proxyInfo; + public ProxyInfo(T proxy, String proxyInfo) { + this.proxy = proxy; + this.proxyInfo = proxyInfo; + } + } /** * Get the proxy object which should be used until the next failover event @@ -37,14 +49,14 @@ public interface FailoverProxyProvider extends Closeable { * * @return the proxy object to invoke methods upon */ - public T getProxy(); + public ProxyInfo getProxy(); /** * Called whenever the associated {@link RetryPolicy} determines that an error * warrants failing over. * - * @param currentProxy the proxy object which was being used before this - * failover event + * @param currentProxy + * the proxy object which was being used before this failover event */ public void performFailover(T currentProxy); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index e2401a5e44b..2131fd80e80 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client.ConnectionId; @@ -57,7 +58,7 @@ public class RetryInvocationHandler implements RpcInvocationHandler { private final RetryPolicy defaultPolicy; private final Map methodNameToPolicyMap; - private T currentProxy; + private ProxyInfo currentProxy; protected RetryInvocationHandler(FailoverProxyProvider proxyProvider, RetryPolicy retryPolicy) { @@ -83,7 +84,7 @@ public Object invoke(Object proxy, Method method, Object[] args) // The number of times this method invocation has been failed over. int invocationFailoverCount = 0; - final boolean isRpc = isRpcInvocation(currentProxy); + final boolean isRpc = isRpcInvocation(currentProxy.proxy); final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID; int retries = 0; while (true) { @@ -115,9 +116,9 @@ public Object invoke(Object proxy, Method method, Object[] args) invocationFailoverCount, isIdempotentOrAtMostOnce); if (action.action == RetryAction.RetryDecision.FAIL) { if (action.reason != null) { - LOG.warn("Exception while invoking " + - currentProxy.getClass() + "." + method.getName() + - ". Not retrying because " + action.reason, e); + LOG.warn("Exception while invoking " + currentProxy.proxy.getClass() + + "." + method.getName() + " over " + currentProxy.proxyInfo + + ". Not retrying because " + action.reason, e); } throw e; } else { // retry or failover @@ -130,7 +131,9 @@ public Object invoke(Object proxy, Method method, Object[] args) if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY && worthLogging) { String msg = "Exception while invoking " + method.getName() - + " of class " + currentProxy.getClass().getSimpleName(); + + " of class " + currentProxy.proxy.getClass().getSimpleName() + + " over " + currentProxy.proxyInfo; + if (invocationFailoverCount > 0) { msg += " after " + invocationFailoverCount + " fail over attempts"; } @@ -141,8 +144,9 @@ public Object invoke(Object proxy, Method method, Object[] args) } else { if(LOG.isDebugEnabled()) { LOG.debug("Exception while invoking " + method.getName() - + " of class " + currentProxy.getClass().getSimpleName() + - ". Retrying " + formatSleepMessage(action.delayMillis), e); + + " of class " + currentProxy.proxy.getClass().getSimpleName() + + " over " + currentProxy.proxyInfo + ". Retrying " + + formatSleepMessage(action.delayMillis), e); } } @@ -155,7 +159,7 @@ public Object invoke(Object proxy, Method method, Object[] args) // single actual fail over. synchronized (proxyProvider) { if (invocationAttemptFailoverCount == proxyProviderFailoverCount) { - proxyProvider.performFailover(currentProxy); + proxyProvider.performFailover(currentProxy.proxy); proxyProviderFailoverCount++; currentProxy = proxyProvider.getProxy(); } else { @@ -183,7 +187,7 @@ protected Object invokeMethod(Method method, Object[] args) throws Throwable { if (!method.isAccessible()) { method.setAccessible(true); } - return method.invoke(currentProxy, args); + return method.invoke(currentProxy.proxy, args); } catch (InvocationTargetException e) { throw e.getCause(); } @@ -208,7 +212,7 @@ public void close() throws IOException { @Override //RpcInvocationHandler public ConnectionId getConnectionId() { - return RPC.getConnectionIdForProxy(currentProxy); + return RPC.getConnectionIdForProxy(currentProxy.proxy); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java index 3b4ebae4812..7d55fe1c13c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java @@ -48,8 +48,8 @@ public FlipFlopProxyProvider(Class iface, T activeImpl, } @Override - public T getProxy() { - return currentlyActive; + public ProxyInfo getProxy() { + return new ProxyInfo(currentlyActive, currentlyActive.toString()); } @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java index 5b77698b100..ce9c16ea602 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java @@ -22,7 +22,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; -public class UnreliableImplementation implements UnreliableInterface { +class UnreliableImplementation implements UnreliableInterface { private int failsOnceInvocationCount, failsOnceWithValueInvocationCount, @@ -154,6 +154,11 @@ public void nonIdempotentVoidFailsIfIdentifierDoesntMatch(String identifier) } } + @Override + public String toString() { + return getClass().getSimpleName() + "[" + identifier + "]"; + } + private static void throwAppropriateException(TypeOfExceptionToFailWith eType, String message) throws UnreliableException, StandbyException, IOException { switch (eType) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java index eab36481e1a..ebe6ef87a5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -116,10 +116,9 @@ public Class getInterface() { /** * Lazily initialize the RPC proxy object. */ - @SuppressWarnings("unchecked") @Override - public synchronized T getProxy() { - AddressRpcProxyPair current = proxies.get(currentProxyIndex); + public synchronized ProxyInfo getProxy() { + AddressRpcProxyPair current = proxies.get(currentProxyIndex); if (current.namenode == null) { try { current.namenode = NameNodeProxies.createNonHAProxy(conf, @@ -129,7 +128,7 @@ public synchronized T getProxy() { throw new RuntimeException(e); } } - return (T)current.namenode; + return new ProxyInfo((T)current.namenode, current.address.toString()); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java index ef56edd4293..5577d207754 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java @@ -83,14 +83,14 @@ private T getProxyInternal() { } @Override - public synchronized T getProxy() { + public synchronized ProxyInfo getProxy() { String rmId = rmServiceIds[currentProxyIndex]; T current = proxies.get(rmId); if (current == null) { current = getProxyInternal(); proxies.put(rmId, current); } - return current; + return new ProxyInfo(current, rmId); } @Override