HBASE-16681: Flaky TestReplicationSourceManagerZkImpl

This commit is contained in:
Ashu Pachauri 2016-09-23 16:04:08 -07:00 committed by Apekshit Sharma
parent 97c1333831
commit 2c7211ec4b
1 changed file with 46 additions and 13 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -56,7 +57,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@ -64,11 +67,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -437,28 +442,45 @@ public abstract class TestReplicationSourceManager {
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
try {
DummyServer server = new DummyServer();
ReplicationQueues rq =
final ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
server.getConfiguration(), server, server.getZooKeeper()));
rq.init(server.getServerName().toString());
// Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
// initialization to throw an exception.
conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl");
ReplicationPeers rp = manager.getReplicationPeers();
conf.set("replication.replicationsource.implementation",
FailInitializeDummyReplicationSource.class.getName());
final ReplicationPeers rp = manager.getReplicationPeers();
// Set up the znode and ReplicationPeer for the fake peer
rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
rp.peerConnected("FakePeer");
// Have ReplicationSourceManager add the fake peer. It should fail to initialize a
// ReplicationSourceInterface.
List<String> fakePeers = new ArrayList<>();
fakePeers.add("FakePeer");
manager.peerListChanged(fakePeers);
// Wait for the peer to get created and connected
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return (rp.getConnectedPeer("FakePeer") != null);
}
});
// Make sure that the replication source was not initialized
List<ReplicationSourceInterface> sources = manager.getSources();
for (ReplicationSourceInterface source : sources) {
assertNotEquals("FakePeer", source.getPeerClusterId());
}
// Create a replication queue for the fake peer
rq.addLog("FakePeer", "FakeFile");
// Removing the peer should remove both the replication queue and the ReplicationPeer
manager.removePeer("FakePeer");
assertFalse(rq.getAllQueues().contains("FakePeer"));
assertNull(rp.getConnectedPeer("FakePeer"));
// Unregister peer, this should remove the peer and clear all queues associated with it
// Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
rp.unregisterPeer("FakePeer");
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds();
return (!rq.getAllQueues().contains("FakePeer"))
&& (rp.getConnectedPeer("FakePeer") == null)
&& (!peers.contains("FakePeer"));
}
});
} finally {
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
}
@ -553,6 +575,17 @@ public abstract class TestReplicationSourceManager {
}
}
/**
 * A {@link ReplicationSourceDummy} whose {@code init} always throws.
 *
 * <p>The test installs this class under the
 * {@code "replication.replicationsource.implementation"} configuration key so that
 * initializing a replication source for the fake peer fails on purpose.
 */
static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {

  /**
   * Fails unconditionally; none of the arguments are read.
   *
   * @throws IOException always, carrying the message {@code "Failing deliberately"}
   */
  @Override
  public void init(
      Configuration conf,
      FileSystem fs,
      ReplicationSourceManager manager,
      ReplicationQueues rq,
      ReplicationPeers rp,
      Stoppable stopper,
      String peerClusterId,
      UUID clusterId,
      ReplicationEndpoint replicationEndpoint,
      MetricsSource metrics) throws IOException {
    // The failure is the whole point of this class: there is no setup to perform.
    final String reason = "Failing deliberately";
    throw new IOException(reason);
  }
}
static class DummyServer implements Server {
String hostname;