HBASE-10268. TestSplitLogWorker occasionally fails
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1556916 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3cc835b7e1
commit
d39389e2e3
|
@ -54,6 +54,7 @@ import org.junit.experimental.categories.Category;
|
||||||
@Category(MediumTests.class)
|
@Category(MediumTests.class)
|
||||||
public class TestSplitLogWorker {
|
public class TestSplitLogWorker {
|
||||||
private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
|
private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
|
||||||
|
private static final int WAIT_TIME = 15000;
|
||||||
private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
|
private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
|
||||||
static {
|
static {
|
||||||
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
|
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
|
||||||
|
@ -140,7 +141,7 @@ public class TestSplitLogWorker {
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testAcquireTaskAtStartup() throws Exception {
|
public void testAcquireTaskAtStartup() throws Exception {
|
||||||
LOG.info("testAcquireTaskAtStartup");
|
LOG.info("testAcquireTaskAtStartup");
|
||||||
SplitLogCounters.resetCounters();
|
SplitLogCounters.resetCounters();
|
||||||
|
@ -155,7 +156,7 @@ public class TestSplitLogWorker {
|
||||||
new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
|
new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
|
||||||
slw.start();
|
slw.start();
|
||||||
try {
|
try {
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
|
||||||
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
|
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
|
||||||
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
||||||
assertTrue(slt.isOwned(RS));
|
assertTrue(slt.isOwned(RS));
|
||||||
|
@ -168,14 +169,14 @@ public class TestSplitLogWorker {
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
if (slw != null) {
|
if (slw != null) {
|
||||||
slw.stop();
|
slw.stop();
|
||||||
slw.worker.join(3000);
|
slw.worker.join(WAIT_TIME);
|
||||||
if (slw.worker.isAlive()) {
|
if (slw.worker.isAlive()) {
|
||||||
assertTrue(("Could not stop the worker thread slw=" + slw) == null);
|
assertTrue(("Could not stop the worker thread slw=" + slw) == null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testRaceForTask() throws Exception {
|
public void testRaceForTask() throws Exception {
|
||||||
LOG.info("testRaceForTask");
|
LOG.info("testRaceForTask");
|
||||||
SplitLogCounters.resetCounters();
|
SplitLogCounters.resetCounters();
|
||||||
|
@ -194,10 +195,11 @@ public class TestSplitLogWorker {
|
||||||
slw1.start();
|
slw1.start();
|
||||||
slw2.start();
|
slw2.start();
|
||||||
try {
|
try {
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
|
||||||
// Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
|
// Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
|
||||||
// not it, that we fell through to the next counter in line and it was set.
|
// not it, that we fell through to the next counter in line and it was set.
|
||||||
assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1500, false) ||
|
assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
|
||||||
|
WAIT_TIME, false) ||
|
||||||
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
|
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
|
||||||
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
|
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
|
||||||
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
||||||
|
@ -208,7 +210,7 @@ public class TestSplitLogWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testPreemptTask() throws Exception {
|
public void testPreemptTask() throws Exception {
|
||||||
LOG.info("testPreemptTask");
|
LOG.info("testPreemptTask");
|
||||||
SplitLogCounters.resetCounters();
|
SplitLogCounters.resetCounters();
|
||||||
|
@ -221,27 +223,27 @@ public class TestSplitLogWorker {
|
||||||
try {
|
try {
|
||||||
Thread.yield(); // let the worker start
|
Thread.yield(); // let the worker start
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, 5000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
|
||||||
|
|
||||||
// this time create a task node after starting the splitLogWorker
|
// this time create a task node after starting the splitLogWorker
|
||||||
zkw.getRecoverableZooKeeper().create(PATH,
|
zkw.getRecoverableZooKeeper().create(PATH,
|
||||||
new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 8000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
|
||||||
assertEquals(1, slw.taskReadySeq);
|
assertEquals(1, slw.taskReadySeq);
|
||||||
byte [] bytes = ZKUtil.getData(zkw, PATH);
|
byte [] bytes = ZKUtil.getData(zkw, PATH);
|
||||||
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
||||||
assertTrue(slt.isOwned(SRV));
|
assertTrue(slt.isOwned(SRV));
|
||||||
slt = new SplitLogTask.Unassigned(MANAGER);
|
slt = new SplitLogTask.Unassigned(MANAGER);
|
||||||
ZKUtil.setData(zkw, PATH, slt.toByteArray());
|
ZKUtil.setData(zkw, PATH, slt.toByteArray());
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 5000);
|
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
|
||||||
} finally {
|
} finally {
|
||||||
stopSplitLogWorker(slw);
|
stopSplitLogWorker(slw);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testMultipleTasks() throws Exception {
|
public void testMultipleTasks() throws Exception {
|
||||||
LOG.info("testMultipleTasks");
|
LOG.info("testMultipleTasks");
|
||||||
SplitLogCounters.resetCounters();
|
SplitLogCounters.resetCounters();
|
||||||
|
@ -254,13 +256,13 @@ public class TestSplitLogWorker {
|
||||||
try {
|
try {
|
||||||
Thread.yield(); // let the worker start
|
Thread.yield(); // let the worker start
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, 5000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
|
||||||
|
|
||||||
SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
|
SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
|
||||||
zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
|
zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
|
||||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 5000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
|
||||||
// now the worker is busy doing the above task
|
// now the worker is busy doing the above task
|
||||||
|
|
||||||
// create another task
|
// create another task
|
||||||
|
@ -272,9 +274,9 @@ public class TestSplitLogWorker {
|
||||||
final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
|
final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
|
||||||
SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
|
SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
|
||||||
ZKUtil.setData(zkw, PATH1, slt.toByteArray());
|
ZKUtil.setData(zkw, PATH1, slt.toByteArray());
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 5000);
|
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 5000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
|
||||||
assertEquals(2, slw.taskReadySeq);
|
assertEquals(2, slw.taskReadySeq);
|
||||||
byte [] bytes = ZKUtil.getData(zkw, PATH2);
|
byte [] bytes = ZKUtil.getData(zkw, PATH2);
|
||||||
slt = SplitLogTask.parseFrom(bytes);
|
slt = SplitLogTask.parseFrom(bytes);
|
||||||
|
@ -284,7 +286,7 @@ public class TestSplitLogWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testRescan() throws Exception {
|
public void testRescan() throws Exception {
|
||||||
LOG.info("testRescan");
|
LOG.info("testRescan");
|
||||||
SplitLogCounters.resetCounters();
|
SplitLogCounters.resetCounters();
|
||||||
|
@ -300,25 +302,25 @@ public class TestSplitLogWorker {
|
||||||
zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
|
||||||
// now the worker is busy doing the above task
|
// now the worker is busy doing the above task
|
||||||
|
|
||||||
// preempt the task, have it owned by another worker
|
// preempt the task, have it owned by another worker
|
||||||
ZKUtil.setData(zkw, task, slt.toByteArray());
|
ZKUtil.setData(zkw, task, slt.toByteArray());
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1500);
|
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
|
||||||
|
|
||||||
// create a RESCAN node
|
// create a RESCAN node
|
||||||
String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
|
String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
|
||||||
rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT_SEQUENTIAL);
|
CreateMode.PERSISTENT_SEQUENTIAL);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 1500);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
|
||||||
// RESCAN node might not have been processed if the worker became busy
|
// RESCAN node might not have been processed if the worker became busy
|
||||||
// with the above task. preempt the task again so that now the RESCAN
|
// with the above task. preempt the task again so that now the RESCAN
|
||||||
// node is processed
|
// node is processed
|
||||||
ZKUtil.setData(zkw, task, slt.toByteArray());
|
ZKUtil.setData(zkw, task, slt.toByteArray());
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, 1500);
|
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, 1500);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
|
||||||
|
|
||||||
List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
|
List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
|
||||||
LOG.debug(nodes);
|
LOG.debug(nodes);
|
||||||
|
@ -336,7 +338,7 @@ public class TestSplitLogWorker {
|
||||||
assertEquals(2, num);
|
assertEquals(2, num);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testAcquireMultiTasks() throws Exception {
|
public void testAcquireMultiTasks() throws Exception {
|
||||||
LOG.info("testAcquireMultiTasks");
|
LOG.info("testAcquireMultiTasks");
|
||||||
SplitLogCounters.resetCounters();
|
SplitLogCounters.resetCounters();
|
||||||
|
@ -356,7 +358,7 @@ public class TestSplitLogWorker {
|
||||||
SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
|
SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
|
||||||
slw.start();
|
slw.start();
|
||||||
try {
|
try {
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, 6000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
|
||||||
for (int i = 0; i < maxTasks; i++) {
|
for (int i = 0; i < maxTasks; i++) {
|
||||||
byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
|
byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
|
||||||
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
||||||
|
@ -372,7 +374,7 @@ public class TestSplitLogWorker {
|
||||||
* RS
|
* RS
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
|
public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
|
||||||
LOG.info("testAcquireMultiTasks");
|
LOG.info("testAcquireMultiTasks");
|
||||||
SplitLogCounters.resetCounters();
|
SplitLogCounters.resetCounters();
|
||||||
|
@ -401,7 +403,7 @@ public class TestSplitLogWorker {
|
||||||
slw.start();
|
slw.start();
|
||||||
try {
|
try {
|
||||||
int acquiredTasks = 0;
|
int acquiredTasks = 0;
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, 6000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME);
|
||||||
for (int i = 0; i < maxTasks; i++) {
|
for (int i = 0; i < maxTasks; i++) {
|
||||||
byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
|
byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
|
||||||
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
||||||
|
|
Loading…
Reference in New Issue