HBASE-16681: Flaky TestReplicationSourceManagerZkImpl
Change-Id: I6bf31eb2f3815079d346963ad78045f67e0f44b7
This commit is contained in:
parent
a8fe9ed64f
commit
7092dc3ec4
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -57,13 +58,16 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
|
@ -521,28 +525,47 @@ public class TestReplicationSourceManager {
|
|||
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
|
||||
try {
|
||||
DummyServer server = new DummyServer();
|
||||
ReplicationQueues rq =
|
||||
final ReplicationQueues rq =
|
||||
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
|
||||
server);
|
||||
rq.init(server.getServerName().toString());
|
||||
// Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
|
||||
// initialization to throw an exception.
|
||||
conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl");
|
||||
ReplicationPeers rp = manager.getReplicationPeers();
|
||||
conf.set("replication.replicationsource.implementation",
|
||||
FailInitializeDummyReplicationSource.class.getName());
|
||||
final ReplicationPeers rp = manager.getReplicationPeers();
|
||||
// Set up the znode and ReplicationPeer for the fake peer
|
||||
rp.addPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"), null);
|
||||
rp.peerAdded("FakePeer");
|
||||
// Have ReplicationSourceManager add the fake peer. It should fail to initialize a
|
||||
// ReplicationSourceInterface.
|
||||
List<String> fakePeers = new ArrayList<>();
|
||||
fakePeers.add("FakePeer");
|
||||
manager.peerListChanged(fakePeers);
|
||||
// Create a replication queue for the fake peer
|
||||
rq.addLog("FakePeer", "FakeFile");
|
||||
// Wait for the peer to get created and connected
|
||||
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return (rp.getPeer("FakePeer") != null);
|
||||
}
|
||||
});
|
||||
|
||||
// Make sure that the replication source was not initialized
|
||||
List<ReplicationSourceInterface> sources = manager.getSources();
|
||||
for (ReplicationSourceInterface source : sources) {
|
||||
assertNotEquals("FakePeer", source.getPeerClusterId());
|
||||
}
|
||||
|
||||
// Removing the peer should remove both the replication queue and the ReplicationPeer
|
||||
manager.removePeer("FakePeer");
|
||||
assertFalse(rq.getAllQueues().contains("FakePeer"));
|
||||
assertNull(rp.getPeer("FakePeer"));
|
||||
// Unregister peer, this should remove the peer and clear all queues associated with it
|
||||
// Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
|
||||
rp.removePeer("FakePeer");
|
||||
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
List<String> peers = rp.getAllPeerIds();
|
||||
return (!rq.getAllQueues().contains("FakePeer"))
|
||||
&& (rp.getPeer("FakePeer") == null)
|
||||
&& (!peers.contains("FakePeer"));
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
|
||||
}
|
||||
|
@ -638,6 +661,17 @@ public class TestReplicationSourceManager {
|
|||
}
|
||||
}
|
||||
|
||||
static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
|
||||
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
|
||||
UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
|
||||
throws IOException {
|
||||
throw new IOException("Failing deliberately");
|
||||
}
|
||||
}
|
||||
|
||||
static class DummyServer implements Server {
|
||||
String hostname;
|
||||
|
||||
|
|
Loading…
Reference in New Issue