From e5fe326270259a6bd2a79b5082c65ea9e17ba4da Mon Sep 17 00:00:00 2001 From: Takanobu Asanuma Date: Thu, 10 Sep 2020 01:56:58 +0900 Subject: [PATCH] HADOOP-17165. Implement service-user feature in DecayRPCScheduler. (#2240) --- .../apache/hadoop/ipc/DecayRpcScheduler.java | 39 ++++++++++++++++-- .../src/main/resources/core-default.xml | 8 ++++ .../src/site/markdown/FairCallQueue.md | 4 ++ .../conf/TestCommonConfigurationFields.java | 2 + .../hadoop/ipc/TestDecayRpcScheduler.java | 40 ++++++++++++++++++- 5 files changed, 89 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index 45cbd4e99df..dcfa62a0bc8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -20,11 +20,14 @@ package org.apache.hadoop.ipc; import java.lang.ref.WeakReference; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; @@ -108,6 +111,13 @@ public class DecayRpcScheduler implements RpcScheduler, public static final String IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY = "faircallqueue.decay-scheduler.thresholds"; + /** + * Service users will always be scheduled into the highest-priority queue. + * They are specified as a comma-separated list. + */ + public static final String IPC_DECAYSCHEDULER_SERVICE_USERS_KEY = + "decay-scheduler.service-users"; + // Specifies the identity to use when the IdentityProvider cannot handle // a schedulable. 
public static final String DECAYSCHEDULER_UNKNOWN_IDENTITY = @@ -178,6 +188,7 @@ public class DecayRpcScheduler implements RpcScheduler, private static final double PRECISION = 0.0001; private MetricsProxy metricsProxy; private final CostProvider costProvider; + private Set serviceUserNames; /** * This TimerTask will call decayCurrentCosts until @@ -229,6 +240,7 @@ public class DecayRpcScheduler implements RpcScheduler, conf); this.backOffResponseTimeThresholds = parseBackOffResponseTimeThreshold(ns, conf, numLevels); + this.serviceUserNames = this.parseServiceUserNames(ns, conf); // Setup response time metrics responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels); @@ -359,6 +371,12 @@ public class DecayRpcScheduler implements RpcScheduler, return decimals; } + private Set parseServiceUserNames(String ns, Configuration conf) { + Collection collection = conf.getStringCollection( + ns + "." + IPC_DECAYSCHEDULER_SERVICE_USERS_KEY); + return new HashSet<>(collection); + } + /** * Generate default thresholds if user did not specify. Strategy is * to halve each time, since queue usage tends to be exponential. @@ -486,7 +504,7 @@ public class DecayRpcScheduler implements RpcScheduler, AtomicLong value = entry.getValue().get(0); long snapshot = value.get(); - int computedLevel = computePriorityLevel(snapshot); + int computedLevel = computePriorityLevel(snapshot, id); nextCache.put(id, computedLevel); } @@ -534,9 +552,15 @@ public class DecayRpcScheduler implements RpcScheduler, * Given the cost for an identity, compute a scheduling decision. 
* * @param cost the cost for an identity + * @param identity the identity of the user * @return scheduling decision from 0 to numLevels - 1 */ - private int computePriorityLevel(long cost) { + private int computePriorityLevel(long cost, Object identity) { + // The priority for service users is always 0 + if (isServiceUser((String)identity)) { + return 0; + } + long totalCallSnapshot = totalDecayedCallCost.get(); double proportion = 0; @@ -576,7 +600,7 @@ public class DecayRpcScheduler implements RpcScheduler, // Cache was no good, compute it List costList = callCosts.get(identity); long currentCost = costList == null ? 0 : costList.get(0).get(); - int priority = computePriorityLevel(currentCost); + int priority = computePriorityLevel(currentCost, identity); LOG.debug("compute priority for {} priority {}", identity, priority); return priority; } @@ -598,6 +622,10 @@ public class DecayRpcScheduler implements RpcScheduler, return cachedOrComputedPriorityLevel(identity); } + private boolean isServiceUser(String userName) { + return this.serviceUserNames.contains(userName); + } + @Override public boolean shouldBackOff(Schedulable obj) { Boolean backOff = false; @@ -698,6 +726,11 @@ public class DecayRpcScheduler implements RpcScheduler, return thresholds; } + @VisibleForTesting + Set getServiceUserNames() { + return serviceUserNames; + } + @VisibleForTesting void forceDecay() { decayCurrentCosts(); diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 999fc4aa98f..086fa35a20e 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2584,6 +2584,14 @@ + + ipc.[port_number].decay-scheduler.service-users + + Service users will always be scheduled into the highest-priority + queue. They are specified as a comma-separated list. 
+ + ipc.[port_number].weighted-cost.lockshared 10 diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md b/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md index 887d3053d26..69d988154b7 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md @@ -91,6 +91,9 @@ This is configurable via the **identity provider**, which defaults to the **User provider simply uses the username of the client submitting the request. However, a custom identity provider can be used to performing throttling based on other groupings, or using an external identity provider. +If particular users submit important requests and you don't want to limit them, you can set them up as the +**service-users**. They are always scheduled into the highest-priority queue. + ### Cost-based Fair Call Queue Though the fair call queue itself does a good job of mitigating the impact from users who submit a very high _number_ @@ -138,6 +141,7 @@ omitted. | decay-scheduler.backoff.responsetime.enable | DecayRpcScheduler | Whether or not to enable the backoff by response time feature. | false | | decay-scheduler.backoff.responsetime.thresholds | DecayRpcScheduler | The response time thresholds, as time durations, for each priority queue. If the average response time for a queue is above this threshold, backoff will occur in lower priority queues. This should be a comma-separated list of length equal to the number of priority levels. | Threshold increases by 10s per level (e.g., for 4 levels: `10s,20s,30s,40s`) | | decay-scheduler.metrics.top.user.count | DecayRpcScheduler | The number of top (i.e., heaviest) users to emit metric information about. | 10 | +| decay-scheduler.service-users | DecayRpcScheduler | Service users will always be scheduled into the highest-priority queue. They are specified as a comma-separated list. 
| | | weighted-cost.lockshared | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds a shared (read) lock. | 10 | | weighted-cost.lockexclusive | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds an exclusive (write) lock. | 100 | | weighted-cost.{handler,lockfree,response} | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phases which do not involve holding a lock. See `org.apache.hadoop.ipc.ProcessingDetails.Timing` for more details on each phase. | 1 | diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java index dd9f41a7a35..30ed4a41937 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java @@ -172,6 +172,8 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase { "ipc.[port_number].decay-scheduler.backoff.responsetime.thresholds"); xmlPropsToSkipCompare.add( "ipc.[port_number].decay-scheduler.metrics.top.user.count"); + xmlPropsToSkipCompare.add( + "ipc.[port_number].decay-scheduler.service-users"); xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.lockshared"); xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.lockexclusive"); xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.handler"); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java index 71723325e2c..5614f05307a 100644 --- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java @@ -20,9 +20,14 @@ package org.apache.hadoop.ipc; import static java.lang.Thread.sleep; +import java.util.Map; +import org.eclipse.jetty.util.ajax.JSON; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -383,4 +388,37 @@ public class TestDecayRpcScheduler { scheduler.addResponseTime("ignored", mockCall, emptyProcessingDetails); return priority; } + + @Test + public void testServiceUsers() { + Configuration conf = new Configuration(); + conf.setLong("ipc.19." + + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999); + conf.set("ipc.19." + DecayRpcScheduler.IPC_DECAYSCHEDULER_SERVICE_USERS_KEY, + "service1,service2"); + scheduler = new DecayRpcScheduler(4, "ipc.19", conf); + + assertTrue(scheduler.getServiceUserNames().contains("service1")); + assertTrue(scheduler.getServiceUserNames().contains("service2")); + + for (int i = 0; i < 10; i++) { + getPriorityIncrementCallCount("user1"); + getPriorityIncrementCallCount("service1"); + getPriorityIncrementCallCount("service2"); + } + + assertNotEquals(0, scheduler.getPriorityLevel(mockCall("user1"))); + // The priorities of service users should be always 0. 
+ assertEquals(0, scheduler.getPriorityLevel(mockCall("service1"))); + assertEquals(0, scheduler.getPriorityLevel(mockCall("service2"))); + + // DecayRpcScheduler caches priorities after decay + scheduler.forceDecay(); + // Check priorities on cache + String summary = scheduler.getSchedulingDecisionSummary(); + Map summaryMap = (Map) JSON.parse(summary); + assertNotEquals(0L, summaryMap.get("user1")); + assertEquals(0L, summaryMap.get("service1")); + assertEquals(0L, summaryMap.get("service2")); + } } \ No newline at end of file