HBASE-5081 Distributed log splitting deleteNode races against splitLog retry; REAPPLY
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1222045 13f79535-47bb-0310-9956-ffa450edef68
parent fa979ebab2
commit 6c97552ee1
@@ -477,6 +477,8 @@ Release 0.92.0 - Unreleased
    HBASE-4868  TestOfflineMetaRebuildBase#testMetaRebuild occasionally fails
                (Gao Jinchao)
    HBASE-4874  Run tests with non-secure random, some tests hang otherwise (Lars H)
+   HBASE-5081  Distributed log splitting deleteNode races against
+               splitLog retry (Jimmy Xiang)

 IMPROVEMENTS
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)
@@ -332,6 +332,7 @@ public class SplitLogManager extends ZooKeeperListener {
         LOG.warn("Error splitting " + path);
       }
     }
+    boolean safeToDeleteNodeAsync = true;
     Task task = tasks.get(path);
     if (task == null) {
       if (!ZKSplitLog.isRescanNode(watcher, path)) {
@@ -345,6 +346,13 @@ public class SplitLogManager extends ZooKeeperListener {
         // forgetting about them then we will have to handle the race when
         // accessing task.batch here.
         if (!task.isOrphan()) {
+          if (status != SUCCESS) {
+            // If the task is failed, deleting the node asynchronously
+            // will cause race issue against split log retry.
+            // In this case, we should delete it now.
+            safeToDeleteNodeAsync = false;
+            deleteNodeNow(path);
+          }
           synchronized (task.batch) {
             if (status == SUCCESS) {
               task.batch.done++;
@@ -359,11 +367,35 @@ public class SplitLogManager extends ZooKeeperListener {
     // delete the task node in zk. Keep trying indefinitely - its an async
     // call and no one is blocked waiting for this node to be deleted. All
     // task names are unique (log.<timestamp>) there is no risk of deleting
-    // a future task.
-    deleteNode(path, Long.MAX_VALUE);
+    // a future task. This is true if the task status is SUCCESS, otherwise,
+    // it may race against split log retry.
+    if (safeToDeleteNodeAsync) {
+      deleteNode(path, Long.MAX_VALUE);
+    }
     return;
   }
+
+  private void deleteNodeNow(String path) {
+    try {
+      tot_mgr_node_delete_queued.incrementAndGet();
+      this.watcher.getRecoverableZooKeeper().delete(path, -1);
+      tot_mgr_task_deleted.incrementAndGet();
+    } catch (KeeperException ke) {
+      if (ke.code() != KeeperException.Code.NONODE) {
+        tot_mgr_node_delete_err.incrementAndGet();
+        LOG.warn("Failed to delete failed task node: "
+            + path + " due to " + ke.getMessage());
+      } else {
+        LOG.info("Failed task node does not exist, "
+            + "either was never created or was already deleted: " + path);
+        tot_mgr_task_deleted.incrementAndGet();
+      }
+    } catch (InterruptedException ie) {
+      LOG.warn("Interrupted while waiting for failed task node to be deleted");
+      Thread.currentThread().interrupt();
+    }
+  }
 
   private void createNode(String path, Long retry_count) {
     ZKUtil.asyncCreate(this.watcher, path,
         TaskState.TASK_UNASSIGNED.get(serverName), new CreateAsyncCallback(),
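The new comment and deleteNodeNow() above only hint at the race. As an illustration, here is a minimal, self-contained Java sketch of the failure mode; it is not HBase code, and the map, executor, and every name in it are hypothetical stand-ins for ZooKeeper znodes and the async delete callback. It shows how a delete queued for a failed task's node can fire after a split-log retry has re-created that node, erasing the retried task, while a synchronous delete leaves the retry a clean slate. The TestSplitLogManager hunk below exercises the real code path end to end.

// Hypothetical sketch of the HBASE-5081 race, not HBase code.
// A ConcurrentHashMap stands in for ZooKeeper znodes; a single-thread
// executor stands in for the async delete callback chain.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DeleteRaceSketch {
  static final ConcurrentHashMap<String, String> znodes = new ConcurrentHashMap<>();
  static final ExecutorService zkCallbacks = Executors.newSingleThreadExecutor();

  // Old behaviour: queue the delete and return immediately.
  static void deleteNodeAsync(String path) {
    zkCallbacks.submit(() -> {
      try {
        Thread.sleep(50);        // simulated callback latency
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      znodes.remove(path);       // may run AFTER the retry re-created the node
    });
  }

  // New behaviour for failed tasks: delete before returning.
  static void deleteNodeNow(String path) {
    znodes.remove(path);
  }

  public static void main(String[] args) throws Exception {
    String task = "/hbase/splitlog/log.1";

    // Failed attempt, async delete queued, then the retry re-creates the node.
    znodes.put(task, "ERR");
    deleteNodeAsync(task);
    znodes.put(task, "UNASSIGNED");   // split log retry
    Thread.sleep(100);
    System.out.println("async delete: node after retry = " + znodes.get(task));

    // Same sequence with the synchronous delete.
    znodes.put(task, "ERR");
    deleteNodeNow(task);
    znodes.put(task, "UNASSIGNED");   // retry sees a clean slate
    System.out.println("sync delete:  node after retry = " + znodes.get(task));

    zkCallbacks.shutdown();
    zkCallbacks.awaitTermination(1, TimeUnit.SECONDS);
  }
}

With the queued delete, the first line prints null because the retried task's node was removed out from under it; with the synchronous delete the node survives as UNASSIGNED.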
@@ -328,6 +328,28 @@ public class TestSplitLogManager {
     }
     waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
     assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
+
+    conf.setInt("hbase.splitlog.max.resubmit", 0);
+    slm.stopTrackingTasks(batch);
+    batch = new TaskBatch();
+    resetCounters();
+
+    // inject a failed task node, and retry
+    ZKUtil.createAndWatch(zkw, tasknode, TaskState.TASK_ERR.get("worker"));
+
+    slm.enqueueSplitTask("foo/1", batch);
+    assertEquals(1, batch.installed);
+    assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
+    waitForCounter(tot_mgr_node_already_exists, 0, 1, 1000);
+
+    synchronized (batch) {
+      while (batch.installed != batch.error) {
+        batch.wait();
+      }
+    }
+    waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
+    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
+
     conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);
   }