From 961337aadc3205e875e345b39da02aabb34c921e Mon Sep 17 00:00:00 2001 From: Ashu Pachauri Date: Thu, 25 May 2017 18:24:38 -0700 Subject: [PATCH] HBASE-18092: Removing a peer does not properly clean up the ReplicationSourceManager state and metrics Signed-off-by: tedyu --- .../regionserver/MetricsSource.java | 6 +- .../replication/regionserver/Replication.java | 4 +- .../regionserver/ReplicationSource.java | 5 +- .../ReplicationSourceInterface.java | 7 + .../ReplicationSourceManager.java | 27 ++-- .../replication/ReplicationSourceDummy.java | 8 ++ .../TestReplicationSourceManager.java | 123 ++++++++++++++---- 7 files changed, 132 insertions(+), 48 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index e8085701846..2d99018046d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -41,7 +41,6 @@ public class MetricsSource implements BaseSource { // tracks last shipped timestamp for each wal group private Map lastTimeStamps = new HashMap(); - private int lastQueueSize = 0; private long lastHFileRefsQueueSize = 0; private String id; @@ -182,11 +181,12 @@ public class MetricsSource implements BaseSource { /** Removes all metrics about this Source. 
*/ public void clear() { - singleSourceSource.clear(); + int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); globalSourceSource.decrSizeOfLogQueue(lastQueueSize); + singleSourceSource.decrSizeOfLogQueue(lastQueueSize); + singleSourceSource.clear(); globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); lastTimeStamps.clear(); - lastQueueSize = 0; lastHFileRefsQueueSize = 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 991eb2fac1a..b2b403ba58c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -451,9 +451,7 @@ public class Replication extends WALActionsListener.Base implements // get source List sources = this.replicationManager.getSources(); for (ReplicationSourceInterface source : sources) { - if (source instanceof ReplicationSource) { - sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); - } + sourceMetricsList.add(source.getSourceMetrics()); } // get old source diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 0d52bbef353..65ea4228fcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -490,10 +490,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf return sb.toString(); } - /** - * Get Replication Source Metrics - * @return sourceMetrics - */ + @Override public MetricsSource getSourceMetrics() { return this.metrics; } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 8d5451c6144..e7569edf80b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -120,4 +120,11 @@ public interface ReplicationSourceInterface { void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException; + /** + * Get the associated metrics. + * + * @return The metrics for this source. + */ + MetricsSource getSourceMetrics(); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 5b22d0466a7..ed2aa1d319a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -370,6 +370,20 @@ public class ReplicationSourceManager implements ReplicationListener { return this.oldsources; } + /** + * Get the normal source for a given peer + * @param peerId + * @return the normal source for the given peer if it exists, otherwise null. 
+ */ + public ReplicationSourceInterface getSource(String peerId) { + for (ReplicationSourceInterface source: getSources()) { + if (source.getPeerClusterId().equals(peerId)) { + return source; + } + } + return null; + } + @VisibleForTesting List getAllQueues() { return replicationQueues.getAllQueues(); @@ -549,9 +563,7 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void closeRecoveredQueue(ReplicationSourceInterface src) { LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); - if (src instanceof ReplicationSource) { - ((ReplicationSource) src).getSourceMetrics().clear(); - } + src.getSourceMetrics().clear(); this.oldsources.remove(src); deleteSource(src.getPeerClusterZnode(), false); this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); @@ -563,9 +575,7 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void closeQueue(ReplicationSourceInterface src) { LOG.info("Done with the queue " + src.getPeerClusterZnode()); - if (src instanceof ReplicationSource) { - ((ReplicationSource) src).getSourceMetrics().clear(); - } + src.getSourceMetrics().clear(); this.sources.remove(src); deleteSource(src.getPeerClusterZnode(), true); this.walsById.remove(src.getPeerClusterZnode()); @@ -615,10 +625,7 @@ public class ReplicationSourceManager implements ReplicationListener { } for (ReplicationSourceInterface toRemove : srcToRemove) { toRemove.terminate(terminateMessage); - if (toRemove instanceof ReplicationSource) { - ((ReplicationSource) toRemove).getSourceMetrics().clear(); - } - this.sources.remove(toRemove); + closeQueue(toRemove); } deleteSource(id, true); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 57e54d77cc5..ad8c52f4f2b 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -40,6 +40,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { ReplicationSourceManager manager; String peerClusterId; Path currentPath; + MetricsSource metrics; @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, @@ -49,11 +50,13 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { this.manager = manager; this.peerClusterId = peerClusterId; + this.metrics = metrics; } @Override public void enqueueLog(Path log) { this.currentPath = log; + metrics.incrSizeOfLogQueue(); } @Override @@ -98,4 +101,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { throws ReplicationException { return; } + + @Override + public MetricsSource getSourceMetrics() { + return metrics; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 96228e57c6e..b042a577563 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -20,13 +20,14 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import com.google.common.collect.Sets; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URLEncoder; import 
java.util.ArrayList; import java.util.Collection; @@ -523,6 +524,9 @@ public class TestReplicationSourceManager { @Test public void testPeerRemovalCleanup() throws Exception{ String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); + final String peerId = "FakePeer"; + final ReplicationPeerConfig peerConfig = + new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"); try { DummyServer server = new DummyServer(); final ReplicationQueues rq = @@ -535,42 +539,105 @@ public class TestReplicationSourceManager { FailInitializeDummyReplicationSource.class.getName()); final ReplicationPeers rp = manager.getReplicationPeers(); // Set up the znode and ReplicationPeer for the fake peer - rp.addPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase")); - // Wait for the peer to get created and connected - Waiter.waitFor(conf, 20000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return (rp.getPeer("FakePeer") != null); - } - }); + // Don't wait for replication source to initialize, we know it won't. + addPeerAndWait(peerId, peerConfig, false); - // Make sure that the replication source was not initialized - List sources = manager.getSources(); - for (ReplicationSourceInterface source : sources) { - assertNotEquals("FakePeer", source.getPeerClusterId()); - } + // Sanity check + assertNull(manager.getSource(peerId)); - // Removing the peer should remove both the replication queue and the ReplicationPeer - manager.removePeer("FakePeer"); - assertFalse(rq.getAllQueues().contains("FakePeer")); - assertNull(rp.getPeer("FakePeer")); + + // Create a replication queue for the fake peer + rq.addLog(peerId, "FakeFile"); // Unregister peer, this should remove the peer and clear all queues associated with it // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. 
- rp.removePeer("FakePeer"); - Waiter.waitFor(conf, 20000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - List peers = rp.getAllPeerIds(); - return (!rq.getAllQueues().contains("FakePeer")) - && (rp.getPeer("FakePeer") == null) - && (!peers.contains("FakePeer")); - } - }); + removePeerAndWait(peerId); + assertFalse(rq.getAllQueues().contains(peerId)); } finally { conf.set("replication.replicationsource.implementation", replicationSourceImplName); + removePeerAndWait(peerId); } } + @Test + public void testRemovePeerMetricsCleanup() throws Exception { + final String peerId = "DummyPeer"; + final ReplicationPeerConfig peerConfig = + new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"); + try { + addPeerAndWait(peerId, peerConfig, true); + + ReplicationSourceInterface source = manager.getSource(peerId); + // Sanity check + assertNotNull(source); + // Retrieve the global replication metrics source + Field f = MetricsSource.class.getDeclaredField("globalSourceSource"); + f.setAccessible(true); + MetricsReplicationSourceSource globalSource = + (MetricsReplicationSourceSource)f.get(source.getSourceMetrics()); + int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); + + // Enqueue log and check if metrics updated + source.enqueueLog(new Path("abc")); + assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue()); + assertEquals(1 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + + // Removing the peer should reset the global metrics + removePeerAndWait(peerId); + assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + + // Adding the same peer back again should reset the single source metrics + addPeerAndWait(peerId, peerConfig, true); + source = manager.getSource(peerId); + assertNotNull(source); + assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue()); + assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + } finally { + 
removePeerAndWait(peerId); + } + } + + /** + * Add a peer and wait for it to initialize + * @param peerId + * @param peerConfig + * @param waitForSource Whether to wait for replication source to initialize + * @throws Exception + */ + private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, + final boolean waitForSource) throws Exception { + final ReplicationPeers rp = manager.getReplicationPeers(); + rp.addPeer(peerId, peerConfig); + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + if (waitForSource) { + return (manager.getSource(peerId) != null); + } else { + return (rp.getPeer(peerId) != null); + } + } + }); + } + + /** + * Remove a peer and wait for it to get cleaned up + * @param peerId + * @throws Exception + */ + private void removePeerAndWait(final String peerId) throws Exception { + final ReplicationPeers rp = manager.getReplicationPeers(); + if (rp.getAllPeerIds().contains(peerId)) { + rp.removePeer(peerId); + } + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + List peers = rp.getAllPeerIds(); + return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null) + && (!peers.contains(peerId)); + } + }); + } + + private WALEdit getBulkLoadWALEdit() { // 1. Create store files for the families Map> storeFiles = new HashMap<>(1);