diff --git a/CHANGES.txt b/CHANGES.txt index a3a00eb4724..ef3c802ba94 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -253,6 +253,8 @@ Release 0.91.0 - Unreleased (ramkrishna.s.vasudevan) HBASE-4350 Fix a Bloom filter bug introduced by HFile v2 and TestMultiColumnScanner that caught it (Mikhail Bautin) + HBASE-4007 distributed log splitting can get indefinitely stuck + (Prakash Khemani) diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 9a71fdf424b..b760ce26f57 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -233,6 +233,9 @@ public class MasterFileSystem { } if (distributedLogSplitting) { + for (ServerName serverName : serverNames) { + splitLogManager.handleDeadWorker(serverName.toString()); + } splitTime = EnvironmentEdgeManager.currentTimeMillis(); try { try { diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 54b6d4573ce..4a0c6d8daa0 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -57,6 +59,9 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; +import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.*; +import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.*; + /** * Distributes the task of log splitting to the available region servers. * Coordination happens via zookeeper. For every log file that has to be split a @@ -105,6 +110,9 @@ public class SplitLogManager extends ZooKeeperListener { new ConcurrentHashMap(); private TimeoutMonitor timeoutMonitor; + private Set deadWorkers = null; + private Object deadWorkersLock = new Object(); + /** * Its OK to construct this object even when region-servers are not online. It * does lookup the orphan tasks in zk but it doesn't block for them to be @@ -307,9 +315,9 @@ public class SplitLogManager extends ZooKeeperListener { } } - private void setDone(String path, boolean err) { + private void setDone(String path, TerminationStatus status) { if (!ZKSplitLog.isRescanNode(watcher, path)) { - if (!err) { + if (status == SUCCESS) { tot_mgr_log_split_success.incrementAndGet(); LOG.info("Done splitting " + path); } else { @@ -329,7 +337,7 @@ public class SplitLogManager extends ZooKeeperListener { // accessing task.batch here. if (!task.isOrphan()) { synchronized (task.batch) { - if (!err) { + if (status == SUCCESS) { task.batch.done++; } else { task.batch.error++; @@ -366,7 +374,7 @@ public class SplitLogManager extends ZooKeeperListener { private void createNodeFailure(String path) { // TODO the Manger should split the log locally instead of giving up LOG.warn("failed to create task node" + path); - setDone(path, true); + setDone(path, FAILURE); } @@ -381,7 +389,7 @@ public class SplitLogManager extends ZooKeeperListener { if (data == null) { tot_mgr_null_data.incrementAndGet(); LOG.fatal("logic error - got null data " + path); - setDone(path, true); + setDone(path, FAILURE); return; } data = this.watcher.getRecoverableZooKeeper().removeMetaData(data); @@ -390,36 +398,36 @@ public class SplitLogManager extends ZooKeeperListener { LOG.debug("task not yet acquired " + path + " ver = " + version); handleUnassignedTask(path); } else if (TaskState.TASK_OWNED.equals(data)) { - registerHeartbeat(path, version, + heartbeat(path, version, TaskState.TASK_OWNED.getWriterName(data)); } else if (TaskState.TASK_RESIGNED.equals(data)) { LOG.info("task " + path + " entered state " + new String(data)); - resubmit(path, true); + resubmitOrFail(path, FORCE); } else if (TaskState.TASK_DONE.equals(data)) { LOG.info("task " + path + " entered state " + new String(data)); if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) { if (taskFinisher.finish(TaskState.TASK_DONE.getWriterName(data), ZKSplitLog.getFileName(path)) == Status.DONE) { - setDone(path, false); // success + setDone(path, SUCCESS); } else { - resubmit(path, false); // err + resubmitOrFail(path, CHECK); } } else { - setDone(path, false); // success + setDone(path, SUCCESS); } } else if (TaskState.TASK_ERR.equals(data)) { LOG.info("task " + path + " entered state " + new String(data)); - resubmit(path, false); + resubmitOrFail(path, CHECK); } else { LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + new String(data)); - setDone(path, true); + setDone(path, FAILURE); } } private void getDataSetWatchFailure(String path) { LOG.warn("failed to set data watch " + path); - setDone(path, true); + setDone(path, FAILURE); } /** @@ -440,23 +448,19 @@ public class SplitLogManager extends ZooKeeperListener { LOG.info("resubmitting unassigned orphan task " + path); // ignore failure to resubmit. The timeout-monitor will handle it later // albeit in a more crude fashion - resubmit(path, task, true); + resubmit(path, task, FORCE); } } - private void registerHeartbeat(String path, int new_version, + private void heartbeat(String path, int new_version, String workerName) { Task task = findOrCreateOrphanTask(path); if (new_version != task.last_version) { if (task.isUnassigned()) { LOG.info("task " + path + " acquired by " + workerName); } - // very noisy - //LOG.debug("heartbeat for " + path + " last_version=" + task.last_version + - // " last_update=" + task.last_update + " new_version=" + - // new_version); - task.last_update = EnvironmentEdgeManager.currentTimeMillis(); - task.last_version = new_version; + task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), + new_version, workerName); tot_mgr_heartbeat.incrementAndGet(); } else { assert false; @@ -465,14 +469,15 @@ public class SplitLogManager extends ZooKeeperListener { return; } - private boolean resubmit(String path, Task task, boolean force) { + private boolean resubmit(String path, Task task, + ResubmitDirective directive) { // its ok if this thread misses the update to task.deleted. It will // fail later if (task.deleted) { return false; } int version; - if (!force) { + if (directive != FORCE) { if ((EnvironmentEdgeManager.currentTimeMillis() - task.last_update) < timeout) { return false; @@ -485,7 +490,7 @@ public class SplitLogManager extends ZooKeeperListener { } return false; } - // race with registerHeartBeat that might be changing last_version + // race with heartbeat() that might be changing last_version version = task.last_version; } else { version = -1; @@ -510,7 +515,7 @@ public class SplitLogManager extends ZooKeeperListener { return false; } // don't count forced resubmits - if (!force) { + if (directive != FORCE) { task.unforcedResubmits++; } task.setUnassigned(); @@ -519,9 +524,9 @@ public class SplitLogManager extends ZooKeeperListener { return true; } - private void resubmit(String path, boolean force) { - if (resubmit(path, findOrCreateOrphanTask(path), force) == false) { - setDone(path, true); // error + private void resubmitOrFail(String path, ResubmitDirective directive) { + if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) { + setDone(path, FAILURE); } } @@ -558,14 +563,22 @@ public class SplitLogManager extends ZooKeeperListener { * @throws KeeperException */ private void createRescanNode(long retries) { + // The RESCAN node will be deleted almost immediately by the + // SplitLogManager as soon as it is created because it is being + // created in the DONE state. This behavior prevents a buildup + // of RESCAN nodes. But there is also a chance that a SplitLogWorker + // might miss the watch-trigger that creation of RESCAN node provides. + // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks + // therefore this behavior is safe. this.watcher.getRecoverableZooKeeper().getZooKeeper(). create(ZKSplitLog.getRescanNode(watcher), - TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL, + TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), new Long(retries)); } private void createRescanSuccess(String path) { + lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis(); tot_mgr_rescan.incrementAndGet(); getDataSetWatch(path, zkretries); } @@ -698,6 +711,7 @@ public class SplitLogManager extends ZooKeeperListener { static class Task { long last_update; int last_version; + String cur_worker_name; TaskBatch batch; boolean deleted; int incarnation; @@ -707,6 +721,7 @@ public class SplitLogManager extends ZooKeeperListener { public String toString() { return ("last_update = " + last_update + " last_version = " + last_version + + " cur_worker_name = " + cur_worker_name + " deleted = " + deleted + " incarnation = " + incarnation + " resubmits = " + unforcedResubmits + @@ -739,11 +754,30 @@ public class SplitLogManager extends ZooKeeperListener { return (last_update == -1); } + public void heartbeat(long time, int version, String worker) { + last_version = version; + last_update = time; + cur_worker_name = worker; + } + public void setUnassigned() { + cur_worker_name = null; last_update = -1; } } + void handleDeadWorker(String worker_name) { + // resubmit the tasks on the TimeoutMonitor thread. Makes it easier + // to reason about concurrency. Makes it easier to retry. + synchronized (deadWorkersLock) { + if (deadWorkers == null) { + deadWorkers = new HashSet(100); + } + deadWorkers.add(worker_name); + } + LOG.info("dead splitlog worker " + worker_name); + } + /** * Periodically checks all active tasks and resubmits the ones that have timed * out @@ -759,10 +793,17 @@ public class SplitLogManager extends ZooKeeperListener { int unassigned = 0; int tot = 0; boolean found_assigned_task = false; + Set localDeadWorkers; + + synchronized (deadWorkersLock) { + localDeadWorkers = deadWorkers; + deadWorkers = null; + } for (Map.Entry e : tasks.entrySet()) { String path = e.getKey(); Task task = e.getValue(); + String cur_worker = task.cur_worker_name; tot++; // don't easily resubmit a task which hasn't been picked up yet. It // might be a long while before a SplitLogWorker is free to pick up a @@ -774,7 +815,16 @@ public class SplitLogManager extends ZooKeeperListener { continue; } found_assigned_task = true; - if (resubmit(path, task, false)) { + if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { + tot_mgr_resubmit_dead_server_task.incrementAndGet(); + if (resubmit(path, task, FORCE)) { + resubmitted++; + } else { + handleDeadWorker(cur_worker); + LOG.warn("Failed to resubmit task " + path + " owned by dead " + + cur_worker + ", will retry."); + } + } else if (resubmit(path, task, CHECK)) { resubmitted++; } } @@ -994,4 +1044,12 @@ public class SplitLogManager extends ZooKeeperListener { */ public Status finish(String workerName, String taskname); } -} + enum ResubmitDirective { + CHECK(), + FORCE(); + } + enum TerminationStatus { + SUCCESS(), + FAILURE(); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index 61e5c653080..9b83840652c 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -235,6 +235,8 @@ public class ZKSplitLog { public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0); public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0); public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0); + public static AtomicLong tot_mgr_resubmit_dead_server_task = + new AtomicLong(0); diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 9a888552915..5c9d7ddf75b 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -131,31 +131,6 @@ public class TestSplitLogManager { assertTrue(false); } - private int numRescanPresent() throws KeeperException { - int num = 0; - List nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode); - for (String node : nodes) { - if (ZKSplitLog.isRescanNode(zkw, - ZKUtil.joinZNode(zkw.splitLogZNode, node))) { - num++; - } - } - return num; - } - - private void setRescanNodeDone(int count) throws KeeperException { - List nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode); - for (String node : nodes) { - String nodepath = ZKUtil.joinZNode(zkw.splitLogZNode, node); - if (ZKSplitLog.isRescanNode(zkw, nodepath)) { - ZKUtil.setData(zkw, nodepath, - TaskState.TASK_DONE.get("some-worker")); - count--; - } - } - assertEquals(0, count); - } - private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, InterruptedException { String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); @@ -222,7 +197,6 @@ public class TestSplitLogManager { waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); assertTrue(task.isUnassigned()); waitForCounter(tot_mgr_rescan, 0, 1, to + 100); - assertEquals(1, numRescanPresent()); } @Test @@ -253,7 +227,6 @@ public class TestSplitLogManager { assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); assertTrue(ZKUtil.checkExists(zkw, tasknode) > version); - assertEquals(1, numRescanPresent()); } @Test @@ -286,7 +259,6 @@ public class TestSplitLogManager { ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker3")); waitForCounter(tot_mgr_heartbeat, 1, 2, 1000); waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + 100); - assertEquals(2, numRescanPresent()); Thread.sleep(to + 100); assertEquals(2L, tot_mgr_resubmit.get()); } @@ -311,16 +283,12 @@ public class TestSplitLogManager { waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); - assertEquals(1, numRescanPresent()); byte[] taskstate = ZKUtil.getData(zkw, tasknode); assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), taskstate)); - setRescanNodeDone(1); - waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); - assertEquals(0, numRescanPresent()); return; } @@ -377,7 +345,6 @@ public class TestSplitLogManager { waitForCounter(tot_mgr_resubmit, 0, 1, 1000); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); - assertEquals(1, numRescanPresent()); byte[] taskstate = ZKUtil.getData(zkw, tasknode); assertTrue(Arrays.equals(taskstate, @@ -417,18 +384,38 @@ public class TestSplitLogManager { TaskState.TASK_OWNED.get("dummy-worker")); } - // since all the nodes in the system are not unassigned the - // unassigned_timeout must not have kicked in - assertEquals(0, numRescanPresent()); - // since we have stopped heartbeating the owned node therefore it should // get resubmitted LOG.info("waiting for manager to resubmit the orphan task"); waitForCounter(tot_mgr_resubmit, 0, 1, to + 500); - assertEquals(1, numRescanPresent()); // now all the nodes are unassigned. manager should post another rescan waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + 500); - assertEquals(2, numRescanPresent()); + } + + @Test + public void testDeadWorker() throws Exception { + LOG.info("testDeadWorker"); + + conf.setLong("hbase.splitlog.max.resubmit", 0); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + TaskBatch batch = new TaskBatch(); + + String tasknode = submitTaskAndWait(batch, "foo/1"); + int version = ZKUtil.checkExists(zkw, tasknode); + + ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); + waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); + slm.handleDeadWorker("worker1"); + waitForCounter(tot_mgr_resubmit, 0, 1, 1000); + waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 1000); + + int version1 = ZKUtil.checkExists(zkw, tasknode); + assertTrue(version1 > version); + byte[] taskstate = ZKUtil.getData(zkw, tasknode); + assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), + taskstate)); + return; } }