diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index 008b89a608f..08980b25b05 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -54,6 +54,7 @@ import org.junit.experimental.categories.Category; @Category(MediumTests.class) public class TestSplitLogWorker { private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class); + private static final int WAIT_TIME = 15000; private final ServerName MANAGER = ServerName.valueOf("manager,1,1"); static { Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); @@ -140,7 +141,7 @@ public class TestSplitLogWorker { }; - @Test + @Test(timeout=60000) public void testAcquireTaskAtStartup() throws Exception { LOG.info("testAcquireTaskAtStartup"); SplitLogCounters.resetCounters(); @@ -155,7 +156,7 @@ public class TestSplitLogWorker { new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS)); SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(RS)); @@ -168,14 +169,14 @@ public class TestSplitLogWorker { throws InterruptedException { if (slw != null) { slw.stop(); - slw.worker.join(3000); + slw.worker.join(WAIT_TIME); if (slw.worker.isAlive()) { assertTrue(("Could not stop the worker thread slw=" + slw) == null); } } } - @Test + @Test(timeout=60000) public void testRaceForTask() throws Exception { LOG.info("testRaceForTask"); SplitLogCounters.resetCounters(); @@ -194,11 +195,12 @@ public class TestSplitLogWorker { slw1.start(); slw2.start(); try { - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if // not it, that we fell through to the next counter in line and it was set. - assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1500, false) || - SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1); + assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, + WAIT_TIME, false) || + SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1); byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT)); SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2)); @@ -208,7 +210,7 @@ public class TestSplitLogWorker { } } - @Test + @Test(timeout=60000) public void testPreemptTask() throws Exception { LOG.info("testPreemptTask"); SplitLogCounters.resetCounters(); @@ -221,27 +223,27 @@ public class TestSplitLogWorker { try { Thread.yield(); // let the worker start Thread.sleep(1000); - waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, 5000); + waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME); // this time create a task node after starting the splitLogWorker zkw.getRecoverableZooKeeper().create(PATH, new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 8000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); assertEquals(1, slw.taskReadySeq); byte [] bytes = ZKUtil.getData(zkw, PATH); SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(SRV)); slt = new SplitLogTask.Unassigned(MANAGER); ZKUtil.setData(zkw, PATH, slt.toByteArray()); - waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 5000); + waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); } finally { stopSplitLogWorker(slw); } } - @Test + @Test(timeout=60000) public void testMultipleTasks() throws Exception { LOG.info("testMultipleTasks"); SplitLogCounters.resetCounters(); @@ -254,13 +256,13 @@ public class TestSplitLogWorker { try { Thread.yield(); // let the worker start Thread.sleep(100); - waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, 5000); + waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME); SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER); zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 5000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); // now the worker is busy doing the above task // create another task @@ -272,9 +274,9 @@ public class TestSplitLogWorker { final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1"); SplitLogTask slt = new SplitLogTask.Owned(anotherWorker); ZKUtil.setData(zkw, PATH1, slt.toByteArray()); - waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 5000); + waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 5000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME); assertEquals(2, slw.taskReadySeq); byte [] bytes = ZKUtil.getData(zkw, PATH2); slt = SplitLogTask.parseFrom(bytes); @@ -284,7 +286,7 @@ public class TestSplitLogWorker { } } - @Test + @Test(timeout=60000) public void testRescan() throws Exception { LOG.info("testRescan"); SplitLogCounters.resetCounters(); @@ -300,25 +302,25 @@ public class TestSplitLogWorker { zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); // now the worker is busy doing the above task // preempt the task, have it owned by another worker ZKUtil.setData(zkw, task, slt.toByteArray()); - waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1500); + waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); // create a RESCAN node String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"); rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 1500); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME); // RESCAN node might not have been processed if the worker became busy // with the above task. preempt the task again so that now the RESCAN // node is processed ZKUtil.setData(zkw, task, slt.toByteArray()); - waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, 1500); - waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, 1500); + waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME); List nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode); LOG.debug(nodes); @@ -336,7 +338,7 @@ public class TestSplitLogWorker { assertEquals(2, num); } - @Test + @Test(timeout=60000) public void testAcquireMultiTasks() throws Exception { LOG.info("testAcquireMultiTasks"); SplitLogCounters.resetCounters(); @@ -356,7 +358,7 @@ public class TestSplitLogWorker { SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); slw.start(); try { - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, 6000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME); for (int i = 0; i < maxTasks; i++) { byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i)); SplitLogTask slt = SplitLogTask.parseFrom(bytes); @@ -372,7 +374,7 @@ public class TestSplitLogWorker { * RS * @throws Exception */ - @Test + @Test(timeout=60000) public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception { LOG.info("testAcquireMultiTasks"); SplitLogCounters.resetCounters(); @@ -401,7 +403,7 @@ public class TestSplitLogWorker { slw.start(); try { int acquiredTasks = 0; - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, 6000); + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME); for (int i = 0; i < maxTasks; i++) { byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i)); SplitLogTask slt = SplitLogTask.parseFrom(bytes);