HDFS-14750. RBF: Support dynamic handler allocation in routers (#4199)

This commit is contained in:
Felix Nguyen 2022-05-13 14:28:53 +08:00 committed by GitHub
parent 5e2f4339fa
commit 680af27aa6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 579 additions and 11 deletions

View File

@ -22,10 +22,13 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -40,7 +43,10 @@ public class AbstractRouterRpcFairnessPolicyController
LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class); LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);
/** Hash table to hold semaphore for each configured name service. */ /** Hash table to hold semaphore for each configured name service. */
private Map<String, Semaphore> permits; private Map<String, AdjustableSemaphore> permits;
private final Map<String, Integer> permitSizes = new HashMap<>();
private Map<String, LongAdder> rejectedPermitsPerNs;
private Map<String, LongAdder> acceptedPermitsPerNs;
public void init(Configuration conf) { public void init(Configuration conf) {
this.permits = new HashMap<>(); this.permits = new HashMap<>();
@ -72,7 +78,7 @@ public class AbstractRouterRpcFairnessPolicyController
} }
protected void insertNameServiceWithPermits(String nsId, int maxPermits) { protected void insertNameServiceWithPermits(String nsId, int maxPermits) {
this.permits.put(nsId, new Semaphore(maxPermits)); this.permits.put(nsId, new AdjustableSemaphore(maxPermits));
} }
protected int getAvailablePermits(String nsId) { protected int getAvailablePermits(String nsId) {
@ -82,7 +88,7 @@ public class AbstractRouterRpcFairnessPolicyController
@Override @Override
public String getAvailableHandlerOnPerNs() { public String getAvailableHandlerOnPerNs() {
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
for (Map.Entry<String, Semaphore> entry : permits.entrySet()) { for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
try { try {
String nsId = entry.getKey(); String nsId = entry.getKey();
int availableHandler = entry.getValue().availablePermits(); int availableHandler = entry.getValue().availablePermits();
@ -93,4 +99,39 @@ public class AbstractRouterRpcFairnessPolicyController
} }
return json.toString(); return json.toString();
} }
@Override
public String getPermitCapacityPerNs() {
JSONObject json = new JSONObject();
for (Map.Entry<String, Integer> entry : permitSizes.entrySet()) {
try {
json.put(entry.getKey(), entry.getValue());
} catch (JSONException e) {
LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e);
}
}
return json.toString();
}
@Override
public void setMetrics(Map<String, LongAdder> rejectedPermits,
Map<String, LongAdder> acceptedPermits) {
this.rejectedPermitsPerNs = rejectedPermits;
this.acceptedPermitsPerNs = acceptedPermits;
}
protected Map<String, AdjustableSemaphore> getPermits() {
return permits;
}
public Map<String, LongAdder> getRejectedPermitsPerNs() {
return rejectedPermitsPerNs;
}
public Map<String, LongAdder> getAcceptedPermitsPerNs() {
return acceptedPermitsPerNs;
}
protected Map<String, Integer> getPermitSizes() {
return permitSizes;
}
} }

View File

