diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java
index cf0fb4f3e24..05576184655 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java
@@ -43,7 +43,6 @@ import io.druid.indexing.common.actions.TaskAction;
 import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.Task;
 import org.joda.time.DateTime;
-import org.joda.time.Period;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
@@ -98,7 +97,7 @@ public class DbTaskStorage implements TaskStorage
   }
 
   @Override
-  public void insert(final Task task, final TaskStatus status)
+  public void insert(final Task task, final TaskStatus status) throws TaskExistsException
   {
     Preconditions.checkNotNull(task, "task");
     Preconditions.checkNotNull(status, "status");
@@ -137,9 +136,11 @@ public class DbTaskStorage implements TaskStorage
           }
       );
     }
-    catch (StatementException e) {
-      // Might be a duplicate task ID.
-      if (getTask(task.getId()).isPresent()) {
+    catch (Exception e) {
+      final boolean isStatementException = e instanceof StatementException ||
+                                           (e instanceof CallbackFailedException
+                                            && e.getCause() instanceof StatementException);
+      if (isStatementException && getTask(task.getId()).isPresent()) {
         throw new TaskExistsException(task.getId(), e);
       } else {
         throw e;
@@ -533,11 +534,8 @@ public class DbTaskStorage implements TaskStorage
     try {
       return RetryUtils.retry(call, shouldRetry, maxTries);
     }
-    catch (RuntimeException e) {
-      throw Throwables.propagate(e);
-    }
     catch (Exception e) {
-      throw new CallbackFailedException(e);
+      throw Throwables.propagate(e);
     }
   }
 
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
index ef942e5c12f..f279f4fad22 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -63,7 +63,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
   }
 
   @Override
-  public void insert(Task task, TaskStatus status)
+  public void insert(Task task, TaskStatus status) throws TaskExistsException
   {
     giant.lock();
 
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
index 2c482b775f9..0dbc9031200 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
@@ -61,6 +61,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
 
@@ -335,7 +336,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
       pendingTaskPayloads.remove(taskId);
       log.info("Removed task from pending queue: %s", taskId);
     } else if (completeTasks.containsKey(taskId)) {
-      cleanup(completeTasks.get(taskId).getWorker().getHost(), taskId);
+      cleanup(taskId);
     } else {
       final ZkWorker zkWorker = findWorkerRunningTask(taskId);
 
@@ -469,28 +470,32 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
   /**
    * Removes a task from the complete queue and clears out the ZK status path of the task.
    *
-   * @param workerId - the worker that was previously running the task
    * @param taskId - the task to cleanup
    */
-  private void cleanup(final String workerId, final String taskId)
+  private void cleanup(final String taskId)
   {
     if (!started) {
       return;
     }
-    if (completeTasks.remove(taskId) == null) {
+    final RemoteTaskRunnerWorkItem removed = completeTasks.remove(taskId);
+    final Worker worker = removed == null ? null : removed.getWorker(); // removed is null when the task is unknown
+    if (removed == null || worker == null) {
       log.makeAlert("WTF?! Asked to cleanup nonexistent task")
-         .addData("workerId", workerId)
          .addData("taskId", taskId)
          .emit();
     } else {
+      final String workerId = worker.getHost();
       log.info("Cleaning up task[%s] on worker[%s]", taskId, workerId);
       final String statusPath = JOINER.join(zkPaths.getIndexerStatusPath(), workerId, taskId);
       try {
         cf.delete().guaranteed().forPath(statusPath);
       }
-      catch (Exception e) {
+      catch (KeeperException.NoNodeException e) {
         log.info("Tried to delete status path[%s] that didn't exist! Must've gone away already?", statusPath);
       }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
     }
   }
 
@@ -593,7 +598,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
                 elapsed,
                 config.getTaskAssignmentTimeout()
             );
-            taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
             break;
           }
 
@@ -666,7 +670,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
                       SettableFuture.create(),
                       zkWorker.getWorker()
                   );
-                  runningTasks.put(taskId, taskRunnerWorkItem);
+                  runningTasks.put(taskId, taskRunnerWorkItem.withWorker(zkWorker.getWorker()));
                 }
 
                 if (taskStatus.isComplete()) {
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskExistsException.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskExistsException.java
index 3b752e086bb..91181cc5d24 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskExistsException.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskExistsException.java
@@ -19,7 +19,7 @@
 
 package io.druid.indexing.overlord;
 
-public class TaskExistsException extends RuntimeException
+public class TaskExistsException extends Exception
 {
   private final String taskId;
 
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
index 693a504542c..29c265dc067 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
@@ -296,7 +296,7 @@ public class TaskQueue
    *
    * @return true
    */
-  public boolean add(final Task task)
+  public boolean add(final Task task) throws TaskExistsException
   {
     giant.lock();
 
@@ -306,15 +306,8 @@ public class TaskQueue
       Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize());
 
       // If this throws with any sort of exception, including TaskExistsException, we don't want to
-      // insert the task into our queue.
-      try {
-        taskStorage.insert(task, TaskStatus.running(task.getId()));
-      }
-      catch (TaskExistsException e) {
-        log.warn("Attempt to add task twice: %s", task.getId());
-        throw Throwables.propagate(e);
-      }
-
+      // insert the task into our queue. So don't catch it.
+      taskStorage.insert(task, TaskStatus.running(task.getId()));
       tasks.add(task);
       managementMayBeNecessary.signalAll();
       return true;
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
index fb289459256..c783c582896 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
@@ -30,10 +30,11 @@ import java.util.List;
 public interface TaskStorage
 {
   /**
-   * Adds a task to the storage facility with a particular status. If the task ID already exists, this method
-   * will throw a {@link TaskExistsException}.
+   * Adds a task to the storage facility with a particular status.
+   *
+   * @throws io.druid.indexing.overlord.TaskExistsException if the task ID already exists
    */
-  public void insert(Task task, TaskStatus status);
+  public void insert(Task task, TaskStatus status) throws TaskExistsException;
 
   /**
    * Persists task status in the storage facility. This method should throw an exception if the task status lifecycle
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index f161cb3c278..5a3eb1c5e07 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -36,6 +36,7 @@ import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionHolder;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.overlord.TaskExistsException;
 import io.druid.indexing.overlord.TaskMaster;
 import io.druid.indexing.overlord.TaskQueue;
 import io.druid.indexing.overlord.TaskRunner;
@@ -45,6 +46,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
 import io.druid.indexing.overlord.setup.WorkerSetupData;
 import io.druid.tasklogs.TaskLogStreamer;
 import io.druid.timeline.DataSegment;
+import org.apache.zookeeper.data.Stat;
 import org.joda.time.DateTime;
 
 import javax.ws.rs.Consumes;
@@ -126,8 +128,15 @@ public class OverlordResource
           @Override
           public Response apply(TaskQueue taskQueue)
           {
-            taskQueue.add(task);
-            return Response.ok(ImmutableMap.of("task", task.getId())).build();
+            try {
+              taskQueue.add(task);
+              return Response.ok(ImmutableMap.of("task", task.getId())).build();
+            }
+            catch (TaskExistsException e) {
+              return Response.status(Response.Status.BAD_REQUEST)
+                             .entity(ImmutableMap.of("error", String.format("Task[%s] already exists!", task.getId())))
+                             .build();
+            }
           }
         }
     );