HDFS-13488. RBF: Reject requests when a Router is overloaded. Contributed by Inigo Goiri.

(cherry picked from commit 37269261d1)
(cherry picked from commit 5fef28d0d4)
This commit is contained in:
Yiqun Lin 2018-05-02 14:49:39 +08:00
parent 9fd93ee533
commit 06632c0665
13 changed files with 348 additions and 107 deletions

View File

@ -40,6 +40,8 @@ public interface FederationRPCMBean {
long getProxyOpFailureStandby(); long getProxyOpFailureStandby();
long getProxyOpFailureClientOverloaded();
long getProxyOpNotImplemented(); long getProxyOpNotImplemented();
long getProxyOpRetries(); long getProxyOpRetries();

View File

@ -54,6 +54,8 @@ public class FederationRPCMetrics implements FederationRPCMBean {
private MutableCounterLong proxyOpFailureStandby; private MutableCounterLong proxyOpFailureStandby;
@Metric("Number of operations to hit a standby NN") @Metric("Number of operations to hit a standby NN")
private MutableCounterLong proxyOpFailureCommunicate; private MutableCounterLong proxyOpFailureCommunicate;
@Metric("Number of operations to hit a client overloaded Router")
private MutableCounterLong proxyOpFailureClientOverloaded;
@Metric("Number of operations not implemented") @Metric("Number of operations not implemented")
private MutableCounterLong proxyOpNotImplemented; private MutableCounterLong proxyOpNotImplemented;
@Metric("Number of operation retries") @Metric("Number of operation retries")
@ -118,6 +120,14 @@ public class FederationRPCMetrics implements FederationRPCMBean {
return proxyOpFailureCommunicate.value(); return proxyOpFailureCommunicate.value();
} }
public void incrProxyOpFailureClientOverloaded() {
proxyOpFailureClientOverloaded.incr();
}
@Override
public long getProxyOpFailureClientOverloaded() {
return proxyOpFailureClientOverloaded.value();
}
public void incrProxyOpNotImplemented() { public void incrProxyOpNotImplemented() {
proxyOpNotImplemented.incr(); proxyOpNotImplemented.incr();

View File

@ -153,6 +153,11 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
metrics.incrProxyOpFailureCommunicate(); metrics.incrProxyOpFailureCommunicate();
} }
@Override
public void proxyOpFailureClientOverloaded() {
metrics.incrProxyOpFailureClientOverloaded();
}
@Override @Override
public void proxyOpNotImplemented() { public void proxyOpNotImplemented() {
metrics.incrProxyOpNotImplemented(); metrics.incrProxyOpNotImplemented();

View File

@ -113,6 +113,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS = public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts"; FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3; public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
public static final String DFS_ROUTER_CLIENT_REJECT_OVERLOAD =
FEDERATION_ROUTER_PREFIX + "client.reject.overload";
public static final boolean DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT = false;
// HDFS Router State Store connection // HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS = public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =

View File

@ -35,13 +35,16 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -98,7 +101,7 @@ public class RouterRpcClient {
/** Connection pool to the Namenodes per user for performance. */ /** Connection pool to the Namenodes per user for performance. */
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
/** Service to run asynchronous calls. */ /** Service to run asynchronous calls. */
private final ExecutorService executorService; private final ThreadPoolExecutor executorService;
/** Retry policy for router -> NN communication. */ /** Retry policy for router -> NN communication. */
private final RetryPolicy retryPolicy; private final RetryPolicy retryPolicy;
/** Optional perf monitor. */ /** Optional perf monitor. */
@ -131,8 +134,16 @@ public class RouterRpcClient {
ThreadFactory threadFactory = new ThreadFactoryBuilder() ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router Client-%d") .setNameFormat("RPC Router Client-%d")
.build(); .build();
this.executorService = Executors.newFixedThreadPool( BlockingQueue<Runnable> workQueue;
numThreads, threadFactory); if (conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
workQueue = new ArrayBlockingQueue<>(numThreads);
} else {
workQueue = new LinkedBlockingQueue<>();
}
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
this.rpcMonitor = monitor; this.rpcMonitor = monitor;
@ -1098,6 +1109,16 @@ public class RouterRpcClient {
} }
return results; return results;
} catch (RejectedExecutionException e) {
if (rpcMonitor != null) {
rpcMonitor.proxyOpFailureClientOverloaded();
}
int active = executorService.getActiveCount();
int total = executorService.getMaximumPoolSize();
String msg = "Not enough client threads " + active + "/" + total;
LOG.error(msg);
throw new StandbyException(
"Router " + routerId + " is overloaded: " + msg);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
LOG.error("Unexpected error while invoking API: {}", ex.getMessage()); LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
throw new IOException( throw new IOException(

View File

@ -75,6 +75,12 @@ public interface RouterRpcMonitor {
*/ */
void proxyOpFailureCommunicate(); void proxyOpFailureCommunicate();
/**
* Failed to proxy an operation to a Namenode because the client was
* overloaded.
*/
void proxyOpFailureClientOverloaded();
/** /**
* Failed to proxy an operation because it is not implemented. * Failed to proxy an operation because it is not implemented.
*/ */

View File

@ -274,7 +274,6 @@ public class RouterRpcServer extends AbstractService
// We don't want the server to log the full stack trace for some exceptions // We don't want the server to log the full stack trace for some exceptions
this.rpcServer.addTerseExceptions( this.rpcServer.addTerseExceptions(
RemoteException.class, RemoteException.class,
StandbyException.class,
SafeModeException.class, SafeModeException.class,
FileNotFoundException.class, FileNotFoundException.class,
FileAlreadyExistsException.class, FileAlreadyExistsException.class,
@ -283,6 +282,9 @@ public class RouterRpcServer extends AbstractService
NotReplicatedYetException.class, NotReplicatedYetException.class,
IOException.class); IOException.class);
this.rpcServer.addSuppressedLoggingExceptions(
StandbyException.class);
// The RPC-server port can be ephemeral... ensure we have the correct info // The RPC-server port can be ephemeral... ensure we have the correct info
InetSocketAddress listenAddress = this.rpcServer.getListenerAddress(); InetSocketAddress listenAddress = this.rpcServer.getListenerAddress();
this.rpcAddress = new InetSocketAddress( this.rpcAddress = new InetSocketAddress(
@ -397,7 +399,7 @@ public class RouterRpcServer extends AbstractService
* @throws UnsupportedOperationException If the operation is not supported. * @throws UnsupportedOperationException If the operation is not supported.
*/ */
protected void checkOperation(OperationCategory op, boolean supported) protected void checkOperation(OperationCategory op, boolean supported)
throws RouterSafeModeException, UnsupportedOperationException { throws StandbyException, UnsupportedOperationException {
checkOperation(op); checkOperation(op);
if (!supported) { if (!supported) {
@ -419,7 +421,7 @@ public class RouterRpcServer extends AbstractService
* client requests. * client requests.
*/ */
protected void checkOperation(OperationCategory op) protected void checkOperation(OperationCategory op)
throws RouterSafeModeException { throws StandbyException {
// Log the function we are currently calling. // Log the function we are currently calling.
if (rpcMonitor != null) { if (rpcMonitor != null) {
rpcMonitor.startOp(); rpcMonitor.startOp();
@ -443,7 +445,8 @@ public class RouterRpcServer extends AbstractService
if (rpcMonitor != null) { if (rpcMonitor != null) {
rpcMonitor.routerFailureSafemode(); rpcMonitor.routerFailureSafemode();
} }
throw new RouterSafeModeException(router.getRouterId(), op); throw new StandbyException("Router " + router.getRouterId() +
" is in safe mode and cannot handle " + op + " requests");
} }
} }

View File

@ -1,53 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.ipc.StandbyException;
/**
* Exception that the Router throws when it is in safe mode. This extends
* {@link StandbyException} for the client to try another Router when it gets
* this exception.
*/
public class RouterSafeModeException extends StandbyException {
private static final long serialVersionUID = 453568188334993493L;
/** Identifier of the Router that generated this exception. */
private final String routerId;
/**
* Build a new Router safe mode exception.
* @param router Identifier of the Router.
* @param op Category of the operation (READ/WRITE).
*/
public RouterSafeModeException(String router, OperationCategory op) {
super("Router " + router + " is in safe mode and cannot handle " + op
+ " requests.");
this.routerId = router;
}
/**
* Get the id of the Router that generated this exception.
* @return Id of the Router that generated this exception.
*/
public String getRouterId() {
return this.routerId;
}
}

View File

@ -431,4 +431,13 @@
</description> </description>
</property> </property>
<property>
<name>dfs.federation.router.client.reject.overload</name>
<value>false</value>
<description>
Set to true to reject client requests when we run out of RPC client
threads.
</description>
</property>
</configuration> </configuration>

View File

@ -28,6 +28,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@ -37,6 +41,7 @@ import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.util.StringUtils;
/** /**
* Test utility to mimic a federated HDFS cluster with a router and a state * Test utility to mimic a federated HDFS cluster with a router and a state
@ -145,4 +150,27 @@ public class StateStoreDFSCluster extends MiniRouterDFSCluster {
entries.add(entry); entries.add(entry);
return entries; return entries;
} }
/**
* Get the client configuration which targets all the Routers. It uses the HA
* setup to fails over between them.
* @return Configuration for the client which uses two routers.
*/
public Configuration getRouterClientConf() {
List<RouterContext> routers = getRouters();
Configuration clientConf = DFSTestUtil.newHAConfiguration("fed");
int i = 0;
List<String> names = new ArrayList<>(routers.size());
for (RouterContext routerContext : routers) {
String name = "r" + i++;
clientConf.set(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + ".fed." + name,
"localhost:" + routerContext.getRpcPort());
names.add(name);
}
clientConf.set(DFSUtil.addKeySuffixes(
HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, "fed"),
StringUtils.join(",", names));
return clientConf;
}
} }

View File

@ -0,0 +1,243 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the Router overload control which rejects requests when the RPC client
* is overloaded. This feature is managed by
* {@link RBFConfigKeys#DFS_ROUTER_CLIENT_REJECT_OVERLOAD}.
*/
public class TestRouterClientRejectOverload {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterClientRejectOverload.class);
private StateStoreDFSCluster cluster;
@After
public void cleanup() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private void setupCluster(boolean overloadControl) throws Exception {
// Build and start a federated cluster
cluster = new StateStoreDFSCluster(false, 2);
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.metrics()
.admin()
.rpc()
.build();
// Reduce the number of RPC clients threads to overload the Router easy
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
// Overload control
routerConf.setBoolean(
RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, overloadControl);
// No need for datanodes as we use renewLease() for testing
cluster.setNumDatanodesPerNameservice(0);
cluster.addRouterOverrides(routerConf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
}
@Test
public void testWithoutOverloadControl() throws Exception {
setupCluster(false);
// Nobody should get overloaded
testOverloaded(0);
// Set subcluster 0 as slow
MiniDFSCluster dfsCluster = cluster.getCluster();
NameNode nn0 = dfsCluster.getNameNode(0);
simulateSlowNamenode(nn0, 1);
// Nobody should get overloaded, but it will be really slow
testOverloaded(0);
// No rejected requests expected
for (RouterContext router : cluster.getRouters()) {
FederationRPCMetrics rpcMetrics =
router.getRouter().getRpcServer().getRPCMetrics();
assertEquals(0, rpcMetrics.getProxyOpFailureClientOverloaded());
}
}
@Test
public void testOverloadControl() throws Exception {
setupCluster(true);
List<RouterContext> routers = cluster.getRouters();
FederationRPCMetrics rpcMetrics0 =
routers.get(0).getRouter().getRpcServer().getRPCMetrics();
FederationRPCMetrics rpcMetrics1 =
routers.get(1).getRouter().getRpcServer().getRPCMetrics();
// Nobody should get overloaded
testOverloaded(0);
assertEquals(0, rpcMetrics0.getProxyOpFailureClientOverloaded());
assertEquals(0, rpcMetrics1.getProxyOpFailureClientOverloaded());
// Set subcluster 0 as slow
MiniDFSCluster dfsCluster = cluster.getCluster();
NameNode nn0 = dfsCluster.getNameNode(0);
simulateSlowNamenode(nn0, 1);
// The subcluster should be overloaded now and reject 4-5 requests
testOverloaded(4, 6);
assertTrue(rpcMetrics0.getProxyOpFailureClientOverloaded()
+ rpcMetrics1.getProxyOpFailureClientOverloaded() >= 4);
// Client using HA with 2 Routers
// A single Router gets overloaded, but 2 will handle it
Configuration clientConf = cluster.getRouterClientConf();
// Each Router should get a similar number of ops (>=8) out of 2*10
long iniProxyOps0 = rpcMetrics0.getProxyOps();
long iniProxyOps1 = rpcMetrics1.getProxyOps();
testOverloaded(0, 0, new URI("hdfs://fed/"), clientConf, 10);
long proxyOps0 = rpcMetrics0.getProxyOps() - iniProxyOps0;
long proxyOps1 = rpcMetrics1.getProxyOps() - iniProxyOps1;
assertEquals(2 * 10, proxyOps0 + proxyOps1);
assertTrue(proxyOps0 + " operations: not distributed", proxyOps0 >= 8);
assertTrue(proxyOps1 + " operations: not distributed", proxyOps1 >= 8);
}
private void testOverloaded(int expOverload) throws Exception {
testOverloaded(expOverload, expOverload);
}
private void testOverloaded(int expOverloadMin, int expOverloadMax)
throws Exception {
RouterContext routerContext = cluster.getRandomRouter();
URI address = routerContext.getFileSystemURI();
Configuration conf = new HdfsConfiguration();
testOverloaded(expOverloadMin, expOverloadMax, address, conf, 10);
}
/**
* Test if the Router gets overloaded by submitting requests in parallel.
* We check how many requests got rejected at the end.
* @param expOverloadMin Min number of requests expected as overloaded.
* @param expOverloadMax Max number of requests expected as overloaded.
* @param address Destination address.
* @param conf Configuration of the client.
* @param numOps Number of operations to submit.
* @throws Exception If it cannot perform the test.
*/
private void testOverloaded(int expOverloadMin, int expOverloadMax,
final URI address, final Configuration conf, final int numOps)
throws Exception {
// Submit renewLease() ops which go to all subclusters
final AtomicInteger overloadException = new AtomicInteger();
ExecutorService exec = Executors.newFixedThreadPool(numOps);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < numOps; i++) {
// Stagger the operations a little (50ms)
final int sleepTime = i * 50;
Future<?> future = exec.submit(new Runnable() {
@Override
public void run() {
DFSClient routerClient = null;
try {
Thread.sleep(sleepTime);
routerClient = new DFSClient(address, conf);
String clientName = routerClient.getClientName();
ClientProtocol routerProto = routerClient.getNamenode();
routerProto.renewLease(clientName);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException();
assertTrue("Wrong exception: " + ioe,
ioe instanceof StandbyException);
assertExceptionContains("is overloaded", ioe);
overloadException.incrementAndGet();
} catch (IOException e) {
fail("Unexpected exception: " + e);
} catch (InterruptedException e) {
fail("Cannot sleep: " + e);
} finally {
if (routerClient != null) {
try {
routerClient.close();
} catch (IOException e) {
LOG.error("Cannot close the client");
}
}
}
}
});
futures.add(future);
}
// Wait until all the requests are done
while (!futures.isEmpty()) {
futures.remove(0).get();
}
exec.shutdown();
int num = overloadException.get();
if (expOverloadMin == expOverloadMax) {
assertEquals(expOverloadMin, num);
} else {
assertTrue("Expected >=" + expOverloadMin + " but was " + num,
num >= expOverloadMin);
assertTrue("Expected <=" + expOverloadMax + " but was " + num,
num <= expOverloadMax);
}
}
}

View File

@ -17,13 +17,13 @@
*/ */
package org.apache.hadoop.hdfs.server.federation.router; package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.test.GenericTestUtils.waitFor;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
@ -44,12 +44,8 @@ import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.junit.After; import org.junit.After;
@ -57,11 +53,6 @@ import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
@ -70,9 +61,6 @@ import com.google.common.base.Supplier;
*/ */
public class TestRouterRPCClientRetries { public class TestRouterRPCClientRetries {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterRPCClientRetries.class);
private static StateStoreDFSCluster cluster; private static StateStoreDFSCluster cluster;
private static NamenodeContext nnContext1; private static NamenodeContext nnContext1;
private static RouterContext routerContext; private static RouterContext routerContext;
@ -144,7 +132,7 @@ public class TestRouterRPCClientRetries {
fail("Should have thrown RemoteException error."); fail("Should have thrown RemoteException error.");
} catch (RemoteException e) { } catch (RemoteException e) {
String ns0 = cluster.getNameservices().get(0); String ns0 = cluster.getNameservices().get(0);
GenericTestUtils.assertExceptionContains( assertExceptionContains(
"No namenode available under nameservice " + ns0, e); "No namenode available under nameservice " + ns0, e);
} }
@ -212,14 +200,14 @@ public class TestRouterRPCClientRetries {
// Making subcluster0 slow to reply, should only get DNs from nn1 // Making subcluster0 slow to reply, should only get DNs from nn1
MiniDFSCluster dfsCluster = cluster.getCluster(); MiniDFSCluster dfsCluster = cluster.getCluster();
NameNode nn0 = dfsCluster.getNameNode(0); NameNode nn0 = dfsCluster.getNameNode(0);
simulateNNSlow(nn0); simulateSlowNamenode(nn0, 3);
waitUpdateLiveNodes(jsonString2, metrics); waitUpdateLiveNodes(jsonString2, metrics);
final String jsonString3 = metrics.getLiveNodes(); final String jsonString3 = metrics.getLiveNodes();
assertEquals(2, getNumDatanodes(jsonString3)); assertEquals(2, getNumDatanodes(jsonString3));
// Making subcluster1 slow to reply, shouldn't get any DNs // Making subcluster1 slow to reply, shouldn't get any DNs
NameNode nn1 = dfsCluster.getNameNode(1); NameNode nn1 = dfsCluster.getNameNode(1);
simulateNNSlow(nn1); simulateSlowNamenode(nn1, 3);
waitUpdateLiveNodes(jsonString3, metrics); waitUpdateLiveNodes(jsonString3, metrics);
final String jsonString4 = metrics.getLiveNodes(); final String jsonString4 = metrics.getLiveNodes();
assertEquals(0, getNumDatanodes(jsonString4)); assertEquals(0, getNumDatanodes(jsonString4));
@ -249,36 +237,11 @@ public class TestRouterRPCClientRetries {
private static void waitUpdateLiveNodes( private static void waitUpdateLiveNodes(
final String oldValue, final NamenodeBeanMetrics metrics) final String oldValue, final NamenodeBeanMetrics metrics)
throws Exception { throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() { waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
return !oldValue.equals(metrics.getLiveNodes()); return !oldValue.equals(metrics.getLiveNodes());
} }
}, 500, 5 * 1000); }, 500, 5 * 1000);
} }
/**
* Simulate that a Namenode is slow by adding a sleep to the check operation
* in the NN.
* @param nn Namenode to simulate slow.
* @throws Exception If we cannot add the sleep time.
*/
private static void simulateNNSlow(final NameNode nn) throws Exception {
FSNamesystem namesystem = nn.getNamesystem();
HAContext haContext = namesystem.getHAContext();
HAContext spyHAContext = spy(haContext);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
LOG.info("Simulating slow namenode {}", invocation.getMock());
try {
Thread.sleep(3 * 1000);
} catch(InterruptedException e) {
LOG.error("Simulating a slow namenode aborted");
}
return null;
}
}).when(spyHAContext).checkOperation(any(OperationCategory.class));
Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
}
} }

View File

@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.After; import org.junit.After;
@ -187,7 +188,7 @@ public class TestRouterSafemode {
try { try {
router.getRpcServer().delete("/testfile.txt", true); router.getRpcServer().delete("/testfile.txt", true);
fail("We should have thrown a safe mode exception"); fail("We should have thrown a safe mode exception");
} catch (RouterSafeModeException sme) { } catch (StandbyException sme) {
exception = true; exception = true;
} }
assertTrue("We should have thrown a safe mode exception", exception); assertTrue("We should have thrown a safe mode exception", exception);