Fix timeout in RemoteTaskRunnerTest (#5191)

* Fix timeout in RemoteTaskRunnerTest

* add message for npe
This commit is contained in:
Jihoon Son 2017-12-22 17:40:11 +09:00 committed by GitHub
parent ba873c614b
commit b31abd03ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 7 deletions

View File

@ -481,10 +481,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return null;
}
public boolean isWorkerRunningTask(Worker worker, String taskId)
public boolean isWorkerRunningTask(ZkWorker worker, String taskId)
{
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
return (zkWorker != null && zkWorker.isRunningTask(taskId));
return Preconditions.checkNotNull(worker, "worker").isRunningTask(taskId);
}
/**
@ -869,7 +868,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks
Stopwatch timeoutStopwatch = Stopwatch.createStarted();
while (!isWorkerRunningTask(theZkWorker.getWorker(), task.getId())) {
while (!isWorkerRunningTask(theZkWorker, task.getId())) {
final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
statusLock.wait(waitMs);
long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);

View File

@ -27,16 +27,15 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerListener;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;

View File

@ -21,6 +21,7 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.metamx.http.client.HttpClient;
@ -170,10 +171,17 @@ public class RemoteTaskRunnerTestUtils
{
cf.delete().forPath(joiner.join(tasksPath, workerId, task.getId()));
final String taskStatusPath = joiner.join(statusPath, workerId, task.getId());
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()), DUMMY_LOCATION);
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(statusPath, workerId, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
.forPath(taskStatusPath, jsonMapper.writeValueAsBytes(taskAnnouncement));
Preconditions.checkNotNull(
cf.checkExists().forPath(taskStatusPath),
"Failed to write status on [%s]",
taskStatusPath
);
}
void mockWorkerCompleteSuccessfulTask(final String workerId, final Task task) throws Exception