HBASE-3065 Retry all 'retryable' zk operations; e.g. connection loss; addendum... split log zk code url encodes which interacts w/ new naming of znodes that this patch introduces... this commit fixes testslitlogworker
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1152003 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b11659d679
commit
54e4164ce7
|
@ -414,8 +414,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
|||
! TaskState.TASK_DONE.equals(data, serverName) &&
|
||||
! TaskState.TASK_ERR.equals(data, serverName) &&
|
||||
! TaskState.TASK_RESIGNED.equals(data, serverName)) {
|
||||
LOG.info("task " + taskpath + " preempted from server " +
|
||||
serverName + " ... current task state and owner - " +
|
||||
LOG.info("task " + taskpath + " preempted from " +
|
||||
serverName + ", current task state and owner=" +
|
||||
new String(data));
|
||||
stopTask();
|
||||
}
|
||||
|
|
|
@ -51,7 +51,8 @@ public class ZKSplitLog {
|
|||
public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
|
||||
|
||||
/**
|
||||
* Gets the full path node name for the log file being split
|
||||
* Gets the full path node name for the log file being split.
|
||||
* This method will url encode the filename.
|
||||
* @param zkw zk reference
|
||||
* @param filename log file name (only the basename)
|
||||
*/
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
|
||||
|
@ -264,7 +265,8 @@ public class TestSplitLogWorker {
|
|||
Thread.yield(); // let the worker start
|
||||
Thread.sleep(100);
|
||||
|
||||
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
|
||||
String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
|
||||
zkw.getRecoverableZooKeeper().create(task,
|
||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
|
||||
|
@ -272,12 +274,12 @@ public class TestSplitLogWorker {
|
|||
// now the worker is busy doing the above task
|
||||
|
||||
// preempt the task, have it owned by another worker
|
||||
ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "task"),
|
||||
TaskState.TASK_UNASSIGNED.get("manager"));
|
||||
ZKUtil.setData(zkw, task, TaskState.TASK_UNASSIGNED.get("manager"));
|
||||
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
||||
|
||||
// create a RESCAN node
|
||||
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
|
||||
String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
|
||||
rescan = zkw.getRecoverableZooKeeper().create(rescan,
|
||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT_SEQUENTIAL);
|
||||
|
||||
|
@ -285,8 +287,7 @@ public class TestSplitLogWorker {
|
|||
// RESCAN node might not have been processed if the worker became busy
|
||||
// with the above task. preempt the task again so that now the RESCAN
|
||||
// node is processed
|
||||
ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "task"),
|
||||
TaskState.TASK_UNASSIGNED.get("manager"));
|
||||
ZKUtil.setData(zkw, task, TaskState.TASK_UNASSIGNED.get("manager"));
|
||||
waitForCounter(tot_wkr_preempt_task, 1, 2, 1000);
|
||||
waitForCounter(tot_wkr_task_acquired_rescan, 0, 1, 1000);
|
||||
|
||||
|
@ -296,8 +297,11 @@ public class TestSplitLogWorker {
|
|||
for (String node : nodes) {
|
||||
num++;
|
||||
if (node.startsWith("RESCAN")) {
|
||||
assertTrue(TaskState.TASK_DONE.equals(ZKUtil.getData(zkw,
|
||||
ZKSplitLog.getEncodedNodeName(zkw, node)), "svr"));
|
||||
String name = ZKSplitLog.getEncodedNodeName(zkw, node);
|
||||
String fn = ZKSplitLog.getFileName(name);
|
||||
byte [] data = ZKUtil.getData(zkw, ZKUtil.joinZNode(zkw.splitLogZNode, fn));
|
||||
String datastr = Bytes.toString(data);
|
||||
assertTrue("data=" + datastr, TaskState.TASK_DONE.equals(data, "svr"));
|
||||
}
|
||||
}
|
||||
assertEquals(2, num);
|
||||
|
|
Loading…
Reference in New Issue