@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
/**
* Dynamic fairness policy extending {@link StaticRouterRpcFairnessPolicyController}
* and fetching handlers from configuration for all available name services.
* The handlers count changes according to traffic to namespaces.
* Total handlers might NOT strictly add up to the value defined by DFS_ROUTER_HANDLER_COUNT_KEY
* but will not exceed initial handler count + number of nameservices.
*/
public class DynamicRouterRpcFairnessPolicyController
extends StaticRouterRpcFairnessPolicyController {
private static final Logger LOG =
LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class);
private static final ScheduledExecutorService SCHEDULED_EXECUTOR = HadoopExecutors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build());
private PermitsResizerService permitsResizerService;
private ScheduledFuture<?> refreshTask;
private int handlerCount;
private int minimumHandlerPerNs;
/**
* Initializes using the same logic as {@link StaticRouterRpcFairnessPolicyController}
* and starts a periodic semaphore resizer thread.
*
* @param conf configuration
*/
public DynamicRouterRpcFairnessPolicyController(Configuration conf) {
super(conf);
minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
long refreshInterval =
conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY,
DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT,
TimeUnit.SECONDS);
permitsResizerService = new PermitsResizerService();
refreshTask = SCHEDULED_EXECUTOR
.scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval,
TimeUnit.SECONDS);
}
@VisibleForTesting
public DynamicRouterRpcFairnessPolicyController(Configuration conf, long refreshInterval) {
super(conf);
minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
permitsResizerService = new PermitsResizerService();
refreshTask = SCHEDULED_EXECUTOR
.scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval,
TimeUnit.SECONDS);
}
@VisibleForTesting
public void refreshPermitsCap() {
permitsResizerService.run();
}
@Override
public void shutdown() {
super.shutdown();
if (refreshTask != null) {
refreshTask.cancel(true);
}
if (SCHEDULED_EXECUTOR != null) {
SCHEDULED_EXECUTOR.shutdown();
}
}
class PermitsResizerService implements Runnable {
@Override
public synchronized void run() {
if ((getRejectedPermitsPerNs() == null) || (getAcceptedPermitsPerNs() == null)) {
return;
}
long totalOps = 0;
Map<String, Long> nsOps = new HashMap<>();
for (Map.Entry<String, AdjustableSemaphore> entry : getPermits().entrySet()) {
long ops = (getRejectedPermitsPerNs().containsKey(entry.getKey()) ?
getRejectedPermitsPerNs().get(entry.getKey()).longValue() :
0) + (getAcceptedPermitsPerNs().containsKey(entry.getKey()) ?
getAcceptedPermitsPerNs().get(entry.getKey()).longValue() :
0);
nsOps.put(entry.getKey(), ops);
totalOps += ops;
}
List<String> underMinimumNss = new ArrayList<>();
List<String> overMinimumNss = new ArrayList<>();
int effectiveOps = 0;
// First iteration: split namespaces into those underused and those that are not.
for (Map.Entry<String, AdjustableSemaphore> entry : getPermits().entrySet()) {
String ns = entry.getKey();
int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / totalOps * handlerCount);
if (newPermitCap <= minimumHandlerPerNs) {
underMinimumNss.add(ns);
} else {
overMinimumNss.add(ns);
effectiveOps += nsOps.get(ns);
}
}
// Second iteration part 1: assign minimum handlers
for (String ns: underMinimumNss) {
resizeNsHandlerCapacity(ns, minimumHandlerPerNs);
}
// Second iteration part 2: assign handlers to the rest
int leftoverPermits = handlerCount - minimumHandlerPerNs * underMinimumNss.size();
for (String ns: overMinimumNss) {
int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / effectiveOps * leftoverPermits);
resizeNsHandlerCapacity(ns, newPermitCap);
}
}
private void resizeNsHandlerCapacity(String ns, int newPermitCap) {
AdjustableSemaphore semaphore = getPermits().get(ns);
int oldPermitCap = getPermitSizes().get(ns);
if (newPermitCap <= minimumHandlerPerNs) {
newPermitCap = minimumHandlerPerNs;
}
getPermitSizes().put(ns, newPermitCap);
if (newPermitCap > oldPermitCap) {
semaphore.release(newPermitCap - oldPermitCap);
} else if (newPermitCap < oldPermitCap) {
semaphore.reducePermits(oldPermitCap - newPermitCap);
}
LOG.info("Resized handlers for nsId {} from {} to {}", ns, oldPermitCap, newPermitCap);
}
}
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.federation.fairness; package org.apache.hadoop.hdfs.server.federation.fairness;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
/** /**
@ -51,4 +54,15 @@ public class NoRouterRpcFairnessPolicyController implements
public String getAvailableHandlerOnPerNs(){ public String getAvailableHandlerOnPerNs(){
return "N/A"; return "N/A";
} }
@Override
public String getPermitCapacityPerNs() {
return "N/A";
}
@Override
public void setMetrics(Map<String, LongAdder> rejectedPermitsPerNs,
Map<String, LongAdder> acceptedPermitsPerNs) {
// Nothing
}
} }

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.federation.fairness; package org.apache.hadoop.hdfs.server.federation.fairness;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -67,4 +70,15 @@ public interface RouterRpcFairnessPolicyController {
* Returns the JSON string of the available handler for each Ns. * Returns the JSON string of the available handler for each Ns.
*/ */
String getAvailableHandlerOnPerNs(); String getAvailableHandlerOnPerNs();
/**
* Returns the JSON string of the max handler count for each ns.
*/
String getPermitCapacityPerNs();
/**
* Attaches permits access metrics to the controller.
*/
void setMetrics(Map<String, LongAdder> rejectedPermitsPerNs,
Map<String, LongAdder> acceptedPermitsPerNs);
} }

