diff --git a/CHANGES.txt b/CHANGES.txt index b4054cc4082..08a861e9a2f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -477,6 +477,8 @@ Release 0.92.0 - Unreleased HBASE-4868 TestOfflineMetaRebuildBase#testMetaRebuild occasionally fails (Gao Jinchao) HBASE-4874 Run tests with non-secure random, some tests hang otherwise (Lars H) + HBASE-5081 Distributed log splitting deleteNode races against + splitLog retry (Jimmy Xiang) IMPROVEMENTS HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack) diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 667a8b18e7f..d4f97cd59b0 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -332,6 +332,7 @@ public class SplitLogManager extends ZooKeeperListener { LOG.warn("Error splitting " + path); } } + boolean safeToDeleteNodeAsync = true; Task task = tasks.get(path); if (task == null) { if (!ZKSplitLog.isRescanNode(watcher, path)) { @@ -345,6 +346,13 @@ public class SplitLogManager extends ZooKeeperListener { // forgetting about them then we will have to handle the race when // accessing task.batch here. if (!task.isOrphan()) { + if (status != SUCCESS) { + // If the task is failed, deleting the node asynchronously + // will cause race issue against split log retry. + // In this case, we should delete it now. + safeToDeleteNodeAsync = false; + deleteNodeNow(path); + } synchronized (task.batch) { if (status == SUCCESS) { task.batch.done++; @@ -359,11 +367,35 @@ public class SplitLogManager extends ZooKeeperListener { // delete the task node in zk. Keep trying indefinitely - its an async // call and no one is blocked waiting for this node to be deleted. All // task names are unique (log.) there is no risk of deleting - // a future task. - deleteNode(path, Long.MAX_VALUE); + // a future task. 
This is true if the task status is SUCCESS, otherwise, + // it may race against split log retry. + if (safeToDeleteNodeAsync) { + deleteNode(path, Long.MAX_VALUE); + } return; } + private void deleteNodeNow(String path) { + try { + tot_mgr_node_delete_queued.incrementAndGet(); + this.watcher.getRecoverableZooKeeper().delete(path, -1); + tot_mgr_task_deleted.incrementAndGet(); + } catch (KeeperException ke) { + if (ke.code() != KeeperException.Code.NONODE) { + tot_mgr_node_delete_err.incrementAndGet(); + LOG.warn("Failed to delete failed task node: " + + path + " due to " + ke.getMessage()); + } else { + LOG.info("Failed task node does not exist, " + + "either was never created or was already deleted: " + path); + tot_mgr_task_deleted.incrementAndGet(); + } + } catch (InterruptedException ie) { + LOG.warn("Interrupted while waiting for failed task node to be deleted"); + Thread.currentThread().interrupt(); + } + } + private void createNode(String path, Long retry_count) { ZKUtil.asyncCreate(this.watcher, path, TaskState.TASK_UNASSIGNED.get(serverName), new CreateAsyncCallback(), diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 32ad7e8ffb9..8afbc9eee11 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -328,6 +328,28 @@ public class TestSplitLogManager { } waitForCounter(tot_mgr_task_deleted, 0, 1, 1000); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); + + conf.setInt("hbase.splitlog.max.resubmit", 0); + slm.stopTrackingTasks(batch); + batch = new TaskBatch(); + resetCounters(); + + // inject a failed task node, and retry + ZKUtil.createAndWatch(zkw, tasknode, TaskState.TASK_ERR.get("worker")); + + slm.enqueueSplitTask("foo/1", batch); + assertEquals(1, batch.installed); + assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch); + 
waitForCounter(tot_mgr_node_already_exists, 0, 1, 1000); + + synchronized (batch) { + while (batch.installed != batch.error) { + batch.wait(); + } + } + waitForCounter(tot_mgr_task_deleted, 0, 1, 1000); + assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); + conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT); }