HDFS-7858. Improve HA Namenode Failover detection on the client. (asuresh)

Arun Suresh 2015-07-27 23:02:03 -07:00
parent e21dde501a
commit 030fcfa99c
8 changed files with 724 additions and 33 deletions


@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.io.retry;
import java.io.IOException;
import java.util.Map;
/**
* Holder class that clients can use to return multiple exceptions.
*/
public class MultiException extends IOException {
private final Map<String, Exception> exes;
public MultiException(Map<String, Exception> exes) {
this.exes = exes;
}
public Map<String, Exception> getExceptions() {
return exes;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("{");
for (Exception e : exes.values()) {
sb.append(e.toString()).append(", ");
}
sb.append("}");
return "MultiException[" + sb.toString() + "]";
}
}
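
As a quick illustration of how callers are expected to consume this new class, the sketch below builds a MultiException from two hypothetical per-proxy failures and then walks the wrapped exceptions. The proxy keys and the printing loop are illustrative only, not part of this commit.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.retry.MultiException;

public class MultiExceptionSketch {
  public static void main(String[] args) {
    // Hypothetical per-proxy failures, keyed by the proxy's info string.
    Map<String, Exception> failures = new HashMap<String, Exception>();
    failures.put("nn1.example.com:8020", new IOException("connection refused"));
    failures.put("nn2.example.com:8020", new IOException("standby exception"));

    MultiException me = new MultiException(failures);

    // A retry policy (or any other caller) can inspect each underlying failure.
    for (Map.Entry<String, Exception> entry : me.getExceptions().entrySet()) {
      System.out.println(entry.getKey() + " failed with: " + entry.getValue());
    }
    System.out.println(me); // MultiException[{...}]
  }
}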


@@ -23,6 +23,8 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;

 import org.apache.commons.logging.Log;
@@ -101,7 +103,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
 Object ret = invokeMethod(method, args);
 hasMadeASuccessfulCall = true;
 return ret;
-} catch (Exception e) {
+} catch (Exception ex) {
 boolean isIdempotentOrAtMostOnce = proxyProvider.getInterface()
 .getMethod(method.getName(), method.getParameterTypes())
 .isAnnotationPresent(Idempotent.class);
@@ -110,15 +112,16 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
 .getMethod(method.getName(), method.getParameterTypes())
 .isAnnotationPresent(AtMostOnce.class);
 }
-RetryAction action = policy.shouldRetry(e, retries++,
-invocationFailoverCount, isIdempotentOrAtMostOnce);
-if (action.action == RetryAction.RetryDecision.FAIL) {
-if (action.reason != null) {
+List<RetryAction> actions = extractActions(policy, ex, retries++,
+invocationFailoverCount, isIdempotentOrAtMostOnce);
+RetryAction failAction = getFailAction(actions);
+if (failAction != null) {
+if (failAction.reason != null) {
 LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
 + "." + method.getName() + " over " + currentProxy.proxyInfo
-+ ". Not retrying because " + action.reason, e);
++ ". Not retrying because " + failAction.reason, ex);
 }
-throw e;
+throw ex;
 } else { // retry or failover
 // avoid logging the failover if this is the first call on this
 // proxy object, and we successfully achieve the failover without
@@ -126,8 +129,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
 boolean worthLogging =
 !(invocationFailoverCount == 0 && !hasMadeASuccessfulCall);
 worthLogging |= LOG.isDebugEnabled();
-if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY &&
-worthLogging) {
+RetryAction failOverAction = getFailOverAction(actions);
+long delay = getDelayMillis(actions);
+if (failOverAction != null && worthLogging) {
 String msg = "Exception while invoking " + method.getName()
 + " of class " + currentProxy.proxy.getClass().getSimpleName()
 + " over " + currentProxy.proxyInfo;
@@ -135,22 +139,22 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
 if (invocationFailoverCount > 0) {
 msg += " after " + invocationFailoverCount + " fail over attempts";
 }
-msg += ". Trying to fail over " + formatSleepMessage(action.delayMillis);
-LOG.info(msg, e);
+msg += ". Trying to fail over " + formatSleepMessage(delay);
+LOG.info(msg, ex);
 } else {
 if(LOG.isDebugEnabled()) {
 LOG.debug("Exception while invoking " + method.getName()
 + " of class " + currentProxy.proxy.getClass().getSimpleName()
 + " over " + currentProxy.proxyInfo + ". Retrying "
-+ formatSleepMessage(action.delayMillis), e);
++ formatSleepMessage(delay), ex);
 }
 }
-if (action.delayMillis > 0) {
-Thread.sleep(action.delayMillis);
+if (delay > 0) {
+Thread.sleep(delay);
 }
-if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
+if (failOverAction != null) {
 // Make sure that concurrent failed method invocations only cause a
 // single actual fail over.
 synchronized (proxyProvider) {
@@ -169,7 +173,68 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
 }
 }
 }
+
+/**
+* Obtain a retry delay from a list of RetryActions.
+*/
+private long getDelayMillis(List<RetryAction> actions) {
+long retVal = 0;
+for (RetryAction action : actions) {
+if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY ||
+action.action == RetryAction.RetryDecision.RETRY) {
+if (action.delayMillis > retVal) {
+retVal = action.delayMillis;
+}
+}
+}
+return retVal;
+}
+
+/**
+* Return the first FAILOVER_AND_RETRY action.
+*/
+private RetryAction getFailOverAction(List<RetryAction> actions) {
+for (RetryAction action : actions) {
+if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
+return action;
+}
+}
+return null;
+}
+
+/**
+* Return the last FAIL action, but only if there are no RETRY actions.
+*/
+private RetryAction getFailAction(List<RetryAction> actions) {
+RetryAction fAction = null;
+for (RetryAction action : actions) {
+if (action.action == RetryAction.RetryDecision.FAIL) {
+fAction = action;
+} else {
+// At least 1 RETRY
+return null;
+}
+}
+return fAction;
+}
+
+private List<RetryAction> extractActions(RetryPolicy policy, Exception ex,
+int i, int invocationFailoverCount,
+boolean isIdempotentOrAtMostOnce)
+throws Exception {
+List<RetryAction> actions = new LinkedList<>();
+if (ex instanceof MultiException) {
+for (Exception th : ((MultiException) ex).getExceptions().values()) {
+actions.add(policy.shouldRetry(th, i, invocationFailoverCount,
+isIdempotentOrAtMostOnce));
+}
+} else {
+actions.add(policy.shouldRetry(ex, i,
+invocationFailoverCount, isIdempotentOrAtMostOnce));
+}
+return actions;
+}

 private static String formatSleepMessage(long millis) {
 if (millis > 0) {
 return "after sleeping for " + millis + "ms.";


@@ -753,6 +753,8 @@ Release 2.8.0 - UNRELEASED
 HDFS-8735. Inotify: All events classes should implement toString() API.
 (Surendra Singh Lilhore via aajisaka)
+
+HDFS-7858. Improve HA Namenode Failover detection on the client. (asuresh)

 OPTIMIZATIONS

 HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than


@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;

 /**
@@ -51,16 +53,40 @@ public class ConfiguredFailoverProxyProvider<T> extends
 private static final Log LOG =
 LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
-private final Configuration conf;
-private final List<AddressRpcProxyPair<T>> proxies =
+interface ProxyFactory<T> {
+T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
+UserGroupInformation ugi, boolean withRetries,
+AtomicBoolean fallbackToSimpleAuth) throws IOException;
+}
+
+static class DefaultProxyFactory<T> implements ProxyFactory<T> {
+@Override
+public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+Class<T> xface, UserGroupInformation ugi, boolean withRetries,
+AtomicBoolean fallbackToSimpleAuth) throws IOException {
+return NameNodeProxies.createNonHAProxy(conf,
+nnAddr, xface, ugi, false, fallbackToSimpleAuth).getProxy();
+}
+}
+
+protected final Configuration conf;
+protected final List<AddressRpcProxyPair<T>> proxies =
 new ArrayList<AddressRpcProxyPair<T>>();
 private final UserGroupInformation ugi;
-private final Class<T> xface;
+protected final Class<T> xface;
 private int currentProxyIndex = 0;
+private final ProxyFactory<T> factory;

 public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
 Class<T> xface) {
+this(conf, uri, xface, new DefaultProxyFactory<T>());
+}
+
+@VisibleForTesting
+ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
+Class<T> xface, ProxyFactory<T> factory) {
 Preconditions.checkArgument(
 xface.isAssignableFrom(NamenodeProtocols.class),
 "Interface class %s is not a valid NameNode protocol!");
@@ -78,9 +104,10 @@ public class ConfiguredFailoverProxyProvider<T> extends
 HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
 HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
 this.conf.setInt(
-CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
-maxRetriesOnSocketTimeouts);
+CommonConfigurationKeysPublic
+.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+maxRetriesOnSocketTimeouts);

 try {
 ugi = UserGroupInformation.getCurrentUser();
@@ -102,6 +129,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
 // URI of the cluster. Clone this token to apply to each of the
 // underlying IPC addresses so that the IPC code can find it.
 HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
+this.factory = factory;
 } catch (IOException e) {
 throw new RuntimeException(e);
 }
@@ -120,8 +148,8 @@ public class ConfiguredFailoverProxyProvider<T> extends
 AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
 if (current.namenode == null) {
 try {
-current.namenode = NameNodeProxies.createNonHAProxy(conf,
-current.address, xface, ugi, false, fallbackToSimpleAuth).getProxy();
+current.namenode = factory.createProxy(conf,
+current.address, xface, ugi, false, fallbackToSimpleAuth);
 } catch (IOException e) {
 LOG.error("Failed to create RPC proxy to NameNode", e);
 throw new RuntimeException(e);
@@ -131,7 +159,11 @@ public class ConfiguredFailoverProxyProvider<T> extends
 }

 @Override
-public synchronized void performFailover(T currentProxy) {
+public void performFailover(T currentProxy) {
+incrementProxyIndex();
+}
+
+synchronized void incrementProxyIndex() {
 currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
 }
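
The new ProxyFactory hook exists so that tests (and subclasses) can inject proxies without opening real RPC connections. A rough sketch of that usage follows, assuming it lives in the same org.apache.hadoop.hdfs.server.namenode.ha package so it can reach the package-private constructor; the class and method names here are hypothetical, and the test added later in this commit does the same thing with Mockito-backed proxies.

package org.apache.hadoop.hdfs.server.namenode.ha;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.security.UserGroupInformation;

public class StubFactorySketch {
  // Builds a provider whose "proxies" are canned objects; no NameNode is contacted.
  static ConfiguredFailoverProxyProvider<NamenodeProtocols> newStubbedProvider(
      Configuration clientConf, URI uri, final NamenodeProtocols canned) {
    ProxyFactory<NamenodeProtocols> factory = new ProxyFactory<NamenodeProtocols>() {
      @Override
      public NamenodeProtocols createProxy(Configuration conf, InetSocketAddress nnAddr,
          Class<NamenodeProtocols> xface, UserGroupInformation ugi, boolean withRetries,
          AtomicBoolean fallbackToSimpleAuth) {
        return canned; // hand back the canned proxy instead of dialing nnAddr
      }
    };
    return new ConfiguredFailoverProxyProvider<NamenodeProtocols>(
        clientConf, uri, NamenodeProtocols.class, factory);
  }
}

The default path is unchanged: the public constructor simply delegates to this one with DefaultProxyFactory.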


@@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.io.retry.MultiException;
/**
* A FailoverProxyProvider implementation that technically does not "failover"
* per se. It constructs a wrapper proxy that sends the request to ALL
* underlying proxies simultaneously. It assumes that in an HA setup, there will
* be only one Active, and the active should respond faster than any configured
* standbys. Once it receives a response from any one of the configured proxies,
* outstanding requests to other proxies are immediately cancelled.
*/
public class RequestHedgingProxyProvider<T> extends
ConfiguredFailoverProxyProvider<T> {
private static final Log LOG =
LogFactory.getLog(RequestHedgingProxyProvider.class);
class RequestHedgingInvocationHandler implements InvocationHandler {
final Map<String, ProxyInfo<T>> targetProxies;
public RequestHedgingInvocationHandler(
Map<String, ProxyInfo<T>> targetProxies) {
this.targetProxies = new HashMap<>(targetProxies);
}
/**
* Creates an Executor and invokes all proxies concurrently. This
* implementation assumes that clients have configured proper socket
* timeouts; otherwise the call can block forever.
*
* @param proxy
* @param method
* @param args
* @return
* @throws Throwable
*/
@Override
public Object
invoke(Object proxy, final Method method, final Object[] args)
throws Throwable {
Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
int numAttempts = 0;
ExecutorService executor = null;
CompletionService<Object> completionService;
try {
// Optimization: if only 2 proxies are configured and one has failed
// over, then we don't need to create a thread pool etc.
targetProxies.remove(toIgnore);
if (targetProxies.size() == 1) {
ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next();
Object retVal = method.invoke(proxyInfo.proxy, args);
successfulProxy = proxyInfo;
return retVal;
}
executor = Executors.newFixedThreadPool(proxies.size());
completionService = new ExecutorCompletionService<>(executor);
for (final Map.Entry<String, ProxyInfo<T>> pEntry :
targetProxies.entrySet()) {
Callable<Object> c = new Callable<Object>() {
@Override
public Object call() throws Exception {
return method.invoke(pEntry.getValue().proxy, args);
}
};
proxyMap.put(completionService.submit(c), pEntry.getValue());
numAttempts++;
}
Map<String, Exception> badResults = new HashMap<>();
while (numAttempts > 0) {
Future<Object> callResultFuture = completionService.take();
Object retVal;
try {
retVal = callResultFuture.get();
successfulProxy = proxyMap.get(callResultFuture);
if (LOG.isDebugEnabled()) {
LOG.debug("Invocation successful on ["
+ successfulProxy.proxyInfo + "]");
}
return retVal;
} catch (Exception ex) {
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
LOG.warn("Invocation returned exception on "
+ "[" + tProxyInfo.proxyInfo + "]");
badResults.put(tProxyInfo.proxyInfo, ex);
numAttempts--;
}
}
// At this point we should have all bad results (Exceptions),
// or we would already have returned with a successful result.
if (badResults.size() == 1) {
throw badResults.values().iterator().next();
} else {
throw new MultiException(badResults);
}
} finally {
if (executor != null) {
executor.shutdownNow();
}
}
}
}
private volatile ProxyInfo<T> successfulProxy = null;
private volatile String toIgnore = null;
public RequestHedgingProxyProvider(
Configuration conf, URI uri, Class<T> xface) {
this(conf, uri, xface, new DefaultProxyFactory<T>());
}
@VisibleForTesting
RequestHedgingProxyProvider(Configuration conf, URI uri,
Class<T> xface, ProxyFactory<T> factory) {
super(conf, uri, xface, factory);
}
@SuppressWarnings("unchecked")
@Override
public synchronized ProxyInfo<T> getProxy() {
if (successfulProxy != null) {
return successfulProxy;
}
Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>();
StringBuilder combinedInfo = new StringBuilder("[");
for (int i = 0; i < proxies.size(); i++) {
ProxyInfo<T> pInfo = super.getProxy();
incrementProxyIndex();
targetProxyInfos.put(pInfo.proxyInfo, pInfo);
combinedInfo.append(pInfo.proxyInfo).append(',');
}
combinedInfo.append(']');
T wrappedProxy = (T) Proxy.newProxyInstance(
RequestHedgingInvocationHandler.class.getClassLoader(),
new Class<?>[]{xface},
new RequestHedgingInvocationHandler(targetProxyInfos));
return new ProxyInfo<T>(wrappedProxy, combinedInfo.toString());
}
@Override
public synchronized void performFailover(T currentProxy) {
toIgnore = successfulProxy.proxyInfo;
successfulProxy = null;
}
}
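
To opt a client into hedging, the failover proxy provider key for the nameservice is pointed at this class, as the documentation updates below describe. A minimal sketch, with "mycluster" standing in for the real nameservice ID:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.ha.RequestHedgingProxyProvider;

public class HedgingClientConfigSketch {
  public static Configuration withHedging() {
    Configuration conf = new Configuration();
    // Programmatic equivalent of the <property> block in the HA docs below;
    // "mycluster" is the example nameservice ID used there.
    conf.set("dfs.client.failover.proxy.provider.mycluster",
        RequestHedgingProxyProvider.class.getName());
    return conf;
  }
}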


@@ -195,9 +195,12 @@ The order in which you set these configurations is unimportant, but the values y
 Configure the name of the Java class which will be used by the DFS Client to
 determine which NameNode is the current Active, and therefore which NameNode is
-currently serving client requests. The only implementation which currently
-ships with Hadoop is the **ConfiguredFailoverProxyProvider**, so use this
-unless you are using a custom one. For example:
+currently serving client requests. The two implementations which currently
+ship with Hadoop are the **ConfiguredFailoverProxyProvider** and the
+**RequestHedgingProxyProvider** (which, for the first call, concurrently invokes all
+namenodes to determine the active one, and on subsequent requests, invokes the active
+namenode until a fail-over happens), so use one of these unless you are using a custom
+proxy provider.

 <property>
 <name>dfs.client.failover.proxy.provider.mycluster</name>


@@ -216,9 +216,13 @@ The order in which you set these configurations is unimportant, but the values y
 Configure the name of the Java class which will be used by the DFS Client to
 determine which NameNode is the current Active, and therefore which NameNode is
-currently serving client requests. The only implementation which currently
-ships with Hadoop is the **ConfiguredFailoverProxyProvider**, so use this
-unless you are using a custom one. For example:
+currently serving client requests. The two implementations which currently
+ship with Hadoop are the **ConfiguredFailoverProxyProvider** and the
+**RequestHedgingProxyProvider** (which, for the first call, concurrently invokes all
+namenodes to determine the active one, and on subsequent requests, invokes the active
+namenode until a fail-over happens), so use one of these unless you are using a custom
+proxy provider.
+
+For example:

 <property>
 <name>dfs.client.failover.proxy.provider.mycluster</name>


@@ -0,0 +1,350 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.collect.Lists;
public class TestRequestHedgingProxyProvider {
private Configuration conf;
private URI nnUri;
private String ns;
@Before
public void setup() throws URISyntaxException {
ns = "mycluster-" + Time.monotonicNow();
nnUri = new URI("hdfs://" + ns);
conf = new Configuration();
conf.set(DFSConfigKeys.DFS_NAMESERVICES, ns);
conf.set(
DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2");
conf.set(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1",
"machine1.foo.bar:8020");
conf.set(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2",
"machine2.foo.bar:8020");
}
@Test
public void testHedgingWhenOneFails() throws Exception {
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(goodMock.getStats()).thenReturn(new long[] {1});
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
createFactory(goodMock, badMock));
long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Mockito.verify(badMock).getStats();
Mockito.verify(goodMock).getStats();
}
@Test
public void testHedgingWhenOneIsSlow() throws Exception {
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(1000);
return new long[]{1};
}
});
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
createFactory(goodMock, badMock));
long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Assert.assertEquals(1, stats[0]);
Mockito.verify(badMock).getStats();
Mockito.verify(goodMock).getStats();
}
@Test
public void testHedgingWhenBothFail() throws Exception {
NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(worseMock.getStats()).thenThrow(
new IOException("Worse mock !!"));
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
createFactory(badMock, worseMock));
try {
provider.getProxy().proxy.getStats();
Assert.fail("Should fail since both namenodes throw IOException !!");
} catch (Exception e) {
Assert.assertTrue(e instanceof MultiException);
}
Mockito.verify(badMock).getStats();
Mockito.verify(worseMock).getStats();
}
@Test
public void testPerformFailover() throws Exception {
final AtomicInteger counter = new AtomicInteger(0);
final int[] isGood = {1};
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
if (isGood[0] == 1) {
Thread.sleep(1000);
return new long[]{1};
}
throw new IOException("Was Good mock !!");
}
});
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
if (isGood[0] == 2) {
Thread.sleep(1000);
return new long[]{2};
}
throw new IOException("Bad mock !!");
}
});
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
createFactory(goodMock, badMock));
long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Assert.assertEquals(1, stats[0]);
Assert.assertEquals(2, counter.get());
Mockito.verify(badMock).getStats();
Mockito.verify(goodMock).getStats();
stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Assert.assertEquals(1, stats[0]);
// Ensure only the previous successful one is invoked
Mockito.verifyNoMoreInteractions(badMock);
Assert.assertEquals(3, counter.get());
// Flip to standby.. so now this should fail
isGood[0] = 2;
try {
provider.getProxy().proxy.getStats();
Assert.fail("Should fail since previously successful proxy now fails ");
} catch (Exception ex) {
Assert.assertTrue(ex instanceof IOException);
}
Assert.assertEquals(4, counter.get());
provider.performFailover(provider.getProxy().proxy);
stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Assert.assertEquals(2, stats[0]);
// Counter should update only once
Assert.assertEquals(5, counter.get());
stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Assert.assertEquals(2, stats[0]);
// Counter updates only once now
Assert.assertEquals(6, counter.get());
// Flip back to old active.. so now this should fail
isGood[0] = 1;
try {
provider.getProxy().proxy.getStats();
Assert.fail("Should fail since previously successful proxy now fails ");
} catch (Exception ex) {
Assert.assertTrue(ex instanceof IOException);
}
Assert.assertEquals(7, counter.get());
provider.performFailover(provider.getProxy().proxy);
stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
// Ensure correct proxy was called
Assert.assertEquals(1, stats[0]);
}
@Test
public void testPerformFailoverWith3Proxies() throws Exception {
conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
"nn1,nn2,nn3");
conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3",
"machine3.foo.bar:8020");
final AtomicInteger counter = new AtomicInteger(0);
final int[] isGood = {1};
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
if (isGood[0] == 1) {
Thread.sleep(1000);
return new long[]{1};
}
throw new IOException("Was Good mock !!");
}
});
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
if (isGood[0] == 2) {
Thread.sleep(1000);
return new long[]{2};
}
throw new IOException("Bad mock !!");
}
});
final NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() {
@Override
public long[] answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
if (isGood[0] == 3) {
Thread.sleep(1000);
return new long[]{3};
}
throw new IOException("Worse mock !!");
}
});
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
createFactory(goodMock, badMock, worseMock));
long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Assert.assertEquals(1, stats[0]);
Assert.assertEquals(3, counter.get());
Mockito.verify(badMock).getStats();
Mockito.verify(goodMock).getStats();
Mockito.verify(worseMock).getStats();
stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Assert.assertEquals(1, stats[0]);
// Ensure only the previous successful one is invoked
Mockito.verifyNoMoreInteractions(badMock);
Mockito.verifyNoMoreInteractions(worseMock);
Assert.assertEquals(4, counter.get());
// Flip to standby.. so now this should fail
isGood[0] = 2;
try {
provider.getProxy().proxy.getStats();
Assert.fail("Should fail since previously successful proxy now fails ");
} catch (Exception ex) {
Assert.assertTrue(ex instanceof IOException);
}
Assert.assertEquals(5, counter.get());
provider.performFailover(provider.getProxy().proxy);
stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Assert.assertEquals(2, stats[0]);
// Counter updates twice since both proxies are tried on failure
Assert.assertEquals(7, counter.get());
stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Assert.assertEquals(2, stats[0]);
// Counter updates only once now
Assert.assertEquals(8, counter.get());
// Flip to Other standby.. so now this should fail
isGood[0] = 3;
try {
provider.getProxy().proxy.getStats();
Assert.fail("Should fail since previously successful proxy now fails ");
} catch (Exception ex) {
Assert.assertTrue(ex instanceof IOException);
}
// Counter should update only 1 time
Assert.assertEquals(9, counter.get());
provider.performFailover(provider.getProxy().proxy);
stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
// Ensure correct proxy was called
Assert.assertEquals(3, stats[0]);
// Counter updates twice since both proxies are tried on failure
Assert.assertEquals(11, counter.get());
stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Assert.assertEquals(3, stats[0]);
// Counter updates only once now
Assert.assertEquals(12, counter.get());
}
private ProxyFactory<NamenodeProtocols> createFactory(
NamenodeProtocols... protos) {
final Iterator<NamenodeProtocols> iterator =
Lists.newArrayList(protos).iterator();
return new ProxyFactory<NamenodeProtocols>() {
@Override
public NamenodeProtocols createProxy(Configuration conf,
InetSocketAddress nnAddr, Class<NamenodeProtocols> xface,
UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return iterator.next();
}
};
}
}