HADOOP-10386. Log proxy hostname in various exceptions being thrown in a HA setup. Contributed by Haohui Mai.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1574716 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
097e8b205e
commit
ad61eec072
|
@ -367,6 +367,9 @@ Release 2.4.0 - UNRELEASED
|
|||
HADOOP-10211. Enable RPC protocol to negotiate SASL-QOP values between
|
||||
clients and servers. (Benoy Antony via Arpit Agarwal)
|
||||
|
||||
HADOOP-10386. Log proxy hostname in various exceptions being thrown in a HA
|
||||
setup. (wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -43,8 +43,8 @@ public class DefaultFailoverProxyProvider<T> implements FailoverProxyProvider<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public T getProxy() {
|
||||
return proxy;
|
||||
public ProxyInfo<T> getProxy() {
|
||||
return new ProxyInfo<T>(proxy, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -30,6 +30,18 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public interface FailoverProxyProvider<T> extends Closeable {
|
||||
public static final class ProxyInfo<T> {
|
||||
public final T proxy;
|
||||
/*
|
||||
* The information (e.g., the IP address) of the current proxy object. It
|
||||
* provides information for debugging purposes.
|
||||
*/
|
||||
public final String proxyInfo;
|
||||
public ProxyInfo(T proxy, String proxyInfo) {
|
||||
this.proxy = proxy;
|
||||
this.proxyInfo = proxyInfo;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the proxy object which should be used until the next failover event
|
||||
|
@ -37,14 +49,14 @@ public interface FailoverProxyProvider<T> extends Closeable {
|
|||
*
|
||||
* @return the proxy object to invoke methods upon
|
||||
*/
|
||||
public T getProxy();
|
||||
public ProxyInfo<T> getProxy();
|
||||
|
||||
/**
|
||||
* Called whenever the associated {@link RetryPolicy} determines that an error
|
||||
* warrants failing over.
|
||||
*
|
||||
* @param currentProxy the proxy object which was being used before this
|
||||
* failover event
|
||||
* @param currentProxy
|
||||
* the proxy object which was being used before this failover event
|
||||
*/
|
||||
public void performFailover(T currentProxy);
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Map;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||
|
@ -56,7 +57,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|||
|
||||
private final RetryPolicy defaultPolicy;
|
||||
private final Map<String,RetryPolicy> methodNameToPolicyMap;
|
||||
private T currentProxy;
|
||||
private ProxyInfo<T> currentProxy;
|
||||
|
||||
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
|
||||
RetryPolicy retryPolicy) {
|
||||
|
@ -82,7 +83,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|||
|
||||
// The number of times this method invocation has been failed over.
|
||||
int invocationFailoverCount = 0;
|
||||
final boolean isRpc = isRpcInvocation(currentProxy);
|
||||
final boolean isRpc = isRpcInvocation(currentProxy.proxy);
|
||||
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
|
||||
int retries = 0;
|
||||
while (true) {
|
||||
|
@ -114,9 +115,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|||
invocationFailoverCount, isIdempotentOrAtMostOnce);
|
||||
if (action.action == RetryAction.RetryDecision.FAIL) {
|
||||
if (action.reason != null) {
|
||||
LOG.warn("Exception while invoking " +
|
||||
currentProxy.getClass() + "." + method.getName() +
|
||||
". Not retrying because " + action.reason, e);
|
||||
LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
|
||||
+ "." + method.getName() + " over " + currentProxy.proxyInfo
|
||||
+ ". Not retrying because " + action.reason, e);
|
||||
}
|
||||
throw e;
|
||||
} else { // retry or failover
|
||||
|
@ -129,7 +130,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|||
if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY &&
|
||||
worthLogging) {
|
||||
String msg = "Exception while invoking " + method.getName()
|
||||
+ " of class " + currentProxy.getClass().getSimpleName();
|
||||
+ " of class " + currentProxy.proxy.getClass().getSimpleName()
|
||||
+ " over " + currentProxy.proxyInfo;
|
||||
|
||||
if (invocationFailoverCount > 0) {
|
||||
msg += " after " + invocationFailoverCount + " fail over attempts";
|
||||
}
|
||||
|
@ -140,8 +143,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|||
} else {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Exception while invoking " + method.getName()
|
||||
+ " of class " + currentProxy.getClass().getSimpleName() +
|
||||
". Retrying " + formatSleepMessage(action.delayMillis), e);
|
||||
+ " of class " + currentProxy.proxy.getClass().getSimpleName()
|
||||
+ " over " + currentProxy.proxyInfo + ". Retrying "
|
||||
+ formatSleepMessage(action.delayMillis), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -154,7 +158,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|||
// single actual fail over.
|
||||
synchronized (proxyProvider) {
|
||||
if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
|
||||
proxyProvider.performFailover(currentProxy);
|
||||
proxyProvider.performFailover(currentProxy.proxy);
|
||||
proxyProviderFailoverCount++;
|
||||
currentProxy = proxyProvider.getProxy();
|
||||
} else {
|
||||
|
@ -182,7 +186,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|||
if (!method.isAccessible()) {
|
||||
method.setAccessible(true);
|
||||
}
|
||||
return method.invoke(currentProxy, args);
|
||||
return method.invoke(currentProxy.proxy, args);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
|
@ -207,7 +211,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
|||
|
||||
@Override //RpcInvocationHandler
|
||||
public ConnectionId getConnectionId() {
|
||||
return RPC.getConnectionIdForProxy(currentProxy);
|
||||
return RPC.getConnectionIdForProxy(currentProxy.proxy);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -48,8 +48,8 @@ public class TestFailoverProxy {
|
|||
}
|
||||
|
||||
@Override
|
||||
public T getProxy() {
|
||||
return currentlyActive;
|
||||
public ProxyInfo<T> getProxy() {
|
||||
return new ProxyInfo<T>(currentlyActive, currentlyActive.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
|
||||
public class UnreliableImplementation implements UnreliableInterface {
|
||||
class UnreliableImplementation implements UnreliableInterface {
|
||||
|
||||
private int failsOnceInvocationCount,
|
||||
failsOnceWithValueInvocationCount,
|
||||
|
@ -154,6 +154,11 @@ public class UnreliableImplementation implements UnreliableInterface {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + "[" + identifier + "]";
|
||||
}
|
||||
|
||||
private static void throwAppropriateException(TypeOfExceptionToFailWith eType,
|
||||
String message) throws UnreliableException, StandbyException, IOException {
|
||||
switch (eType) {
|
||||
|
|
|
@ -116,10 +116,9 @@ public class ConfiguredFailoverProxyProvider<T> implements
|
|||
/**
|
||||
* Lazily initialize the RPC proxy object.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public synchronized T getProxy() {
|
||||
AddressRpcProxyPair current = proxies.get(currentProxyIndex);
|
||||
public synchronized ProxyInfo<T> getProxy() {
|
||||
AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
|
||||
if (current.namenode == null) {
|
||||
try {
|
||||
current.namenode = NameNodeProxies.createNonHAProxy(conf,
|
||||
|
@ -129,7 +128,7 @@ public class ConfiguredFailoverProxyProvider<T> implements
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return (T)current.namenode;
|
||||
return new ProxyInfo<T>((T)current.namenode, current.address.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -83,14 +83,14 @@ public class ConfiguredRMFailoverProxyProvider<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized T getProxy() {
|
||||
public synchronized ProxyInfo<T> getProxy() {
|
||||
String rmId = rmServiceIds[currentProxyIndex];
|
||||
T current = proxies.get(rmId);
|
||||
if (current == null) {
|
||||
current = getProxyInternal();
|
||||
proxies.put(rmId, current);
|
||||
}
|
||||
return current;
|
||||
return new ProxyInfo<T>(current, rmId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue