Revert "HDFS-14750. RBF: Support dynamic handler allocation in routers (#4199)"
This reverts commit 680af27aa6
.
This commit is contained in:
parent
680af27aa6
commit
caa25ae583
|
@ -22,13 +22,10 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
|
||||||
|
|
||||||
import org.codehaus.jettison.json.JSONException;
|
import org.codehaus.jettison.json.JSONException;
|
||||||
import org.codehaus.jettison.json.JSONObject;
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -43,10 +40,7 @@ public class AbstractRouterRpcFairnessPolicyController
|
||||||
LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);
|
LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);
|
||||||
|
|
||||||
/** Hash table to hold semaphore for each configured name service. */
|
/** Hash table to hold semaphore for each configured name service. */
|
||||||
private Map<String, AdjustableSemaphore> permits;
|
private Map<String, Semaphore> permits;
|
||||||
private final Map<String, Integer> permitSizes = new HashMap<>();
|
|
||||||
private Map<String, LongAdder> rejectedPermitsPerNs;
|
|
||||||
private Map<String, LongAdder> acceptedPermitsPerNs;
|
|
||||||
|
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
this.permits = new HashMap<>();
|
this.permits = new HashMap<>();
|
||||||
|
@ -78,7 +72,7 @@ public class AbstractRouterRpcFairnessPolicyController
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void insertNameServiceWithPermits(String nsId, int maxPermits) {
|
protected void insertNameServiceWithPermits(String nsId, int maxPermits) {
|
||||||
this.permits.put(nsId, new AdjustableSemaphore(maxPermits));
|
this.permits.put(nsId, new Semaphore(maxPermits));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int getAvailablePermits(String nsId) {
|
protected int getAvailablePermits(String nsId) {
|
||||||
|
@ -88,7 +82,7 @@ public class AbstractRouterRpcFairnessPolicyController
|
||||||
@Override
|
@Override
|
||||||
public String getAvailableHandlerOnPerNs() {
|
public String getAvailableHandlerOnPerNs() {
|
||||||
JSONObject json = new JSONObject();
|
JSONObject json = new JSONObject();
|
||||||
for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
|
for (Map.Entry<String, Semaphore> entry : permits.entrySet()) {
|
||||||
try {
|
try {
|
||||||
String nsId = entry.getKey();
|
String nsId = entry.getKey();
|
||||||
int availableHandler = entry.getValue().availablePermits();
|
int availableHandler = entry.getValue().availablePermits();
|
||||||
|
@ -99,39 +93,4 @@ public class AbstractRouterRpcFairnessPolicyController
|
||||||
}
|
}
|
||||||
return json.toString();
|
return json.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getPermitCapacityPerNs() {
|
|
||||||
JSONObject json = new JSONObject();
|
|
||||||
for (Map.Entry<String, Integer> entry : permitSizes.entrySet()) {
|
|
||||||
try {
|
|
||||||
json.put(entry.getKey(), entry.getValue());
|
|
||||||
} catch (JSONException e) {
|
|
||||||
LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return json.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setMetrics(Map<String, LongAdder> rejectedPermits,
|
|
||||||
Map<String, LongAdder> acceptedPermits) {
|
|
||||||
this.rejectedPermitsPerNs = rejectedPermits;
|
|
||||||
this.acceptedPermitsPerNs = acceptedPermits;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Map<String, AdjustableSemaphore> getPermits() {
|
|
||||||
return permits;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, LongAdder> getRejectedPermitsPerNs() {
|
|
||||||
return rejectedPermitsPerNs;
|
|
||||||
}
|
|
||||||
public Map<String, LongAdder> getAcceptedPermitsPerNs() {
|
|
||||||
return acceptedPermitsPerNs;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Map<String, Integer> getPermitSizes() {
|
|
||||||
return permitSizes;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,178 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.federation.fairness;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.ScheduledFuture;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.VisibleForTesting;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
|
|
||||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Dynamic fairness policy extending {@link StaticRouterRpcFairnessPolicyController}
|
|
||||||
* and fetching handlers from configuration for all available name services.
|
|
||||||
* The handlers count changes according to traffic to namespaces.
|
|
||||||
* Total handlers might NOT strictly add up to the value defined by DFS_ROUTER_HANDLER_COUNT_KEY
|
|
||||||
* but will not exceed initial handler count + number of nameservices.
|
|
||||||
*/
|
|
||||||
public class DynamicRouterRpcFairnessPolicyController
|
|
||||||
extends StaticRouterRpcFairnessPolicyController {
|
|
||||||
|
|
||||||
private static final Logger LOG =
|
|
||||||
LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class);
|
|
||||||
|
|
||||||
private static final ScheduledExecutorService SCHEDULED_EXECUTOR = HadoopExecutors
|
|
||||||
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
|
|
||||||
.setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build());
|
|
||||||
private PermitsResizerService permitsResizerService;
|
|
||||||
private ScheduledFuture<?> refreshTask;
|
|
||||||
private int handlerCount;
|
|
||||||
private int minimumHandlerPerNs;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initializes using the same logic as {@link StaticRouterRpcFairnessPolicyController}
|
|
||||||
* and starts a periodic semaphore resizer thread.
|
|
||||||
*
|
|
||||||
* @param conf configuration
|
|
||||||
*/
|
|
||||||
public DynamicRouterRpcFairnessPolicyController(Configuration conf) {
|
|
||||||
super(conf);
|
|
||||||
minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
|
|
||||||
DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
|
|
||||||
handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
|
|
||||||
long refreshInterval =
|
|
||||||
conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY,
|
|
||||||
DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
permitsResizerService = new PermitsResizerService();
|
|
||||||
refreshTask = SCHEDULED_EXECUTOR
|
|
||||||
.scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public DynamicRouterRpcFairnessPolicyController(Configuration conf, long refreshInterval) {
|
|
||||||
super(conf);
|
|
||||||
minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
|
|
||||||
DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
|
|
||||||
handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
|
|
||||||
permitsResizerService = new PermitsResizerService();
|
|
||||||
refreshTask = SCHEDULED_EXECUTOR
|
|
||||||
.scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public void refreshPermitsCap() {
|
|
||||||
permitsResizerService.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void shutdown() {
|
|
||||||
super.shutdown();
|
|
||||||
if (refreshTask != null) {
|
|
||||||
refreshTask.cancel(true);
|
|
||||||
}
|
|
||||||
if (SCHEDULED_EXECUTOR != null) {
|
|
||||||
SCHEDULED_EXECUTOR.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class PermitsResizerService implements Runnable {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void run() {
|
|
||||||
if ((getRejectedPermitsPerNs() == null) || (getAcceptedPermitsPerNs() == null)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
long totalOps = 0;
|
|
||||||
Map<String, Long> nsOps = new HashMap<>();
|
|
||||||
for (Map.Entry<String, AdjustableSemaphore> entry : getPermits().entrySet()) {
|
|
||||||
long ops = (getRejectedPermitsPerNs().containsKey(entry.getKey()) ?
|
|
||||||
getRejectedPermitsPerNs().get(entry.getKey()).longValue() :
|
|
||||||
0) + (getAcceptedPermitsPerNs().containsKey(entry.getKey()) ?
|
|
||||||
getAcceptedPermitsPerNs().get(entry.getKey()).longValue() :
|
|
||||||
0);
|
|
||||||
nsOps.put(entry.getKey(), ops);
|
|
||||||
totalOps += ops;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<String> underMinimumNss = new ArrayList<>();
|
|
||||||
List<String> overMinimumNss = new ArrayList<>();
|
|
||||||
int effectiveOps = 0;
|
|
||||||
|
|
||||||
// First iteration: split namespaces into those underused and those that are not.
|
|
||||||
for (Map.Entry<String, AdjustableSemaphore> entry : getPermits().entrySet()) {
|
|
||||||
String ns = entry.getKey();
|
|
||||||
int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / totalOps * handlerCount);
|
|
||||||
|
|
||||||
if (newPermitCap <= minimumHandlerPerNs) {
|
|
||||||
underMinimumNss.add(ns);
|
|
||||||
} else {
|
|
||||||
overMinimumNss.add(ns);
|
|
||||||
effectiveOps += nsOps.get(ns);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Second iteration part 1: assign minimum handlers
|
|
||||||
for (String ns: underMinimumNss) {
|
|
||||||
resizeNsHandlerCapacity(ns, minimumHandlerPerNs);
|
|
||||||
}
|
|
||||||
// Second iteration part 2: assign handlers to the rest
|
|
||||||
int leftoverPermits = handlerCount - minimumHandlerPerNs * underMinimumNss.size();
|
|
||||||
for (String ns: overMinimumNss) {
|
|
||||||
int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / effectiveOps * leftoverPermits);
|
|
||||||
resizeNsHandlerCapacity(ns, newPermitCap);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void resizeNsHandlerCapacity(String ns, int newPermitCap) {
|
|
||||||
AdjustableSemaphore semaphore = getPermits().get(ns);
|
|
||||||
int oldPermitCap = getPermitSizes().get(ns);
|
|
||||||
if (newPermitCap <= minimumHandlerPerNs) {
|
|
||||||
newPermitCap = minimumHandlerPerNs;
|
|
||||||
}
|
|
||||||
getPermitSizes().put(ns, newPermitCap);
|
|
||||||
if (newPermitCap > oldPermitCap) {
|
|
||||||
semaphore.release(newPermitCap - oldPermitCap);
|
|
||||||
} else if (newPermitCap < oldPermitCap) {
|
|
||||||
semaphore.reducePermits(oldPermitCap - newPermitCap);
|
|
||||||
}
|
|
||||||
LOG.info("Resized handlers for nsId {} from {} to {}", ns, oldPermitCap, newPermitCap);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -18,9 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.federation.fairness;
|
package org.apache.hadoop.hdfs.server.federation.fairness;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -54,15 +51,4 @@ public class NoRouterRpcFairnessPolicyController implements
|
||||||
public String getAvailableHandlerOnPerNs(){
|
public String getAvailableHandlerOnPerNs(){
|
||||||
return "N/A";
|
return "N/A";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getPermitCapacityPerNs() {
|
|
||||||
return "N/A";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setMetrics(Map<String, LongAdder> rejectedPermitsPerNs,
|
|
||||||
Map<String, LongAdder> acceptedPermitsPerNs) {
|
|
||||||
// Nothing
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,9 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.federation.fairness;
|
package org.apache.hadoop.hdfs.server.federation.fairness;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
@ -70,15 +67,4 @@ public interface RouterRpcFairnessPolicyController {
|
||||||
* Returns the JSON string of the available handler for each Ns.
|
* Returns the JSON string of the available handler for each Ns.
|
||||||
*/
|
*/
|
||||||
String getAvailableHandlerOnPerNs();
|
String getAvailableHandlerOnPerNs();
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the JSON string of the max handler count for each ns.
|
|
||||||
*/
|
|
||||||
String getPermitCapacityPerNs();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Attaches permits access metrics to the controller.
|
|
||||||
*/
|
|
||||||
void setMetrics(Map<String, LongAdder> rejectedPermitsPerNs,
|
|
||||||
Map<String, LongAdder> acceptedPermitsPerNs);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,8 +27,6 @@ import java.util.Set;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
|
||||||
|
@ -47,9 +45,6 @@ public class StaticRouterRpcFairnessPolicyController extends
|
||||||
public static final String ERROR_MSG = "Configured handlers "
|
public static final String ERROR_MSG = "Configured handlers "
|
||||||
+ DFS_ROUTER_HANDLER_COUNT_KEY + '='
|
+ DFS_ROUTER_HANDLER_COUNT_KEY + '='
|
||||||
+ " %d is less than the minimum required handlers %d";
|
+ " %d is less than the minimum required handlers %d";
|
||||||
public static final String ERROR_NS_MSG =
|
|
||||||
"Configured handlers %s=%d is less than the minimum required handlers %d";
|
|
||||||
|
|
||||||
|
|
||||||
public StaticRouterRpcFairnessPolicyController(Configuration conf) {
|
public StaticRouterRpcFairnessPolicyController(Configuration conf) {
|
||||||
init(conf);
|
init(conf);
|
||||||
|
@ -83,7 +78,6 @@ public class StaticRouterRpcFairnessPolicyController extends
|
||||||
handlerCount -= dedicatedHandlers;
|
handlerCount -= dedicatedHandlers;
|
||||||
insertNameServiceWithPermits(nsId, dedicatedHandlers);
|
insertNameServiceWithPermits(nsId, dedicatedHandlers);
|
||||||
logAssignment(nsId, dedicatedHandlers);
|
logAssignment(nsId, dedicatedHandlers);
|
||||||
getPermitSizes().put(nsId, dedicatedHandlers);
|
|
||||||
} else {
|
} else {
|
||||||
unassignedNS.add(nsId);
|
unassignedNS.add(nsId);
|
||||||
}
|
}
|
||||||
|
@ -98,7 +92,6 @@ public class StaticRouterRpcFairnessPolicyController extends
|
||||||
for (String nsId : unassignedNS) {
|
for (String nsId : unassignedNS) {
|
||||||
insertNameServiceWithPermits(nsId, handlersPerNS);
|
insertNameServiceWithPermits(nsId, handlersPerNS);
|
||||||
logAssignment(nsId, handlersPerNS);
|
logAssignment(nsId, handlersPerNS);
|
||||||
getPermitSizes().put(nsId, handlersPerNS);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,7 +103,6 @@ public class StaticRouterRpcFairnessPolicyController extends
|
||||||
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
|
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
|
||||||
insertNameServiceWithPermits(CONCURRENT_NS,
|
insertNameServiceWithPermits(CONCURRENT_NS,
|
||||||
existingPermits + leftOverHandlers);
|
existingPermits + leftOverHandlers);
|
||||||
getPermitSizes().put(CONCURRENT_NS, existingPermits + leftOverHandlers);
|
|
||||||
}
|
}
|
||||||
LOG.info("Final permit allocation for concurrent ns: {}",
|
LOG.info("Final permit allocation for concurrent ns: {}",
|
||||||
getAvailablePermits(CONCURRENT_NS));
|
getAvailablePermits(CONCURRENT_NS));
|
||||||
|
@ -124,23 +116,15 @@ public class StaticRouterRpcFairnessPolicyController extends
|
||||||
private void validateHandlersCount(Configuration conf, int handlerCount,
|
private void validateHandlersCount(Configuration conf, int handlerCount,
|
||||||
Set<String> allConfiguredNS) {
|
Set<String> allConfiguredNS) {
|
||||||
int totalDedicatedHandlers = 0;
|
int totalDedicatedHandlers = 0;
|
||||||
int minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
|
|
||||||
DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
|
|
||||||
for (String nsId : allConfiguredNS) {
|
for (String nsId : allConfiguredNS) {
|
||||||
int dedicatedHandlers =
|
int dedicatedHandlers =
|
||||||
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
|
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
|
||||||
if (dedicatedHandlers > 0) {
|
if (dedicatedHandlers > 0) {
|
||||||
if (dedicatedHandlers < minimumHandlerPerNs) {
|
|
||||||
String msg = String.format(ERROR_NS_MSG, DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId,
|
|
||||||
handlerCount, minimumHandlerPerNs);
|
|
||||||
LOG.error(msg);
|
|
||||||
throw new IllegalArgumentException(msg);
|
|
||||||
}
|
|
||||||
// Total handlers should not be less than sum of dedicated handlers.
|
// Total handlers should not be less than sum of dedicated handlers.
|
||||||
totalDedicatedHandlers += dedicatedHandlers;
|
totalDedicatedHandlers += dedicatedHandlers;
|
||||||
} else {
|
} else {
|
||||||
// Each NS has to have a minimum number of handlers assigned.
|
// Each NS should have at least one handler assigned.
|
||||||
totalDedicatedHandlers += minimumHandlerPerNs;
|
totalDedicatedHandlers++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (totalDedicatedHandlers > handlerCount) {
|
if (totalDedicatedHandlers > handlerCount) {
|
||||||
|
@ -150,4 +134,5 @@ public class StaticRouterRpcFairnessPolicyController extends
|
||||||
throw new IllegalArgumentException(msg);
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,12 +115,6 @@ public interface FederationRPCMBean {
|
||||||
*/
|
*/
|
||||||
String getAvailableHandlerOnPerNs();
|
String getAvailableHandlerOnPerNs();
|
||||||
|
|
||||||
/**
|
|
||||||
* JSON representation of max handler count per ns.
|
|
||||||
* @return JSON string representation.
|
|
||||||
*/
|
|
||||||
String getPermitCapacityPerNs();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the JSON representation of the async caller thread pool.
|
* Get the JSON representation of the async caller thread pool.
|
||||||
* @return JSON string representation of the async caller thread pool.
|
* @return JSON string representation of the async caller thread pool.
|
||||||
|
|
|
@ -247,11 +247,6 @@ public class FederationRPCMetrics implements FederationRPCMBean {
|
||||||
getRouterRpcFairnessPolicyController().getAvailableHandlerOnPerNs();
|
getRouterRpcFairnessPolicyController().getAvailableHandlerOnPerNs();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getPermitCapacityPerNs() {
|
|
||||||
return rpcServer.getRPCClient().getRouterRpcFairnessPolicyController().getPermitCapacityPerNs();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getAsyncCallerPool() {
|
public String getAsyncCallerPool() {
|
||||||
return rpcServer.getRPCClient().getAsyncCallerPoolJson();
|
return rpcServer.getRPCClient().getAsyncCallerPoolJson();
|
||||||
|
|
|
@ -29,9 +29,7 @@ import java.net.URLConnection;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
@ -273,22 +271,6 @@ public final class FederationUtil {
|
||||||
return newInstance(conf, null, null, clazz);
|
return newInstance(conf, null, null, clazz);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates an instance of an RouterRpcFairnessPolicyController
|
|
||||||
* from the configuration and attaches permits access metrics to the controller.
|
|
||||||
*
|
|
||||||
* @param conf Configuration that defines the fairness controller class.
|
|
||||||
* @param rejectedPermitsPerNs Metrics map ns:rejected permits
|
|
||||||
* @param acceptedPermitsPerNs Metrics map ns:accepted permits
|
|
||||||
* @return Fairness policy controller.
|
|
||||||
*/
|
|
||||||
public static RouterRpcFairnessPolicyController newFairnessPolicyController(Configuration conf,
|
|
||||||
Map<String, LongAdder> rejectedPermitsPerNs, Map<String, LongAdder> acceptedPermitsPerNs) {
|
|
||||||
RouterRpcFairnessPolicyController instance = newFairnessPolicyController(conf);
|
|
||||||
instance.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
|
|
||||||
return instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Collect all configured nameservices.
|
* Collect all configured nameservices.
|
||||||
*
|
*
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.federation.router;
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController;
|
import org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController;
|
||||||
|
@ -29,11 +27,13 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
|
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
|
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
|
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Config fields for router-based hdfs federation.
|
* Config fields for router-based hdfs federation.
|
||||||
|
@ -354,13 +354,6 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
||||||
NoRouterRpcFairnessPolicyController.class;
|
NoRouterRpcFairnessPolicyController.class;
|
||||||
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
|
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
|
||||||
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
|
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
|
||||||
public static final String DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY =
|
|
||||||
FEDERATION_ROUTER_FAIRNESS_PREFIX + "minimum.handler.count";
|
|
||||||
public static final int DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT = 1;
|
|
||||||
public static final long DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT =
|
|
||||||
600;
|
|
||||||
public static final String DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY =
|
|
||||||
FEDERATION_ROUTER_FAIRNESS_PREFIX + "policy.controller.dynamic.refresh.interval.seconds";
|
|
||||||
|
|
||||||
// HDFS Router Federation Rename.
|
// HDFS Router Federation Rename.
|
||||||
public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
|
public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
|
||||||
|
|
|
@ -156,8 +156,8 @@ public class RouterRpcClient {
|
||||||
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
|
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
|
||||||
this.connectionManager = new ConnectionManager(clientConf);
|
this.connectionManager = new ConnectionManager(clientConf);
|
||||||
this.connectionManager.start();
|
this.connectionManager.start();
|
||||||
this.routerRpcFairnessPolicyController = FederationUtil
|
this.routerRpcFairnessPolicyController =
|
||||||
.newFairnessPolicyController(conf, rejectedPermitsPerNs, acceptedPermitsPerNs);
|
FederationUtil.newFairnessPolicyController(conf);
|
||||||
|
|
||||||
int numThreads = conf.getInt(
|
int numThreads = conf.getInt(
|
||||||
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
|
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
|
||||||
|
|
|
@ -1,36 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.federation.utils;
|
|
||||||
|
|
||||||
import java.util.concurrent.Semaphore;
|
|
||||||
|
|
||||||
public class AdjustableSemaphore extends Semaphore {
|
|
||||||
|
|
||||||
public AdjustableSemaphore(int permits) {
|
|
||||||
super(permits);
|
|
||||||
}
|
|
||||||
|
|
||||||
public AdjustableSemaphore(int permits, boolean fair) {
|
|
||||||
super(permits, fair);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void reducePermits(int reduction) {
|
|
||||||
super.reducePermits(reduction);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -706,24 +706,6 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
|
||||||
<name>dfs.federation.router.fairness.minimum.handler.count</name>
|
|
||||||
<value>1</value>
|
|
||||||
<description>
|
|
||||||
Minimum number of handlers assigned per nameservice.
|
|
||||||
If any dedicated handler count is smaller than this number,
|
|
||||||
router initialization will fail.
|
|
||||||
</description>
|
|
||||||
</property>
|
|
||||||
|
|
||||||
<property>
|
|
||||||
<name>dfs.federation.router.fairness.policy.controller.dynamic.refresh.interval.seconds</name>
|
|
||||||
<value>600</value>
|
|
||||||
<description>
|
|
||||||
Interval (in seconds) between each handler count resize by DynamicFairnessPolicyController
|
|
||||||
</description>
|
|
||||||
</property>
|
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.federation.router.fairness.handler.count.EXAMPLENAMESERVICE</name>
|
<name>dfs.federation.router.fairness.handler.count.EXAMPLENAMESERVICE</name>
|
||||||
<value></value>
|
<value></value>
|
||||||
|
|
|
@ -1,181 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.federation.fairness;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test functionality of {@link DynamicRouterRpcFairnessPolicyController).
|
|
||||||
*/
|
|
||||||
public class TestDynamicRouterRpcFairnessPolicyController {
|
|
||||||
|
|
||||||
private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2, ns3.nn1";
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDynamicControllerSimple() throws InterruptedException, TimeoutException {
|
|
||||||
// verifyDynamicControllerSimple(true);
|
|
||||||
verifyDynamicControllerSimple(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDynamicControllerAllPermitsAcquired() throws InterruptedException {
|
|
||||||
verifyDynamicControllerAllPermitsAcquired(true);
|
|
||||||
verifyDynamicControllerAllPermitsAcquired(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void verifyDynamicControllerSimple(boolean manualRefresh)
|
|
||||||
throws InterruptedException, TimeoutException {
|
|
||||||
// 3 permits each ns
|
|
||||||
DynamicRouterRpcFairnessPolicyController controller;
|
|
||||||
if (manualRefresh) {
|
|
||||||
controller = getFairnessPolicyController(20);
|
|
||||||
} else {
|
|
||||||
controller = getFairnessPolicyController(20, 4);
|
|
||||||
}
|
|
||||||
|
|
||||||
String[] nss = new String[] {"ns1", "ns2", "ns3", CONCURRENT_NS};
|
|
||||||
// Initial permit counts should be 5:5:5
|
|
||||||
verifyRemainingPermitCounts(new int[] {5, 5, 5, 5}, nss, controller);
|
|
||||||
|
|
||||||
// Release all permits
|
|
||||||
for (int i = 0; i < 5; i++) {
|
|
||||||
controller.releasePermit("ns1");
|
|
||||||
controller.releasePermit("ns2");
|
|
||||||
controller.releasePermit("ns3");
|
|
||||||
controller.releasePermit(CONCURRENT_NS);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Inject dummy metrics
|
|
||||||
// Split half half for ns1 and concurrent
|
|
||||||
Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>();
|
|
||||||
Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>();
|
|
||||||
injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10);
|
|
||||||
injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0);
|
|
||||||
injectDummyMetrics(rejectedPermitsPerNs, "ns3", 10);
|
|
||||||
injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10);
|
|
||||||
controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
|
|
||||||
|
|
||||||
// Current permits count should be 6:3:6:6
|
|
||||||
int[] newPermitCounts = new int[] {6, 3, 6, 6};
|
|
||||||
|
|
||||||
if (manualRefresh) {
|
|
||||||
controller.refreshPermitsCap();
|
|
||||||
} else {
|
|
||||||
Thread.sleep(5000);
|
|
||||||
}
|
|
||||||
verifyRemainingPermitCounts(newPermitCounts, nss, controller);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public void verifyDynamicControllerAllPermitsAcquired(boolean manualRefresh)
|
|
||||||
throws InterruptedException {
|
|
||||||
// 10 permits each ns
|
|
||||||
DynamicRouterRpcFairnessPolicyController controller;
|
|
||||||
if (manualRefresh) {
|
|
||||||
controller = getFairnessPolicyController(40);
|
|
||||||
} else {
|
|
||||||
controller = getFairnessPolicyController(40, 4);
|
|
||||||
}
|
|
||||||
|
|
||||||
String[] nss = new String[] {"ns1", "ns2", "ns3", CONCURRENT_NS};
|
|
||||||
verifyRemainingPermitCounts(new int[] {10, 10, 10, 10}, nss, controller);
|
|
||||||
|
|
||||||
// Inject dummy metrics
|
|
||||||
Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>();
|
|
||||||
Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>();
|
|
||||||
injectDummyMetrics(rejectedPermitsPerNs, "ns1", 13);
|
|
||||||
injectDummyMetrics(rejectedPermitsPerNs, "ns2", 13);
|
|
||||||
injectDummyMetrics(rejectedPermitsPerNs, "ns3", 13);
|
|
||||||
injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 1);
|
|
||||||
// New permit capacity will be 13:13:13:3
|
|
||||||
controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
|
|
||||||
if (manualRefresh) {
|
|
||||||
controller.refreshPermitsCap();
|
|
||||||
} else {
|
|
||||||
Thread.sleep(5000);
|
|
||||||
}
|
|
||||||
Assert.assertEquals("{\"concurrent\":-7,\"ns2\":3,\"ns1\":3,\"ns3\":3}",
|
|
||||||
controller.getAvailableHandlerOnPerNs());
|
|
||||||
|
|
||||||
// Can acquire 3 more permits for ns1, ns2, ns3
|
|
||||||
verifyRemainingPermitCounts(new int[] {3, 3, 3, 0}, nss, controller);
|
|
||||||
// Need to release at least 8 permits for concurrent before it has any free permits
|
|
||||||
Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS));
|
|
||||||
for (int i = 0; i < 7; i++) {
|
|
||||||
controller.releasePermit(CONCURRENT_NS);
|
|
||||||
}
|
|
||||||
Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS));
|
|
||||||
controller.releasePermit(CONCURRENT_NS);
|
|
||||||
Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void verifyRemainingPermitCounts(int[] remainingPermitCounts, String[] nss,
|
|
||||||
RouterRpcFairnessPolicyController controller) {
|
|
||||||
assert remainingPermitCounts.length == nss.length;
|
|
||||||
for (int i = 0; i < remainingPermitCounts.length; i++) {
|
|
||||||
verifyRemainingPermitCount(remainingPermitCounts[i], nss[i], controller);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void verifyRemainingPermitCount(int remainingPermitCount, String nameservice,
|
|
||||||
RouterRpcFairnessPolicyController controller) {
|
|
||||||
for (int i = 0; i < remainingPermitCount; i++) {
|
|
||||||
Assert.assertTrue(controller.acquirePermit(nameservice));
|
|
||||||
}
|
|
||||||
Assert.assertFalse(controller.acquirePermit(nameservice));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void injectDummyMetrics(Map<String, LongAdder> metrics, String ns, long value) {
|
|
||||||
metrics.computeIfAbsent(ns, k -> new LongAdder()).add(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
private DynamicRouterRpcFairnessPolicyController getFairnessPolicyController(int handlers,
|
|
||||||
long refreshInterval) {
|
|
||||||
return new DynamicRouterRpcFairnessPolicyController(createConf(handlers, 3), refreshInterval);
|
|
||||||
}
|
|
||||||
|
|
||||||
private DynamicRouterRpcFairnessPolicyController getFairnessPolicyController(int handlers) {
|
|
||||||
return new DynamicRouterRpcFairnessPolicyController(createConf(handlers, 3), Long.MAX_VALUE);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Configuration createConf(int handlers, int minHandlersPerNs) {
|
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, handlers);
|
|
||||||
conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices);
|
|
||||||
conf.setInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, minHandlersPerNs);
|
|
||||||
conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
|
|
||||||
DynamicRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class);
|
|
||||||
return conf;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -27,7 +27,6 @@ import org.junit.Test;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
|
||||||
|
@ -103,14 +102,6 @@ public class TestRouterRpcFairnessPolicyController {
|
||||||
verifyInstantiationError(conf, 1, 3);
|
verifyInstantiationError(conf, 1, 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAllocationErrorTooFewDedicatedHandlers() {
|
|
||||||
Configuration conf = createConf(9);
|
|
||||||
conf.setInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, 3);
|
|
||||||
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + CONCURRENT_NS, 1);
|
|
||||||
verifyInstantiationError(conf, CONCURRENT_NS, 9, 3);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetAvailableHandlerOnPerNs() {
|
public void testGetAvailableHandlerOnPerNs() {
|
||||||
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
|
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
|
||||||
|
@ -122,18 +113,6 @@ public class TestRouterRpcFairnessPolicyController {
|
||||||
routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs());
|
routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetPermitCapacityPerNs() {
|
|
||||||
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
|
|
||||||
= getFairnessPolicyController(30);
|
|
||||||
assertEquals("{\"concurrent\":10,\"ns2\":10,\"ns1\":10}",
|
|
||||||
routerRpcFairnessPolicyController.getPermitCapacityPerNs());
|
|
||||||
routerRpcFairnessPolicyController.acquirePermit("ns1");
|
|
||||||
routerRpcFairnessPolicyController.acquirePermit("ns2");
|
|
||||||
assertEquals("{\"concurrent\":10,\"ns2\":10,\"ns1\":10}",
|
|
||||||
routerRpcFairnessPolicyController.getPermitCapacityPerNs());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetAvailableHandlerOnPerNsForNoFairness() {
|
public void testGetAvailableHandlerOnPerNsForNoFairness() {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -191,20 +170,6 @@ public class TestRouterRpcFairnessPolicyController {
|
||||||
logs.getOutput().contains(errorMsg));
|
logs.getOutput().contains(errorMsg));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyInstantiationError(Configuration conf, String ns, int handlerCount,
|
|
||||||
int minimumHandler) {
|
|
||||||
GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
|
|
||||||
LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class));
|
|
||||||
try {
|
|
||||||
FederationUtil.newFairnessPolicyController(conf);
|
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
// Ignore the exception as it is expected here.
|
|
||||||
}
|
|
||||||
String errorMsg = String.format(StaticRouterRpcFairnessPolicyController.ERROR_NS_MSG,
|
|
||||||
DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + ns, handlerCount, minimumHandler);
|
|
||||||
assertTrue("Should contain error message: " + errorMsg, logs.getOutput().contains(errorMsg));
|
|
||||||
}
|
|
||||||
|
|
||||||
private RouterRpcFairnessPolicyController getFairnessPolicyController(
|
private RouterRpcFairnessPolicyController getFairnessPolicyController(
|
||||||
int handlers) {
|
int handlers) {
|
||||||
return FederationUtil.newFairnessPolicyController(createConf(handlers));
|
return FederationUtil.newFairnessPolicyController(createConf(handlers));
|
||||||
|
|
Loading…
Reference in New Issue