View File

@ -27,6 +27,8 @@ import java.util.Set;
import java.util.HashSet; import java.util.HashSet;
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
@ -45,6 +47,9 @@ public class StaticRouterRpcFairnessPolicyController extends
public static final String ERROR_MSG = "Configured handlers " public static final String ERROR_MSG = "Configured handlers "
+ DFS_ROUTER_HANDLER_COUNT_KEY + '=' + DFS_ROUTER_HANDLER_COUNT_KEY + '='
+ " %d is less than the minimum required handlers %d"; + " %d is less than the minimum required handlers %d";
public static final String ERROR_NS_MSG =
"Configured handlers %s=%d is less than the minimum required handlers %d";
public StaticRouterRpcFairnessPolicyController(Configuration conf) { public StaticRouterRpcFairnessPolicyController(Configuration conf) {
init(conf); init(conf);
@ -78,6 +83,7 @@ public class StaticRouterRpcFairnessPolicyController extends
handlerCount -= dedicatedHandlers; handlerCount -= dedicatedHandlers;
insertNameServiceWithPermits(nsId, dedicatedHandlers); insertNameServiceWithPermits(nsId, dedicatedHandlers);
logAssignment(nsId, dedicatedHandlers); logAssignment(nsId, dedicatedHandlers);
getPermitSizes().put(nsId, dedicatedHandlers);
} else { } else {
unassignedNS.add(nsId); unassignedNS.add(nsId);
} }
@ -92,6 +98,7 @@ public class StaticRouterRpcFairnessPolicyController extends
for (String nsId : unassignedNS) { for (String nsId : unassignedNS) {
insertNameServiceWithPermits(nsId, handlersPerNS); insertNameServiceWithPermits(nsId, handlersPerNS);
logAssignment(nsId, handlersPerNS); logAssignment(nsId, handlersPerNS);
getPermitSizes().put(nsId, handlersPerNS);
} }
} }
@ -103,6 +110,7 @@ public class StaticRouterRpcFairnessPolicyController extends
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers); LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
insertNameServiceWithPermits(CONCURRENT_NS, insertNameServiceWithPermits(CONCURRENT_NS,
existingPermits + leftOverHandlers); existingPermits + leftOverHandlers);
getPermitSizes().put(CONCURRENT_NS, existingPermits + leftOverHandlers);
} }
LOG.info("Final permit allocation for concurrent ns: {}", LOG.info("Final permit allocation for concurrent ns: {}",
getAvailablePermits(CONCURRENT_NS)); getAvailablePermits(CONCURRENT_NS));
@ -116,15 +124,23 @@ public class StaticRouterRpcFairnessPolicyController extends
private void validateHandlersCount(Configuration conf, int handlerCount, private void validateHandlersCount(Configuration conf, int handlerCount,
Set<String> allConfiguredNS) { Set<String> allConfiguredNS) {
int totalDedicatedHandlers = 0; int totalDedicatedHandlers = 0;
int minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
for (String nsId : allConfiguredNS) { for (String nsId : allConfiguredNS) {
int dedicatedHandlers = int dedicatedHandlers =
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0); conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
if (dedicatedHandlers > 0) { if (dedicatedHandlers > 0) {
if (dedicatedHandlers < minimumHandlerPerNs) {
String msg = String.format(ERROR_NS_MSG, DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId,
handlerCount, minimumHandlerPerNs);
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
// Total handlers should not be less than sum of dedicated handlers. // Total handlers should not be less than sum of dedicated handlers.
totalDedicatedHandlers += dedicatedHandlers; totalDedicatedHandlers += dedicatedHandlers;
} else { } else {
// Each NS should have at least one handler assigned. // Each NS has to have a minimum number of handlers assigned.
totalDedicatedHandlers++; totalDedicatedHandlers += minimumHandlerPerNs;
} }
} }
if (totalDedicatedHandlers > handlerCount) { if (totalDedicatedHandlers > handlerCount) {
@ -134,5 +150,4 @@ public class StaticRouterRpcFairnessPolicyController extends
throw new IllegalArgumentException(msg); throw new IllegalArgumentException(msg);
} }
} }
} }

