HDFS-13384. RBF: Improve timeout RPC call mechanism. Contributed by Inigo Goiri.

(cherry picked from commit e87be8a2a4)
Yiqun Lin 2018-04-10 15:34:42 +08:00
parent 0a216c3285
commit d4e438ff63
5 changed files with 192 additions and 3 deletions

NamenodeBeanMetrics.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.SubClusterTimeoutException;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
@@ -391,6 +392,8 @@ public class NamenodeBeanMetrics
      }
    } catch (StandbyException e) {
      LOG.error("Cannot get {} nodes, Router in safe mode", type);
    } catch (SubClusterTimeoutException e) {
      LOG.error("Cannot get {} nodes, subclusters timed out responding", type);
    } catch (IOException e) {
      LOG.error("Cannot get " + type + " nodes", e);
    }

RouterRpcClient.java

@@ -1007,7 +1007,7 @@ public class RouterRpcClient {
        String msg =
            "Invocation to \"" + loc + "\" for \"" + method + "\" timed out";
        LOG.error(msg);
-       IOException ioe = new IOException(msg);
+       IOException ioe = new SubClusterTimeoutException(msg);
        exceptions.put(location, ioe);
      } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
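For context, the changed line sits inside the RPC client's concurrent fan-out, where each subcluster invocation runs as a future that is only awaited for a bounded time. The following is a minimal sketch of that pattern, assuming a Future-based loop; the class, method, and variable names are illustrative, not the Router's exact code:

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.server.federation.router.SubClusterTimeoutException;

class TimeoutFanOutSketch {
  // Collect one subcluster reply, converting a timeout into the typed
  // SubClusterTimeoutException so callers can tell slow subclusters apart.
  static void awaitReply(Future<Object> future, long timeOutMs,
      String location, Map<String, Object> results,
      Map<String, IOException> exceptions) throws InterruptedException {
    try {
      // Wait for the subcluster reply, but only up to the configured timeout.
      results.put(location, future.get(timeOutMs, TimeUnit.MILLISECONDS));
    } catch (TimeoutException te) {
      exceptions.put(location, new SubClusterTimeoutException(
          "Invocation to " + location + " timed out"));
    } catch (ExecutionException ex) {
      // Failures other than timeouts keep their generic handling.
      exceptions.put(location, new IOException(ex.getCause()));
    }
  }
}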

SubClusterTimeoutException.java (new file)

@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;

/**
 * Exception when timing out waiting for the reply of a subcluster.
 */
public class SubClusterTimeoutException extends IOException {

  private static final long serialVersionUID = 1L;

  public SubClusterTimeoutException(String msg) {
    super(msg);
  }
}
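Because the new type extends IOException, every existing catch block keeps working unchanged; callers opt in to the distinction by catching the subclass first. A small hedged sketch of that usage (the RemoteCall interface and the empty-JSON fallback are illustrative, not router code):

import java.io.IOException;
import org.apache.hadoop.hdfs.server.federation.router.SubClusterTimeoutException;

class TimeoutHandlingSketch {
  interface RemoteCall {
    String run() throws IOException;
  }

  // Serve a degraded answer when a subcluster is merely slow, but keep
  // the previous behavior for every other remote failure.
  static String invoke(RemoteCall call) throws IOException {
    try {
      return call.run();
    } catch (SubClusterTimeoutException e) {
      return "{}"; // slow subcluster: return an empty/partial result
    }
  }
}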

MiniRouterDFSCluster.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
@@ -132,6 +133,9 @@ public class MiniRouterDFSCluster
  /** Namenode configuration overrides. */
  private Configuration namenodeOverrides;
  /** If the DNs are shared. */
  private boolean sharedDNs = true;

  /**
   * Router context.
@@ -558,6 +562,13 @@ public class MiniRouterDFSCluster
    this.numDatanodesPerNameservice = num;
  }

  /**
   * Set the DNs to belong to only one subcluster.
   */
  public void setIndependentDNs() {
    this.sharedDNs = false;
  }

  public String getNameservicesKey() {
    StringBuilder sb = new StringBuilder();
    for (String nsId : this.nameservices) {
@@ -677,15 +688,33 @@ public class MiniRouterDFSCluster
    }
    topology.setFederation(true);

    // Set independent DNs across subclusters
    int numDNs = nameservices.size() * numDatanodesPerNameservice;
    Configuration[] dnConfs = null;
    if (!sharedDNs) {
      dnConfs = new Configuration[numDNs];
      int dnId = 0;
      for (String nsId : nameservices) {
        Configuration subclusterConf = new Configuration();
        subclusterConf.set(DFS_INTERNAL_NAMESERVICES_KEY, nsId);
        for (int i = 0; i < numDatanodesPerNameservice; i++) {
          dnConfs[dnId] = subclusterConf;
          dnId++;
        }
      }
    }

    // Start mini DFS cluster
    String ns0 = nameservices.get(0);
    Configuration nnConf = generateNamenodeConfiguration(ns0);
    if (overrideConf != null) {
      nnConf.addResource(overrideConf);
    }
    cluster = new MiniDFSCluster.Builder(nnConf)
-        .numDataNodes(nameservices.size() * numDatanodesPerNameservice)
+        .numDataNodes(numDNs)
        .nnTopology(topology)
+        .dataNodeConfOverlays(dnConfs)
        .build();
    cluster.waitActive();
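With this hook a test can pin each DataNode to a single subcluster: the per-DN overlay sets DFS_INTERNAL_NAMESERVICES_KEY so the DN registers with only its own nameservice instead of all of them. A brief usage sketch (sizes are illustrative, and the setter name for the DN count is assumed from the field visible in the hunks above):

// Two nameservices; each gets its own pair of DataNodes.
StateStoreDFSCluster cluster = new StateStoreDFSCluster(false, 2);
cluster.setIndependentDNs();              // DNs join one subcluster only
cluster.setNumDatanodesPerNameservice(2); // assumed setter for the field above
cluster.startCluster();                   // builds MiniDFSCluster with the DN overlays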

TestRouterRPCClientRetries.java

@@ -18,11 +18,16 @@
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -30,40 +35,65 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;
/**
 * Test retry behavior of the Router RPC Client.
 */
public class TestRouterRPCClientRetries {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestRouterRPCClientRetries.class);

  private static StateStoreDFSCluster cluster;
  private static NamenodeContext nnContext1;
  private static RouterContext routerContext;
  private static MembershipNamenodeResolver resolver;
  private static ClientProtocol routerProtocol;

  @Rule
  public final Timeout testTimeout = new Timeout(100000);

  @Before
  public void setUp() throws Exception {
    // Build and start a federated cluster
    cluster = new StateStoreDFSCluster(false, 2);
    Configuration routerConf = new RouterConfigBuilder()
        .stateStore()
        .metrics()
        .admin()
        .rpc()
        .build();
    routerConf.setTimeDuration(
        NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);

    // reduce IPC client connection retry times and interval time
    Configuration clientConf = new Configuration(false);
@@ -72,6 +102,9 @@ public class TestRouterRPCClientRetries {
    clientConf.setInt(
        CommonConfigurationKeys.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 100);

    // Set the DNs to belong to only one subcluster
    cluster.setIndependentDNs();

    cluster.addRouterOverrides(routerConf);
    // override some settings for the client
    cluster.startCluster(clientConf);
@@ -157,4 +190,95 @@ public class TestRouterRPCClientRetries {
    assertTrue(resolver.registerNamenode(report));
    resolver.loadCache(true);
  }

  @Test
  public void testNamenodeMetricsSlow() throws Exception {
    final Router router = routerContext.getRouter();
    final NamenodeBeanMetrics metrics = router.getNamenodeMetrics();

    // Initially, there are 4 DNs in total
    final String jsonString0 = metrics.getLiveNodes();
    assertEquals(4, getNumDatanodes(jsonString0));

    // The response should be cached
    assertEquals(jsonString0, metrics.getLiveNodes());

    // Check that the cached value gets updated eventually
    waitUpdateLiveNodes(jsonString0, metrics);
    final String jsonString2 = metrics.getLiveNodes();
    assertNotEquals(jsonString0, jsonString2);
    assertEquals(4, getNumDatanodes(jsonString2));

    // Making subcluster0 slow to reply, should only get DNs from nn1
    MiniDFSCluster dfsCluster = cluster.getCluster();
    NameNode nn0 = dfsCluster.getNameNode(0);
    simulateNNSlow(nn0);
    waitUpdateLiveNodes(jsonString2, metrics);
    final String jsonString3 = metrics.getLiveNodes();
    assertEquals(2, getNumDatanodes(jsonString3));

    // Making subcluster1 slow to reply, shouldn't get any DNs
    NameNode nn1 = dfsCluster.getNameNode(1);
    simulateNNSlow(nn1);
    waitUpdateLiveNodes(jsonString3, metrics);
    final String jsonString4 = metrics.getLiveNodes();
    assertEquals(0, getNumDatanodes(jsonString4));
  }

  /**
   * Get the number of nodes in a JSON string.
   * @param jsonString JSON string containing nodes.
   * @return Number of nodes.
   * @throws JSONException If the JSON string is not properly formed.
   */
  private static int getNumDatanodes(final String jsonString)
      throws JSONException {
    JSONObject jsonObject = new JSONObject(jsonString);
    if (jsonObject.length() == 0) {
      return 0;
    }
    return jsonObject.names().length();
  }

  /**
   * Wait until the cached live nodes value is updated.
   * @param oldValue Old cached value.
   * @param metrics Namenode metrics beans to get the live nodes from.
   * @throws Exception If it cannot wait.
   */
  private static void waitUpdateLiveNodes(
      final String oldValue, final NamenodeBeanMetrics metrics)
      throws Exception {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return !oldValue.equals(metrics.getLiveNodes());
      }
    }, 500, 5 * 1000);
  }

  /**
   * Simulate that a Namenode is slow by adding a sleep to the check operation
   * in the NN.
   * @param nn Namenode to simulate slow.
   * @throws Exception If we cannot add the sleep time.
   */
  private static void simulateNNSlow(final NameNode nn) throws Exception {
    FSNamesystem namesystem = nn.getNamesystem();
    HAContext haContext = namesystem.getHAContext();
    HAContext spyHAContext = spy(haContext);
    doAnswer(new Answer<Object>() {
      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        LOG.info("Simulating slow namenode {}", invocation.getMock());
        try {
          Thread.sleep(3 * 1000);
        } catch (InterruptedException e) {
          LOG.error("Simulating a slow namenode aborted");
        }
        return null;
      }
    }).when(spyHAContext).checkOperation(any(OperationCategory.class));
    Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
  }
}