diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 2d99018046d..d122556b928 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -46,7 +46,7 @@ public class MetricsSource implements BaseSource { private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource globalSourceSource; - + private Map singleSourceSourceByTable; /** * Constructor used to register the metrics @@ -58,7 +58,9 @@ public class MetricsSource implements BaseSource { singleSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) .getSource(id); - globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); + globalSourceSource = CompatibilitySingletonFactory + .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); + singleSourceSourceByTable = new HashMap<>(); } /** @@ -68,10 +70,12 @@ public class MetricsSource implements BaseSource { * @param globalSourceSource Class to monitor global-scoped metrics */ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, - MetricsReplicationSourceSource globalSourceSource) { + MetricsReplicationSourceSource globalSourceSource, + Map singleSourceSourceByTable) { this.id = id; this.singleSourceSource = singleSourceSource; this.globalSourceSource = globalSourceSource; + this.singleSourceSourceByTable = singleSourceSourceByTable; } /** @@ -86,6 +90,20 @@ public class MetricsSource implements BaseSource { this.lastTimeStamps.put(walGroup, timestamp); } + /** + * Set the age of the last edit that was shipped group by table + * @param timestamp write time of the edit + * @param tableName String as group and tableName + */ + public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) { + long age = EnvironmentEdgeManager.currentTime() - timestamp; + if (!this.getSingleSourceSourceByTable().containsKey(tableName)) { + this.getSingleSourceSourceByTable().put(tableName, + CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) + .getSource(tableName)); + } + this.singleSourceSourceByTable.get(tableName).setLastShippedAge(age); + } /** * Convenience method to use the last given timestamp to refresh the age of the last edit. Used * when replication fails and need to keep that metric accurate. @@ -340,6 +358,10 @@ public class MetricsSource implements BaseSource { return globalSourceSource.getMetricsName(); } + public Map getSingleSourceSourceByTable() { + return singleSourceSourceByTable; + } + @Override public MetricRegistryInfo getMetricRegistryInfo() { return new MetricRegistryInfo(getMetricsName(), getMetricsDescription(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d7861a53a07..1f91789034c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -700,6 +700,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf int size = entries.size(); for (int i = 0; i < size; i++) { cleanUpHFileRefs(entries.get(i).getEdit()); + + TableName tableName = entries.get(i).getKey().getTablename(); + source.getSourceMetrics().setAgeOfLastShippedOpByTable( + entries.get(i).getKey().getWriteTime(), + tableName.getNameAsString()); } //Log and clean up WAL logs updateLogPosition(lastReadPosition); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 277d876bd60..3b984abf45f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -20,8 +20,9 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -306,7 +307,9 @@ public class TestReplicationEndpoint extends TestReplicationBase { MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id); MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms); - MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource); + Map singleSourceSourceByTable = new HashMap<>(); + MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource, + singleSourceSourceByTable); String gaugeName = "gauge"; String singleGaugeName = "source.id." + gaugeName; long delta = 1; @@ -341,6 +344,23 @@ public class TestReplicationEndpoint extends TestReplicationBase { verify(globalRms).setGauge(gaugeName, delta); verify(singleRms).updateHistogram(singleCounterName, count); verify(globalRms).updateHistogram(counterName, count); + + + // check singleSourceSourceByTable metrics. + // singleSourceSourceByTable map entry will be created only + // after calling #setAgeOfLastShippedOpByTable + boolean containsRandomNewTable = source.getSingleSourceSourceByTable() + .containsKey("RandomNewTable"); + Assert.assertEquals(false, containsRandomNewTable); + source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable"); + containsRandomNewTable = source.getSingleSourceSourceByTable() + .containsKey("RandomNewTable"); + Assert.assertEquals(true, containsRandomNewTable); + MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable() + .get("RandomNewTable"); + // cannot put more concreate value here to verify because the age is arbitrary. + // as long as it's greater than 0, we see it as correct answer. + Assert.assertTrue(msr.getLastShippedAge() > 0); } private void doPut(byte[] row) throws IOException {