HBASE-3819 TestSplitLogWorker has too many SLWs running -- makes for contention and occasional failures
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1096627 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6ad14f3c73
commit
e01e57ff92
|
@ -84,6 +84,8 @@ Release 0.91.0 - Unreleased
|
|||
HBASE-3800 HMaster is not able to start due to AlreadyCreatedException
|
||||
HBASE-3806 distributed log splitting double escapes task names
|
||||
(Prakash Khemani)
|
||||
HBASE-3819 TestSplitLogWorker has too many SLWs running -- makes for
|
||||
contention and occasional failures
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
||||
|
|
|
@ -58,6 +58,7 @@ public class TestSplitLogWorker {
|
|||
|
||||
private void waitForCounter(AtomicLong ctr, long oldval, long newval,
|
||||
long timems) {
|
||||
assert ctr.get() == oldval;
|
||||
long curt = System.currentTimeMillis();
|
||||
long endt = curt + timems;
|
||||
while (curt < endt) {
|
||||
|
@ -72,7 +73,8 @@ public class TestSplitLogWorker {
|
|||
return;
|
||||
}
|
||||
}
|
||||
assertTrue(false);
|
||||
assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
|
||||
false);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
@ -85,7 +87,6 @@ public class TestSplitLogWorker {
|
|||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
slw = null;
|
||||
TEST_UTIL.startMiniZKCluster();
|
||||
zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
|
||||
"split-log-worker-tests", null);
|
||||
|
@ -102,13 +103,6 @@ public class TestSplitLogWorker {
|
|||
|
||||
@After
|
||||
public void teardown() throws Exception {
|
||||
if (slw != null) {
|
||||
slw.stop();
|
||||
slw.worker.join(3000);
|
||||
if (slw.worker.isAlive()) {
|
||||
assertTrue("could not stop the worker thread" == null);
|
||||
}
|
||||
}
|
||||
TEST_UTIL.shutdownMiniZKCluster();
|
||||
}
|
||||
|
||||
|
@ -139,12 +133,27 @@ public class TestSplitLogWorker {
|
|||
TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
|
||||
slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "rs",
|
||||
neverEndingTask);
|
||||
SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
|
||||
"rs", neverEndingTask);
|
||||
slw.start();
|
||||
waitForCounter(tot_wkr_task_acquired, 0, 1, 100);
|
||||
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||
try {
|
||||
waitForCounter(tot_wkr_task_acquired, 0, 1, 100);
|
||||
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||
ZKSplitLog.getEncodedNodeName(zkw, "tatas")), "rs"));
|
||||
} finally {
|
||||
stopSplitLogWorker(slw);
|
||||
}
|
||||
}
|
||||
|
||||
private void stopSplitLogWorker(final SplitLogWorker slw)
|
||||
throws InterruptedException {
|
||||
if (slw != null) {
|
||||
slw.stop();
|
||||
slw.worker.join(3000);
|
||||
if (slw.worker.isAlive()) {
|
||||
assertTrue(("Could not stop the worker thread slw=" + slw) == null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -167,67 +176,73 @@ public class TestSplitLogWorker {
|
|||
ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr1") ||
|
||||
TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||
ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr2"));
|
||||
slw1.stop();
|
||||
slw2.stop();
|
||||
slw1.worker.join();
|
||||
slw2.worker.join();
|
||||
stopSplitLogWorker(slw1);
|
||||
stopSplitLogWorker(slw2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreemptTask() throws Exception {
|
||||
LOG.info("testPreemptTask");
|
||||
|
||||
slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
|
||||
SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
|
||||
"tpt_svr", neverEndingTask);
|
||||
slw.start();
|
||||
Thread.yield(); // let the worker start
|
||||
Thread.sleep(100);
|
||||
try {
|
||||
Thread.yield(); // let the worker start
|
||||
Thread.sleep(100);
|
||||
|
||||
// this time create a task node after starting the splitLogWorker
|
||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
|
||||
// this time create a task node after starting the splitLogWorker
|
||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
|
||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
|
||||
waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
|
||||
assertEquals(1, slw.taskReadySeq);
|
||||
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||
waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
|
||||
assertEquals(1, slw.taskReadySeq);
|
||||
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||
ZKSplitLog.getEncodedNodeName(zkw, "tpt_task")), "tpt_svr"));
|
||||
|
||||
ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
|
||||
ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
|
||||
TaskState.TASK_UNASSIGNED.get("manager"));
|
||||
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
||||
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
||||
} finally {
|
||||
stopSplitLogWorker(slw);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleTasks() throws Exception {
|
||||
LOG.info("testMultipleTasks");
|
||||
slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
|
||||
SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
|
||||
"tmt_svr", neverEndingTask);
|
||||
slw.start();
|
||||
Thread.yield(); // let the worker start
|
||||
Thread.sleep(100);
|
||||
try {
|
||||
Thread.yield(); // let the worker start
|
||||
Thread.sleep(100);
|
||||
|
||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
|
||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
|
||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
|
||||
waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
|
||||
// now the worker is busy doing the above task
|
||||
waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
|
||||
// now the worker is busy doing the above task
|
||||
|
||||
// create another task
|
||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
|
||||
// create another task
|
||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
|
||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
|
||||
// preempt the first task, have it owned by another worker
|
||||
ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
|
||||
// preempt the first task, have it owned by another worker
|
||||
ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
|
||||
TaskState.TASK_OWNED.get("another-worker"));
|
||||
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
||||
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
||||
|
||||
waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
|
||||
assertEquals(2, slw.taskReadySeq);
|
||||
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||
waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
|
||||
assertEquals(2, slw.taskReadySeq);
|
||||
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||
ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2")), "tmt_svr"));
|
||||
} finally {
|
||||
stopSplitLogWorker(slw);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue