diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java index 2677c2813b9..fe498c66b7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java @@ -22,13 +22,10 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.LongAdder; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +40,7 @@ public class AbstractRouterRpcFairnessPolicyController LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class); /** Hash table to hold semaphore for each configured name service. */ - private Map permits; - private final Map permitSizes = new HashMap<>(); - private Map rejectedPermitsPerNs; - private Map acceptedPermitsPerNs; + private Map permits; public void init(Configuration conf) { this.permits = new HashMap<>(); @@ -78,7 +72,7 @@ public class AbstractRouterRpcFairnessPolicyController } protected void insertNameServiceWithPermits(String nsId, int maxPermits) { - this.permits.put(nsId, new AdjustableSemaphore(maxPermits)); + this.permits.put(nsId, new Semaphore(maxPermits)); } protected int getAvailablePermits(String nsId) { @@ -88,7 +82,7 @@ public class AbstractRouterRpcFairnessPolicyController @Override public String getAvailableHandlerOnPerNs() { JSONObject json = new JSONObject(); - for (Map.Entry entry : permits.entrySet()) { + for (Map.Entry entry : permits.entrySet()) { try { String nsId = entry.getKey(); int availableHandler = entry.getValue().availablePermits(); @@ -99,39 +93,4 @@ public class AbstractRouterRpcFairnessPolicyController } return json.toString(); } - - @Override - public String getPermitCapacityPerNs() { - JSONObject json = new JSONObject(); - for (Map.Entry entry : permitSizes.entrySet()) { - try { - json.put(entry.getKey(), entry.getValue()); - } catch (JSONException e) { - LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e); - } - } - return json.toString(); - } - - @Override - public void setMetrics(Map rejectedPermits, - Map acceptedPermits) { - this.rejectedPermitsPerNs = rejectedPermits; - this.acceptedPermitsPerNs = acceptedPermits; - } - - protected Map getPermits() { - return permits; - } - - public Map getRejectedPermitsPerNs() { - return rejectedPermitsPerNs; - } - public Map getAcceptedPermitsPerNs() { - return acceptedPermitsPerNs; - } - - protected Map getPermitSizes() { - return permitSizes; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java deleted file mode 100644 index 899020b62b1..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.federation.fairness; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore; -import org.apache.hadoop.util.concurrent.HadoopExecutors; - -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; - -/** - * Dynamic fairness policy extending {@link StaticRouterRpcFairnessPolicyController} - * and fetching handlers from configuration for all available name services. - * The handlers count changes according to traffic to namespaces. - * Total handlers might NOT strictly add up to the value defined by DFS_ROUTER_HANDLER_COUNT_KEY - * but will not exceed initial handler count + number of nameservices. - */ -public class DynamicRouterRpcFairnessPolicyController - extends StaticRouterRpcFairnessPolicyController { - - private static final Logger LOG = - LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class); - - private static final ScheduledExecutorService SCHEDULED_EXECUTOR = HadoopExecutors - .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build()); - private PermitsResizerService permitsResizerService; - private ScheduledFuture refreshTask; - private int handlerCount; - private int minimumHandlerPerNs; - - /** - * Initializes using the same logic as {@link StaticRouterRpcFairnessPolicyController} - * and starts a periodic semaphore resizer thread. - * - * @param conf configuration - */ - public DynamicRouterRpcFairnessPolicyController(Configuration conf) { - super(conf); - minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, - DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT); - handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); - long refreshInterval = - conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY, - DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT, - TimeUnit.SECONDS); - permitsResizerService = new PermitsResizerService(); - refreshTask = SCHEDULED_EXECUTOR - .scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval, - TimeUnit.SECONDS); - } - - @VisibleForTesting - public DynamicRouterRpcFairnessPolicyController(Configuration conf, long refreshInterval) { - super(conf); - minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, - DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT); - handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); - permitsResizerService = new PermitsResizerService(); - refreshTask = SCHEDULED_EXECUTOR - .scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval, - TimeUnit.SECONDS); - } - - @VisibleForTesting - public void refreshPermitsCap() { - permitsResizerService.run(); - } - - @Override - public void shutdown() { - super.shutdown(); - if (refreshTask != null) { - refreshTask.cancel(true); - } - if (SCHEDULED_EXECUTOR != null) { - SCHEDULED_EXECUTOR.shutdown(); - } - } - - class PermitsResizerService implements Runnable { - - @Override - public synchronized void run() { - if ((getRejectedPermitsPerNs() == null) || (getAcceptedPermitsPerNs() == null)) { - return; - } - long totalOps = 0; - Map nsOps = new HashMap<>(); - for (Map.Entry entry : getPermits().entrySet()) { - long ops = (getRejectedPermitsPerNs().containsKey(entry.getKey()) ? - getRejectedPermitsPerNs().get(entry.getKey()).longValue() : - 0) + (getAcceptedPermitsPerNs().containsKey(entry.getKey()) ? - getAcceptedPermitsPerNs().get(entry.getKey()).longValue() : - 0); - nsOps.put(entry.getKey(), ops); - totalOps += ops; - } - - List underMinimumNss = new ArrayList<>(); - List overMinimumNss = new ArrayList<>(); - int effectiveOps = 0; - - // First iteration: split namespaces into those underused and those that are not. - for (Map.Entry entry : getPermits().entrySet()) { - String ns = entry.getKey(); - int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / totalOps * handlerCount); - - if (newPermitCap <= minimumHandlerPerNs) { - underMinimumNss.add(ns); - } else { - overMinimumNss.add(ns); - effectiveOps += nsOps.get(ns); - } - } - - // Second iteration part 1: assign minimum handlers - for (String ns: underMinimumNss) { - resizeNsHandlerCapacity(ns, minimumHandlerPerNs); - } - // Second iteration part 2: assign handlers to the rest - int leftoverPermits = handlerCount - minimumHandlerPerNs * underMinimumNss.size(); - for (String ns: overMinimumNss) { - int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / effectiveOps * leftoverPermits); - resizeNsHandlerCapacity(ns, newPermitCap); - } - } - - private void resizeNsHandlerCapacity(String ns, int newPermitCap) { - AdjustableSemaphore semaphore = getPermits().get(ns); - int oldPermitCap = getPermitSizes().get(ns); - if (newPermitCap <= minimumHandlerPerNs) { - newPermitCap = minimumHandlerPerNs; - } - getPermitSizes().put(ns, newPermitCap); - if (newPermitCap > oldPermitCap) { - semaphore.release(newPermitCap - oldPermitCap); - } else if (newPermitCap < oldPermitCap) { - semaphore.reducePermits(oldPermitCap - newPermitCap); - } - LOG.info("Resized handlers for nsId {} from {} to {}", ns, oldPermitCap, newPermitCap); - } - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java index 6ce8ac2afd6..3b85da59e1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hdfs.server.federation.fairness; -import java.util.Map; -import java.util.concurrent.atomic.LongAdder; - import org.apache.hadoop.conf.Configuration; /** @@ -54,15 +51,4 @@ public class NoRouterRpcFairnessPolicyController implements public String getAvailableHandlerOnPerNs(){ return "N/A"; } - - @Override - public String getPermitCapacityPerNs() { - return "N/A"; - } - - @Override - public void setMetrics(Map rejectedPermitsPerNs, - Map acceptedPermitsPerNs) { - // Nothing - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java index d6921add562..354383a168f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hdfs.server.federation.fairness; -import java.util.Map; -import java.util.concurrent.atomic.LongAdder; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -70,15 +67,4 @@ public interface RouterRpcFairnessPolicyController { * Returns the JSON string of the available handler for each Ns. */ String getAvailableHandlerOnPerNs(); - - /** - * Returns the JSON string of the max handler count for each ns. - */ - String getPermitCapacityPerNs(); - - /** - * Attaches permits access metrics to the controller. - */ - void setMetrics(Map rejectedPermitsPerNs, - Map acceptedPermitsPerNs); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java index 191ad46d96c..aa0777fc03d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java @@ -27,8 +27,6 @@ import java.util.Set; import java.util.HashSet; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; @@ -47,9 +45,6 @@ public class StaticRouterRpcFairnessPolicyController extends public static final String ERROR_MSG = "Configured handlers " + DFS_ROUTER_HANDLER_COUNT_KEY + '=' + " %d is less than the minimum required handlers %d"; - public static final String ERROR_NS_MSG = - "Configured handlers %s=%d is less than the minimum required handlers %d"; - public StaticRouterRpcFairnessPolicyController(Configuration conf) { init(conf); @@ -83,7 +78,6 @@ public class StaticRouterRpcFairnessPolicyController extends handlerCount -= dedicatedHandlers; insertNameServiceWithPermits(nsId, dedicatedHandlers); logAssignment(nsId, dedicatedHandlers); - getPermitSizes().put(nsId, dedicatedHandlers); } else { unassignedNS.add(nsId); } @@ -98,7 +92,6 @@ public class StaticRouterRpcFairnessPolicyController extends for (String nsId : unassignedNS) { insertNameServiceWithPermits(nsId, handlersPerNS); logAssignment(nsId, handlersPerNS); - getPermitSizes().put(nsId, handlersPerNS); } } @@ -110,7 +103,6 @@ public class StaticRouterRpcFairnessPolicyController extends LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers); insertNameServiceWithPermits(CONCURRENT_NS, existingPermits + leftOverHandlers); - getPermitSizes().put(CONCURRENT_NS, existingPermits + leftOverHandlers); } LOG.info("Final permit allocation for concurrent ns: {}", getAvailablePermits(CONCURRENT_NS)); @@ -124,23 +116,15 @@ public class StaticRouterRpcFairnessPolicyController extends private void validateHandlersCount(Configuration conf, int handlerCount, Set allConfiguredNS) { int totalDedicatedHandlers = 0; - int minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, - DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT); for (String nsId : allConfiguredNS) { int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0); if (dedicatedHandlers > 0) { - if (dedicatedHandlers < minimumHandlerPerNs) { - String msg = String.format(ERROR_NS_MSG, DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, - handlerCount, minimumHandlerPerNs); - LOG.error(msg); - throw new IllegalArgumentException(msg); - } // Total handlers should not be less than sum of dedicated handlers. totalDedicatedHandlers += dedicatedHandlers; } else { - // Each NS has to have a minimum number of handlers assigned. - totalDedicatedHandlers += minimumHandlerPerNs; + // Each NS should have at least one handler assigned. + totalDedicatedHandlers++; } } if (totalDedicatedHandlers > handlerCount) { @@ -150,4 +134,5 @@ public class StaticRouterRpcFairnessPolicyController extends throw new IllegalArgumentException(msg); } } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java index 3499962d73b..979e7504a87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java @@ -115,12 +115,6 @@ public interface FederationRPCMBean { */ String getAvailableHandlerOnPerNs(); - /** - * JSON representation of max handler count per ns. - * @return JSON string representation. - */ - String getPermitCapacityPerNs(); - /** * Get the JSON representation of the async caller thread pool. * @return JSON string representation of the async caller thread pool. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java index 1ec49b37d33..823bc7b8af2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java @@ -247,11 +247,6 @@ public class FederationRPCMetrics implements FederationRPCMBean { getRouterRpcFairnessPolicyController().getAvailableHandlerOnPerNs(); } - @Override - public String getPermitCapacityPerNs() { - return rpcServer.getRPCClient().getRouterRpcFairnessPolicyController().getPermitCapacityPerNs(); - } - @Override public String getAsyncCallerPool() { return rpcServer.getRPCClient().getAsyncCallerPoolJson(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java index 30fae5caf37..7ff853946d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java @@ -29,9 +29,7 @@ import java.net.URLConnection; import java.util.Collection; import java.util.EnumSet; import java.util.HashSet; -import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; @@ -273,22 +271,6 @@ public final class FederationUtil { return newInstance(conf, null, null, clazz); } - /** - * Creates an instance of an RouterRpcFairnessPolicyController - * from the configuration and attaches permits access metrics to the controller. - * - * @param conf Configuration that defines the fairness controller class. - * @param rejectedPermitsPerNs Metrics map ns:rejected permits - * @param acceptedPermitsPerNs Metrics map ns:accepted permits - * @return Fairness policy controller. - */ - public static RouterRpcFairnessPolicyController newFairnessPolicyController(Configuration conf, - Map rejectedPermitsPerNs, Map acceptedPermitsPerNs) { - RouterRpcFairnessPolicyController instance = newFairnessPolicyController(conf); - instance.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs); - return instance; - } - /** * Collect all configured nameservices. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 7ba982a9597..741e470c6fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hdfs.server.federation.router; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController; @@ -29,11 +27,13 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; -import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl; + +import java.util.concurrent.TimeUnit; /** * Config fields for router-based hdfs federation. @@ -354,13 +354,6 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { NoRouterRpcFairnessPolicyController.class; public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX = FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count."; - public static final String DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY = - FEDERATION_ROUTER_FAIRNESS_PREFIX + "minimum.handler.count"; - public static final int DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT = 1; - public static final long DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT = - 600; - public static final String DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY = - FEDERATION_ROUTER_FAIRNESS_PREFIX + "policy.controller.dynamic.refresh.interval.seconds"; // HDFS Router Federation Rename. public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 78f9bbd6cc6..34a2c47c3ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -156,8 +156,8 @@ public class RouterRpcClient { HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT); this.connectionManager = new ConnectionManager(clientConf); this.connectionManager.start(); - this.routerRpcFairnessPolicyController = FederationUtil - .newFairnessPolicyController(conf, rejectedPermitsPerNs, acceptedPermitsPerNs); + this.routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); int numThreads = conf.getInt( RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/AdjustableSemaphore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/AdjustableSemaphore.java deleted file mode 100644 index 34a0563232f..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/AdjustableSemaphore.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.federation.utils; - -import java.util.concurrent.Semaphore; - -public class AdjustableSemaphore extends Semaphore { - - public AdjustableSemaphore(int permits) { - super(permits); - } - - public AdjustableSemaphore(int permits, boolean fair) { - super(permits, fair); - } - - public void reducePermits(int reduction) { - super.reducePermits(reduction); - } -} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index b35cc569a4c..fcf6a28475f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -706,24 +706,6 @@ - - dfs.federation.router.fairness.minimum.handler.count - 1 - - Minimum number of handlers assigned per nameservice. - If any dedicated handler count is smaller than this number, - router initialization will fail. - - - - - dfs.federation.router.fairness.policy.controller.dynamic.refresh.interval.seconds - 600 - - Interval (in seconds) between each handler count resize by DynamicFairnessPolicyController - - - dfs.federation.router.fairness.handler.count.EXAMPLENAMESERVICE diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java deleted file mode 100644 index 057f7a36468..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.federation.fairness; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.LongAdder; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; - -import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; - -/** - * Test functionality of {@link DynamicRouterRpcFairnessPolicyController). - */ -public class TestDynamicRouterRpcFairnessPolicyController { - - private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2, ns3.nn1"; - - @Test - public void testDynamicControllerSimple() throws InterruptedException, TimeoutException { -// verifyDynamicControllerSimple(true); - verifyDynamicControllerSimple(false); - } - - @Test - public void testDynamicControllerAllPermitsAcquired() throws InterruptedException { - verifyDynamicControllerAllPermitsAcquired(true); - verifyDynamicControllerAllPermitsAcquired(false); - } - - private void verifyDynamicControllerSimple(boolean manualRefresh) - throws InterruptedException, TimeoutException { - // 3 permits each ns - DynamicRouterRpcFairnessPolicyController controller; - if (manualRefresh) { - controller = getFairnessPolicyController(20); - } else { - controller = getFairnessPolicyController(20, 4); - } - - String[] nss = new String[] {"ns1", "ns2", "ns3", CONCURRENT_NS}; - // Initial permit counts should be 5:5:5 - verifyRemainingPermitCounts(new int[] {5, 5, 5, 5}, nss, controller); - - // Release all permits - for (int i = 0; i < 5; i++) { - controller.releasePermit("ns1"); - controller.releasePermit("ns2"); - controller.releasePermit("ns3"); - controller.releasePermit(CONCURRENT_NS); - } - - // Inject dummy metrics - // Split half half for ns1 and concurrent - Map rejectedPermitsPerNs = new HashMap<>(); - Map acceptedPermitsPerNs = new HashMap<>(); - injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10); - injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0); - injectDummyMetrics(rejectedPermitsPerNs, "ns3", 10); - injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10); - controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs); - - // Current permits count should be 6:3:6:6 - int[] newPermitCounts = new int[] {6, 3, 6, 6}; - - if (manualRefresh) { - controller.refreshPermitsCap(); - } else { - Thread.sleep(5000); - } - verifyRemainingPermitCounts(newPermitCounts, nss, controller); - - } - - public void verifyDynamicControllerAllPermitsAcquired(boolean manualRefresh) - throws InterruptedException { - // 10 permits each ns - DynamicRouterRpcFairnessPolicyController controller; - if (manualRefresh) { - controller = getFairnessPolicyController(40); - } else { - controller = getFairnessPolicyController(40, 4); - } - - String[] nss = new String[] {"ns1", "ns2", "ns3", CONCURRENT_NS}; - verifyRemainingPermitCounts(new int[] {10, 10, 10, 10}, nss, controller); - - // Inject dummy metrics - Map rejectedPermitsPerNs = new HashMap<>(); - Map acceptedPermitsPerNs = new HashMap<>(); - injectDummyMetrics(rejectedPermitsPerNs, "ns1", 13); - injectDummyMetrics(rejectedPermitsPerNs, "ns2", 13); - injectDummyMetrics(rejectedPermitsPerNs, "ns3", 13); - injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 1); - // New permit capacity will be 13:13:13:3 - controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs); - if (manualRefresh) { - controller.refreshPermitsCap(); - } else { - Thread.sleep(5000); - } - Assert.assertEquals("{\"concurrent\":-7,\"ns2\":3,\"ns1\":3,\"ns3\":3}", - controller.getAvailableHandlerOnPerNs()); - - // Can acquire 3 more permits for ns1, ns2, ns3 - verifyRemainingPermitCounts(new int[] {3, 3, 3, 0}, nss, controller); - // Need to release at least 8 permits for concurrent before it has any free permits - Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS)); - for (int i = 0; i < 7; i++) { - controller.releasePermit(CONCURRENT_NS); - } - Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS)); - controller.releasePermit(CONCURRENT_NS); - Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS)); - } - - private void verifyRemainingPermitCounts(int[] remainingPermitCounts, String[] nss, - RouterRpcFairnessPolicyController controller) { - assert remainingPermitCounts.length == nss.length; - for (int i = 0; i < remainingPermitCounts.length; i++) { - verifyRemainingPermitCount(remainingPermitCounts[i], nss[i], controller); - } - } - - private void verifyRemainingPermitCount(int remainingPermitCount, String nameservice, - RouterRpcFairnessPolicyController controller) { - for (int i = 0; i < remainingPermitCount; i++) { - Assert.assertTrue(controller.acquirePermit(nameservice)); - } - Assert.assertFalse(controller.acquirePermit(nameservice)); - } - - private void injectDummyMetrics(Map metrics, String ns, long value) { - metrics.computeIfAbsent(ns, k -> new LongAdder()).add(value); - } - - private DynamicRouterRpcFairnessPolicyController getFairnessPolicyController(int handlers, - long refreshInterval) { - return new DynamicRouterRpcFairnessPolicyController(createConf(handlers, 3), refreshInterval); - } - - private DynamicRouterRpcFairnessPolicyController getFairnessPolicyController(int handlers) { - return new DynamicRouterRpcFairnessPolicyController(createConf(handlers, 3), Long.MAX_VALUE); - } - - private Configuration createConf(int handlers, int minHandlersPerNs) { - Configuration conf = new HdfsConfiguration(); - conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, handlers); - conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices); - conf.setInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, minHandlersPerNs); - conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, - DynamicRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class); - return conf; - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java index 261f070df09..8307f666b5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java @@ -27,7 +27,6 @@ import org.junit.Test; import org.slf4j.LoggerFactory; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; @@ -103,14 +102,6 @@ public class TestRouterRpcFairnessPolicyController { verifyInstantiationError(conf, 1, 3); } - @Test - public void testAllocationErrorTooFewDedicatedHandlers() { - Configuration conf = createConf(9); - conf.setInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, 3); - conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + CONCURRENT_NS, 1); - verifyInstantiationError(conf, CONCURRENT_NS, 9, 3); - } - @Test public void testGetAvailableHandlerOnPerNs() { RouterRpcFairnessPolicyController routerRpcFairnessPolicyController @@ -122,18 +113,6 @@ public class TestRouterRpcFairnessPolicyController { routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs()); } - @Test - public void testGetPermitCapacityPerNs() { - RouterRpcFairnessPolicyController routerRpcFairnessPolicyController - = getFairnessPolicyController(30); - assertEquals("{\"concurrent\":10,\"ns2\":10,\"ns1\":10}", - routerRpcFairnessPolicyController.getPermitCapacityPerNs()); - routerRpcFairnessPolicyController.acquirePermit("ns1"); - routerRpcFairnessPolicyController.acquirePermit("ns2"); - assertEquals("{\"concurrent\":10,\"ns2\":10,\"ns1\":10}", - routerRpcFairnessPolicyController.getPermitCapacityPerNs()); - } - @Test public void testGetAvailableHandlerOnPerNsForNoFairness() { Configuration conf = new Configuration(); @@ -191,20 +170,6 @@ public class TestRouterRpcFairnessPolicyController { logs.getOutput().contains(errorMsg)); } - private void verifyInstantiationError(Configuration conf, String ns, int handlerCount, - int minimumHandler) { - GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( - LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class)); - try { - FederationUtil.newFairnessPolicyController(conf); - } catch (IllegalArgumentException e) { - // Ignore the exception as it is expected here. - } - String errorMsg = String.format(StaticRouterRpcFairnessPolicyController.ERROR_NS_MSG, - DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + ns, handlerCount, minimumHandler); - assertTrue("Should contain error message: " + errorMsg, logs.getOutput().contains(errorMsg)); - } - private RouterRpcFairnessPolicyController getFairnessPolicyController( int handlers) { return FederationUtil.newFairnessPolicyController(createConf(handlers));