HADOOP-13197. Add non-decayed call metrics for DecayRpcScheduler. Contributed by Xiaoyu Yao.

(cherry picked from commit 4ca8859583)
This commit is contained in:
Xiaoyu Yao 2016-05-23 16:59:53 -07:00
parent de28ca1e92
commit 001c8f5d7b
2 changed files with 99 additions and 62 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -125,12 +126,17 @@ public class DecayRpcScheduler implements RpcScheduler,
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(DecayRpcScheduler.class); LoggerFactory.getLogger(DecayRpcScheduler.class);
// Track the number of calls for each schedulable identity // Track the decayed and raw (no decay) number of calls for each schedulable
private final ConcurrentHashMap<Object, AtomicLong> callCounts = // identity from all previous decay windows: idx 0 for decayed call count and
new ConcurrentHashMap<Object, AtomicLong>(); // idx 1 for the raw call count
private final ConcurrentHashMap<Object, List<AtomicLong>> callCounts =
new ConcurrentHashMap<Object, List<AtomicLong>>();
// Should be the sum of all AtomicLongs in decayed callCounts
private final AtomicLong totalDecayedCallCount = new AtomicLong();
// The sum of all AtomicLongs in raw callCounts
private final AtomicLong totalRawCallCount = new AtomicLong();
// Should be the sum of all AtomicLongs in callCounts
private final AtomicLong totalCalls = new AtomicLong();
// Track total call count and response time in current decay window // Track total call count and response time in current decay window
private final AtomicLongArray responseTimeCountInCurrWindow; private final AtomicLongArray responseTimeCountInCurrWindow;
@ -155,6 +161,7 @@ public class DecayRpcScheduler implements RpcScheduler,
private final long[] backOffResponseTimeThresholds; private final long[] backOffResponseTimeThresholds;
private final String namespace; private final String namespace;
private final int topUsersCount; // e.g., report top 10 users' metrics private final int topUsersCount; // e.g., report top 10 users' metrics
private static final double PRECISION = 0.0001;
/** /**
* This TimerTask will call decayCurrentCounts until * This TimerTask will call decayCurrentCounts until
@ -380,19 +387,23 @@ public class DecayRpcScheduler implements RpcScheduler,
*/ */
private void decayCurrentCounts() { private void decayCurrentCounts() {
try { try {
long total = 0; long totalDecayedCount = 0;
Iterator<Map.Entry<Object, AtomicLong>> it = long totalRawCount = 0;
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
callCounts.entrySet().iterator(); callCounts.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Map.Entry<Object, AtomicLong> entry = it.next(); Map.Entry<Object, List<AtomicLong>> entry = it.next();
AtomicLong count = entry.getValue(); AtomicLong decayedCount = entry.getValue().get(0);
AtomicLong rawCount = entry.getValue().get(1);
// Compute the next value by reducing it by the decayFactor // Compute the next value by reducing it by the decayFactor
long currentValue = count.get(); totalRawCount += rawCount.get();
long currentValue = decayedCount.get();
long nextValue = (long) (currentValue * decayFactor); long nextValue = (long) (currentValue * decayFactor);
total += nextValue; totalDecayedCount += nextValue;
count.set(nextValue); decayedCount.set(nextValue);
if (nextValue == 0) { if (nextValue == 0) {
// We will clean up unused keys here. An interesting optimization // We will clean up unused keys here. An interesting optimization
@ -403,7 +414,8 @@ public class DecayRpcScheduler implements RpcScheduler,
} }
// Update the total so that we remain in sync // Update the total so that we remain in sync
totalCalls.set(total); totalDecayedCallCount.set(totalDecayedCount);
totalRawCallCount.set(totalRawCount);
// Now refresh the cache of scheduling decisions // Now refresh the cache of scheduling decisions
recomputeScheduleCache(); recomputeScheduleCache();
@ -423,9 +435,9 @@ public class DecayRpcScheduler implements RpcScheduler,
private void recomputeScheduleCache() { private void recomputeScheduleCache() {
Map<Object, Integer> nextCache = new HashMap<Object, Integer>(); Map<Object, Integer> nextCache = new HashMap<Object, Integer>();
for (Map.Entry<Object, AtomicLong> entry : callCounts.entrySet()) { for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
Object id = entry.getKey(); Object id = entry.getKey();
AtomicLong value = entry.getValue(); AtomicLong value = entry.getValue().get(0);
long snapshot = value.get(); long snapshot = value.get();
int computedLevel = computePriorityLevel(snapshot); int computedLevel = computePriorityLevel(snapshot);
@ -442,27 +454,34 @@ public class DecayRpcScheduler implements RpcScheduler,
* @param identity the identity of the user to increment * @param identity the identity of the user to increment
* @return the value before incrementation * @return the value before incrementation
*/ */
private long getAndIncrement(Object identity) throws InterruptedException { private long getAndIncrementCallCounts(Object identity)
throws InterruptedException {
// We will increment the count, or create it if no such count exists // We will increment the count, or create it if no such count exists
AtomicLong count = this.callCounts.get(identity); List<AtomicLong> count = this.callCounts.get(identity);
if (count == null) { if (count == null) {
// Create the count since no such count exists. // Create the counts since no such count exists.
count = new AtomicLong(0); // idx 0 for decayed call count
// idx 1 for the raw call count
count = new ArrayList<AtomicLong>(2);
count.add(new AtomicLong(0));
count.add(new AtomicLong(0));
// Put it in, or get the AtomicInteger that was put in by another thread // Put it in, or get the AtomicInteger that was put in by another thread
AtomicLong otherCount = callCounts.putIfAbsent(identity, count); List<AtomicLong> otherCount = callCounts.putIfAbsent(identity, count);
if (otherCount != null) { if (otherCount != null) {
count = otherCount; count = otherCount;
} }
} }
// Update the total // Update the total
totalCalls.getAndIncrement(); totalDecayedCallCount.getAndIncrement();
totalRawCallCount.getAndIncrement();
// At this point value is guaranteed to be not null. It may however have // At this point value is guaranteed to be not null. It may however have
// been clobbered from callCounts. Nonetheless, we return what // been clobbered from callCounts. Nonetheless, we return what
// we have. // we have.
return count.getAndIncrement(); count.get(1).getAndIncrement();
return count.get(0).getAndIncrement();
} }
/** /**
@ -471,7 +490,7 @@ public class DecayRpcScheduler implements RpcScheduler,
* @return scheduling decision from 0 to numLevels - 1 * @return scheduling decision from 0 to numLevels - 1
*/ */
private int computePriorityLevel(long occurrences) { private int computePriorityLevel(long occurrences) {
long totalCallSnapshot = totalCalls.get(); long totalCallSnapshot = totalDecayedCallCount.get();
double proportion = 0; double proportion = 0;
if (totalCallSnapshot > 0) { if (totalCallSnapshot > 0) {
@ -497,7 +516,7 @@ public class DecayRpcScheduler implements RpcScheduler,
*/ */
private int cachedOrComputedPriorityLevel(Object identity) { private int cachedOrComputedPriorityLevel(Object identity) {
try { try {
long occurrences = this.getAndIncrement(identity); long occurrences = this.getAndIncrementCallCounts(identity);
// Try the cache // Try the cache
Map<Object, Integer> scheduleCache = scheduleCacheRef.get(); Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
@ -580,7 +599,7 @@ public class DecayRpcScheduler implements RpcScheduler,
} }
} }
// Update the cached average response time at the end of decay window // Update the cached average response time at the end of the decay window
void updateAverageResponseTime(boolean enableDecay) { void updateAverageResponseTime(boolean enableDecay) {
for (int i = 0; i < numLevels; i++) { for (int i = 0; i < numLevels; i++) {
double averageResponseTime = 0; double averageResponseTime = 0;
@ -590,11 +609,13 @@ public class DecayRpcScheduler implements RpcScheduler,
averageResponseTime = (double) totalResponseTime / responseTimeCount; averageResponseTime = (double) totalResponseTime / responseTimeCount;
} }
final double lastAvg = responseTimeAvgInLastWindow.get(i); final double lastAvg = responseTimeAvgInLastWindow.get(i);
if (enableDecay && lastAvg > 0.0) { if (lastAvg > PRECISION || averageResponseTime > PRECISION) {
final double decayed = decayFactor * lastAvg + averageResponseTime; if (enableDecay) {
responseTimeAvgInLastWindow.set(i, decayed); final double decayed = decayFactor * lastAvg + averageResponseTime;
} else { responseTimeAvgInLastWindow.set(i, decayed);
responseTimeAvgInLastWindow.set(i, averageResponseTime); } else {
responseTimeAvgInLastWindow.set(i, averageResponseTime);
}
} }
responseTimeCountInLastWindow.set(i, responseTimeCount); responseTimeCountInLastWindow.set(i, responseTimeCount);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -624,8 +645,8 @@ public class DecayRpcScheduler implements RpcScheduler,
public Map<Object, Long> getCallCountSnapshot() { public Map<Object, Long> getCallCountSnapshot() {
HashMap<Object, Long> snapshot = new HashMap<Object, Long>(); HashMap<Object, Long> snapshot = new HashMap<Object, Long>();
for (Map.Entry<Object, AtomicLong> entry : callCounts.entrySet()) { for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
snapshot.put(entry.getKey(), entry.getValue().get()); snapshot.put(entry.getKey(), entry.getValue().get(0).get());
} }
return Collections.unmodifiableMap(snapshot); return Collections.unmodifiableMap(snapshot);
@ -633,7 +654,7 @@ public class DecayRpcScheduler implements RpcScheduler,
@VisibleForTesting @VisibleForTesting
public long getTotalCallSnapshot() { public long getTotalCallSnapshot() {
return totalCalls.get(); return totalDecayedCallCount.get();
} }
/** /**
@ -750,7 +771,11 @@ public class DecayRpcScheduler implements RpcScheduler,
} }
public long getTotalCallVolume() { public long getTotalCallVolume() {
return totalCalls.get(); return totalDecayedCallCount.get();
}
public long getTotalRawCallVolume() {
return totalRawCallCount.get();
} }
public long[] getResponseTimeCountInLastWindow() { public long[] getResponseTimeCountInLastWindow() {
@ -776,11 +801,12 @@ public class DecayRpcScheduler implements RpcScheduler,
try { try {
MetricsRecordBuilder rb = collector.addRecord(getClass().getName()) MetricsRecordBuilder rb = collector.addRecord(getClass().getName())
.setContext(namespace); .setContext(namespace);
addTotalCallVolume(rb); addDecayedCallVolume(rb);
addUniqueIdentityCount(rb); addUniqueIdentityCount(rb);
addTopNCallerSummary(rb); addTopNCallerSummary(rb);
addAvgResponseTimePerPriority(rb); addAvgResponseTimePerPriority(rb);
addCallVolumePerPriority(rb); addCallVolumePerPriority(rb);
addRawCallVolume(rb);
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Exception thrown while metric collection. Exception : " LOG.warn("Exception thrown while metric collection. Exception : "
+ e.getMessage()); + e.getMessage());
@ -793,16 +819,22 @@ public class DecayRpcScheduler implements RpcScheduler,
getUniqueIdentityCount()); getUniqueIdentityCount());
} }
// Key: CallVolume // Key: DecayedCallVolume
private void addTotalCallVolume(MetricsRecordBuilder rb) { private void addDecayedCallVolume(MetricsRecordBuilder rb) {
rb.addCounter(Interns.info("CallVolume", "Total Call Volume"), rb.addCounter(Interns.info("DecayedCallVolume", "Decayed Total " +
getTotalCallVolume()); "incoming Call Volume"), getTotalCallVolume());
} }
// Key: Priority.0.CallVolume private void addRawCallVolume(MetricsRecordBuilder rb) {
rb.addCounter(Interns.info("CallVolume", "Raw Total " +
"incoming Call Volume"), getTotalRawCallVolume());
}
// Key: Priority.0.CompletedCallVolume
private void addCallVolumePerPriority(MetricsRecordBuilder rb) { private void addCallVolumePerPriority(MetricsRecordBuilder rb) {
for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) { for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) {
rb.addGauge(Interns.info("Priority." + i + ".CallVolume", "Call volume " + rb.addGauge(Interns.info("Priority." + i + ".CompletedCallVolume",
"Completed Call volume " +
"of priority "+ i), responseTimeCountInLastWindow.get(i)); "of priority "+ i), responseTimeCountInLastWindow.get(i));
} }
} }
@ -816,16 +848,14 @@ public class DecayRpcScheduler implements RpcScheduler,
} }
} }
// Key: Top.0.Caller(xyz).Volume and Top.0.Caller(xyz).Priority // Key: Caller(xyz).Volume and Caller(xyz).Priority
private void addTopNCallerSummary(MetricsRecordBuilder rb) { private void addTopNCallerSummary(MetricsRecordBuilder rb) {
final int topCallerCount = 10; TopN topNCallers = getTopCallers(topUsersCount);
TopN topNCallers = getTopCallers(topCallerCount);
Map<Object, Integer> decisions = scheduleCacheRef.get(); Map<Object, Integer> decisions = scheduleCacheRef.get();
final int actualCallerCount = topNCallers.size(); final int actualCallerCount = topNCallers.size();
for (int i = 0; i < actualCallerCount; i++) { for (int i = 0; i < actualCallerCount; i++) {
NameValuePair entry = topNCallers.poll(); NameValuePair entry = topNCallers.poll();
String topCaller = "Top." + (actualCallerCount - i) + "." + String topCaller = "Caller(" + entry.getName() + ")";
"Caller(" + entry.getName() + ")";
String topCallerVolume = topCaller + ".Volume"; String topCallerVolume = topCaller + ".Volume";
String topCallerPriority = topCaller + ".Priority"; String topCallerPriority = topCaller + ".Priority";
rb.addCounter(Interns.info(topCallerVolume, topCallerVolume), rb.addCounter(Interns.info(topCallerVolume, topCallerVolume),
@ -838,15 +868,15 @@ public class DecayRpcScheduler implements RpcScheduler,
} }
} }
// Get the top N callers' call count and scheduler decision // Get the top N callers' raw call count and scheduler decision
private TopN getTopCallers(int n) { private TopN getTopCallers(int n) {
TopN topNCallers = new TopN(n); TopN topNCallers = new TopN(n);
Iterator<Map.Entry<Object, AtomicLong>> it = Iterator<Map.Entry<Object, List<AtomicLong>>> it =
callCounts.entrySet().iterator(); callCounts.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Map.Entry<Object, AtomicLong> entry = it.next(); Map.Entry<Object, List<AtomicLong>> entry = it.next();
String caller = entry.getKey().toString(); String caller = entry.getKey().toString();
Long count = entry.getValue().get(); Long count = entry.getValue().get(1).get();
if (count > 0) { if (count > 0) {
topNCallers.offer(new NameValuePair(caller, count)); topNCallers.offer(new NameValuePair(caller, count));
} }

View File

@ -1041,7 +1041,6 @@ public class TestRPC extends TestRpcBase {
final TestRpcService proxy; final TestRpcService proxy;
boolean succeeded = false; boolean succeeded = false;
final int numClients = 1; final int numClients = 1;
final int queueSizePerHandler = 3;
GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG);
@ -1064,7 +1063,10 @@ public class TestRPC extends TestRpcBase {
MetricsRecordBuilder rb1 = MetricsRecordBuilder rb1 =
getMetrics("DecayRpcSchedulerMetrics2." + ns); getMetrics("DecayRpcSchedulerMetrics2." + ns);
final long beginCallVolume = MetricsAsserts.getLongCounter("CallVolume", rb1); final long beginDecayedCallVolume = MetricsAsserts.getLongCounter(
"DecayedCallVolume", rb1);
final long beginRawCallVolume = MetricsAsserts.getLongCounter(
"CallVolume", rb1);
final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers", final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
rb1); rb1);
@ -1102,27 +1104,32 @@ public class TestRPC extends TestRpcBase {
public Boolean get() { public Boolean get() {
MetricsRecordBuilder rb2 = MetricsRecordBuilder rb2 =
getMetrics("DecayRpcSchedulerMetrics2." + ns); getMetrics("DecayRpcSchedulerMetrics2." + ns);
long callVolume1 = MetricsAsserts.getLongCounter("CallVolume", rb2); long decayedCallVolume1 = MetricsAsserts.getLongCounter(
int uniqueCaller1 = MetricsAsserts.getIntCounter("UniqueCallers", "DecayedCallVolume", rb2);
rb2); long rawCallVolume1 = MetricsAsserts.getLongCounter(
"CallVolume", rb2);
int uniqueCaller1 = MetricsAsserts.getIntCounter(
"UniqueCallers", rb2);
long callVolumePriority0 = MetricsAsserts.getLongGauge( long callVolumePriority0 = MetricsAsserts.getLongGauge(
"Priority.0.CallVolume", rb2); "Priority.0.CompletedCallVolume", rb2);
long callVolumePriority1 = MetricsAsserts.getLongGauge( long callVolumePriority1 = MetricsAsserts.getLongGauge(
"Priority.1.CallVolume", rb2); "Priority.1.CompletedCallVolume", rb2);
double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge( double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
"Priority.0.AvgResponseTime", rb2); "Priority.0.AvgResponseTime", rb2);
double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge( double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
"Priority.1.AvgResponseTime", rb2); "Priority.1.AvgResponseTime", rb2);
LOG.info("CallVolume1: " + callVolume1); LOG.info("DecayedCallVolume: " + decayedCallVolume1);
LOG.info("CallVolume: " + rawCallVolume1);
LOG.info("UniqueCaller: " + uniqueCaller1); LOG.info("UniqueCaller: " + uniqueCaller1);
LOG.info("Priority.0.CallVolume: " + callVolumePriority0); LOG.info("Priority.0.CompletedCallVolume: " + callVolumePriority0);
LOG.info("Priority.1.CallVolume: " + callVolumePriority1); LOG.info("Priority.1.CompletedCallVolume: " + callVolumePriority1);
LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0); LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0);
LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1); LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1);
return callVolume1 > beginCallVolume return decayedCallVolume1 > beginDecayedCallVolume &&
&& uniqueCaller1 > beginUniqueCaller; rawCallVolume1 > beginRawCallVolume &&
uniqueCaller1 > beginUniqueCaller;
} }
}, 30, 60000); }, 30, 60000);
} }