HBASE-20695 Implement table level RegionServer replication metrics
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
04db900772
commit
86653c708f
|
@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|||
import org.apache.hadoop.hbase.metrics.BaseSource;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This class is for maintaining the various replication statistics for a source and publishing them
|
||||
* through the metrics interfaces.
|
||||
|
@ -45,7 +47,7 @@ public class MetricsSource implements BaseSource {
|
|||
|
||||
private final MetricsReplicationSourceSource singleSourceSource;
|
||||
private final MetricsReplicationSourceSource globalSourceSource;
|
||||
|
||||
private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
|
||||
|
||||
/**
|
||||
* Constructor used to register the metrics
|
||||
|
@ -58,6 +60,7 @@ public class MetricsSource implements BaseSource {
|
|||
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
|
||||
.getSource(id);
|
||||
globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
|
||||
singleSourceSourceByTable = new HashMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -67,10 +70,12 @@ public class MetricsSource implements BaseSource {
|
|||
* @param globalSourceSource Class to monitor global-scoped metrics
|
||||
*/
|
||||
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
|
||||
MetricsReplicationSourceSource globalSourceSource) {
|
||||
MetricsReplicationSourceSource globalSourceSource,
|
||||
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
|
||||
this.id = id;
|
||||
this.singleSourceSource = singleSourceSource;
|
||||
this.globalSourceSource = globalSourceSource;
|
||||
this.singleSourceSourceByTable = singleSourceSourceByTable;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -85,6 +90,19 @@ public class MetricsSource implements BaseSource {
|
|||
this.lastTimestamps.put(walGroup, timestamp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the age of the last edit that was shipped group by table
|
||||
* @param timestamp write time of the edit
|
||||
* @param tableName String as group and tableName
|
||||
*/
|
||||
public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
|
||||
long age = EnvironmentEdgeManager.currentTime() - timestamp;
|
||||
this.getSingleSourceSourceByTable().computeIfAbsent(
|
||||
tableName, t -> CompatibilitySingletonFactory
|
||||
.getInstance(MetricsReplicationSourceFactory.class).getSource(t))
|
||||
.setLastShippedAge(age);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method to use the last given timestamp to refresh the age of the last edit. Used
|
||||
* when replication fails and need to keep that metric accurate.
|
||||
|
@ -349,4 +367,9 @@ public class MetricsSource implements BaseSource {
|
|||
public String getMetricsName() {
|
||||
return globalSourceSource.getMetricsName();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
|
||||
return singleSourceSourceByTable;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
@ -184,6 +185,10 @@ public class ReplicationSourceShipper extends Thread {
|
|||
// Clean up hfile references
|
||||
for (Entry entry : entries) {
|
||||
cleanUpHFileRefs(entry.getEdit());
|
||||
|
||||
TableName tableName = entry.getKey().getTableName();
|
||||
source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
|
||||
tableName.getNameAsString());
|
||||
}
|
||||
// Log and clean up WAL logs
|
||||
updateLogPosition(entryBatch);
|
||||
|
@ -199,8 +204,8 @@ public class ReplicationSourceShipper extends Thread {
|
|||
source.getSourceMetrics().setAgeOfLastShippedOp(
|
||||
entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Replicated " + entries.size() + " entries or " + entryBatch.getNbOperations()
|
||||
+ " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms");
|
||||
LOG.trace("Replicated {} entries or {} operations in {} ms",
|
||||
entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
|
||||
}
|
||||
break;
|
||||
} catch (Exception ex) {
|
||||
|
|
|
@ -23,7 +23,9 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -316,7 +318,9 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
|
||||
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
|
||||
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
|
||||
MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
|
||||
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
|
||||
MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource,
|
||||
singleSourceSourceByTable);
|
||||
String gaugeName = "gauge";
|
||||
String singleGaugeName = "source.id." + gaugeName;
|
||||
String globalGaugeName = "source." + gaugeName;
|
||||
|
@ -353,6 +357,22 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
verify(globalRms).setGauge(globalGaugeName, delta);
|
||||
verify(singleRms).updateHistogram(singleCounterName, count);
|
||||
verify(globalRms).updateHistogram(globalCounterName, count);
|
||||
|
||||
//check singleSourceSourceByTable metrics.
|
||||
// singleSourceSourceByTable map entry will be created only
|
||||
// after calling #setAgeOfLastShippedOpByTable
|
||||
boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
|
||||
.containsKey("RandomNewTable");
|
||||
Assert.assertEquals(false, containsRandomNewTable);
|
||||
source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
|
||||
containsRandomNewTable = source.getSingleSourceSourceByTable()
|
||||
.containsKey("RandomNewTable");
|
||||
Assert.assertEquals(true, containsRandomNewTable);
|
||||
MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
|
||||
.get("RandomNewTable");
|
||||
// cannot put more concreate value here to verify because the age is arbitrary.
|
||||
// as long as it's greater than 0, we see it as correct answer.
|
||||
Assert.assertTrue(msr.getLastShippedAge() > 0);
|
||||
}
|
||||
|
||||
private void doPut(byte[] row) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue