HADOOP-12916. Allow RPC scheduler/callqueue backoff using response times. Contributed by Xiaoyu Yao.

(cherry picked from commit d95c6eb32c)
(cherry picked from commit a0c001c0b9)
This commit is contained in:
Xiaoyu Yao 2016-03-31 08:42:57 -07:00
parent dcad99892d
commit 6db3c9d394
17 changed files with 891 additions and 265 deletions

View File

@ -1570,6 +1570,10 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
return defaultValue;
}
vStr = vStr.trim();
return getTimeDurationHelper(name, vStr, unit);
}
private long getTimeDurationHelper(String name, String vStr, TimeUnit unit) {
ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr);
if (null == vUnit) {
LOG.warn("No unit for " + name + "(" + vStr + ") assuming " + unit);
@ -1580,6 +1584,15 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
return unit.convert(Long.parseLong(vStr), vUnit.unit());
}
public long[] getTimeDurations(String name, TimeUnit unit) {
String[] strings = getTrimmedStrings(name);
long[] durations = new long[strings.length];
for (int i = 0; i < strings.length; i++) {
durations[i] = getTimeDurationHelper(name, strings[i], unit);
}
return durations;
}
/**
* Get the value of the <code>name</code> property as a <code>Pattern</code>.
* If no such property is specified, or if the specified value is not a valid

View File

@ -90,14 +90,22 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/**
* CallQueue related settings. These are not used directly, but rather
* combined with a namespace and port. For instance:
* IPC_CALLQUEUE_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY
* IPC_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY
*/
public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
public static final String IPC_NAMESPACE = "ipc";
public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
public static final String IPC_SCHEDULER_IMPL_KEY = "scheduler.impl";
public static final String IPC_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
/**
* IPC scheduler priority levels.
*/
public static final String IPC_SCHEDULER_PRIORITY_LEVELS_KEY =
"scheduler.priority.levels";
public static final int IPC_SCHEDULER_PRIORITY_LEVELS_DEFAULT_KEY = 4;
/** This is for specifying the implementation for the mappings from
* hostnames to the racks they belong to
*/

View File

@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
/**
* Abstracts queue operations for different blocking queues.
@ -43,6 +44,13 @@ public class CallQueueManager<E> {
Class<?> queueClass, Class<E> elementClass) {
return (Class<? extends BlockingQueue<E>>)queueClass;
}
@SuppressWarnings("unchecked")
static Class<? extends RpcScheduler> convertSchedulerClass(
Class<?> schedulerClass) {
return (Class<? extends RpcScheduler>)schedulerClass;
}
private final boolean clientBackOffEnabled;
// Atomic refs point to active callQueue
@ -50,25 +58,76 @@ public class CallQueueManager<E> {
private final AtomicReference<BlockingQueue<E>> putRef;
private final AtomicReference<BlockingQueue<E>> takeRef;
private RpcScheduler scheduler;
public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
Class<? extends RpcScheduler> schedulerClass,
boolean clientBackOffEnabled, int maxQueueSize, String namespace,
Configuration conf) {
int priorityLevels = parseNumLevels(namespace, conf);
this.scheduler = createScheduler(schedulerClass, priorityLevels,
namespace, conf);
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
maxQueueSize, namespace, conf);
priorityLevels, maxQueueSize, namespace, conf);
this.clientBackOffEnabled = clientBackOffEnabled;
this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
LOG.info("Using callQueue " + backingClass);
LOG.info("Using callQueue: " + backingClass + " scheduler: " +
schedulerClass);
}
private static <T extends RpcScheduler> T createScheduler(
Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
// Used for custom, configurable scheduler
try {
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
String.class, Configuration.class);
return ctor.newInstance(priorityLevels, ns, conf);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
throw new RuntimeException(theClass.getName()
+ " could not be constructed.", e.getCause());
} catch (Exception e) {
}
try {
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class);
return ctor.newInstance(priorityLevels);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
throw new RuntimeException(theClass.getName()
+ " could not be constructed.", e.getCause());
} catch (Exception e) {
}
// Last attempt
try {
Constructor<T> ctor = theClass.getDeclaredConstructor();
return ctor.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
throw new RuntimeException(theClass.getName()
+ " could not be constructed.", e.getCause());
} catch (Exception e) {
}
// Nothing worked
throw new RuntimeException(theClass.getName() +
" could not be constructed.");
}
private <T extends BlockingQueue<E>> T createCallQueueInstance(
Class<T> theClass, int maxLen, String ns, Configuration conf) {
Class<T> theClass, int priorityLevels, int maxLen, String ns,
Configuration conf) {
// Used for custom, configurable callqueues
try {
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class, String.class,
Configuration.class);
return ctor.newInstance(maxLen, ns, conf);
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
int.class, String.class, Configuration.class);
return ctor.newInstance(priorityLevels, maxLen, ns, conf);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
@ -110,6 +169,22 @@ public class CallQueueManager<E> {
return clientBackOffEnabled;
}
// Based on policy to determine back off current call
boolean shouldBackOff(Schedulable e) {
return scheduler.shouldBackOff(e);
}
void addResponseTime(String name, int priorityLevel, int queueTime,
int processingTime) {
scheduler.addResponseTime(name, priorityLevel, queueTime, processingTime);
}
// This should be only called once per call and cached in the call object
// each getPriorityLevel call will increment the counter for the caller
int getPriorityLevel(Schedulable e) {
return scheduler.getPriorityLevel(e);
}
/**
* Insert e into the backing queue or block until we can.
* If we block and the queue changes on us, we will insert while the
@ -146,15 +221,46 @@ public class CallQueueManager<E> {
return takeRef.get().size();
}
/**
* Read the number of levels from the configuration.
* This will affect the FairCallQueue's overall capacity.
* @throws IllegalArgumentException on invalid queue count
*/
@SuppressWarnings("deprecation")
private static int parseNumLevels(String ns, Configuration conf) {
// Fair call queue levels (IPC_CALLQUEUE_PRIORITY_LEVELS_KEY)
// takes priority over the scheduler level key
// (IPC_SCHEDULER_PRIORITY_LEVELS_KEY)
int retval = conf.getInt(ns + "." +
FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 0);
if (retval == 0) { // No FCQ priority level configured
retval = conf.getInt(ns + "." +
CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY,
CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_DEFAULT_KEY);
} else {
LOG.warn(ns + "." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY +
" is deprecated. Please use " + ns + "." +
CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY + ".");
}
if(retval < 1) {
throw new IllegalArgumentException("numLevels must be at least 1");
}
return retval;
}
/**
* Replaces active queue with the newly requested one and transfers
* all calls to the newQ before returning.
*/
public synchronized void swapQueue(
Class<? extends RpcScheduler> schedulerClass,
Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize,
String ns, Configuration conf) {
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse, maxSize,
ns, conf);
int priorityLevels = parseNumLevels(ns, conf);
RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
ns, conf);
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
priorityLevels, maxSize, ns, conf);
// Our current queue becomes the old queue
BlockingQueue<E> oldQ = putRef.get();
@ -168,6 +274,8 @@ public class CallQueueManager<E> {
// Swap takeRef to handle new calls
takeRef.set(newQ);
this.scheduler = newScheduler;
LOG.info("Old Queue: " + stringRepr(oldQ) + ", " +
"Replacement: " + stringRepr(newQ));
}

View File

