HBASE-3806 distributed log splitting double escapes task names
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1095613 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d8d26cb493
commit
baefac4c42
|
@ -79,7 +79,11 @@ Release 0.91.0 - Unreleased
|
||||||
HBASE-3781 hbase shell cannot start "NoMethodError: undefined method
|
HBASE-3781 hbase shell cannot start "NoMethodError: undefined method
|
||||||
`close' for nil:NilClass" (Mikael Sitruk)
|
`close' for nil:NilClass" (Mikael Sitruk)
|
||||||
HBASE-3802 Redundant list creation in HRegion
|
HBASE-3802 Redundant list creation in HRegion
|
||||||
|
HBASE-3788 Two error handlings in AssignmentManager.setOfflineInZooKeeper()
|
||||||
|
(Ted Yu)
|
||||||
HBASE-3800 HMaster is not able to start due to AlreadyCreatedException
|
HBASE-3800 HMaster is not able to start due to AlreadyCreatedException
|
||||||
|
HBASE-3806 distributed log splitting double escapes task names
|
||||||
|
(Prakash Khemani)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
||||||
|
|
|
@ -1102,7 +1102,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
*/
|
*/
|
||||||
boolean setOfflineInZooKeeper(final RegionState state) {
|
boolean setOfflineInZooKeeper(final RegionState state) {
|
||||||
if (!state.isClosed() && !state.isOffline()) {
|
if (!state.isClosed() && !state.isOffline()) {
|
||||||
new RuntimeException("Unexpected state trying to OFFLINE; " + state);
|
|
||||||
this.master.abort("Unexpected state trying to OFFLINE; " + state,
|
this.master.abort("Unexpected state trying to OFFLINE; " + state,
|
||||||
new IllegalStateException());
|
new IllegalStateException());
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -229,7 +229,7 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
|
|
||||||
boolean installTask(String taskname, TaskBatch batch) {
|
boolean installTask(String taskname, TaskBatch batch) {
|
||||||
tot_mgr_log_split_start.incrementAndGet();
|
tot_mgr_log_split_start.incrementAndGet();
|
||||||
String path = ZKSplitLog.getNodeName(watcher, taskname);
|
String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
|
||||||
Task oldtask = createTaskIfAbsent(path, batch);
|
Task oldtask = createTaskIfAbsent(path, batch);
|
||||||
if (oldtask == null) {
|
if (oldtask == null) {
|
||||||
// publish the task in zk
|
// publish the task in zk
|
||||||
|
@ -605,11 +605,19 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
" " + StringUtils.stringifyException(e));
|
" " + StringUtils.stringifyException(e));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG.info("found " + orphans.size() + " orphan tasks");
|
int rescan_nodes = 0;
|
||||||
for (String path : orphans) {
|
for (String path : orphans) {
|
||||||
LOG.info("found orphan task " + path);
|
String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
|
||||||
getDataSetWatch(ZKSplitLog.getNodeName(watcher, path), zkretries);
|
if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
|
||||||
|
rescan_nodes++;
|
||||||
|
LOG.debug("found orphan rescan node " + path);
|
||||||
|
} else {
|
||||||
|
LOG.info("found orphan task " + path);
|
||||||
|
}
|
||||||
|
getDataSetWatch(nodepath, zkretries);
|
||||||
}
|
}
|
||||||
|
LOG.info("found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
|
||||||
|
rescan_nodes + " rescan nodes");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -757,9 +765,9 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
LOG.debug("found pre-existing znode " + path);
|
LOG.debug("found pre-existing znode " + path);
|
||||||
tot_mgr_node_already_exists.incrementAndGet();
|
tot_mgr_node_already_exists.incrementAndGet();
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
|
|
||||||
path);
|
|
||||||
Long retry_count = (Long)ctx;
|
Long retry_count = (Long)ctx;
|
||||||
|
LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
|
||||||
|
path + " retry=" + retry_count);
|
||||||
if (retry_count == 0) {
|
if (retry_count == 0) {
|
||||||
tot_mgr_node_create_err.incrementAndGet();
|
tot_mgr_node_create_err.incrementAndGet();
|
||||||
createNodeFailure(path);
|
createNodeFailure(path);
|
||||||
|
@ -786,8 +794,9 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
Stat stat) {
|
Stat stat) {
|
||||||
tot_mgr_get_data_result.incrementAndGet();
|
tot_mgr_get_data_result.incrementAndGet();
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " "+ path);
|
|
||||||
Long retry_count = (Long) ctx;
|
Long retry_count = (Long) ctx;
|
||||||
|
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
|
||||||
|
path + " retry=" + retry_count);
|
||||||
if (retry_count == 0) {
|
if (retry_count == 0) {
|
||||||
tot_mgr_get_data_err.incrementAndGet();
|
tot_mgr_get_data_err.incrementAndGet();
|
||||||
getDataSetWatchFailure(path);
|
getDataSetWatchFailure(path);
|
||||||
|
@ -815,8 +824,9 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
if (rc != KeeperException.Code.NONODE.intValue()) {
|
if (rc != KeeperException.Code.NONODE.intValue()) {
|
||||||
tot_mgr_node_delete_err.incrementAndGet();
|
tot_mgr_node_delete_err.incrementAndGet();
|
||||||
LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path);
|
|
||||||
Long retry_count = (Long) ctx;
|
Long retry_count = (Long) ctx;
|
||||||
|
LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
|
||||||
|
path + " retry=" + retry_count);
|
||||||
if (retry_count == 0) {
|
if (retry_count == 0) {
|
||||||
LOG.warn("delete failed " + path);
|
LOG.warn("delete failed " + path);
|
||||||
deleteNodeFailure(path);
|
deleteNodeFailure(path);
|
||||||
|
@ -849,8 +859,9 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
@Override
|
@Override
|
||||||
public void processResult(int rc, String path, Object ctx, String name) {
|
public void processResult(int rc, String path, Object ctx, String name) {
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
LOG.warn("rc =" + KeeperException.Code.get(rc) + " for "+ path);
|
|
||||||
Long retry_count = (Long)ctx;
|
Long retry_count = (Long)ctx;
|
||||||
|
LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
|
||||||
|
" retry=" + retry_count);
|
||||||
if (retry_count == 0) {
|
if (retry_count == 0) {
|
||||||
createRescanFailure();
|
createRescanFailure();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -241,7 +241,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
currentVersion = stat.getVersion();
|
currentVersion = stat.getVersion();
|
||||||
if (ownTask() == false) {
|
if (ownTask(true) == false) {
|
||||||
tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
|
tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -262,7 +262,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean progress() {
|
public boolean progress() {
|
||||||
if (ownTask() == false) {
|
if (ownTask(false) == false) {
|
||||||
LOG.warn("Failed to heartbeat the task" + currentTask);
|
LOG.warn("Failed to heartbeat the task" + currentTask);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -321,23 +321,29 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
||||||
* <p>
|
* <p>
|
||||||
* @return true if task path is successfully locked
|
* @return true if task path is successfully locked
|
||||||
*/
|
*/
|
||||||
private boolean ownTask() {
|
private boolean ownTask(boolean isFirstTime) {
|
||||||
try {
|
try {
|
||||||
Stat stat = this.watcher.getZooKeeper().setData(currentTask,
|
Stat stat = this.watcher.getZooKeeper().setData(currentTask,
|
||||||
TaskState.TASK_OWNED.get(serverName), currentVersion);
|
TaskState.TASK_OWNED.get(serverName), currentVersion);
|
||||||
if (stat == null) {
|
if (stat == null) {
|
||||||
|
LOG.warn("zk.setData() returned null for path " + currentTask);
|
||||||
|
tot_wkr_task_heartbeat_failed.incrementAndGet();
|
||||||
return (false);
|
return (false);
|
||||||
}
|
}
|
||||||
currentVersion = stat.getVersion();
|
currentVersion = stat.getVersion();
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug ("hearbeat for path " + currentTask +
|
|
||||||
" successful, version = " + currentVersion);
|
|
||||||
}
|
|
||||||
tot_wkr_task_heartbeat.incrementAndGet();
|
tot_wkr_task_heartbeat.incrementAndGet();
|
||||||
return (true);
|
return (true);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
// either Bad Version or Node has been removed
|
if (!isFirstTime) {
|
||||||
LOG.warn("failed to assert ownership for " + currentTask, e);
|
if (e.code().equals(KeeperException.Code.NONODE)) {
|
||||||
|
LOG.warn("NONODE failed to assert ownership for " + currentTask, e);
|
||||||
|
} else if (e.code().equals(KeeperException.Code.BADVERSION)) {
|
||||||
|
LOG.warn("BADVERSION failed to assert ownership for " +
|
||||||
|
currentTask, e);
|
||||||
|
} else {
|
||||||
|
LOG.warn("failed to assert ownership for " + currentTask, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (InterruptedException e1) {
|
} catch (InterruptedException e1) {
|
||||||
LOG.warn("Interrupted while trying to assert ownership of " +
|
LOG.warn("Interrupted while trying to assert ownership of " +
|
||||||
currentTask + " " + StringUtils.stringifyException(e1));
|
currentTask + " " + StringUtils.stringifyException(e1));
|
||||||
|
|
|
@ -56,7 +56,8 @@ public class ZKSplitLog {
|
||||||
* @param zkw zk reference
|
* @param zkw zk reference
|
||||||
* @param filename log file name (only the basename)
|
* @param filename log file name (only the basename)
|
||||||
*/
|
*/
|
||||||
public static String getNodeName(ZooKeeperWatcher zkw, String filename) {
|
public static String getEncodedNodeName(ZooKeeperWatcher zkw,
|
||||||
|
String filename) {
|
||||||
return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename));
|
return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,7 +84,7 @@ public class ZKSplitLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getRescanNode(ZooKeeperWatcher zkw) {
|
public static String getRescanNode(ZooKeeperWatcher zkw) {
|
||||||
return getNodeName(zkw, "RESCAN");
|
return ZKUtil.joinZNode(zkw.splitLogZNode, "RESCAN");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean isRescanNode(ZooKeeperWatcher zkw, String path) {
|
public static boolean isRescanNode(ZooKeeperWatcher zkw, String path) {
|
||||||
|
|
|
@ -135,7 +135,8 @@ public class TestSplitLogManager {
|
||||||
int num = 0;
|
int num = 0;
|
||||||
List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
|
List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
|
||||||
for (String node : nodes) {
|
for (String node : nodes) {
|
||||||
if (ZKSplitLog.isRescanNode(zkw, ZKSplitLog.getNodeName(zkw, node))) {
|
if (ZKSplitLog.isRescanNode(zkw,
|
||||||
|
ZKUtil.joinZNode(zkw.splitLogZNode, node))) {
|
||||||
num++;
|
num++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -145,8 +146,9 @@ public class TestSplitLogManager {
|
||||||
private void setRescanNodeDone(int count) throws KeeperException {
|
private void setRescanNodeDone(int count) throws KeeperException {
|
||||||
List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
|
List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
|
||||||
for (String node : nodes) {
|
for (String node : nodes) {
|
||||||
if (ZKSplitLog.isRescanNode(zkw, ZKSplitLog.getNodeName(zkw, node))) {
|
String nodepath = ZKUtil.joinZNode(zkw.splitLogZNode, node);
|
||||||
ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, node),
|
if (ZKSplitLog.isRescanNode(zkw, nodepath)) {
|
||||||
|
ZKUtil.setData(zkw, nodepath,
|
||||||
TaskState.TASK_DONE.get("some-worker"));
|
TaskState.TASK_DONE.get("some-worker"));
|
||||||
count--;
|
count--;
|
||||||
}
|
}
|
||||||
|
@ -156,12 +158,12 @@ public class TestSplitLogManager {
|
||||||
|
|
||||||
private String submitTaskAndWait(TaskBatch batch, String name)
|
private String submitTaskAndWait(TaskBatch batch, String name)
|
||||||
throws KeeperException, InterruptedException {
|
throws KeeperException, InterruptedException {
|
||||||
String tasknode = ZKSplitLog.getNodeName(zkw, "foo");
|
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
|
||||||
NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
|
NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
|
||||||
zkw.registerListener(listener);
|
zkw.registerListener(listener);
|
||||||
ZKUtil.watchAndCheckExists(zkw, tasknode);
|
ZKUtil.watchAndCheckExists(zkw, tasknode);
|
||||||
|
|
||||||
slm.installTask("foo", batch);
|
slm.installTask(name, batch);
|
||||||
assertEquals(1, batch.installed);
|
assertEquals(1, batch.installed);
|
||||||
assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
|
assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
|
||||||
assertEquals(1L, tot_mgr_node_create_queued.get());
|
assertEquals(1L, tot_mgr_node_create_queued.get());
|
||||||
|
@ -184,7 +186,7 @@ public class TestSplitLogManager {
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
|
|
||||||
String tasknode = submitTaskAndWait(batch, "foo");
|
String tasknode = submitTaskAndWait(batch, "foo/1");
|
||||||
|
|
||||||
byte[] data = ZKUtil.getData(zkw, tasknode);
|
byte[] data = ZKUtil.getData(zkw, tasknode);
|
||||||
LOG.info("Task node created " + new String(data));
|
LOG.info("Task node created " + new String(data));
|
||||||
|
@ -195,7 +197,7 @@ public class TestSplitLogManager {
|
||||||
public void testOrphanTaskAcquisition() throws Exception {
|
public void testOrphanTaskAcquisition() throws Exception {
|
||||||
LOG.info("TestOrphanTaskAcquisition");
|
LOG.info("TestOrphanTaskAcquisition");
|
||||||
|
|
||||||
String tasknode = ZKSplitLog.getNodeName(zkw, "orphan");
|
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
|
||||||
zkw.getZooKeeper().create(tasknode,
|
zkw.getZooKeeper().create(tasknode,
|
||||||
TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
@ -227,7 +229,7 @@ public class TestSplitLogManager {
|
||||||
public void testUnassignedOrphan() throws Exception {
|
public void testUnassignedOrphan() throws Exception {
|
||||||
LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
|
LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
|
||||||
" startup");
|
" startup");
|
||||||
String tasknode = ZKSplitLog.getNodeName(zkw, "orphan");
|
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
|
||||||
//create an unassigned orphan task
|
//create an unassigned orphan task
|
||||||
zkw.getZooKeeper().create(tasknode,
|
zkw.getZooKeeper().create(tasknode,
|
||||||
TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
||||||
|
@ -268,7 +270,7 @@ public class TestSplitLogManager {
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
|
|
||||||
String tasknode = submitTaskAndWait(batch, "foo");
|
String tasknode = submitTaskAndWait(batch, "foo/1");
|
||||||
int version = ZKUtil.checkExists(zkw, tasknode);
|
int version = ZKUtil.checkExists(zkw, tasknode);
|
||||||
|
|
||||||
ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
|
ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
|
||||||
|
@ -301,7 +303,7 @@ public class TestSplitLogManager {
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
|
|
||||||
String tasknode = submitTaskAndWait(batch, "foo");
|
String tasknode = submitTaskAndWait(batch, "foo/1");
|
||||||
int version = ZKUtil.checkExists(zkw, tasknode);
|
int version = ZKUtil.checkExists(zkw, tasknode);
|
||||||
|
|
||||||
ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
|
ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
|
||||||
|
@ -329,7 +331,7 @@ public class TestSplitLogManager {
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
|
slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
String tasknode = submitTaskAndWait(batch, "foo");
|
String tasknode = submitTaskAndWait(batch, "foo/1");
|
||||||
ZKUtil.setData(zkw, tasknode, TaskState.TASK_DONE.get("worker"));
|
ZKUtil.setData(zkw, tasknode, TaskState.TASK_DONE.get("worker"));
|
||||||
synchronized (batch) {
|
synchronized (batch) {
|
||||||
while (batch.installed != batch.done) {
|
while (batch.installed != batch.done) {
|
||||||
|
@ -349,7 +351,7 @@ public class TestSplitLogManager {
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
|
|
||||||
String tasknode = submitTaskAndWait(batch, "foo");
|
String tasknode = submitTaskAndWait(batch, "foo/1");
|
||||||
ZKUtil.setData(zkw, tasknode, TaskState.TASK_ERR.get("worker"));
|
ZKUtil.setData(zkw, tasknode, TaskState.TASK_ERR.get("worker"));
|
||||||
synchronized (batch) {
|
synchronized (batch) {
|
||||||
while (batch.installed != batch.error) {
|
while (batch.installed != batch.error) {
|
||||||
|
@ -368,7 +370,7 @@ public class TestSplitLogManager {
|
||||||
slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
|
slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
|
||||||
slm.finishInitialization();
|
slm.finishInitialization();
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
String tasknode = submitTaskAndWait(batch, "foo");
|
String tasknode = submitTaskAndWait(batch, "foo/1");
|
||||||
ZKUtil.setData(zkw, tasknode, TaskState.TASK_RESIGNED.get("worker"));
|
ZKUtil.setData(zkw, tasknode, TaskState.TASK_RESIGNED.get("worker"));
|
||||||
int version = ZKUtil.checkExists(zkw, tasknode);
|
int version = ZKUtil.checkExists(zkw, tasknode);
|
||||||
|
|
||||||
|
@ -388,7 +390,7 @@ public class TestSplitLogManager {
|
||||||
" resubmit");
|
" resubmit");
|
||||||
|
|
||||||
// create an orphan task in OWNED state
|
// create an orphan task in OWNED state
|
||||||
String tasknode1 = ZKSplitLog.getNodeName(zkw, "orphan");
|
String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
|
||||||
zkw.getZooKeeper().create(tasknode1,
|
zkw.getZooKeeper().create(tasknode1,
|
||||||
TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
@ -406,7 +408,7 @@ public class TestSplitLogManager {
|
||||||
|
|
||||||
// submit another task which will stay in unassigned mode
|
// submit another task which will stay in unassigned mode
|
||||||
TaskBatch batch = new TaskBatch();
|
TaskBatch batch = new TaskBatch();
|
||||||
submitTaskAndWait(batch, "foo");
|
submitTaskAndWait(batch, "foo/1");
|
||||||
|
|
||||||
// keep updating the orphan owned node every to/2 seconds
|
// keep updating the orphan owned node every to/2 seconds
|
||||||
for (int i = 0; i < (3 * to)/100; i++) {
|
for (int i = 0; i < (3 * to)/100; i++) {
|
||||||
|
|
|
@ -135,7 +135,7 @@ public class TestSplitLogWorker {
|
||||||
public void testAcquireTaskAtStartup() throws Exception {
|
public void testAcquireTaskAtStartup() throws Exception {
|
||||||
LOG.info("testAcquireTaskAtStartup");
|
LOG.info("testAcquireTaskAtStartup");
|
||||||
|
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tatas"),
|
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
|
||||||
TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -144,14 +144,14 @@ public class TestSplitLogWorker {
|
||||||
slw.start();
|
slw.start();
|
||||||
waitForCounter(tot_wkr_task_acquired, 0, 1, 100);
|
waitForCounter(tot_wkr_task_acquired, 0, 1, 100);
|
||||||
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||||
ZKSplitLog.getNodeName(zkw, "tatas")), "rs"));
|
ZKSplitLog.getEncodedNodeName(zkw, "tatas")), "rs"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRaceForTask() throws Exception {
|
public void testRaceForTask() throws Exception {
|
||||||
LOG.info("testRaceForTask");
|
LOG.info("testRaceForTask");
|
||||||
|
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "trft"),
|
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -164,9 +164,9 @@ public class TestSplitLogWorker {
|
||||||
waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
|
waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
|
||||||
waitForCounter(tot_wkr_failed_to_grab_task_lost_race, 0, 1, 1000);
|
waitForCounter(tot_wkr_failed_to_grab_task_lost_race, 0, 1, 1000);
|
||||||
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||||
ZKSplitLog.getNodeName(zkw, "trft")), "svr1") ||
|
ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr1") ||
|
||||||
TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||||
ZKSplitLog.getNodeName(zkw, "trft")), "svr2"));
|
ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr2"));
|
||||||
slw1.stop();
|
slw1.stop();
|
||||||
slw2.stop();
|
slw2.stop();
|
||||||
slw1.worker.join();
|
slw1.worker.join();
|
||||||
|
@ -184,16 +184,16 @@ public class TestSplitLogWorker {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
// this time create a task node after starting the splitLogWorker
|
// this time create a task node after starting the splitLogWorker
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tpt_task"),
|
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
|
waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
|
||||||
assertEquals(1, slw.taskReadySeq);
|
assertEquals(1, slw.taskReadySeq);
|
||||||
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||||
ZKSplitLog.getNodeName(zkw, "tpt_task")), "tpt_svr"));
|
ZKSplitLog.getEncodedNodeName(zkw, "tpt_task")), "tpt_svr"));
|
||||||
|
|
||||||
ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "tpt_task"),
|
ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"));
|
TaskState.TASK_UNASSIGNED.get("manager"));
|
||||||
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
||||||
}
|
}
|
||||||
|
@ -207,7 +207,7 @@ public class TestSplitLogWorker {
|
||||||
Thread.yield(); // let the worker start
|
Thread.yield(); // let the worker start
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tmt_task"),
|
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -215,19 +215,19 @@ public class TestSplitLogWorker {
|
||||||
// now the worker is busy doing the above task
|
// now the worker is busy doing the above task
|
||||||
|
|
||||||
// create another task
|
// create another task
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tmt_task_2"),
|
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
// preempt the first task, have it owned by another worker
|
// preempt the first task, have it owned by another worker
|
||||||
ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "tmt_task"),
|
ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
|
||||||
TaskState.TASK_OWNED.get("another-worker"));
|
TaskState.TASK_OWNED.get("another-worker"));
|
||||||
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
||||||
|
|
||||||
waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
|
waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
|
||||||
assertEquals(2, slw.taskReadySeq);
|
assertEquals(2, slw.taskReadySeq);
|
||||||
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
|
||||||
ZKSplitLog.getNodeName(zkw, "tmt_task_2")), "tmt_svr"));
|
ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2")), "tmt_svr"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -239,7 +239,7 @@ public class TestSplitLogWorker {
|
||||||
Thread.yield(); // let the worker start
|
Thread.yield(); // let the worker start
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "task"),
|
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -247,12 +247,12 @@ public class TestSplitLogWorker {
|
||||||
// now the worker is busy doing the above task
|
// now the worker is busy doing the above task
|
||||||
|
|
||||||
// preempt the task, have it owned by another worker
|
// preempt the task, have it owned by another worker
|
||||||
ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "task"),
|
ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "task"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"));
|
TaskState.TASK_UNASSIGNED.get("manager"));
|
||||||
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
||||||
|
|
||||||
// create a RESCAN node
|
// create a RESCAN node
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "RESCAN"),
|
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT_SEQUENTIAL);
|
CreateMode.PERSISTENT_SEQUENTIAL);
|
||||||
|
|
||||||
|
@ -260,7 +260,7 @@ public class TestSplitLogWorker {
|
||||||
// RESCAN node might not have been processed if the worker became busy
|
// RESCAN node might not have been processed if the worker became busy
|
||||||
// with the above task. preempt the task again so that now the RESCAN
|
// with the above task. preempt the task again so that now the RESCAN
|
||||||
// node is processed
|
// node is processed
|
||||||
ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "task"),
|
ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "task"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"));
|
TaskState.TASK_UNASSIGNED.get("manager"));
|
||||||
waitForCounter(tot_wkr_preempt_task, 1, 2, 1000);
|
waitForCounter(tot_wkr_preempt_task, 1, 2, 1000);
|
||||||
waitForCounter(tot_wkr_task_acquired_rescan, 0, 1, 1000);
|
waitForCounter(tot_wkr_task_acquired_rescan, 0, 1, 1000);
|
||||||
|
@ -272,7 +272,7 @@ public class TestSplitLogWorker {
|
||||||
num++;
|
num++;
|
||||||
if (node.startsWith("RESCAN")) {
|
if (node.startsWith("RESCAN")) {
|
||||||
assertTrue(TaskState.TASK_DONE.equals(ZKUtil.getData(zkw,
|
assertTrue(TaskState.TASK_DONE.equals(ZKUtil.getData(zkw,
|
||||||
ZKSplitLog.getNodeName(zkw, node)), "svr"));
|
ZKSplitLog.getEncodedNodeName(zkw, node)), "svr"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertEquals(2, num);
|
assertEquals(2, num);
|
||||||
|
|
Loading…
Reference in New Issue