View File

@ -115,6 +115,12 @@ public interface FederationRPCMBean {
*/ */
String getAvailableHandlerOnPerNs(); String getAvailableHandlerOnPerNs();
/**
* JSON representation of max handler count per ns.
* @return JSON string representation.
*/
String getPermitCapacityPerNs();
/** /**
* Get the JSON representation of the async caller thread pool. * Get the JSON representation of the async caller thread pool.
* @return JSON string representation of the async caller thread pool. * @return JSON string representation of the async caller thread pool.

View File

@ -247,6 +247,11 @@ public class FederationRPCMetrics implements FederationRPCMBean {
getRouterRpcFairnessPolicyController().getAvailableHandlerOnPerNs(); getRouterRpcFairnessPolicyController().getAvailableHandlerOnPerNs();
} }
@Override
public String getPermitCapacityPerNs() {
return rpcServer.getRPCClient().getRouterRpcFairnessPolicyController().getPermitCapacityPerNs();
}
@Override @Override
public String getAsyncCallerPool() { public String getAsyncCallerPool() {
return rpcServer.getRPCClient().getAsyncCallerPoolJson(); return rpcServer.getRPCClient().getAsyncCallerPoolJson();

View File

@ -29,7 +29,9 @@ import java.net.URLConnection;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
@ -271,6 +273,22 @@ public final class FederationUtil {
return newInstance(conf, null, null, clazz); return newInstance(conf, null, null, clazz);
} }
/**
* Creates an instance of an RouterRpcFairnessPolicyController
* from the configuration and attaches permits access metrics to the controller.
*
* @param conf Configuration that defines the fairness controller class.
* @param rejectedPermitsPerNs Metrics map ns:rejected permits
* @param acceptedPermitsPerNs Metrics map ns:accepted permits
* @return Fairness policy controller.
*/
public static RouterRpcFairnessPolicyController newFairnessPolicyController(Configuration conf,
Map<String, LongAdder> rejectedPermitsPerNs, Map<String, LongAdder> acceptedPermitsPerNs) {
RouterRpcFairnessPolicyController instance = newFairnessPolicyController(conf);
instance.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
return instance;
}
/** /**
* Collect all configured nameservices. * Collect all configured nameservices.
* *

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.federation.router; package org.apache.hadoop.hdfs.server.federation.router;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController; import org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController;
@ -27,13 +29,11 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
import java.util.concurrent.TimeUnit;
/** /**
* Config fields for router-based hdfs federation. * Config fields for router-based hdfs federation.
@ -354,6 +354,13 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
NoRouterRpcFairnessPolicyController.class; NoRouterRpcFairnessPolicyController.class;
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX = public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count."; FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
public static final String DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "minimum.handler.count";
public static final int DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT = 1;
public static final long DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT =
600;
public static final String DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "policy.controller.dynamic.refresh.interval.seconds";
// HDFS Router Federation Rename. // HDFS Router Federation Rename.
public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX = public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =

View File

@ -156,8 +156,8 @@ public class RouterRpcClient {
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT); HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
this.connectionManager = new ConnectionManager(clientConf); this.connectionManager = new ConnectionManager(clientConf);
this.connectionManager.start(); this.connectionManager.start();
this.routerRpcFairnessPolicyController = this.routerRpcFairnessPolicyController = FederationUtil
FederationUtil.newFairnessPolicyController(conf); .newFairnessPolicyController(conf, rejectedPermitsPerNs, acceptedPermitsPerNs);
int numThreads = conf.getInt( int numThreads = conf.getInt(
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.utils;
import java.util.concurrent.Semaphore;
public class AdjustableSemaphore extends Semaphore {
public AdjustableSemaphore(int permits) {
super(permits);
}
public AdjustableSemaphore(int permits, boolean fair) {
super(permits, fair);
}
public void reducePermits(int reduction) {
super.reducePermits(reduction);
}
}

View File

@ -706,6 +706,24 @@
</description> </description>
</property> </property>
<property>
<name>dfs.federation.router.fairness.minimum.handler.count</name>
<value>1</value>
<description>
Minimum number of handlers assigned per nameservice.
If any dedicated handler count is smaller than this number,
router initialization will fail.
</description>
</property>
<property>
<name>dfs.federation.router.fairness.policy.controller.dynamic.refresh.interval.seconds</name>
<value>600</value>
<description>
Interval (in seconds) between each handler count resize by DynamicFairnessPolicyController
</description>
</property>
<property> <property>
<name>dfs.federation.router.fairness.handler.count.EXAMPLENAMESERVICE</name> <name>dfs.federation.router.fairness.handler.count.EXAMPLENAMESERVICE</name>
<value></value> <value></value>

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
/**
* Test functionality of {@link DynamicRouterRpcFairnessPolicyController).
*/
public class TestDynamicRouterRpcFairnessPolicyController {
private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2, ns3.nn1";
@Test
public void testDynamicControllerSimple() throws InterruptedException, TimeoutException {
// verifyDynamicControllerSimple(true);
verifyDynamicControllerSimple(false);
}
@Test
public void testDynamicControllerAllPermitsAcquired() throws InterruptedException {
verifyDynamicControllerAllPermitsAcquired(true);
verifyDynamicControllerAllPermitsAcquired(false);
}
private void verifyDynamicControllerSimple(boolean manualRefresh)
throws InterruptedException, TimeoutException {
// 3 permits each ns
DynamicRouterRpcFairnessPolicyController controller;
if (manualRefresh) {
controller = getFairnessPolicyController(20);
} else {
controller = getFairnessPolicyController(20, 4);
}
String[] nss = new String[] {"ns1", "ns2", "ns3", CONCURRENT_NS};
// Initial permit counts should be 5:5:5
verifyRemainingPermitCounts(new int[] {5, 5, 5, 5}, nss, controller);
// Release all permits
for (int i = 0; i < 5; i++) {
controller.releasePermit("ns1");
controller.releasePermit("ns2");
controller.releasePermit("ns3");
controller.releasePermit(CONCURRENT_NS);
}
// Inject dummy metrics
// Split half half for ns1 and concurrent
Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>();
Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>();
injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10);
injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0);
injectDummyMetrics(rejectedPermitsPerNs, "ns3", 10);
injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10);
controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
// Current permits count should be 6:3:6:6
int[] newPermitCounts = new int[] {6, 3, 6, 6};
if (manualRefresh) {
controller.refreshPermitsCap();
} else {
Thread.sleep(5000);
}
verifyRemainingPermitCounts(newPermitCounts, nss, controller);
}
public void verifyDynamicControllerAllPermitsAcquired(boolean manualRefresh)
throws InterruptedException {
// 10 permits each ns
DynamicRouterRpcFairnessPolicyController controller;
if (manualRefresh) {
controller = getFairnessPolicyController(40);
} else {
controller = getFairnessPolicyController(40, 4);
}
String[] nss = new String[] {"ns1", "ns2", "ns3", CONCURRENT_NS};
verifyRemainingPermitCounts(new int[] {10, 10, 10, 10}, nss, controller);
// Inject dummy metrics
Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>();
Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>();
injectDummyMetrics(rejectedPermitsPerNs, "ns1", 13);
injectDummyMetrics(rejectedPermitsPerNs, "ns2", 13);
injectDummyMetrics(rejectedPermitsPerNs, "ns3", 13);
injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 1);
// New permit capacity will be 13:13:13:3
controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
if (manualRefresh) {
controller.refreshPermitsCap();
} else {
Thread.sleep(5000);
}
Assert.assertEquals("{\"concurrent\":-7,\"ns2\":3,\"ns1\":3,\"ns3\":3}",
controller.getAvailableHandlerOnPerNs());
// Can acquire 3 more permits for ns1, ns2, ns3
verifyRemainingPermitCounts(new int[] {3, 3, 3, 0}, nss, controller);
// Need to release at least 8 permits for concurrent before it has any free permits
Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS));
for (int i = 0; i < 7; i++) {
controller.releasePermit(CONCURRENT_NS);
}
Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS));
controller.releasePermit(CONCURRENT_NS);
Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS));
}
private void verifyRemainingPermitCounts(int[] remainingPermitCounts, String[] nss,
RouterRpcFairnessPolicyController controller) {
assert remainingPermitCounts.length == nss.length;
for (int i = 0; i < remainingPermitCounts.length; i++) {
verifyRemainingPermitCount(remainingPermitCounts[i], nss[i], controller);
}
}
private void verifyRemainingPermitCount(int remainingPermitCount, String nameservice,
RouterRpcFairnessPolicyController controller) {
for (int i = 0; i < remainingPermitCount; i++) {
Assert.assertTrue(controller.acquirePermit(nameservice));
}
Assert.assertFalse(controller.acquirePermit(nameservice));
}
private void injectDummyMetrics(Map<String, LongAdder> metrics, String ns, long value) {
metrics.computeIfAbsent(ns, k -> new LongAdder()).add(value);
}
private DynamicRouterRpcFairnessPolicyController getFairnessPolicyController(int handlers,
long refreshInterval) {
return new DynamicRouterRpcFairnessPolicyController(createConf(handlers, 3), refreshInterval);
}
private DynamicRouterRpcFairnessPolicyController getFairnessPolicyController(int handlers) {
return new DynamicRouterRpcFairnessPolicyController(createConf(handlers, 3), Long.MAX_VALUE);
}
private Configuration createConf(int handlers, int minHandlersPerNs) {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, handlers);
conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices);
conf.setInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, minHandlersPerNs);
conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
DynamicRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class);
return conf;
}
}

