HDFS-13488. RBF: Reject requests when a Router is overloaded. Contributed by Inigo Goiri.
parent 8f42dafcf8
commit 37269261d1
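The change, in brief: RouterRpcClient's fixed thread pool is rebuilt as a ThreadPoolExecutor whose work queue is bounded when dfs.federation.router.client.reject.overload is enabled; once the queue fills, submit() throws RejectedExecutionException, which the Router counts in a new proxyOpFailureClientOverloaded metric and rethrows as StandbyException so an HA client fails over to another Router. A minimal, self-contained sketch of that queueing pattern (illustrative demo, not code from the patch):

    import java.util.concurrent.*;

    public class BoundedExecutorDemo {
      public static void main(String[] args) {
        int numThreads = 2;
        // Bounded queue: with numThreads tasks running and numThreads queued,
        // the next submit() throws RejectedExecutionException.
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(numThreads);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, workQueue);
        try {
          for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
              try {
                Thread.sleep(1000); // simulate a slow downstream Namenode
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
          }
        } catch (RejectedExecutionException e) {
          // This is the signal the Router converts into a StandbyException
          System.out.println("Overloaded: " + executor.getActiveCount()
              + "/" + executor.getMaximumPoolSize() + " threads busy");
        } finally {
          executor.shutdownNow();
        }
      }
    }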
@@ -40,6 +40,8 @@ public interface FederationRPCMBean {
 
   long getProxyOpFailureStandby();
 
+  long getProxyOpFailureClientOverloaded();
+
   long getProxyOpNotImplemented();
 
   long getProxyOpRetries();
@@ -54,6 +54,8 @@ public class FederationRPCMetrics implements FederationRPCMBean {
   private MutableCounterLong proxyOpFailureStandby;
   @Metric("Number of operations to hit a standby NN")
   private MutableCounterLong proxyOpFailureCommunicate;
+  @Metric("Number of operations to hit a client overloaded Router")
+  private MutableCounterLong proxyOpFailureClientOverloaded;
   @Metric("Number of operations not implemented")
   private MutableCounterLong proxyOpNotImplemented;
   @Metric("Number of operation retries")
@@ -118,6 +120,14 @@ public class FederationRPCMetrics implements FederationRPCMBean {
     return proxyOpFailureCommunicate.value();
   }
 
+  public void incrProxyOpFailureClientOverloaded() {
+    proxyOpFailureClientOverloaded.incr();
+  }
+
+  @Override
+  public long getProxyOpFailureClientOverloaded() {
+    return proxyOpFailureClientOverloaded.value();
+  }
+
   public void incrProxyOpNotImplemented() {
     proxyOpNotImplemented.incr();
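The new counter follows the same pattern as the surrounding metrics: the @Metric-annotated MutableCounterLong is registered with Hadoop's metrics2 system, incrProxyOpFailureClientOverloaded() is the hook the RPC client calls on rejection, and the @Override getter satisfies the getProxyOpFailureClientOverloaded() method added to FederationRPCMBean above, exposing the value over JMX.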
@@ -153,6 +153,11 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
     metrics.incrProxyOpFailureCommunicate();
   }
 
+  @Override
+  public void proxyOpFailureClientOverloaded() {
+    metrics.incrProxyOpFailureClientOverloaded();
+  }
+
   @Override
   public void proxyOpNotImplemented() {
     metrics.incrProxyOpNotImplemented();
@@ -113,6 +113,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
       FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
   public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
+  public static final String DFS_ROUTER_CLIENT_REJECT_OVERLOAD =
+      FEDERATION_ROUTER_PREFIX + "client.reject.overload";
+  public static final boolean DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT = false;
 
   // HDFS Router State Store connection
   public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
@@ -35,13 +35,16 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -98,7 +101,7 @@ public class RouterRpcClient {
   /** Connection pool to the Namenodes per user for performance. */
   private final ConnectionManager connectionManager;
   /** Service to run asynchronous calls. */
-  private final ExecutorService executorService;
+  private final ThreadPoolExecutor executorService;
   /** Retry policy for router -> NN communication. */
   private final RetryPolicy retryPolicy;
   /** Optional perf monitor. */
@@ -131,8 +134,16 @@ public class RouterRpcClient {
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setNameFormat("RPC Router Client-%d")
         .build();
-    this.executorService = Executors.newFixedThreadPool(
-        numThreads, threadFactory);
+    BlockingQueue<Runnable> workQueue;
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
+      workQueue = new ArrayBlockingQueue<>(numThreads);
+    } else {
+      workQueue = new LinkedBlockingQueue<>();
+    }
+    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
+        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
 
     this.rpcMonitor = monitor;
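A note on the sizing: with rejection enabled the queue capacity equals numThreads, so at most numThreads calls run while another numThreads wait; anything beyond that is rejected immediately. With the feature off (the default), the unbounded LinkedBlockingQueue reproduces the previous Executors.newFixedThreadPool() behavior, since that factory also uses an unbounded LinkedBlockingQueue.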
@@ -1106,6 +1117,16 @@ public class RouterRpcClient {
       }
 
       return results;
+    } catch (RejectedExecutionException e) {
+      if (rpcMonitor != null) {
+        rpcMonitor.proxyOpFailureClientOverloaded();
+      }
+      int active = executorService.getActiveCount();
+      int total = executorService.getMaximumPoolSize();
+      String msg = "Not enough client threads " + active + "/" + total;
+      LOG.error(msg);
+      throw new StandbyException(
+          "Router " + routerId + " is overloaded: " + msg);
     } catch (InterruptedException ex) {
       LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
       throw new IOException(
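Rethrowing the rejection as StandbyException is what makes this safe for clients: the DFS client's HA failover policy retries StandbyException on the next configured endpoint, so a client that lists several Routers (as getRouterClientConf() sets up in the tests below) transparently moves the call to a less loaded Router.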
@@ -75,6 +75,12 @@ public interface RouterRpcMonitor {
    */
  void proxyOpFailureCommunicate();
 
+  /**
+   * Failed to proxy an operation to a Namenode because the client was
+   * overloaded.
+   */
+  void proxyOpFailureClientOverloaded();
+
  /**
   * Failed to proxy an operation because it is not implemented.
   */
@@ -289,7 +289,6 @@ public class RouterRpcServer extends AbstractService
     // We don't want the server to log the full stack trace for some exceptions
     this.rpcServer.addTerseExceptions(
         RemoteException.class,
-        StandbyException.class,
         SafeModeException.class,
         FileNotFoundException.class,
         FileAlreadyExistsException.class,
@@ -298,6 +297,9 @@ public class RouterRpcServer extends AbstractService
         NotReplicatedYetException.class,
         IOException.class);
 
+    this.rpcServer.addSuppressedLoggingExceptions(
+        StandbyException.class);
+
     // The RPC-server port can be ephemeral... ensure we have the correct info
     InetSocketAddress listenAddress = this.rpcServer.getListenerAddress();
     this.rpcAddress = new InetSocketAddress(
@@ -413,7 +415,7 @@ public class RouterRpcServer extends AbstractService
    * @throws UnsupportedOperationException If the operation is not supported.
    */
   protected void checkOperation(OperationCategory op, boolean supported)
-      throws RouterSafeModeException, UnsupportedOperationException {
+      throws StandbyException, UnsupportedOperationException {
     checkOperation(op);
 
     if (!supported) {
@@ -435,7 +437,7 @@ public class RouterRpcServer extends AbstractService
    * client requests.
    */
   protected void checkOperation(OperationCategory op)
-      throws RouterSafeModeException {
+      throws StandbyException {
     // Log the function we are currently calling.
     if (rpcMonitor != null) {
       rpcMonitor.startOp();
@@ -459,7 +461,8 @@ public class RouterRpcServer extends AbstractService
       if (rpcMonitor != null) {
         rpcMonitor.routerFailureSafemode();
       }
-      throw new RouterSafeModeException(router.getRouterId(), op);
+      throw new StandbyException("Router " + router.getRouterId() +
+          " is in safe mode and cannot handle " + op + " requests");
     }
   }
 
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.ipc.StandbyException;
-
-/**
- * Exception that the Router throws when it is in safe mode. This extends
- * {@link StandbyException} for the client to try another Router when it gets
- * this exception.
- */
-public class RouterSafeModeException extends StandbyException {
-
-  private static final long serialVersionUID = 453568188334993493L;
-
-  /** Identifier of the Router that generated this exception. */
-  private final String routerId;
-
-  /**
-   * Build a new Router safe mode exception.
-   * @param router Identifier of the Router.
-   * @param op Category of the operation (READ/WRITE).
-   */
-  public RouterSafeModeException(String router, OperationCategory op) {
-    super("Router " + router + " is in safe mode and cannot handle " + op
-        + " requests.");
-    this.routerId = router;
-  }
-
-  /**
-   * Get the id of the Router that generated this exception.
-   * @return Id of the Router that generated this exception.
-   */
-  public String getRouterId() {
-    return this.routerId;
-  }
-}
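The dedicated exception type can be deleted because RouterRpcServer above now throws StandbyException directly with the same message text; clients keep the fail-over-to-another-Router behavior this class's javadoc describes, and the new overload path reuses the same exception type.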
@@ -431,4 +431,13 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.client.reject.overload</name>
+    <value>false</value>
+    <description>
+      Set to true to reject client requests when we run out of RPC client
+      threads.
+    </description>
+  </property>
+
 </configuration>
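The key can also be set programmatically, which is how the new test enables it; a minimal sketch using the constants introduced by this patch:

    Configuration conf = new HdfsConfiguration();
    // Reject, rather than queue, requests once all RPC client threads are busy
    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, true); // default: false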
@@ -59,7 +59,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.Whitebox;
-import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -28,6 +28,10 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -37,6 +41,7 @@ import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Test utility to mimic a federated HDFS cluster with a router and a state
@@ -145,4 +150,27 @@ public class StateStoreDFSCluster extends MiniRouterDFSCluster {
     entries.add(entry);
     return entries;
   }
+
+  /**
+   * Get the client configuration which targets all the Routers. It uses the
+   * HA setup to fail over between them.
+   * @return Configuration for the client which uses two routers.
+   */
+  public Configuration getRouterClientConf() {
+    List<RouterContext> routers = getRouters();
+    Configuration clientConf = DFSTestUtil.newHAConfiguration("fed");
+    int i = 0;
+    List<String> names = new ArrayList<>(routers.size());
+    for (RouterContext routerContext : routers) {
+      String name = "r" + i++;
+      clientConf.set(
+          DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + ".fed." + name,
+          "localhost:" + routerContext.getRpcPort());
+      names.add(name);
+    }
+    clientConf.set(DFSUtil.addKeySuffixes(
+        HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, "fed"),
+        StringUtils.join(",", names));
+    return clientConf;
+  }
 }
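This helper builds a logical nameservice "fed" whose members are the Routers themselves, so an ordinary HA client spreads requests across them and fails over on StandbyException. The new test consumes it as follows:

    Configuration clientConf = cluster.getRouterClientConf();
    DFSClient client = new DFSClient(new URI("hdfs://fed/"), clientConf);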
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test the Router overload control which rejects requests when the RPC client
+ * is overloaded. This feature is managed by
+ * {@link RBFConfigKeys#DFS_ROUTER_CLIENT_REJECT_OVERLOAD}.
+ */
+public class TestRouterClientRejectOverload {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterClientRejectOverload.class);
+
+  private StateStoreDFSCluster cluster;
+
+  @After
+  public void cleanup() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void setupCluster(boolean overloadControl) throws Exception {
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(false, 2);
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .metrics()
+        .admin()
+        .rpc()
+        .build();
+
+    // Reduce the number of RPC client threads to overload the Router easily
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
+    // Overload control
+    routerConf.setBoolean(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, overloadControl);
+
+    // No need for datanodes as we use renewLease() for testing
+    cluster.setNumDatanodesPerNameservice(0);
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+  }
+
+  @Test
+  public void testWithoutOverloadControl() throws Exception {
+    setupCluster(false);
+
+    // Nobody should get overloaded
+    testOverloaded(0);
+
+    // Set subcluster 0 as slow
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    NameNode nn0 = dfsCluster.getNameNode(0);
+    simulateSlowNamenode(nn0, 1);
+
+    // Nobody should get overloaded, but it will be really slow
+    testOverloaded(0);
+
+    // No rejected requests expected
+    for (RouterContext router : cluster.getRouters()) {
+      FederationRPCMetrics rpcMetrics =
+          router.getRouter().getRpcServer().getRPCMetrics();
+      assertEquals(0, rpcMetrics.getProxyOpFailureClientOverloaded());
+    }
+  }
+
+  @Test
+  public void testOverloadControl() throws Exception {
+    setupCluster(true);
+
+    List<RouterContext> routers = cluster.getRouters();
+    FederationRPCMetrics rpcMetrics0 =
+        routers.get(0).getRouter().getRpcServer().getRPCMetrics();
+    FederationRPCMetrics rpcMetrics1 =
+        routers.get(1).getRouter().getRpcServer().getRPCMetrics();
+
+    // Nobody should get overloaded
+    testOverloaded(0);
+    assertEquals(0, rpcMetrics0.getProxyOpFailureClientOverloaded());
+    assertEquals(0, rpcMetrics1.getProxyOpFailureClientOverloaded());
+
+    // Set subcluster 0 as slow
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    NameNode nn0 = dfsCluster.getNameNode(0);
+    simulateSlowNamenode(nn0, 1);
+
+    // The subcluster should be overloaded now and reject 4-5 requests
+    testOverloaded(4, 6);
+    assertTrue(rpcMetrics0.getProxyOpFailureClientOverloaded()
+        + rpcMetrics1.getProxyOpFailureClientOverloaded() >= 4);
+
+    // Client using HA with 2 Routers
+    // A single Router gets overloaded, but 2 will handle it
+    Configuration clientConf = cluster.getRouterClientConf();
+
+    // Each Router should get a similar number of ops (>=8) out of 2*10
+    long iniProxyOps0 = rpcMetrics0.getProxyOps();
+    long iniProxyOps1 = rpcMetrics1.getProxyOps();
+    testOverloaded(0, 0, new URI("hdfs://fed/"), clientConf, 10);
+    long proxyOps0 = rpcMetrics0.getProxyOps() - iniProxyOps0;
+    long proxyOps1 = rpcMetrics1.getProxyOps() - iniProxyOps1;
+    assertEquals(2 * 10, proxyOps0 + proxyOps1);
+    assertTrue(proxyOps0 + " operations: not distributed", proxyOps0 >= 8);
+    assertTrue(proxyOps1 + " operations: not distributed", proxyOps1 >= 8);
+  }
+
+  private void testOverloaded(int expOverload) throws Exception {
+    testOverloaded(expOverload, expOverload);
+  }
+
+  private void testOverloaded(int expOverloadMin, int expOverloadMax)
+      throws Exception {
+    RouterContext routerContext = cluster.getRandomRouter();
+    URI address = routerContext.getFileSystemURI();
+    Configuration conf = new HdfsConfiguration();
+    testOverloaded(expOverloadMin, expOverloadMax, address, conf, 10);
+  }
+
+  /**
+   * Test if the Router gets overloaded by submitting requests in parallel.
+   * We check how many requests got rejected at the end.
+   * @param expOverloadMin Min number of requests expected as overloaded.
+   * @param expOverloadMax Max number of requests expected as overloaded.
+   * @param address Destination address.
+   * @param conf Configuration of the client.
+   * @param numOps Number of operations to submit.
+   * @throws Exception If it cannot perform the test.
+   */
+  private void testOverloaded(int expOverloadMin, int expOverloadMax,
+      final URI address, final Configuration conf, final int numOps)
+      throws Exception {
+
+    // Submit renewLease() ops which go to all subclusters
+    final AtomicInteger overloadException = new AtomicInteger();
+    ExecutorService exec = Executors.newFixedThreadPool(numOps);
+    List<Future<?>> futures = new ArrayList<>();
+    for (int i = 0; i < numOps; i++) {
+      // Stagger the operations a little (50ms)
+      final int sleepTime = i * 50;
+      Future<?> future = exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          DFSClient routerClient = null;
+          try {
+            Thread.sleep(sleepTime);
+            routerClient = new DFSClient(address, conf);
+            String clientName = routerClient.getClientName();
+            ClientProtocol routerProto = routerClient.getNamenode();
+            routerProto.renewLease(clientName);
+          } catch (RemoteException re) {
+            IOException ioe = re.unwrapRemoteException();
+            assertTrue("Wrong exception: " + ioe,
+                ioe instanceof StandbyException);
+            assertExceptionContains("is overloaded", ioe);
+            overloadException.incrementAndGet();
+          } catch (IOException e) {
+            fail("Unexpected exception: " + e);
+          } catch (InterruptedException e) {
+            fail("Cannot sleep: " + e);
+          } finally {
+            if (routerClient != null) {
+              try {
+                routerClient.close();
+              } catch (IOException e) {
+                LOG.error("Cannot close the client");
+              }
+            }
+          }
+        }
+      });
+      futures.add(future);
+    }
+    // Wait until all the requests are done
+    while (!futures.isEmpty()) {
+      futures.remove(0).get();
+    }
+    exec.shutdown();
+
+    int num = overloadException.get();
+    if (expOverloadMin == expOverloadMax) {
+      assertEquals(expOverloadMin, num);
+    } else {
+      assertTrue("Expected >=" + expOverloadMin + " but was " + num,
+          num >= expOverloadMin);
+      assertTrue("Expected <=" + expOverloadMax + " but was " + num,
+          num <= expOverloadMax);
+    }
+  }
+}
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.List;
@@ -44,13 +44,8 @@ import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.Whitebox;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.junit.After;
@@ -58,10 +53,6 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Supplier;
 
@@ -70,9 +61,6 @@ import com.google.common.base.Supplier;
  */
 public class TestRouterRPCClientRetries {
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestRouterRPCClientRetries.class);
-
   private static StateStoreDFSCluster cluster;
   private static NamenodeContext nnContext1;
   private static RouterContext routerContext;
@@ -144,7 +132,7 @@ public class TestRouterRPCClientRetries {
       fail("Should have thrown RemoteException error.");
     } catch (RemoteException e) {
       String ns0 = cluster.getNameservices().get(0);
-      GenericTestUtils.assertExceptionContains(
+      assertExceptionContains(
           "No namenode available under nameservice " + ns0, e);
     }
 
@@ -212,14 +200,14 @@ public class TestRouterRPCClientRetries {
     // Making subcluster0 slow to reply, should only get DNs from nn1
     MiniDFSCluster dfsCluster = cluster.getCluster();
     NameNode nn0 = dfsCluster.getNameNode(0);
-    simulateNNSlow(nn0);
+    simulateSlowNamenode(nn0, 3);
     waitUpdateLiveNodes(jsonString2, metrics);
     final String jsonString3 = metrics.getLiveNodes();
     assertEquals(2, getNumDatanodes(jsonString3));
 
     // Making subcluster1 slow to reply, shouldn't get any DNs
     NameNode nn1 = dfsCluster.getNameNode(1);
-    simulateNNSlow(nn1);
+    simulateSlowNamenode(nn1, 3);
     waitUpdateLiveNodes(jsonString3, metrics);
     final String jsonString4 = metrics.getLiveNodes();
     assertEquals(0, getNumDatanodes(jsonString4));
@@ -249,36 +237,11 @@ public class TestRouterRPCClientRetries {
   private static void waitUpdateLiveNodes(
       final String oldValue, final NamenodeBeanMetrics metrics)
       throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+    waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         return !oldValue.equals(metrics.getLiveNodes());
       }
     }, 500, 5 * 1000);
   }
-
-  /**
-   * Simulate that a Namenode is slow by adding a sleep to the check operation
-   * in the NN.
-   * @param nn Namenode to simulate slow.
-   * @throws Exception If we cannot add the sleep time.
-   */
-  private static void simulateNNSlow(final NameNode nn) throws Exception {
-    FSNamesystem namesystem = nn.getNamesystem();
-    HAContext haContext = namesystem.getHAContext();
-    HAContext spyHAContext = spy(haContext);
-    doAnswer(new Answer<Object>() {
-      @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
-        LOG.info("Simulating slow namenode {}", invocation.getMock());
-        try {
-          Thread.sleep(3 * 1000);
-        } catch(InterruptedException e) {
-          LOG.error("Simulating a slow namenode aborted");
-        }
-        return null;
-      }
-    }).when(spyHAContext).checkOperation(any(OperationCategory.class));
-    Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
-  }
 }
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -187,7 +188,7 @@ public class TestRouterSafemode {
     try {
       router.getRpcServer().delete("/testfile.txt", true);
       fail("We should have thrown a safe mode exception");
-    } catch (RouterSafeModeException sme) {
+    } catch (StandbyException sme) {
       exception = true;
     }
     assertTrue("We should have thrown a safe mode exception", exception);