HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Vincent 2016-12-20 16:29:40 -08:00 committed by Andrew Purtell
parent 06b67a632c
commit f8474c8d4d
3 changed files with 60 additions and 2 deletions

View File

@ -327,6 +327,8 @@ public class ReplicationSource extends Thread
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.closeQueue(this);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
// start workers

View File

@ -538,6 +538,20 @@ public class ReplicationSourceManager implements ReplicationListener {
this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
}
/**
* Clear the references to the specified old source
* @param src source to clear
*/
public void closeQueue(ReplicationSourceInterface src) {
LOG.info("Done with the queue " + src.getPeerClusterZnode());
if (src instanceof ReplicationSource) {
((ReplicationSource) src).getSourceMetrics().clear();
}
this.sources.remove(src);
deleteSource(src.getPeerClusterZnode(), true);
this.walsById.remove(src.getPeerClusterZnode());
}
/**
* Thie method first deletes all the recovered sources for the specified
* id, then deletes the normal source (deleting all related data in ZK).

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -42,7 +43,10 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
@ -69,6 +73,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Before;
@ -168,6 +173,43 @@ public class TestMasterReplication {
}
}
/**
* Tests the replication scenario 0 -> 0. By default
* {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
* ReplicationSource should terminate, and no further logs should get enqueued
*/
@Test(timeout = 300000)
public void testLoopedReplication() throws Exception {
LOG.info("testLoopedReplication");
startMiniClusters(1);
createTableOnClusters(table);
addPeer("1", 0, 0);
Thread.sleep(SLEEP_TIME);
// wait for source to terminate
final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ClusterStatus clusterStatus = utilities[0].getAdmin().getClusterStatus();
ServerLoad serverLoad = clusterStatus.getLoad(rsName);
List<ReplicationLoadSource> replicationLoadSourceList =
serverLoad.getReplicationLoadSourceList();
return replicationLoadSourceList.size() == 0;
}
});
Table[] htables = getHTablesOnClusters(tableName);
putAndWait(row, famName, htables[0], htables[0]);
rollWALAndWait(utilities[0], table.getTableName(), row);
ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
String queuesZnode =
ZKUtil.joinZNode(zkw.getZNodePaths().baseZNode, ZKUtil.joinZNode("replication", "rs"));
List<String> listChildrenNoWatch =
ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
assertEquals(0, listChildrenNoWatch.size());
}
/**
* It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
* HFiles to a table in each cluster, checking if it's replicated.
@ -332,7 +374,7 @@ public class TestMasterReplication {
shutDownMiniClusters();
}
}
/**
* It tests the bulk loaded hfile replication scenario to only explicitly specified table column
* families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
@ -483,7 +525,7 @@ public class TestMasterReplication {
close(replicationAdmin);
}
}
private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
throws Exception {
ReplicationAdmin replicationAdmin = null;