diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/MultiException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/MultiException.java new file mode 100644 index 00000000000..4963a2d8eaf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/MultiException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.io.retry; + +import java.io.IOException; +import java.util.Map; + +/** + * Holder class that clients can use to return multiple exceptions. + */ +public class MultiException extends IOException { + + private final Map exes; + + public MultiException(Map exes) { + this.exes = exes; + } + + public Map getExceptions() { + return exes; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("{"); + for (Exception e : exes.values()) { + sb.append(e.toString()).append(", "); + } + sb.append("}"); + return "MultiException[" + sb.toString() + "]"; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index 5a4e63840db..c31d28e12cd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -23,6 +23,8 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.Collections; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -102,7 +104,7 @@ public class RetryInvocationHandler implements RpcInvocationHandler { Object ret = invokeMethod(method, args); hasMadeASuccessfulCall = true; return ret; - } catch (Exception e) { + } catch (Exception ex) { boolean isIdempotentOrAtMostOnce = proxyProvider.getInterface() .getMethod(method.getName(), method.getParameterTypes()) .isAnnotationPresent(Idempotent.class); @@ -111,15 +113,16 @@ public class RetryInvocationHandler implements RpcInvocationHandler { .getMethod(method.getName(), method.getParameterTypes()) .isAnnotationPresent(AtMostOnce.class); } - RetryAction action = policy.shouldRetry(e, retries++, - invocationFailoverCount, isIdempotentOrAtMostOnce); - if (action.action == RetryAction.RetryDecision.FAIL) { - if (action.reason != null) { + List actions = extractActions(policy, ex, retries++, + invocationFailoverCount, isIdempotentOrAtMostOnce); + RetryAction failAction = getFailAction(actions); + if (failAction != null) { + if (failAction.reason != null) { LOG.warn("Exception while invoking " + currentProxy.proxy.getClass() + "." + method.getName() + " over " + currentProxy.proxyInfo - + ". Not retrying because " + action.reason, e); + + ". Not retrying because " + failAction.reason, ex); } - throw e; + throw ex; } else { // retry or failover // avoid logging the failover if this is the first call on this // proxy object, and we successfully achieve the failover without @@ -127,8 +130,9 @@ public class RetryInvocationHandler implements RpcInvocationHandler { boolean worthLogging = !(invocationFailoverCount == 0 && !hasMadeASuccessfulCall); worthLogging |= LOG.isDebugEnabled(); - if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY && - worthLogging) { + RetryAction failOverAction = getFailOverAction(actions); + long delay = getDelayMillis(actions); + if (failOverAction != null && worthLogging) { String msg = "Exception while invoking " + method.getName() + " of class " + currentProxy.proxy.getClass().getSimpleName() + " over " + currentProxy.proxyInfo; @@ -136,22 +140,22 @@ public class RetryInvocationHandler implements RpcInvocationHandler { if (invocationFailoverCount > 0) { msg += " after " + invocationFailoverCount + " fail over attempts"; } - msg += ". Trying to fail over " + formatSleepMessage(action.delayMillis); - LOG.info(msg, e); + msg += ". Trying to fail over " + formatSleepMessage(delay); + LOG.info(msg, ex); } else { if(LOG.isDebugEnabled()) { LOG.debug("Exception while invoking " + method.getName() + " of class " + currentProxy.proxy.getClass().getSimpleName() + " over " + currentProxy.proxyInfo + ". Retrying " - + formatSleepMessage(action.delayMillis), e); + + formatSleepMessage(delay), ex); } } - - if (action.delayMillis > 0) { - Thread.sleep(action.delayMillis); + + if (delay > 0) { + Thread.sleep(delay); } - if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) { + if (failOverAction != null) { // Make sure that concurrent failed method invocations only cause a // single actual fail over. synchronized (proxyProvider) { @@ -170,7 +174,68 @@ public class RetryInvocationHandler implements RpcInvocationHandler { } } } - + + /** + * Obtain a retry delay from list of RetryActions. + */ + private long getDelayMillis(List actions) { + long retVal = 0; + for (RetryAction action : actions) { + if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY || + action.action == RetryAction.RetryDecision.RETRY) { + if (action.delayMillis > retVal) { + retVal = action.delayMillis; + } + } + } + return retVal; + } + + /** + * Return the first FAILOVER_AND_RETRY action. + */ + private RetryAction getFailOverAction(List actions) { + for (RetryAction action : actions) { + if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) { + return action; + } + } + return null; + } + + /** + * Return the last FAIL action.. only if there are no RETRY actions. + */ + private RetryAction getFailAction(List actions) { + RetryAction fAction = null; + for (RetryAction action : actions) { + if (action.action == RetryAction.RetryDecision.FAIL) { + fAction = action; + } else { + // Atleast 1 RETRY + return null; + } + } + return fAction; + } + + private List extractActions(RetryPolicy policy, Exception ex, + int i, int invocationFailoverCount, + boolean isIdempotentOrAtMostOnce) + throws Exception { + List actions = new LinkedList<>(); + if (ex instanceof MultiException) { + for (Exception th : ((MultiException) ex).getExceptions().values()) { + actions.add(policy.shouldRetry(th, i, invocationFailoverCount, + isIdempotentOrAtMostOnce)); + } + } else { + actions.add(policy.shouldRetry(ex, i, + invocationFailoverCount, isIdempotentOrAtMostOnce)); + } + return actions; + } + private static String formatSleepMessage(long millis) { if (millis > 0) { return "after sleeping for " + millis + "ms."; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fed59180bf0..9dc2a821c28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -410,6 +410,8 @@ Release 2.8.0 - UNRELEASED HDFS-8735. Inotify: All events classes should implement toString() API. (Surendra Singh Lilhore via aajisaka) + HDFS-7858. Improve HA Namenode Failover detection on the client. (asuresh) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java index 235c88619b1..ccce7362b1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -51,16 +53,40 @@ public class ConfiguredFailoverProxyProvider extends private static final Log LOG = LogFactory.getLog(ConfiguredFailoverProxyProvider.class); - private final Configuration conf; - private final List> proxies = + interface ProxyFactory { + T createProxy(Configuration conf, InetSocketAddress nnAddr, Class xface, + UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException; + } + + static class DefaultProxyFactory implements ProxyFactory { + @Override + public T createProxy(Configuration conf, InetSocketAddress nnAddr, + Class xface, UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + return NameNodeProxies.createNonHAProxy(conf, + nnAddr, xface, ugi, false, fallbackToSimpleAuth).getProxy(); + } + } + + protected final Configuration conf; + protected final List> proxies = new ArrayList>(); private final UserGroupInformation ugi; - private final Class xface; - + protected final Class xface; + private int currentProxyIndex = 0; + private final ProxyFactory factory; public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, Class xface) { + this(conf, uri, xface, new DefaultProxyFactory()); + } + + @VisibleForTesting + ConfiguredFailoverProxyProvider(Configuration conf, URI uri, + Class xface, ProxyFactory factory) { + Preconditions.checkArgument( xface.isAssignableFrom(NamenodeProtocols.class), "Interface class %s is not a valid NameNode protocol!"); @@ -78,9 +104,10 @@ public class ConfiguredFailoverProxyProvider extends HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); this.conf.setInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - maxRetriesOnSocketTimeouts); - + CommonConfigurationKeysPublic + .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + maxRetriesOnSocketTimeouts); + try { ugi = UserGroupInformation.getCurrentUser(); @@ -102,6 +129,7 @@ public class ConfiguredFailoverProxyProvider extends // URI of the cluster. Clone this token to apply to each of the // underlying IPC addresses so that the IPC code can find it. HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns); + this.factory = factory; } catch (IOException e) { throw new RuntimeException(e); } @@ -120,8 +148,8 @@ public class ConfiguredFailoverProxyProvider extends AddressRpcProxyPair current = proxies.get(currentProxyIndex); if (current.namenode == null) { try { - current.namenode = NameNodeProxies.createNonHAProxy(conf, - current.address, xface, ugi, false, fallbackToSimpleAuth).getProxy(); + current.namenode = factory.createProxy(conf, + current.address, xface, ugi, false, fallbackToSimpleAuth); } catch (IOException e) { LOG.error("Failed to create RPC proxy to NameNode", e); throw new RuntimeException(e); @@ -131,7 +159,11 @@ public class ConfiguredFailoverProxyProvider extends } @Override - public synchronized void performFailover(T currentProxy) { + public void performFailover(T currentProxy) { + incrementProxyIndex(); + } + + synchronized void incrementProxyIndex() { currentProxyIndex = (currentProxyIndex + 1) % proxies.size(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java new file mode 100644 index 00000000000..b7216b0c91a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.io.retry.MultiException; + +/** + * A FailoverProxyProvider implementation that technically does not "failover" + * per-se. It constructs a wrapper proxy that sends the request to ALL + * underlying proxies simultaneously. It assumes the in an HA setup, there will + * be only one Active, and the active should respond faster than any configured + * standbys. Once it recieve a response from any one of the configred proxies, + * outstanding requests to other proxies are immediately cancelled. + */ +public class RequestHedgingProxyProvider extends + ConfiguredFailoverProxyProvider { + + private static final Log LOG = + LogFactory.getLog(RequestHedgingProxyProvider.class); + + class RequestHedgingInvocationHandler implements InvocationHandler { + + final Map> targetProxies; + + public RequestHedgingInvocationHandler( + Map> targetProxies) { + this.targetProxies = new HashMap<>(targetProxies); + } + + /** + * Creates a Executor and invokes all proxies concurrently. This + * implementation assumes that Clients have configured proper socket + * timeouts, else the call can block forever. + * + * @param proxy + * @param method + * @param args + * @return + * @throws Throwable + */ + @Override + public Object + invoke(Object proxy, final Method method, final Object[] args) + throws Throwable { + Map, ProxyInfo> proxyMap = new HashMap<>(); + int numAttempts = 0; + + ExecutorService executor = null; + CompletionService completionService; + try { + // Optimization : if only 2 proxies are configured and one had failed + // over, then we dont need to create a threadpool etc. + targetProxies.remove(toIgnore); + if (targetProxies.size() == 1) { + ProxyInfo proxyInfo = targetProxies.values().iterator().next(); + Object retVal = method.invoke(proxyInfo.proxy, args); + successfulProxy = proxyInfo; + return retVal; + } + executor = Executors.newFixedThreadPool(proxies.size()); + completionService = new ExecutorCompletionService<>(executor); + for (final Map.Entry> pEntry : + targetProxies.entrySet()) { + Callable c = new Callable() { + @Override + public Object call() throws Exception { + return method.invoke(pEntry.getValue().proxy, args); + } + }; + proxyMap.put(completionService.submit(c), pEntry.getValue()); + numAttempts++; + } + + Map badResults = new HashMap<>(); + while (numAttempts > 0) { + Future callResultFuture = completionService.take(); + Object retVal; + try { + retVal = callResultFuture.get(); + successfulProxy = proxyMap.get(callResultFuture); + if (LOG.isDebugEnabled()) { + LOG.debug("Invocation successful on [" + + successfulProxy.proxyInfo + "]"); + } + return retVal; + } catch (Exception ex) { + ProxyInfo tProxyInfo = proxyMap.get(callResultFuture); + LOG.warn("Invocation returned exception on " + + "[" + tProxyInfo.proxyInfo + "]"); + badResults.put(tProxyInfo.proxyInfo, ex); + numAttempts--; + } + } + + // At this point we should have All bad results (Exceptions) + // Or should have returned with successful result. + if (badResults.size() == 1) { + throw badResults.values().iterator().next(); + } else { + throw new MultiException(badResults); + } + } finally { + if (executor != null) { + executor.shutdownNow(); + } + } + } + } + + + private volatile ProxyInfo successfulProxy = null; + private volatile String toIgnore = null; + + public RequestHedgingProxyProvider( + Configuration conf, URI uri, Class xface) { + this(conf, uri, xface, new DefaultProxyFactory()); + } + + @VisibleForTesting + RequestHedgingProxyProvider(Configuration conf, URI uri, + Class xface, ProxyFactory factory) { + super(conf, uri, xface, factory); + } + + @SuppressWarnings("unchecked") + @Override + public synchronized ProxyInfo getProxy() { + if (successfulProxy != null) { + return successfulProxy; + } + Map> targetProxyInfos = new HashMap<>(); + StringBuilder combinedInfo = new StringBuilder('['); + for (int i = 0; i < proxies.size(); i++) { + ProxyInfo pInfo = super.getProxy(); + incrementProxyIndex(); + targetProxyInfos.put(pInfo.proxyInfo, pInfo); + combinedInfo.append(pInfo.proxyInfo).append(','); + } + combinedInfo.append(']'); + T wrappedProxy = (T) Proxy.newProxyInstance( + RequestHedgingInvocationHandler.class.getClassLoader(), + new Class[]{xface}, + new RequestHedgingInvocationHandler(targetProxyInfos)); + return new ProxyInfo(wrappedProxy, combinedInfo.toString()); + } + + @Override + public synchronized void performFailover(T currentProxy) { + toIgnore = successfulProxy.proxyInfo; + successfulProxy = null; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md index 0ca9d03c853..9cb7db4e2de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md @@ -189,9 +189,12 @@ The order in which you set these configurations is unimportant, but the values y Configure the name of the Java class which will be used by the DFS Client to determine which NameNode is the current Active, and therefore which NameNode is - currently serving client requests. The only implementation which currently - ships with Hadoop is the **ConfiguredFailoverProxyProvider**, so use this - unless you are using a custom one. For example: + currently serving client requests. The two implementations which currently + ship with Hadoop are the **ConfiguredFailoverProxyProvider** and the + **RequestHedgingProxyProvider** (which, for the first call, concurrently invokes all + namenodes to determine the active one, and on subsequent requests, invokes the active + namenode until a fail-over happens), so use one of these unless you are using a custom + proxy provider. dfs.client.failover.proxy.provider.mycluster diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md index ae1ebde6cdd..4f7366531fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md @@ -208,9 +208,13 @@ The order in which you set these configurations is unimportant, but the values y Configure the name of the Java class which will be used by the DFS Client to determine which NameNode is the current Active, and therefore which NameNode is - currently serving client requests. The only implementation which currently - ships with Hadoop is the **ConfiguredFailoverProxyProvider**, so use this - unless you are using a custom one. For example: + currently serving client requests. The two implementations which currently + ship with Hadoop are the **ConfiguredFailoverProxyProvider** and the + **RequestHedgingProxyProvider** (which, for the first call, concurrently invokes all + namenodes to determine the active one, and on subsequent requests, invokes the active + namenode until a fail-over happens), so use one of these unless you are using a custom + proxy provider. + For example: dfs.client.failover.proxy.provider.mycluster diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java new file mode 100644 index 00000000000..32f807a1e43 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.io.retry.MultiException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.collect.Lists; + +public class TestRequestHedgingProxyProvider { + + private Configuration conf; + private URI nnUri; + private String ns; + + @Before + public void setup() throws URISyntaxException { + ns = "mycluster-" + Time.monotonicNow(); + nnUri = new URI("hdfs://" + ns); + conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_NAMESERVICES, ns); + conf.set( + DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2"); + conf.set( + DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1", + "machine1.foo.bar:8020"); + conf.set( + DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2", + "machine2.foo.bar:8020"); + } + + @Test + public void testHedgingWhenOneFails() throws Exception { + final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(goodMock.getStats()).thenReturn(new long[] {1}); + final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); + + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, + createFactory(goodMock, badMock)); + long[] stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Mockito.verify(badMock).getStats(); + Mockito.verify(goodMock).getStats(); + } + + @Test + public void testHedgingWhenOneIsSlow() throws Exception { + final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(goodMock.getStats()).thenAnswer(new Answer() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(1000); + return new long[]{1}; + } + }); + final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); + + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, + createFactory(goodMock, badMock)); + long[] stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(1, stats[0]); + Mockito.verify(badMock).getStats(); + Mockito.verify(goodMock).getStats(); + } + + @Test + public void testHedgingWhenBothFail() throws Exception { + NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); + NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(worseMock.getStats()).thenThrow( + new IOException("Worse mock !!")); + + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, + createFactory(badMock, worseMock)); + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since both namenodes throw IOException !!"); + } catch (Exception e) { + Assert.assertTrue(e instanceof MultiException); + } + Mockito.verify(badMock).getStats(); + Mockito.verify(worseMock).getStats(); + } + + @Test + public void testPerformFailover() throws Exception { + final AtomicInteger counter = new AtomicInteger(0); + final int[] isGood = {1}; + final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(goodMock.getStats()).thenAnswer(new Answer() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + if (isGood[0] == 1) { + Thread.sleep(1000); + return new long[]{1}; + } + throw new IOException("Was Good mock !!"); + } + }); + final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(badMock.getStats()).thenAnswer(new Answer() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + if (isGood[0] == 2) { + Thread.sleep(1000); + return new long[]{2}; + } + throw new IOException("Bad mock !!"); + } + }); + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, + createFactory(goodMock, badMock)); + long[] stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(1, stats[0]); + Assert.assertEquals(2, counter.get()); + Mockito.verify(badMock).getStats(); + Mockito.verify(goodMock).getStats(); + + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(1, stats[0]); + // Ensure only the previous successful one is invoked + Mockito.verifyNoMoreInteractions(badMock); + Assert.assertEquals(3, counter.get()); + + // Flip to standby.. so now this should fail + isGood[0] = 2; + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since previously successful proxy now fails "); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof IOException); + } + + Assert.assertEquals(4, counter.get()); + + provider.performFailover(provider.getProxy().proxy); + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(2, stats[0]); + + // Counter shuodl update only once + Assert.assertEquals(5, counter.get()); + + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(2, stats[0]); + + // Counter updates only once now + Assert.assertEquals(6, counter.get()); + + // Flip back to old active.. so now this should fail + isGood[0] = 1; + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since previously successful proxy now fails "); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof IOException); + } + + Assert.assertEquals(7, counter.get()); + + provider.performFailover(provider.getProxy().proxy); + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + // Ensure correct proxy was called + Assert.assertEquals(1, stats[0]); + } + + @Test + public void testPerformFailoverWith3Proxies() throws Exception { + conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, + "nn1,nn2,nn3"); + conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3", + "machine3.foo.bar:8020"); + + final AtomicInteger counter = new AtomicInteger(0); + final int[] isGood = {1}; + final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(goodMock.getStats()).thenAnswer(new Answer() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + if (isGood[0] == 1) { + Thread.sleep(1000); + return new long[]{1}; + } + throw new IOException("Was Good mock !!"); + } + }); + final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(badMock.getStats()).thenAnswer(new Answer() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + if (isGood[0] == 2) { + Thread.sleep(1000); + return new long[]{2}; + } + throw new IOException("Bad mock !!"); + } + }); + final NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class); + Mockito.when(worseMock.getStats()).thenAnswer(new Answer() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + if (isGood[0] == 3) { + Thread.sleep(1000); + return new long[]{3}; + } + throw new IOException("Worse mock !!"); + } + }); + + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, + createFactory(goodMock, badMock, worseMock)); + long[] stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(1, stats[0]); + Assert.assertEquals(3, counter.get()); + Mockito.verify(badMock).getStats(); + Mockito.verify(goodMock).getStats(); + Mockito.verify(worseMock).getStats(); + + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(1, stats[0]); + // Ensure only the previous successful one is invoked + Mockito.verifyNoMoreInteractions(badMock); + Mockito.verifyNoMoreInteractions(worseMock); + Assert.assertEquals(4, counter.get()); + + // Flip to standby.. so now this should fail + isGood[0] = 2; + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since previously successful proxy now fails "); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof IOException); + } + + Assert.assertEquals(5, counter.get()); + + provider.performFailover(provider.getProxy().proxy); + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(2, stats[0]); + + // Counter updates twice since both proxies are tried on failure + Assert.assertEquals(7, counter.get()); + + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(2, stats[0]); + + // Counter updates only once now + Assert.assertEquals(8, counter.get()); + + // Flip to Other standby.. so now this should fail + isGood[0] = 3; + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since previously successful proxy now fails "); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof IOException); + } + + // Counter should ipdate only 1 time + Assert.assertEquals(9, counter.get()); + + provider.performFailover(provider.getProxy().proxy); + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + + // Ensure correct proxy was called + Assert.assertEquals(3, stats[0]); + + // Counter updates twice since both proxies are tried on failure + Assert.assertEquals(11, counter.get()); + + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(3, stats[0]); + + // Counter updates only once now + Assert.assertEquals(12, counter.get()); + } + + private ProxyFactory createFactory( + NamenodeProtocols... protos) { + final Iterator iterator = + Lists.newArrayList(protos).iterator(); + return new ProxyFactory() { + @Override + public NamenodeProtocols createProxy(Configuration conf, + InetSocketAddress nnAddr, Class xface, + UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + return iterator.next(); + } + }; + } +}