HDFS-14090. RBF: Improved isolation for downstream name nodes. {Static}. Contributed by Fengnan Li and CR Hota.

This commit is contained in:
Ayush Saxena 2020-12-04 22:55:48 +05:30
parent e2c1268ebd
commit 7dda804a1a
15 changed files with 940 additions and 7 deletions

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base fairness policy that implements @RouterRpcFairnessPolicyController.
* Internally a map of nameservice to Semaphore is used to control permits.
*/
public class AbstractRouterRpcFairnessPolicyController
implements RouterRpcFairnessPolicyController {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);
/** Hash table to hold semaphore for each configured name service. */
private Map<String, Semaphore> permits;
public void init(Configuration conf) {
this.permits = new HashMap<>();
}
@Override
public boolean acquirePermit(String nsId) {
try {
LOG.debug("Taking lock for nameservice {}", nsId);
return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.debug("Cannot get a permit for nameservice {}", nsId);
}
return false;
}
@Override
public void releasePermit(String nsId) {
this.permits.get(nsId).release();
}
@Override
public void shutdown() {
// drain all semaphores
for (Semaphore sema: this.permits.values()) {
sema.drainPermits();
}
}
protected void insertNameServiceWithPermits(String nsId, int maxPermits) {
this.permits.put(nsId, new Semaphore(maxPermits));
}
protected int getAvailablePermits(String nsId) {
return this.permits.get(nsId).availablePermits();
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import org.apache.hadoop.conf.Configuration;
/**
* A pass through fairness policy that implements
* {@link RouterRpcFairnessPolicyController} and allows any number
* of handlers to connect to any specific downstream name service.
*/
public class NoRouterRpcFairnessPolicyController implements
RouterRpcFairnessPolicyController {
public NoRouterRpcFairnessPolicyController(Configuration conf) {
// Dummy constructor.
}
@Override
public boolean acquirePermit(String nsId) {
return true;
}
@Override
public void releasePermit(String nsId) {
// Dummy, pass through.
}
@Override
public void shutdown() {
// Nothing for now.
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
public class RouterRpcFairnessConstants {
/** Name service keyword to identify fan-out calls. */
public static final String CONCURRENT_NS = "concurrent";
/* Hidden constructor */
protected RouterRpcFairnessConstants() {
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Interface to define handlers assignment for specific name services.
* This is needed to allow handlers provide a certain QoS for all name services
* configured. Implementations can choose to define any algorithm which
* would help maintain QoS. This is different when compared to FairCallQueue
* semantics as fairness has a separate context in router based federation.
* An implementation for example, could allocate a dedicated set of handlers
* per name service and allow handlers to continue making downstream name
* node calls if permissions are available, another implementation could use
* preemption semantics and dynamically increase or decrease handlers
* assigned per name service.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface RouterRpcFairnessPolicyController {
/**
* Request permission for a specific name service to continue the call and
* connect to downstream name node. Controllers based on policies defined
* and allocations done at start-up through assignHandlersToNameservices,
* may provide a permission or reject the request by throwing exception.
*
* @param nsId NS id for which a permission to continue is requested.
* @return true or false based on whether permit is given.
*/
boolean acquirePermit(String nsId);
/**
* Handler threads are expected to invoke this method that signals
* controller to release the resources allocated to the thread for the
* particular name service. This would mean permissions getting available
* for other handlers to request for this specific name service.
*
* @param nsId Name service id for which permission release request is made.
*/
void releasePermit(String nsId);
/**
* Shutdown steps to stop accepting new permission requests and clean-up.
*/
void shutdown();
}

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.HashSet;
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
/**
* Static fairness policy extending @AbstractRouterRpcFairnessPolicyController
* and fetching handlers from configuration for all available name services.
* The handlers count will not change for this controller.
*/
public class StaticRouterRpcFairnessPolicyController extends
AbstractRouterRpcFairnessPolicyController {
private static final Logger LOG =
LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class);
public StaticRouterRpcFairnessPolicyController(Configuration conf) {
init(conf);
}
public void init(Configuration conf)
throws IllegalArgumentException {
super.init(conf);
// Total handlers configured to process all incoming Rpc.
int handlerCount = conf.getInt(
DFS_ROUTER_HANDLER_COUNT_KEY,
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
LOG.info("Handlers available for fairness assignment {} ", handlerCount);
// Get all name services configured
Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(conf);
// Set to hold name services that are not
// configured with dedicated handlers.
Set<String> unassignedNS = new HashSet<>();
// Insert the concurrent nameservice into the set to process together
allConfiguredNS.add(CONCURRENT_NS);
for (String nsId : allConfiguredNS) {
int dedicatedHandlers =
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
if (dedicatedHandlers > 0) {
handlerCount -= dedicatedHandlers;
// Total handlers should not be less than sum of dedicated
// handlers.
validateCount(nsId, handlerCount, 0);
insertNameServiceWithPermits(nsId, dedicatedHandlers);
logAssignment(nsId, dedicatedHandlers);
} else {
unassignedNS.add(nsId);
}
}
// Assign remaining handlers equally to remaining name services and
// general pool if applicable.
if (!unassignedNS.isEmpty()) {
LOG.info("Unassigned ns {}", unassignedNS.toString());
int handlersPerNS = handlerCount / unassignedNS.size();
LOG.info("Handlers available per ns {}", handlersPerNS);
for (String nsId : unassignedNS) {
// Each NS should have at least one handler assigned.
validateCount(nsId, handlersPerNS, 1);
insertNameServiceWithPermits(nsId, handlersPerNS);
logAssignment(nsId, handlersPerNS);
}
}
// Assign remaining handlers if any to fan out calls.
int leftOverHandlers = handlerCount % unassignedNS.size();
int existingPermits = getAvailablePermits(CONCURRENT_NS);
if (leftOverHandlers > 0) {
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
insertNameServiceWithPermits(CONCURRENT_NS,
existingPermits + leftOverHandlers);
}
LOG.info("Final permit allocation for concurrent ns: {}",
getAvailablePermits(CONCURRENT_NS));
}
private static void logAssignment(String nsId, int count) {
LOG.info("Assigned {} handlers to nsId {} ",
count, nsId);
}
private static void validateCount(String nsId, int handlers, int min) throws
IllegalArgumentException {
if (handlers < min) {
String msg =
"Available handlers " + handlers +
" lower than min " + min +
" for nsId " + nsId;
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Includes router handlers fairness manager and policy implementations.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
package org.apache.hadoop.hdfs.server.federation.fairness;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -99,4 +99,10 @@ public interface FederationRPCMBean {
* @return JSON string representation of the async caller thread pool.
*/
String getAsyncCallerPool();
/**
* Get the number of operations rejected due to lack of permits.
* @return Number of operations rejected due to lack of permits.
*/
long getProxyOpPermitRejected();
}

View File

@ -72,6 +72,9 @@ public class FederationRPCMetrics implements FederationRPCMBean {
@Metric("Failed requests due to safe mode")
private MutableCounterLong routerFailureSafemode;
@Metric("Number of operations to hit permit limits")
private MutableCounterLong proxyOpPermitRejected;
public FederationRPCMetrics(Configuration conf, RouterRpcServer rpcServer) {
this.rpcServer = rpcServer;
@ -264,4 +267,13 @@ public class FederationRPCMetrics implements FederationRPCMBean {
public long getProcessingOps() {
return processingOp.value();
}
public void incrProxyOpPermitRejected() {
proxyOpPermitRejected.incr();
}
@Override
public long getProxyOpPermitRejected() {
return proxyOpPermitRejected.value();
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -24,12 +26,16 @@ import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLConnection;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@ -248,4 +254,51 @@ public final class FederationUtil {
.storagePolicy(dirStatus.getStoragePolicy())
.symlink(dirStatus.getSymlinkInBytes()).flags(flags).build();
}
/**
* Creates an instance of an RouterRpcFairnessPolicyController
* from the configuration.
*
* @param conf Configuration that defines the fairness controller class.
* @return Fairness policy controller.
*/
public static RouterRpcFairnessPolicyController newFairnessPolicyController(
Configuration conf) {
Class<? extends RouterRpcFairnessPolicyController> clazz = conf.getClass(
RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS_DEFAULT,
RouterRpcFairnessPolicyController.class);
return newInstance(conf, null, null, clazz);
}
/**
* Collect all configured nameservices.
*
* @param conf
* @return Set of name services in config
* @throws IllegalArgumentException
*/
public static Set<String> getAllConfiguredNS(Configuration conf)
throws IllegalArgumentException {
// Get all name services configured
Collection<String> namenodes = conf.getTrimmedStringCollection(
DFS_ROUTER_MONITOR_NAMENODE);
Set<String> nameservices = new HashSet();
for (String namenode : namenodes) {
String[] namenodeSplit = namenode.split("\\.");
String nsId;
if (namenodeSplit.length == 2) {
nsId = namenodeSplit[0];
} else if (namenodeSplit.length == 1) {
nsId = namenode;
} else {
String errorMsg = "Wrong name service specified : " + namenode;
throw new IllegalArgumentException(
errorMsg);
}
nameservices.add(nsId);
}
return nameservices;
}
}

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@ -334,4 +336,16 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final Class<? extends AbstractDelegationTokenSecretManager>
DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS_DEFAULT =
ZKDelegationTokenSecretManagerImpl.class;
// HDFS Router fairness
public static final String FEDERATION_ROUTER_FAIRNESS_PREFIX =
FEDERATION_ROUTER_PREFIX + "fairness.";
public static final String
DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "policy.controller.class";
public static final Class<? extends RouterRpcFairnessPolicyController>
DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS_DEFAULT =
NoRouterRpcFairnessPolicyController.class;
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
}

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_C
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import java.io.EOFException;
import java.io.FileNotFoundException;
@ -61,6 +62,8 @@ import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.federation.fairness.AbstractRouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
@ -129,6 +132,9 @@ public class RouterRpcClient {
private static final String CLIENT_IP_STR = "clientIp";
/** Fairness manager to control handlers assigned per NS. */
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
/**
* Create a router RPC client to manage remote procedure calls to NNs.
*
@ -149,6 +155,8 @@ public class RouterRpcClient {
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
this.connectionManager = new ConnectionManager(clientConf);
this.connectionManager.start();
this.routerRpcFairnessPolicyController =
FederationUtil.newFairnessPolicyController(conf);
int numThreads = conf.getInt(
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
@ -229,6 +237,9 @@ public class RouterRpcClient {
if (this.executorService != null) {
this.executorService.shutdownNow();
}
if (this.routerRpcFairnessPolicyController != null) {
this.routerRpcFairnessPolicyController.shutdown();
}
}
/**
@ -770,13 +781,18 @@ public class RouterRpcClient {
public Object invokeSingle(final String nsId, RemoteMethod method)
throws IOException {
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
List<? extends FederationNamenodeContext> nns =
getNamenodesForNameservice(nsId);
RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
Class<?> proto = method.getProtocol();
Method m = method.getMethod();
Object[] params = method.getParams(loc);
return invokeMethod(ugi, nns, proto, m, params);
acquirePermit(nsId, ugi, method);
try {
List<? extends FederationNamenodeContext> nns =
getNamenodesForNameservice(nsId);
RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
Class<?> proto = method.getProtocol();
Method m = method.getMethod();
Object[] params = method.getParams(loc);
return invokeMethod(ugi, nns, proto, m, params);
} finally {
releasePermit(nsId, ugi, method);
}
}
/**
@ -933,6 +949,7 @@ public class RouterRpcClient {
// Invoke in priority order
for (final RemoteLocationContext loc : locations) {
String ns = loc.getNameserviceId();
acquirePermit(ns, ugi, remoteMethod);
List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
@ -966,6 +983,8 @@ public class RouterRpcClient {
IOException ioe = new IOException(
"Unexpected exception proxying API " + e.getMessage(), e);
thrownExceptions.add(ioe);
} finally {
releasePermit(ns, ugi, remoteMethod);
}
}
@ -1298,6 +1317,7 @@ public class RouterRpcClient {
// Shortcut, just one call
T location = locations.iterator().next();
String ns = location.getNameserviceId();
acquirePermit(ns, ugi, method);
final List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
@ -1309,6 +1329,8 @@ public class RouterRpcClient {
} catch (IOException ioe) {
// Localize the exception
throw processException(ioe, location);
} finally {
releasePermit(ns, ugi, method);
}
}
@ -1355,6 +1377,7 @@ public class RouterRpcClient {
rpcMonitor.proxyOp();
}
acquirePermit(CONCURRENT_NS, ugi, method);
try {
List<Future<Object>> futures = null;
if (timeOutMs > 0) {
@ -1411,6 +1434,8 @@ public class RouterRpcClient {
LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
throw new IOException(
"Unexpected error while invoking API " + ex.getMessage(), ex);
} finally {
releasePermit(CONCURRENT_NS, ugi, method);
}
}
@ -1484,4 +1509,55 @@ public class RouterRpcClient {
FederationNamenodeContext namenode = namenodes.get(0);
return namenode.getNameserviceId();
}
/**
* Acquire permit to continue processing the request for specific nsId.
*
* @param nsId Identifier of the block pool.
* @param ugi UserGroupIdentifier associated with the user.
* @param m Remote method that needs to be invoked.
* @throws IOException If permit could not be acquired for the nsId.
*/
private void acquirePermit(
final String nsId, final UserGroupInformation ugi, final RemoteMethod m)
throws IOException {
if (routerRpcFairnessPolicyController != null
&& !routerRpcFairnessPolicyController.acquirePermit(nsId)) {
// Throw StandByException,
// Clients could fail over and try another router.
if (rpcMonitor != null) {
rpcMonitor.getRPCMetrics().incrProxyOpPermitRejected();
}
LOG.debug("Permit denied for ugi: {} for method: {}",
ugi, m.getMethodName());
String msg =
"Router " + router.getRouterId() +
" is overloaded for NS: " + nsId;
throw new StandbyException(msg);
}
}
/**
* Release permit for specific nsId after processing against downstream
* nsId is completed.
*
* @param nsId Identifier of the block pool.
* @param ugi UserGroupIdentifier associated with the user.
* @param m Remote method that needs to be invoked.
*/
private void releasePermit(
final String nsId, final UserGroupInformation ugi, final RemoteMethod m) {
if (routerRpcFairnessPolicyController != null) {
routerRpcFairnessPolicyController.releasePermit(nsId);
LOG.trace("Permit released for ugi: {} for method: {}", ugi,
m.getMethodName());
}
}
@VisibleForTesting
public AbstractRouterRpcFairnessPolicyController
getRouterRpcFairnessPolicyController() {
return (AbstractRouterRpcFairnessPolicyController
)routerRpcFairnessPolicyController;
}
}

View File

@ -677,4 +677,29 @@
</description>
</property>
<property>
<name>dfs.federation.router.fairness.policy.controller.class</name>
<value>org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController</value>
<description>
No fairness policy handler by default, for fairness
StaticFairnessPolicyController should be configured.
</description>
</property>
<property>
<name>dfs.federation.router.fairness.handler.count.EXAMPLENAMESERVICE</name>
<value></value>
<description>
Dedicated handler count for nameservice EXAMPLENAMESERVICE. The handler
(configed by dfs.federation.router.handler.count)resource is controlled
internally by Semaphore permits. Two requirements have to be satisfied.
1) all downstream nameservices need this config otherwise no permit will
be given thus not proxy will happen. 2) if a special *concurrent*
nameservice is specified, the sum of all configured values is smaller or
equal to the total number of router handlers; if the special *concurrent*
is not specified, the sum of all configured values must be strictly
smaller than the router handlers thus the left will be allocated to the
concurrent calls.
</description>
</property>
</configuration>

View File

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the Router handlers fairness control rejects
* requests when the handlers are overloaded.
*/
public class TestRouterHandlersFairness {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterHandlersFairness.class);
private StateStoreDFSCluster cluster;
@After
public void cleanup() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private void setupCluster(boolean fairnessEnable, boolean ha)
throws Exception {
// Build and start a federated cluster
cluster = new StateStoreDFSCluster(ha, 2);
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.rpc()
.build();
// Fairness control
if (fairnessEnable) {
routerConf.setClass(
RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
StaticRouterRpcFairnessPolicyController.class,
RouterRpcFairnessPolicyController.class);
}
// With two name services configured, each nameservice has 1 permit and
// fan-out calls have 1 permit.
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 3);
// Datanodes not needed for this test.
cluster.setNumDatanodesPerNameservice(0);
cluster.addRouterOverrides(routerConf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
}
@Test
public void testFairnessControlOff() throws Exception {
setupCluster(false, false);
startLoadTest(false);
}
@Test
public void testFairnessControlOn() throws Exception {
setupCluster(true, false);
startLoadTest(true);
}
/**
* Start a generic load test as a client against a cluster which has either
* fairness configured or not configured. Test will spawn a set of 100
* threads to simulate concurrent request to test routers. If fairness is
* enabled, the test will successfully report the failure of some threads
* to continue with StandbyException. If fairness is not configured, all
* threads of the same test should successfully complete all the rpcs.
*
* @param fairness Flag to indicate if fairness management is on/off.
* @throws Exception Throws exception.
*/
private void startLoadTest(boolean fairness)
throws Exception {
// Concurrent requests
startLoadTest(true, fairness);
// Sequential requests
startLoadTest(false, fairness);
}
private void startLoadTest(final boolean isConcurrent, final boolean fairness)
throws Exception {
RouterContext routerContext = cluster.getRandomRouter();
if (fairness) {
if (isConcurrent) {
LOG.info("Taking fanout lock first");
// take the lock for concurrent NS to block fanout calls
assertTrue(routerContext.getRouter().getRpcServer()
.getRPCClient().getRouterRpcFairnessPolicyController()
.acquirePermit(RouterRpcFairnessConstants.CONCURRENT_NS));
} else {
for (String ns : cluster.getNameservices()) {
LOG.info("Taking lock first for ns: {}", ns);
assertTrue(routerContext.getRouter().getRpcServer()
.getRPCClient().getRouterRpcFairnessPolicyController()
.acquirePermit(ns));
}
}
}
URI address = routerContext.getFileSystemURI();
Configuration conf = new HdfsConfiguration();
final int numOps = 10;
final AtomicInteger overloadException = new AtomicInteger();
for (int i = 0; i < numOps; i++) {
DFSClient routerClient = null;
try {
routerClient = new DFSClient(address, conf);
String clientName = routerClient.getClientName();
ClientProtocol routerProto = routerClient.getNamenode();
if (isConcurrent) {
invokeConcurrent(routerProto, clientName);
} else {
invokeSequential(routerProto);
}
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException();
assertTrue("Wrong exception: " + ioe,
ioe instanceof StandbyException);
assertExceptionContains("is overloaded for NS", ioe);
overloadException.incrementAndGet();
} catch (Throwable e) {
throw e;
} finally {
if (routerClient != null) {
try {
routerClient.close();
} catch (IOException e) {
LOG.error("Cannot close the client");
}
}
}
overloadException.get();
}
if (fairness) {
assertTrue(overloadException.get() > 0);
if (isConcurrent) {
LOG.info("Release fanout lock that was taken before test");
// take the lock for concurrent NS to block fanout calls
routerContext.getRouter().getRpcServer()
.getRPCClient().getRouterRpcFairnessPolicyController()
.releasePermit(RouterRpcFairnessConstants.CONCURRENT_NS);
} else {
for (String ns : cluster.getNameservices()) {
routerContext.getRouter().getRpcServer()
.getRPCClient().getRouterRpcFairnessPolicyController()
.releasePermit(ns);
}
}
} else {
assertEquals("Number of failed RPCs without fairness configured",
0, overloadException.get());
}
}
private void invokeSequential(ClientProtocol routerProto) throws IOException {
routerProto.getFileInfo("/test.txt");
}
private void invokeConcurrent(ClientProtocol routerProto, String clientName)
throws IOException {
routerProto.renewLease(clientName);
}
}

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Test functionality of {@link RouterRpcFairnessPolicyController).
*/
public class TestRouterRpcFairnessPolicyController {
private static String nameServices =
"ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2";
@Test
public void testHandlerAllocationEqualAssignment() {
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
= getFairnessPolicyController(30);
verifyHandlerAllocation(routerRpcFairnessPolicyController);
}
@Test
public void testHandlerAllocationWithLeftOverHandler() {
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
= getFairnessPolicyController(31);
// One extra handler should be allocated to commons.
assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
verifyHandlerAllocation(routerRpcFairnessPolicyController);
}
@Test
public void testHandlerAllocationPreconfigured() {
Configuration conf = createConf(40);
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30);
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
FederationUtil.newFairnessPolicyController(conf);
// ns1 should have 30 permits allocated
for (int i=0; i<30; i++) {
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
}
// ns2 should have 5 permits.
// concurrent should have 5 permits.
for (int i=0; i<5; i++) {
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
assertTrue(
routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
}
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2"));
assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
}
@Test
public void testAllocationErrorWithZeroHandlers() {
Configuration conf = createConf(0);
verifyInstantiationError(conf);
}
@Test
public void testAllocationErrorForLowDefaultHandlers() {
Configuration conf = createConf(1);
verifyInstantiationError(conf);
}
@Test
public void testAllocationErrorForLowDefaultHandlersPerNS() {
Configuration conf = createConf(1);
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "concurrent", 1);
verifyInstantiationError(conf);
}
@Test
public void testAllocationErrorForLowPreconfiguredHandlers() {
Configuration conf = createConf(1);
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 2);
verifyInstantiationError(conf);
}
private void verifyInstantiationError(Configuration conf) {
GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer
.captureLogs(LoggerFactory.getLogger(
StaticRouterRpcFairnessPolicyController.class));
try {
FederationUtil.newFairnessPolicyController(conf);
} catch (IllegalArgumentException e) {
// Ignore the exception as it is expected here.
}
assertTrue("Should contain error message",
logs.getOutput().contains("lower than min"));
}
private RouterRpcFairnessPolicyController getFairnessPolicyController(
int handlers) {
return FederationUtil.newFairnessPolicyController(createConf(handlers));
}
private void verifyHandlerAllocation(
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController) {
for (int i=0; i<10; i++) {
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
assertTrue(
routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
}
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2"));
assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
routerRpcFairnessPolicyController.releasePermit("ns1");
routerRpcFairnessPolicyController.releasePermit("ns2");
routerRpcFairnessPolicyController.releasePermit(CONCURRENT_NS);
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
}
private Configuration createConf(int handlers) {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, handlers);
conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices);
conf.setClass(
RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
StaticRouterRpcFairnessPolicyController.class,
RouterRpcFairnessPolicyController.class);
return conf;
}
}

View File

@ -47,5 +47,7 @@ public class TestRBFConfigFields extends TestConfigurationFieldsBase {
// Allocate
xmlPropsToSkipCompare = new HashSet<String>();
xmlPrefixToSkipCompare = new HashSet<String>();
xmlPrefixToSkipCompare.add(
RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX);
}
}