HBASE-16096 Properly remove the replication queue and peer znodes after calling ReplicationSourceManager.removePeer().
Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
parent
03fe257a64
commit
744248c131
|
@ -561,9 +561,10 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
srcToRemove.add(src);
|
||||
}
|
||||
}
|
||||
if (srcToRemove.size() == 0) {
|
||||
LOG.error("The queue we wanted to close is missing " + id);
|
||||
return;
|
||||
if (srcToRemove.isEmpty()) {
|
||||
LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " +
|
||||
"This could mean that ReplicationSourceInterface initialization failed for this peer " +
|
||||
"and that replication on this peer may not be caught up. peerId=" + id);
|
||||
}
|
||||
for (ReplicationSourceInterface toRemove : srcToRemove) {
|
||||
toRemove.terminate(terminateMessage);
|
||||
|
@ -739,8 +740,6 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Get the directory where wals are archived
|
||||
* @return the directory where wals are archived
|
||||
|
@ -765,6 +764,12 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
return this.fs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the ReplicationPeers used by this ReplicationSourceManager
|
||||
* @return the ReplicationPeers used by this ReplicationSourceManager
|
||||
*/
|
||||
public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
|
||||
|
||||
/**
|
||||
* Get a string representation of all the sources' metrics
|
||||
*/
|
||||
|
|
|
@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
|||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||
|
@ -424,6 +425,45 @@ public abstract class TestReplicationSourceManager {
|
|||
scopes.containsKey(f2));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
|
||||
* corresponding ReplicationSourceInterface correctly cleans up the corresponding
|
||||
* replication queue and ReplicationPeer.
|
||||
* See HBASE-16096.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testPeerRemovalCleanup() throws Exception{
|
||||
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
|
||||
try {
|
||||
DummyServer server = new DummyServer();
|
||||
ReplicationQueues rq =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
|
||||
server.getConfiguration(), server, server.getZooKeeper()));
|
||||
rq.init(server.getServerName().toString());
|
||||
// Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
|
||||
// initialization to throw an exception.
|
||||
conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl");
|
||||
ReplicationPeers rp = manager.getReplicationPeers();
|
||||
// Set up the znode and ReplicationPeer for the fake peer
|
||||
rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
|
||||
rp.peerConnected("FakePeer");
|
||||
// Have ReplicationSourceManager add the fake peer. It should fail to initialize a
|
||||
// ReplicationSourceInterface.
|
||||
List<String> fakePeers = new ArrayList<>();
|
||||
fakePeers.add("FakePeer");
|
||||
manager.peerListChanged(fakePeers);
|
||||
// Create a replication queue for the fake peer
|
||||
rq.addLog("FakePeer", "FakeFile");
|
||||
// Removing the peer should remove both the replication queue and the ReplicationPeer
|
||||
manager.removePeer("FakePeer");
|
||||
assertFalse(rq.getAllQueues().contains("FakePeer"));
|
||||
assertNull(rp.getConnectedPeer("FakePeer"));
|
||||
} finally {
|
||||
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
|
||||
}
|
||||
}
|
||||
|
||||
private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
|
||||
// 1. Create store files for the families
|
||||
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
|
||||
|
|
Loading…
Reference in New Issue