HBASE-8106 Test to check replication log znodes move is done correctly (Himanshu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1457216 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e8b21aad62
commit
839ba8b4c5
|
@ -22,7 +22,12 @@ import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.SortedMap;
|
||||||
|
import java.util.SortedSet;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -95,6 +100,9 @@ public class TestReplicationSourceManager {
|
||||||
|
|
||||||
private static Path logDir;
|
private static Path logDir;
|
||||||
|
|
||||||
|
private static CountDownLatch latch;
|
||||||
|
|
||||||
|
private static List<String> files = new ArrayList<String>();
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
@ -215,7 +223,106 @@ public class TestReplicationSourceManager {
|
||||||
// TODO Need a case with only 2 HLogs and we only want to delete the first one
|
// TODO Need a case with only 2 HLogs and we only want to delete the first one
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeFailoverWorkerCopyQueuesFromRSUsingMulti() throws Exception {
|
||||||
|
LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
|
||||||
|
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
|
||||||
|
final Server server = new DummyServer("hostname0.example.org");
|
||||||
|
AtomicBoolean replicating = new AtomicBoolean(true);
|
||||||
|
ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating);
|
||||||
|
// populate some znodes in the peer znode
|
||||||
|
files.add("log1");
|
||||||
|
files.add("log2");
|
||||||
|
for (String file : files) {
|
||||||
|
rz.addLogToList(file, "1");
|
||||||
|
}
|
||||||
|
// create 3 DummyServers
|
||||||
|
Server s1 = new DummyServer("dummyserver1.example.org");
|
||||||
|
Server s2 = new DummyServer("dummyserver2.example.org");
|
||||||
|
Server s3 = new DummyServer("dummyserver3.example.org");
|
||||||
|
|
||||||
|
// create 3 DummyNodeFailoverWorkers
|
||||||
|
DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
|
||||||
|
server.getServerName().getServerName(), s1);
|
||||||
|
DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
|
||||||
|
server.getServerName().getServerName(), s2);
|
||||||
|
DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
|
||||||
|
server.getServerName().getServerName(), s3);
|
||||||
|
|
||||||
|
latch = new CountDownLatch(3);
|
||||||
|
// start the threads
|
||||||
|
w1.start();
|
||||||
|
w2.start();
|
||||||
|
w3.start();
|
||||||
|
// make sure only one is successful
|
||||||
|
int populatedMap = 0;
|
||||||
|
// wait for result now... till all the workers are done.
|
||||||
|
latch.await();
|
||||||
|
populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
|
||||||
|
+ w3.isLogZnodesMapPopulated();
|
||||||
|
assertEquals(1, populatedMap);
|
||||||
|
// close out the resources.
|
||||||
|
rz.close();
|
||||||
|
server.abort("", null);
|
||||||
|
}
|
||||||
|
|
||||||
|
static class DummyNodeFailoverWorker extends Thread {
|
||||||
|
private SortedMap<String, SortedSet<String>> logZnodesMap;
|
||||||
|
Server server;
|
||||||
|
private String deadRsZnode;
|
||||||
|
ReplicationZookeeper rz;
|
||||||
|
|
||||||
|
public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
|
||||||
|
this.deadRsZnode = znode;
|
||||||
|
this.server = s;
|
||||||
|
rz = new ReplicationZookeeper(server, new AtomicBoolean(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
logZnodesMap = rz.copyQueuesFromRSUsingMulti(deadRsZnode);
|
||||||
|
rz.close();
|
||||||
|
server.abort("Done with testing", null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Got exception while running NodeFailoverWorker", e);
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return 1 when the map is not empty.
|
||||||
|
*/
|
||||||
|
private int isLogZnodesMapPopulated() {
|
||||||
|
Collection<SortedSet<String>> sets = logZnodesMap.values();
|
||||||
|
if (sets.size() > 1) {
|
||||||
|
throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
|
||||||
|
}
|
||||||
|
if (sets.size() == 1) {
|
||||||
|
SortedSet<String> s = sets.iterator().next();
|
||||||
|
for (String file : files) {
|
||||||
|
// at least one file was missing
|
||||||
|
if (!s.contains(file)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 1; // we found all the files
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static class DummyServer implements Server {
|
static class DummyServer implements Server {
|
||||||
|
String hostname;
|
||||||
|
|
||||||
|
DummyServer() {
|
||||||
|
hostname = "hostname.example.org";
|
||||||
|
}
|
||||||
|
|
||||||
|
DummyServer(String hostname) {
|
||||||
|
this.hostname = hostname;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Configuration getConfiguration() {
|
public Configuration getConfiguration() {
|
||||||
|
@ -229,17 +336,17 @@ public class TestReplicationSourceManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CatalogTracker getCatalogTracker() {
|
public CatalogTracker getCatalogTracker() {
|
||||||
return null; //To change body of implemented methods use File | Settings | File Templates.
|
return null; // To change body of implemented methods use File | Settings | File Templates.
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerName getServerName() {
|
public ServerName getServerName() {
|
||||||
return new ServerName("hostname.example.org", 1234, -1L);
|
return new ServerName(hostname, 1234, -1L);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void abort(String why, Throwable e) {
|
public void abort(String why, Throwable e) {
|
||||||
//To change body of implemented methods use File | Settings | File Templates.
|
// To change body of implemented methods use File | Settings | File Templates.
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -249,15 +356,14 @@ public class TestReplicationSourceManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop(String why) {
|
public void stop(String why) {
|
||||||
//To change body of implemented methods use File | Settings | File Templates.
|
// To change body of implemented methods use File | Settings | File Templates.
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isStopped() {
|
public boolean isStopped() {
|
||||||
return false; //To change body of implemented methods use File | Settings | File Templates.
|
return false; // To change body of implemented methods use File | Settings | File Templates.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue