From 6e9b49cac7b118732659eef0cebb804be3e16238 Mon Sep 17 00:00:00 2001 From: Geoffrey Date: Tue, 23 Aug 2016 14:42:07 -0700 Subject: [PATCH] HBASE-16448 Custom metrics for custom replication endpoints Signed-off-by: Andrew Purtell --- .../MetricsReplicationSourceSource.java | 4 +- .../MetricsReplicationGlobalSourceSource.java | 56 +++++++++++++ .../MetricsReplicationSourceSourceImpl.java | 80 ++++++++++++++++--- .../regionserver/MetricsSource.java | 78 +++++++++++++++++- .../replication/TestReplicationEndpoint.java | 67 +++++++++++++++- 5 files changed, 269 insertions(+), 16 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 271f0acf237..c877608de05 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hbase.replication.regionserver; -public interface MetricsReplicationSourceSource { +import org.apache.hadoop.hbase.metrics.BaseSource; + +public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 476d2f76a5e..d595ca98e93 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -143,4 +143,60 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS public int getSizeOfLogQueue() { return (int)sizeOfLogQueueGauge.value(); } + + + @Override + public void init() { + rms.init(); + } + + @Override + public void setGauge(String gaugeName, long value) { + rms.setGauge(gaugeName, value); + } + + @Override + public void incGauge(String gaugeName, long delta) { + rms.incGauge(gaugeName, delta); + } + + @Override + public void decGauge(String gaugeName, long delta) { + rms.decGauge(gaugeName, delta); + } + + @Override + public void removeMetric(String key) { + rms.removeMetric(key); + } + + @Override + public void incCounters(String counterName, long delta) { + rms.incCounters(counterName, delta); + } + + @Override + public void updateHistogram(String name, long value) { + rms.updateHistogram(name, value); + } + + @Override + public String getMetricsContext() { + return rms.getMetricsContext(); + } + + @Override + public String getMetricsDescription() { + return rms.getMetricsDescription(); + } + + @Override + public String getMetricsJmxContext() { + return rms.getMetricsJmxContext(); + } + + @Override + public String getMetricsName() { + return rms.getMetricsName(); + } } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 835e81c207d..5a6a1035151 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -30,6 +30,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String logEditsFilteredKey; private final String shippedBatchesKey; private final String shippedOpsKey; + private String keyPrefix; + @Deprecated private final String shippedKBsKey; private final String shippedBytesKey; @@ -52,38 +54,39 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { this.rms = rms; this.id = id; + this.keyPrefix = "source." + this.id + "."; - ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp"; + ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp"; ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L); - sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue"; + sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue"; sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L); - shippedBatchesKey = "source." + this.id + ".shippedBatches"; + shippedBatchesKey = this.keyPrefix + "shippedBatches"; shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L); - shippedOpsKey = "source." + this.id + ".shippedOps"; + shippedOpsKey = this.keyPrefix + "shippedOps"; shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L); - shippedKBsKey = "source." + this.id + ".shippedKBs"; + shippedKBsKey = this.keyPrefix + "shippedKBs"; shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L); - shippedBytesKey = "source." + this.id + ".shippedBytes"; + shippedBytesKey = this.keyPrefix + "shippedBytes"; shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L); - logReadInBytesKey = "source." + this.id + ".logReadInBytes"; + logReadInBytesKey = this.keyPrefix + "logReadInBytes"; logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L); - logReadInEditsKey = "source." + id + ".logEditsRead"; + logReadInEditsKey = this.keyPrefix + "logEditsRead"; logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L); - logEditsFilteredKey = "source." + id + ".logEditsFiltered"; + logEditsFilteredKey = this.keyPrefix + "logEditsFiltered"; logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L); - shippedHFilesKey = "source." + this.id + ".shippedHFiles"; + shippedHFilesKey = this.keyPrefix + "shippedHFiles"; shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L); - sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue"; + sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue"; sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L); } @@ -168,4 +171,59 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou public int getSizeOfLogQueue() { return (int)sizeOfLogQueueGauge.value(); } + + @Override + public void init() { + rms.init(); + } + + @Override + public void setGauge(String gaugeName, long value) { + rms.setGauge(this.keyPrefix + gaugeName, value); + } + + @Override + public void incGauge(String gaugeName, long delta) { + rms.incGauge(this.keyPrefix + gaugeName, delta); + } + + @Override + public void decGauge(String gaugeName, long delta) { + rms.decGauge(this.keyPrefix + gaugeName, delta); + } + + @Override + public void removeMetric(String key) { + rms.removeMetric(this.keyPrefix + key); + } + + @Override + public void incCounters(String counterName, long delta) { + rms.incCounters(this.keyPrefix + counterName, delta); + } + + @Override + public void updateHistogram(String name, long value) { + rms.updateHistogram(this.keyPrefix + name, value); + } + + @Override + public String getMetricsContext() { + return rms.getMetricsContext(); + } + + @Override + public String getMetricsDescription() { + return rms.getMetricsDescription(); + } + + @Override + public String getMetricsJmxContext() { + return rms.getMetricsJmxContext(); + } + + @Override + public String getMetricsName() { + return rms.getMetricsName(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index b07f1d1b4ad..7dfeff64f45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.metrics.BaseSource; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** @@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; * through the metrics interfaces. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) -public class MetricsSource { +public class MetricsSource implements BaseSource { private static final Log LOG = LogFactory.getLog(MetricsSource.class); @@ -60,6 +61,19 @@ public class MetricsSource { globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); } + /** + * Constructor for injecting custom (or test) MetricsReplicationSourceSources + * @param id Name of the source this class is monitoring + * @param singleSourceSource Class to monitor id-scoped metrics + * @param globalSourceSource Class to monitor global-scoped metrics + */ + public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, + MetricsReplicationSourceSource globalSourceSource) { + this.id = id; + this.singleSourceSource = singleSourceSource; + this.globalSourceSource = globalSourceSource; + } + /** * Set the age of the last edit that was shipped * @param timestamp write time of the edit @@ -227,4 +241,66 @@ public class MetricsSource { lastHFileRefsQueueSize = 0; } } + + @Override + public void init() { + singleSourceSource.init(); + globalSourceSource.init(); + } + + @Override + public void setGauge(String gaugeName, long value) { + singleSourceSource.setGauge(gaugeName, value); + globalSourceSource.setGauge(gaugeName, value); + } + + @Override + public void incGauge(String gaugeName, long delta) { + singleSourceSource.incGauge(gaugeName, delta); + globalSourceSource.incGauge(gaugeName, delta); + } + + @Override + public void decGauge(String gaugeName, long delta) { + singleSourceSource.decGauge(gaugeName, delta); + globalSourceSource.decGauge(gaugeName, delta); + } + + @Override + public void removeMetric(String key) { + singleSourceSource.removeMetric(key); + globalSourceSource.removeMetric(key); + } + + @Override + public void incCounters(String counterName, long delta) { + singleSourceSource.incCounters(counterName, delta); + globalSourceSource.incCounters(counterName, delta); + } + + @Override + public void updateHistogram(String name, long value) { + singleSourceSource.updateHistogram(name, value); + globalSourceSource.updateHistogram(name, value); + } + + @Override + public String getMetricsContext() { + return globalSourceSource.getMetricsContext(); + } + + @Override + public String getMetricsDescription() { + return globalSourceSource.getMetricsDescription(); + } + + @Override + public String getMetricsJmxContext() { + return globalSourceSource.getMetricsJmxContext(); + } + + @Override + public String getMetricsName() { + return globalSourceSource.getMetricsName(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index d5705499aeb..5a54314837c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -38,11 +38,12 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; +import org.apache.hadoop.hbase.replication.regionserver.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -50,6 +51,10 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * Tests ReplicationSource and ReplicationEndpoint interactions */ @@ -114,8 +119,8 @@ public class TestReplicationEndpoint extends TestReplicationBase { public void testCustomReplicationEndpoint() throws Exception { // test installing a custom replication endpoint other than the default one. admin.addPeer("testCustomReplicationEndpoint", - new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) - .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); // check whether the class has been constructed and started Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { @@ -254,6 +259,62 @@ public class TestReplicationEndpoint extends TestReplicationBase { } + @Test + public void testMetricsSourceBaseSourcePassthrough(){ + /* + The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl + and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. + Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which + allows for custom JMX metrics. + This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through + the two layers of wrapping to the actual BaseSource. + */ + String id = "id"; + DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class); + MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class); + when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry); + MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class); + when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry); + + MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id); + MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms); + MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource); + String gaugeName = "gauge"; + String singleGaugeName = "source.id." + gaugeName; + long delta = 1; + String counterName = "counter"; + String singleCounterName = "source.id." + counterName; + long count = 2; + source.decGauge(gaugeName, delta); + source.getMetricsContext(); + source.getMetricsDescription(); + source.getMetricsJmxContext(); + source.getMetricsName(); + source.incCounters(counterName, count); + source.incGauge(gaugeName, delta); + source.init(); + source.removeMetric(gaugeName); + source.setGauge(gaugeName, delta); + source.updateHistogram(counterName, count); + + verify(singleRms).decGauge(singleGaugeName, delta); + verify(globalRms).decGauge(gaugeName, delta); + verify(globalRms).getMetricsContext(); + verify(globalRms).getMetricsJmxContext(); + verify(globalRms).getMetricsName(); + verify(singleRms).incCounters(singleCounterName, count); + verify(globalRms).incCounters(counterName, count); + verify(singleRms).incGauge(singleGaugeName, delta); + verify(globalRms).incGauge(gaugeName, delta); + verify(globalRms).init(); + verify(singleRms).removeMetric(singleGaugeName); + verify(globalRms).removeMetric(gaugeName); + verify(singleRms).setGauge(singleGaugeName, delta); + verify(globalRms).setGauge(gaugeName, delta); + verify(singleRms).updateHistogram(singleCounterName, count); + verify(globalRms).updateHistogram(counterName, count); + } + private void doPut(byte[] row) throws IOException { try (Connection connection = ConnectionFactory.createConnection(conf1)) { doPut(connection, row);