mirror of
https://github.com/apache/druid.git
synced 2025-02-25 04:16:07 +00:00
TaskStorage.add() now throws TaskExistsException, and the servlet respects it
The servlet will throw 400 rather than 500 when a task already exists, to signify that the request has no hope of ever working.
This commit is contained in:
parent
a72c4429f7
commit
1331f2ce56
@ -43,7 +43,6 @@ import io.druid.indexing.common.actions.TaskAction;
|
||||
import io.druid.indexing.common.config.TaskStorageConfig;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Period;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
|
||||
@ -98,7 +97,7 @@ public class DbTaskStorage implements TaskStorage
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(final Task task, final TaskStatus status)
|
||||
public void insert(final Task task, final TaskStatus status) throws TaskExistsException
|
||||
{
|
||||
Preconditions.checkNotNull(task, "task");
|
||||
Preconditions.checkNotNull(status, "status");
|
||||
@ -137,9 +136,11 @@ public class DbTaskStorage implements TaskStorage
|
||||
}
|
||||
);
|
||||
}
|
||||
catch (StatementException e) {
|
||||
// Might be a duplicate task ID.
|
||||
if (getTask(task.getId()).isPresent()) {
|
||||
catch (Exception e) {
|
||||
final boolean isStatementException = e instanceof StatementException ||
|
||||
(e instanceof CallbackFailedException
|
||||
&& e.getCause() instanceof StatementException);
|
||||
if (isStatementException && getTask(task.getId()).isPresent()) {
|
||||
throw new TaskExistsException(task.getId(), e);
|
||||
} else {
|
||||
throw e;
|
||||
@ -533,11 +534,8 @@ public class DbTaskStorage implements TaskStorage
|
||||
try {
|
||||
return RetryUtils.retry(call, shouldRetry, maxTries);
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new CallbackFailedException(e);
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,7 +63,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(Task task, TaskStatus status)
|
||||
public void insert(Task task, TaskStatus status) throws TaskExistsException
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
|
@ -19,7 +19,7 @@
|
||||
|
||||
package io.druid.indexing.overlord;
|
||||
|
||||
public class TaskExistsException extends RuntimeException
|
||||
public class TaskExistsException extends Exception
|
||||
{
|
||||
private final String taskId;
|
||||
|
||||
|
@ -296,7 +296,7 @@ public class TaskQueue
|
||||
*
|
||||
* @return true
|
||||
*/
|
||||
public boolean add(final Task task)
|
||||
public boolean add(final Task task) throws TaskExistsException
|
||||
{
|
||||
giant.lock();
|
||||
|
||||
@ -306,15 +306,8 @@ public class TaskQueue
|
||||
Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize());
|
||||
|
||||
// If this throws with any sort of exception, including TaskExistsException, we don't want to
|
||||
// insert the task into our queue.
|
||||
try {
|
||||
taskStorage.insert(task, TaskStatus.running(task.getId()));
|
||||
}
|
||||
catch (TaskExistsException e) {
|
||||
log.warn("Attempt to add task twice: %s", task.getId());
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
// insert the task into our queue. So don't catch it.
|
||||
taskStorage.insert(task, TaskStatus.running(task.getId()));
|
||||
tasks.add(task);
|
||||
managementMayBeNecessary.signalAll();
|
||||
return true;
|
||||
|
@ -30,10 +30,11 @@ import java.util.List;
|
||||
public interface TaskStorage
|
||||
{
|
||||
/**
|
||||
* Adds a task to the storage facility with a particular status. If the task ID already exists, this method
|
||||
* will throw a {@link TaskExistsException}.
|
||||
* Adds a task to the storage facility with a particular status.
|
||||
*
|
||||
* @throws io.druid.indexing.overlord.TaskExistsException if the task ID already exists
|
||||
*/
|
||||
public void insert(Task task, TaskStatus status);
|
||||
public void insert(Task task, TaskStatus status) throws TaskExistsException;
|
||||
|
||||
/**
|
||||
* Persists task status in the storage facility. This method should throw an exception if the task status lifecycle
|
||||
|
@ -36,6 +36,7 @@ import io.druid.indexing.common.TaskStatus;
|
||||
import io.druid.indexing.common.actions.TaskActionClient;
|
||||
import io.druid.indexing.common.actions.TaskActionHolder;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.indexing.overlord.TaskExistsException;
|
||||
import io.druid.indexing.overlord.TaskMaster;
|
||||
import io.druid.indexing.overlord.TaskQueue;
|
||||
import io.druid.indexing.overlord.TaskRunner;
|
||||
@ -45,6 +46,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
|
||||
import io.druid.indexing.overlord.setup.WorkerSetupData;
|
||||
import io.druid.tasklogs.TaskLogStreamer;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.ws.rs.Consumes;
|
||||
@ -126,8 +128,15 @@ public class OverlordResource
|
||||
@Override
|
||||
public Response apply(TaskQueue taskQueue)
|
||||
{
|
||||
taskQueue.add(task);
|
||||
return Response.ok(ImmutableMap.of("task", task.getId())).build();
|
||||
try {
|
||||
taskQueue.add(task);
|
||||
return Response.ok(ImmutableMap.of("task", task.getId())).build();
|
||||
}
|
||||
catch (TaskExistsException e) {
|
||||
return Response.status(Response.Status.BAD_REQUEST)
|
||||
.entity(ImmutableMap.of("error", String.format("Task[%s] already exists!", task.getId())))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
Loading…
x
Reference in New Issue
Block a user