HBASE-4007 distributed log splitting can get indefinitely stuck

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1166989 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-09-09 04:05:26 +00:00
parent 76b123052e
commit 34f318b625
5 changed files with 123 additions and 71 deletions

View File

@ -253,6 +253,8 @@ Release 0.91.0 - Unreleased
(ramkrishna.s.vasudevan)
HBASE-4350 Fix a Bloom filter bug introduced by HFile v2 and
TestMultiColumnScanner that caught it (Mikhail Bautin)
HBASE-4007 distributed log splitting can get indefinitely stuck
(Prakash Khemani)

View File

@ -233,6 +233,9 @@ public class MasterFileSystem {
}
if (distributedLogSplitting) {
for (ServerName serverName : serverNames) {
splitLogManager.handleDeadWorker(serverName.toString());
}
splitTime = EnvironmentEdgeManager.currentTimeMillis();
try {
try {

View File

@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -57,6 +59,9 @@ import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.*;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.*;
/**
* Distributes the task of log splitting to the available region servers.
* Coordination happens via zookeeper. For every log file that has to be split a
@ -105,6 +110,9 @@ public class SplitLogManager extends ZooKeeperListener {
new ConcurrentHashMap<String, Task>();
private TimeoutMonitor timeoutMonitor;
private Set<String> deadWorkers = null;
private Object deadWorkersLock = new Object();
/**
* Its OK to construct this object even when region-servers are not online. It
* does lookup the orphan tasks in zk but it doesn't block for them to be
@ -307,9 +315,9 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
private void setDone(String path, boolean err) {
private void setDone(String path, TerminationStatus status) {
if (!ZKSplitLog.isRescanNode(watcher, path)) {
if (!err) {
if (status == SUCCESS) {
tot_mgr_log_split_success.incrementAndGet();
LOG.info("Done splitting " + path);
} else {
@ -329,7 +337,7 @@ public class SplitLogManager extends ZooKeeperListener {
// accessing task.batch here.
if (!task.isOrphan()) {
synchronized (task.batch) {
if (!err) {
if (status == SUCCESS) {
task.batch.done++;
} else {
task.batch.error++;
@ -366,7 +374,7 @@ public class SplitLogManager extends ZooKeeperListener {
private void createNodeFailure(String path) {
// TODO the Manger should split the log locally instead of giving up
LOG.warn("failed to create task node" + path);
setDone(path, true);
setDone(path, FAILURE);
}
@ -381,7 +389,7 @@ public class SplitLogManager extends ZooKeeperListener {
if (data == null) {
tot_mgr_null_data.incrementAndGet();
LOG.fatal("logic error - got null data " + path);
setDone(path, true);
setDone(path, FAILURE);
return;
}
data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
@ -390,36 +398,36 @@ public class SplitLogManager extends ZooKeeperListener {
LOG.debug("task not yet acquired " + path + " ver = " + version);
handleUnassignedTask(path);
} else if (TaskState.TASK_OWNED.equals(data)) {
registerHeartbeat(path, version,
heartbeat(path, version,
TaskState.TASK_OWNED.getWriterName(data));
} else if (TaskState.TASK_RESIGNED.equals(data)) {
LOG.info("task " + path + " entered state " + new String(data));
resubmit(path, true);
resubmitOrFail(path, FORCE);
} else if (TaskState.TASK_DONE.equals(data)) {
LOG.info("task " + path + " entered state " + new String(data));
if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
if (taskFinisher.finish(TaskState.TASK_DONE.getWriterName(data),
ZKSplitLog.getFileName(path)) == Status.DONE) {
setDone(path, false); // success
setDone(path, SUCCESS);
} else {
resubmit(path, false); // err
resubmitOrFail(path, CHECK);
}
} else {
setDone(path, false); // success
setDone(path, SUCCESS);
}
} else if (TaskState.TASK_ERR.equals(data)) {
LOG.info("task " + path + " entered state " + new String(data));
resubmit(path, false);
resubmitOrFail(path, CHECK);
} else {
LOG.fatal("logic error - unexpected zk state for path = " + path
+ " data = " + new String(data));
setDone(path, true);
setDone(path, FAILURE);
}
}
private void getDataSetWatchFailure(String path) {
LOG.warn("failed to set data watch " + path);
setDone(path, true);
setDone(path, FAILURE);
}
/**
@ -440,23 +448,19 @@ public class SplitLogManager extends ZooKeeperListener {
LOG.info("resubmitting unassigned orphan task " + path);
// ignore failure to resubmit. The timeout-monitor will handle it later
// albeit in a more crude fashion
resubmit(path, task, true);
resubmit(path, task, FORCE);
}
}
private void registerHeartbeat(String path, int new_version,
private void heartbeat(String path, int new_version,
String workerName) {
Task task = findOrCreateOrphanTask(path);
if (new_version != task.last_version) {
if (task.isUnassigned()) {
LOG.info("task " + path + " acquired by " + workerName);
}
// very noisy
//LOG.debug("heartbeat for " + path + " last_version=" + task.last_version +
// " last_update=" + task.last_update + " new_version=" +
// new_version);
task.last_update = EnvironmentEdgeManager.currentTimeMillis();
task.last_version = new_version;
task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(),
new_version, workerName);
tot_mgr_heartbeat.incrementAndGet();
} else {
assert false;
@ -465,14 +469,15 @@ public class SplitLogManager extends ZooKeeperListener {
return;
}
private boolean resubmit(String path, Task task, boolean force) {
private boolean resubmit(String path, Task task,
ResubmitDirective directive) {
// its ok if this thread misses the update to task.deleted. It will
// fail later
if (task.deleted) {
return false;
}
int version;
if (!force) {
if (directive != FORCE) {
if ((EnvironmentEdgeManager.currentTimeMillis() - task.last_update) <
timeout) {
return false;
@ -485,7 +490,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
return false;
}
// race with registerHeartBeat that might be changing last_version
// race with heartbeat() that might be changing last_version
version = task.last_version;
} else {
version = -1;
@ -510,7 +515,7 @@ public class SplitLogManager extends ZooKeeperListener {
return false;
}
// don't count forced resubmits
if (!force) {
if (directive != FORCE) {
task.unforcedResubmits++;
}
task.setUnassigned();
@ -519,9 +524,9 @@ public class SplitLogManager extends ZooKeeperListener {
return true;
}
private void resubmit(String path, boolean force) {
if (resubmit(path, findOrCreateOrphanTask(path), force) == false) {
setDone(path, true); // error
private void resubmitOrFail(String path, ResubmitDirective directive) {
if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) {
setDone(path, FAILURE);
}
}
@ -558,14 +563,22 @@ public class SplitLogManager extends ZooKeeperListener {
* @throws KeeperException
*/
private void createRescanNode(long retries) {
// The RESCAN node will be deleted almost immediately by the
// SplitLogManager as soon as it is created because it is being
// created in the DONE state. This behavior prevents a buildup
// of RESCAN nodes. But there is also a chance that a SplitLogWorker
// might miss the watch-trigger that creation of RESCAN node provides.
// Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
// therefore this behavior is safe.
this.watcher.getRecoverableZooKeeper().getZooKeeper().
create(ZKSplitLog.getRescanNode(watcher),
TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL,
TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL,
new CreateRescanAsyncCallback(), new Long(retries));
}
private void createRescanSuccess(String path) {
lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
tot_mgr_rescan.incrementAndGet();
getDataSetWatch(path, zkretries);
}
@ -698,6 +711,7 @@ public class SplitLogManager extends ZooKeeperListener {
static class Task {
long last_update;
int last_version;
String cur_worker_name;
TaskBatch batch;
boolean deleted;
int incarnation;
@ -707,6 +721,7 @@ public class SplitLogManager extends ZooKeeperListener {
public String toString() {
return ("last_update = " + last_update +
" last_version = " + last_version +
" cur_worker_name = " + cur_worker_name +
" deleted = " + deleted +
" incarnation = " + incarnation +
" resubmits = " + unforcedResubmits +
@ -739,11 +754,30 @@ public class SplitLogManager extends ZooKeeperListener {
return (last_update == -1);
}
public void heartbeat(long time, int version, String worker) {
last_version = version;
last_update = time;
cur_worker_name = worker;
}
public void setUnassigned() {
cur_worker_name = null;
last_update = -1;
}
}
void handleDeadWorker(String worker_name) {
// resubmit the tasks on the TimeoutMonitor thread. Makes it easier
// to reason about concurrency. Makes it easier to retry.
synchronized (deadWorkersLock) {
if (deadWorkers == null) {
deadWorkers = new HashSet<String>(100);
}
deadWorkers.add(worker_name);
}
LOG.info("dead splitlog worker " + worker_name);
}
/**
* Periodically checks all active tasks and resubmits the ones that have timed
* out
@ -759,10 +793,17 @@ public class SplitLogManager extends ZooKeeperListener {
int unassigned = 0;
int tot = 0;
boolean found_assigned_task = false;
Set<String> localDeadWorkers;
synchronized (deadWorkersLock) {
localDeadWorkers = deadWorkers;
deadWorkers = null;
}
for (Map.Entry<String, Task> e : tasks.entrySet()) {
String path = e.getKey();
Task task = e.getValue();
String cur_worker = task.cur_worker_name;
tot++;
// don't easily resubmit a task which hasn't been picked up yet. It
// might be a long while before a SplitLogWorker is free to pick up a
@ -774,7 +815,16 @@ public class SplitLogManager extends ZooKeeperListener {
continue;
}
found_assigned_task = true;
if (resubmit(path, task, false)) {
if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
tot_mgr_resubmit_dead_server_task.incrementAndGet();
if (resubmit(path, task, FORCE)) {
resubmitted++;
} else {
handleDeadWorker(cur_worker);
LOG.warn("Failed to resubmit task " + path + " owned by dead " +
cur_worker + ", will retry.");
}
} else if (resubmit(path, task, CHECK)) {
resubmitted++;
}
}
@ -994,4 +1044,12 @@ public class SplitLogManager extends ZooKeeperListener {
*/
public Status finish(String workerName, String taskname);
}
enum ResubmitDirective {
CHECK(),
FORCE();
}
enum TerminationStatus {
SUCCESS(),
FAILURE();
}
}

View File

@ -235,6 +235,8 @@ public class ZKSplitLog {
public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
public static AtomicLong tot_mgr_resubmit_dead_server_task =
new AtomicLong(0);

View File

@ -131,31 +131,6 @@ public class TestSplitLogManager {
assertTrue(false);
}
private int numRescanPresent() throws KeeperException {
int num = 0;
List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
for (String node : nodes) {
if (ZKSplitLog.isRescanNode(zkw,
ZKUtil.joinZNode(zkw.splitLogZNode, node))) {
num++;
}
}
return num;
}
private void setRescanNodeDone(int count) throws KeeperException {
List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
for (String node : nodes) {
String nodepath = ZKUtil.joinZNode(zkw.splitLogZNode, node);
if (ZKSplitLog.isRescanNode(zkw, nodepath)) {
ZKUtil.setData(zkw, nodepath,
TaskState.TASK_DONE.get("some-worker"));
count--;
}
}
assertEquals(0, count);
}
private String submitTaskAndWait(TaskBatch batch, String name)
throws KeeperException, InterruptedException {
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
@ -222,7 +197,6 @@ public class TestSplitLogManager {
waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
assertTrue(task.isUnassigned());
waitForCounter(tot_mgr_rescan, 0, 1, to + 100);
assertEquals(1, numRescanPresent());
}
@Test
@ -253,7 +227,6 @@ public class TestSplitLogManager {
assertTrue(task.isOrphan());
assertTrue(task.isUnassigned());
assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
assertEquals(1, numRescanPresent());
}
@Test
@ -286,7 +259,6 @@ public class TestSplitLogManager {
ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker3"));
waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + 100);
assertEquals(2, numRescanPresent());
Thread.sleep(to + 100);
assertEquals(2L, tot_mgr_resubmit.get());
}
@ -311,16 +283,12 @@ public class TestSplitLogManager {
waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
assertEquals(1, numRescanPresent());
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
taskstate));
setRescanNodeDone(1);
waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
assertEquals(0, numRescanPresent());
return;
}
@ -377,7 +345,6 @@ public class TestSplitLogManager {
waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
assertEquals(1, numRescanPresent());
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
assertTrue(Arrays.equals(taskstate,
@ -417,18 +384,38 @@ public class TestSplitLogManager {
TaskState.TASK_OWNED.get("dummy-worker"));
}
// since all the nodes in the system are not unassigned the
// unassigned_timeout must not have kicked in
assertEquals(0, numRescanPresent());
// since we have stopped heartbeating the owned node therefore it should
// get resubmitted
LOG.info("waiting for manager to resubmit the orphan task");
waitForCounter(tot_mgr_resubmit, 0, 1, to + 500);
assertEquals(1, numRescanPresent());
// now all the nodes are unassigned. manager should post another rescan
waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + 500);
assertEquals(2, numRescanPresent());
}
@Test
public void testDeadWorker() throws Exception {
LOG.info("testDeadWorker");
conf.setLong("hbase.splitlog.max.resubmit", 0);
slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
slm.finishInitialization();
TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1");
int version = ZKUtil.checkExists(zkw, tasknode);
ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
slm.handleDeadWorker("worker1");
waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 1000);
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
taskstate));
return;
}
}