@ -27,17 +27,21 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.util.concurrent.AtomicDoubleArray;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.util.MBeans;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The decay RPC scheduler counts incoming requests in a map, then
@ -49,22 +53,28 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
/**
* Period controls how many milliseconds between each decay sweep.
*/
public static final String IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY =
public static final String IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY =
"decay-scheduler.period-ms";
public static final long IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT =
5000;
@Deprecated
public static final String IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY =
"faircallqueue.decay-scheduler.period-ms";
public static final long IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT =
5000L;
/**
* Decay factor controls how much each count is suppressed by on each sweep.
* Valid numbers are > 0 and < 1. Decay factor works in tandem with period
* to control how long the scheduler remembers an identity.
*/
public static final String IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY =
public static final String IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY =
"decay-scheduler.decay-factor";
public static final double IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT =
0.5;
@Deprecated
public static final String IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY =
"faircallqueue.decay-scheduler.decay-factor";
public static final double IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT =
0.5;
/**
/**
* Thresholds are specified as integer percentages, and specify which usage
* range each queue will be allocated to. For instance, specifying the list
* 10, 40, 80
@ -74,15 +84,31 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
* - q1 from 10 up to 40
* - q0 otherwise.
*/
public static final String IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY =
"faircallqueue.decay-scheduler.thresholds";
public static final String IPC_DECAYSCHEDULER_THRESHOLDS_KEY =
"decay-scheduler.thresholds";
@Deprecated
public static final String IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY =
"faircallqueue.decay-scheduler.thresholds";
// Specifies the identity to use when the IdentityProvider cannot handle
// a schedulable.
public static final String DECAYSCHEDULER_UNKNOWN_IDENTITY =
"IdentityProvider.Unknown";
"IdentityProvider.Unknown";
public static final Log LOG = LogFactory.getLog(DecayRpcScheduler.class);
public static final String
IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY =
"decay-scheduler.backoff.responsetime.enable";
public static final Boolean
IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_DEFAULT = false;
// Specifies the average response time (ms) thresholds of each
// level to trigger backoff
public static final String
IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY =
"decay-scheduler.backoff.responsetime.thresholds";
public static final Logger LOG =
LoggerFactory.getLogger(DecayRpcScheduler.class);
// Track the number of calls for each schedulable identity
private final ConcurrentHashMap<Object, AtomicLong> callCounts =
@ -91,6 +117,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
// Should be the sum of all AtomicLongs in callCounts
private final AtomicLong totalCalls = new AtomicLong();
// Track total call count and response time in current decay window
private final AtomicLongArray responseTimeCountInCurrWindow;
private final AtomicLongArray responseTimeTotalInCurrWindow;
// Track average response time in previous decay window
private final AtomicDoubleArray responseTimeAvgInLastWindow;
private final AtomicLongArray responseTimeCountInLastWindow;
// Pre-computed scheduling decisions during the decay sweep are
// atomically swapped in as a read-only map
private final AtomicReference<Map<Object, Integer>> scheduleCacheRef =
@ -98,10 +132,12 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
// Tune the behavior of the scheduler
private final long decayPeriodMillis; // How long between each tick
private final double decayFactor; // nextCount = currentCount / decayFactor
private final int numQueues; // affects scheduling decisions, from 0 to numQueues - 1
private final double decayFactor; // nextCount = currentCount * decayFactor
private final int numLevels;
private final double[] thresholds;
private final IdentityProvider identityProvider;
private final boolean backOffByResponseTimeEnabled;
private final long[] backOffResponseTimeThresholds;
/**
* This TimerTask will call decayCurrentCounts until
@ -132,35 +168,46 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
/**
* Create a decay scheduler.
* @param numQueues number of queues to schedule for
* @param numLevels number of priority levels
* @param ns config prefix, so that we can configure multiple schedulers
* in a single instance.
* @param conf configuration to use.
*/
public DecayRpcScheduler(int numQueues, String ns, Configuration conf) {
if (numQueues < 1) {
throw new IllegalArgumentException("number of queues must be > 0");
public DecayRpcScheduler(int numLevels, String ns, Configuration conf) {
if(numLevels < 1) {
throw new IllegalArgumentException("Number of Priority Levels must be " +
"at least 1");
}
this.numQueues = numQueues;
this.numLevels = numLevels;
this.decayFactor = parseDecayFactor(ns, conf);
this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
this.identityProvider = this.parseIdentityProvider(ns, conf);
this.thresholds = parseThresholds(ns, conf, numQueues);
this.thresholds = parseThresholds(ns, conf, numLevels);
this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns,
conf);
this.backOffResponseTimeThresholds =
parseBackOffResponseTimeThreshold(ns, conf, numLevels);
// Setup delay timer
Timer timer = new Timer();
DecayTask task = new DecayTask(this, timer);
timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis);
MetricsProxy prox = MetricsProxy.getInstance(ns);
// Setup response time metrics
responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels);
responseTimeCountInCurrWindow = new AtomicLongArray(numLevels);
responseTimeAvgInLastWindow = new AtomicDoubleArray(numLevels);
responseTimeCountInLastWindow = new AtomicLongArray(numLevels);
MetricsProxy prox = MetricsProxy.getInstance(ns, numLevels);
prox.setDelegate(this);
}
// Load configs
private IdentityProvider parseIdentityProvider(String ns, Configuration conf) {
private IdentityProvider parseIdentityProvider(String ns,
Configuration conf) {
List<IdentityProvider> providers = conf.getInstances(
ns + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
ns + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
IdentityProvider.class);
if (providers.size() < 1) {
@ -174,10 +221,16 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
private static double parseDecayFactor(String ns, Configuration conf) {
double factor = conf.getDouble(ns + "." +
IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY,
IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT
);
IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, 0.0);
if (factor == 0.0) {
factor = conf.getDouble(ns + "." +
IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY,
IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT);
} else if ((factor > 0.0) && (factor < 1)) {
LOG.warn(IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY +
" is deprecated. Please use " +
IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY + ".");
}
if (factor <= 0 || factor >= 1) {
throw new IllegalArgumentException("Decay Factor " +
"must be between 0 and 1");
@ -188,10 +241,17 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
private static long parseDecayPeriodMillis(String ns, Configuration conf) {
long period = conf.getLong(ns + "." +
IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY,
IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT
);
IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
0);
if (period == 0) {
period = conf.getLong(ns + "." +
IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY,
IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT);
} else if (period > 0) {
LOG.warn((IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY +
" is deprecated. Please use " +
IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY));
}
if (period <= 0) {
throw new IllegalArgumentException("Period millis must be >= 0");
}
@ -200,15 +260,24 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
}
private static double[] parseThresholds(String ns, Configuration conf,
int numQueues) {
int numLevels) {
int[] percentages = conf.getInts(ns + "." +
IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY);
IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY);
if (percentages.length == 0) {
return getDefaultThresholds(numQueues);
} else if (percentages.length != numQueues-1) {
percentages = conf.getInts(ns + "." + IPC_DECAYSCHEDULER_THRESHOLDS_KEY);
if (percentages.length == 0) {
return getDefaultThresholds(numLevels);
}
} else {
LOG.warn(IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY +
" is deprecated. Please use " +
IPC_DECAYSCHEDULER_THRESHOLDS_KEY);
}
if (percentages.length != numLevels-1) {
throw new IllegalArgumentException("Number of thresholds should be " +
(numQueues-1) + ". Was: " + percentages.length);
(numLevels-1) + ". Was: " + percentages.length);
}
// Convert integer percentages to decimals
@ -223,14 +292,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
/**
* Generate default thresholds if user did not specify. Strategy is
* to halve each time, since queue usage tends to be exponential.
* So if numQueues is 4, we would generate: double[]{0.125, 0.25, 0.5}
* So if numLevels is 4, we would generate: double[]{0.125, 0.25, 0.5}
* which specifies the boundaries between each queue's usage.
* @param numQueues number of queues to compute for
* @return array of boundaries of length numQueues - 1
* @param numLevels number of levels to compute for
* @return array of boundaries of length numLevels - 1
*/
private static double[] getDefaultThresholds(int numQueues) {
double[] ret = new double[numQueues - 1];
double div = Math.pow(2, numQueues - 1);
private static double[] getDefaultThresholds(int numLevels) {
double[] ret = new double[numLevels - 1];
double div = Math.pow(2, numLevels - 1);
for (int i = 0; i < ret.length; i++) {
ret[i] = Math.pow(2, i)/div;
@ -238,39 +307,89 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
return ret;
}
private static long[] parseBackOffResponseTimeThreshold(String ns,
Configuration conf, int numLevels) {
long[] responseTimeThresholds = conf.getTimeDurations(ns + "." +
IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY,
TimeUnit.MILLISECONDS);
// backoff thresholds not specified
if (responseTimeThresholds.length == 0) {
return getDefaultBackOffResponseTimeThresholds(numLevels);
}
// backoff thresholds specified but not match with the levels
if (responseTimeThresholds.length != numLevels) {
throw new IllegalArgumentException(
"responseTimeThresholds must match with the number of priority " +
"levels");
}
// invalid thresholds
for (long responseTimeThreshold: responseTimeThresholds) {
if (responseTimeThreshold <= 0) {
throw new IllegalArgumentException(
"responseTimeThreshold millis must be >= 0");
}
}
return responseTimeThresholds;
}
// 10s for level 0, 20s for level 1, 30s for level 2, ...
private static long[] getDefaultBackOffResponseTimeThresholds(int numLevels) {
long[] ret = new long[numLevels];
for (int i = 0; i < ret.length; i++) {
ret[i] = 10000*(i+1);
}
return ret;
}
private static Boolean parseBackOffByResponseTimeEnabled(String ns,
Configuration conf) {
return conf.getBoolean(ns + "." +
IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_DEFAULT);
}
/**
* Decay the stored counts for each user and clean as necessary.
* This method should be called periodically in order to keep
* counts current.
*/
private void decayCurrentCounts() {
long total = 0;
Iterator<Map.Entry<Object, AtomicLong>> it =
callCounts.entrySet().iterator();
try {
long total = 0;
Iterator<Map.Entry<Object, AtomicLong>> it =
callCounts.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Object, AtomicLong> entry = it.next();
AtomicLong count = entry.getValue();
while (it.hasNext()) {
Map.Entry<Object, AtomicLong> entry = it.next();
AtomicLong count = entry.getValue();
// Compute the next value by reducing it by the decayFactor
long currentValue = count.get();
long nextValue = (long)(currentValue * decayFactor);
total += nextValue;
count.set(nextValue);
// Compute the next value by reducing it by the decayFactor
long currentValue = count.get();
long nextValue = (long) (currentValue * decayFactor);
total += nextValue;
count.set(nextValue);
if (nextValue == 0) {
// We will clean up unused keys here. An interesting optimization might
// be to have an upper bound on keyspace in callCounts and only
// clean once we pass it.
it.remove();
if (nextValue == 0) {
// We will clean up unused keys here. An interesting optimization
// might be to have an upper bound on keyspace in callCounts and only
// clean once we pass it.
it.remove();
}
}
// Update the total so that we remain in sync
totalCalls.set(total);
// Now refresh the cache of scheduling decisions
recomputeScheduleCache();
// Update average response time with decay
updateAverageResponseTime(true);
} catch (Exception ex) {
LOG.error("decayCurrentCounts exception: " +
ExceptionUtils.getFullStackTrace(ex));
throw ex;
}
// Update the total so that we remain in sync
totalCalls.set(total);
// Now refresh the cache of scheduling decisions
recomputeScheduleCache();
}
/**
@ -324,7 +443,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
/**
* Given the number of occurrences, compute a scheduling decision.
* @param occurrences how many occurrences
* @return scheduling decision from 0 to numQueues - 1
* @return scheduling decision from 0 to numLevels - 1
*/
private int computePriorityLevel(long occurrences) {
long totalCallSnapshot = totalCalls.get();
@ -334,14 +453,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
proportion = (double) occurrences / totalCallSnapshot;
}
// Start with low priority queues, since they will be most common
for(int i = (numQueues - 1); i > 0; i--) {
// Start with low priority levels, since they will be most common
for(int i = (numLevels - 1); i > 0; i--) {
if (proportion >= this.thresholds[i - 1]) {
return i; // We've found our queue number
return i; // We've found our level number
}
}
// If we get this far, we're at queue 0
// If we get this far, we're at level 0
return 0;
}
@ -349,7 +468,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
* Returns the priority level for a given identity by first trying the cache,
* then computing it.
* @param identity an object responding to toString and hashCode
* @return integer scheduling decision from 0 to numQueues - 1
* @return integer scheduling decision from 0 to numLevels - 1
*/
private int cachedOrComputedPriorityLevel(Object identity) {
try {
@ -360,22 +479,29 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
if (scheduleCache != null) {
Integer priority = scheduleCache.get(identity);
if (priority != null) {
LOG.debug("Cache priority for: {} with priority: {}", identity,
priority);
return priority;
}
}
// Cache was no good, compute it
return computePriorityLevel(occurrences);
int priority = computePriorityLevel(occurrences);
LOG.debug("compute priority for " + identity + " priority " + priority);
return priority;
} catch (InterruptedException ie) {
LOG.warn("Caught InterruptedException, returning low priority queue");
return numQueues - 1;
LOG.warn("Caught InterruptedException, returning low priority level");
LOG.debug("Fallback priority for: {} with priority: {}", identity,
numLevels - 1);
return numLevels - 1;
}
}
/**
* Compute the appropriate priority for a schedulable based on past requests.
* @param obj the schedulable obj to query and remember
* @return the queue index which we recommend scheduling in
* @return the level index which we recommend scheduling in
*/
@Override
public int getPriorityLevel(Schedulable obj) {
@ -389,6 +515,73 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
return cachedOrComputedPriorityLevel(identity);
}
@Override
public boolean shouldBackOff(Schedulable obj) {
Boolean backOff = false;
if (backOffByResponseTimeEnabled) {
int priorityLevel = obj.getPriorityLevel();
if (LOG.isDebugEnabled()) {
double[] responseTimes = getAverageResponseTime();
LOG.debug("Current Caller: {} Priority: {} ",
obj.getUserGroupInformation().getUserName(),
obj.getPriorityLevel());
for (int i = 0; i < numLevels; i++) {
LOG.debug("Queue: {} responseTime: {} backoffThreshold: {}", i,
responseTimes[i], backOffResponseTimeThresholds[i]);
}
}
// High priority rpc over threshold triggers back off of low priority rpc
for (int i = 0; i < priorityLevel + 1; i++) {
if (responseTimeAvgInLastWindow.get(i) >
backOffResponseTimeThresholds[i]) {
backOff = true;
break;
}
}
}
return backOff;
}
@Override
public void addResponseTime(String name, int priorityLevel, int queueTime,
int processingTime) {
responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,
queueTime+processingTime);
if (LOG.isDebugEnabled()) {
LOG.debug("addResponseTime for call: {} priority: {} queueTime: {} " +
"processingTime: {} ", name, priorityLevel, queueTime,
processingTime);
}
}
// Update the cached average response time at the end of decay window
void updateAverageResponseTime(boolean enableDecay) {
for (int i = 0; i < numLevels; i++) {
double averageResponseTime = 0;
long totalResponseTime = responseTimeTotalInCurrWindow.get(i);
long responseTimeCount = responseTimeCountInCurrWindow.get(i);
if (responseTimeCount > 0) {
averageResponseTime = (double) totalResponseTime / responseTimeCount;
}
final double lastAvg = responseTimeAvgInLastWindow.get(i);
if (enableDecay && lastAvg > 0.0) {
final double decayed = decayFactor * lastAvg + averageResponseTime;
responseTimeAvgInLastWindow.set(i, decayed);
} else {
responseTimeAvgInLastWindow.set(i, averageResponseTime);
}
responseTimeCountInLastWindow.set(i, responseTimeCount);
if (LOG.isDebugEnabled()) {
LOG.debug("updateAverageResponseTime queue: {} Average: {} Count: {}",
i, averageResponseTime, responseTimeCount);
}
// Reset for next decay window
responseTimeTotalInCurrWindow.set(i, 0);
responseTimeCountInCurrWindow.set(i, 0);
}
}
// For testing
@VisibleForTesting
public double getDecayFactor() { return decayFactor; }
@ -429,16 +622,21 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
// Weakref for delegate, so we don't retain it forever if it can be GC'd
private WeakReference<DecayRpcScheduler> delegate;
private double[] averageResponseTimeDefault;
private long[] callCountInLastWindowDefault;
private MetricsProxy(String namespace) {
private MetricsProxy(String namespace, int numLevels) {
averageResponseTimeDefault = new double[numLevels];
callCountInLastWindowDefault = new long[numLevels];
MBeans.register(namespace, "DecayRpcScheduler", this);
}
public static synchronized MetricsProxy getInstance(String namespace) {
public static synchronized MetricsProxy getInstance(String namespace,
int numLevels) {
MetricsProxy mp = INSTANCES.get(namespace);
if (mp == null) {
// We must create one
mp = new MetricsProxy(namespace);
mp = new MetricsProxy(namespace, numLevels);
INSTANCES.put(namespace, mp);
}
return mp;
@ -487,6 +685,25 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
return scheduler.getTotalCallVolume();
}
}
@Override
public double[] getAverageResponseTime() {
DecayRpcScheduler scheduler = delegate.get();
if (scheduler == null) {
return averageResponseTimeDefault;
} else {
return scheduler.getAverageResponseTime();
}
}
public long[] getResponseTimeCountInLastWindow() {
DecayRpcScheduler scheduler = delegate.get();
if (scheduler == null) {
return callCountInLastWindowDefault;
} else {
return scheduler.getResponseTimeCountInLastWindow();
}
}
}
public int getUniqueIdentityCount() {
@ -497,6 +714,23 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
return totalCalls.get();
}
public long[] getResponseTimeCountInLastWindow() {
long[] ret = new long[responseTimeCountInLastWindow.length()];
for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) {
ret[i] = responseTimeCountInLastWindow.get(i);
}
return ret;
}
@Override
public double[] getAverageResponseTime() {
double[] ret = new double[responseTimeAvgInLastWindow.length()];
for (int i = 0; i < responseTimeAvgInLastWindow.length(); i++) {
ret[i] = responseTimeAvgInLastWindow.get(i);
}
return ret;
}
public String getSchedulingDecisionSummary() {
Map<Object, Integer> decisions = scheduleCacheRef.get();
if (decisions == null) {

View File

@ -27,4 +27,6 @@ public interface DecayRpcSchedulerMXBean {
String getCallVolumeSummary();
int getUniqueIdentityCount();
long getTotalCallVolume();
double[] getAverageResponseTime();
long[] getResponseTimeCountInLastWindow();
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.conf.Configuration;
/**
* No op default RPC scheduler.
*/
public class DefaultRpcScheduler implements RpcScheduler {
@Override
public int getPriorityLevel(Schedulable obj) {
return 0;
}
@Override
public boolean shouldBackOff(Schedulable obj) {
return false;
}
@Override
public void addResponseTime(String name, int priorityLevel, int queueTime,
int processingTime) {
}
public DefaultRpcScheduler(int priorityLevels, String namespace,
Configuration conf) {
}
}

View File

@ -44,8 +44,9 @@ import org.apache.hadoop.metrics2.util.MBeans;
*/
public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
implements BlockingQueue<E> {
// Configuration Keys
@Deprecated
public static final int IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4;
@Deprecated
public static final String IPC_CALLQUEUE_PRIORITY_LEVELS_KEY =
"faircallqueue.priority-levels";
@ -66,9 +67,6 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
}
}
/* Scheduler picks which queue to place in */
private RpcScheduler scheduler;
/* Multiplexer picks which queue to draw from */
private RpcMultiplexer multiplexer;
@ -83,8 +81,13 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
* Notes: the FairCallQueue has no fixed capacity. Rather, it has a minimum
* capacity of `capacity` and a maximum capacity of `capacity * number_queues`
*/
public FairCallQueue(int capacity, String ns, Configuration conf) {
int numQueues = parseNumQueues(ns, conf);
public FairCallQueue(int priorityLevels, int capacity, String ns,
Configuration conf) {
if(priorityLevels < 1) {
throw new IllegalArgumentException("Number of Priority Levels must be " +
"at least 1");
}
int numQueues = priorityLevels;
LOG.info("FairCallQueue is in use with " + numQueues + " queues.");
this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
@ -95,28 +98,12 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
this.overflowedCalls.add(new AtomicLong(0));
}
this.scheduler = new DecayRpcScheduler(numQueues, ns, conf);
this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf);
// Make this the active source of metrics
MetricsProxy mp = MetricsProxy.getInstance(ns);
mp.setDelegate(this);
}
/**
* Read the number of queues from the configuration.
* This will affect the FairCallQueue's overall capacity.
* @throws IllegalArgumentException on invalid queue count
*/
private static int parseNumQueues(String ns, Configuration conf) {
int retval = conf.getInt(ns + "." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY,
IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT);
if(retval < 1) {
throw new IllegalArgumentException("numQueues must be at least 1");
}
return retval;
}
/**
* Returns the first non-empty queue with equal or lesser priority
* than <i>startIdx</i>. Wraps around, searching a maximum of N
@ -144,7 +131,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
/**
* Put and offer follow the same pattern:
* 1. Get a priorityLevel from the scheduler
* 1. Get the assigned priorityLevel from the call by scheduler
* 2. Get the nth sub-queue matching this priorityLevel
* 3. delegate the call to this sub-queue.
*
@ -154,7 +141,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
*/
@Override
public void put(E e) throws InterruptedException {
int priorityLevel = scheduler.getPriorityLevel(e);
int priorityLevel = e.getPriorityLevel();
final int numLevels = this.queues.size();
while (true) {
@ -185,7 +172,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
@Override
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
int priorityLevel = scheduler.getPriorityLevel(e);
int priorityLevel = e.getPriorityLevel();
BlockingQueue<E> q = this.queues.get(priorityLevel);
boolean ret = q.offer(e, timeout, unit);
@ -196,7 +183,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
@Override
public boolean offer(E e) {
int priorityLevel = scheduler.getPriorityLevel(e);
int priorityLevel = e.getPriorityLevel();
BlockingQueue<E> q = this.queues.get(priorityLevel);
boolean ret = q.offer(e);
@ -436,12 +423,6 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
return calls;
}
// For testing
@VisibleForTesting
public void setScheduler(RpcScheduler newScheduler) {
this.scheduler = newScheduler;
}
@VisibleForTesting
public void setMultiplexer(RpcMultiplexer newMux) {
this.multiplexer = newMux;

View File

@ -634,13 +634,7 @@ public class ProtobufRpcEngine implements RpcEngine {
String detailedMetricsName = (exception == null) ?
methodName :
exception.getClass().getSimpleName();
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
if (server.isLogSlowRPC()) {
server.logSlowRpcCalls(methodName, processingTime);
}
server.updateMetrics(detailedMetricsName, qTime, processingTime);
}
return new RpcResponseWrapper(result);
}

View File

@ -19,11 +19,17 @@
package org.apache.hadoop.ipc;
/**
* Implement this interface to be used for RPC scheduling in the fair call queues.
* Implement this interface to be used for RPC scheduling and backoff.
*
*/
public interface RpcScheduler {
/**
* Returns priority level greater than zero as a hint for scheduling.
*/
int getPriorityLevel(Schedulable obj);
boolean shouldBackOff(Schedulable obj);
void addResponseTime(String name, int priorityLevel, int queueTime,
int processingTime);
}

View File

@ -18,11 +18,8 @@
package org.apache.hadoop.ipc;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.io.Writable;
/**
* Interface which allows extracting information necessary to
@ -31,4 +28,6 @@ import org.apache.hadoop.io.Writable;
@InterfaceAudience.Private
public interface Schedulable {
public UserGroupInformation getUserGroupInformation();
int getPriorityLevel();
}

View File

@ -393,6 +393,15 @@ public abstract class Server {
return CurCall.get() != null;
}
/**
* Return the priority level assigned by call queue to an RPC
* Returns 0 in case no priority is assigned.
*/
public static int getPriorityLevel() {
Call call = CurCall.get();
return call != null? call.getPriorityLevel() : 0;
}
private String bindAddress;
private int port; // port we listen on
private int handlerCount; // number of handler threads
@ -479,6 +488,18 @@ public abstract class Server {
}
}
void updateMetrics(String name, int queueTime, int processingTime) {
rpcMetrics.addRpcQueueTime(queueTime);
rpcMetrics.addRpcProcessingTime(processingTime);
rpcDetailedMetrics.addProcessingTime(name, processingTime);
callQueue.addResponseTime(name, getPriorityLevel(), queueTime,
processingTime);
if (isLogSlowRPC()) {
logSlowRpcCalls(name, processingTime);
}
}
/**
* A convenience method to bind to a given address and report
* better exceptions if the address is not a valid host.
@ -575,6 +596,10 @@ public abstract class Server {
return serviceAuthorizationManager;
}
private String getQueueClassPrefix() {
return CommonConfigurationKeys.IPC_NAMESPACE + "." + port;
}
static Class<? extends BlockingQueue<Call>> getQueueClass(
String prefix, Configuration conf) {
String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
@ -582,8 +607,29 @@ public abstract class Server {
return CallQueueManager.convertQueueClass(queueClass, Call.class);
}
private String getQueueClassPrefix() {
return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port;
static Class<? extends RpcScheduler> getSchedulerClass(
String prefix, Configuration conf) {
String schedulerKeyname = prefix + "." + CommonConfigurationKeys
.IPC_SCHEDULER_IMPL_KEY;
Class<?> schedulerClass = conf.getClass(schedulerKeyname, null);
// Patch the configuration for legacy fcq configuration that does not have
// a separate scheduler setting
if (schedulerClass == null) {
String queueKeyName = prefix + "." + CommonConfigurationKeys
.IPC_CALLQUEUE_IMPL_KEY;
Class<?> queueClass = conf.getClass(queueKeyName, null);
if (queueClass != null) {
if (queueClass.getCanonicalName().equals(
FairCallQueue.class.getCanonicalName())) {
conf.setClass(schedulerKeyname, DecayRpcScheduler.class,
RpcScheduler.class);
}
}
}
schedulerClass = conf.getClass(schedulerKeyname,
DefaultRpcScheduler.class);
return CallQueueManager.convertSchedulerClass(schedulerClass);
}
/*
@ -592,7 +638,8 @@ public abstract class Server {
public synchronized void refreshCallQueue(Configuration conf) {
// Create the next queue
String prefix = getQueueClassPrefix();
callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
callQueue.swapQueue(getSchedulerClass(prefix, conf),
getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
}
/**
@ -620,6 +667,8 @@ public abstract class Server {
private final byte[] clientId;
private final TraceScope traceScope; // the HTrace scope on the server side
private final CallerContext callerContext; // the call context
private int priorityLevel;
// the priority level assigned by scheduler, 0 by default
private Call(Call call) {
this(call.callId, call.retryCount, call.rpcRequest, call.connection,
@ -706,7 +755,16 @@ public abstract class Server {
@Override
public UserGroupInformation getUserGroupInformation() {
return connection.user;
}
}
@Override
public int getPriorityLevel() {
return this.priorityLevel;
}
public void setPriorityLevel(int priorityLevel) {
this.priorityLevel = priorityLevel;
}
}
/** Listens on the socket. Creates jobs for the handler threads*/
@ -2066,6 +2124,9 @@ public abstract class Server {
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceScope, callerContext);
// Save the priority level assignment by the scheduler
call.setPriorityLevel(callQueue.getPriorityLevel(call));
if (callQueue.isClientBackoffEnabled()) {
// if RPC queue is full, we will ask the RPC client to back off by
// throwing RetriableException. Whether RPC client will honor
@ -2081,9 +2142,10 @@ public abstract class Server {
private void queueRequestOrAskClientToBackOff(Call call)
throws WrappedRpcServerException, InterruptedException {
// If rpc queue is full, we will ask the client to back off.
boolean isCallQueued = callQueue.offer(call);
if (!isCallQueued) {
// If rpc scheduler indicates back off based on performance
// degradation such as response time or rpc queue is full,
// we will ask the client to back off.
if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
rpcMetrics.incrClientBackoff();
RetriableException retriableException =
new RetriableException("Server is too busy.");
@ -2427,6 +2489,7 @@ public abstract class Server {
// Setup appropriate callqueue
final String prefix = getQueueClassPrefix();
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
getSchedulerClass(prefix, conf),
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -502,13 +501,12 @@ public class WritableRpcEngine implements RpcEngine {
}
}
}
// Invoke the protocol method
long startTime = Time.now();
int qTime = (int) (startTime-receivedTime);
Exception exception = null;
try {
// Invoke the protocol method
long startTime = Time.now();
int qTime = (int) (startTime-receivedTime);
Exception exception = null;
try {
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
@ -539,27 +537,20 @@ public class WritableRpcEngine implements RpcEngine {
exception = ioe;
throw ioe;
} finally {
int processingTime = (int) (Time.now() - startTime);
if (LOG.isDebugEnabled()) {
String msg = "Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime;
if (exception != null) {
msg += " exception= " + exception.getClass().getSimpleName();
}
LOG.debug(msg);
}
String detailedMetricsName = (exception == null) ?
call.getMethodName() :
exception.getClass().getSimpleName();
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
if (server.isLogSlowRPC()) {
server.logSlowRpcCalls(call.getMethodName(), processingTime);
int processingTime = (int) (Time.now() - startTime);
if (LOG.isDebugEnabled()) {
String msg = "Served: " + call.getMethodName() +
" queueTime= " + qTime + " procesingTime= " + processingTime;
if (exception != null) {
msg += " exception= " + exception.getClass().getSimpleName();
}
LOG.debug(msg);
}
}
String detailedMetricsName = (exception == null) ?
call.getMethodName() :
exception.getClass().getSimpleName();
server.updateMetrics(detailedMetricsName, qTime, processingTime);
}
}
}
}

View File

@ -27,17 +27,37 @@ import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
public class TestCallQueueManager {
private CallQueueManager<FakeCall> manager;
private Configuration conf = new Configuration();
public class FakeCall {
public class FakeCall implements Schedulable {
public final int tag; // Can be used for unique identification
private int priorityLevel;
UserGroupInformation fakeUgi = UserGroupInformation.createRemoteUser
("fakeUser");
public FakeCall(int tag) {
this.tag = tag;
}
@Override
public UserGroupInformation getUserGroupInformation() {
return fakeUgi;
}
@Override
public int getPriorityLevel() {
return priorityLevel;
}
public void setPriorityLevel(int level) {
this.priorityLevel = level;
}
}
/**
@ -62,7 +82,9 @@ public class TestCallQueueManager {
try {
// Fill up to max (which is infinite if maxCalls < 0)
while (isRunning && (callsAdded < maxCalls || maxCalls < 0)) {
cq.put(new FakeCall(this.tag));
FakeCall call = new FakeCall(this.tag);
call.setPriorityLevel(cq.getPriorityLevel(call));
cq.put(call);
callsAdded++;
}
} catch (InterruptedException e) {
@ -135,7 +157,7 @@ public class TestCallQueueManager {
t.start();
t.join(100);
assertEquals(putter.callsAdded, numberOfPuts);
assertEquals(numberOfPuts, putter.callsAdded);
t.interrupt();
}
@ -143,23 +165,90 @@ public class TestCallQueueManager {
private static final Class<? extends BlockingQueue<FakeCall>> queueClass
= CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class);
private static final Class<? extends RpcScheduler> schedulerClass
= CallQueueManager.convertSchedulerClass(DefaultRpcScheduler.class);
@Test
public void testCallQueueCapacity() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
10, "", conf);
assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
}
@Test
public void testEmptyConsume() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
10, "", conf);
assertCanTake(manager, 0, 1); // Fails since it's empty
}
static Class<? extends BlockingQueue<FakeCall>> getQueueClass(
String prefix, Configuration conf) {
String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
Class<?> queueClass = conf.getClass(name, LinkedBlockingQueue.class);
return CallQueueManager.convertQueueClass(queueClass, FakeCall.class);
}
@Test
public void testFcqBackwardCompatibility() throws InterruptedException {
// Test BackwardCompatibility to ensure existing FCQ deployment still
// work without explicitly specifying DecayRpcScheduler
Configuration conf = new Configuration();
final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
final String queueClassName = "org.apache.hadoop.ipc.FairCallQueue";
conf.setStrings(ns + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY,
queueClassName);
// Specify only Fair Call Queue without a scheduler
// Ensure the DecayScheduler will be added to avoid breaking.
Class<? extends RpcScheduler> scheduler = Server.getSchedulerClass(ns,
conf);
assertTrue(scheduler.getCanonicalName().
equals("org.apache.hadoop.ipc.DecayRpcScheduler"));
Class<? extends BlockingQueue<FakeCall>> queue =
(Class<? extends BlockingQueue<FakeCall>>) getQueueClass(ns, conf);
assertTrue(queue.getCanonicalName().equals(queueClassName));
manager = new CallQueueManager<FakeCall>(queue, scheduler, false,
2, "", conf);
// Default FCQ has 4 levels and the max capacity is 2 x 4
assertCanPut(manager, 3, 3);
}
@Test
public void testSchedulerWithoutFCQ() throws InterruptedException {
Configuration conf = new Configuration();
// Test DecayedRpcScheduler without FCQ
// Ensure the default LinkedBlockingQueue can work with DecayedRpcScheduler
final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
final String schedulerClassName = "org.apache.hadoop.ipc.DecayRpcScheduler";
conf.setStrings(ns + "." + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
schedulerClassName);
Class<? extends BlockingQueue<FakeCall>> queue =
(Class<? extends BlockingQueue<FakeCall>>) getQueueClass(ns, conf);
assertTrue(queue.getCanonicalName().equals("java.util.concurrent." +
"LinkedBlockingQueue"));
manager = new CallQueueManager<FakeCall>(queue,
Server.getSchedulerClass(ns, conf), false,
3, "", conf);
// LinkedBlockingQueue with a capacity of 3 can put 3 calls
assertCanPut(manager, 3, 3);
// LinkedBlockingQueue with a capacity of 3 can't put 1 more call
assertCanPut(manager, 0, 1);
}
@Test(timeout=60000)
public void testSwapUnderContention() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(queueClass, false, 5000, "", null);
manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
5000, "", conf);
ArrayList<Putter> producers = new ArrayList<Putter>();
ArrayList<Taker> consumers = new ArrayList<Taker>();
@ -188,7 +277,7 @@ public class TestCallQueueManager {
Thread.sleep(500);
for (int i=0; i < 5; i++) {
manager.swapQueue(queueClass, 5000, "", null);
manager.swapQueue(schedulerClass, queueClass, 5000, "", conf);
}
// Stop the producers
@ -223,24 +312,50 @@ public class TestCallQueueManager {
}
public static class ExceptionFakeCall {
public ExceptionFakeCall() {
throw new IllegalArgumentException("Exception caused by constructor.!!");
throw new IllegalArgumentException("Exception caused by call queue " +
"constructor.!!");
}
}
private static final Class<? extends BlockingQueue<ExceptionFakeCall>> exceptionQueueClass = CallQueueManager
.convertQueueClass(ExceptionFakeCall.class, ExceptionFakeCall.class);
public static class ExceptionFakeScheduler {
public ExceptionFakeScheduler() {
throw new IllegalArgumentException("Exception caused by " +
"scheduler constructor.!!");
}
}
private static final Class<? extends RpcScheduler>
exceptionSchedulerClass = CallQueueManager.convertSchedulerClass(
ExceptionFakeScheduler.class);
private static final Class<? extends BlockingQueue<ExceptionFakeCall>>
exceptionQueueClass = CallQueueManager.convertQueueClass(
ExceptionFakeCall.class, ExceptionFakeCall.class);
@Test
public void testInvocationException() throws InterruptedException {
public void testCallQueueConstructorException() throws InterruptedException {
try {
new CallQueueManager<ExceptionFakeCall>(exceptionQueueClass, false, 10,
"", null);
new CallQueueManager<ExceptionFakeCall>(exceptionQueueClass,
schedulerClass, false, 10, "", new Configuration());
fail();
} catch (RuntimeException re) {
assertTrue(re.getCause() instanceof IllegalArgumentException);
assertEquals("Exception caused by constructor.!!", re.getCause()
assertEquals("Exception caused by call queue constructor.!!", re
.getCause()
.getMessage());
}
}
@Test
public void testSchedulerConstructorException() throws InterruptedException {
try {
new CallQueueManager<FakeCall>(queueClass, exceptionSchedulerClass,
false, 10, "", new Configuration());
fail();
} catch (RuntimeException re) {
assertTrue(re.getCause() instanceof IllegalArgumentException);
assertEquals("Exception caused by scheduler constructor.!!", re.getCause()
.getMessage());
}
}

View File

@ -18,12 +18,11 @@
package org.apache.hadoop.ipc;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Arrays;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -31,8 +30,6 @@ import static org.mockito.Mockito.when;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
public class TestDecayRpcScheduler {
private Schedulable mockCall(String id) {
Schedulable mockCall = mock(Schedulable.class);
@ -57,30 +54,32 @@ public class TestDecayRpcScheduler {
}
@Test
@SuppressWarnings("deprecation")
public void testParsePeriod() {
// By default
scheduler = new DecayRpcScheduler(1, "", new Configuration());
assertEquals(DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT,
assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
scheduler.getDecayPeriodMillis());
// Custom
Configuration conf = new Configuration();
conf.setLong("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY,
conf.setLong("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
1058);
scheduler = new DecayRpcScheduler(1, "ns", conf);
assertEquals(1058L, scheduler.getDecayPeriodMillis());
}
@Test
@SuppressWarnings("deprecation")
public void testParseFactor() {
// Default
scheduler = new DecayRpcScheduler(1, "", new Configuration());
assertEquals(DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT,
assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT,
scheduler.getDecayFactor(), 0.00001);
// Custom
Configuration conf = new Configuration();
conf.set("prefix." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY,
conf.set("prefix." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
"0.125");
scheduler = new DecayRpcScheduler(1, "prefix", conf);
assertEquals(0.125, scheduler.getDecayFactor(), 0.00001);
@ -94,6 +93,7 @@ public class TestDecayRpcScheduler {
}
@Test
@SuppressWarnings("deprecation")
public void testParseThresholds() {
// Defaults vary by number of queues
Configuration conf = new Configuration();
@ -111,16 +111,17 @@ public class TestDecayRpcScheduler {
// Custom
conf = new Configuration();
conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY,
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
"1, 10, 20, 50, 85");
scheduler = new DecayRpcScheduler(6, "ns", conf);
assertEqualDecimalArrays(new double[]{0.01, 0.1, 0.2, 0.5, 0.85}, scheduler.getThresholds());
}
@Test
@SuppressWarnings("deprecation")
public void testAccumulate() {
Configuration conf = new Configuration();
conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
scheduler = new DecayRpcScheduler(1, "ns", conf);
assertEquals(0, scheduler.getCallCountSnapshot().size()); // empty first
@ -138,10 +139,11 @@ public class TestDecayRpcScheduler {
}
@Test
public void testDecay() {
@SuppressWarnings("deprecation")
public void testDecay() throws Exception {
Configuration conf = new Configuration();
conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never
conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY, "0.5");
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
scheduler = new DecayRpcScheduler(1, "ns", conf);
assertEquals(0, scheduler.getTotalCallSnapshot());
@ -150,6 +152,8 @@ public class TestDecayRpcScheduler {
scheduler.getPriorityLevel(mockCall("A"));
}
sleep(1000);
for (int i = 0; i < 8; i++) {
scheduler.getPriorityLevel(mockCall("B"));
}
@ -184,10 +188,11 @@ public class TestDecayRpcScheduler {
}
@Test
@SuppressWarnings("deprecation")
public void testPriority() {
Configuration conf = new Configuration();
conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY,
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
"25, 50, 75");
scheduler = new DecayRpcScheduler(4, "ns", conf);
@ -204,10 +209,11 @@ public class TestDecayRpcScheduler {
}
@Test(timeout=2000)
@SuppressWarnings("deprecation")
public void testPeriodic() throws InterruptedException {
Configuration conf = new Configuration();
conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "10");
conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY, "0.5");
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10");
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
scheduler = new DecayRpcScheduler(1, "ns", conf);
assertEquals(10, scheduler.getDecayPeriodMillis());
@ -219,7 +225,7 @@ public class TestDecayRpcScheduler {
// It should eventually decay to zero
while (scheduler.getTotalCallSnapshot() > 0) {
Thread.sleep(10);
sleep(10);
}
}
}

View File

@ -37,21 +37,24 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration;
import org.mockito.Matchers;
import static org.apache.hadoop.ipc.FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY;
public class TestFairCallQueue extends TestCase {
private FairCallQueue<Schedulable> fcq;
private Schedulable mockCall(String id) {
private Schedulable mockCall(String id, int priority) {
Schedulable mockCall = mock(Schedulable.class);
UserGroupInformation ugi = mock(UserGroupInformation.class);
when(ugi.getUserName()).thenReturn(id);
when(mockCall.getUserGroupInformation()).thenReturn(ugi);
when(mockCall.getPriorityLevel()).thenReturn(priority);
return mockCall;
}
private Schedulable mockCall(String id) {
return mockCall(id, 0);
}
// A scheduler which always schedules into priority zero
private RpcScheduler alwaysZeroScheduler;
{
@ -60,11 +63,12 @@ public class TestFairCallQueue extends TestCase {
alwaysZeroScheduler = sched;
}
@SuppressWarnings("deprecation")
public void setUp() {
Configuration conf = new Configuration();
conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
fcq = new FairCallQueue<Schedulable>(5, "ns", conf);
fcq = new FairCallQueue<Schedulable>(2, 5, "ns", conf);
}
//
@ -85,7 +89,6 @@ public class TestFairCallQueue extends TestCase {
}
public void testOfferSucceeds() {
fcq.setScheduler(alwaysZeroScheduler);
for (int i = 0; i < 5; i++) {
// We can fit 10 calls
@ -96,7 +99,6 @@ public class TestFairCallQueue extends TestCase {
}
public void testOfferFailsWhenFull() {
fcq.setScheduler(alwaysZeroScheduler);
for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"))); }
assertFalse(fcq.offer(mockCall("c"))); // It's full
@ -107,11 +109,10 @@ public class TestFairCallQueue extends TestCase {
public void testOfferSucceedsWhenScheduledLowPriority() {
// Scheduler will schedule into queue 0 x 5, then queue 1
RpcScheduler sched = mock(RpcScheduler.class);
when(sched.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(0, 0, 0, 0, 0, 1, 0);
fcq.setScheduler(sched);
for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"))); }
int mockedPriorities[] = {0, 0, 0, 0, 0, 1, 0};
for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c", mockedPriorities[i]))); }
assertTrue(fcq.offer(mockCall("c")));
assertTrue(fcq.offer(mockCall("c", mockedPriorities[5])));
assertEquals(6, fcq.size());
}
@ -121,7 +122,7 @@ public class TestFairCallQueue extends TestCase {
}
public void testPeekNonDestructive() {
Schedulable call = mockCall("c");
Schedulable call = mockCall("c", 0);
assertTrue(fcq.offer(call));
assertEquals(call, fcq.peek());
@ -130,8 +131,8 @@ public class TestFairCallQueue extends TestCase {
}
public void testPeekPointsAtHead() {
Schedulable call = mockCall("c");
Schedulable next = mockCall("b");
Schedulable call = mockCall("c", 0);
Schedulable next = mockCall("b", 0);
fcq.offer(call);
fcq.offer(next);
@ -139,15 +140,11 @@ public class TestFairCallQueue extends TestCase {
}
public void testPollTimeout() throws InterruptedException {
fcq.setScheduler(alwaysZeroScheduler);
assertNull(fcq.poll(10, TimeUnit.MILLISECONDS));
}
public void testPollSuccess() throws InterruptedException {
fcq.setScheduler(alwaysZeroScheduler);
Schedulable call = mockCall("c");
Schedulable call = mockCall("c", 0);
assertTrue(fcq.offer(call));
assertEquals(call, fcq.poll(10, TimeUnit.MILLISECONDS));
@ -156,7 +153,6 @@ public class TestFairCallQueue extends TestCase {
}
public void testOfferTimeout() throws InterruptedException {
fcq.setScheduler(alwaysZeroScheduler);
for (int i = 0; i < 5; i++) {
assertTrue(fcq.offer(mockCall("c"), 10, TimeUnit.MILLISECONDS));
}
@ -166,13 +162,11 @@ public class TestFairCallQueue extends TestCase {
assertEquals(5, fcq.size());
}
@SuppressWarnings("deprecation")
public void testDrainTo() {
Configuration conf = new Configuration();
conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(10, "ns", conf);
fcq.setScheduler(alwaysZeroScheduler);
fcq2.setScheduler(alwaysZeroScheduler);
conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(2, 10, "ns", conf);
// Start with 3 in fcq, to be drained
for (int i = 0; i < 3; i++) {
@ -185,13 +179,11 @@ public class TestFairCallQueue extends TestCase {
assertEquals(3, fcq2.size());
}
@SuppressWarnings("deprecation")
public void testDrainToWithLimit() {
Configuration conf = new Configuration();
conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(10, "ns", conf);
fcq.setScheduler(alwaysZeroScheduler);
fcq2.setScheduler(alwaysZeroScheduler);
conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(2, 10, "ns", conf);
// Start with 3 in fcq, to be drained
for (int i = 0; i < 3; i++) {
@ -209,27 +201,23 @@ public class TestFairCallQueue extends TestCase {
}
public void testFirstQueueFullRemainingCapacity() {
fcq.setScheduler(alwaysZeroScheduler);
while (fcq.offer(mockCall("c"))) ; // Queue 0 will fill up first, then queue 1
assertEquals(5, fcq.remainingCapacity());
}
public void testAllQueuesFullRemainingCapacity() {
RpcScheduler sched = mock(RpcScheduler.class);
when(sched.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(0, 0, 0, 0, 0, 1, 1, 1, 1, 1);
fcq.setScheduler(sched);
while (fcq.offer(mockCall("c"))) ;
int[] mockedPriorities = {0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0};
int i = 0;
while (fcq.offer(mockCall("c", mockedPriorities[i++]))) ;
assertEquals(0, fcq.remainingCapacity());
assertEquals(10, fcq.size());
}
public void testQueuesPartialFilledRemainingCapacity() {
RpcScheduler sched = mock(RpcScheduler.class);
when(sched.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(0, 1, 0, 1, 0);
fcq.setScheduler(sched);
for (int i = 0; i < 5; i++) { fcq.offer(mockCall("c")); }
int[] mockedPriorities = {0, 1, 0, 1, 0};
for (int i = 0; i < 5; i++) { fcq.offer(mockCall("c", mockedPriorities[i])); }
assertEquals(5, fcq.remainingCapacity());
assertEquals(5, fcq.size());
@ -351,16 +339,12 @@ public class TestFairCallQueue extends TestCase {
// Make sure put will overflow into lower queues when the top is full
public void testPutOverflows() throws InterruptedException {
fcq.setScheduler(alwaysZeroScheduler);
// We can fit more than 5, even though the scheduler suggests the top queue
assertCanPut(fcq, 8, 8);
assertEquals(8, fcq.size());
}
public void testPutBlocksWhenAllFull() throws InterruptedException {
fcq.setScheduler(alwaysZeroScheduler);
assertCanPut(fcq, 10, 10); // Fill up
assertEquals(10, fcq.size());
@ -369,12 +353,10 @@ public class TestFairCallQueue extends TestCase {
}
public void testTakeBlocksWhenEmpty() throws InterruptedException {
fcq.setScheduler(alwaysZeroScheduler);
assertCanTake(fcq, 0, 1);
}
public void testTakeRemovesCall() throws InterruptedException {
fcq.setScheduler(alwaysZeroScheduler);
Schedulable call = mockCall("c");
fcq.offer(call);
@ -383,17 +365,14 @@ public class TestFairCallQueue extends TestCase {
}
public void testTakeTriesNextQueue() throws InterruptedException {
// Make a FCQ filled with calls in q 1 but empty in q 0
RpcScheduler q1Scheduler = mock(RpcScheduler.class);
when(q1Scheduler.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(1);
fcq.setScheduler(q1Scheduler);
// A mux which only draws from q 0
RpcMultiplexer q0mux = mock(RpcMultiplexer.class);
when(q0mux.getAndAdvanceCurrentIndex()).thenReturn(0);
fcq.setMultiplexer(q0mux);
Schedulable call = mockCall("c");
// Make a FCQ filled with calls in q 1 but empty in q 0
Schedulable call = mockCall("c", 1);
fcq.put(call);
// Take from q1 even though mux said q0, since q0 empty

View File

@ -19,25 +19,15 @@
package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import java.util.List;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.conf.Configuration;
@ -55,16 +45,20 @@ public class TestIdentityProviders {
}
}
@Override
public int getPriorityLevel() {
return 0;
}
}
@Test
public void testPluggableIdentityProvider() {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
conf.set(CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
"org.apache.hadoop.ipc.UserIdentityProvider");
List<IdentityProvider> providers = conf.getInstances(
CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
IdentityProvider.class);
assertTrue(providers.size() == 1);

View File

@ -43,8 +43,10 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.MockitoUtil;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -968,7 +970,7 @@ public class TestRPC extends TestRpcBase {
}
/**
* Test RPC backoff.
* Test RPC backoff by queue full.
*/
@Test (timeout=30000)
public void testClientBackOff() throws Exception {
@ -981,7 +983,7 @@ public class TestRPC extends TestRpcBase {
final ExecutorService executorService =
Executors.newFixedThreadPool(numClients);
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE +
conf.setBoolean(CommonConfigurationKeys.IPC_NAMESPACE +
".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
RPC.Builder builder = newServerBuilder(conf)
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
@ -1030,6 +1032,92 @@ public class TestRPC extends TestRpcBase {
assertTrue("RetriableException not received", succeeded);
}
/**
* Test RPC backoff by response time of each priority level.
*/
@Test (timeout=30000)
public void testClientBackOffByResponseTime() throws Exception {
Server server;
final TestRpcService proxy;
boolean succeeded = false;
final int numClients = 1;
final int queueSizePerHandler = 3;
GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG);
final List<Future<Void>> res = new ArrayList<Future<Void>>();
final ExecutorService executorService =
Executors.newFixedThreadPool(numClients);
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0.";
conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
conf.setStrings(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY,
"org.apache.hadoop.ipc.FairCallQueue");
conf.setStrings(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
"org.apache.hadoop.ipc.DecayRpcScheduler");
conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY,
2);
conf.setBoolean(ns +
DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
true);
// set a small thresholds 2s and 4s for level 0 and level 1 for testing
conf.set(ns +
DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY
, "2s, 4s");
// Set max queue size to 3 so that 2 calls from the test won't trigger
// back off because the queue is full.
RPC.Builder builder = newServerBuilder(conf)
.setQueueSizePerHandler(queueSizePerHandler).setNumHandlers(1)
.setVerbose(true);
server = setupTestServer(builder);
@SuppressWarnings("unchecked")
CallQueueManager<Call> spy = spy((CallQueueManager<Call>) Whitebox
.getInternalState(server, "callQueue"));
Whitebox.setInternalState(server, "callQueue", spy);
Exception lastException = null;
proxy = getClient(addr, conf);
try {
// start a sleep RPC call that sleeps 3s.
for (int i = 0; i < numClients; i++) {
res.add(executorService.submit(
new Callable<Void>() {
@Override
public Void call() throws ServiceException, InterruptedException {
proxy.sleep(null, newSleepRequest(3000));
return null;
}
}));
verify(spy, timeout(500).times(i + 1)).offer(Mockito.<Call>anyObject());
}
// Start another sleep RPC call and verify the call is backed off due to
// avg response time(3s) exceeds threshold (2s).
try {
// wait for the 1st response time update
Thread.sleep(5500);
proxy.sleep(null, newSleepRequest(100));
} catch (ServiceException e) {
RemoteException re = (RemoteException) e.getCause();
IOException unwrapExeption = re.unwrapRemoteException();
if (unwrapExeption instanceof RetriableException) {
succeeded = true;
} else {
lastException = unwrapExeption;
}
}
} finally {
executorService.shutdown();
stop(server, proxy);
}
if (lastException != null) {
LOG.error("Last received non-RetriableException:", lastException);
}
assertTrue("RetriableException not received", succeeded);
}
/**
* Test RPC timeout.
*/