diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index ffeafb5c0dc..3e952eb63c3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -42,6 +42,7 @@ import com.google.common.util.concurrent.AtomicDoubleArray;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.metrics.DecayRpcSchedulerDetailedMetrics;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -154,6 +155,10 @@ public class DecayRpcScheduler implements RpcScheduler,
   private final AtomicDoubleArray responseTimeAvgInLastWindow;
   private final AtomicLongArray responseTimeCountInLastWindow;

+  // Per-priority RPC queue time and processing time rates
+  private final DecayRpcSchedulerDetailedMetrics
+      decayRpcSchedulerDetailedMetrics;
+
   // Pre-computed scheduling decisions during the decay sweep are
   // atomically swapped in as a read-only map
   private final AtomicReference<Map<Object, Integer>> scheduleCacheRef =
@@ -236,6 +241,10 @@
     Preconditions.checkArgument(topUsersCount > 0,
         "the number of top users for scheduler metrics must be at least 1");

+    decayRpcSchedulerDetailedMetrics =
+        DecayRpcSchedulerDetailedMetrics.create(ns);
+    decayRpcSchedulerDetailedMetrics.init(numLevels);
+
     // Setup delay timer
     Timer timer = new Timer(true);
     DecayTask task = new DecayTask(this, timer);
@@ -626,6 +635,11 @@
     long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
     long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);

+    this.decayRpcSchedulerDetailedMetrics.addQueueTime(
+        priorityLevel, queueTime);
+    this.decayRpcSchedulerDetailedMetrics.addProcessingTime(
+        priorityLevel, processingTime);
+
     responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
     responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,
         queueTime+processingTime);
@@ -987,9 +1001,16 @@
     return decayedCallCosts;
   }

+  @VisibleForTesting
+  public DecayRpcSchedulerDetailedMetrics
+      getDecayRpcSchedulerDetailedMetrics() {
+    return decayRpcSchedulerDetailedMetrics;
+  }
+
   @Override
   public void stop() {
     metricsProxy.unregisterSource(namespace);
     MetricsProxy.removeInstance(namespace);
+    decayRpcSchedulerDetailedMetrics.shutdown();
   }
 }
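The new per-priority metrics source is created in the scheduler constructor and torn down in stop(). A minimal sketch of that lifecycle, mirroring the unit test added later in this patch; the namespace "ipc.8020" is only an example value:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.DecayRpcScheduler;
import org.apache.hadoop.ipc.metrics.DecayRpcSchedulerDetailedMetrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

public class DetailedMetricsLifecycleSketch {
  public static void main(String[] args) {
    // Constructing the scheduler registers the detailed metrics source
    // (see the constructor change above: create(ns) + init(numLevels)).
    DecayRpcScheduler scheduler =
        new DecayRpcScheduler(4, "ipc.8020", new Configuration());
    DecayRpcSchedulerDetailedMetrics detailed =
        scheduler.getDecayRpcSchedulerDetailedMetrics();

    // The source is registered under detailed.getName(),
    // e.g. "DecayRpcSchedulerDetailedMetrics.ipc.8020".
    System.out.println(
        DefaultMetricsSystem.instance().getSource(detailed.getName()) != null);

    // stop() unregisters the source again via shutdown().
    scheduler.stop();
    System.out.println(
        DefaultMetricsSystem.instance().getSource(detailed.getName()) == null);
  }
}
```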
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/DecayRpcSchedulerDetailedMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/DecayRpcSchedulerDetailedMetrics.java
new file mode 100644
index 00000000000..04a6c0eab1c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/DecayRpcSchedulerDetailedMetrics.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class maintains per-queue (priority level) statistics
+ * when FairCallQueue is used, and publishes them
+ * through the metrics interface.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@Metrics(about="Per queue (priority) metrics",
+    context="decayrpcschedulerdetailed")
+public class DecayRpcSchedulerDetailedMetrics {
+
+  @Metric private MutableRatesWithAggregation rpcQueueRates;
+  @Metric private MutableRatesWithAggregation rpcProcessingRates;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DecayRpcSchedulerDetailedMetrics.class);
+  private final MetricsRegistry registry;
+  private final String name;
+  private String[] queueNamesForLevels;
+  private String[] processingNamesForLevels;
+
+  DecayRpcSchedulerDetailedMetrics(String ns) {
+    name = "DecayRpcSchedulerDetailedMetrics."+ ns;
+    registry = new MetricsRegistry("decayrpcschedulerdetailed")
+        .tag("port", "RPC port", String.valueOf(ns));
+    LOG.debug(registry.info().toString());
+  }
+
+  public static DecayRpcSchedulerDetailedMetrics create(String ns) {
+    DecayRpcSchedulerDetailedMetrics m =
+        new DecayRpcSchedulerDetailedMetrics(ns);
+    return DefaultMetricsSystem.instance().register(m.name, null, m);
+  }
+
+  /**
+   * Initialize the metrics for JMX with the given number of priority levels.
+   */
+  public void init(int numLevels) {
+    LOG.info("Initializing RPC stats for {} priority levels", numLevels);
+    queueNamesForLevels = new String[numLevels];
+    processingNamesForLevels = new String[numLevels];
+    for (int i = 0; i < numLevels; i++) {
+      queueNamesForLevels[i] = getQueueName(i+1);
+      processingNamesForLevels[i] = getProcessingName(i+1);
+    }
+    rpcQueueRates.init(queueNamesForLevels);
+    rpcProcessingRates.init(processingNamesForLevels);
+  }
+
+  /**
+   * Instrument the queue time of a call, based on its priority.
+   *
+   * @param priority priority level of the RPC call
+   * @param queueTime time the call spent waiting in its priority queue
+   */
+  public void addQueueTime(int priority, long queueTime) {
+    rpcQueueRates.add(queueNamesForLevels[priority], queueTime);
+  }
+
+  /**
+   * Instrument the processing time of a call, based on its priority.
+   *
+   * @param priority priority level of the RPC call
+   * @param processingTime time taken to process the call at that priority
+   */
+  public void addProcessingTime(int priority, long processingTime) {
+    rpcProcessingRates.add(processingNamesForLevels[priority], processingTime);
+  }
+
+  /**
+   * Shut down the instrumentation and unregister the metrics source.
+   */
+  public void shutdown() {
+    DefaultMetricsSystem.instance().unregisterSource(name);
+  }
+
+  /**
+   * Returns the queue time rate name used inside the metric.
+   */
+  public String getQueueName(int priority) {
+    return "DecayRPCSchedulerPriority."+priority+".RpcQueueTime";
+  }
+
+  /**
+   * Returns the processing time rate name used inside the metric.
+   */
+  public String getProcessingName(int priority) {
+    return "DecayRPCSchedulerPriority."+priority+".RpcProcessingTime";
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  @VisibleForTesting
+  MutableRatesWithAggregation getRpcQueueRates() {
+    return rpcQueueRates;
+  }
+
+  @VisibleForTesting
+  MutableRatesWithAggregation getRpcProcessingRates() {
+    return rpcProcessingRates;
+  }
+}
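Standalone, the class can be exercised as sketched below; the values are hypothetical. Note that init() builds 1-based display names (getQueueName(i+1)), while addQueueTime/addProcessingTime index by the 0-based priority level, so samples for level 0 are published under the `DecayRPCSchedulerPriority.1.*` rate names:

```java
import org.apache.hadoop.ipc.metrics.DecayRpcSchedulerDetailedMetrics;

public class DetailedMetricsUsageSketch {
  public static void main(String[] args) {
    // "ipc.8020" stands in for the scheduler namespace (typically ipc.<port>).
    DecayRpcSchedulerDetailedMetrics metrics =
        DecayRpcSchedulerDetailedMetrics.create("ipc.8020");
    metrics.init(4); // registers 4 queue-time and 4 processing-time rates

    // A call served at priority level 0 that waited 12 ms and took 30 ms to process:
    metrics.addQueueTime(0, 12);      // DecayRPCSchedulerPriority.1.RpcQueueTime
    metrics.addProcessingTime(0, 30); // DecayRPCSchedulerPriority.1.RpcProcessingTime

    metrics.shutdown(); // unregister the source when the scheduler stops
  }
}
```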
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
index aa7b7596173..5fe0083aa5d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
@@ -79,6 +79,18 @@ public class MutableRatesWithAggregation extends MutableMetric {
     }
   }

+  /**
+   * Initialize the registry with all the rate names passed in.
+   * This is an alternative to the above init method, for callers whose
+   * rate names are not derived from an RPC protocol's method names.
+   * @param names the array of all rate names
+   */
+  public void init(String[] names) {
+    for (String name : names) {
+      addMetricIfNotExists(name);
+    }
+  }
+
   /**
    * Add a rate sample for a rate metric.
    * @param name of the rate metric
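A minimal sketch of the new array-based init(), mirroring the TestMutableMetrics case added at the end of this patch; the rate names "Foo" and "Bar" are arbitrary:

```java
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;

public class InitWithArraySketch {
  public static void main(String[] args) {
    MutableRatesWithAggregation rates = new MutableRatesWithAggregation();

    // Pre-register the rates so they are reported (as FooNumOps/FooAvgTime,
    // BarNumOps/BarAvgTime) even before the first sample is added.
    rates.init(new String[] {"Foo", "Bar"});

    // Record one sample of 25 against the "Foo" rate.
    rates.add("Foo", 25);
  }
}
```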
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index bafdfddf16b..8210eee0384 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -123,6 +123,17 @@ FairCallQueue metrics will only exist if FairCallQueue is enabled. Each metric e
 | `FairCallQueueSize_p`*Priority* | Current number of calls in priority queue |
 | `FairCallQueueOverflowedCalls_p`*Priority* | Total number of overflowed calls in priority queue |

+DecayRpcSchedulerDetailed
+-------------------------
+
+DecayRpcSchedulerDetailed metrics only exist when the DecayRpcScheduler is in use (i.e. when FairCallQueue is enabled). They supplement
+the FairCallQueue metrics above: for each priority level, detailed RPC queue time and processing time rates are exposed.
+
+| Name | Description |
+|:---- | :---- |
+| `DecayRPCSchedulerPriority.`*Priority*`.RpcQueueTime` | RPC queue time statistics for each priority level |
+| `DecayRPCSchedulerPriority.`*Priority*`.RpcProcessingTime` | RPC processing time statistics for each priority level |
+
 rpcdetailed context
 ===================

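For context, these metrics only appear once the RPC server's call queue is switched to FairCallQueue with the DecayRpcScheduler. A hedged configuration sketch for a server listening on port 8020 follows; the property names use the usual ipc.<port>.* convention but should be checked against the FairCallQueue documentation for the release in use:

```java
import org.apache.hadoop.conf.Configuration;

public class EnableFairCallQueueSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed keys (ipc.<port>.*): verify against the FairCallQueue docs.
    conf.set("ipc.8020.callqueue.impl", "org.apache.hadoop.ipc.FairCallQueue");
    conf.set("ipc.8020.scheduler.impl", "org.apache.hadoop.ipc.DecayRpcScheduler");
    conf.setInt("ipc.8020.scheduler.priority.levels", 4);
    // With this in place, the detailed source described above is registered
    // as "DecayRpcSchedulerDetailedMetrics.ipc.8020".
  }
}
```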
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index 7bdc6b5e96d..71723325e2c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -66,15 +66,15 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testParsePeriod() {
     // By default
-    scheduler = new DecayRpcScheduler(1, "", new Configuration());
+    scheduler = new DecayRpcScheduler(1, "ipc.1", new Configuration());
     assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
         scheduler.getDecayPeriodMillis());

     // Custom
     Configuration conf = new Configuration();
-    conf.setLong("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
+    conf.setLong("ipc.2." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
         1058);
-    scheduler = new DecayRpcScheduler(1, "ns", conf);
+    scheduler = new DecayRpcScheduler(1, "ipc.2", conf);
     assertEquals(1058L, scheduler.getDecayPeriodMillis());
   }

@@ -82,15 +82,15 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testParseFactor() {
     // Default
-    scheduler = new DecayRpcScheduler(1, "", new Configuration());
+    scheduler = new DecayRpcScheduler(1, "ipc.3", new Configuration());
     assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT,
         scheduler.getDecayFactor(), 0.00001);

     // Custom
     Configuration conf = new Configuration();
-    conf.set("prefix." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
+    conf.set("ipc.4." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
         "0.125");
-    scheduler = new DecayRpcScheduler(1, "prefix", conf);
+    scheduler = new DecayRpcScheduler(1, "ipc.4", conf);
     assertEquals(0.125, scheduler.getDecayFactor(), 0.00001);
   }

@@ -106,23 +106,23 @@ public void testParseThresholds() {
     // Defaults vary by number of queues
     Configuration conf = new Configuration();
-    scheduler = new DecayRpcScheduler(1, "", conf);
+    scheduler = new DecayRpcScheduler(1, "ipc.5", conf);
     assertEqualDecimalArrays(new double[]{}, scheduler.getThresholds());

-    scheduler = new DecayRpcScheduler(2, "", conf);
+    scheduler = new DecayRpcScheduler(2, "ipc.6", conf);
     assertEqualDecimalArrays(new double[]{0.5}, scheduler.getThresholds());

-    scheduler = new DecayRpcScheduler(3, "", conf);
+    scheduler = new DecayRpcScheduler(3, "ipc.7", conf);
     assertEqualDecimalArrays(new double[]{0.25, 0.5}, scheduler.getThresholds());

-    scheduler = new DecayRpcScheduler(4, "", conf);
+    scheduler = new DecayRpcScheduler(4, "ipc.8", conf);
     assertEqualDecimalArrays(new double[]{0.125, 0.25, 0.5}, scheduler.getThresholds());

     // Custom
     conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
+    conf.set("ipc.9." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
         "1, 10, 20, 50, 85");
-    scheduler = new DecayRpcScheduler(6, "ns", conf);
+    scheduler = new DecayRpcScheduler(6, "ipc.9", conf);
     assertEqualDecimalArrays(new double[]{0.01, 0.1, 0.2, 0.5, 0.85}, scheduler.getThresholds());
   }

@@ -130,8 +130,9 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testAccumulate() {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
-    scheduler = new DecayRpcScheduler(1, "ns", conf);
+    conf.set("ipc.10." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
+        "99999999"); // Never flush
+    scheduler = new DecayRpcScheduler(1, "ipc.10", conf);

     assertEquals(0, scheduler.getCallCostSnapshot().size()); // empty first

@@ -151,11 +152,11 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testDecay() throws Exception {
     Configuration conf = new Configuration();
-    conf.setLong("ns." // Never decay
+    conf.setLong("ipc.11." // Never decay
         + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999999);
-    conf.setDouble("ns."
+    conf.setDouble("ipc.11."
         + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY, 0.5);
-    scheduler = new DecayRpcScheduler(1, "ns", conf);
+    scheduler = new DecayRpcScheduler(1, "ipc.11", conf);

     assertEquals(0, scheduler.getTotalCallSnapshot());

@@ -202,7 +203,7 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testPriority() throws Exception {
     Configuration conf = new Configuration();
-    final String namespace = "ns";
+    final String namespace = "ipc.12";
     conf.set(namespace + "." + DecayRpcScheduler
         .IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
     conf.set(namespace + "." + DecayRpcScheduler
@@ -239,9 +240,11 @@ public class TestDecayRpcScheduler {
   @SuppressWarnings("deprecation")
   public void testPeriodic() throws InterruptedException {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10");
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
-    scheduler = new DecayRpcScheduler(1, "ns", conf);
+    conf.set(
+        "ipc.13." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10");
+    conf.set(
+        "ipc.13." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
+    scheduler = new DecayRpcScheduler(1, "ipc.13", conf);

     assertEquals(10, scheduler.getDecayPeriodMillis());
     assertEquals(0, scheduler.getTotalCallSnapshot());
@@ -269,7 +272,7 @@ public class TestDecayRpcScheduler {
       // MetricsSystemImpl to true
       DefaultMetricsSystem.initialize("NameNode");
       Configuration conf = new Configuration();
-      scheduler = new DecayRpcScheduler(1, "ns", conf);
+      scheduler = new DecayRpcScheduler(1, "ipc.14", conf);
       // check if there is npe in log
       assertFalse(bytes.toString().contains("NullPointerException"));
     } finally {
@@ -280,7 +283,7 @@
   @Test
   public void testUsingWeightedTimeCostProvider() {
-    scheduler = getSchedulerWithWeightedTimeCostProvider(3);
+    scheduler = getSchedulerWithWeightedTimeCostProvider(3, "ipc.15");

     // 3 details in increasing order of cost. Although medium has a longer
     // duration, the shared lock is weighted less than the exclusive lock

@@ -330,7 +333,7 @@ public class TestDecayRpcScheduler {
   @Test
   public void testUsingWeightedTimeCostProviderWithZeroCostCalls() {
-    scheduler = getSchedulerWithWeightedTimeCostProvider(2);
+    scheduler = getSchedulerWithWeightedTimeCostProvider(2, "ipc.16");

     ProcessingDetails emptyDetails =
         new ProcessingDetails(TimeUnit.MILLISECONDS);

@@ -347,7 +350,7 @@ public class TestDecayRpcScheduler {
   @Test
   public void testUsingWeightedTimeCostProviderNoRequests() {
-    scheduler = getSchedulerWithWeightedTimeCostProvider(2);
+    scheduler = getSchedulerWithWeightedTimeCostProvider(2, "ipc.18");

     assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
   }

@@ -357,13 +360,13 @@
    * normal decaying disabled.
    */
   private static DecayRpcScheduler getSchedulerWithWeightedTimeCostProvider(
-      int priorityLevels) {
+      int priorityLevels, String ns) {
     Configuration conf = new Configuration();
-    conf.setClass("ns." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
+    conf.setClass(ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
         WeightedTimeCostProvider.class, CostProvider.class);
-    conf.setLong("ns."
+    conf.setLong(ns + "."
         + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
-    return new DecayRpcScheduler(priorityLevels, "ns", conf);
+    return new DecayRpcScheduler(priorityLevels, ns, conf);
   }

   /**
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/metrics/TestDecayRpcSchedulerDetailedMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/metrics/TestDecayRpcSchedulerDetailedMetrics.java
new file mode 100644
index 00000000000..01d407ba260
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/metrics/TestDecayRpcSchedulerDetailedMetrics.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.DecayRpcScheduler;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.junit.Test;
+
+public class TestDecayRpcSchedulerDetailedMetrics {
+
+  @Test
+  public void metricsRegistered() {
+    Configuration conf = new Configuration();
+    DecayRpcScheduler scheduler = new DecayRpcScheduler(4, "ipc.8020", conf);
+    MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
+    DecayRpcSchedulerDetailedMetrics metrics =
+        scheduler.getDecayRpcSchedulerDetailedMetrics();
+
+    assertNotNull(metricsSystem.getSource(metrics.getName()));
+
+    scheduler.stop();
+
+    assertNull(metricsSystem.getSource(metrics.getName()));
+  }
+}
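If one wanted to assert on the actual per-priority rate values rather than just source registration, something along the following lines could work; it leans on the MetricsAsserts helpers used by other Hadoop metrics tests, so treat the exact lookup-by-source-name call as an assumption:

```java
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.DecayRpcScheduler;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.Test;

public class TestDetailedRatesSketch {
  @Test
  public void queueTimeRatesStartAtZero() {
    DecayRpcScheduler scheduler =
        new DecayRpcScheduler(4, "ipc.8021", new Configuration());
    // MutableRate emits <name>NumOps / <name>AvgTime, as exercised by the
    // TestMutableMetrics case added below.
    MetricsRecordBuilder rb =
        getMetrics("DecayRpcSchedulerDetailedMetrics.ipc.8021");
    assertCounter("DecayRPCSchedulerPriority.1.RpcQueueTimeNumOps", 0L, rb);
    scheduler.stop();
  }
}
```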
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java
index b5f62b18904..5d20abdd8bf 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java
@@ -149,6 +149,19 @@
     assertGauge("BarAvgTime", 0.0, rb);
   }

+  @Test public void testMutableRatesWithAggregationInitWithArray() {
+    MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
+
+    rates.init(new String[]{"Foo", "Bar"});
+    rates.snapshot(rb, false);
+
+    assertCounter("FooNumOps", 0L, rb);
+    assertGauge("FooAvgTime", 0.0, rb);
+    assertCounter("BarNumOps", 0L, rb);
+    assertGauge("BarAvgTime", 0.0, rb);
+  }
+
   @Test public void testMutableRatesWithAggregationSingleThread() {
     MutableRatesWithAggregation rates = new MutableRatesWithAggregation();