From d95c6eb32cec7768ac418fb467b1198ccf3cf0dc Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Thu, 31 Mar 2016 08:42:57 -0700 Subject: [PATCH] HADOOP-12916. Allow RPC scheduler/callqueue backoff using response times. Contributed by Xiaoyu Yao. --- .../org/apache/hadoop/conf/Configuration.java | 13 + .../hadoop/fs/CommonConfigurationKeys.java | 14 +- .../apache/hadoop/ipc/CallQueueManager.java | 124 +++++- .../apache/hadoop/ipc/DecayRpcScheduler.java | 392 ++++++++++++++---- .../hadoop/ipc/DecayRpcSchedulerMXBean.java | 2 + .../hadoop/ipc/DefaultRpcScheduler.java | 45 ++ .../org/apache/hadoop/ipc/FairCallQueue.java | 45 +- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 8 +- .../org/apache/hadoop/ipc/RpcScheduler.java | 8 +- .../org/apache/hadoop/ipc/Schedulable.java | 5 +- .../java/org/apache/hadoop/ipc/Server.java | 77 +++- .../apache/hadoop/ipc/WritableRpcEngine.java | 45 +- .../hadoop/ipc/TestCallQueueManager.java | 147 ++++++- .../hadoop/ipc/TestDecayRpcScheduler.java | 42 +- .../apache/hadoop/ipc/TestFairCallQueue.java | 79 ++-- .../hadoop/ipc/TestIdentityProviders.java | 18 +- .../java/org/apache/hadoop/ipc/TestRPC.java | 92 +++- 17 files changed, 891 insertions(+), 265 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java index 8355d9694ee..4c8f27b9802 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java @@ -1626,6 +1626,10 @@ public class Configuration implements Iterable>, return defaultValue; } vStr = vStr.trim(); + return getTimeDurationHelper(name, vStr, unit); + } + + private long getTimeDurationHelper(String name, String vStr, TimeUnit unit) { ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr); if (null == vUnit) { LOG.warn("No unit for " + name + "(" + vStr + ") assuming " + unit); @@ -1636,6 +1640,15 @@ public class Configuration implements Iterable>, return unit.convert(Long.parseLong(vStr), vUnit.unit()); } + public long[] getTimeDurations(String name, TimeUnit unit) { + String[] strings = getTrimmedStrings(name); + long[] durations = new long[strings.length]; + for (int i = 0; i < strings.length; i++) { + durations[i] = getTimeDurationHelper(name, strings[i], unit); + } + return durations; + } + /** * Get the value of the name property as a Pattern. * If no such property is specified, or if the specified value is not a valid diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 9b4069a422a..a708900e630 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -90,14 +90,22 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** * CallQueue related settings. These are not used directly, but rather * combined with a namespace and port. For instance: - * IPC_CALLQUEUE_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY + * IPC_NAMESPACE + ".8020." 
+ IPC_CALLQUEUE_IMPL_KEY */ - public static final String IPC_CALLQUEUE_NAMESPACE = "ipc"; + public static final String IPC_NAMESPACE = "ipc"; public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl"; - public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl"; + public static final String IPC_SCHEDULER_IMPL_KEY = "scheduler.impl"; + public static final String IPC_IDENTITY_PROVIDER_KEY = "identity-provider.impl"; public static final String IPC_BACKOFF_ENABLE = "backoff.enable"; public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false; + /** + * IPC scheduler priority levels. + */ + public static final String IPC_SCHEDULER_PRIORITY_LEVELS_KEY = + "scheduler.priority.levels"; + public static final int IPC_SCHEDULER_PRIORITY_LEVELS_DEFAULT_KEY = 4; + /** This is for specifying the implementation for the mappings from * hostnames to the racks they belong to */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index 2ee15d3809e..1a7782aa63c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; /** * Abstracts queue operations for different blocking queues. @@ -43,6 +44,13 @@ public class CallQueueManager { Class queueClass, Class elementClass) { return (Class>)queueClass; } + + @SuppressWarnings("unchecked") + static Class convertSchedulerClass( + Class schedulerClass) { + return (Class)schedulerClass; + } + private final boolean clientBackOffEnabled; // Atomic refs point to active callQueue @@ -50,25 +58,76 @@ public class CallQueueManager { private final AtomicReference> putRef; private final AtomicReference> takeRef; + private RpcScheduler scheduler; + public CallQueueManager(Class> backingClass, + Class schedulerClass, boolean clientBackOffEnabled, int maxQueueSize, String namespace, Configuration conf) { + int priorityLevels = parseNumLevels(namespace, conf); + this.scheduler = createScheduler(schedulerClass, priorityLevels, + namespace, conf); BlockingQueue bq = createCallQueueInstance(backingClass, - maxQueueSize, namespace, conf); + priorityLevels, maxQueueSize, namespace, conf); this.clientBackOffEnabled = clientBackOffEnabled; this.putRef = new AtomicReference>(bq); this.takeRef = new AtomicReference>(bq); - LOG.info("Using callQueue " + backingClass); + LOG.info("Using callQueue: " + backingClass + " scheduler: " + + schedulerClass); + } + + private static T createScheduler( + Class theClass, int priorityLevels, String ns, Configuration conf) { + // Used for custom, configurable scheduler + try { + Constructor ctor = theClass.getDeclaredConstructor(int.class, + String.class, Configuration.class); + return ctor.newInstance(priorityLevels, ns, conf); + } catch (RuntimeException e) { + throw e; + } catch (InvocationTargetException e) { + throw new RuntimeException(theClass.getName() + + " could not be constructed.", e.getCause()); + } catch (Exception e) { + } + + try { + Constructor ctor = theClass.getDeclaredConstructor(int.class); + return ctor.newInstance(priorityLevels); + } catch 
(RuntimeException e) { + throw e; + } catch (InvocationTargetException e) { + throw new RuntimeException(theClass.getName() + + " could not be constructed.", e.getCause()); + } catch (Exception e) { + } + + // Last attempt + try { + Constructor ctor = theClass.getDeclaredConstructor(); + return ctor.newInstance(); + } catch (RuntimeException e) { + throw e; + } catch (InvocationTargetException e) { + throw new RuntimeException(theClass.getName() + + " could not be constructed.", e.getCause()); + } catch (Exception e) { + } + + // Nothing worked + throw new RuntimeException(theClass.getName() + + " could not be constructed."); } private > T createCallQueueInstance( - Class theClass, int maxLen, String ns, Configuration conf) { + Class theClass, int priorityLevels, int maxLen, String ns, + Configuration conf) { // Used for custom, configurable callqueues try { - Constructor ctor = theClass.getDeclaredConstructor(int.class, String.class, - Configuration.class); - return ctor.newInstance(maxLen, ns, conf); + Constructor ctor = theClass.getDeclaredConstructor(int.class, + int.class, String.class, Configuration.class); + return ctor.newInstance(priorityLevels, maxLen, ns, conf); } catch (RuntimeException e) { throw e; } catch (InvocationTargetException e) { @@ -110,6 +169,22 @@ public class CallQueueManager { return clientBackOffEnabled; } + // Based on policy to determine back off current call + boolean shouldBackOff(Schedulable e) { + return scheduler.shouldBackOff(e); + } + + void addResponseTime(String name, int priorityLevel, int queueTime, + int processingTime) { + scheduler.addResponseTime(name, priorityLevel, queueTime, processingTime); + } + + // This should be only called once per call and cached in the call object + // each getPriorityLevel call will increment the counter for the caller + int getPriorityLevel(Schedulable e) { + return scheduler.getPriorityLevel(e); + } + /** * Insert e into the backing queue or block until we can. * If we block and the queue changes on us, we will insert while the @@ -146,15 +221,46 @@ public class CallQueueManager { return takeRef.get().size(); } + /** + * Read the number of levels from the configuration. + * This will affect the FairCallQueue's overall capacity. + * @throws IllegalArgumentException on invalid queue count + */ + @SuppressWarnings("deprecation") + private static int parseNumLevels(String ns, Configuration conf) { + // Fair call queue levels (IPC_CALLQUEUE_PRIORITY_LEVELS_KEY) + // takes priority over the scheduler level key + // (IPC_SCHEDULER_PRIORITY_LEVELS_KEY) + int retval = conf.getInt(ns + "." + + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 0); + if (retval == 0) { // No FCQ priority level configured + retval = conf.getInt(ns + "." + + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY, + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_DEFAULT_KEY); + } else { + LOG.warn(ns + "." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY + + " is deprecated. Please use " + ns + "." + + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY + "."); + } + if(retval < 1) { + throw new IllegalArgumentException("numLevels must be at least 1"); + } + return retval; + } + /** * Replaces active queue with the newly requested one and transfers * all calls to the newQ before returning. 
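For illustration of the level resolution performed by parseNumLevels() above, a short hedged sketch follows; the "ipc.8020" prefix (port 8020) and the values 8 and 4 are assumptions for the example, not something the patch mandates, and org.apache.hadoop.conf.Configuration is assumed to be imported:

    Configuration conf = new Configuration();
    // Deprecated FairCallQueue key: when set, it still wins and a
    // deprecation warning is logged by the call queue manager.
    conf.setInt("ipc.8020.faircallqueue.priority-levels", 8);
    // New scheduler-level key introduced by this patch; ignored here
    // because the deprecated key above is present.
    conf.setInt("ipc.8020.scheduler.priority.levels", 4);
    // The manager's parseNumLevels("ipc.8020", conf) would resolve to 8.
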
*/ public synchronized void swapQueue( + Class schedulerClass, Class> queueClassToUse, int maxSize, String ns, Configuration conf) { - BlockingQueue newQ = createCallQueueInstance(queueClassToUse, maxSize, - ns, conf); + int priorityLevels = parseNumLevels(ns, conf); + RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels, + ns, conf); + BlockingQueue newQ = createCallQueueInstance(queueClassToUse, + priorityLevels, maxSize, ns, conf); // Our current queue becomes the old queue BlockingQueue oldQ = putRef.get(); @@ -168,6 +274,8 @@ public class CallQueueManager { // Swap takeRef to handle new calls takeRef.set(newQ); + this.scheduler = newScheduler; + LOG.info("Old Queue: " + stringRepr(oldQ) + ", " + "Replacement: " + stringRepr(newQ)); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index a6a14d08beb..42373395ee0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -27,17 +27,21 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import com.google.common.util.concurrent.AtomicDoubleArray; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.metrics2.util.MBeans; import org.codehaus.jackson.map.ObjectMapper; import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The decay RPC scheduler counts incoming requests in a map, then @@ -49,22 +53,28 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean /** * Period controls how many milliseconds between each decay sweep. */ - public static final String IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY = + public static final String IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY = + "decay-scheduler.period-ms"; + public static final long IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT = + 5000; + @Deprecated + public static final String IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY = "faircallqueue.decay-scheduler.period-ms"; - public static final long IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT = - 5000L; /** * Decay factor controls how much each count is suppressed by on each sweep. * Valid numbers are > 0 and < 1. Decay factor works in tandem with period * to control how long the scheduler remembers an identity. 
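To give a sense of scale for how factor and period interact, a minimal arithmetic sketch using only the defaults defined above (0.5 and 5000 ms); the burst size of 1024 calls is an assumed example value:

    double decayFactor = 0.5;   // IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT
    long periodMs = 5000;       // IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT
    long count = 1024;          // calls observed from one identity in a burst
    int sweeps = 0;
    while (count > 0) {
      // same truncating multiplication that decayCurrentCounts() applies
      count = (long) (count * decayFactor);
      sweeps++;
    }
    // sweeps == 11, so the burst is fully forgotten after roughly 55 seconds.
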
*/ - public static final String IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY = + public static final String IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY = + "decay-scheduler.decay-factor"; + public static final double IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT = + 0.5; + @Deprecated + public static final String IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY = "faircallqueue.decay-scheduler.decay-factor"; - public static final double IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT = - 0.5; - /** + /** * Thresholds are specified as integer percentages, and specify which usage * range each queue will be allocated to. For instance, specifying the list * 10, 40, 80 @@ -74,15 +84,31 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean * - q1 from 10 up to 40 * - q0 otherwise. */ - public static final String IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY = - "faircallqueue.decay-scheduler.thresholds"; + public static final String IPC_DECAYSCHEDULER_THRESHOLDS_KEY = + "decay-scheduler.thresholds"; + @Deprecated + public static final String IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY = + "faircallqueue.decay-scheduler.thresholds"; // Specifies the identity to use when the IdentityProvider cannot handle // a schedulable. public static final String DECAYSCHEDULER_UNKNOWN_IDENTITY = - "IdentityProvider.Unknown"; + "IdentityProvider.Unknown"; - public static final Log LOG = LogFactory.getLog(DecayRpcScheduler.class); + public static final String + IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY = + "decay-scheduler.backoff.responsetime.enable"; + public static final Boolean + IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_DEFAULT = false; + + // Specifies the average response time (ms) thresholds of each + // level to trigger backoff + public static final String + IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY = + "decay-scheduler.backoff.responsetime.thresholds"; + + public static final Logger LOG = + LoggerFactory.getLogger(DecayRpcScheduler.class); // Track the number of calls for each schedulable identity private final ConcurrentHashMap callCounts = @@ -91,6 +117,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean // Should be the sum of all AtomicLongs in callCounts private final AtomicLong totalCalls = new AtomicLong(); + // Track total call count and response time in current decay window + private final AtomicLongArray responseTimeCountInCurrWindow; + private final AtomicLongArray responseTimeTotalInCurrWindow; + + // Track average response time in previous decay window + private final AtomicDoubleArray responseTimeAvgInLastWindow; + private final AtomicLongArray responseTimeCountInLastWindow; + // Pre-computed scheduling decisions during the decay sweep are // atomically swapped in as a read-only map private final AtomicReference> scheduleCacheRef = @@ -98,10 +132,12 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean // Tune the behavior of the scheduler private final long decayPeriodMillis; // How long between each tick - private final double decayFactor; // nextCount = currentCount / decayFactor - private final int numQueues; // affects scheduling decisions, from 0 to numQueues - 1 + private final double decayFactor; // nextCount = currentCount * decayFactor + private final int numLevels; private final double[] thresholds; private final IdentityProvider identityProvider; + private final boolean backOffByResponseTimeEnabled; + private final long[] backOffResponseTimeThresholds; /** * This TimerTask will call 
decayCurrentCounts until @@ -132,35 +168,46 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean /** * Create a decay scheduler. - * @param numQueues number of queues to schedule for + * @param numLevels number of priority levels * @param ns config prefix, so that we can configure multiple schedulers * in a single instance. * @param conf configuration to use. */ - public DecayRpcScheduler(int numQueues, String ns, Configuration conf) { - if (numQueues < 1) { - throw new IllegalArgumentException("number of queues must be > 0"); + public DecayRpcScheduler(int numLevels, String ns, Configuration conf) { + if(numLevels < 1) { + throw new IllegalArgumentException("Number of Priority Levels must be " + + "at least 1"); } - - this.numQueues = numQueues; + this.numLevels = numLevels; this.decayFactor = parseDecayFactor(ns, conf); this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf); this.identityProvider = this.parseIdentityProvider(ns, conf); - this.thresholds = parseThresholds(ns, conf, numQueues); + this.thresholds = parseThresholds(ns, conf, numLevels); + this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns, + conf); + this.backOffResponseTimeThresholds = + parseBackOffResponseTimeThreshold(ns, conf, numLevels); // Setup delay timer Timer timer = new Timer(); DecayTask task = new DecayTask(this, timer); timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis); - MetricsProxy prox = MetricsProxy.getInstance(ns); + // Setup response time metrics + responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels); + responseTimeCountInCurrWindow = new AtomicLongArray(numLevels); + responseTimeAvgInLastWindow = new AtomicDoubleArray(numLevels); + responseTimeCountInLastWindow = new AtomicLongArray(numLevels); + + MetricsProxy prox = MetricsProxy.getInstance(ns, numLevels); prox.setDelegate(this); } // Load configs - private IdentityProvider parseIdentityProvider(String ns, Configuration conf) { + private IdentityProvider parseIdentityProvider(String ns, + Configuration conf) { List providers = conf.getInstances( - ns + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY, + ns + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY, IdentityProvider.class); if (providers.size() < 1) { @@ -174,10 +221,16 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean private static double parseDecayFactor(String ns, Configuration conf) { double factor = conf.getDouble(ns + "." + - IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY, - IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT - ); - + IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, 0.0); + if (factor == 0.0) { + factor = conf.getDouble(ns + "." + + IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY, + IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT); + } else if ((factor > 0.0) && (factor < 1)) { + LOG.warn(IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY + + " is deprecated. Please use " + + IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY + "."); + } if (factor <= 0 || factor >= 1) { throw new IllegalArgumentException("Decay Factor " + "must be between 0 and 1"); @@ -188,10 +241,17 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean private static long parseDecayPeriodMillis(String ns, Configuration conf) { long period = conf.getLong(ns + "." + - IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, - IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT - ); - + IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, + 0); + if (period == 0) { + period = conf.getLong(ns + "." 
+ + IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, + IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT); + } else if (period > 0) { + LOG.warn((IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY + + " is deprecated. Please use " + + IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY)); + } if (period <= 0) { throw new IllegalArgumentException("Period millis must be >= 0"); } @@ -200,15 +260,24 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean } private static double[] parseThresholds(String ns, Configuration conf, - int numQueues) { + int numLevels) { int[] percentages = conf.getInts(ns + "." + - IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY); + IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY); if (percentages.length == 0) { - return getDefaultThresholds(numQueues); - } else if (percentages.length != numQueues-1) { + percentages = conf.getInts(ns + "." + IPC_DECAYSCHEDULER_THRESHOLDS_KEY); + if (percentages.length == 0) { + return getDefaultThresholds(numLevels); + } + } else { + LOG.warn(IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY + + " is deprecated. Please use " + + IPC_DECAYSCHEDULER_THRESHOLDS_KEY); + } + + if (percentages.length != numLevels-1) { throw new IllegalArgumentException("Number of thresholds should be " + - (numQueues-1) + ". Was: " + percentages.length); + (numLevels-1) + ". Was: " + percentages.length); } // Convert integer percentages to decimals @@ -223,14 +292,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean /** * Generate default thresholds if user did not specify. Strategy is * to halve each time, since queue usage tends to be exponential. - * So if numQueues is 4, we would generate: double[]{0.125, 0.25, 0.5} + * So if numLevels is 4, we would generate: double[]{0.125, 0.25, 0.5} * which specifies the boundaries between each queue's usage. - * @param numQueues number of queues to compute for - * @return array of boundaries of length numQueues - 1 + * @param numLevels number of levels to compute for + * @return array of boundaries of length numLevels - 1 */ - private static double[] getDefaultThresholds(int numQueues) { - double[] ret = new double[numQueues - 1]; - double div = Math.pow(2, numQueues - 1); + private static double[] getDefaultThresholds(int numLevels) { + double[] ret = new double[numLevels - 1]; + double div = Math.pow(2, numLevels - 1); for (int i = 0; i < ret.length; i++) { ret[i] = Math.pow(2, i)/div; @@ -238,39 +307,89 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean return ret; } + private static long[] parseBackOffResponseTimeThreshold(String ns, + Configuration conf, int numLevels) { + long[] responseTimeThresholds = conf.getTimeDurations(ns + "." + + IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY, + TimeUnit.MILLISECONDS); + // backoff thresholds not specified + if (responseTimeThresholds.length == 0) { + return getDefaultBackOffResponseTimeThresholds(numLevels); + } + // backoff thresholds specified but not match with the levels + if (responseTimeThresholds.length != numLevels) { + throw new IllegalArgumentException( + "responseTimeThresholds must match with the number of priority " + + "levels"); + } + // invalid thresholds + for (long responseTimeThreshold: responseTimeThresholds) { + if (responseTimeThreshold <= 0) { + throw new IllegalArgumentException( + "responseTimeThreshold millis must be >= 0"); + } + } + return responseTimeThresholds; + } + + // 10s for level 0, 20s for level 1, 30s for level 2, ... 
+ private static long[] getDefaultBackOffResponseTimeThresholds(int numLevels) { + long[] ret = new long[numLevels]; + for (int i = 0; i < ret.length; i++) { + ret[i] = 10000*(i+1); + } + return ret; + } + + private static Boolean parseBackOffByResponseTimeEnabled(String ns, + Configuration conf) { + return conf.getBoolean(ns + "." + + IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY, + IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_DEFAULT); + } + /** * Decay the stored counts for each user and clean as necessary. * This method should be called periodically in order to keep * counts current. */ private void decayCurrentCounts() { - long total = 0; - Iterator> it = - callCounts.entrySet().iterator(); + try { + long total = 0; + Iterator> it = + callCounts.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry entry = it.next(); - AtomicLong count = entry.getValue(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + AtomicLong count = entry.getValue(); - // Compute the next value by reducing it by the decayFactor - long currentValue = count.get(); - long nextValue = (long)(currentValue * decayFactor); - total += nextValue; - count.set(nextValue); + // Compute the next value by reducing it by the decayFactor + long currentValue = count.get(); + long nextValue = (long) (currentValue * decayFactor); + total += nextValue; + count.set(nextValue); - if (nextValue == 0) { - // We will clean up unused keys here. An interesting optimization might - // be to have an upper bound on keyspace in callCounts and only - // clean once we pass it. - it.remove(); + if (nextValue == 0) { + // We will clean up unused keys here. An interesting optimization + // might be to have an upper bound on keyspace in callCounts and only + // clean once we pass it. + it.remove(); + } } + + // Update the total so that we remain in sync + totalCalls.set(total); + + // Now refresh the cache of scheduling decisions + recomputeScheduleCache(); + + // Update average response time with decay + updateAverageResponseTime(true); + } catch (Exception ex) { + LOG.error("decayCurrentCounts exception: " + + ExceptionUtils.getFullStackTrace(ex)); + throw ex; } - - // Update the total so that we remain in sync - totalCalls.set(total); - - // Now refresh the cache of scheduling decisions - recomputeScheduleCache(); } /** @@ -324,7 +443,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean /** * Given the number of occurrences, compute a scheduling decision. 
* @param occurrences how many occurrences - * @return scheduling decision from 0 to numQueues - 1 + * @return scheduling decision from 0 to numLevels - 1 */ private int computePriorityLevel(long occurrences) { long totalCallSnapshot = totalCalls.get(); @@ -334,14 +453,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean proportion = (double) occurrences / totalCallSnapshot; } - // Start with low priority queues, since they will be most common - for(int i = (numQueues - 1); i > 0; i--) { + // Start with low priority levels, since they will be most common + for(int i = (numLevels - 1); i > 0; i--) { if (proportion >= this.thresholds[i - 1]) { - return i; // We've found our queue number + return i; // We've found our level number } } - // If we get this far, we're at queue 0 + // If we get this far, we're at level 0 return 0; } @@ -349,7 +468,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean * Returns the priority level for a given identity by first trying the cache, * then computing it. * @param identity an object responding to toString and hashCode - * @return integer scheduling decision from 0 to numQueues - 1 + * @return integer scheduling decision from 0 to numLevels - 1 */ private int cachedOrComputedPriorityLevel(Object identity) { try { @@ -360,22 +479,29 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean if (scheduleCache != null) { Integer priority = scheduleCache.get(identity); if (priority != null) { + LOG.debug("Cache priority for: {} with priority: {}", identity, + priority); return priority; } } // Cache was no good, compute it - return computePriorityLevel(occurrences); + int priority = computePriorityLevel(occurrences); + LOG.debug("compute priority for " + identity + " priority " + priority); + return priority; + } catch (InterruptedException ie) { - LOG.warn("Caught InterruptedException, returning low priority queue"); - return numQueues - 1; + LOG.warn("Caught InterruptedException, returning low priority level"); + LOG.debug("Fallback priority for: {} with priority: {}", identity, + numLevels - 1); + return numLevels - 1; } } /** * Compute the appropriate priority for a schedulable based on past requests. 
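A worked example of the mapping above, using the default thresholds for four levels, {0.125, 0.25, 0.5}, as documented in getDefaultThresholds(); the 30%, 60% and 10% shares are assumed example inputs, and level 0 is the highest priority:

    double[] thresholds = {0.125, 0.25, 0.5};  // getDefaultThresholds(4)
    double share = 0.30;                       // caller made 30% of recent calls
    int level = 0;
    for (int i = thresholds.length; i > 0; i--) {
      if (share >= thresholds[i - 1]) {
        level = i;
        break;
      }
    }
    // level == 2 here; a 60% share would map to level 3,
    // and a 10% share stays at level 0.
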
* @param obj the schedulable obj to query and remember - * @return the queue index which we recommend scheduling in + * @return the level index which we recommend scheduling in */ @Override public int getPriorityLevel(Schedulable obj) { @@ -389,6 +515,73 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean return cachedOrComputedPriorityLevel(identity); } + @Override + public boolean shouldBackOff(Schedulable obj) { + Boolean backOff = false; + if (backOffByResponseTimeEnabled) { + int priorityLevel = obj.getPriorityLevel(); + if (LOG.isDebugEnabled()) { + double[] responseTimes = getAverageResponseTime(); + LOG.debug("Current Caller: {} Priority: {} ", + obj.getUserGroupInformation().getUserName(), + obj.getPriorityLevel()); + for (int i = 0; i < numLevels; i++) { + LOG.debug("Queue: {} responseTime: {} backoffThreshold: {}", i, + responseTimes[i], backOffResponseTimeThresholds[i]); + } + } + // High priority rpc over threshold triggers back off of low priority rpc + for (int i = 0; i < priorityLevel + 1; i++) { + if (responseTimeAvgInLastWindow.get(i) > + backOffResponseTimeThresholds[i]) { + backOff = true; + break; + } + } + } + return backOff; + } + + @Override + public void addResponseTime(String name, int priorityLevel, int queueTime, + int processingTime) { + responseTimeCountInCurrWindow.getAndIncrement(priorityLevel); + responseTimeTotalInCurrWindow.getAndAdd(priorityLevel, + queueTime+processingTime); + if (LOG.isDebugEnabled()) { + LOG.debug("addResponseTime for call: {} priority: {} queueTime: {} " + + "processingTime: {} ", name, priorityLevel, queueTime, + processingTime); + } + } + + // Update the cached average response time at the end of decay window + void updateAverageResponseTime(boolean enableDecay) { + for (int i = 0; i < numLevels; i++) { + double averageResponseTime = 0; + long totalResponseTime = responseTimeTotalInCurrWindow.get(i); + long responseTimeCount = responseTimeCountInCurrWindow.get(i); + if (responseTimeCount > 0) { + averageResponseTime = (double) totalResponseTime / responseTimeCount; + } + final double lastAvg = responseTimeAvgInLastWindow.get(i); + if (enableDecay && lastAvg > 0.0) { + final double decayed = decayFactor * lastAvg + averageResponseTime; + responseTimeAvgInLastWindow.set(i, decayed); + } else { + responseTimeAvgInLastWindow.set(i, averageResponseTime); + } + responseTimeCountInLastWindow.set(i, responseTimeCount); + if (LOG.isDebugEnabled()) { + LOG.debug("updateAverageResponseTime queue: {} Average: {} Count: {}", + i, averageResponseTime, responseTimeCount); + } + // Reset for next decay window + responseTimeTotalInCurrWindow.set(i, 0); + responseTimeCountInCurrWindow.set(i, 0); + } + } + // For testing @VisibleForTesting public double getDecayFactor() { return decayFactor; } @@ -429,16 +622,21 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean // Weakref for delegate, so we don't retain it forever if it can be GC'd private WeakReference delegate; + private double[] averageResponseTimeDefault; + private long[] callCountInLastWindowDefault; - private MetricsProxy(String namespace) { + private MetricsProxy(String namespace, int numLevels) { + averageResponseTimeDefault = new double[numLevels]; + callCountInLastWindowDefault = new long[numLevels]; MBeans.register(namespace, "DecayRpcScheduler", this); } - public static synchronized MetricsProxy getInstance(String namespace) { + public static synchronized MetricsProxy getInstance(String namespace, + int numLevels) { 
MetricsProxy mp = INSTANCES.get(namespace); if (mp == null) { // We must create one - mp = new MetricsProxy(namespace); + mp = new MetricsProxy(namespace, numLevels); INSTANCES.put(namespace, mp); } return mp; @@ -487,6 +685,25 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean return scheduler.getTotalCallVolume(); } } + + @Override + public double[] getAverageResponseTime() { + DecayRpcScheduler scheduler = delegate.get(); + if (scheduler == null) { + return averageResponseTimeDefault; + } else { + return scheduler.getAverageResponseTime(); + } + } + + public long[] getResponseTimeCountInLastWindow() { + DecayRpcScheduler scheduler = delegate.get(); + if (scheduler == null) { + return callCountInLastWindowDefault; + } else { + return scheduler.getResponseTimeCountInLastWindow(); + } + } } public int getUniqueIdentityCount() { @@ -497,6 +714,23 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean return totalCalls.get(); } + public long[] getResponseTimeCountInLastWindow() { + long[] ret = new long[responseTimeCountInLastWindow.length()]; + for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) { + ret[i] = responseTimeCountInLastWindow.get(i); + } + return ret; + } + + @Override + public double[] getAverageResponseTime() { + double[] ret = new double[responseTimeAvgInLastWindow.length()]; + for (int i = 0; i < responseTimeAvgInLastWindow.length(); i++) { + ret[i] = responseTimeAvgInLastWindow.get(i); + } + return ret; + } + public String getSchedulingDecisionSummary() { Map decisions = scheduleCacheRef.get(); if (decisions == null) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java index 3481f19449d..fab9b93b1b3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java @@ -27,4 +27,6 @@ public interface DecayRpcSchedulerMXBean { String getCallVolumeSummary(); int getUniqueIdentityCount(); long getTotalCallVolume(); + double[] getAverageResponseTime(); + long[] getResponseTimeCountInLastWindow(); } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java new file mode 100644 index 00000000000..08f74d4b1c0 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import org.apache.hadoop.conf.Configuration; + +/** + * No op default RPC scheduler. + */ +public class DefaultRpcScheduler implements RpcScheduler { + @Override + public int getPriorityLevel(Schedulable obj) { + return 0; + } + + @Override + public boolean shouldBackOff(Schedulable obj) { + return false; + } + + @Override + public void addResponseTime(String name, int priorityLevel, int queueTime, + int processingTime) { + } + + public DefaultRpcScheduler(int priorityLevels, String namespace, + Configuration conf) { + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java index 0b56243db58..435c454176b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java @@ -44,8 +44,9 @@ import org.apache.hadoop.metrics2.util.MBeans; */ public class FairCallQueue extends AbstractQueue implements BlockingQueue { - // Configuration Keys + @Deprecated public static final int IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4; + @Deprecated public static final String IPC_CALLQUEUE_PRIORITY_LEVELS_KEY = "faircallqueue.priority-levels"; @@ -66,9 +67,6 @@ public class FairCallQueue extends AbstractQueue } } - /* Scheduler picks which queue to place in */ - private RpcScheduler scheduler; - /* Multiplexer picks which queue to draw from */ private RpcMultiplexer multiplexer; @@ -83,8 +81,13 @@ public class FairCallQueue extends AbstractQueue * Notes: the FairCallQueue has no fixed capacity. Rather, it has a minimum * capacity of `capacity` and a maximum capacity of `capacity * number_queues` */ - public FairCallQueue(int capacity, String ns, Configuration conf) { - int numQueues = parseNumQueues(ns, conf); + public FairCallQueue(int priorityLevels, int capacity, String ns, + Configuration conf) { + if(priorityLevels < 1) { + throw new IllegalArgumentException("Number of Priority Levels must be " + + "at least 1"); + } + int numQueues = priorityLevels; LOG.info("FairCallQueue is in use with " + numQueues + " queues."); this.queues = new ArrayList>(numQueues); @@ -95,28 +98,12 @@ public class FairCallQueue extends AbstractQueue this.overflowedCalls.add(new AtomicLong(0)); } - this.scheduler = new DecayRpcScheduler(numQueues, ns, conf); this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf); - // Make this the active source of metrics MetricsProxy mp = MetricsProxy.getInstance(ns); mp.setDelegate(this); } - /** - * Read the number of queues from the configuration. - * This will affect the FairCallQueue's overall capacity. - * @throws IllegalArgumentException on invalid queue count - */ - private static int parseNumQueues(String ns, Configuration conf) { - int retval = conf.getInt(ns + "." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, - IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT); - if(retval < 1) { - throw new IllegalArgumentException("numQueues must be at least 1"); - } - return retval; - } - /** * Returns the first non-empty queue with equal or lesser priority * than startIdx. Wraps around, searching a maximum of N @@ -144,7 +131,7 @@ public class FairCallQueue extends AbstractQueue /** * Put and offer follow the same pattern: - * 1. Get a priorityLevel from the scheduler + * 1. 
Get the assigned priorityLevel from the call by scheduler * 2. Get the nth sub-queue matching this priorityLevel * 3. delegate the call to this sub-queue. * @@ -154,7 +141,7 @@ public class FairCallQueue extends AbstractQueue */ @Override public void put(E e) throws InterruptedException { - int priorityLevel = scheduler.getPriorityLevel(e); + int priorityLevel = e.getPriorityLevel(); final int numLevels = this.queues.size(); while (true) { @@ -185,7 +172,7 @@ public class FairCallQueue extends AbstractQueue @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { - int priorityLevel = scheduler.getPriorityLevel(e); + int priorityLevel = e.getPriorityLevel(); BlockingQueue q = this.queues.get(priorityLevel); boolean ret = q.offer(e, timeout, unit); @@ -196,7 +183,7 @@ public class FairCallQueue extends AbstractQueue @Override public boolean offer(E e) { - int priorityLevel = scheduler.getPriorityLevel(e); + int priorityLevel = e.getPriorityLevel(); BlockingQueue q = this.queues.get(priorityLevel); boolean ret = q.offer(e); @@ -436,12 +423,6 @@ public class FairCallQueue extends AbstractQueue return calls; } - // For testing - @VisibleForTesting - public void setScheduler(RpcScheduler newScheduler) { - this.scheduler = newScheduler; - } - @VisibleForTesting public void setMultiplexer(RpcMultiplexer newMux) { this.multiplexer = newMux; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 692d2b6e384..071e2e827b4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -654,13 +654,7 @@ public class ProtobufRpcEngine implements RpcEngine { String detailedMetricsName = (exception == null) ? methodName : exception.getClass().getSimpleName(); - server.rpcMetrics.addRpcQueueTime(qTime); - server.rpcMetrics.addRpcProcessingTime(processingTime); - server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName, - processingTime); - if (server.isLogSlowRPC()) { - server.logSlowRpcCalls(methodName, processingTime); - } + server.updateMetrics(detailedMetricsName, qTime, processingTime); } return new RpcResponseWrapper(result); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java index a1557061809..6f93b22eed1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java @@ -19,11 +19,17 @@ package org.apache.hadoop.ipc; /** - * Implement this interface to be used for RPC scheduling in the fair call queues. + * Implement this interface to be used for RPC scheduling and backoff. + * */ public interface RpcScheduler { /** * Returns priority level greater than zero as a hint for scheduling. 
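For reference, a minimal no-op sketch of a pluggable implementation of this interface, mirroring DefaultRpcScheduler; the class name SampleRpcScheduler is made up for illustration, and the three-argument constructor is the shape CallQueueManager.createScheduler() probes for first:

    package org.apache.hadoop.ipc;

    import org.apache.hadoop.conf.Configuration;

    public class SampleRpcScheduler implements RpcScheduler {
      public SampleRpcScheduler(int priorityLevels, String namespace,
          Configuration conf) {
        // preferred constructor: (int, String, Configuration)
      }

      @Override
      public int getPriorityLevel(Schedulable obj) {
        return 0;                 // everything runs at the highest priority
      }

      @Override
      public boolean shouldBackOff(Schedulable obj) {
        return false;             // never ask the client to back off
      }

      @Override
      public void addResponseTime(String name, int priorityLevel,
          int queueTime, int processingTime) {
        // no-op; a real scheduler would aggregate these per level
      }
    }

Such a class would be selected per port with the new ipc.<port>.scheduler.impl key.
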
*/ int getPriorityLevel(Schedulable obj); + + boolean shouldBackOff(Schedulable obj); + + void addResponseTime(String name, int priorityLevel, int queueTime, + int processingTime); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java index 38f3518df98..3b28d85428b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java @@ -18,11 +18,8 @@ package org.apache.hadoop.ipc; -import java.nio.ByteBuffer; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.io.Writable; /** * Interface which allows extracting information necessary to @@ -31,4 +28,6 @@ import org.apache.hadoop.io.Writable; @InterfaceAudience.Private public interface Schedulable { public UserGroupInformation getUserGroupInformation(); + + int getPriorityLevel(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 1d928659ea9..eb28ad57d11 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -396,6 +396,15 @@ public abstract class Server { return CurCall.get() != null; } + /** + * Return the priority level assigned by call queue to an RPC + * Returns 0 in case no priority is assigned. + */ + public static int getPriorityLevel() { + Call call = CurCall.get(); + return call != null? call.getPriorityLevel() : 0; + } + private String bindAddress; private int port; // port we listen on private int handlerCount; // number of handler threads @@ -482,6 +491,18 @@ public abstract class Server { } } + void updateMetrics(String name, int queueTime, int processingTime) { + rpcMetrics.addRpcQueueTime(queueTime); + rpcMetrics.addRpcProcessingTime(processingTime); + rpcDetailedMetrics.addProcessingTime(name, processingTime); + callQueue.addResponseTime(name, getPriorityLevel(), queueTime, + processingTime); + + if (isLogSlowRPC()) { + logSlowRpcCalls(name, processingTime); + } + } + /** * A convenience method to bind to a given address and report * better exceptions if the address is not a valid host. @@ -578,6 +599,10 @@ public abstract class Server { return serviceAuthorizationManager; } + private String getQueueClassPrefix() { + return CommonConfigurationKeys.IPC_NAMESPACE + "." + port; + } + static Class> getQueueClass( String prefix, Configuration conf) { String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY; @@ -585,8 +610,29 @@ public abstract class Server { return CallQueueManager.convertQueueClass(queueClass, Call.class); } - private String getQueueClassPrefix() { - return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port; + static Class getSchedulerClass( + String prefix, Configuration conf) { + String schedulerKeyname = prefix + "." + CommonConfigurationKeys + .IPC_SCHEDULER_IMPL_KEY; + Class schedulerClass = conf.getClass(schedulerKeyname, null); + // Patch the configuration for legacy fcq configuration that does not have + // a separate scheduler setting + if (schedulerClass == null) { + String queueKeyName = prefix + "." 
+ CommonConfigurationKeys + .IPC_CALLQUEUE_IMPL_KEY; + Class queueClass = conf.getClass(queueKeyName, null); + if (queueClass != null) { + if (queueClass.getCanonicalName().equals( + FairCallQueue.class.getCanonicalName())) { + conf.setClass(schedulerKeyname, DecayRpcScheduler.class, + RpcScheduler.class); + } + } + } + schedulerClass = conf.getClass(schedulerKeyname, + DefaultRpcScheduler.class); + + return CallQueueManager.convertSchedulerClass(schedulerClass); } /* @@ -595,7 +641,8 @@ public abstract class Server { public synchronized void refreshCallQueue(Configuration conf) { // Create the next queue String prefix = getQueueClassPrefix(); - callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf); + callQueue.swapQueue(getSchedulerClass(prefix, conf), + getQueueClass(prefix, conf), maxQueueSize, prefix, conf); } /** @@ -623,6 +670,8 @@ public abstract class Server { private final byte[] clientId; private final TraceScope traceScope; // the HTrace scope on the server side private final CallerContext callerContext; // the call context + private int priorityLevel; + // the priority level assigned by scheduler, 0 by default private Call(Call call) { this(call.callId, call.retryCount, call.rpcRequest, call.connection, @@ -709,7 +758,16 @@ public abstract class Server { @Override public UserGroupInformation getUserGroupInformation() { return connection.user; - } + } + + @Override + public int getPriorityLevel() { + return this.priorityLevel; + } + + public void setPriorityLevel(int priorityLevel) { + this.priorityLevel = priorityLevel; + } } /** Listens on the socket. Creates jobs for the handler threads*/ @@ -2151,6 +2209,9 @@ public abstract class Server { rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray(), traceScope, callerContext); + // Save the priority level assignment by the scheduler + call.setPriorityLevel(callQueue.getPriorityLevel(call)); + if (callQueue.isClientBackoffEnabled()) { // if RPC queue is full, we will ask the RPC client to back off by // throwing RetriableException. Whether RPC client will honor @@ -2166,9 +2227,10 @@ public abstract class Server { private void queueRequestOrAskClientToBackOff(Call call) throws WrappedRpcServerException, InterruptedException { - // If rpc queue is full, we will ask the client to back off. - boolean isCallQueued = callQueue.offer(call); - if (!isCallQueued) { + // If rpc scheduler indicates back off based on performance + // degradation such as response time or rpc queue is full, + // we will ask the client to back off. 
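Putting the new keys together, a hedged configuration sketch for enabling response-time based backoff on one port; the port 8020 and the threshold values are illustrative assumptions, and the thresholds string is parsed with the Configuration.getTimeDurations() helper added by this patch:

    Configuration conf = new Configuration();
    // Use the decay scheduler for this port (also implied when the legacy
    // FairCallQueue callqueue.impl setting is detected, as shown above).
    conf.setClass("ipc.8020.scheduler.impl",
        DecayRpcScheduler.class, RpcScheduler.class);
    // Push back on clients instead of queueing indefinitely.
    conf.setBoolean("ipc.8020.backoff.enable", true);
    // Back off by average response time, not only when the queue is full.
    conf.setBoolean("ipc.8020.decay-scheduler.backoff.responsetime.enable", true);
    // One threshold per priority level (four levels by default).
    conf.set("ipc.8020.decay-scheduler.backoff.responsetime.thresholds",
        "10s,20s,30s,40s");
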
+ if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) { rpcMetrics.incrClientBackoff(); RetriableException retriableException = new RetriableException("Server is too busy."); @@ -2513,6 +2575,7 @@ public abstract class Server { // Setup appropriate callqueue final String prefix = getQueueClassPrefix(); this.callQueue = new CallQueueManager(getQueueClass(prefix, conf), + getSchedulerClass(prefix, conf), getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf); this.secretManager = (SecretManager) secretManager; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index a1db6be7c43..a9dbb41fd98 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -34,7 +34,6 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; -import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; @@ -502,13 +501,12 @@ public class WritableRpcEngine implements RpcEngine { } } } - - // Invoke the protocol method - long startTime = Time.now(); - int qTime = (int) (startTime-receivedTime); - Exception exception = null; - try { + // Invoke the protocol method + long startTime = Time.now(); + int qTime = (int) (startTime-receivedTime); + Exception exception = null; + try { Method method = protocolImpl.protocolClass.getMethod(call.getMethodName(), call.getParameterClasses()); @@ -539,27 +537,20 @@ public class WritableRpcEngine implements RpcEngine { exception = ioe; throw ioe; } finally { - int processingTime = (int) (Time.now() - startTime); - if (LOG.isDebugEnabled()) { - String msg = "Served: " + call.getMethodName() + - " queueTime= " + qTime + - " procesingTime= " + processingTime; - if (exception != null) { - msg += " exception= " + exception.getClass().getSimpleName(); - } - LOG.debug(msg); - } - String detailedMetricsName = (exception == null) ? - call.getMethodName() : - exception.getClass().getSimpleName(); - server.rpcMetrics.addRpcQueueTime(qTime); - server.rpcMetrics.addRpcProcessingTime(processingTime); - server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName, - processingTime); - if (server.isLogSlowRPC()) { - server.logSlowRpcCalls(call.getMethodName(), processingTime); + int processingTime = (int) (Time.now() - startTime); + if (LOG.isDebugEnabled()) { + String msg = "Served: " + call.getMethodName() + + " queueTime= " + qTime + " procesingTime= " + processingTime; + if (exception != null) { + msg += " exception= " + exception.getClass().getSimpleName(); + } + LOG.debug(msg); } - } + String detailedMetricsName = (exception == null) ? 
+ call.getMethodName() : + exception.getClass().getSimpleName(); + server.updateMetrics(detailedMetricsName, qTime, processingTime); + } } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index 4d659acd46d..af9ce1b08bc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -27,17 +27,37 @@ import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.Test; public class TestCallQueueManager { private CallQueueManager manager; + private Configuration conf = new Configuration(); - public class FakeCall { + public class FakeCall implements Schedulable { public final int tag; // Can be used for unique identification - + private int priorityLevel; + UserGroupInformation fakeUgi = UserGroupInformation.createRemoteUser + ("fakeUser"); public FakeCall(int tag) { this.tag = tag; } + + @Override + public UserGroupInformation getUserGroupInformation() { + return fakeUgi; + } + + @Override + public int getPriorityLevel() { + return priorityLevel; + } + + public void setPriorityLevel(int level) { + this.priorityLevel = level; + } } /** @@ -62,7 +82,9 @@ public class TestCallQueueManager { try { // Fill up to max (which is infinite if maxCalls < 0) while (isRunning && (callsAdded < maxCalls || maxCalls < 0)) { - cq.put(new FakeCall(this.tag)); + FakeCall call = new FakeCall(this.tag); + call.setPriorityLevel(cq.getPriorityLevel(call)); + cq.put(call); callsAdded++; } } catch (InterruptedException e) { @@ -135,7 +157,7 @@ public class TestCallQueueManager { t.start(); t.join(100); - assertEquals(putter.callsAdded, numberOfPuts); + assertEquals(numberOfPuts, putter.callsAdded); t.interrupt(); } @@ -143,23 +165,90 @@ public class TestCallQueueManager { private static final Class> queueClass = CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class); + private static final Class schedulerClass + = CallQueueManager.convertSchedulerClass(DefaultRpcScheduler.class); + @Test public void testCallQueueCapacity() throws InterruptedException { - manager = new CallQueueManager(queueClass, false, 10, "", null); + manager = new CallQueueManager(queueClass, schedulerClass, false, + 10, "", conf); assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity } @Test public void testEmptyConsume() throws InterruptedException { - manager = new CallQueueManager(queueClass, false, 10, "", null); + manager = new CallQueueManager(queueClass, schedulerClass, false, + 10, "", conf); assertCanTake(manager, 0, 1); // Fails since it's empty } + static Class> getQueueClass( + String prefix, Configuration conf) { + String name = prefix + "." 
+ CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY; + Class queueClass = conf.getClass(name, LinkedBlockingQueue.class); + return CallQueueManager.convertQueueClass(queueClass, FakeCall.class); + } + + @Test + public void testFcqBackwardCompatibility() throws InterruptedException { + // Test BackwardCompatibility to ensure existing FCQ deployment still + // work without explicitly specifying DecayRpcScheduler + Configuration conf = new Configuration(); + final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0"; + + final String queueClassName = "org.apache.hadoop.ipc.FairCallQueue"; + conf.setStrings(ns + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, + queueClassName); + + // Specify only Fair Call Queue without a scheduler + // Ensure the DecayScheduler will be added to avoid breaking. + Class scheduler = Server.getSchedulerClass(ns, + conf); + assertTrue(scheduler.getCanonicalName(). + equals("org.apache.hadoop.ipc.DecayRpcScheduler")); + + Class> queue = + (Class>) getQueueClass(ns, conf); + assertTrue(queue.getCanonicalName().equals(queueClassName)); + + manager = new CallQueueManager(queue, scheduler, false, + 2, "", conf); + + // Default FCQ has 4 levels and the max capacity is 2 x 4 + assertCanPut(manager, 3, 3); + } + + @Test + public void testSchedulerWithoutFCQ() throws InterruptedException { + Configuration conf = new Configuration(); + // Test DecayedRpcScheduler without FCQ + // Ensure the default LinkedBlockingQueue can work with DecayedRpcScheduler + final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0"; + final String schedulerClassName = "org.apache.hadoop.ipc.DecayRpcScheduler"; + conf.setStrings(ns + "." + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY, + schedulerClassName); + + Class> queue = + (Class>) getQueueClass(ns, conf); + assertTrue(queue.getCanonicalName().equals("java.util.concurrent." 
+ + "LinkedBlockingQueue")); + + manager = new CallQueueManager(queue, + Server.getSchedulerClass(ns, conf), false, + 3, "", conf); + + // LinkedBlockingQueue with a capacity of 3 can put 3 calls + assertCanPut(manager, 3, 3); + // LinkedBlockingQueue with a capacity of 3 can't put 1 more call + assertCanPut(manager, 0, 1); + } + @Test(timeout=60000) public void testSwapUnderContention() throws InterruptedException { - manager = new CallQueueManager(queueClass, false, 5000, "", null); + manager = new CallQueueManager(queueClass, schedulerClass, false, + 5000, "", conf); ArrayList producers = new ArrayList(); ArrayList consumers = new ArrayList(); @@ -188,7 +277,7 @@ public class TestCallQueueManager { Thread.sleep(500); for (int i=0; i < 5; i++) { - manager.swapQueue(queueClass, 5000, "", null); + manager.swapQueue(schedulerClass, queueClass, 5000, "", conf); } // Stop the producers @@ -223,24 +312,50 @@ public class TestCallQueueManager { } public static class ExceptionFakeCall { - public ExceptionFakeCall() { - throw new IllegalArgumentException("Exception caused by constructor.!!"); + throw new IllegalArgumentException("Exception caused by call queue " + + "constructor.!!"); } } - private static final Class> exceptionQueueClass = CallQueueManager - .convertQueueClass(ExceptionFakeCall.class, ExceptionFakeCall.class); + public static class ExceptionFakeScheduler { + public ExceptionFakeScheduler() { + throw new IllegalArgumentException("Exception caused by " + + "scheduler constructor.!!"); + } + } + + private static final Class + exceptionSchedulerClass = CallQueueManager.convertSchedulerClass( + ExceptionFakeScheduler.class); + + private static final Class> + exceptionQueueClass = CallQueueManager.convertQueueClass( + ExceptionFakeCall.class, ExceptionFakeCall.class); @Test - public void testInvocationException() throws InterruptedException { + public void testCallQueueConstructorException() throws InterruptedException { try { - new CallQueueManager(exceptionQueueClass, false, 10, - "", null); + new CallQueueManager(exceptionQueueClass, + schedulerClass, false, 10, "", new Configuration()); fail(); } catch (RuntimeException re) { assertTrue(re.getCause() instanceof IllegalArgumentException); - assertEquals("Exception caused by constructor.!!", re.getCause() + assertEquals("Exception caused by call queue constructor.!!", re + .getCause() + .getMessage()); + } + } + + @Test + public void testSchedulerConstructorException() throws InterruptedException { + try { + new CallQueueManager(queueClass, exceptionSchedulerClass, + false, 10, "", new Configuration()); + fail(); + } catch (RuntimeException re) { + assertTrue(re.getCause() instanceof IllegalArgumentException); + assertEquals("Exception caused by scheduler constructor.!!", re.getCause() .getMessage()); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java index edc3b0051ab..0b0408cbcdb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java @@ -18,12 +18,11 @@ package org.apache.hadoop.ipc; +import static java.lang.Thread.sleep; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.concurrent.atomic.AtomicInteger; -import 
java.util.Arrays; import org.junit.Test; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -31,8 +30,6 @@ import static org.mockito.Mockito.when; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; - public class TestDecayRpcScheduler { private Schedulable mockCall(String id) { Schedulable mockCall = mock(Schedulable.class); @@ -57,30 +54,32 @@ public class TestDecayRpcScheduler { } @Test + @SuppressWarnings("deprecation") public void testParsePeriod() { // By default scheduler = new DecayRpcScheduler(1, "", new Configuration()); - assertEquals(DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT, + assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT, scheduler.getDecayPeriodMillis()); // Custom Configuration conf = new Configuration(); - conf.setLong("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, + conf.setLong("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, 1058); scheduler = new DecayRpcScheduler(1, "ns", conf); assertEquals(1058L, scheduler.getDecayPeriodMillis()); } @Test + @SuppressWarnings("deprecation") public void testParseFactor() { // Default scheduler = new DecayRpcScheduler(1, "", new Configuration()); - assertEquals(DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT, + assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT, scheduler.getDecayFactor(), 0.00001); // Custom Configuration conf = new Configuration(); - conf.set("prefix." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY, + conf.set("prefix." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.125"); scheduler = new DecayRpcScheduler(1, "prefix", conf); assertEquals(0.125, scheduler.getDecayFactor(), 0.00001); @@ -94,6 +93,7 @@ public class TestDecayRpcScheduler { } @Test + @SuppressWarnings("deprecation") public void testParseThresholds() { // Defaults vary by number of queues Configuration conf = new Configuration(); @@ -111,16 +111,17 @@ public class TestDecayRpcScheduler { // Custom conf = new Configuration(); - conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY, + conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "1, 10, 20, 50, 85"); scheduler = new DecayRpcScheduler(6, "ns", conf); assertEqualDecimalArrays(new double[]{0.01, 0.1, 0.2, 0.5, 0.85}, scheduler.getThresholds()); } @Test + @SuppressWarnings("deprecation") public void testAccumulate() { Configuration conf = new Configuration(); - conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush + conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush scheduler = new DecayRpcScheduler(1, "ns", conf); assertEquals(0, scheduler.getCallCountSnapshot().size()); // empty first @@ -138,10 +139,11 @@ public class TestDecayRpcScheduler { } @Test - public void testDecay() { + @SuppressWarnings("deprecation") + public void testDecay() throws Exception { Configuration conf = new Configuration(); - conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never - conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY, "0.5"); + conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never + conf.set("ns." 
+ DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5"); scheduler = new DecayRpcScheduler(1, "ns", conf); assertEquals(0, scheduler.getTotalCallSnapshot()); @@ -150,6 +152,8 @@ public class TestDecayRpcScheduler { scheduler.getPriorityLevel(mockCall("A")); } + sleep(1000); + for (int i = 0; i < 8; i++) { scheduler.getPriorityLevel(mockCall("B")); } @@ -184,10 +188,11 @@ public class TestDecayRpcScheduler { } @Test + @SuppressWarnings("deprecation") public void testPriority() { Configuration conf = new Configuration(); - conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush - conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY, + conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush + conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75"); scheduler = new DecayRpcScheduler(4, "ns", conf); @@ -204,10 +209,11 @@ public class TestDecayRpcScheduler { } @Test(timeout=2000) + @SuppressWarnings("deprecation") public void testPeriodic() throws InterruptedException { Configuration conf = new Configuration(); - conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "10"); - conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY, "0.5"); + conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10"); + conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5"); scheduler = new DecayRpcScheduler(1, "ns", conf); assertEquals(10, scheduler.getDecayPeriodMillis()); @@ -219,7 +225,7 @@ public class TestDecayRpcScheduler { // It should eventually decay to zero while (scheduler.getTotalCallSnapshot() > 0) { - Thread.sleep(10); + sleep(10); } } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java index 2694ba3ab91..4a8ad3b9271 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java @@ -37,21 +37,24 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.conf.Configuration; import org.mockito.Matchers; -import static org.apache.hadoop.ipc.FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY; - public class TestFairCallQueue extends TestCase { private FairCallQueue fcq; - private Schedulable mockCall(String id) { + private Schedulable mockCall(String id, int priority) { Schedulable mockCall = mock(Schedulable.class); UserGroupInformation ugi = mock(UserGroupInformation.class); when(ugi.getUserName()).thenReturn(id); when(mockCall.getUserGroupInformation()).thenReturn(ugi); + when(mockCall.getPriorityLevel()).thenReturn(priority); return mockCall; } + private Schedulable mockCall(String id) { + return mockCall(id, 0); + } + // A scheduler which always schedules into priority zero private RpcScheduler alwaysZeroScheduler; { @@ -60,11 +63,12 @@ public class TestFairCallQueue extends TestCase { alwaysZeroScheduler = sched; } + @SuppressWarnings("deprecation") public void setUp() { Configuration conf = new Configuration(); - conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2); + conf.setInt("ns." 
+ FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2); - fcq = new FairCallQueue(5, "ns", conf); + fcq = new FairCallQueue(2, 5, "ns", conf); } // @@ -85,7 +89,6 @@ public class TestFairCallQueue extends TestCase { } public void testOfferSucceeds() { - fcq.setScheduler(alwaysZeroScheduler); for (int i = 0; i < 5; i++) { // We can fit 10 calls @@ -96,7 +99,6 @@ public class TestFairCallQueue extends TestCase { } public void testOfferFailsWhenFull() { - fcq.setScheduler(alwaysZeroScheduler); for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"))); } assertFalse(fcq.offer(mockCall("c"))); // It's full @@ -107,11 +109,10 @@ public class TestFairCallQueue extends TestCase { public void testOfferSucceedsWhenScheduledLowPriority() { // Scheduler will schedule into queue 0 x 5, then queue 1 RpcScheduler sched = mock(RpcScheduler.class); - when(sched.getPriorityLevel(Matchers.any())).thenReturn(0, 0, 0, 0, 0, 1, 0); - fcq.setScheduler(sched); - for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"))); } + int mockedPriorities[] = {0, 0, 0, 0, 0, 1, 0}; + for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c", mockedPriorities[i]))); } - assertTrue(fcq.offer(mockCall("c"))); + assertTrue(fcq.offer(mockCall("c", mockedPriorities[5]))); assertEquals(6, fcq.size()); } @@ -121,7 +122,7 @@ public class TestFairCallQueue extends TestCase { } public void testPeekNonDestructive() { - Schedulable call = mockCall("c"); + Schedulable call = mockCall("c", 0); assertTrue(fcq.offer(call)); assertEquals(call, fcq.peek()); @@ -130,8 +131,8 @@ public class TestFairCallQueue extends TestCase { } public void testPeekPointsAtHead() { - Schedulable call = mockCall("c"); - Schedulable next = mockCall("b"); + Schedulable call = mockCall("c", 0); + Schedulable next = mockCall("b", 0); fcq.offer(call); fcq.offer(next); @@ -139,15 +140,11 @@ public class TestFairCallQueue extends TestCase { } public void testPollTimeout() throws InterruptedException { - fcq.setScheduler(alwaysZeroScheduler); - assertNull(fcq.poll(10, TimeUnit.MILLISECONDS)); } public void testPollSuccess() throws InterruptedException { - fcq.setScheduler(alwaysZeroScheduler); - - Schedulable call = mockCall("c"); + Schedulable call = mockCall("c", 0); assertTrue(fcq.offer(call)); assertEquals(call, fcq.poll(10, TimeUnit.MILLISECONDS)); @@ -156,7 +153,6 @@ public class TestFairCallQueue extends TestCase { } public void testOfferTimeout() throws InterruptedException { - fcq.setScheduler(alwaysZeroScheduler); for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"), 10, TimeUnit.MILLISECONDS)); } @@ -166,13 +162,11 @@ public class TestFairCallQueue extends TestCase { assertEquals(5, fcq.size()); } + @SuppressWarnings("deprecation") public void testDrainTo() { Configuration conf = new Configuration(); - conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2); - FairCallQueue fcq2 = new FairCallQueue(10, "ns", conf); - - fcq.setScheduler(alwaysZeroScheduler); - fcq2.setScheduler(alwaysZeroScheduler); + conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2); + FairCallQueue fcq2 = new FairCallQueue(2, 10, "ns", conf); // Start with 3 in fcq, to be drained for (int i = 0; i < 3; i++) { @@ -185,13 +179,11 @@ public class TestFairCallQueue extends TestCase { assertEquals(3, fcq2.size()); } + @SuppressWarnings("deprecation") public void testDrainToWithLimit() { Configuration conf = new Configuration(); - conf.setInt("ns." 
+ IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2); - FairCallQueue fcq2 = new FairCallQueue(10, "ns", conf); - - fcq.setScheduler(alwaysZeroScheduler); - fcq2.setScheduler(alwaysZeroScheduler); + conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2); + FairCallQueue fcq2 = new FairCallQueue(2, 10, "ns", conf); // Start with 3 in fcq, to be drained for (int i = 0; i < 3; i++) { @@ -209,27 +201,23 @@ public class TestFairCallQueue extends TestCase { } public void testFirstQueueFullRemainingCapacity() { - fcq.setScheduler(alwaysZeroScheduler); while (fcq.offer(mockCall("c"))) ; // Queue 0 will fill up first, then queue 1 assertEquals(5, fcq.remainingCapacity()); } public void testAllQueuesFullRemainingCapacity() { - RpcScheduler sched = mock(RpcScheduler.class); - when(sched.getPriorityLevel(Matchers.any())).thenReturn(0, 0, 0, 0, 0, 1, 1, 1, 1, 1); - fcq.setScheduler(sched); - while (fcq.offer(mockCall("c"))) ; + int[] mockedPriorities = {0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0}; + int i = 0; + while (fcq.offer(mockCall("c", mockedPriorities[i++]))) ; assertEquals(0, fcq.remainingCapacity()); assertEquals(10, fcq.size()); } public void testQueuesPartialFilledRemainingCapacity() { - RpcScheduler sched = mock(RpcScheduler.class); - when(sched.getPriorityLevel(Matchers.any())).thenReturn(0, 1, 0, 1, 0); - fcq.setScheduler(sched); - for (int i = 0; i < 5; i++) { fcq.offer(mockCall("c")); } + int[] mockedPriorities = {0, 1, 0, 1, 0}; + for (int i = 0; i < 5; i++) { fcq.offer(mockCall("c", mockedPriorities[i])); } assertEquals(5, fcq.remainingCapacity()); assertEquals(5, fcq.size()); @@ -351,16 +339,12 @@ public class TestFairCallQueue extends TestCase { // Make sure put will overflow into lower queues when the top is full public void testPutOverflows() throws InterruptedException { - fcq.setScheduler(alwaysZeroScheduler); - // We can fit more than 5, even though the scheduler suggests the top queue assertCanPut(fcq, 8, 8); assertEquals(8, fcq.size()); } public void testPutBlocksWhenAllFull() throws InterruptedException { - fcq.setScheduler(alwaysZeroScheduler); - assertCanPut(fcq, 10, 10); // Fill up assertEquals(10, fcq.size()); @@ -369,12 +353,10 @@ public class TestFairCallQueue extends TestCase { } public void testTakeBlocksWhenEmpty() throws InterruptedException { - fcq.setScheduler(alwaysZeroScheduler); assertCanTake(fcq, 0, 1); } public void testTakeRemovesCall() throws InterruptedException { - fcq.setScheduler(alwaysZeroScheduler); Schedulable call = mockCall("c"); fcq.offer(call); @@ -383,17 +365,14 @@ public class TestFairCallQueue extends TestCase { } public void testTakeTriesNextQueue() throws InterruptedException { - // Make a FCQ filled with calls in q 1 but empty in q 0 - RpcScheduler q1Scheduler = mock(RpcScheduler.class); - when(q1Scheduler.getPriorityLevel(Matchers.any())).thenReturn(1); - fcq.setScheduler(q1Scheduler); // A mux which only draws from q 0 RpcMultiplexer q0mux = mock(RpcMultiplexer.class); when(q0mux.getAndAdvanceCurrentIndex()).thenReturn(0); fcq.setMultiplexer(q0mux); - Schedulable call = mockCall("c"); + // Make a FCQ filled with calls in q 1 but empty in q 0 + Schedulable call = mockCall("c", 1); fcq.put(call); // Take from q1 even though mux said q0, since q0 empty diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java index 1fa0fff7055..263841246bf 100644 --- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java @@ -19,25 +19,15 @@ package org.apache.hadoop.ipc; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import org.junit.Assert; -import org.junit.Assume; import org.junit.Test; -import org.junit.Before; -import org.junit.After; import java.util.List; import java.io.IOException; -import java.nio.ByteBuffer; -import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.conf.Configuration; @@ -55,16 +45,20 @@ public class TestIdentityProviders { } } + @Override + public int getPriorityLevel() { + return 0; + } } @Test public void testPluggableIdentityProvider() { Configuration conf = new Configuration(); - conf.set(CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY, + conf.set(CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY, "org.apache.hadoop.ipc.UserIdentityProvider"); List providers = conf.getInstances( - CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY, + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY, IdentityProvider.class); assertTrue(providers.size() == 1); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 99bfc61c2ea..2ebd1c5e4e1 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -43,8 +43,10 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.test.MockitoUtil; +import org.apache.log4j.Level; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -956,7 +958,7 @@ public class TestRPC extends TestRpcBase { } /** - * Test RPC backoff. + * Test RPC backoff when the call queue is full. */ @Test (timeout=30000) public void testClientBackOff() throws Exception { @@ -969,7 +971,7 @@ public class TestRPC extends TestRpcBase { final ExecutorService executorService = Executors.newFixedThreadPool(numClients); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); - conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + + conf.setBoolean(CommonConfigurationKeys.IPC_NAMESPACE + ".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); RPC.Builder builder = newServerBuilder(conf) .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true); @@ -1018,6 +1020,92 @@ public class TestRPC extends TestRpcBase { assertTrue("RetriableException not received", succeeded); } + /** + * Test RPC backoff based on the response time of each priority level. 
+ */ + @Test (timeout=30000) + public void testClientBackOffByResponseTime() throws Exception { + Server server; + final TestRpcService proxy; + boolean succeeded = false; + final int numClients = 1; + final int queueSizePerHandler = 3; + + GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG); + + final List> res = new ArrayList>(); + final ExecutorService executorService = + Executors.newFixedThreadPool(numClients); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0."; + conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); + conf.setStrings(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, + "org.apache.hadoop.ipc.FairCallQueue"); + conf.setStrings(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY, + "org.apache.hadoop.ipc.DecayRpcScheduler"); + conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY, + 2); + conf.setBoolean(ns + + DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY, + true); + // Set small thresholds of 2s and 4s for level 0 and level 1 for testing + conf.set(ns + + DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY + , "2s, 4s"); + + // Set max queue size to 3 so that 2 calls from the test won't trigger + // back off because the queue is full. + RPC.Builder builder = newServerBuilder(conf) + .setQueueSizePerHandler(queueSizePerHandler).setNumHandlers(1) + .setVerbose(true); + server = setupTestServer(builder); + + @SuppressWarnings("unchecked") + CallQueueManager spy = spy((CallQueueManager) Whitebox + .getInternalState(server, "callQueue")); + Whitebox.setInternalState(server, "callQueue", spy); + + Exception lastException = null; + proxy = getClient(addr, conf); + try { + // Start a sleep RPC call that sleeps 3s. + for (int i = 0; i < numClients; i++) { + res.add(executorService.submit( + new Callable() { + @Override + public Void call() throws ServiceException, InterruptedException { + proxy.sleep(null, newSleepRequest(3000)); + return null; + } + })); + verify(spy, timeout(500).times(i + 1)).offer(Mockito.anyObject()); + } + // Start another sleep RPC call and verify the call is backed off because + // the avg response time (3s) exceeds the threshold (2s). + try { + // Wait for the 1st response time update + Thread.sleep(5500); + proxy.sleep(null, newSleepRequest(100)); + } catch (ServiceException e) { + RemoteException re = (RemoteException) e.getCause(); + IOException unwrapException = re.unwrapRemoteException(); + if (unwrapException instanceof RetriableException) { + succeeded = true; + } else { + lastException = unwrapException; + } + } + } finally { + executorService.shutdown(); + stop(server, proxy); + } + if (lastException != null) { + LOG.error("Last received non-RetriableException:", lastException); + } + assertTrue("RetriableException not received", succeeded); + } + /** * Test RPC timeout. */
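
For illustration, a minimal sketch (not part of the patch) of how a server owner could enable the new response-time-based backoff programmatically; it mirrors the keys exercised by testClientBackOffByResponseTime above. The class and method names are hypothetical, the port 8020 in the namespace is a placeholder, and the "10s, 20s" thresholds are arbitrary values chosen for two priority levels.

package org.apache.hadoop.ipc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;

// Hypothetical helper, shown only to illustrate how the new keys fit together.
public class RpcBackoffConfigSketch {
  public static Configuration newBackoffConf() {
    Configuration conf = new Configuration();
    // Keys are namespaced by server port; 8020 is a placeholder here.
    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".8020.";
    // Enable client backoff: overloaded calls get a RetriableException
    // instead of waiting in the call queue.
    conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
    // Pair FairCallQueue with the now separately pluggable DecayRpcScheduler.
    conf.set(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY,
        FairCallQueue.class.getName());
    conf.set(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
        DecayRpcScheduler.class.getName());
    conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY, 2);
    // Back off callers whose priority level's average response time exceeds
    // its threshold; one duration is given per priority level.
    conf.setBoolean(ns
        + DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
        true);
    conf.set(ns
        + DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY,
        "10s, 20s");
    return conf;
  }
}

Configuring scheduler.impl independently of callqueue.impl is the point of the change: as testSchedulerWithoutFCQ shows, DecayRpcScheduler can also drive scheduling and backoff decisions for the default LinkedBlockingQueue, while testFcqBackwardCompatibility shows that a FairCallQueue configured without an explicit scheduler still falls back to DecayRpcScheduler.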