HBASE-16870 Add the metrics of replication sources which were transformed from other dead rs to ReplicationLoad (Guanghao Zhang)

This commit is contained in:
tedyu 2016-10-19 14:54:35 -07:00
parent 72db953886
commit 674511875d
5 changed files with 61 additions and 11 deletions

View File

@ -82,7 +82,7 @@ public class MetricsSource implements BaseSource {
/**
 * Records the age of the last shipped operation.
 * The age is the current {@code EnvironmentEdgeManager} time minus {@code timestamp}; it is
 * pushed to both the per-source and the global metrics source, and the raw timestamp is
 * stored in {@code lastTimeStamps} under the given WAL group.
 *
 * @param timestamp timestamp of the last shipped op (per the method name); subtracted from
 *          the current time to obtain the age
 * @param walGroup key under which {@code timestamp} is stored in {@code lastTimeStamps}
 */
public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
long age = EnvironmentEdgeManager.currentTime() - timestamp;
singleSourceSource.setLastShippedAge(age);
// NOTE(review): the two global updates below look like the removed/added pair of a diff hunk
// whose +/- markers were lost; presumably only the Math.max form is meant to remain, so that
// the global value keeps the largest age reported by any source. Confirm against the commit.
globalSourceSource.setLastShippedAge(age);
globalSourceSource.setLastShippedAge(Math.max(age, globalSourceSource.getLastShippedAge()));
this.lastTimeStamps.put(walGroup, timestamp);
}

View File

@ -376,15 +376,24 @@ public class Replication extends WALActionsListener.Base implements
}
private void buildReplicationLoad() {
// get source
List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
// get source
List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
for (ReplicationSourceInterface source : sources) {
if (source instanceof ReplicationSource) {
sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
}
}
// get old source
List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
for (ReplicationSourceInterface source : oldSources) {
if (source instanceof ReplicationSource) {
sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
}
}
// get sink
MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
@ -66,8 +68,10 @@ public class ReplicationLoad {
this.replicationLoadSink = rLoadSinkBuild.build();
// build the SourceLoad List
this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>();
Map<String, ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceMap =
new HashMap<String, ClusterStatusProtos.ReplicationLoadSource>();
for (MetricsSource sm : this.sourceMetricsList) {
String peerId = sm.getPeerID();
long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
int sizeOfLogQueue = sm.getSizeOfLogQueue();
long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp();
@ -85,17 +89,26 @@ public class ReplicationLoad {
replicationLag = 0;
}
ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
if (rLoadSource != null) {
ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp);
sizeOfLogQueue += rLoadSource.getSizeOfLogQueue();
timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(),
timeStampOfLastShippedOp);
replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag);
}
ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
ClusterStatusProtos.ReplicationLoadSource.newBuilder();
rLoadSourceBuild.setPeerID(sm.getPeerID());
rLoadSourceBuild.setPeerID(peerId);
rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
rLoadSourceBuild.setReplicationLag(replicationLag);
this.replicationLoadSourceList.add(rLoadSourceBuild.build());
replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build());
}
this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>(
replicationLoadSourceMap.values());
}
/**

View File

@ -530,6 +530,9 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public void closeRecoveredQueue(ReplicationSourceInterface src) {
LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
if (src instanceof ReplicationSource) {
((ReplicationSource) src).getSourceMetrics().clear();
}
this.oldsources.remove(src);
deleteSource(src.getPeerClusterZnode(), false);
this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
@ -579,6 +582,9 @@ public class ReplicationSourceManager implements ReplicationListener {
}
for (ReplicationSourceInterface toRemove : srcToRemove) {
toRemove.terminate(terminateMessage);
if (toRemove instanceof ReplicationSource) {
((ReplicationSource) toRemove).getSourceMetrics().clear();
}
this.sources.remove(toRemove);
}
deleteSource(id, true);

View File

@ -719,7 +719,12 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testReplicationStatus() throws Exception {
LOG.info("testReplicationStatus");
try (Admin admin = utility1.getConnection().getAdmin()) {
try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
// Wait for the roll log request in setUp() to finish
Thread.sleep(5000);
// disable peer
admin.disablePeer(PEER_ID);
final byte[] qualName = Bytes.toBytes("q");
Put p;
@ -730,7 +735,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
htable1.put(p);
}
ClusterStatus status = admin.getClusterStatus();
ClusterStatus status = hbaseAdmin.getClusterStatus();
long globalSizeOfLogQueue = 0;
for (JVMClusterUtil.RegionServerThread thread :
utility1.getHBaseCluster().getRegionServerThreads()) {
@ -739,8 +745,9 @@ public class TestReplicationSmallTests extends TestReplicationBase {
List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
// check SourceList has at least one entry
assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0));
// check SourceList only has one entry
assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
globalSizeOfLogQueue += rLoadSourceList.get(0).getSizeOfLogQueue();
// check Sink exist only as it is difficult to verify the value on the fly
assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
@ -748,6 +755,21 @@ public class TestReplicationSmallTests extends TestReplicationBase {
assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
(rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
}
// Stop one rs
utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
Thread.sleep(5000);
status = hbaseAdmin.getClusterStatus();
ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName();
ServerLoad sl = status.getLoad(server);
List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
// check SourceList only has one entry
assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
// The other rs has one queue and one recovered queue from the dead rs
assertEquals(globalSizeOfLogQueue, rLoadSourceList.get(0).getSizeOfLogQueue());
} finally {
utility1.getHBaseCluster().getRegionServer(1).start();
admin.enablePeer(PEER_ID);
}
}