HADOOP-15481. Emit FairCallQueue stats as metrics. Contributed by Christopher Gregorian.

This commit is contained in:
Chen Liang 2019-01-18 11:48:26 -08:00
parent dea75af9a7
commit 0cfc2d8475
3 changed files with 76 additions and 2 deletions

View File

@ -35,6 +35,11 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException; import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.metrics2.util.MBeans;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -43,7 +48,7 @@ import org.slf4j.LoggerFactory;
* A queue with multiple levels for each priority. * A queue with multiple levels for each priority.
*/ */
public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E> public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
implements BlockingQueue<E> { implements BlockingQueue<E> {
@Deprecated @Deprecated
public static final int IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4; public static final int IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4;
@Deprecated @Deprecated
@ -335,7 +340,8 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
* MetricsProxy is a singleton because we may init multiple * MetricsProxy is a singleton because we may init multiple
* FairCallQueues, but the metrics system cannot unregister beans cleanly. * FairCallQueues, but the metrics system cannot unregister beans cleanly.
*/ */
private static final class MetricsProxy implements FairCallQueueMXBean { private static final class MetricsProxy implements FairCallQueueMXBean,
MetricsSource {
// One singleton per namespace // One singleton per namespace
private static final HashMap<String, MetricsProxy> INSTANCES = private static final HashMap<String, MetricsProxy> INSTANCES =
new HashMap<String, MetricsProxy>(); new HashMap<String, MetricsProxy>();
@ -346,8 +352,13 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
// Keep track of how many objects we registered // Keep track of how many objects we registered
private int revisionNumber = 0; private int revisionNumber = 0;
private String namespace;
private MetricsProxy(String namespace) { private MetricsProxy(String namespace) {
this.namespace = namespace;
MBeans.register(namespace, "FairCallQueue", this); MBeans.register(namespace, "FairCallQueue", this);
final String name = namespace + ".FairCallQueue";
DefaultMetricsSystem.instance().register(name, name, this);
} }
public static synchronized MetricsProxy getInstance(String namespace) { public static synchronized MetricsProxy getInstance(String namespace) {
@ -389,6 +400,23 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
@Override public int getRevision() { @Override public int getRevision() {
return revisionNumber; return revisionNumber;
} }
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder rb = collector.addRecord("FairCallQueue")
.setContext("rpc")
.tag(Interns.info("namespace", "Namespace"), namespace);
final int[] currentQueueSizes = getQueueSizes();
final long[] currentOverflowedCalls = getOverflowedCalls();
for (int i = 0; i < currentQueueSizes.length; i++) {
rb.addGauge(Interns.info("FairCallQueueSize_p" + i, "FCQ Queue Size"),
currentQueueSizes[i]);
rb.addCounter(Interns.info("FairCallQueueOverflowedCalls_p" + i,
"FCQ Overflowed Calls"), currentOverflowedCalls[i]);
}
}
} }
// FairCallQueueMXBean // FairCallQueueMXBean

View File

@ -104,6 +104,16 @@ RetryCache metrics is useful to monitor NameNode fail-over. Each metrics record
| `CacheCleared` | Total number of RetryCache cleared | | `CacheCleared` | Total number of RetryCache cleared |
| `CacheUpdated` | Total number of RetryCache updated | | `CacheUpdated` | Total number of RetryCache updated |
FairCallQueue
-------------
FairCallQueue metrics will only exist if FairCallQueue is enabled. Each metric exists for each level of priority.
| Name | Description |
|:---- |:---- |
| `FairCallQueueSize_p`*Priority* | Current number of calls in priority queue |
| `FairCallQueueOverflowedCalls_p`*Priority* | Total number of overflowed calls in priority queue |
rpcdetailed context rpcdetailed context
=================== ===================

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyObject; import static org.mockito.Mockito.anyObject;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
@ -637,4 +640,37 @@ public class TestFairCallQueue {
assertEquals(0, queueSizes[0]); assertEquals(0, queueSizes[0]);
assertEquals(0, queueSizes[1]); assertEquals(0, queueSizes[1]);
} }
@Test
public void testFairCallQueueMetrics() throws Exception {
final String fcqMetrics = "ns.FairCallQueue";
Schedulable p0 = mockCall("a", 0);
Schedulable p1 = mockCall("b", 1);
assertGauge("FairCallQueueSize_p0", 0, getMetrics(fcqMetrics));
assertGauge("FairCallQueueSize_p1", 0, getMetrics(fcqMetrics));
assertCounter("FairCallQueueOverflowedCalls_p0", 0L,
getMetrics(fcqMetrics));
assertCounter("FairCallQueueOverflowedCalls_p1", 0L,
getMetrics(fcqMetrics));
for (int i = 0; i < 5; i++) {
fcq.add(p0);
fcq.add(p1);
}
try {
fcq.add(p1);
fail("didn't overflow");
} catch (IllegalStateException ise) {
// Expected exception
}
assertGauge("FairCallQueueSize_p0", 5, getMetrics(fcqMetrics));
assertGauge("FairCallQueueSize_p1", 5, getMetrics(fcqMetrics));
assertCounter("FairCallQueueOverflowedCalls_p0", 0L,
getMetrics(fcqMetrics));
assertCounter("FairCallQueueOverflowedCalls_p1", 1L,
getMetrics(fcqMetrics));
}
} }