From 96ea7f056238183cbd73d07bb488fd6440ff233a Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Fri, 24 May 2019 17:09:52 -0700 Subject: [PATCH] HDFS-14403. Cost-based extension to the RPC Fair Call Queue. Contributed by Christopher Gregorian. (cherry picked from 129576f628d370def74e56112aba3a93e97bbf70) (cherry picked from e4b650f91e7353f98fee2a725490ca45aa81e1a4) --- .../hadoop/fs/CommonConfigurationKeys.java | 1 + .../apache/hadoop/ipc/CallQueueManager.java | 1 - .../org/apache/hadoop/ipc/CostProvider.java | 46 ++++ .../apache/hadoop/ipc/DecayRpcScheduler.java | 235 ++++++++++-------- .../hadoop/ipc/DefaultCostProvider.java | 43 ++++ .../hadoop/ipc/WeightedTimeCostProvider.java | 110 ++++++++ .../hadoop/ipc/TestDecayRpcScheduler.java | 174 ++++++++++--- .../java/org/apache/hadoop/ipc/TestRPC.java | 102 ++++---- .../ipc/TestWeightedTimeCostProvider.java | 86 +++++++ 9 files changed, 613 insertions(+), 185 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 1eb27f8fca8..75749499ff7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -104,6 +104,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl"; public static final String IPC_SCHEDULER_IMPL_KEY = "scheduler.impl"; public static final String IPC_IDENTITY_PROVIDER_KEY = "identity-provider.impl"; + public static final String IPC_COST_PROVIDER_KEY = "cost-provider.impl"; public static final String IPC_BACKOFF_ENABLE = "backoff.enable"; public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index 765ce183737..b1c9f6f402c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -197,7 +197,6 @@ public class CallQueueManager } // This should be only called once per call and cached in the call object - // each getPriorityLevel call will increment the counter for the caller int getPriorityLevel(Schedulable e) { return scheduler.getPriorityLevel(e); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java new file mode 100644 index 00000000000..cf76e7d64f0 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import org.apache.hadoop.conf.Configuration; + +/** + * Used by {@link DecayRpcScheduler} to get the cost of users' operations. This + * is configurable using + * {@link org.apache.hadoop.fs.CommonConfigurationKeys#IPC_COST_PROVIDER_KEY}. + */ +public interface CostProvider { + + /** + * Initialize this provider using the given configuration, examining only + * ones which fall within the provided namespace. + * + * @param namespace The namespace to use when looking up configurations. + * @param conf The configuration + */ + void init(String namespace, Configuration conf); + + /** + * Get cost from {@link ProcessingDetails} which will be used in scheduler. + * + * @param details Process details + * @return The cost of the call + */ + long getCost(ProcessingDetails details); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index 3c1569a2c84..e2dd4f78a3b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -58,8 +58,8 @@ import org.slf4j.LoggerFactory; import static org.apache.hadoop.ipc.ProcessingDetails.Timing; /** - * The decay RPC scheduler counts incoming requests in a map, then - * decays the counts at a fixed time interval. The scheduler is optimized + * The decay RPC scheduler tracks the cost of incoming requests in a map, then + * decays the costs at a fixed time interval. The scheduler is optimized * for large periods (on the order of seconds), as it offloads work to the * decay sweep. */ @@ -134,15 +134,15 @@ public class DecayRpcScheduler implements RpcScheduler, private static final ObjectWriter WRITER = new ObjectMapper().writer(); // Track the decayed and raw (no decay) number of calls for each schedulable - // identity from all previous decay windows: idx 0 for decayed call count and - // idx 1 for the raw call count - private final ConcurrentHashMap> callCounts = + // identity from all previous decay windows: idx 0 for decayed call cost and + // idx 1 for the raw call cost + private final ConcurrentHashMap> callCosts = new ConcurrentHashMap>(); - // Should be the sum of all AtomicLongs in decayed callCounts - private final AtomicLong totalDecayedCallCount = new AtomicLong(); - // The sum of all AtomicLongs in raw callCounts - private final AtomicLong totalRawCallCount = new AtomicLong(); + // Should be the sum of all AtomicLongs in decayed callCosts + private final AtomicLong totalDecayedCallCost = new AtomicLong(); + // The sum of all AtomicLongs in raw callCosts + private final AtomicLong totalRawCallCost = new AtomicLong(); // Track total call count and response time in current decay window @@ -160,7 +160,7 @@ public class DecayRpcScheduler implements RpcScheduler, // Tune the behavior of the scheduler private final long decayPeriodMillis; // How long between each tick - private final double decayFactor; // nextCount = currentCount * decayFactor + private final double decayFactor; // nextCost = currentCost * decayFactor private final int numLevels; private final double[] thresholds; private final IdentityProvider identityProvider; @@ -170,9 +170,10 @@ public class DecayRpcScheduler implements RpcScheduler, private final int topUsersCount; // e.g., report top 10 users' metrics private static final double PRECISION = 0.0001; private MetricsProxy metricsProxy; + private final CostProvider costProvider; /** - * This TimerTask will call decayCurrentCounts until + * This TimerTask will call decayCurrentCosts until * the scheduler has been garbage collected. */ public static class DecayTask extends TimerTask { @@ -188,7 +189,7 @@ public class DecayRpcScheduler implements RpcScheduler, public void run() { DecayRpcScheduler sched = schedulerRef.get(); if (sched != null) { - sched.decayCurrentCounts(); + sched.decayCurrentCosts(); } else { // Our scheduler was garbage collected since it is no longer in use, // so we should terminate the timer as well @@ -215,6 +216,7 @@ public class DecayRpcScheduler implements RpcScheduler, this.decayFactor = parseDecayFactor(ns, conf); this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf); this.identityProvider = this.parseIdentityProvider(ns, conf); + this.costProvider = this.parseCostProvider(ns, conf); this.thresholds = parseThresholds(ns, conf, numLevels); this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns, conf); @@ -242,6 +244,24 @@ public class DecayRpcScheduler implements RpcScheduler, recomputeScheduleCache(); } + private CostProvider parseCostProvider(String ns, Configuration conf) { + List providers = conf.getInstances( + ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY, + CostProvider.class); + + if (providers.size() < 1) { + LOG.info("CostProvider not specified, defaulting to DefaultCostProvider"); + return new DefaultCostProvider(); + } else if (providers.size() > 1) { + LOG.warn("Found multiple CostProviders; using: {}", + providers.get(0).getClass()); + } + + CostProvider provider = providers.get(0); // use the first + provider.init(ns, conf); + return provider; + } + // Load configs private IdentityProvider parseIdentityProvider(String ns, Configuration conf) { @@ -388,41 +408,41 @@ public class DecayRpcScheduler implements RpcScheduler, } /** - * Decay the stored counts for each user and clean as necessary. + * Decay the stored costs for each user and clean as necessary. * This method should be called periodically in order to keep - * counts current. + * costs current. */ - private void decayCurrentCounts() { + private void decayCurrentCosts() { try { - long totalDecayedCount = 0; - long totalRawCount = 0; + long totalDecayedCost = 0; + long totalRawCost = 0; Iterator>> it = - callCounts.entrySet().iterator(); + callCosts.entrySet().iterator(); while (it.hasNext()) { Map.Entry> entry = it.next(); - AtomicLong decayedCount = entry.getValue().get(0); - AtomicLong rawCount = entry.getValue().get(1); + AtomicLong decayedCost = entry.getValue().get(0); + AtomicLong rawCost = entry.getValue().get(1); // Compute the next value by reducing it by the decayFactor - totalRawCount += rawCount.get(); - long currentValue = decayedCount.get(); + totalRawCost += rawCost.get(); + long currentValue = decayedCost.get(); long nextValue = (long) (currentValue * decayFactor); - totalDecayedCount += nextValue; - decayedCount.set(nextValue); + totalDecayedCost += nextValue; + decayedCost.set(nextValue); if (nextValue == 0) { // We will clean up unused keys here. An interesting optimization - // might be to have an upper bound on keyspace in callCounts and only + // might be to have an upper bound on keyspace in callCosts and only // clean once we pass it. it.remove(); } } // Update the total so that we remain in sync - totalDecayedCallCount.set(totalDecayedCount); - totalRawCallCount.set(totalRawCount); + totalDecayedCallCost.set(totalDecayedCost); + totalRawCallCost.set(totalRawCost); // Now refresh the cache of scheduling decisions recomputeScheduleCache(); @@ -430,19 +450,19 @@ public class DecayRpcScheduler implements RpcScheduler, // Update average response time with decay updateAverageResponseTime(true); } catch (Exception ex) { - LOG.error("decayCurrentCounts exception: " + - ExceptionUtils.getFullStackTrace(ex)); + LOG.error("decayCurrentCosts exception: " + + ExceptionUtils.getStackTrace(ex)); throw ex; } } /** - * Update the scheduleCache to match current conditions in callCounts. + * Update the scheduleCache to match current conditions in callCosts. */ private void recomputeScheduleCache() { Map nextCache = new HashMap(); - for (Map.Entry> entry : callCounts.entrySet()) { + for (Map.Entry> entry : callCosts.entrySet()) { Object id = entry.getKey(); AtomicLong value = entry.getValue().get(0); @@ -457,51 +477,52 @@ public class DecayRpcScheduler implements RpcScheduler, } /** - * Get the number of occurrences and increment atomically. - * @param identity the identity of the user to increment - * @return the value before incrementation + * Adjust the stored cost for a given identity. + * + * @param identity the identity of the user whose cost should be adjusted + * @param costDelta the cost to add for the given identity */ - private long getAndIncrementCallCounts(Object identity) - throws InterruptedException { - // We will increment the count, or create it if no such count exists - List count = this.callCounts.get(identity); - if (count == null) { - // Create the counts since no such count exists. - // idx 0 for decayed call count - // idx 1 for the raw call count - count = new ArrayList(2); - count.add(new AtomicLong(0)); - count.add(new AtomicLong(0)); + private void addCost(Object identity, long costDelta) { + // We will increment the cost, or create it if no such cost exists + List cost = this.callCosts.get(identity); + if (cost == null) { + // Create the costs since no such cost exists. + // idx 0 for decayed call cost + // idx 1 for the raw call cost + cost = new ArrayList(2); + cost.add(new AtomicLong(0)); + cost.add(new AtomicLong(0)); // Put it in, or get the AtomicInteger that was put in by another thread - List otherCount = callCounts.putIfAbsent(identity, count); - if (otherCount != null) { - count = otherCount; + List otherCost = callCosts.putIfAbsent(identity, cost); + if (otherCost != null) { + cost = otherCost; } } // Update the total - totalDecayedCallCount.getAndIncrement(); - totalRawCallCount.getAndIncrement(); + totalDecayedCallCost.getAndAdd(costDelta); + totalRawCallCost.getAndAdd(costDelta); // At this point value is guaranteed to be not null. It may however have - // been clobbered from callCounts. Nonetheless, we return what + // been clobbered from callCosts. Nonetheless, we return what // we have. - count.get(1).getAndIncrement(); - return count.get(0).getAndIncrement(); + cost.get(1).getAndAdd(costDelta); + cost.get(0).getAndAdd(costDelta); } /** - * Given the number of occurrences, compute a scheduling decision. - * @param occurrences how many occurrences + * Given the cost for an identity, compute a scheduling decision. + * + * @param cost the cost for an identity * @return scheduling decision from 0 to numLevels - 1 */ - private int computePriorityLevel(long occurrences) { - long totalCallSnapshot = totalDecayedCallCount.get(); + private int computePriorityLevel(long cost) { + long totalCallSnapshot = totalDecayedCallCost.get(); double proportion = 0; if (totalCallSnapshot > 0) { - proportion = (double) occurrences / totalCallSnapshot; + proportion = (double) cost / totalCallSnapshot; } // Start with low priority levels, since they will be most common @@ -522,31 +543,23 @@ public class DecayRpcScheduler implements RpcScheduler, * @return integer scheduling decision from 0 to numLevels - 1 */ private int cachedOrComputedPriorityLevel(Object identity) { - try { - long occurrences = this.getAndIncrementCallCounts(identity); - - // Try the cache - Map scheduleCache = scheduleCacheRef.get(); - if (scheduleCache != null) { - Integer priority = scheduleCache.get(identity); - if (priority != null) { - LOG.debug("Cache priority for: {} with priority: {}", identity, - priority); - return priority; - } + // Try the cache + Map scheduleCache = scheduleCacheRef.get(); + if (scheduleCache != null) { + Integer priority = scheduleCache.get(identity); + if (priority != null) { + LOG.debug("Cache priority for: {} with priority: {}", identity, + priority); + return priority; } - - // Cache was no good, compute it - int priority = computePriorityLevel(occurrences); - LOG.debug("compute priority for " + identity + " priority " + priority); - return priority; - - } catch (InterruptedException ie) { - LOG.warn("Caught InterruptedException, returning low priority level"); - LOG.debug("Fallback priority for: {} with priority: {}", identity, - numLevels - 1); - return numLevels - 1; } + + // Cache was no good, compute it + List costList = callCosts.get(identity); + long currentCost = costList == null ? 0 : costList.get(0).get(); + int priority = computePriorityLevel(currentCost); + LOG.debug("compute priority for {} priority {}", identity, priority); + return priority; } /** @@ -596,6 +609,10 @@ public class DecayRpcScheduler implements RpcScheduler, @Override public void addResponseTime(String callName, Schedulable schedulable, ProcessingDetails details) { + String user = identityProvider.makeIdentity(schedulable); + long processingCost = costProvider.getCost(details); + addCost(user, processingCost); + int priorityLevel = schedulable.getPriorityLevel(); long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS); long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS); @@ -643,22 +660,30 @@ public class DecayRpcScheduler implements RpcScheduler, // For testing @VisibleForTesting - public double getDecayFactor() { return decayFactor; } + double getDecayFactor() { + return decayFactor; + } @VisibleForTesting - public long getDecayPeriodMillis() { return decayPeriodMillis; } + long getDecayPeriodMillis() { + return decayPeriodMillis; + } @VisibleForTesting - public double[] getThresholds() { return thresholds; } + double[] getThresholds() { + return thresholds; + } @VisibleForTesting - public void forceDecay() { decayCurrentCounts(); } + void forceDecay() { + decayCurrentCosts(); + } @VisibleForTesting - public Map getCallCountSnapshot() { + Map getCallCostSnapshot() { HashMap snapshot = new HashMap(); - for (Map.Entry> entry : callCounts.entrySet()) { + for (Map.Entry> entry : callCosts.entrySet()) { snapshot.put(entry.getKey(), entry.getValue().get(0).get()); } @@ -666,8 +691,8 @@ public class DecayRpcScheduler implements RpcScheduler, } @VisibleForTesting - public long getTotalCallSnapshot() { - return totalDecayedCallCount.get(); + long getTotalCallSnapshot() { + return totalDecayedCallCost.get(); } /** @@ -800,15 +825,15 @@ public class DecayRpcScheduler implements RpcScheduler, } public int getUniqueIdentityCount() { - return callCounts.size(); + return callCosts.size(); } public long getTotalCallVolume() { - return totalDecayedCallCount.get(); + return totalDecayedCallCost.get(); } public long getTotalRawCallVolume() { - return totalRawCallCount.get(); + return totalRawCallCost.get(); } public long[] getResponseTimeCountInLastWindow() { @@ -901,17 +926,17 @@ public class DecayRpcScheduler implements RpcScheduler, } } - // Get the top N callers' raw call count and scheduler decision + // Get the top N callers' raw call cost and scheduler decision private TopN getTopCallers(int n) { TopN topNCallers = new TopN(n); Iterator>> it = - callCounts.entrySet().iterator(); + callCosts.entrySet().iterator(); while (it.hasNext()) { Map.Entry> entry = it.next(); String caller = entry.getKey().toString(); - Long count = entry.getValue().get(1).get(); - if (count > 0) { - topNCallers.offer(new NameValuePair(caller, count)); + Long cost = entry.getValue().get(1).get(); + if (cost > 0) { + topNCallers.offer(new NameValuePair(caller, cost)); } } return topNCallers; @@ -932,25 +957,25 @@ public class DecayRpcScheduler implements RpcScheduler, public String getCallVolumeSummary() { try { - return WRITER.writeValueAsString(getDecayedCallCounts()); + return WRITER.writeValueAsString(getDecayedCallCosts()); } catch (Exception e) { return "Error: " + e.getMessage(); } } - private Map getDecayedCallCounts() { - Map decayedCallCounts = new HashMap<>(callCounts.size()); + private Map getDecayedCallCosts() { + Map decayedCallCosts = new HashMap<>(callCosts.size()); Iterator>> it = - callCounts.entrySet().iterator(); + callCosts.entrySet().iterator(); while (it.hasNext()) { Map.Entry> entry = it.next(); Object user = entry.getKey(); - Long decayedCount = entry.getValue().get(0).get(); - if (decayedCount > 0) { - decayedCallCounts.put(user, decayedCount); + Long decayedCost = entry.getValue().get(0).get(); + if (decayedCost > 0) { + decayedCallCosts.put(user, decayedCost); } } - return decayedCallCounts; + return decayedCallCosts; } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java new file mode 100644 index 00000000000..ad56ddfb2e6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import org.apache.hadoop.conf.Configuration; + +/** + * Ignores process details and returns a constant value for each call. + */ +public class DefaultCostProvider implements CostProvider { + + @Override + public void init(String namespace, Configuration conf) { + // No-op + } + + /** + * Returns 1, regardless of the processing details. + * + * @param details Process details (ignored) + * @return 1 + */ + @Override + public long getCost(ProcessingDetails details) { + return 1; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java new file mode 100644 index 00000000000..4304b24299f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.util.Locale; +import org.apache.hadoop.conf.Configuration; + +import static org.apache.hadoop.ipc.ProcessingDetails.Timing; + +/** + * A {@link CostProvider} that calculates the cost for an operation + * as a weighted sum of its processing time values (see + * {@link ProcessingDetails}). This can be used by specifying the + * {@link org.apache.hadoop.fs.CommonConfigurationKeys#IPC_COST_PROVIDER_KEY} + * configuration key. + * + *

