HDFS-13384. RBF: Improve timeout RPC call mechanism. Contributed by Inigo Goiri.
This commit is contained in:
parent
a92200f4a6
commit
e87be8a2a4
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.SubClusterTimeoutException;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
|
||||||
|
@ -396,6 +397,8 @@ public class NamenodeBeanMetrics
|
||||||
}
|
}
|
||||||
} catch (StandbyException e) {
|
} catch (StandbyException e) {
|
||||||
LOG.error("Cannot get {} nodes, Router in safe mode", type);
|
LOG.error("Cannot get {} nodes, Router in safe mode", type);
|
||||||
|
} catch (SubClusterTimeoutException e) {
|
||||||
|
LOG.error("Cannot get {} nodes, subclusters timed out responding", type);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Cannot get " + type + " nodes", e);
|
LOG.error("Cannot get " + type + " nodes", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1007,7 +1007,7 @@ public class RouterRpcClient {
|
||||||
String msg =
|
String msg =
|
||||||
"Invocation to \"" + loc + "\" for \"" + method + "\" timed out";
|
"Invocation to \"" + loc + "\" for \"" + method + "\" timed out";
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
IOException ioe = new IOException(msg);
|
IOException ioe = new SubClusterTimeoutException(msg);
|
||||||
exceptions.put(location, ioe);
|
exceptions.put(location, ioe);
|
||||||
} catch (ExecutionException ex) {
|
} catch (ExecutionException ex) {
|
||||||
Throwable cause = ex.getCause();
|
Throwable cause = ex.getCause();
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exception when timing out waiting for the reply of a subcluster.
|
||||||
|
*/
|
||||||
|
public class SubClusterTimeoutException extends IOException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public SubClusterTimeoutException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
|
||||||
|
@ -132,6 +133,9 @@ public class MiniRouterDFSCluster {
|
||||||
/** Namenode configuration overrides. */
|
/** Namenode configuration overrides. */
|
||||||
private Configuration namenodeOverrides;
|
private Configuration namenodeOverrides;
|
||||||
|
|
||||||
|
/** If the DNs are shared. */
|
||||||
|
private boolean sharedDNs = true;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Router context.
|
* Router context.
|
||||||
|
@ -558,6 +562,13 @@ public class MiniRouterDFSCluster {
|
||||||
this.numDatanodesPerNameservice = num;
|
this.numDatanodesPerNameservice = num;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the DNs to belong to only one subcluster.
|
||||||
|
*/
|
||||||
|
public void setIndependentDNs() {
|
||||||
|
this.sharedDNs = false;
|
||||||
|
}
|
||||||
|
|
||||||
public String getNameservicesKey() {
|
public String getNameservicesKey() {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
for (String nsId : this.nameservices) {
|
for (String nsId : this.nameservices) {
|
||||||
|
@ -677,15 +688,33 @@ public class MiniRouterDFSCluster {
|
||||||
}
|
}
|
||||||
topology.setFederation(true);
|
topology.setFederation(true);
|
||||||
|
|
||||||
|
// Set independent DNs across subclusters
|
||||||
|
int numDNs = nameservices.size() * numDatanodesPerNameservice;
|
||||||
|
Configuration[] dnConfs = null;
|
||||||
|
if (!sharedDNs) {
|
||||||
|
dnConfs = new Configuration[numDNs];
|
||||||
|
int dnId = 0;
|
||||||
|
for (String nsId : nameservices) {
|
||||||
|
Configuration subclusterConf = new Configuration();
|
||||||
|
subclusterConf.set(DFS_INTERNAL_NAMESERVICES_KEY, nsId);
|
||||||
|
for (int i = 0; i < numDatanodesPerNameservice; i++) {
|
||||||
|
dnConfs[dnId] = subclusterConf;
|
||||||
|
dnId++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Start mini DFS cluster
|
// Start mini DFS cluster
|
||||||
String ns0 = nameservices.get(0);
|
String ns0 = nameservices.get(0);
|
||||||
Configuration nnConf = generateNamenodeConfiguration(ns0);
|
Configuration nnConf = generateNamenodeConfiguration(ns0);
|
||||||
if (overrideConf != null) {
|
if (overrideConf != null) {
|
||||||
nnConf.addResource(overrideConf);
|
nnConf.addResource(overrideConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
cluster = new MiniDFSCluster.Builder(nnConf)
|
cluster = new MiniDFSCluster.Builder(nnConf)
|
||||||
.numDataNodes(nameservices.size() * numDatanodesPerNameservice)
|
.numDataNodes(numDNs)
|
||||||
.nnTopology(topology)
|
.nnTopology(topology)
|
||||||
|
.dataNodeConfOverlays(dnConfs)
|
||||||
.build();
|
.build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
|
|
|
@ -18,11 +18,16 @@
|
||||||
package org.apache.hadoop.hdfs.server.federation.router;
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
@ -30,40 +35,65 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
|
||||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
|
||||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
|
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.codehaus.jettison.json.JSONException;
|
||||||
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test retry behavior of the Router RPC Client.
|
* Test retry behavior of the Router RPC Client.
|
||||||
*/
|
*/
|
||||||
public class TestRouterRPCClientRetries {
|
public class TestRouterRPCClientRetries {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestRouterRPCClientRetries.class);
|
||||||
|
|
||||||
private static StateStoreDFSCluster cluster;
|
private static StateStoreDFSCluster cluster;
|
||||||
private static NamenodeContext nnContext1;
|
private static NamenodeContext nnContext1;
|
||||||
private static RouterContext routerContext;
|
private static RouterContext routerContext;
|
||||||
private static MembershipNamenodeResolver resolver;
|
private static MembershipNamenodeResolver resolver;
|
||||||
private static ClientProtocol routerProtocol;
|
private static ClientProtocol routerProtocol;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public final Timeout testTimeout = new Timeout(100000);
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
// Build and start a federated cluster
|
// Build and start a federated cluster
|
||||||
cluster = new StateStoreDFSCluster(false, 2);
|
cluster = new StateStoreDFSCluster(false, 2);
|
||||||
Configuration routerConf = new RouterConfigBuilder()
|
Configuration routerConf = new RouterConfigBuilder()
|
||||||
.stateStore()
|
.stateStore()
|
||||||
|
.metrics()
|
||||||
.admin()
|
.admin()
|
||||||
.rpc()
|
.rpc()
|
||||||
.build();
|
.build();
|
||||||
|
routerConf.setTimeDuration(
|
||||||
|
NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
|
||||||
|
|
||||||
// reduce IPC client connection retry times and interval time
|
// reduce IPC client connection retry times and interval time
|
||||||
Configuration clientConf = new Configuration(false);
|
Configuration clientConf = new Configuration(false);
|
||||||
|
@ -72,6 +102,9 @@ public class TestRouterRPCClientRetries {
|
||||||
clientConf.setInt(
|
clientConf.setInt(
|
||||||
CommonConfigurationKeys.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 100);
|
CommonConfigurationKeys.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 100);
|
||||||
|
|
||||||
|
// Set the DNs to belong to only one subcluster
|
||||||
|
cluster.setIndependentDNs();
|
||||||
|
|
||||||
cluster.addRouterOverrides(routerConf);
|
cluster.addRouterOverrides(routerConf);
|
||||||
// override some settings for the client
|
// override some settings for the client
|
||||||
cluster.startCluster(clientConf);
|
cluster.startCluster(clientConf);
|
||||||
|
@ -157,4 +190,95 @@ public class TestRouterRPCClientRetries {
|
||||||
assertTrue(resolver.registerNamenode(report));
|
assertTrue(resolver.registerNamenode(report));
|
||||||
resolver.loadCache(true);
|
resolver.loadCache(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNamenodeMetricsSlow() throws Exception {
|
||||||
|
final Router router = routerContext.getRouter();
|
||||||
|
final NamenodeBeanMetrics metrics = router.getNamenodeMetrics();
|
||||||
|
|
||||||
|
// Initially, there are 4 DNs in total
|
||||||
|
final String jsonString0 = metrics.getLiveNodes();
|
||||||
|
assertEquals(4, getNumDatanodes(jsonString0));
|
||||||
|
|
||||||
|
// The response should be cached
|
||||||
|
assertEquals(jsonString0, metrics.getLiveNodes());
|
||||||
|
|
||||||
|
// Check that the cached value gets updated eventually
|
||||||
|
waitUpdateLiveNodes(jsonString0, metrics);
|
||||||
|
final String jsonString2 = metrics.getLiveNodes();
|
||||||
|
assertNotEquals(jsonString0, jsonString2);
|
||||||
|
assertEquals(4, getNumDatanodes(jsonString2));
|
||||||
|
|
||||||
|
// Making subcluster0 slow to reply, should only get DNs from nn1
|
||||||
|
MiniDFSCluster dfsCluster = cluster.getCluster();
|
||||||
|
NameNode nn0 = dfsCluster.getNameNode(0);
|
||||||
|
simulateNNSlow(nn0);
|
||||||
|
waitUpdateLiveNodes(jsonString2, metrics);
|
||||||
|
final String jsonString3 = metrics.getLiveNodes();
|
||||||
|
assertEquals(2, getNumDatanodes(jsonString3));
|
||||||
|
|
||||||
|
// Making subcluster1 slow to reply, shouldn't get any DNs
|
||||||
|
NameNode nn1 = dfsCluster.getNameNode(1);
|
||||||
|
simulateNNSlow(nn1);
|
||||||
|
waitUpdateLiveNodes(jsonString3, metrics);
|
||||||
|
final String jsonString4 = metrics.getLiveNodes();
|
||||||
|
assertEquals(0, getNumDatanodes(jsonString4));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of nodes in a JSON string.
|
||||||
|
* @param jsonString JSON string containing nodes.
|
||||||
|
* @return Number of nodes.
|
||||||
|
* @throws JSONException If the JSON string is not properly formed.
|
||||||
|
*/
|
||||||
|
private static int getNumDatanodes(final String jsonString)
|
||||||
|
throws JSONException {
|
||||||
|
JSONObject jsonObject = new JSONObject(jsonString);
|
||||||
|
if (jsonObject.length() == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return jsonObject.names().length();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until the cached live nodes value is updated.
|
||||||
|
* @param oldValue Old cached value.
|
||||||
|
* @param metrics Namenode metrics beans to get the live nodes from.
|
||||||
|
* @throws Exception If it cannot wait.
|
||||||
|
*/
|
||||||
|
private static void waitUpdateLiveNodes(
|
||||||
|
final String oldValue, final NamenodeBeanMetrics metrics)
|
||||||
|
throws Exception {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return !oldValue.equals(metrics.getLiveNodes());
|
||||||
|
}
|
||||||
|
}, 500, 5 * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simulate that a Namenode is slow by adding a sleep to the check operation
|
||||||
|
* in the NN.
|
||||||
|
* @param nn Namenode to simulate slow.
|
||||||
|
* @throws Exception If we cannot add the sleep time.
|
||||||
|
*/
|
||||||
|
private static void simulateNNSlow(final NameNode nn) throws Exception {
|
||||||
|
FSNamesystem namesystem = nn.getNamesystem();
|
||||||
|
HAContext haContext = namesystem.getHAContext();
|
||||||
|
HAContext spyHAContext = spy(haContext);
|
||||||
|
doAnswer(new Answer<Object>() {
|
||||||
|
@Override
|
||||||
|
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
LOG.info("Simulating slow namenode {}", invocation.getMock());
|
||||||
|
try {
|
||||||
|
Thread.sleep(3 * 1000);
|
||||||
|
} catch(InterruptedException e) {
|
||||||
|
LOG.error("Simulating a slow namenode aborted");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(spyHAContext).checkOperation(any(OperationCategory.class));
|
||||||
|
Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue