HBASE-16448 Custom metrics for custom replication endpoints

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Geoffrey 2016-08-23 14:42:07 -07:00 committed by Andrew Purtell
parent 1e15fa57df
commit 6e9b49cac7
5 changed files with 269 additions and 16 deletions

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.replication.regionserver;
public interface MetricsReplicationSourceSource {
import org.apache.hadoop.hbase.metrics.BaseSource;
public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";

View File

@ -143,4 +143,60 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
@Override
public void init() {
rms.init();
}
@Override
public void setGauge(String gaugeName, long value) {
rms.setGauge(gaugeName, value);
}
@Override
public void incGauge(String gaugeName, long delta) {
rms.incGauge(gaugeName, delta);
}
@Override
public void decGauge(String gaugeName, long delta) {
rms.decGauge(gaugeName, delta);
}
@Override
public void removeMetric(String key) {
rms.removeMetric(key);
}
@Override
public void incCounters(String counterName, long delta) {
rms.incCounters(counterName, delta);
}
@Override
public void updateHistogram(String name, long value) {
rms.updateHistogram(name, value);
}
@Override
public String getMetricsContext() {
return rms.getMetricsContext();
}
@Override
public String getMetricsDescription() {
return rms.getMetricsDescription();
}
@Override
public String getMetricsJmxContext() {
return rms.getMetricsJmxContext();
}
@Override
public String getMetricsName() {
return rms.getMetricsName();
}
}

View File

@ -30,6 +30,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
private String keyPrefix;
@Deprecated
private final String shippedKBsKey;
private final String shippedBytesKey;
@ -52,38 +54,39 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
this.id = id;
this.keyPrefix = "source." + this.id + ".";
ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L);
sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L);
shippedBatchesKey = "source." + this.id + ".shippedBatches";
shippedBatchesKey = this.keyPrefix + "shippedBatches";
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
shippedOpsKey = "source." + this.id + ".shippedOps";
shippedOpsKey = this.keyPrefix + "shippedOps";
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
shippedKBsKey = "source." + this.id + ".shippedKBs";
shippedKBsKey = this.keyPrefix + "shippedKBs";
shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
shippedBytesKey = "source." + this.id + ".shippedBytes";
shippedBytesKey = this.keyPrefix + "shippedBytes";
shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
logReadInBytesKey = "source." + this.id + ".logReadInBytes";
logReadInBytesKey = this.keyPrefix + "logReadInBytes";
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
logReadInEditsKey = "source." + id + ".logEditsRead";
logReadInEditsKey = this.keyPrefix + "logEditsRead";
logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
logEditsFilteredKey = "source." + id + ".logEditsFiltered";
logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
shippedHFilesKey = "source." + this.id + ".shippedHFiles";
shippedHFilesKey = this.keyPrefix + "shippedHFiles";
shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue";
sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L);
}
@ -168,4 +171,59 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
@Override
public void init() {
rms.init();
}
@Override
public void setGauge(String gaugeName, long value) {
rms.setGauge(this.keyPrefix + gaugeName, value);
}
@Override
public void incGauge(String gaugeName, long delta) {
rms.incGauge(this.keyPrefix + gaugeName, delta);
}
@Override
public void decGauge(String gaugeName, long delta) {
rms.decGauge(this.keyPrefix + gaugeName, delta);
}
@Override
public void removeMetric(String key) {
rms.removeMetric(this.keyPrefix + key);
}
@Override
public void incCounters(String counterName, long delta) {
rms.incCounters(this.keyPrefix + counterName, delta);
}
@Override
public void updateHistogram(String name, long value) {
rms.updateHistogram(this.keyPrefix + name, value);
}
@Override
public String getMetricsContext() {
return rms.getMetricsContext();
}
@Override
public String getMetricsDescription() {
return rms.getMetricsDescription();
}
@Override
public String getMetricsJmxContext() {
return rms.getMetricsJmxContext();
}
@Override
public String getMetricsName() {
return rms.getMetricsName();
}
}

View File

@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* through the metrics interfaces.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class MetricsSource {
public class MetricsSource implements BaseSource {
private static final Log LOG = LogFactory.getLog(MetricsSource.class);
@ -60,6 +61,19 @@ public class MetricsSource {
globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
}
/**
* Constructor for injecting custom (or test) MetricsReplicationSourceSources
* @param id Name of the source this class is monitoring
* @param singleSourceSource Class to monitor id-scoped metrics
* @param globalSourceSource Class to monitor global-scoped metrics
*/
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
MetricsReplicationSourceSource globalSourceSource) {
this.id = id;
this.singleSourceSource = singleSourceSource;
this.globalSourceSource = globalSourceSource;
}
/**
* Set the age of the last edit that was shipped
* @param timestamp write time of the edit
@ -227,4 +241,66 @@ public class MetricsSource {
lastHFileRefsQueueSize = 0;
}
}
@Override
public void init() {
singleSourceSource.init();
globalSourceSource.init();
}
@Override
public void setGauge(String gaugeName, long value) {
singleSourceSource.setGauge(gaugeName, value);
globalSourceSource.setGauge(gaugeName, value);
}
@Override
public void incGauge(String gaugeName, long delta) {
singleSourceSource.incGauge(gaugeName, delta);
globalSourceSource.incGauge(gaugeName, delta);
}
@Override
public void decGauge(String gaugeName, long delta) {
singleSourceSource.decGauge(gaugeName, delta);
globalSourceSource.decGauge(gaugeName, delta);
}
@Override
public void removeMetric(String key) {
singleSourceSource.removeMetric(key);
globalSourceSource.removeMetric(key);
}
@Override
public void incCounters(String counterName, long delta) {
singleSourceSource.incCounters(counterName, delta);
globalSourceSource.incCounters(counterName, delta);
}
@Override
public void updateHistogram(String name, long value) {
singleSourceSource.updateHistogram(name, value);
globalSourceSource.updateHistogram(name, value);
}
@Override
public String getMetricsContext() {
return globalSourceSource.getMetricsContext();
}
@Override
public String getMetricsDescription() {
return globalSourceSource.getMetricsDescription();
}
@Override
public String getMetricsJmxContext() {
return globalSourceSource.getMetricsJmxContext();
}
@Override
public String getMetricsName() {
return globalSourceSource.getMetricsName();
}
}

View File

@ -38,11 +38,12 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@ -50,6 +51,10 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests ReplicationSource and ReplicationEndpoint interactions
*/
@ -114,8 +119,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
// check whether the class has been constructed and started
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@ -254,6 +259,62 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
@Test
public void testMetricsSourceBaseSourcePassthrough(){
/*
The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
allows for custom JMX metrics.
This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
the two layers of wrapping to the actual BaseSource.
*/
String id = "id";
DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
String gaugeName = "gauge";
String singleGaugeName = "source.id." + gaugeName;
long delta = 1;
String counterName = "counter";
String singleCounterName = "source.id." + counterName;
long count = 2;
source.decGauge(gaugeName, delta);
source.getMetricsContext();
source.getMetricsDescription();
source.getMetricsJmxContext();
source.getMetricsName();
source.incCounters(counterName, count);
source.incGauge(gaugeName, delta);
source.init();
source.removeMetric(gaugeName);
source.setGauge(gaugeName, delta);
source.updateHistogram(counterName, count);
verify(singleRms).decGauge(singleGaugeName, delta);
verify(globalRms).decGauge(gaugeName, delta);
verify(globalRms).getMetricsContext();
verify(globalRms).getMetricsJmxContext();
verify(globalRms).getMetricsName();
verify(singleRms).incCounters(singleCounterName, count);
verify(globalRms).incCounters(counterName, count);
verify(singleRms).incGauge(singleGaugeName, delta);
verify(globalRms).incGauge(gaugeName, delta);
verify(globalRms).init();
verify(singleRms).removeMetric(singleGaugeName);
verify(globalRms).removeMetric(gaugeName);
verify(singleRms).setGauge(singleGaugeName, delta);
verify(globalRms).setGauge(gaugeName, delta);
verify(singleRms).updateHistogram(singleCounterName, count);
verify(globalRms).updateHistogram(counterName, count);
}
private void doPut(byte[] row) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
doPut(connection, row);