Merge r1574716 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1574717 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Haohui Mai 2014-03-05 23:12:22 +00:00
parent 2f3536ed12
commit 94ac5ba669
8 changed files with 48 additions and 25 deletions

View File

@ -64,6 +64,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10211. Enable RPC protocol to negotiate SASL-QOP values between HADOOP-10211. Enable RPC protocol to negotiate SASL-QOP values between
clients and servers. (Benoy Antony via Arpit Agarwal) clients and servers. (Benoy Antony via Arpit Agarwal)
HADOOP-10386. Log proxy hostname in various exceptions being thrown in a HA
setup. (wheat9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -43,8 +43,8 @@ public class DefaultFailoverProxyProvider<T> implements FailoverProxyProvider<T>
} }
@Override @Override
public T getProxy() { public ProxyInfo<T> getProxy() {
return proxy; return new ProxyInfo<T>(proxy, null);
} }
@Override @Override

View File

@ -30,6 +30,18 @@ import org.apache.hadoop.classification.InterfaceStability;
*/ */
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface FailoverProxyProvider<T> extends Closeable { public interface FailoverProxyProvider<T> extends Closeable {
public static final class ProxyInfo<T> {
public final T proxy;
/*
* The information (e.g., the IP address) of the current proxy object. It
* provides information for debugging purposes.
*/
public final String proxyInfo;
public ProxyInfo(T proxy, String proxyInfo) {
this.proxy = proxy;
this.proxyInfo = proxyInfo;
}
}
/** /**
* Get the proxy object which should be used until the next failover event * Get the proxy object which should be used until the next failover event
@ -37,14 +49,14 @@ public interface FailoverProxyProvider<T> extends Closeable {
* *
* @return the proxy object to invoke methods upon * @return the proxy object to invoke methods upon
*/ */
public T getProxy(); public ProxyInfo<T> getProxy();
/** /**
* Called whenever the associated {@link RetryPolicy} determines that an error * Called whenever the associated {@link RetryPolicy} determines that an error
* warrants failing over. * warrants failing over.
* *
* @param currentProxy the proxy object which was being used before this * @param currentProxy
* failover event * the proxy object which was being used before this failover event
*/ */
public void performFailover(T currentProxy); public void performFailover(T currentProxy);

View File

@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
@ -57,7 +58,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
private final RetryPolicy defaultPolicy; private final RetryPolicy defaultPolicy;
private final Map<String,RetryPolicy> methodNameToPolicyMap; private final Map<String,RetryPolicy> methodNameToPolicyMap;
private T currentProxy; private ProxyInfo<T> currentProxy;
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider, protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
RetryPolicy retryPolicy) { RetryPolicy retryPolicy) {
@ -83,7 +84,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
// The number of times this method invocation has been failed over. // The number of times this method invocation has been failed over.
int invocationFailoverCount = 0; int invocationFailoverCount = 0;
final boolean isRpc = isRpcInvocation(currentProxy); final boolean isRpc = isRpcInvocation(currentProxy.proxy);
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID; final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
int retries = 0; int retries = 0;
while (true) { while (true) {
@ -115,9 +116,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
invocationFailoverCount, isIdempotentOrAtMostOnce); invocationFailoverCount, isIdempotentOrAtMostOnce);
if (action.action == RetryAction.RetryDecision.FAIL) { if (action.action == RetryAction.RetryDecision.FAIL) {
if (action.reason != null) { if (action.reason != null) {
LOG.warn("Exception while invoking " + LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
currentProxy.getClass() + "." + method.getName() + + "." + method.getName() + " over " + currentProxy.proxyInfo
". Not retrying because " + action.reason, e); + ". Not retrying because " + action.reason, e);
} }
throw e; throw e;
} else { // retry or failover } else { // retry or failover
@ -130,7 +131,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY && if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY &&
worthLogging) { worthLogging) {
String msg = "Exception while invoking " + method.getName() String msg = "Exception while invoking " + method.getName()
+ " of class " + currentProxy.getClass().getSimpleName(); + " of class " + currentProxy.proxy.getClass().getSimpleName()
+ " over " + currentProxy.proxyInfo;
if (invocationFailoverCount > 0) { if (invocationFailoverCount > 0) {
msg += " after " + invocationFailoverCount + " fail over attempts"; msg += " after " + invocationFailoverCount + " fail over attempts";
} }
@ -141,8 +144,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
} else { } else {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Exception while invoking " + method.getName() LOG.debug("Exception while invoking " + method.getName()
+ " of class " + currentProxy.getClass().getSimpleName() + + " of class " + currentProxy.proxy.getClass().getSimpleName()
". Retrying " + formatSleepMessage(action.delayMillis), e); + " over " + currentProxy.proxyInfo + ". Retrying "
+ formatSleepMessage(action.delayMillis), e);
} }
} }
@ -155,7 +159,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
// single actual fail over. // single actual fail over.
synchronized (proxyProvider) { synchronized (proxyProvider) {
if (invocationAttemptFailoverCount == proxyProviderFailoverCount) { if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
proxyProvider.performFailover(currentProxy); proxyProvider.performFailover(currentProxy.proxy);
proxyProviderFailoverCount++; proxyProviderFailoverCount++;
currentProxy = proxyProvider.getProxy(); currentProxy = proxyProvider.getProxy();
} else { } else {
@ -183,7 +187,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
if (!method.isAccessible()) { if (!method.isAccessible()) {
method.setAccessible(true); method.setAccessible(true);
} }
return method.invoke(currentProxy, args); return method.invoke(currentProxy.proxy, args);
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
throw e.getCause(); throw e.getCause();
} }
@ -208,7 +212,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
@Override //RpcInvocationHandler @Override //RpcInvocationHandler
public ConnectionId getConnectionId() { public ConnectionId getConnectionId() {
return RPC.getConnectionIdForProxy(currentProxy); return RPC.getConnectionIdForProxy(currentProxy.proxy);
} }
} }

