Merge branch 'master' of github.com:metamx/druid

This commit is contained in:
fjy 2014-01-13 18:02:03 -08:00
commit 5c64ee2301
7 changed files with 39 additions and 34 deletions

View File

@ -43,7 +43,6 @@ import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Period;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.exceptions.CallbackFailedException;
@ -98,7 +97,7 @@ public class DbTaskStorage implements TaskStorage
} }
@Override @Override
public void insert(final Task task, final TaskStatus status) public void insert(final Task task, final TaskStatus status) throws TaskExistsException
{ {
Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status"); Preconditions.checkNotNull(status, "status");
@ -137,9 +136,11 @@ public class DbTaskStorage implements TaskStorage
} }
); );
} }
catch (StatementException e) { catch (Exception e) {
// Might be a duplicate task ID. final boolean isStatementException = e instanceof StatementException ||
if (getTask(task.getId()).isPresent()) { (e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
if (isStatementException && getTask(task.getId()).isPresent()) {
throw new TaskExistsException(task.getId(), e); throw new TaskExistsException(task.getId(), e);
} else { } else {
throw e; throw e;
@ -533,11 +534,8 @@ public class DbTaskStorage implements TaskStorage
try { try {
return RetryUtils.retry(call, shouldRetry, maxTries); return RetryUtils.retry(call, shouldRetry, maxTries);
} }
catch (RuntimeException e) {
throw Throwables.propagate(e);
}
catch (Exception e) { catch (Exception e) {
throw new CallbackFailedException(e); throw Throwables.propagate(e);
} }
} }

View File

@ -63,7 +63,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
} }
@Override @Override
public void insert(Task task, TaskStatus status) public void insert(Task task, TaskStatus status) throws TaskExistsException
{ {
giant.lock(); giant.lock();

View File

@ -61,6 +61,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -335,7 +336,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
pendingTaskPayloads.remove(taskId); pendingTaskPayloads.remove(taskId);
log.info("Removed task from pending queue: %s", taskId); log.info("Removed task from pending queue: %s", taskId);
} else if (completeTasks.containsKey(taskId)) { } else if (completeTasks.containsKey(taskId)) {
cleanup(completeTasks.get(taskId).getWorker().getHost(), taskId); cleanup(taskId);
} else { } else {
final ZkWorker zkWorker = findWorkerRunningTask(taskId); final ZkWorker zkWorker = findWorkerRunningTask(taskId);
@ -469,28 +470,32 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
/** /**
* Removes a task from the complete queue and clears out the ZK status path of the task. * Removes a task from the complete queue and clears out the ZK status path of the task.
* *
* @param workerId - the worker that was previously running the task
* @param taskId - the task to cleanup * @param taskId - the task to cleanup
*/ */
private void cleanup(final String workerId, final String taskId) private void cleanup(final String taskId)
{ {
if (!started) { if (!started) {
return; return;
} }
if (completeTasks.remove(taskId) == null) { final RemoteTaskRunnerWorkItem removed = completeTasks.remove(taskId);
final Worker worker = removed.getWorker();
if (removed == null || worker == null) {
log.makeAlert("WTF?! Asked to cleanup nonexistent task") log.makeAlert("WTF?! Asked to cleanup nonexistent task")
.addData("workerId", workerId)
.addData("taskId", taskId) .addData("taskId", taskId)
.emit(); .emit();
} else { } else {
final String workerId = worker.getHost();
log.info("Cleaning up task[%s] on worker[%s]", taskId, workerId); log.info("Cleaning up task[%s] on worker[%s]", taskId, workerId);
final String statusPath = JOINER.join(zkPaths.getIndexerStatusPath(), workerId, taskId); final String statusPath = JOINER.join(zkPaths.getIndexerStatusPath(), workerId, taskId);
try { try {
cf.delete().guaranteed().forPath(statusPath); cf.delete().guaranteed().forPath(statusPath);
} }
catch (Exception e) { catch (KeeperException.NoNodeException e) {
log.info("Tried to delete status path[%s] that didn't exist! Must've gone away already?", statusPath); log.info("Tried to delete status path[%s] that didn't exist! Must've gone away already?", statusPath);
} }
catch (Exception e) {
throw Throwables.propagate(e);
}
} }
} }
@ -593,7 +598,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
elapsed, elapsed,
config.getTaskAssignmentTimeout() config.getTaskAssignmentTimeout()
); );
taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId())); taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
break; break;
} }
@ -666,7 +670,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
SettableFuture.<TaskStatus>create(), SettableFuture.<TaskStatus>create(),
zkWorker.getWorker() zkWorker.getWorker()
); );
runningTasks.put(taskId, taskRunnerWorkItem); runningTasks.put(taskId, taskRunnerWorkItem.withWorker(zkWorker.getWorker()));
} }
if (taskStatus.isComplete()) { if (taskStatus.isComplete()) {

View File

@ -19,7 +19,7 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
public class TaskExistsException extends RuntimeException public class TaskExistsException extends Exception
{ {
private final String taskId; private final String taskId;

View File

@ -296,7 +296,7 @@ public class TaskQueue
* *
* @return true * @return true
*/ */
public boolean add(final Task task) public boolean add(final Task task) throws TaskExistsException
{ {
giant.lock(); giant.lock();
@ -306,15 +306,8 @@ public class TaskQueue
Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize()); Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize());
// If this throws with any sort of exception, including TaskExistsException, we don't want to // If this throws with any sort of exception, including TaskExistsException, we don't want to
// insert the task into our queue. // insert the task into our queue. So don't catch it.
try { taskStorage.insert(task, TaskStatus.running(task.getId()));
taskStorage.insert(task, TaskStatus.running(task.getId()));
}
catch (TaskExistsException e) {
log.warn("Attempt to add task twice: %s", task.getId());
throw Throwables.propagate(e);
}
tasks.add(task); tasks.add(task);
managementMayBeNecessary.signalAll(); managementMayBeNecessary.signalAll();
return true; return true;

View File

@ -30,10 +30,11 @@ import java.util.List;
public interface TaskStorage public interface TaskStorage
{ {
/** /**
* Adds a task to the storage facility with a particular status. If the task ID already exists, this method * Adds a task to the storage facility with a particular status.
* will throw a {@link TaskExistsException}. *
* @throws io.druid.indexing.overlord.TaskExistsException if the task ID already exists
*/ */
public void insert(Task task, TaskStatus status); public void insert(Task task, TaskStatus status) throws TaskExistsException;
/** /**
* Persists task status in the storage facility. This method should throw an exception if the task status lifecycle * Persists task status in the storage facility. This method should throw an exception if the task status lifecycle

View File

@ -36,6 +36,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskExistsException;
import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue; import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskRunner;
@ -45,6 +46,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.tasklogs.TaskLogStreamer; import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.zookeeper.data.Stat;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
@ -126,8 +128,15 @@ public class OverlordResource
@Override @Override
public Response apply(TaskQueue taskQueue) public Response apply(TaskQueue taskQueue)
{ {
taskQueue.add(task); try {
return Response.ok(ImmutableMap.of("task", task.getId())).build(); taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (TaskExistsException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", String.format("Task[%s] already exists!", task.getId())))
.build();
}
} }
} }
); );