Merge branch 'master' into improve-bard

This commit is contained in:
fjy 2013-05-14 11:10:52 -07:00
commit 4c93cecd31
11 changed files with 43 additions and 23 deletions

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -25,6 +25,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
@ -230,6 +231,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
return null;
}
public boolean isWorkerRunningTask(String workerHost, String taskId)
{
ZkWorker zkWorker = zkWorkers.get(workerHost);
return (zkWorker != null && zkWorker.getRunningTasks().contains(taskId));
}
/**
* A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task.
*
@ -350,6 +358,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
/**
* Adds a task to the pending queue
*
* @param taskRunnerWorkItem
*/
private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem)
@ -489,25 +498,36 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
cf.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(
JOINER.join(
config.getIndexerTaskPath(),
theWorker.getHost(),
task.getId()
),
rawBytes
);
String taskPath = JOINER.join(config.getIndexerTaskPath(), theWorker.getHost(), task.getId());
if (cf.checkExists().forPath(taskPath) == null) {
cf.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(
taskPath, rawBytes
);
}
runningTasks.put(task.getId(), pendingTasks.remove(task.getId()));
log.info("Task %s switched from pending to running", task.getId());
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks
Stopwatch timeoutStopwatch = new Stopwatch();
timeoutStopwatch.start();
synchronized (statusLock) {
while (findWorkerRunningTask(task.getId()) == null) {
while (!isWorkerRunningTask(theWorker.getHost(), task.getId())) {
statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis());
if (timeoutStopwatch.elapsedMillis() >= config.getTaskAssignmentTimeoutDuration().getMillis()) {
log.error(
"Something went wrong! %s never ran task %s after %s!",
theWorker.getHost(),
task.getId(),
config.getTaskAssignmentTimeoutDuration()
);
retryTask(runningTasks.get(task.getId()), theWorker.getHost());
break;
}
}
}
}
@ -687,7 +707,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
);
if (workerQueue.isEmpty()) {
log.info("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return null;
}

View File

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version>
<version>0.4.13-SNAPSHOT</version>
</parent>
<dependencies>