View File

@ -48,8 +48,8 @@ public class TestFailoverProxy {
} }
@Override @Override
public T getProxy() { public ProxyInfo<T> getProxy() {
return currentlyActive; return new ProxyInfo<T>(currentlyActive, currentlyActive.toString());
} }
@Override @Override

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
public class UnreliableImplementation implements UnreliableInterface { class UnreliableImplementation implements UnreliableInterface {
private int failsOnceInvocationCount, private int failsOnceInvocationCount,
failsOnceWithValueInvocationCount, failsOnceWithValueInvocationCount,
@ -154,6 +154,11 @@ public class UnreliableImplementation implements UnreliableInterface {
} }
} }
@Override
public String toString() {
return getClass().getSimpleName() + "[" + identifier + "]";
}
private static void throwAppropriateException(TypeOfExceptionToFailWith eType, private static void throwAppropriateException(TypeOfExceptionToFailWith eType,
String message) throws UnreliableException, StandbyException, IOException { String message) throws UnreliableException, StandbyException, IOException {
switch (eType) { switch (eType) {

View File

@ -116,10 +116,9 @@ public class ConfiguredFailoverProxyProvider<T> implements
/** /**
* Lazily initialize the RPC proxy object. * Lazily initialize the RPC proxy object.
*/ */
@SuppressWarnings("unchecked")
@Override @Override
public synchronized T getProxy() { public synchronized ProxyInfo<T> getProxy() {
AddressRpcProxyPair current = proxies.get(currentProxyIndex); AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
if (current.namenode == null) { if (current.namenode == null) {
try { try {
current.namenode = NameNodeProxies.createNonHAProxy(conf, current.namenode = NameNodeProxies.createNonHAProxy(conf,
@ -129,7 +128,7 @@ public class ConfiguredFailoverProxyProvider<T> implements
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
return (T)current.namenode; return new ProxyInfo<T>((T)current.namenode, current.address.toString());
} }
@Override @Override

View File

@ -83,14 +83,14 @@ public class ConfiguredRMFailoverProxyProvider<T>
} }
@Override @Override
public synchronized T getProxy() { public synchronized ProxyInfo<T> getProxy() {
String rmId = rmServiceIds[currentProxyIndex]; String rmId = rmServiceIds[currentProxyIndex];
T current = proxies.get(rmId); T current = proxies.get(rmId);
if (current == null) { if (current == null) {
current = getProxyInternal(); current = getProxyInternal();
proxies.put(rmId, current); proxies.put(rmId, current);
} }
return current; return new ProxyInfo<T>(current, rmId);
} }
@Override @Override