From 674511875d513ca3c031e63987288c45dacf56d9 Mon Sep 17 00:00:00 2001 From: tedyu Date: Wed, 19 Oct 2016 14:54:35 -0700 Subject: [PATCH] HBASE-16870 Add the metrics of replication sources which were transformed from other dead rs to ReplicationLoad (Guanghao Zhang) --- .../regionserver/MetricsSource.java | 2 +- .../replication/regionserver/Replication.java | 13 ++++++-- .../regionserver/ReplicationLoad.java | 21 ++++++++++--- .../ReplicationSourceManager.java | 6 ++++ .../TestReplicationSmallTests.java | 30 ++++++++++++++++--- 5 files changed, 61 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index a647d03aac6..68f32f7544c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -82,7 +82,7 @@ public class MetricsSource implements BaseSource { public void setAgeOfLastShippedOp(long timestamp, String walGroup) { long age = EnvironmentEdgeManager.currentTime() - timestamp; singleSourceSource.setLastShippedAge(age); - globalSourceSource.setLastShippedAge(age); + globalSourceSource.setLastShippedAge(Math.max(age, globalSourceSource.getLastShippedAge())); this.lastTimeStamps.put(walGroup, timestamp); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 8bf6c959a17..5f876905a5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -376,15 +376,24 @@ public class Replication extends WALActionsListener.Base implements } private void buildReplicationLoad() { - // get source - List sources = this.replicationManager.getSources(); List sourceMetricsList = new ArrayList(); + // get source + List sources = this.replicationManager.getSources(); for (ReplicationSourceInterface source : sources) { if (source instanceof ReplicationSource) { sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); } } + + // get old source + List oldSources = this.replicationManager.getOldSources(); + for (ReplicationSourceInterface source : oldSources) { + if (source instanceof ReplicationSource) { + sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); + } + } + // get sink MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index 14f3fcef98c..5772dd27bd7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.ArrayList; +import java.util.Map; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; @@ -66,8 +68,10 @@ public class ReplicationLoad { this.replicationLoadSink = rLoadSinkBuild.build(); // build the SourceLoad List - this.replicationLoadSourceList = new ArrayList(); + Map replicationLoadSourceMap = + new HashMap(); for (MetricsSource sm : this.sourceMetricsList) { + String peerId = sm.getPeerID(); long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); int sizeOfLogQueue = sm.getSizeOfLogQueue(); long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp(); @@ -85,17 +89,26 @@ public class ReplicationLoad { replicationLag = 0; } + ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId); + if (rLoadSource != null) { + ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp); + sizeOfLogQueue += rLoadSource.getSizeOfLogQueue(); + timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(), + timeStampOfLastShippedOp); + replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag); + } ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild = ClusterStatusProtos.ReplicationLoadSource.newBuilder(); - rLoadSourceBuild.setPeerID(sm.getPeerID()); + rLoadSourceBuild.setPeerID(peerId); rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp); rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue); rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp); rLoadSourceBuild.setReplicationLag(replicationLag); - this.replicationLoadSourceList.add(rLoadSourceBuild.build()); + replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build()); } - + this.replicationLoadSourceList = new ArrayList( + replicationLoadSourceMap.values()); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a6f18916881..b2f3b8da9e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -530,6 +530,9 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void closeRecoveredQueue(ReplicationSourceInterface src) { LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); + if (src instanceof ReplicationSource) { + ((ReplicationSource) src).getSourceMetrics().clear(); + } this.oldsources.remove(src); deleteSource(src.getPeerClusterZnode(), false); this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); @@ -579,6 +582,9 @@ public class ReplicationSourceManager implements ReplicationListener { } for (ReplicationSourceInterface toRemove : srcToRemove) { toRemove.terminate(terminateMessage); + if (toRemove instanceof ReplicationSource) { + ((ReplicationSource) toRemove).getSourceMetrics().clear(); + } this.sources.remove(toRemove); } deleteSource(id, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index a199d4af32d..f915dd50cb6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -719,7 +719,12 @@ public class TestReplicationSmallTests extends TestReplicationBase { public void testReplicationStatus() throws Exception { LOG.info("testReplicationStatus"); - try (Admin admin = utility1.getConnection().getAdmin()) { + try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { + // Wait roll log request in setUp() to finish + Thread.sleep(5000); + + // disable peer + admin.disablePeer(PEER_ID); final byte[] qualName = Bytes.toBytes("q"); Put p; @@ -730,7 +735,8 @@ public class TestReplicationSmallTests extends TestReplicationBase { htable1.put(p); } - ClusterStatus status = admin.getClusterStatus(); + ClusterStatus status = hbaseAdmin.getClusterStatus(); + long globalSizeOfLogQueue = 0; for (JVMClusterUtil.RegionServerThread thread : utility1.getHBaseCluster().getRegionServerThreads()) { @@ -739,8 +745,9 @@ public class TestReplicationSmallTests extends TestReplicationBase { List rLoadSourceList = sl.getReplicationLoadSourceList(); ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink(); - // check SourceList has at least one entry - assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0)); + // check SourceList only has one entry + assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1)); + globalSizeOfLogQueue += rLoadSourceList.get(0).getSizeOfLogQueue(); // check Sink exist only as it is difficult to verify the value on the fly assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ", @@ -748,6 +755,21 @@ public class TestReplicationSmallTests extends TestReplicationBase { assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ", (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0)); } + + // Stop one rs + utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer"); + Thread.sleep(5000); + status = hbaseAdmin.getClusterStatus(); + ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName(); + ServerLoad sl = status.getLoad(server); + List rLoadSourceList = sl.getReplicationLoadSourceList(); + // check SourceList only has one entry + assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1)); + // Another rs has one queue and one recovery queue from died rs + assertEquals(globalSizeOfLogQueue, rLoadSourceList.get(0).getSizeOfLogQueue()); + } finally { + utility1.getHBaseCluster().getRegionServer(1).start(); + admin.enablePeer(PEER_ID); } }