View File

@ -27,6 +27,7 @@ import org.junit.Test;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
@ -102,6 +103,14 @@ public class TestRouterRpcFairnessPolicyController {
verifyInstantiationError(conf, 1, 3); verifyInstantiationError(conf, 1, 3);
} }
@Test
public void testAllocationErrorTooFewDedicatedHandlers() {
Configuration conf = createConf(9);
conf.setInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, 3);
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + CONCURRENT_NS, 1);
verifyInstantiationError(conf, CONCURRENT_NS, 9, 3);
}
@Test @Test
public void testGetAvailableHandlerOnPerNs() { public void testGetAvailableHandlerOnPerNs() {
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
@ -113,6 +122,18 @@ public class TestRouterRpcFairnessPolicyController {
routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs()); routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs());
} }
@Test
public void testGetPermitCapacityPerNs() {
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
= getFairnessPolicyController(30);
assertEquals("{\"concurrent\":10,\"ns2\":10,\"ns1\":10}",
routerRpcFairnessPolicyController.getPermitCapacityPerNs());
routerRpcFairnessPolicyController.acquirePermit("ns1");
routerRpcFairnessPolicyController.acquirePermit("ns2");
assertEquals("{\"concurrent\":10,\"ns2\":10,\"ns1\":10}",
routerRpcFairnessPolicyController.getPermitCapacityPerNs());
}
@Test @Test
public void testGetAvailableHandlerOnPerNsForNoFairness() { public void testGetAvailableHandlerOnPerNsForNoFairness() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -170,6 +191,20 @@ public class TestRouterRpcFairnessPolicyController {
logs.getOutput().contains(errorMsg)); logs.getOutput().contains(errorMsg));
} }
private void verifyInstantiationError(Configuration conf, String ns, int handlerCount,
int minimumHandler) {
GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class));
try {
FederationUtil.newFairnessPolicyController(conf);
} catch (IllegalArgumentException e) {
// Ignore the exception as it is expected here.
}
String errorMsg = String.format(StaticRouterRpcFairnessPolicyController.ERROR_NS_MSG,
DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + ns, handlerCount, minimumHandler);
assertTrue("Should contain error message: " + errorMsg, logs.getOutput().contains(errorMsg));
}
private RouterRpcFairnessPolicyController getFairnessPolicyController( private RouterRpcFairnessPolicyController getFairnessPolicyController(
int handlers) { int handlers) {
return FederationUtil.newFairnessPolicyController(createConf(handlers)); return FederationUtil.newFairnessPolicyController(createConf(handlers));