diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 4ecae96c6af..5679f491275 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -340,7 +340,9 @@ public class SplitLogManager extends ZooKeeperListener { LOG.warn("No more task remaining (ZK or task map), splitting " + "should have completed. Remaining tasks in ZK " + remainingInZK + ", active tasks in map " + actual); - return; + if (remainingInZK == 0 && actual == 0) { + return; + } } batch.wait(100); if (stopper.isStopped()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 5a995f9d148..7f6b50cb24d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -48,7 +48,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Stoppable; @@ -64,10 +63,8 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; @@ -87,15 +84,10 @@ public class TestSplitLogManager { private static boolean stopped = false; private SplitLogManager slm; private Configuration conf; + private int to; private static HBaseTestingUtility TEST_UTIL; - /** - * Additional amount of time we wait for events to happen. Added where unit - * test failures have been observed. - */ - private static final int EXTRA_TOLERANCE_MS = 200; - static Stoppable stopper = new Stoppable() { @Override public void stop(String why) { @@ -131,6 +123,12 @@ public class TestSplitLogManager { // does not appear as dead from the master point of view, only from the split log pov. Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true); Mockito.when(master.getServerManager()).thenReturn(sm); + + to = 4000; + conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); + conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); + to = to + 4 * 100; } @After @@ -220,25 +218,20 @@ public class TestSplitLogManager { zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - int to = 1000; - conf.setInt("hbase.splitlog.manager.timeout", to); - conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - to = to + 2 * 100; - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); - waitForCounter(tot_mgr_heartbeat, 0, 1, 100); + waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); assertFalse(task.isUnassigned()); long curt = System.currentTimeMillis(); assertTrue((task.last_update <= curt) && (task.last_update > (curt - 1000))); LOG.info("waiting for manager to resubmit the orphan task"); - waitForCounter(tot_mgr_resubmit, 0, 1, to + 300); + waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); assertTrue(task.isUnassigned()); - waitForCounter(tot_mgr_rescan, 0, 1, to + 100); + waitForCounter(tot_mgr_rescan, 0, 1, to + to/2); } @Test @@ -254,12 +247,12 @@ public class TestSplitLogManager { slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); - waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); + waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); // wait for RESCAN node to be created - waitForCounter(tot_mgr_rescan, 0, 1, 500); + waitForCounter(tot_mgr_rescan, 0, 1, to/2); Task task2 = slm.findOrCreateOrphanTask(tasknode); assertTrue(task == task2); LOG.debug("task = " + task); @@ -275,11 +268,6 @@ public class TestSplitLogManager { public void testMultipleResubmits() throws Exception { LOG.info("TestMultipleResbmits - no indefinite resubmissions"); - int to = 1000; - conf.setInt("hbase.splitlog.manager.timeout", to); - conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - to = to + 2 * 100; - conf.setInt("hbase.splitlog.max.resubmit", 2); slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); @@ -292,21 +280,21 @@ public class TestSplitLogManager { final ServerName worker3 = new ServerName("worker3,1,1"); SplitLogTask slt = new SplitLogTask.Owned(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); - waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); - waitForCounter(tot_mgr_resubmit, 0, 1, to + EXTRA_TOLERANCE_MS); + waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); + waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); slt = new SplitLogTask.Owned(worker2); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); - waitForCounter(tot_mgr_heartbeat, 1, 2, 1000); - waitForCounter(tot_mgr_resubmit, 1, 2, to + 100); + waitForCounter(tot_mgr_heartbeat, 1, 2, to/2); + waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2); int version2 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version2 > version1); slt = new SplitLogTask.Owned(worker3); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); - waitForCounter(tot_mgr_heartbeat, 1, 2, 1000); - waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + EXTRA_TOLERANCE_MS); - Thread.sleep(to + EXTRA_TOLERANCE_MS); + waitForCounter(tot_mgr_heartbeat, 1, 2, to/2); + waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2); + Thread.sleep(to + to/2); assertEquals(2L, tot_mgr_resubmit.get()); } @@ -314,8 +302,6 @@ public class TestSplitLogManager { public void testRescanCleanup() throws Exception { LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); - conf.setInt("hbase.splitlog.manager.timeout", 1000); - conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -325,7 +311,7 @@ public class TestSplitLogManager { final ServerName worker1 = new ServerName("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Owned(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); - waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); + waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); waitForCounter(new Expr() { @Override public long eval() { @@ -339,7 +325,7 @@ public class TestSplitLogManager { slt = SplitLogTask.parseFrom(taskstate); assertTrue(slt.isUnassigned(DUMMY_MASTER)); - waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); + waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2); } @Test @@ -358,7 +344,7 @@ public class TestSplitLogManager { batch.wait(); } } - waitForCounter(tot_mgr_task_deleted, 0, 1, 1000); + waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); } @@ -381,7 +367,7 @@ public class TestSplitLogManager { batch.wait(); } } - waitForCounter(tot_mgr_task_deleted, 0, 1, 1000); + waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT); } @@ -403,7 +389,7 @@ public class TestSplitLogManager { ZKUtil.setData(zkw, tasknode, slt.toByteArray()); int version = ZKUtil.checkExists(zkw, tasknode); // Could be small race here. - if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, 1000); + if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to/2); assertEquals(tot_mgr_resubmit.get(), 1); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue("version1=" + version1 + ", version=" + version, version1 > version); @@ -425,16 +411,9 @@ public class TestSplitLogManager { zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - int to = 4000; - conf.setInt("hbase.splitlog.manager.timeout", to); - conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); - conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); - waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); - + waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); // submit another task which will stay in unassigned mode TaskBatch batch = new TaskBatch(); @@ -471,11 +450,11 @@ public class TestSplitLogManager { final ServerName worker1 = new ServerName("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Owned(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); - if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); + if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); slm.handleDeadWorker(worker1); - if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, 3000); + if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2); if (tot_mgr_resubmit_dead_server_task.get() == 0) { - waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 3000); + waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2); } int version1 = ZKUtil.checkExists(zkw, tasknode); @@ -497,7 +476,7 @@ public class TestSplitLogManager { SplitLogTask slt = new SplitLogTask.Owned(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); - if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); + if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); // Not yet resubmitted. Assert.assertEquals(0, tot_mgr_resubmit.get()); @@ -528,9 +507,6 @@ public class TestSplitLogManager { public void testVanishingTaskZNode() throws Exception { LOG.info("testVanishingTaskZNode"); - int to = 1000; - conf.setInt("hbase.splitlog.manager.timeout", to); - conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0); slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);