From f8474c8d4d3e722aa0129e085f6a5287c5e2be89 Mon Sep 17 00:00:00 2001 From: Vincent Date: Tue, 20 Dec 2016 16:29:40 -0800 Subject: [PATCH] HBASE-17328 Properly dispose of looped replication peers Signed-off-by: Andrew Purtell --- .../regionserver/ReplicationSource.java | 2 + .../ReplicationSourceManager.java | 14 ++++++ .../replication/TestMasterReplication.java | 46 ++++++++++++++++++- 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index a6fe0fb54fc..c988f8766ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -327,6 +327,8 @@ public class ReplicationSource extends Thread this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); + this.manager.closeQueue(this); + return; } LOG.info("Replicating " + clusterId + " -> " + peerClusterId); // start workers diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index fa6f894fc4f..2c9fdcc1fbb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -538,6 +538,20 @@ public class ReplicationSourceManager implements ReplicationListener { this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); } + /** + * Clear the references to the specified old source + * @param src source to clear + */ + public void closeQueue(ReplicationSourceInterface src) { + LOG.info("Done with the queue " + src.getPeerClusterZnode()); + if (src instanceof ReplicationSource) { + ((ReplicationSource) src).getSourceMetrics().clear(); + } + this.sources.remove(src); + deleteSource(src.getPeerClusterZnode(), true); + this.walsById.remove(src.getPeerClusterZnode()); + } + /** * Thie method first deletes all the recovered sources for the specified * id, then deletes the normal source (deleting all related data in ZK). diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 5b8538be4de..7ac5e94958b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -42,7 +43,10 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; @@ -69,6 +73,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; import org.junit.Before; @@ -168,6 +173,43 @@ public class TestMasterReplication { } } + /** + * Tests the replication scenario 0 -> 0. By default + * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the + * ReplicationSource should terminate, and no further logs should get enqueued + */ + @Test(timeout = 300000) + public void testLoopedReplication() throws Exception { + LOG.info("testLoopedReplication"); + startMiniClusters(1); + createTableOnClusters(table); + addPeer("1", 0, 0); + Thread.sleep(SLEEP_TIME); + + // wait for source to terminate + final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName(); + Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + ClusterStatus clusterStatus = utilities[0].getAdmin().getClusterStatus(); + ServerLoad serverLoad = clusterStatus.getLoad(rsName); + List replicationLoadSourceList = + serverLoad.getReplicationLoadSourceList(); + return replicationLoadSourceList.size() == 0; + } + }); + + Table[] htables = getHTablesOnClusters(tableName); + putAndWait(row, famName, htables[0], htables[0]); + rollWALAndWait(utilities[0], table.getTableName(), row); + ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher(); + String queuesZnode = + ZKUtil.joinZNode(zkw.getZNodePaths().baseZNode, ZKUtil.joinZNode("replication", "rs")); + List listChildrenNoWatch = + ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString())); + assertEquals(0, listChildrenNoWatch.size()); + } + /** * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of * HFiles to a table in each cluster, checking if it's replicated. @@ -332,7 +374,7 @@ public class TestMasterReplication { shutDownMiniClusters(); } } - + /** * It tests the bulk loaded hfile replication scenario to only explicitly specified table column * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set @@ -483,7 +525,7 @@ public class TestMasterReplication { close(replicationAdmin); } } - + private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs) throws Exception { ReplicationAdmin replicationAdmin = null;