HBASE-16096 Backport. Cleanly remove replication peers from ZooKeeper.

Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
Joseph Hwang 2016-06-30 15:18:33 -07:00 committed by Elliott Clark
parent 7983d2f45b
commit d597e29ec5
2 changed files with 50 additions and 3 deletions

View File

@ -573,9 +573,10 @@ public class ReplicationSourceManager implements ReplicationListener {
srcToRemove.add(src);
}
}
if (srcToRemove.size() == 0) {
LOG.error("The queue we wanted to close is missing " + id);
return;
if (srcToRemove.isEmpty()) {
LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " +
"This could mean that ReplicationSourceInterface initialization failed for this peer " +
"and that replication on this peer may not be caught up. peerId=" + id);
}
for (ReplicationSourceInterface toRemove : srcToRemove) {
toRemove.terminate(terminateMessage);
@ -752,6 +753,12 @@ public class ReplicationSourceManager implements ReplicationListener {
return this.fs;
}
/**
* Get the ReplicationPeers used by this ReplicationSourceManager
* @return the ReplicationPeers used by this ReplicationSourceManager
*/
public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
/**
* Get a string representation of all the sources' metrics
*/

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@ -494,6 +495,45 @@ public class TestReplicationSourceManager {
scopes.containsKey(f2));
}
/**
* Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
* corresponding ReplicationSourceInterface correctly cleans up the corresponding
* replication queue and ReplicationPeer.
* See HBASE-16096.
* @throws Exception
*/
@Test
public void testPeerRemovalCleanup() throws Exception{
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
try {
DummyServer server = new DummyServer();
ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server);
rq.init(server.getServerName().toString());
// Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
// initialization to throw an exception.
conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl");
ReplicationPeers rp = manager.getReplicationPeers();
// Set up the znode and ReplicationPeer for the fake peer
rp.addPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"), null);
rp.peerAdded("FakePeer");
// Have ReplicationSourceManager add the fake peer. It should fail to initialize a
// ReplicationSourceInterface.
List<String> fakePeers = new ArrayList<>();
fakePeers.add("FakePeer");
manager.peerListChanged(fakePeers);
// Create a replication queue for the fake peer
rq.addLog("FakePeer", "FakeFile");
// Removing the peer should remove both the replication queue and the ReplicationPeer
manager.removePeer("FakePeer");
assertFalse(rq.getAllQueues().contains("FakePeer"));
assertNull(rp.getPeer("FakePeer"));
} finally {
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
}
}
private WALEdit getBulkLoadWALEdit() {
// 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);