diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index 3443d0394ad..ec87c753f58 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ipc; import java.lang.ref.WeakReference; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -125,12 +126,17 @@ public class DecayRpcScheduler implements RpcScheduler, public static final Logger LOG = LoggerFactory.getLogger(DecayRpcScheduler.class); - // Track the number of calls for each schedulable identity - private final ConcurrentHashMap callCounts = - new ConcurrentHashMap(); + // Track the decayed and raw (no decay) number of calls for each schedulable + // identity from all previous decay windows: idx 0 for decayed call count and + // idx 1 for the raw call count + private final ConcurrentHashMap> callCounts = + new ConcurrentHashMap>(); + + // Should be the sum of all AtomicLongs in decayed callCounts + private final AtomicLong totalDecayedCallCount = new AtomicLong(); + // The sum of all AtomicLongs in raw callCounts + private final AtomicLong totalRawCallCount = new AtomicLong(); - // Should be the sum of all AtomicLongs in callCounts - private final AtomicLong totalCalls = new AtomicLong(); // Track total call count and response time in current decay window private final AtomicLongArray responseTimeCountInCurrWindow; @@ -155,6 +161,7 @@ public class DecayRpcScheduler implements RpcScheduler, private final long[] backOffResponseTimeThresholds; private final String namespace; private final int topUsersCount; // e.g., report top 10 users' metrics + private static final double PRECISION = 0.0001; /** * This TimerTask will call decayCurrentCounts until @@ -380,19 +387,23 @@ public class DecayRpcScheduler implements RpcScheduler, */ private void decayCurrentCounts() { try { - long total = 0; - Iterator> it = + long totalDecayedCount = 0; + long totalRawCount = 0; + Iterator>> it = callCounts.entrySet().iterator(); while (it.hasNext()) { - Map.Entry entry = it.next(); - AtomicLong count = entry.getValue(); + Map.Entry> entry = it.next(); + AtomicLong decayedCount = entry.getValue().get(0); + AtomicLong rawCount = entry.getValue().get(1); + // Compute the next value by reducing it by the decayFactor - long currentValue = count.get(); + totalRawCount += rawCount.get(); + long currentValue = decayedCount.get(); long nextValue = (long) (currentValue * decayFactor); - total += nextValue; - count.set(nextValue); + totalDecayedCount += nextValue; + decayedCount.set(nextValue); if (nextValue == 0) { // We will clean up unused keys here. An interesting optimization @@ -403,7 +414,8 @@ public class DecayRpcScheduler implements RpcScheduler, } // Update the total so that we remain in sync - totalCalls.set(total); + totalDecayedCallCount.set(totalDecayedCount); + totalRawCallCount.set(totalRawCount); // Now refresh the cache of scheduling decisions recomputeScheduleCache(); @@ -423,9 +435,9 @@ public class DecayRpcScheduler implements RpcScheduler, private void recomputeScheduleCache() { Map nextCache = new HashMap(); - for (Map.Entry entry : callCounts.entrySet()) { + for (Map.Entry> entry : callCounts.entrySet()) { Object id = entry.getKey(); - AtomicLong value = entry.getValue(); + AtomicLong value = entry.getValue().get(0); long snapshot = value.get(); int computedLevel = computePriorityLevel(snapshot); @@ -442,27 +454,34 @@ public class DecayRpcScheduler implements RpcScheduler, * @param identity the identity of the user to increment * @return the value before incrementation */ - private long getAndIncrement(Object identity) throws InterruptedException { + private long getAndIncrementCallCounts(Object identity) + throws InterruptedException { // We will increment the count, or create it if no such count exists - AtomicLong count = this.callCounts.get(identity); + List count = this.callCounts.get(identity); if (count == null) { - // Create the count since no such count exists. - count = new AtomicLong(0); + // Create the counts since no such count exists. + // idx 0 for decayed call count + // idx 1 for the raw call count + count = new ArrayList(2); + count.add(new AtomicLong(0)); + count.add(new AtomicLong(0)); // Put it in, or get the AtomicInteger that was put in by another thread - AtomicLong otherCount = callCounts.putIfAbsent(identity, count); + List otherCount = callCounts.putIfAbsent(identity, count); if (otherCount != null) { count = otherCount; } } // Update the total - totalCalls.getAndIncrement(); + totalDecayedCallCount.getAndIncrement(); + totalRawCallCount.getAndIncrement(); // At this point value is guaranteed to be not null. It may however have // been clobbered from callCounts. Nonetheless, we return what // we have. - return count.getAndIncrement(); + count.get(1).getAndIncrement(); + return count.get(0).getAndIncrement(); } /** @@ -471,7 +490,7 @@ public class DecayRpcScheduler implements RpcScheduler, * @return scheduling decision from 0 to numLevels - 1 */ private int computePriorityLevel(long occurrences) { - long totalCallSnapshot = totalCalls.get(); + long totalCallSnapshot = totalDecayedCallCount.get(); double proportion = 0; if (totalCallSnapshot > 0) { @@ -497,7 +516,7 @@ public class DecayRpcScheduler implements RpcScheduler, */ private int cachedOrComputedPriorityLevel(Object identity) { try { - long occurrences = this.getAndIncrement(identity); + long occurrences = this.getAndIncrementCallCounts(identity); // Try the cache Map scheduleCache = scheduleCacheRef.get(); @@ -580,7 +599,7 @@ public class DecayRpcScheduler implements RpcScheduler, } } - // Update the cached average response time at the end of decay window + // Update the cached average response time at the end of the decay window void updateAverageResponseTime(boolean enableDecay) { for (int i = 0; i < numLevels; i++) { double averageResponseTime = 0; @@ -590,11 +609,13 @@ public class DecayRpcScheduler implements RpcScheduler, averageResponseTime = (double) totalResponseTime / responseTimeCount; } final double lastAvg = responseTimeAvgInLastWindow.get(i); - if (enableDecay && lastAvg > 0.0) { - final double decayed = decayFactor * lastAvg + averageResponseTime; - responseTimeAvgInLastWindow.set(i, decayed); - } else { - responseTimeAvgInLastWindow.set(i, averageResponseTime); + if (lastAvg > PRECISION || averageResponseTime > PRECISION) { + if (enableDecay) { + final double decayed = decayFactor * lastAvg + averageResponseTime; + responseTimeAvgInLastWindow.set(i, decayed); + } else { + responseTimeAvgInLastWindow.set(i, averageResponseTime); + } } responseTimeCountInLastWindow.set(i, responseTimeCount); if (LOG.isDebugEnabled()) { @@ -624,8 +645,8 @@ public class DecayRpcScheduler implements RpcScheduler, public Map getCallCountSnapshot() { HashMap snapshot = new HashMap(); - for (Map.Entry entry : callCounts.entrySet()) { - snapshot.put(entry.getKey(), entry.getValue().get()); + for (Map.Entry> entry : callCounts.entrySet()) { + snapshot.put(entry.getKey(), entry.getValue().get(0).get()); } return Collections.unmodifiableMap(snapshot); @@ -633,7 +654,7 @@ public class DecayRpcScheduler implements RpcScheduler, @VisibleForTesting public long getTotalCallSnapshot() { - return totalCalls.get(); + return totalDecayedCallCount.get(); } /** @@ -750,7 +771,11 @@ public class DecayRpcScheduler implements RpcScheduler, } public long getTotalCallVolume() { - return totalCalls.get(); + return totalDecayedCallCount.get(); + } + + public long getTotalRawCallVolume() { + return totalRawCallCount.get(); } public long[] getResponseTimeCountInLastWindow() { @@ -776,11 +801,12 @@ public class DecayRpcScheduler implements RpcScheduler, try { MetricsRecordBuilder rb = collector.addRecord(getClass().getName()) .setContext(namespace); - addTotalCallVolume(rb); + addDecayedCallVolume(rb); addUniqueIdentityCount(rb); addTopNCallerSummary(rb); addAvgResponseTimePerPriority(rb); addCallVolumePerPriority(rb); + addRawCallVolume(rb); } catch (Exception e) { LOG.warn("Exception thrown while metric collection. Exception : " + e.getMessage()); @@ -793,16 +819,22 @@ public class DecayRpcScheduler implements RpcScheduler, getUniqueIdentityCount()); } - // Key: CallVolume - private void addTotalCallVolume(MetricsRecordBuilder rb) { - rb.addCounter(Interns.info("CallVolume", "Total Call Volume"), - getTotalCallVolume()); + // Key: DecayedCallVolume + private void addDecayedCallVolume(MetricsRecordBuilder rb) { + rb.addCounter(Interns.info("DecayedCallVolume", "Decayed Total " + + "incoming Call Volume"), getTotalCallVolume()); } - // Key: Priority.0.CallVolume + private void addRawCallVolume(MetricsRecordBuilder rb) { + rb.addCounter(Interns.info("CallVolume", "Raw Total " + + "incoming Call Volume"), getTotalRawCallVolume()); + } + + // Key: Priority.0.CompletedCallVolume private void addCallVolumePerPriority(MetricsRecordBuilder rb) { for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) { - rb.addGauge(Interns.info("Priority." + i + ".CallVolume", "Call volume " + + rb.addGauge(Interns.info("Priority." + i + ".CompletedCallVolume", + "Completed Call volume " + "of priority "+ i), responseTimeCountInLastWindow.get(i)); } } @@ -816,16 +848,14 @@ public class DecayRpcScheduler implements RpcScheduler, } } - // Key: Top.0.Caller(xyz).Volume and Top.0.Caller(xyz).Priority + // Key: Caller(xyz).Volume and Caller(xyz).Priority private void addTopNCallerSummary(MetricsRecordBuilder rb) { - final int topCallerCount = 10; - TopN topNCallers = getTopCallers(topCallerCount); + TopN topNCallers = getTopCallers(topUsersCount); Map decisions = scheduleCacheRef.get(); final int actualCallerCount = topNCallers.size(); for (int i = 0; i < actualCallerCount; i++) { NameValuePair entry = topNCallers.poll(); - String topCaller = "Top." + (actualCallerCount - i) + "." + - "Caller(" + entry.getName() + ")"; + String topCaller = "Caller(" + entry.getName() + ")"; String topCallerVolume = topCaller + ".Volume"; String topCallerPriority = topCaller + ".Priority"; rb.addCounter(Interns.info(topCallerVolume, topCallerVolume), @@ -838,15 +868,15 @@ public class DecayRpcScheduler implements RpcScheduler, } } - // Get the top N callers' call count and scheduler decision + // Get the top N callers' raw call count and scheduler decision private TopN getTopCallers(int n) { TopN topNCallers = new TopN(n); - Iterator> it = + Iterator>> it = callCounts.entrySet().iterator(); while (it.hasNext()) { - Map.Entry entry = it.next(); + Map.Entry> entry = it.next(); String caller = entry.getKey().toString(); - Long count = entry.getValue().get(); + Long count = entry.getValue().get(1).get(); if (count > 0) { topNCallers.offer(new NameValuePair(caller, count)); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 4a869fde9b1..dbc9430bdf9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -1029,7 +1029,6 @@ public class TestRPC extends TestRpcBase { final TestRpcService proxy; boolean succeeded = false; final int numClients = 1; - final int queueSizePerHandler = 3; GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG); @@ -1052,7 +1051,10 @@ public class TestRPC extends TestRpcBase { MetricsRecordBuilder rb1 = getMetrics("DecayRpcSchedulerMetrics2." + ns); - final long beginCallVolume = MetricsAsserts.getLongCounter("CallVolume", rb1); + final long beginDecayedCallVolume = MetricsAsserts.getLongCounter( + "DecayedCallVolume", rb1); + final long beginRawCallVolume = MetricsAsserts.getLongCounter( + "CallVolume", rb1); final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers", rb1); @@ -1090,27 +1092,32 @@ public class TestRPC extends TestRpcBase { public Boolean get() { MetricsRecordBuilder rb2 = getMetrics("DecayRpcSchedulerMetrics2." + ns); - long callVolume1 = MetricsAsserts.getLongCounter("CallVolume", rb2); - int uniqueCaller1 = MetricsAsserts.getIntCounter("UniqueCallers", - rb2); + long decayedCallVolume1 = MetricsAsserts.getLongCounter( + "DecayedCallVolume", rb2); + long rawCallVolume1 = MetricsAsserts.getLongCounter( + "CallVolume", rb2); + int uniqueCaller1 = MetricsAsserts.getIntCounter( + "UniqueCallers", rb2); long callVolumePriority0 = MetricsAsserts.getLongGauge( - "Priority.0.CallVolume", rb2); + "Priority.0.CompletedCallVolume", rb2); long callVolumePriority1 = MetricsAsserts.getLongGauge( - "Priority.1.CallVolume", rb2); + "Priority.1.CompletedCallVolume", rb2); double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge( "Priority.0.AvgResponseTime", rb2); double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge( "Priority.1.AvgResponseTime", rb2); - LOG.info("CallVolume1: " + callVolume1); + LOG.info("DecayedCallVolume: " + decayedCallVolume1); + LOG.info("CallVolume: " + rawCallVolume1); LOG.info("UniqueCaller: " + uniqueCaller1); - LOG.info("Priority.0.CallVolume: " + callVolumePriority0); - LOG.info("Priority.1.CallVolume: " + callVolumePriority1); + LOG.info("Priority.0.CompletedCallVolume: " + callVolumePriority0); + LOG.info("Priority.1.CompletedCallVolume: " + callVolumePriority1); LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0); LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1); - return callVolume1 > beginCallVolume - && uniqueCaller1 > beginUniqueCaller; + return decayedCallVolume1 > beginDecayedCallVolume && + rawCallVolume1 > beginRawCallVolume && + uniqueCaller1 > beginUniqueCaller; } }, 30, 60000); }