This allows for configuration of how heavily each of the operations + * within {@link ProcessingDetails} is weighted. By default, + * {@link ProcessingDetails.Timing#LOCKFREE}, + * {@link ProcessingDetails.Timing#RESPONSE}, and + * {@link ProcessingDetails.Timing#HANDLER} times have a weight of + * {@value #DEFAULT_LOCKFREE_WEIGHT}, + * {@link ProcessingDetails.Timing#LOCKSHARED} has a weight of + * {@value #DEFAULT_LOCKSHARED_WEIGHT}, + * {@link ProcessingDetails.Timing#LOCKEXCLUSIVE} has a weight of + * {@value #DEFAULT_LOCKEXCLUSIVE_WEIGHT}, and others are ignored. + * These values can all be configured using the {@link #WEIGHT_CONFIG_PREFIX} + * key, prefixed with the IPC namespace, and suffixed with the name of the + * timing measurement from {@link ProcessingDetails} (all lowercase). + * For example, to set the lock exclusive weight to be 1000, set: + *

+ *   ipc.8020.cost-provider.impl=org.apache.hadoop.ipc.WeightedTimeCostProvider
+ *   ipc.8020.weighted-cost.lockexclusive=1000
+ * 
+ */ +public class WeightedTimeCostProvider implements CostProvider { + + /** + * The prefix used in configuration values specifying the weight to use when + * determining the cost of an operation. See the class Javadoc for more info. + */ + public static final String WEIGHT_CONFIG_PREFIX = ".weighted-cost."; + static final int DEFAULT_LOCKFREE_WEIGHT = 1; + static final int DEFAULT_LOCKSHARED_WEIGHT = 10; + static final int DEFAULT_LOCKEXCLUSIVE_WEIGHT = 100; + + private long[] weights; + + @Override + public void init(String namespace, Configuration conf) { + weights = new long[Timing.values().length]; + for (Timing timing : ProcessingDetails.Timing.values()) { + final int defaultValue; + switch (timing) { + case LOCKFREE: + case RESPONSE: + case HANDLER: + defaultValue = DEFAULT_LOCKFREE_WEIGHT; + break; + case LOCKSHARED: + defaultValue = DEFAULT_LOCKSHARED_WEIGHT; + break; + case LOCKEXCLUSIVE: + defaultValue = DEFAULT_LOCKEXCLUSIVE_WEIGHT; + break; + default: + // by default don't bill for queueing or lock wait time + defaultValue = 0; + } + String key = namespace + WEIGHT_CONFIG_PREFIX + + timing.name().toLowerCase(Locale.ENGLISH); + weights[timing.ordinal()] = conf.getInt(key, defaultValue); + } + } + + /** + * Calculates a weighted sum of the times stored on the provided processing + * details to be used as the cost in {@link DecayRpcScheduler}. + * + * @param details Processing details + * @return The weighted sum of the times. The returned unit is the same + * as the default unit used by the provided processing details. + */ + @Override + public long getCost(ProcessingDetails details) { + assert weights != null : "Cost provider must be initialized before use"; + long cost = 0; + // weights was initialized to the same length as Timing.values() + for (int i = 0; i < Timing.values().length; i++) { + cost += details.get(Timing.values()[i]) * weights[i]; + } + return cost; + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java index 10ab40ace1f..7bdc6b5e96d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java @@ -26,6 +26,7 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.conf.Configuration; @@ -36,6 +37,7 @@ import javax.management.ObjectName; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.lang.management.ManagementFactory; +import java.util.concurrent.TimeUnit; public class TestDecayRpcScheduler { private Schedulable mockCall(String id) { @@ -131,67 +133,69 @@ public class TestDecayRpcScheduler { conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush scheduler = new DecayRpcScheduler(1, "ns", conf); - assertEquals(0, scheduler.getCallCountSnapshot().size()); // empty first + assertEquals(0, scheduler.getCallCostSnapshot().size()); // empty first - scheduler.getPriorityLevel(mockCall("A")); - assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue()); - assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue()); + getPriorityIncrementCallCount("A"); + assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue()); + assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue()); - scheduler.getPriorityLevel(mockCall("A")); - scheduler.getPriorityLevel(mockCall("B")); - scheduler.getPriorityLevel(mockCall("A")); + getPriorityIncrementCallCount("A"); + getPriorityIncrementCallCount("B"); + getPriorityIncrementCallCount("A"); - assertEquals(3, scheduler.getCallCountSnapshot().get("A").longValue()); - assertEquals(1, scheduler.getCallCountSnapshot().get("B").longValue()); + assertEquals(3, scheduler.getCallCostSnapshot().get("A").longValue()); + assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue()); } @Test @SuppressWarnings("deprecation") public void testDecay() throws Exception { Configuration conf = new Configuration(); - conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never - conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5"); + conf.setLong("ns." // Never decay + + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999999); + conf.setDouble("ns." + + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY, 0.5); scheduler = new DecayRpcScheduler(1, "ns", conf); assertEquals(0, scheduler.getTotalCallSnapshot()); for (int i = 0; i < 4; i++) { - scheduler.getPriorityLevel(mockCall("A")); + getPriorityIncrementCallCount("A"); } sleep(1000); for (int i = 0; i < 8; i++) { - scheduler.getPriorityLevel(mockCall("B")); + getPriorityIncrementCallCount("B"); } assertEquals(12, scheduler.getTotalCallSnapshot()); - assertEquals(4, scheduler.getCallCountSnapshot().get("A").longValue()); - assertEquals(8, scheduler.getCallCountSnapshot().get("B").longValue()); + assertEquals(4, scheduler.getCallCostSnapshot().get("A").longValue()); + assertEquals(8, scheduler.getCallCostSnapshot().get("B").longValue()); scheduler.forceDecay(); assertEquals(6, scheduler.getTotalCallSnapshot()); - assertEquals(2, scheduler.getCallCountSnapshot().get("A").longValue()); - assertEquals(4, scheduler.getCallCountSnapshot().get("B").longValue()); + assertEquals(2, scheduler.getCallCostSnapshot().get("A").longValue()); + assertEquals(4, scheduler.getCallCostSnapshot().get("B").longValue()); scheduler.forceDecay(); assertEquals(3, scheduler.getTotalCallSnapshot()); - assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue()); - assertEquals(2, scheduler.getCallCountSnapshot().get("B").longValue()); + assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue()); + assertEquals(2, scheduler.getCallCostSnapshot().get("B").longValue()); scheduler.forceDecay(); assertEquals(1, scheduler.getTotalCallSnapshot()); - assertEquals(null, scheduler.getCallCountSnapshot().get("A")); - assertEquals(1, scheduler.getCallCountSnapshot().get("B").longValue()); + assertEquals(null, scheduler.getCallCostSnapshot().get("A")); + assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue()); scheduler.forceDecay(); assertEquals(0, scheduler.getTotalCallSnapshot()); - assertEquals(null, scheduler.getCallCountSnapshot().get("A")); - assertEquals(null, scheduler.getCallCountSnapshot().get("B")); + assertEquals(null, scheduler.getCallCostSnapshot().get("A")); + assertEquals(null, scheduler.getCallCostSnapshot().get("B")); } @Test @@ -205,16 +209,16 @@ public class TestDecayRpcScheduler { .IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75"); scheduler = new DecayRpcScheduler(4, namespace, conf); - assertEquals(0, scheduler.getPriorityLevel(mockCall("A"))); - assertEquals(2, scheduler.getPriorityLevel(mockCall("A"))); - assertEquals(0, scheduler.getPriorityLevel(mockCall("B"))); - assertEquals(1, scheduler.getPriorityLevel(mockCall("B"))); - assertEquals(0, scheduler.getPriorityLevel(mockCall("C"))); - assertEquals(0, scheduler.getPriorityLevel(mockCall("C"))); - assertEquals(1, scheduler.getPriorityLevel(mockCall("A"))); - assertEquals(1, scheduler.getPriorityLevel(mockCall("A"))); - assertEquals(1, scheduler.getPriorityLevel(mockCall("A"))); - assertEquals(2, scheduler.getPriorityLevel(mockCall("A"))); + assertEquals(0, getPriorityIncrementCallCount("A")); // 0 out of 0 calls + assertEquals(3, getPriorityIncrementCallCount("A")); // 1 out of 1 calls + assertEquals(0, getPriorityIncrementCallCount("B")); // 0 out of 2 calls + assertEquals(1, getPriorityIncrementCallCount("B")); // 1 out of 3 calls + assertEquals(0, getPriorityIncrementCallCount("C")); // 0 out of 4 calls + assertEquals(0, getPriorityIncrementCallCount("C")); // 1 out of 5 calls + assertEquals(1, getPriorityIncrementCallCount("A")); // 2 out of 6 calls + assertEquals(1, getPriorityIncrementCallCount("A")); // 3 out of 7 calls + assertEquals(2, getPriorityIncrementCallCount("A")); // 4 out of 8 calls + assertEquals(2, getPriorityIncrementCallCount("A")); // 5 out of 9 calls MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName mxbeanName = new ObjectName( @@ -243,7 +247,7 @@ public class TestDecayRpcScheduler { assertEquals(0, scheduler.getTotalCallSnapshot()); for (int i = 0; i < 64; i++) { - scheduler.getPriorityLevel(mockCall("A")); + getPriorityIncrementCallCount("A"); } // It should eventually decay to zero @@ -272,6 +276,108 @@ public class TestDecayRpcScheduler { //set systout back System.setOut(output); } + } + @Test + public void testUsingWeightedTimeCostProvider() { + scheduler = getSchedulerWithWeightedTimeCostProvider(3); + + // 3 details in increasing order of cost. Although medium has a longer + // duration, the shared lock is weighted less than the exclusive lock + ProcessingDetails callDetailsLow = + new ProcessingDetails(TimeUnit.MILLISECONDS); + callDetailsLow.set(ProcessingDetails.Timing.LOCKFREE, 1); + ProcessingDetails callDetailsMedium = + new ProcessingDetails(TimeUnit.MILLISECONDS); + callDetailsMedium.set(ProcessingDetails.Timing.LOCKSHARED, 500); + ProcessingDetails callDetailsHigh = + new ProcessingDetails(TimeUnit.MILLISECONDS); + callDetailsHigh.set(ProcessingDetails.Timing.LOCKEXCLUSIVE, 100); + + for (int i = 0; i < 10; i++) { + scheduler.addResponseTime("ignored", mockCall("LOW"), callDetailsLow); + } + scheduler.addResponseTime("ignored", mockCall("MED"), callDetailsMedium); + scheduler.addResponseTime("ignored", mockCall("HIGH"), callDetailsHigh); + + assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW"))); + assertEquals(1, scheduler.getPriorityLevel(mockCall("MED"))); + assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH"))); + + assertEquals(3, scheduler.getUniqueIdentityCount()); + long totalCallInitial = scheduler.getTotalRawCallVolume(); + assertEquals(totalCallInitial, scheduler.getTotalCallVolume()); + + scheduler.forceDecay(); + + // Relative priorities should stay the same after a single decay + assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW"))); + assertEquals(1, scheduler.getPriorityLevel(mockCall("MED"))); + assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH"))); + + assertEquals(3, scheduler.getUniqueIdentityCount()); + assertEquals(totalCallInitial, scheduler.getTotalRawCallVolume()); + assertTrue(scheduler.getTotalCallVolume() < totalCallInitial); + + for (int i = 0; i < 100; i++) { + scheduler.forceDecay(); + } + // After enough decay cycles, all callers should be high priority again + assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW"))); + assertEquals(0, scheduler.getPriorityLevel(mockCall("MED"))); + assertEquals(0, scheduler.getPriorityLevel(mockCall("HIGH"))); + } + + @Test + public void testUsingWeightedTimeCostProviderWithZeroCostCalls() { + scheduler = getSchedulerWithWeightedTimeCostProvider(2); + + ProcessingDetails emptyDetails = + new ProcessingDetails(TimeUnit.MILLISECONDS); + + for (int i = 0; i < 1000; i++) { + scheduler.addResponseTime("ignored", mockCall("MANY"), emptyDetails); + } + scheduler.addResponseTime("ignored", mockCall("FEW"), emptyDetails); + + // Since the calls are all "free", they should have the same priority + assertEquals(0, scheduler.getPriorityLevel(mockCall("MANY"))); + assertEquals(0, scheduler.getPriorityLevel(mockCall("FEW"))); + } + + @Test + public void testUsingWeightedTimeCostProviderNoRequests() { + scheduler = getSchedulerWithWeightedTimeCostProvider(2); + + assertEquals(0, scheduler.getPriorityLevel(mockCall("A"))); + } + + /** + * Get a scheduler that uses {@link WeightedTimeCostProvider} and has + * normal decaying disabled. + */ + private static DecayRpcScheduler getSchedulerWithWeightedTimeCostProvider( + int priorityLevels) { + Configuration conf = new Configuration(); + conf.setClass("ns." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY, + WeightedTimeCostProvider.class, CostProvider.class); + conf.setLong("ns." + + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999); + return new DecayRpcScheduler(priorityLevels, "ns", conf); + } + + /** + * Get the priority and increment the call count, assuming that + * {@link DefaultCostProvider} is in use. + */ + private int getPriorityIncrementCallCount(String callId) { + Schedulable mockCall = mockCall(callId); + int priority = scheduler.getPriorityLevel(mockCall); + // The DefaultCostProvider uses a cost of 1 for all calls, ignoring + // the processing details, so an empty one is fine + ProcessingDetails emptyProcessingDetails = + new ProcessingDetails(TimeUnit.MILLISECONDS); + scheduler.addResponseTime("ignored", mockCall, emptyProcessingDetails); + return priority; } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 8c7881ab390..95bc6737dd0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ipc; -import com.google.common.base.Supplier; import com.google.protobuf.ServiceException; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; @@ -1187,15 +1186,6 @@ public class TestRPC extends TestRpcBase { Exception lastException = null; proxy = getClient(addr, conf); - MetricsRecordBuilder rb1 = - getMetrics("DecayRpcSchedulerMetrics2." + ns); - final long beginDecayedCallVolume = MetricsAsserts.getLongCounter( - "DecayedCallVolume", rb1); - final long beginRawCallVolume = MetricsAsserts.getLongCounter( - "CallVolume", rb1); - final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers", - rb1); - try { // start a sleep RPC call that sleeps 3s. for (int i = 0; i < numClients; i++) { @@ -1223,41 +1213,6 @@ public class TestRPC extends TestRpcBase { } else { lastException = unwrapExeption; } - - // Lets Metric system update latest metrics - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - MetricsRecordBuilder rb2 = - getMetrics("DecayRpcSchedulerMetrics2." + ns); - long decayedCallVolume1 = MetricsAsserts.getLongCounter( - "DecayedCallVolume", rb2); - long rawCallVolume1 = MetricsAsserts.getLongCounter( - "CallVolume", rb2); - int uniqueCaller1 = MetricsAsserts.getIntCounter( - "UniqueCallers", rb2); - long callVolumePriority0 = MetricsAsserts.getLongGauge( - "Priority.0.CompletedCallVolume", rb2); - long callVolumePriority1 = MetricsAsserts.getLongGauge( - "Priority.1.CompletedCallVolume", rb2); - double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge( - "Priority.0.AvgResponseTime", rb2); - double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge( - "Priority.1.AvgResponseTime", rb2); - - LOG.info("DecayedCallVolume: " + decayedCallVolume1); - LOG.info("CallVolume: " + rawCallVolume1); - LOG.info("UniqueCaller: " + uniqueCaller1); - LOG.info("Priority.0.CompletedCallVolume: " + callVolumePriority0); - LOG.info("Priority.1.CompletedCallVolume: " + callVolumePriority1); - LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0); - LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1); - - return decayedCallVolume1 > beginDecayedCallVolume && - rawCallVolume1 > beginRawCallVolume && - uniqueCaller1 > beginUniqueCaller; - } - }, 30, 60000); } } finally { executorService.shutdown(); @@ -1269,6 +1224,63 @@ public class TestRPC extends TestRpcBase { assertTrue("RetriableException not received", succeeded); } + /** Test that the metrics for DecayRpcScheduler are updated. */ + @Test (timeout=30000) + public void testDecayRpcSchedulerMetrics() throws Exception { + final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0"; + Server server = setupDecayRpcSchedulerandTestServer(ns + "."); + + MetricsRecordBuilder rb1 = + getMetrics("DecayRpcSchedulerMetrics2." + ns); + final long beginDecayedCallVolume = MetricsAsserts.getLongCounter( + "DecayedCallVolume", rb1); + final long beginRawCallVolume = MetricsAsserts.getLongCounter( + "CallVolume", rb1); + final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers", + rb1); + + TestRpcService proxy = getClient(addr, conf); + try { + for (int i = 0; i < 2; i++) { + proxy.sleep(null, newSleepRequest(100)); + } + + // Lets Metric system update latest metrics + GenericTestUtils.waitFor(() -> { + MetricsRecordBuilder rb2 = + getMetrics("DecayRpcSchedulerMetrics2." + ns); + long decayedCallVolume1 = MetricsAsserts.getLongCounter( + "DecayedCallVolume", rb2); + long rawCallVolume1 = MetricsAsserts.getLongCounter( + "CallVolume", rb2); + int uniqueCaller1 = MetricsAsserts.getIntCounter( + "UniqueCallers", rb2); + long callVolumePriority0 = MetricsAsserts.getLongGauge( + "Priority.0.CompletedCallVolume", rb2); + long callVolumePriority1 = MetricsAsserts.getLongGauge( + "Priority.1.CompletedCallVolume", rb2); + double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge( + "Priority.0.AvgResponseTime", rb2); + double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge( + "Priority.1.AvgResponseTime", rb2); + + LOG.info("DecayedCallVolume: {}", decayedCallVolume1); + LOG.info("CallVolume: {}", rawCallVolume1); + LOG.info("UniqueCaller: {}", uniqueCaller1); + LOG.info("Priority.0.CompletedCallVolume: {}", callVolumePriority0); + LOG.info("Priority.1.CompletedCallVolume: {}", callVolumePriority1); + LOG.info("Priority.0.AvgResponseTime: {}", avgRespTimePriority0); + LOG.info("Priority.1.AvgResponseTime: {}", avgRespTimePriority1); + + return decayedCallVolume1 > beginDecayedCallVolume && + rawCallVolume1 > beginRawCallVolume && + uniqueCaller1 > beginUniqueCaller; + }, 30, 60000); + } finally { + stop(server, proxy); + } + } + private Server setupDecayRpcSchedulerandTestServer(String ns) throws Exception { final int queueSizePerHandler = 3; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java new file mode 100644 index 00000000000..4f4a72b99ab --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProcessingDetails.Timing; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKEXCLUSIVE_WEIGHT; +import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKFREE_WEIGHT; +import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKSHARED_WEIGHT; +import static org.junit.Assert.assertEquals; + +/** Tests for {@link WeightedTimeCostProvider}. */ +public class TestWeightedTimeCostProvider { + + private static final int QUEUE_TIME = 3; + private static final int LOCKFREE_TIME = 5; + private static final int LOCKSHARED_TIME = 7; + private static final int LOCKEXCLUSIVE_TIME = 11; + + private WeightedTimeCostProvider costProvider; + private ProcessingDetails processingDetails; + + @Before + public void setup() { + costProvider = new WeightedTimeCostProvider(); + processingDetails = new ProcessingDetails(TimeUnit.MILLISECONDS); + processingDetails.set(Timing.QUEUE, QUEUE_TIME); + processingDetails.set(Timing.LOCKFREE, LOCKFREE_TIME); + processingDetails.set(Timing.LOCKSHARED, LOCKSHARED_TIME); + processingDetails.set(Timing.LOCKEXCLUSIVE, LOCKEXCLUSIVE_TIME); + } + + @Test(expected = AssertionError.class) + public void testGetCostBeforeInit() { + costProvider.getCost(null); + } + + @Test + public void testGetCostDefaultWeights() { + costProvider.init("foo", new Configuration()); + long actualCost = costProvider.getCost(processingDetails); + long expectedCost = DEFAULT_LOCKFREE_WEIGHT * LOCKFREE_TIME + + DEFAULT_LOCKSHARED_WEIGHT * LOCKSHARED_TIME + + DEFAULT_LOCKEXCLUSIVE_WEIGHT * LOCKEXCLUSIVE_TIME; + assertEquals(expectedCost, actualCost); + } + + @Test + public void testGetCostConfiguredWeights() { + Configuration conf = new Configuration(); + int queueWeight = 1000; + int lockfreeWeight = 10000; + int locksharedWeight = 100000; + conf.setInt("foo.weighted-cost.queue", queueWeight); + conf.setInt("foo.weighted-cost.lockfree", lockfreeWeight); + conf.setInt("foo.weighted-cost.lockshared", locksharedWeight); + conf.setInt("bar.weighted-cost.lockexclusive", 0); // should not apply + costProvider.init("foo", conf); + long actualCost = costProvider.getCost(processingDetails); + long expectedCost = queueWeight * QUEUE_TIME + + lockfreeWeight * LOCKFREE_TIME + + locksharedWeight * LOCKSHARED_TIME + + DEFAULT_LOCKEXCLUSIVE_WEIGHT * LOCKEXCLUSIVE_TIME; + assertEquals(expectedCost, actualCost); + } +}