HBASE-7172 TestSplitLogManager.testVanishingTaskZNode() fails when run individually and is flaky
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1414973 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e6053a6160
commit
b79c62dcbd
|
@ -340,8 +340,10 @@ public class SplitLogManager extends ZooKeeperListener {
|
|||
LOG.warn("No more task remaining (ZK or task map), splitting "
|
||||
+ "should have completed. Remaining tasks in ZK " + remainingInZK
|
||||
+ ", active tasks in map " + actual);
|
||||
if (remainingInZK == 0 && actual == 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
batch.wait(100);
|
||||
if (stopper.isStopped()) {
|
||||
LOG.warn("Stopped while waiting for log splits to be completed");
|
||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.SplitLogTask;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
|
@ -64,10 +63,8 @@ import org.apache.zookeeper.CreateMode;
|
|||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.ZooDefs.Ids;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
@ -87,15 +84,10 @@ public class TestSplitLogManager {
|
|||
private static boolean stopped = false;
|
||||
private SplitLogManager slm;
|
||||
private Configuration conf;
|
||||
private int to;
|
||||
|
||||
private static HBaseTestingUtility TEST_UTIL;
|
||||
|
||||
/**
|
||||
* Additional amount of time we wait for events to happen. Added where unit
|
||||
* test failures have been observed.
|
||||
*/
|
||||
private static final int EXTRA_TOLERANCE_MS = 200;
|
||||
|
||||
static Stoppable stopper = new Stoppable() {
|
||||
@Override
|
||||
public void stop(String why) {
|
||||
|
@ -131,6 +123,12 @@ public class TestSplitLogManager {
|
|||
// does not appear as dead from the master point of view, only from the split log pov.
|
||||
Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
|
||||
Mockito.when(master.getServerManager()).thenReturn(sm);
|
||||
|
||||
to = 4000;
|
||||
conf.setInt("hbase.splitlog.manager.timeout", to);
|
||||
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
|
||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||
to = to + 4 * 100;
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -220,25 +218,20 @@ public class TestSplitLogManager {
|
|||
zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
|
||||
int to = 1000;
|
||||
conf.setInt("hbase.splitlog.manager.timeout", to);
|
||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||
to = to + 2 * 100;
|
||||
|
||||
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||
slm.finishInitialization();
|
||||
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
|
||||
Task task = slm.findOrCreateOrphanTask(tasknode);
|
||||
assertTrue(task.isOrphan());
|
||||
waitForCounter(tot_mgr_heartbeat, 0, 1, 100);
|
||||
waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
|
||||
assertFalse(task.isUnassigned());
|
||||
long curt = System.currentTimeMillis();
|
||||
assertTrue((task.last_update <= curt) &&
|
||||
(task.last_update > (curt - 1000)));
|
||||
LOG.info("waiting for manager to resubmit the orphan task");
|
||||
waitForCounter(tot_mgr_resubmit, 0, 1, to + 300);
|
||||
waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
|
||||
assertTrue(task.isUnassigned());
|
||||
waitForCounter(tot_mgr_rescan, 0, 1, to + 100);
|
||||
waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -254,12 +247,12 @@ public class TestSplitLogManager {
|
|||
|
||||
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||
slm.finishInitialization();
|
||||
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
|
||||
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
|
||||
Task task = slm.findOrCreateOrphanTask(tasknode);
|
||||
assertTrue(task.isOrphan());
|
||||
assertTrue(task.isUnassigned());
|
||||
// wait for RESCAN node to be created
|
||||
waitForCounter(tot_mgr_rescan, 0, 1, 500);
|
||||
waitForCounter(tot_mgr_rescan, 0, 1, to/2);
|
||||
Task task2 = slm.findOrCreateOrphanTask(tasknode);
|
||||
assertTrue(task == task2);
|
||||
LOG.debug("task = " + task);
|
||||
|
@ -275,11 +268,6 @@ public class TestSplitLogManager {
|
|||
public void testMultipleResubmits() throws Exception {
|
||||
LOG.info("TestMultipleResbmits - no indefinite resubmissions");
|
||||
|
||||
int to = 1000;
|
||||
conf.setInt("hbase.splitlog.manager.timeout", to);
|
||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||
to = to + 2 * 100;
|
||||
|
||||
conf.setInt("hbase.splitlog.max.resubmit", 2);
|
||||
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||
slm.finishInitialization();
|
||||
|
@ -292,21 +280,21 @@ public class TestSplitLogManager {
|
|||
final ServerName worker3 = new ServerName("worker3,1,1");
|
||||
SplitLogTask slt = new SplitLogTask.Owned(worker1);
|
||||
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
|
||||
waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
|
||||
waitForCounter(tot_mgr_resubmit, 0, 1, to + EXTRA_TOLERANCE_MS);
|
||||
waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
|
||||
waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
|
||||
int version1 = ZKUtil.checkExists(zkw, tasknode);
|
||||
assertTrue(version1 > version);
|
||||
slt = new SplitLogTask.Owned(worker2);
|
||||
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
|
||||
waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
|
||||
waitForCounter(tot_mgr_resubmit, 1, 2, to + 100);
|
||||
waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
|
||||
waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
|
||||
int version2 = ZKUtil.checkExists(zkw, tasknode);
|
||||
assertTrue(version2 > version1);
|
||||
slt = new SplitLogTask.Owned(worker3);
|
||||
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
|
||||
waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
|
||||
waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + EXTRA_TOLERANCE_MS);
|
||||
Thread.sleep(to + EXTRA_TOLERANCE_MS);
|
||||
waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
|
||||
waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
|
||||
Thread.sleep(to + to/2);
|
||||
assertEquals(2L, tot_mgr_resubmit.get());
|
||||
}
|
||||
|
||||
|
@ -314,8 +302,6 @@ public class TestSplitLogManager {
|
|||
public void testRescanCleanup() throws Exception {
|
||||
LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
|
||||
|
||||
conf.setInt("hbase.splitlog.manager.timeout", 1000);
|
||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||
slm.finishInitialization();
|
||||
TaskBatch batch = new TaskBatch();
|
||||
|
@ -325,7 +311,7 @@ public class TestSplitLogManager {
|
|||
final ServerName worker1 = new ServerName("worker1,1,1");
|
||||
SplitLogTask slt = new SplitLogTask.Owned(worker1);
|
||||
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
|
||||
waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
|
||||
waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
|
||||
waitForCounter(new Expr() {
|
||||
@Override
|
||||
public long eval() {
|
||||
|
@ -339,7 +325,7 @@ public class TestSplitLogManager {
|
|||
slt = SplitLogTask.parseFrom(taskstate);
|
||||
assertTrue(slt.isUnassigned(DUMMY_MASTER));
|
||||
|
||||
waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
|
||||
waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -358,7 +344,7 @@ public class TestSplitLogManager {
|
|||
batch.wait();
|
||||
}
|
||||
}
|
||||
waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
|
||||
waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
|
||||
assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
|
||||
}
|
||||
|
||||
|
@ -381,7 +367,7 @@ public class TestSplitLogManager {
|
|||
batch.wait();
|
||||
}
|
||||
}
|
||||
waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
|
||||
waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
|
||||
assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
|
||||
conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT);
|
||||
}
|
||||
|
@ -403,7 +389,7 @@ public class TestSplitLogManager {
|
|||
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
|
||||
int version = ZKUtil.checkExists(zkw, tasknode);
|
||||
// Could be small race here.
|
||||
if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
|
||||
if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
|
||||
assertEquals(tot_mgr_resubmit.get(), 1);
|
||||
int version1 = ZKUtil.checkExists(zkw, tasknode);
|
||||
assertTrue("version1=" + version1 + ", version=" + version, version1 > version);
|
||||
|
@ -425,16 +411,9 @@ public class TestSplitLogManager {
|
|||
zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
|
||||
int to = 4000;
|
||||
conf.setInt("hbase.splitlog.manager.timeout", to);
|
||||
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
|
||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||
|
||||
|
||||
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||
slm.finishInitialization();
|
||||
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
|
||||
|
||||
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
|
||||
|
||||
// submit another task which will stay in unassigned mode
|
||||
TaskBatch batch = new TaskBatch();
|
||||
|
@ -471,11 +450,11 @@ public class TestSplitLogManager {
|
|||
final ServerName worker1 = new ServerName("worker1,1,1");
|
||||
SplitLogTask slt = new SplitLogTask.Owned(worker1);
|
||||
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
|
||||
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
|
||||
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
|
||||
slm.handleDeadWorker(worker1);
|
||||
if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, 3000);
|
||||
if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
|
||||
if (tot_mgr_resubmit_dead_server_task.get() == 0) {
|
||||
waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 3000);
|
||||
waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
|
||||
}
|
||||
|
||||
int version1 = ZKUtil.checkExists(zkw, tasknode);
|
||||
|
@ -497,7 +476,7 @@ public class TestSplitLogManager {
|
|||
|
||||
SplitLogTask slt = new SplitLogTask.Owned(worker1);
|
||||
ZKUtil.setData(zkw, tasknode, slt.toByteArray());
|
||||
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
|
||||
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
|
||||
|
||||
// Not yet resubmitted.
|
||||
Assert.assertEquals(0, tot_mgr_resubmit.get());
|
||||
|
@ -528,9 +507,6 @@ public class TestSplitLogManager {
|
|||
public void testVanishingTaskZNode() throws Exception {
|
||||
LOG.info("testVanishingTaskZNode");
|
||||
|
||||
int to = 1000;
|
||||
conf.setInt("hbase.splitlog.manager.timeout", to);
|
||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
|
||||
|
||||
slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
|
||||
|
|
Loading…
Reference in New Issue