From 9669e79df23653a78a283b4087ab710dfe8664b0 Mon Sep 17 00:00:00 2001 From: Himanshu Date: Thu, 28 Apr 2016 12:28:00 -0500 Subject: [PATCH] fix misleading error log due to race in RTR and concurrency test (#2878) --- .../indexing/overlord/RemoteTaskRunner.java | 53 +++-- ...kRunnerRunPendingTasksConcurrencyTest.java | 150 +++++++++++++ .../overlord/RemoteTaskRunnerTest.java | 131 ++--------- .../overlord/RemoteTaskRunnerTestUtils.java | 203 ++++++++++++++++++ 4 files changed, 415 insertions(+), 122 deletions(-) create mode 100644 indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 1c7345c98b7..fe4eba5772f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -588,7 +588,11 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer String taskId = taskRunnerWorkItem.getTaskId(); if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) { try { - if (tryAssignTask(pendingTaskPayloads.get(taskId), taskRunnerWorkItem)) { + //this can still be null due to race from explicit task shutdown request + //or if another thread steals and completes this task right after this thread makes copy + //of pending tasks. See https://github.com/druid-io/druid/issues/2842 . + Task task = pendingTaskPayloads.get(taskId); + if (task != null && tryAssignTask(task, taskRunnerWorkItem)) { pendingTaskPayloads.remove(taskId); } } @@ -597,7 +601,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer .addData("taskId", taskRunnerWorkItem.getTaskId()) .emit(); RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId); - taskComplete(workItem, null, TaskStatus.failure(taskId)); + if (workItem != null) { + taskComplete(workItem, null, TaskStatus.failure(taskId)); + } } finally { tryAssignTasks.remove(taskId); @@ -677,8 +683,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } ZkWorker assignedWorker = null; + Optional immutableZkWorker = null; try { - final Optional immutableZkWorker = strategy.findWorkerForTask( + immutableZkWorker = strategy.findWorkerForTask( config, ImmutableMap.copyOf( Maps.transformEntries( @@ -708,22 +715,38 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer task ); - if (immutableZkWorker.isPresent() - && - workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId()) - == null) { - assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost()); - return announceTask(task, assignedWorker, taskRunnerWorkItem); + if (immutableZkWorker.isPresent()) { + if (workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId()) + == null) { + assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost()); + return announceTask(task, assignedWorker, taskRunnerWorkItem); + } else { + log.debug( + "Lost race to run task [%s] on worker [%s]. Workers to ack tasks are [%s].", + task.getId(), + immutableZkWorker.get().getWorker().getHost(), + workersWithUnacknowledgedTask + ); + } + } else { + log.debug( + "Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].", + task.getId(), + zkWorkers.values(), + workersWithUnacknowledgedTask + ); } - log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); return false; } finally { if (assignedWorker != null) { workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost()); - // note that this is essential as a task might not get a worker because a worker was assigned another task. - // so this will ensure that other pending tasks are tried for assignment again. + } + + if(immutableZkWorker.isPresent()) { + //if this attempt lost the race to run the task then there might be another worker available to try on. + //if this attempt won the race to run the task then other task might be able to use this worker now after task ack. runPendingTasks(); } } @@ -1196,4 +1219,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer { return config; } + + @VisibleForTesting + Map getWorkersWithUnacknowledgedTask() + { + return workersWithUnacknowledgedTask; + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java new file mode 100644 index 00000000000..6f77215834a --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java @@ -0,0 +1,150 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + * + */ + +package io.druid.indexing.overlord; + +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TestTasks; +import io.druid.indexing.common.task.Task; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + */ +public class RemoteTaskRunnerRunPendingTasksConcurrencyTest +{ + private RemoteTaskRunner remoteTaskRunner; + private RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils(); + + @Before + public void setUp() throws Exception + { + rtrTestUtils.setUp(); + } + + @After + public void tearDown() throws Exception + { + if (remoteTaskRunner != null) { + remoteTaskRunner.stop(); + } + rtrTestUtils.tearDown(); + } + + // This task reproduces the races described in https://github.com/druid-io/druid/issues/2842 + @Test(timeout = 60_000) + public void testConcurrency() throws Exception + { + rtrTestUtils.makeWorker("worker0"); + rtrTestUtils.makeWorker("worker1"); + + remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner( + new TestRemoteTaskRunnerConfig(new Period("PT3600S")) + { + public int getPendingTasksRunnerNumThreads() + { + return 2; + } + } + ); + + int numTasks = 6; + ListenableFuture[] results = new ListenableFuture[numTasks]; + Task[] tasks = new Task[numTasks]; + + //2 tasks + for (int i = 0; i < 2; i++) { + tasks[i] = TestTasks.unending("task" + i); + results[i] = (remoteTaskRunner.run(tasks[i])); + } + + while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) { + Thread.sleep(5); + } + + //3 more tasks, all of which get queued up + for (int i = 2; i < 5; i++) { + tasks[i] = TestTasks.unending("task" + i); + results[i] = (remoteTaskRunner.run(tasks[i])); + } + + //simulate completion of task0 and task1 + if (rtrTestUtils.taskAnnounced("worker0", tasks[0].getId())) { + rtrTestUtils.mockWorkerRunningTask("worker0", tasks[0]); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[0]); + rtrTestUtils.mockWorkerRunningTask("worker1", tasks[1]); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[1]); + } else { + rtrTestUtils.mockWorkerRunningTask("worker0", tasks[1]); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[1]); + rtrTestUtils.mockWorkerRunningTask("worker1", tasks[0]); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[0]); + } + + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[0].get().getStatusCode()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[1].get().getStatusCode()); + + // now both threads race to run the last 3 tasks. task2 and task3 are being assigned + while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) { + Thread.sleep(5); + } + + //cancel task4, both executor threads should be able to ignore task4 + remoteTaskRunner.shutdown("task4"); + + //simulate completion of task3 before task2 so that the executor thread with task2 + //gets to task3 and ignores it + if (rtrTestUtils.taskAnnounced("worker0", tasks[3].getId())) { + rtrTestUtils.mockWorkerRunningTask("worker0", tasks[3]); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[3]); + rtrTestUtils.mockWorkerRunningTask("worker1", tasks[2]); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[2]); + } else { + rtrTestUtils.mockWorkerRunningTask("worker1", tasks[3]); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[3]); + rtrTestUtils.mockWorkerRunningTask("worker0", tasks[2]); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[2]); + } + + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[2].get().getStatusCode()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[3].get().getStatusCode()); + + //ensure that RTR is doing OK and still making progress + tasks[5] = TestTasks.unending("task5"); + results[5] = remoteTaskRunner.run(tasks[5]); + while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 1) { + Thread.sleep(5); + } + if (rtrTestUtils.taskAnnounced("worker0", tasks[5].getId())) { + rtrTestUtils.mockWorkerRunningTask("worker0", tasks[5]); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[5]); + } else { + rtrTestUtils.mockWorkerRunningTask("worker1", tasks[5]); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[5]); + } + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[5].get().getStatusCode()); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 3bda5f25d80..72381a06559 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -23,36 +23,22 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.common.guava.DSuppliers; -import io.druid.curator.PotentiallyGzippedCompressionProvider; -import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.indexing.common.IndexingServiceCondition; -import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TestRealtimeTask; import io.druid.indexing.common.TestTasks; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; -import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; -import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; -import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; -import io.druid.server.initialization.IndexerZkConfig; -import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.curator.test.TestingCluster; import org.apache.zookeeper.CreateMode; import org.easymock.EasyMock; import org.joda.time.Period; @@ -65,46 +51,29 @@ import java.util.Collection; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; public class RemoteTaskRunnerTest { - private static final Joiner joiner = Joiner.on("/"); - private static final String basePath = "/test/druid"; - private static final String announcementsPath = String.format("%s/indexer/announcements/worker", basePath); - private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath); - private static final String statusPath = String.format("%s/indexer/status/worker", basePath); + private static final Joiner joiner = RemoteTaskRunnerTestUtils.joiner; + private static final String workerHost = "worker"; + private static final String announcementsPath = joiner.join(RemoteTaskRunnerTestUtils.announcementsPath, workerHost); + private static final String statusPath = joiner.join(RemoteTaskRunnerTestUtils.statusPath, workerHost); private static final int TIMEOUT_SECONDS = 20; - private static final TaskLocation DUMMY_LOCATION = TaskLocation.create("dummy", 9000); - private ObjectMapper jsonMapper; - - private TestingCluster testingCluster; - private CuratorFramework cf; private RemoteTaskRunner remoteTaskRunner; + private RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils(); + private ObjectMapper jsonMapper; + private CuratorFramework cf; private Task task; - private Worker worker; @Before public void setUp() throws Exception { - TestUtils testUtils = new TestUtils(); - jsonMapper = testUtils.getTestObjectMapper(); - - testingCluster = new TestingCluster(1); - testingCluster.start(); - - cf = CuratorFrameworkFactory.builder() - .connectString(testingCluster.getConnectString()) - .retryPolicy(new ExponentialBackoffRetry(1, 10)) - .compressionProvider(new PotentiallyGzippedCompressionProvider(false)) - .build(); - cf.start(); - cf.blockUntilConnected(); - cf.create().creatingParentsIfNeeded().forPath(basePath); - cf.create().creatingParentsIfNeeded().forPath(tasksPath); + rtrTestUtils.setUp(); + jsonMapper = rtrTestUtils.getObjectMapper(); + cf = rtrTestUtils.getCuratorFramework(); task = TestTasks.unending("task"); } @@ -112,9 +81,10 @@ public class RemoteTaskRunnerTest @After public void tearDown() throws Exception { - remoteTaskRunner.stop(); - cf.close(); - testingCluster.stop(); + if (remoteTaskRunner != null) { + remoteTaskRunner.stop(); + } + rtrTestUtils.tearDown(); } @Test @@ -461,80 +431,27 @@ public class RemoteTaskRunnerTest private void makeRemoteTaskRunner(RemoteTaskRunnerConfig config) throws Exception { - remoteTaskRunner = new RemoteTaskRunner( - jsonMapper, - config, - new IndexerZkConfig( - new ZkPathsConfig() - { - @Override - public String getBase() - { - return basePath; - } - }, null, null, null, null, null - ), - cf, - new SimplePathChildrenCacheFactory.Builder().build(), - null, - DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())), - ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"), - new NoopResourceManagementStrategy() - ); - - remoteTaskRunner.start(); + remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(config); } private void makeWorker() throws Exception { - worker = new Worker( - "worker", - "localhost", - 3, - "0" - ); - - cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( - announcementsPath, - jsonMapper.writeValueAsBytes(worker) - ); + worker = rtrTestUtils.makeWorker(workerHost); } private void disableWorker() throws Exception { - cf.setData().forPath( - announcementsPath, - jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "")) - ); + rtrTestUtils.disableWorker(worker); } private boolean taskAnnounced(final String taskId) { - return pathExists(joiner.join(tasksPath, taskId)); + return rtrTestUtils.taskAnnounced(workerHost, taskId); } private boolean workerRunningTask(final String taskId) { - return pathExists(joiner.join(statusPath, taskId)); - } - - private boolean pathExists(final String path) - { - return TestUtils.conditionValid( - new IndexingServiceCondition() - { - @Override - public boolean isValid() - { - try { - return cf.checkExists().forPath(path) != null; - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - ); + return rtrTestUtils.workerRunningTask(workerHost, taskId); } private boolean workerCompletedTask(final ListenableFuture result) @@ -553,18 +470,12 @@ public class RemoteTaskRunnerTest private void mockWorkerRunningTask(final Task task) throws Exception { - cf.delete().forPath(joiner.join(tasksPath, task.getId())); - - TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()), DUMMY_LOCATION); - cf.create() - .creatingParentsIfNeeded() - .forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); + rtrTestUtils.mockWorkerRunningTask("worker", task); } private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception { - TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId()), DUMMY_LOCATION); - cf.setData().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker", task); } @Test diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java new file mode 100644 index 00000000000..226be910f80 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -0,0 +1,203 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + * + */ + +package io.druid.indexing.overlord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import com.google.common.base.Throwables; +import com.metamx.common.concurrent.ScheduledExecutors; +import io.druid.common.guava.DSuppliers; +import io.druid.curator.PotentiallyGzippedCompressionProvider; +import io.druid.curator.cache.SimplePathChildrenCacheFactory; +import io.druid.indexing.common.IndexingServiceCondition; +import io.druid.indexing.common.TaskLocation; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy; +import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; +import io.druid.indexing.worker.TaskAnnouncement; +import io.druid.indexing.worker.Worker; +import io.druid.server.initialization.IndexerZkConfig; +import io.druid.server.initialization.ZkPathsConfig; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.apache.zookeeper.CreateMode; + +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class RemoteTaskRunnerTestUtils +{ + static final Joiner joiner = Joiner.on("/"); + static final String basePath = "/test/druid"; + static final String announcementsPath = String.format("%s/indexer/announcements", basePath); + static final String tasksPath = String.format("%s/indexer/tasks", basePath); + static final String statusPath = String.format("%s/indexer/status", basePath); + static final TaskLocation DUMMY_LOCATION = TaskLocation.create("dummy", 9000); + + private TestingCluster testingCluster; + + private CuratorFramework cf; + private ObjectMapper jsonMapper; + + RemoteTaskRunnerTestUtils() + { + TestUtils testUtils = new TestUtils(); + jsonMapper = testUtils.getTestObjectMapper(); + } + + CuratorFramework getCuratorFramework() + { + return cf; + } + + ObjectMapper getObjectMapper() + { + return jsonMapper; + } + + void setUp() throws Exception + { + testingCluster = new TestingCluster(1); + testingCluster.start(); + + cf = CuratorFrameworkFactory.builder() + .connectString(testingCluster.getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1, 10)) + .compressionProvider(new PotentiallyGzippedCompressionProvider(false)) + .build(); + cf.start(); + cf.blockUntilConnected(); + cf.create().creatingParentsIfNeeded().forPath(basePath); + cf.create().creatingParentsIfNeeded().forPath(tasksPath); + } + + void tearDown() throws Exception + { + cf.close(); + testingCluster.stop(); + } + + RemoteTaskRunner makeRemoteTaskRunner(RemoteTaskRunnerConfig config) throws Exception + { + RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner( + jsonMapper, + config, + new IndexerZkConfig( + new ZkPathsConfig() + { + @Override + public String getBase() + { + return basePath; + } + }, null, null, null, null, null + ), + cf, + new SimplePathChildrenCacheFactory.Builder().build(), + null, + DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())), + ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"), + new NoopResourceManagementStrategy() + ); + + remoteTaskRunner.start(); + return remoteTaskRunner; + } + + Worker makeWorker(final String workerId) throws Exception + { + Worker worker = new Worker( + workerId, + workerId, + 3, + "0" + ); + + cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( + joiner.join(announcementsPath, workerId), + jsonMapper.writeValueAsBytes(worker) + ); + cf.create().creatingParentsIfNeeded().forPath(joiner.join(tasksPath, workerId)); + + return worker; + } + + void disableWorker(Worker worker) throws Exception + { + cf.setData().forPath( + joiner.join(announcementsPath, worker.getHost()), + jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "")) + ); + } + + void mockWorkerRunningTask(final String workerId, final Task task) throws Exception + { + cf.delete().forPath(joiner.join(tasksPath, workerId, task.getId())); + + TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()), DUMMY_LOCATION); + cf.create() + .creatingParentsIfNeeded() + .forPath(joiner.join(statusPath, workerId, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); + } + + void mockWorkerCompleteSuccessfulTask(final String workerId, final Task task) throws Exception + { + TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId()), DUMMY_LOCATION); + cf.setData().forPath(joiner.join(statusPath, workerId, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); + } + + boolean workerRunningTask(final String workerId, final String taskId) + { + return pathExists(joiner.join(statusPath, workerId, taskId)); + } + + boolean taskAnnounced(final String workerId, final String taskId) + { + return pathExists(joiner.join(tasksPath, workerId, taskId)); + } + + boolean pathExists(final String path) + { + return TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + try { + return cf.checkExists().forPath(path) != null; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + } +}