HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <apurtell@apache.org>

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
This commit is contained in:
Vincent 2016-12-20 16:29:40 -08:00 committed by Andrew Purtell
parent 33002bd8e3
commit e79afbf0cb
3 changed files with 61 additions and 3 deletions

View File

@ -327,6 +327,8 @@ public class ReplicationSource extends Thread
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:" + peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false); + replicationEndpoint.getClass().getName(), null, false);
this.manager.closeQueue(this);
return;
} }
LOG.info("Replicating " + clusterId + " -> " + peerClusterId); LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
// start workers // start workers

View File

@ -549,6 +549,20 @@ public class ReplicationSourceManager implements ReplicationListener {
this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
} }
/**
* Clear the references to the specified old source
* @param src source to clear
*/
public void closeQueue(ReplicationSourceInterface src) {
LOG.info("Done with the queue " + src.getPeerClusterZnode());
if (src instanceof ReplicationSource) {
((ReplicationSource) src).getSourceMetrics().clear();
}
this.sources.remove(src);
deleteSource(src.getPeerClusterZnode(), true);
this.walsById.remove(src.getPeerClusterZnode());
}
/** /**
* Thie method first deletes all the recovered sources for the specified * Thie method first deletes all the recovered sources for the specified
* id, then deletes the normal source (deleting all related data in ZK). * id, then deletes the normal source (deleting all related data in ZK).

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
@ -42,8 +43,10 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
@ -65,9 +68,11 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -170,6 +175,43 @@ public class TestMasterReplication {
} }
} }
/**
* Tests the replication scenario 0 -> 0. By default
* {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
* ReplicationSource should terminate, and no further logs should get enqueued
*/
@Test(timeout = 300000)
public void testLoopedReplication() throws Exception {
LOG.info("testLoopedReplication");
startMiniClusters(1);
createTableOnClusters(table);
addPeer("1", 0, 0);
Thread.sleep(SLEEP_TIME);
// wait for source to terminate
final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ClusterStatus clusterStatus = utilities[0].getHBaseAdmin().getClusterStatus();
ServerLoad serverLoad = clusterStatus.getLoad(rsName);
List<ReplicationLoadSource> replicationLoadSourceList =
serverLoad.getReplicationLoadSourceList();
return replicationLoadSourceList.size() == 0;
}
});
Table[] htables = getHTablesOnClusters(tableName);
putAndWait(row, famName, htables[0], htables[0]);
rollWALAndWait(utilities[0], table.getTableName(), row);
ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
String queuesZnode =
ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs"));
List<String> listChildrenNoWatch =
ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
assertEquals(0, listChildrenNoWatch.size());
}
/** /**
* It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
* HFiles to a table in each cluster, checking if it's replicated. * HFiles to a table in each cluster, checking if it's replicated.
@ -334,7 +376,7 @@ public class TestMasterReplication {
shutDownMiniClusters(); shutDownMiniClusters();
} }
} }
/** /**
* It tests the bulk loaded hfile replication scenario to only explicitly specified table column * It tests the bulk loaded hfile replication scenario to only explicitly specified table column
* families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
@ -515,7 +557,7 @@ public class TestMasterReplication {
close(replicationAdmin); close(replicationAdmin);
} }
} }
private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs) private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
throws Exception { throws Exception {
ReplicationAdmin replicationAdmin = null; ReplicationAdmin replicationAdmin = null;