diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index 42373395ee0..b39bda26c58 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -32,11 +32,19 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AtomicDoubleArray; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair; +import org.apache.hadoop.metrics2.util.Metrics2Util.TopN; import org.codehaus.jackson.map.ObjectMapper; import com.google.common.annotations.VisibleForTesting; @@ -49,7 +57,8 @@ import org.slf4j.LoggerFactory; * for large periods (on the order of seconds), as it offloads work to the * decay sweep. */ -public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean { +public class DecayRpcScheduler implements RpcScheduler, + DecayRpcSchedulerMXBean, MetricsSource { /** * Period controls how many milliseconds between each decay sweep. */ @@ -107,6 +116,12 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY = "decay-scheduler.backoff.responsetime.thresholds"; + // Specifies the top N user's call count and scheduler decision + // Metrics2 Source + public static final String DECAYSCHEDULER_METRICS_TOP_USER_COUNT = + "decay-scheduler.metrics.top.user.count"; + public static final int DECAYSCHEDULER_METRICS_TOP_USER_COUNT_DEFAULT = 10; + public static final Logger LOG = LoggerFactory.getLogger(DecayRpcScheduler.class); @@ -138,6 +153,8 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean private final IdentityProvider identityProvider; private final boolean backOffByResponseTimeEnabled; private final long[] backOffResponseTimeThresholds; + private final String namespace; + private final int topUsersCount; // e.g., report top 10 users' metrics /** * This TimerTask will call decayCurrentCounts until @@ -179,6 +196,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean "at least 1"); } this.numLevels = numLevels; + this.namespace = ns; this.decayFactor = parseDecayFactor(ns, conf); this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf); this.identityProvider = this.parseIdentityProvider(ns, conf); @@ -199,8 +217,15 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean responseTimeAvgInLastWindow = new AtomicDoubleArray(numLevels); responseTimeCountInLastWindow = new AtomicLongArray(numLevels); + topUsersCount = + conf.getInt(DECAYSCHEDULER_METRICS_TOP_USER_COUNT, + DECAYSCHEDULER_METRICS_TOP_USER_COUNT_DEFAULT); + Preconditions.checkArgument(topUsersCount > 0, + "the number of top users for scheduler metrics must be at least 1"); + MetricsProxy prox = MetricsProxy.getInstance(ns, numLevels); prox.setDelegate(this); + prox.registerMetrics2Source(ns); } // Load configs @@ -615,7 +640,8 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean * MetricsProxy is a singleton because we may init multiple schedulers and we * want to clean up resources when a new scheduler replaces the old one. */ - private static final class MetricsProxy implements DecayRpcSchedulerMXBean { + public static final class MetricsProxy implements DecayRpcSchedulerMXBean, + MetricsSource { // One singleton per namespace private static final HashMap INSTANCES = new HashMap(); @@ -646,6 +672,11 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean this.delegate = new WeakReference(obj); } + void registerMetrics2Source(String namespace) { + final String name = "DecayRpcSchedulerMetrics2." + namespace; + DefaultMetricsSystem.instance().register(name, name, this); + } + @Override public String getSchedulingDecisionSummary() { DecayRpcScheduler scheduler = delegate.get(); @@ -704,6 +735,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean return scheduler.getResponseTimeCountInLastWindow(); } } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + DecayRpcScheduler scheduler = delegate.get(); + if (scheduler != null) { + scheduler.getMetrics(collector, all); + } + } } public int getUniqueIdentityCount() { @@ -731,6 +770,89 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean return ret; } + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + // Metrics2 interface to act as a Metric source + try { + MetricsRecordBuilder rb = collector.addRecord(getClass().getName()) + .setContext(namespace); + addTotalCallVolume(rb); + addUniqueIdentityCount(rb); + addTopNCallerSummary(rb); + addAvgResponseTimePerPriority(rb); + addCallVolumePerPriority(rb); + } catch (Exception e) { + LOG.warn("Exception thrown while metric collection. Exception : " + + e.getMessage()); + } + } + + // Key: UniqueCallers + private void addUniqueIdentityCount(MetricsRecordBuilder rb) { + rb.addCounter(Interns.info("UniqueCallers", "Total unique callers"), + getUniqueIdentityCount()); + } + + // Key: CallVolume + private void addTotalCallVolume(MetricsRecordBuilder rb) { + rb.addCounter(Interns.info("CallVolume", "Total Call Volume"), + getTotalCallVolume()); + } + + // Key: Priority.0.CallVolume + private void addCallVolumePerPriority(MetricsRecordBuilder rb) { + for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) { + rb.addGauge(Interns.info("Priority." + i + ".CallVolume", "Call volume " + + "of priority "+ i), responseTimeCountInLastWindow.get(i)); + } + } + + // Key: Priority.0.AvgResponseTime + private void addAvgResponseTimePerPriority(MetricsRecordBuilder rb) { + for (int i = 0; i < responseTimeAvgInLastWindow.length(); i++) { + rb.addGauge(Interns.info("Priority." + i + ".AvgResponseTime", "Average" + + " response time of priority " + i), + responseTimeAvgInLastWindow.get(i)); + } + } + + // Key: Top.0.Caller(xyz).Volume and Top.0.Caller(xyz).Priority + private void addTopNCallerSummary(MetricsRecordBuilder rb) { + final int topCallerCount = 10; + TopN topNCallers = getTopCallers(topCallerCount); + Map decisions = scheduleCacheRef.get(); + for (int i=0; i < topNCallers.size(); i++) { + NameValuePair entry = topNCallers.poll(); + String topCaller = "Top." + (topCallerCount - i) + "." + + "Caller(" + entry.getName() + ")"; + String topCallerVolume = topCaller + ".Volume"; + String topCallerPriority = topCaller + ".Priority"; + rb.addCounter(Interns.info(topCallerVolume, topCallerVolume), + entry.getValue()); + Integer priority = decisions.get(entry.getName()); + if (priority != null) { + rb.addCounter(Interns.info(topCallerPriority, topCallerPriority), + priority); + } + } + } + + // Get the top N callers' call count and scheduler decision + private TopN getTopCallers(int n) { + TopN topNCallers = new TopN(n); + Iterator> it = + callCounts.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + String caller = entry.getKey().toString(); + Long count = entry.getValue().get(); + if (count > 0) { + topNCallers.offer(new NameValuePair(caller, count)); + } + } + return topNCallers; + } + public String getSchedulingDecisionSummary() { Map decisions = scheduleCacheRef.get(); if (decisions == null) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsUtil.java index e079b4cc932..14a3e331173 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsUtil.java @@ -102,5 +102,4 @@ public class MetricsUtil { } return hostName; } - } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Metrics2Util.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Metrics2Util.java new file mode 100644 index 00000000000..41b86913336 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Metrics2Util.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.util; + +import java.util.PriorityQueue; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Utility class to simplify creation of hadoop metrics2 source/sink. + */ +@InterfaceAudience.Private +public class Metrics2Util { + /** + * A pair of a name and its corresponding value. Defines a custom + * comparator so the TopN PriorityQueue sorts based on the count. + */ + @InterfaceAudience.Private + public static class NameValuePair implements Comparable { + private String name; + private long value; + + public NameValuePair(String metricName, long value) { + this.name = metricName; + this.value = value; + } + + public String getName() { + return name; + } + + public long getValue() { + return value; + } + + @Override + public int compareTo(NameValuePair other) { + return (int) (value - other.value); + } + + @Override + public boolean equals(Object other) { + if (other instanceof NameValuePair) { + return compareTo((NameValuePair)other) == 0; + } + return false; + } + + @Override + public int hashCode() { + return Long.valueOf(value).hashCode(); + } + } + + /** + * A fixed-size priority queue, used to retrieve top-n of offered entries. + */ + @InterfaceAudience.Private + public static class TopN extends PriorityQueue { + private static final long serialVersionUID = 5134028249611535803L; + private int n; // > 0 + private long total = 0; + + public TopN(int n) { + super(n); + this.n = n; + } + + @Override + public boolean offer(NameValuePair entry) { + updateTotal(entry.value); + if (size() == n) { + NameValuePair smallest = peek(); + if (smallest.value >= entry.value) { + return false; + } + poll(); // remove smallest + } + return super.offer(entry); + } + + private void updateTotal(long value) { + total += value; + } + + public long getTotal() { + return total; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index d600e014f81..7c7657f3fdd 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ipc; +import com.google.common.base.Supplier; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -1037,7 +1038,6 @@ public class TestRPC extends TestRpcBase { */ @Test (timeout=30000) public void testClientBackOffByResponseTime() throws Exception { - Server server; final TestRpcService proxy; boolean succeeded = false; final int numClients = 1; @@ -1050,28 +1050,9 @@ public class TestRPC extends TestRpcBase { final ExecutorService executorService = Executors.newFixedThreadPool(numClients); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); - final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0."; - conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); - conf.setStrings(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, - "org.apache.hadoop.ipc.FairCallQueue"); - conf.setStrings(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY, - "org.apache.hadoop.ipc.DecayRpcScheduler"); - conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY, - 2); - conf.setBoolean(ns + - DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY, - true); - // set a small thresholds 2s and 4s for level 0 and level 1 for testing - conf.set(ns + - DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY - , "2s, 4s"); - // Set max queue size to 3 so that 2 calls from the test won't trigger - // back off because the queue is full. - RPC.Builder builder = newServerBuilder(conf) - .setQueueSizePerHandler(queueSizePerHandler).setNumHandlers(1) - .setVerbose(true); - server = setupTestServer(builder); + final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0"; + Server server = setupDecayRpcSchedulerandTestServer(ns + "."); @SuppressWarnings("unchecked") CallQueueManager spy = spy((CallQueueManager) Whitebox @@ -1080,6 +1061,13 @@ public class TestRPC extends TestRpcBase { Exception lastException = null; proxy = getClient(addr, conf); + + MetricsRecordBuilder rb1 = + getMetrics("DecayRpcSchedulerMetrics2." + ns); + final long beginCallVolume = MetricsAsserts.getLongCounter("CallVolume", rb1); + final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers", + rb1); + try { // start a sleep RPC call that sleeps 3s. for (int i = 0; i < numClients; i++) { @@ -1107,6 +1095,36 @@ public class TestRPC extends TestRpcBase { } else { lastException = unwrapExeption; } + + // Lets Metric system update latest metrics + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + MetricsRecordBuilder rb2 = + getMetrics("DecayRpcSchedulerMetrics2." + ns); + long callVolume1 = MetricsAsserts.getLongCounter("CallVolume", rb2); + int uniqueCaller1 = MetricsAsserts.getIntCounter("UniqueCallers", + rb2); + long callVolumePriority0 = MetricsAsserts.getLongGauge( + "Priority.0.CallVolume", rb2); + long callVolumePriority1 = MetricsAsserts.getLongGauge( + "Priority.1.CallVolume", rb2); + double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge( + "Priority.0.AvgResponseTime", rb2); + double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge( + "Priority.1.AvgResponseTime", rb2); + + LOG.info("CallVolume1: " + callVolume1); + LOG.info("UniqueCaller: " + uniqueCaller1); + LOG.info("Priority.0.CallVolume: " + callVolumePriority0); + LOG.info("Priority.1.CallVolume: " + callVolumePriority1); + LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0); + LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1); + + return callVolume1 > beginCallVolume + && uniqueCaller1 > beginUniqueCaller; + } + }, 30, 60000); } } finally { executorService.shutdown(); @@ -1118,6 +1136,34 @@ public class TestRPC extends TestRpcBase { assertTrue("RetriableException not received", succeeded); } + private Server setupDecayRpcSchedulerandTestServer(String ns) + throws Exception { + final int queueSizePerHandler = 3; + + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); + conf.setStrings(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, + "org.apache.hadoop.ipc.FairCallQueue"); + conf.setStrings(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY, + "org.apache.hadoop.ipc.DecayRpcScheduler"); + conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY, + 2); + conf.setBoolean(ns + + DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY, + true); + // set a small thresholds 2s and 4s for level 0 and level 1 for testing + conf.set(ns + + DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY + , "2s, 4s"); + + // Set max queue size to 3 so that 2 calls from the test won't trigger + // back off because the queue is full. + RPC.Builder builder = newServerBuilder(conf) + .setQueueSizePerHandler(queueSizePerHandler).setNumHandlers(1) + .setVerbose(true); + return setupTestServer(builder); + } + /** * Test RPC timeout. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java index 2834ebb2974..bdd0ab01b97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java @@ -20,17 +20,17 @@ package org.apache.hadoop.hdfs.server.namenode.top.window; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import java.util.Set; import java.util.Stack; import java.util.concurrent.ConcurrentHashMap; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.primitives.Ints; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair; +import org.apache.hadoop.metrics2.util.Metrics2Util.TopN; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,7 +209,7 @@ public class RollingWindowManager { } for (int i = 0; i < size; i++) { NameValuePair userEntry = reverse.pop(); - User user = new User(userEntry.name, userEntry.value); + User user = new User(userEntry.getName(), userEntry.getValue()); op.addUser(user); } } @@ -276,71 +276,4 @@ public class RollingWindowManager { } return window; } - - /** - * A pair of a name and its corresponding value. Defines a custom - * comparator so the TopN PriorityQueue sorts based on the count. - */ - static private class NameValuePair implements Comparable { - String name; - long value; - - public NameValuePair(String metricName, long value) { - this.name = metricName; - this.value = value; - } - - @Override - public int compareTo(NameValuePair other) { - return (int) (value - other.value); - } - - @Override - public boolean equals(Object other) { - if (other instanceof NameValuePair) { - return compareTo((NameValuePair)other) == 0; - } - return false; - } - - @Override - public int hashCode() { - return Long.valueOf(value).hashCode(); - } - } - - /** - * A fixed-size priority queue, used to retrieve top-n of offered entries. - */ - static private class TopN extends PriorityQueue { - private static final long serialVersionUID = 5134028249611535803L; - int n; // > 0 - private long total = 0; - - TopN(int n) { - super(n); - this.n = n; - } - - @Override - public boolean offer(NameValuePair entry) { - updateTotal(entry.value); - if (size() == n) { - NameValuePair smallest = peek(); - if (smallest.value >= entry.value) { - return false; - } - poll(); // remove smallest - } - return super.offer(entry); - } - - private void updateTotal(long value) { - total += value; - } - - public long getTotal() { - return total; - } - } }