diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
index bd6cab0184a..7c77ccf7a6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.Router;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.router.SubClusterTimeoutException;
 import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
@@ -391,6 +392,8 @@ public class NamenodeBeanMetrics
       }
     } catch (StandbyException e) {
       LOG.error("Cannot get {} nodes, Router in safe mode", type);
+    } catch (SubClusterTimeoutException e) {
+      LOG.error("Cannot get {} nodes, subclusters timed out responding", type);
     } catch (IOException e) {
       LOG.error("Cannot get " + type + " nodes", e);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 4723b4c27f1..e2c9cb44d9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -1007,7 +1007,7 @@ public class RouterRpcClient {
         String msg =
             "Invocation to \"" + loc + "\" for \"" + method + "\" timed out";
         LOG.error(msg);
-        IOException ioe = new IOException(msg);
+        IOException ioe = new SubClusterTimeoutException(msg);
         exceptions.put(location, ioe);
       } catch (ExecutionException ex) {
         Throwable cause = ex.getCause();
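Note: because SubClusterTimeoutException extends IOException (see the new class below), handlers that want to treat slow subclusters specially must catch it before the generic IOException clause, exactly as the NamenodeBeanMetrics hunk above does; reversing the order makes the specific catch unreachable and javac rejects it. A minimal standalone sketch of the pattern; the CatchOrderSketch class is hypothetical and not part of this patch:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.federation.router.SubClusterTimeoutException;

    public class CatchOrderSketch {
      public static void main(String[] args) {
        try {
          // Stand-in for an invocation that times out on one subcluster.
          throw new SubClusterTimeoutException(
              "Invocation to \"ns0\" for \"getDatanodeReport\" timed out");
        } catch (SubClusterTimeoutException e) {
          // Specific handler first: a slow subcluster can degrade gracefully.
          System.out.println("Subcluster timed out: " + e.getMessage());
        } catch (IOException e) {
          // Generic handler second: any other I/O failure.
          System.out.println("Hard failure: " + e.getMessage());
        }
      }
    }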
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/SubClusterTimeoutException.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/SubClusterTimeoutException.java
new file mode 100644
index 00000000000..dac5bd6c21d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/SubClusterTimeoutException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+
+
+/**
+ * Exception when timing out waiting for the reply of a subcluster.
+ */
+public class SubClusterTimeoutException extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  public SubClusterTimeoutException(String msg) {
+    super(msg);
+  }
+}
\ No newline at end of file
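The new type carries no state beyond the message, so it behaves like any other IOException on the wire. If it were ever surfaced to a client wrapped in a Hadoop RPC RemoteException, the single String constructor above is what unwrapRemoteException needs to rebuild the typed instance. A hedged sketch of that recovery; the UnwrapSketch helper is hypothetical and not part of this patch:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.federation.router.SubClusterTimeoutException;
    import org.apache.hadoop.ipc.RemoteException;

    public final class UnwrapSketch {
      // Returns a SubClusterTimeoutException when the remote class name
      // matches; otherwise the RemoteException itself comes back.
      static IOException unwrap(RemoteException re) {
        return re.unwrapRemoteException(SubClusterTimeoutException.class);
      }
    }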
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
index 0ad8536587b..0a4de33f835 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
@@ -132,6 +133,9 @@ public class MiniRouterDFSCluster {
   /** Namenode configuration overrides. */
   private Configuration namenodeOverrides;
 
+  /** If the DNs are shared. */
+  private boolean sharedDNs = true;
+
 
   /**
    * Router context.
@@ -558,6 +562,13 @@ public class MiniRouterDFSCluster {
     this.numDatanodesPerNameservice = num;
   }
 
+  /**
+   * Set the DNs to belong to only one subcluster.
+   */
+  public void setIndependentDNs() {
+    this.sharedDNs = false;
+  }
+
   public String getNameservicesKey() {
     StringBuilder sb = new StringBuilder();
     for (String nsId : this.nameservices) {
@@ -677,15 +688,33 @@ public class MiniRouterDFSCluster {
     }
     topology.setFederation(true);
 
+    // Set independent DNs across subclusters
+    int numDNs = nameservices.size() * numDatanodesPerNameservice;
+    Configuration[] dnConfs = null;
+    if (!sharedDNs) {
+      dnConfs = new Configuration[numDNs];
+      int dnId = 0;
+      for (String nsId : nameservices) {
+        Configuration subclusterConf = new Configuration();
+        subclusterConf.set(DFS_INTERNAL_NAMESERVICES_KEY, nsId);
+        for (int i = 0; i < numDatanodesPerNameservice; i++) {
+          dnConfs[dnId] = subclusterConf;
+          dnId++;
+        }
+      }
+    }
+
     // Start mini DFS cluster
     String ns0 = nameservices.get(0);
     Configuration nnConf = generateNamenodeConfiguration(ns0);
     if (overrideConf != null) {
       nnConf.addResource(overrideConf);
     }
+
     cluster = new MiniDFSCluster.Builder(nnConf)
-        .numDataNodes(nameservices.size() * numDatanodesPerNameservice)
+        .numDataNodes(numDNs)
         .nnTopology(topology)
+        .dataNodeConfOverlays(dnConfs)
         .build();
     cluster.waitActive();
 
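The overlay built above hinges on dfs.internal.nameservices (DFS_INTERNAL_NAMESERVICES_KEY): a DataNode that sees a single nameservice in that key registers with, and heartbeats to, that subcluster alone, while the federated dfs.nameservices list stays visible to everyone. A small sketch of one such overlay in isolation; the DnOverlaySketch class is hypothetical and not part of this patch:

    import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;

    import org.apache.hadoop.conf.Configuration;

    public final class DnOverlaySketch {
      // Builds the per-DataNode overlay used above: the DN joins only nsId,
      // e.g. "ns0", even though the cluster config declares ns0 and ns1.
      public static Configuration overlayFor(String nsId) {
        Configuration subclusterConf = new Configuration();
        subclusterConf.set(DFS_INTERNAL_NAMESERVICES_KEY, nsId);
        return subclusterConf;
      }
    }

Passing null to MiniDFSCluster.Builder#dataNodeConfOverlays (the sharedDNs default above) keeps the previous behavior, where every DN registers with every nameservice.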
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
index 1e0f9a17b50..91dc2e7027e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
@@ -18,11 +18,16 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -30,40 +35,65 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
 
 /**
  * Test retry behavior of the Router RPC Client.
  */
 public class TestRouterRPCClientRetries {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterRPCClientRetries.class);
+
   private static StateStoreDFSCluster cluster;
   private static NamenodeContext nnContext1;
   private static RouterContext routerContext;
   private static MembershipNamenodeResolver resolver;
   private static ClientProtocol routerProtocol;
 
+  @Rule
+  public final Timeout testTimeout = new Timeout(100000);
+
   @Before
   public void setUp() throws Exception {
     // Build and start a federated cluster
     cluster = new StateStoreDFSCluster(false, 2);
     Configuration routerConf = new RouterConfigBuilder()
         .stateStore()
+        .metrics()
         .admin()
         .rpc()
         .build();
+    routerConf.setTimeDuration(
+        NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
 
     // reduce IPC client connection retry times and interval time
     Configuration clientConf = new Configuration(false);
@@ -72,6 +102,9 @@ public class TestRouterRPCClientRetries {
     clientConf.setInt(
         CommonConfigurationKeys.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 100);
 
+    // Set the DNs to belong to only one subcluster
+    cluster.setIndependentDNs();
+
     cluster.addRouterOverrides(routerConf);
     // override some settings for the client
     cluster.startCluster(clientConf);
@@ -157,4 +190,95 @@
     assertTrue(resolver.registerNamenode(report));
     resolver.loadCache(true);
   }
+
+  @Test
+  public void testNamenodeMetricsSlow() throws Exception {
+    final Router router = routerContext.getRouter();
+    final NamenodeBeanMetrics metrics = router.getNamenodeMetrics();
+
+    // Initially, there are 4 DNs in total
+    final String jsonString0 = metrics.getLiveNodes();
+    assertEquals(4, getNumDatanodes(jsonString0));
+
+    // The response should be cached
+    assertEquals(jsonString0, metrics.getLiveNodes());
+
+    // Check that the cached value gets updated eventually
+    waitUpdateLiveNodes(jsonString0, metrics);
+    final String jsonString2 = metrics.getLiveNodes();
+    assertNotEquals(jsonString0, jsonString2);
+    assertEquals(4, getNumDatanodes(jsonString2));
+
+    // Making subcluster0 slow to reply, should only get DNs from nn1
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    NameNode nn0 = dfsCluster.getNameNode(0);
+    simulateNNSlow(nn0);
+    waitUpdateLiveNodes(jsonString2, metrics);
+    final String jsonString3 = metrics.getLiveNodes();
+    assertEquals(2, getNumDatanodes(jsonString3));
+
+    // Making subcluster1 slow to reply, shouldn't get any DNs
+    NameNode nn1 = dfsCluster.getNameNode(1);
+    simulateNNSlow(nn1);
+    waitUpdateLiveNodes(jsonString3, metrics);
+    final String jsonString4 = metrics.getLiveNodes();
+    assertEquals(0, getNumDatanodes(jsonString4));
+  }
+
+ */ + private static int getNumDatanodes(final String jsonString) + throws JSONException { + JSONObject jsonObject = new JSONObject(jsonString); + if (jsonObject.length() == 0) { + return 0; + } + return jsonObject.names().length(); + } + + /** + * Wait until the cached live nodes value is updated. + * @param oldValue Old cached value. + * @param metrics Namenode metrics beans to get the live nodes from. + * @throws Exception If it cannot wait. + */ + private static void waitUpdateLiveNodes( + final String oldValue, final NamenodeBeanMetrics metrics) + throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return !oldValue.equals(metrics.getLiveNodes()); + } + }, 500, 5 * 1000); + } + + /** + * Simulate that a Namenode is slow by adding a sleep to the check operation + * in the NN. + * @param nn Namenode to simulate slow. + * @throws Exception If we cannot add the sleep time. + */ + private static void simulateNNSlow(final NameNode nn) throws Exception { + FSNamesystem namesystem = nn.getNamesystem(); + HAContext haContext = namesystem.getHAContext(); + HAContext spyHAContext = spy(haContext); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + LOG.info("Simulating slow namenode {}", invocation.getMock()); + try { + Thread.sleep(3 * 1000); + } catch(InterruptedException e) { + LOG.error("Simulating a slow namenode aborted"); + } + return null; + } + }).when(spyHAContext).checkOperation(any(OperationCategory.class)); + Whitebox.setInternalState(namesystem, "haContext", spyHAContext); + } }