HBASE-20858 Port HBASE-20695 (Implement table level RegionServer replication metrics) to branch-1
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
a52c6bf2f3
commit
99189f284b
|
@ -46,7 +46,7 @@ public class MetricsSource implements BaseSource {
|
|||
|
||||
private final MetricsReplicationSourceSource singleSourceSource;
|
||||
private final MetricsReplicationSourceSource globalSourceSource;
|
||||
|
||||
private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
|
||||
|
||||
/**
|
||||
* Constructor used to register the metrics
|
||||
|
@ -58,7 +58,9 @@ public class MetricsSource implements BaseSource {
|
|||
singleSourceSource =
|
||||
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
|
||||
.getSource(id);
|
||||
globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
|
||||
globalSourceSource = CompatibilitySingletonFactory
|
||||
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
|
||||
singleSourceSourceByTable = new HashMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -68,10 +70,12 @@ public class MetricsSource implements BaseSource {
|
|||
* @param globalSourceSource Class to monitor global-scoped metrics
|
||||
*/
|
||||
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
|
||||
MetricsReplicationSourceSource globalSourceSource) {
|
||||
MetricsReplicationSourceSource globalSourceSource,
|
||||
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
|
||||
this.id = id;
|
||||
this.singleSourceSource = singleSourceSource;
|
||||
this.globalSourceSource = globalSourceSource;
|
||||
this.singleSourceSourceByTable = singleSourceSourceByTable;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -86,6 +90,20 @@ public class MetricsSource implements BaseSource {
|
|||
this.lastTimeStamps.put(walGroup, timestamp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the age of the last edit that was shipped group by table
|
||||
* @param timestamp write time of the edit
|
||||
* @param tableName String as group and tableName
|
||||
*/
|
||||
public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
|
||||
long age = EnvironmentEdgeManager.currentTime() - timestamp;
|
||||
if (!this.getSingleSourceSourceByTable().containsKey(tableName)) {
|
||||
this.getSingleSourceSourceByTable().put(tableName,
|
||||
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
|
||||
.getSource(tableName));
|
||||
}
|
||||
this.singleSourceSourceByTable.get(tableName).setLastShippedAge(age);
|
||||
}
|
||||
/**
|
||||
* Convenience method to use the last given timestamp to refresh the age of the last edit. Used
|
||||
* when replication fails and need to keep that metric accurate.
|
||||
|
@ -340,6 +358,10 @@ public class MetricsSource implements BaseSource {
|
|||
return globalSourceSource.getMetricsName();
|
||||
}
|
||||
|
||||
public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
|
||||
return singleSourceSourceByTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricRegistryInfo getMetricRegistryInfo() {
|
||||
return new MetricRegistryInfo(getMetricsName(), getMetricsDescription(),
|
||||
|
|
|
@ -700,6 +700,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
int size = entries.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
cleanUpHFileRefs(entries.get(i).getEdit());
|
||||
|
||||
TableName tableName = entries.get(i).getKey().getTablename();
|
||||
source.getSourceMetrics().setAgeOfLastShippedOpByTable(
|
||||
entries.get(i).getKey().getWriteTime(),
|
||||
tableName.getNameAsString());
|
||||
}
|
||||
//Log and clean up WAL logs
|
||||
updateLogPosition(lastReadPosition);
|
||||
|
|
|
@ -20,8 +20,9 @@ package org.apache.hadoop.hbase.replication;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -306,7 +307,9 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
|
||||
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
|
||||
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
|
||||
MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
|
||||
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
|
||||
MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource,
|
||||
singleSourceSourceByTable);
|
||||
String gaugeName = "gauge";
|
||||
String singleGaugeName = "source.id." + gaugeName;
|
||||
long delta = 1;
|
||||
|
@ -341,6 +344,23 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
verify(globalRms).setGauge(gaugeName, delta);
|
||||
verify(singleRms).updateHistogram(singleCounterName, count);
|
||||
verify(globalRms).updateHistogram(counterName, count);
|
||||
|
||||
|
||||
// check singleSourceSourceByTable metrics.
|
||||
// singleSourceSourceByTable map entry will be created only
|
||||
// after calling #setAgeOfLastShippedOpByTable
|
||||
boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
|
||||
.containsKey("RandomNewTable");
|
||||
Assert.assertEquals(false, containsRandomNewTable);
|
||||
source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
|
||||
containsRandomNewTable = source.getSingleSourceSourceByTable()
|
||||
.containsKey("RandomNewTable");
|
||||
Assert.assertEquals(true, containsRandomNewTable);
|
||||
MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
|
||||
.get("RandomNewTable");
|
||||
// cannot put more concreate value here to verify because the age is arbitrary.
|
||||
// as long as it's greater than 0, we see it as correct answer.
|
||||
Assert.assertTrue(msr.getLastShippedAge() > 0);
|
||||
}
|
||||
|
||||
private void doPut(byte[] row) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue