HBASE-16448 Custom metrics for custom replication endpoints
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
91fee265cf
commit
cb02be38ab
|
@ -18,7 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
public interface MetricsReplicationSourceSource {
|
||||
import org.apache.hadoop.hbase.metrics.BaseSource;
|
||||
|
||||
public interface MetricsReplicationSourceSource extends BaseSource {
|
||||
|
||||
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
|
||||
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
|
||||
|
|
|
@ -143,4 +143,60 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
|
|||
public int getSizeOfLogQueue() {
|
||||
return (int)sizeOfLogQueueGauge.value();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
rms.init();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setGauge(String gaugeName, long value) {
|
||||
rms.setGauge(gaugeName, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incGauge(String gaugeName, long delta) {
|
||||
rms.incGauge(gaugeName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decGauge(String gaugeName, long delta) {
|
||||
rms.decGauge(gaugeName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeMetric(String key) {
|
||||
rms.removeMetric(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incCounters(String counterName, long delta) {
|
||||
rms.incCounters(counterName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateHistogram(String name, long value) {
|
||||
rms.updateHistogram(name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsContext() {
|
||||
return rms.getMetricsContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsDescription() {
|
||||
return rms.getMetricsDescription();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsJmxContext() {
|
||||
return rms.getMetricsJmxContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsName() {
|
||||
return rms.getMetricsName();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
private final String logEditsFilteredKey;
|
||||
private final String shippedBatchesKey;
|
||||
private final String shippedOpsKey;
|
||||
private String keyPrefix;
|
||||
|
||||
@Deprecated
|
||||
private final String shippedKBsKey;
|
||||
private final String shippedBytesKey;
|
||||
|
@ -52,38 +54,39 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
|
||||
this.rms = rms;
|
||||
this.id = id;
|
||||
this.keyPrefix = "source." + this.id + ".";
|
||||
|
||||
ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
|
||||
ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
|
||||
ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L);
|
||||
|
||||
sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
|
||||
sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
|
||||
sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L);
|
||||
|
||||
shippedBatchesKey = "source." + this.id + ".shippedBatches";
|
||||
shippedBatchesKey = this.keyPrefix + "shippedBatches";
|
||||
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
|
||||
|
||||
shippedOpsKey = "source." + this.id + ".shippedOps";
|
||||
shippedOpsKey = this.keyPrefix + "shippedOps";
|
||||
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
|
||||
|
||||
shippedKBsKey = "source." + this.id + ".shippedKBs";
|
||||
shippedKBsKey = this.keyPrefix + "shippedKBs";
|
||||
shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
|
||||
|
||||
shippedBytesKey = "source." + this.id + ".shippedBytes";
|
||||
shippedBytesKey = this.keyPrefix + "shippedBytes";
|
||||
shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
|
||||
|
||||
logReadInBytesKey = "source." + this.id + ".logReadInBytes";
|
||||
logReadInBytesKey = this.keyPrefix + "logReadInBytes";
|
||||
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
|
||||
|
||||
logReadInEditsKey = "source." + id + ".logEditsRead";
|
||||
logReadInEditsKey = this.keyPrefix + "logEditsRead";
|
||||
logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
|
||||
|
||||
logEditsFilteredKey = "source." + id + ".logEditsFiltered";
|
||||
logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
|
||||
logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
|
||||
|
||||
shippedHFilesKey = "source." + this.id + ".shippedHFiles";
|
||||
shippedHFilesKey = this.keyPrefix + "shippedHFiles";
|
||||
shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
|
||||
|
||||
sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
|
||||
sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue";
|
||||
sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L);
|
||||
}
|
||||
|
||||
|
@ -168,4 +171,59 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
public int getSizeOfLogQueue() {
|
||||
return (int)sizeOfLogQueueGauge.value();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
rms.init();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setGauge(String gaugeName, long value) {
|
||||
rms.setGauge(this.keyPrefix + gaugeName, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incGauge(String gaugeName, long delta) {
|
||||
rms.incGauge(this.keyPrefix + gaugeName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decGauge(String gaugeName, long delta) {
|
||||
rms.decGauge(this.keyPrefix + gaugeName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeMetric(String key) {
|
||||
rms.removeMetric(this.keyPrefix + key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incCounters(String counterName, long delta) {
|
||||
rms.incCounters(this.keyPrefix + counterName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateHistogram(String name, long value) {
|
||||
rms.updateHistogram(this.keyPrefix + name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsContext() {
|
||||
return rms.getMetricsContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsDescription() {
|
||||
return rms.getMetricsDescription();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsJmxContext() {
|
||||
return rms.getMetricsJmxContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsName() {
|
||||
return rms.getMetricsName();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.metrics.BaseSource;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
/**
|
||||
|
@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
* through the metrics interfaces.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||
public class MetricsSource {
|
||||
public class MetricsSource implements BaseSource {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MetricsSource.class);
|
||||
|
||||
|
@ -60,6 +61,19 @@ public class MetricsSource {
|
|||
globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for injecting custom (or test) MetricsReplicationSourceSources
|
||||
* @param id Name of the source this class is monitoring
|
||||
* @param singleSourceSource Class to monitor id-scoped metrics
|
||||
* @param globalSourceSource Class to monitor global-scoped metrics
|
||||
*/
|
||||
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
|
||||
MetricsReplicationSourceSource globalSourceSource) {
|
||||
this.id = id;
|
||||
this.singleSourceSource = singleSourceSource;
|
||||
this.globalSourceSource = globalSourceSource;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the age of the last edit that was shipped
|
||||
* @param timestamp write time of the edit
|
||||
|
@ -227,4 +241,66 @@ public class MetricsSource {
|
|||
lastHFileRefsQueueSize = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
singleSourceSource.init();
|
||||
globalSourceSource.init();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setGauge(String gaugeName, long value) {
|
||||
singleSourceSource.setGauge(gaugeName, value);
|
||||
globalSourceSource.setGauge(gaugeName, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incGauge(String gaugeName, long delta) {
|
||||
singleSourceSource.incGauge(gaugeName, delta);
|
||||
globalSourceSource.incGauge(gaugeName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decGauge(String gaugeName, long delta) {
|
||||
singleSourceSource.decGauge(gaugeName, delta);
|
||||
globalSourceSource.decGauge(gaugeName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeMetric(String key) {
|
||||
singleSourceSource.removeMetric(key);
|
||||
globalSourceSource.removeMetric(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incCounters(String counterName, long delta) {
|
||||
singleSourceSource.incCounters(counterName, delta);
|
||||
globalSourceSource.incCounters(counterName, delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateHistogram(String name, long value) {
|
||||
singleSourceSource.updateHistogram(name, value);
|
||||
globalSourceSource.updateHistogram(name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsContext() {
|
||||
return globalSourceSource.getMetricsContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsDescription() {
|
||||
return globalSourceSource.getMetricsDescription();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsJmxContext() {
|
||||
return globalSourceSource.getMetricsJmxContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMetricsName() {
|
||||
return globalSourceSource.getMetricsName();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.*;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
|||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -51,6 +52,10 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Tests ReplicationSource and ReplicationEndpoint interactions
|
||||
*/
|
||||
|
@ -115,8 +120,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
public void testCustomReplicationEndpoint() throws Exception {
|
||||
// test installing a custom replication endpoint other than the default one.
|
||||
admin.addPeer("testCustomReplicationEndpoint",
|
||||
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
|
||||
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
|
||||
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
|
||||
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
|
||||
|
||||
// check whether the class has been constructed and started
|
||||
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
|
||||
|
@ -255,6 +260,62 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMetricsSourceBaseSourcePassthrough(){
|
||||
/*
|
||||
The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
|
||||
and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
|
||||
Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
|
||||
allows for custom JMX metrics.
|
||||
This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
|
||||
the two layers of wrapping to the actual BaseSource.
|
||||
*/
|
||||
String id = "id";
|
||||
DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
|
||||
MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
|
||||
when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
|
||||
MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
|
||||
when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
|
||||
|
||||
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
|
||||
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
|
||||
MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
|
||||
String gaugeName = "gauge";
|
||||
String singleGaugeName = "source.id." + gaugeName;
|
||||
long delta = 1;
|
||||
String counterName = "counter";
|
||||
String singleCounterName = "source.id." + counterName;
|
||||
long count = 2;
|
||||
source.decGauge(gaugeName, delta);
|
||||
source.getMetricsContext();
|
||||
source.getMetricsDescription();
|
||||
source.getMetricsJmxContext();
|
||||
source.getMetricsName();
|
||||
source.incCounters(counterName, count);
|
||||
source.incGauge(gaugeName, delta);
|
||||
source.init();
|
||||
source.removeMetric(gaugeName);
|
||||
source.setGauge(gaugeName, delta);
|
||||
source.updateHistogram(counterName, count);
|
||||
|
||||
verify(singleRms).decGauge(singleGaugeName, delta);
|
||||
verify(globalRms).decGauge(gaugeName, delta);
|
||||
verify(globalRms).getMetricsContext();
|
||||
verify(globalRms).getMetricsJmxContext();
|
||||
verify(globalRms).getMetricsName();
|
||||
verify(singleRms).incCounters(singleCounterName, count);
|
||||
verify(globalRms).incCounters(counterName, count);
|
||||
verify(singleRms).incGauge(singleGaugeName, delta);
|
||||
verify(globalRms).incGauge(gaugeName, delta);
|
||||
verify(globalRms).init();
|
||||
verify(singleRms).removeMetric(singleGaugeName);
|
||||
verify(globalRms).removeMetric(gaugeName);
|
||||
verify(singleRms).setGauge(singleGaugeName, delta);
|
||||
verify(globalRms).setGauge(gaugeName, delta);
|
||||
verify(singleRms).updateHistogram(singleCounterName, count);
|
||||
verify(globalRms).updateHistogram(counterName, count);
|
||||
}
|
||||
|
||||
private void doPut(byte[] row) throws IOException {
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
|
||||
doPut(connection, row);
|
||||
|
|
Loading…
Reference in New Issue