diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 2c993ca2fff..b6f1c38c83c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -110,7 +110,7 @@ public class MasterFileSystem { conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true); if (this.distributedLogSplitting) { this.splitLogManager = new SplitLogManager(master.getZooKeeper(), - master.getConfiguration(), master, master.getServerName()); + master.getConfiguration(), master, services, master.getServerName()); this.splitLogManager.finishInitialization(masterRecovery); } else { this.splitLogManager = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 3b03e98e57a..4ecae96c6af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -98,12 +98,13 @@ import org.apache.zookeeper.data.Stat; public class SplitLogManager extends ZooKeeperListener { private static final Log LOG = LogFactory.getLog(SplitLogManager.class); - public static final int DEFAULT_TIMEOUT = 25000; // 25 sec + public static final int DEFAULT_TIMEOUT = 120000; public static final int DEFAULT_ZK_RETRIES = 3; public static final int DEFAULT_MAX_RESUBMIT = 3; public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min private final Stoppable stopper; + private final MasterServices master; private final ServerName serverName; private final TaskFinisher taskFinisher; private FileSystem fs; @@ -116,11 +117,11 @@ public class SplitLogManager extends ZooKeeperListener { private long lastNodeCreateTime = Long.MAX_VALUE; public boolean ignoreZKDeleteForTesting = false; - private ConcurrentMap tasks = new ConcurrentHashMap(); + private final ConcurrentMap tasks = new ConcurrentHashMap(); private TimeoutMonitor timeoutMonitor; private volatile Set deadWorkers = null; - private Object deadWorkersLock = new Object(); + private final Object deadWorkersLock = new Object(); /** * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration, @@ -135,8 +136,8 @@ public class SplitLogManager extends ZooKeeperListener { * @param serverName */ public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, - Stoppable stopper, ServerName serverName) { - this(zkw, conf, stopper, serverName, new TaskFinisher() { + Stoppable stopper, MasterServices master, ServerName serverName) { + this(zkw, conf, stopper, master, serverName, new TaskFinisher() { @Override public Status finish(ServerName workerName, String logfile) { try { @@ -162,18 +163,19 @@ public class SplitLogManager extends ZooKeeperListener { * @param tf task finisher */ public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, - Stoppable stopper, ServerName serverName, TaskFinisher tf) { + Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf) { super(zkw); this.taskFinisher = tf; this.conf = conf; this.stopper = stopper; + this.master = master; this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES); this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT); this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT); this.unassignedTimeout = conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); - LOG.debug("timeout = " + timeout); - LOG.debug("unassigned timeout = " + unassignedTimeout); + LOG.info("timeout = " + timeout); + LOG.info("unassigned timeout = " + unassignedTimeout); this.serverName = serverName; this.timeoutMonitor = @@ -551,8 +553,18 @@ public class SplitLogManager extends ZooKeeperListener { } int version; if (directive != FORCE) { - if ((EnvironmentEdgeManager.currentTimeMillis() - task.last_update) < - timeout) { + // We're going to resubmit: + // 1) immediately if the worker server is now marked as dead + // 2) after a configurable timeout if the server is not marked as dead but has still not + // finished the task. This allows to continue if the worker cannot actually handle it, + // for any reason. + final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update; + final boolean alive = master.getServerManager() != null ? + master.getServerManager().isServerOnline(task.cur_worker_name) : true; + if (alive && time < timeout) { + LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " + + task.cur_worker_name + " is not marked as dead, we waited for " + time + + " while the timeout is " + timeout); return false; } if (task.unforcedResubmits >= resubmit_threshold) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 8dd79945396..fa69309ebbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -48,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Stoppable; @@ -64,15 +65,20 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; @Category(MediumTests.class) public class TestSplitLogManager { private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); private final ServerName DUMMY_MASTER = new ServerName("dummy-master,1,1"); + private final ServerManager sm = Mockito.mock(ServerManager.class); + private final MasterServices master = Mockito.mock(MasterServices.class); + static { Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); } @@ -103,14 +109,6 @@ public class TestSplitLogManager { }; - @BeforeClass - public static void setUpBeforeClass() throws Exception { - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - @Before public void setup() throws Exception { TEST_UTIL = new HBaseTestingUtility(); @@ -128,6 +126,11 @@ public class TestSplitLogManager { stopped = false; resetCounters(); + + // By default, we let the test manage the error as before, so the server + // does not appear as dead from the master point of view, only from the split log pov. + Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true); + Mockito.when(master.getServerManager()).thenReturn(sm); } @After @@ -194,8 +197,9 @@ public class TestSplitLogManager { */ @Test public void testTaskCreation() throws Exception { + LOG.info("TestTaskCreation - test the creation of a task in zk"); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -221,7 +225,7 @@ public class TestSplitLogManager { conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); to = to + 2 * 100; - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); Task task = slm.findOrCreateOrphanTask(tasknode); @@ -248,7 +252,7 @@ public class TestSplitLogManager { CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); Task task = slm.findOrCreateOrphanTask(tasknode); @@ -277,7 +281,7 @@ public class TestSplitLogManager { to = to + 2 * 100; conf.setInt("hbase.splitlog.max.resubmit", 2); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -312,7 +316,7 @@ public class TestSplitLogManager { conf.setInt("hbase.splitlog.manager.timeout", 1000); conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -328,26 +332,21 @@ public class TestSplitLogManager { return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get()); } }, 0, 1, 5*60000); // wait long enough - if (tot_mgr_resubmit_failed.get() == 0) { - int version1 = ZKUtil.checkExists(zkw, tasknode); - assertTrue(version1 > version); - byte[] taskstate = ZKUtil.getData(zkw, tasknode); - slt = SplitLogTask.parseFrom(taskstate); - assertTrue(slt.isUnassigned(DUMMY_MASTER)); - - waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); - } else { - LOG.warn("Could not run test. Lost ZK connection?"); - } + Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get()); + int version1 = ZKUtil.checkExists(zkw, tasknode); + assertTrue(version1 > version); + byte[] taskstate = ZKUtil.getData(zkw, tasknode); + slt = SplitLogTask.parseFrom(taskstate); + assertTrue(slt.isUnassigned(DUMMY_MASTER)); - return; + waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); } @Test public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -368,7 +367,7 @@ public class TestSplitLogManager { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -376,6 +375,7 @@ public class TestSplitLogManager { final ServerName worker1 = new ServerName("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Err(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); + synchronized (batch) { while (batch.installed != batch.error) { batch.wait(); @@ -390,7 +390,7 @@ public class TestSplitLogManager { public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(tot_mgr_resubmit.get(), 0); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); assertEquals(tot_mgr_resubmit.get(), 0); TaskBatch batch = new TaskBatch(); @@ -431,7 +431,7 @@ public class TestSplitLogManager { conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); @@ -462,7 +462,7 @@ public class TestSplitLogManager { LOG.info("testDeadWorker"); conf.setLong("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -486,10 +486,35 @@ public class TestSplitLogManager { return; } + @Test + public void testWorkerCrash() throws Exception { + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); + slm.finishInitialization(); + TaskBatch batch = new TaskBatch(); + + String tasknode = submitTaskAndWait(batch, "foo/1"); + final ServerName worker1 = new ServerName("worker1,1,1"); + + SplitLogTask slt = new SplitLogTask.Owned(worker1); + ZKUtil.setData(zkw, tasknode, slt.toByteArray()); + if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); + + // Not yet resubmitted. + Assert.assertEquals(0, tot_mgr_resubmit.get()); + + // This server becomes dead + Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); + + Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). + + // It has been resubmitted + Assert.assertEquals(1, tot_mgr_resubmit.get()); + } + @Test public void testEmptyLogDir() throws Exception { LOG.info("testEmptyLogDir"); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); FileSystem fs = TEST_UTIL.getTestFileSystem(); Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), @@ -508,7 +533,7 @@ public class TestSplitLogManager { conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0); - slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null); + slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null); slm.finishInitialization(); FileSystem fs = TEST_UTIL.getTestFileSystem(); final Path logDir = new Path(fs.getWorkingDirectory(),