From 1331f2ce568ab54bd3bdbceffb842803ec304c2e Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 13 Jan 2014 15:42:05 -0800 Subject: [PATCH] TaskStorage.add() now throws TaskExistsException, and the servlet respects it The servlet will throw 400 rather than 500 when a task already exists, to signify that the request has no hope of ever working. --- .../druid/indexing/overlord/DbTaskStorage.java | 16 +++++++--------- .../indexing/overlord/HeapMemoryTaskStorage.java | 2 +- .../indexing/overlord/TaskExistsException.java | 2 +- .../io/druid/indexing/overlord/TaskQueue.java | 13 +++---------- .../io/druid/indexing/overlord/TaskStorage.java | 7 ++++--- .../indexing/overlord/http/OverlordResource.java | 13 +++++++++++-- 6 files changed, 27 insertions(+), 26 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java index cf0fb4f3e24..05576184655 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java @@ -43,7 +43,6 @@ import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.Task; import org.joda.time.DateTime; -import org.joda.time.Period; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.exceptions.CallbackFailedException; @@ -98,7 +97,7 @@ public class DbTaskStorage implements TaskStorage } @Override - public void insert(final Task task, final TaskStatus status) + public void insert(final Task task, final TaskStatus status) throws TaskExistsException { Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(status, "status"); @@ -137,9 +136,11 @@ public class DbTaskStorage implements TaskStorage } ); } - catch (StatementException e) { - // Might be a duplicate task ID. - if (getTask(task.getId()).isPresent()) { + catch (Exception e) { + final boolean isStatementException = e instanceof StatementException || + (e instanceof CallbackFailedException + && e.getCause() instanceof StatementException); + if (isStatementException && getTask(task.getId()).isPresent()) { throw new TaskExistsException(task.getId(), e); } else { throw e; @@ -533,11 +534,8 @@ public class DbTaskStorage implements TaskStorage try { return RetryUtils.retry(call, shouldRetry, maxTries); } - catch (RuntimeException e) { - throw Throwables.propagate(e); - } catch (Exception e) { - throw new CallbackFailedException(e); + throw Throwables.propagate(e); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java index ef942e5c12f..f279f4fad22 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -63,7 +63,7 @@ public class HeapMemoryTaskStorage implements TaskStorage } @Override - public void insert(Task task, TaskStatus status) + public void insert(Task task, TaskStatus status) throws TaskExistsException { giant.lock(); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskExistsException.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskExistsException.java index 3b752e086bb..91181cc5d24 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskExistsException.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskExistsException.java @@ -19,7 +19,7 @@ package io.druid.indexing.overlord; -public class TaskExistsException extends RuntimeException +public class TaskExistsException extends Exception { private final String taskId; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java index 693a504542c..29c265dc067 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java @@ -296,7 +296,7 @@ public class TaskQueue * * @return true */ - public boolean add(final Task task) + public boolean add(final Task task) throws TaskExistsException { giant.lock(); @@ -306,15 +306,8 @@ public class TaskQueue Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize()); // If this throws with any sort of exception, including TaskExistsException, we don't want to - // insert the task into our queue. - try { - taskStorage.insert(task, TaskStatus.running(task.getId())); - } - catch (TaskExistsException e) { - log.warn("Attempt to add task twice: %s", task.getId()); - throw Throwables.propagate(e); - } - + // insert the task into our queue. So don't catch it. + taskStorage.insert(task, TaskStatus.running(task.getId())); tasks.add(task); managementMayBeNecessary.signalAll(); return true; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java index fb289459256..c783c582896 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java @@ -30,10 +30,11 @@ import java.util.List; public interface TaskStorage { /** - * Adds a task to the storage facility with a particular status. If the task ID already exists, this method - * will throw a {@link TaskExistsException}. + * Adds a task to the storage facility with a particular status. + * + * @throws io.druid.indexing.overlord.TaskExistsException if the task ID already exists */ - public void insert(Task task, TaskStatus status); + public void insert(Task task, TaskStatus status) throws TaskExistsException; /** * Persists task status in the storage facility. This method should throw an exception if the task status lifecycle diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index f161cb3c278..5a3eb1c5e07 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -36,6 +36,7 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.TaskExistsException; import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskQueue; import io.druid.indexing.overlord.TaskRunner; @@ -45,6 +46,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler; import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.tasklogs.TaskLogStreamer; import io.druid.timeline.DataSegment; +import org.apache.zookeeper.data.Stat; import org.joda.time.DateTime; import javax.ws.rs.Consumes; @@ -126,8 +128,15 @@ public class OverlordResource @Override public Response apply(TaskQueue taskQueue) { - taskQueue.add(task); - return Response.ok(ImmutableMap.of("task", task.getId())).build(); + try { + taskQueue.add(task); + return Response.ok(ImmutableMap.of("task", task.getId())).build(); + } + catch (TaskExistsException e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", String.format("Task[%s] already exists!", task.getId()))) + .build(); + } } } );