Introduce TaskExistsException, thrown by TaskStorage.insert when appropriate

This commit is contained in:
Gian Merlino 2013-01-31 08:10:59 -08:00
parent 0e469c6f4c
commit fe38ed2547
6 changed files with 88 additions and 34 deletions

View File

@ -33,6 +33,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.exceptions.StatementException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.List;
@ -67,6 +68,7 @@ public class DbTaskStorage implements TaskStorage
log.info("Inserting task %s with status: %s", task.getId(), status);
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@ -90,6 +92,14 @@ public class DbTaskStorage implements TaskStorage
}
}
);
} catch (StatementException e) {
// Might be a duplicate task ID.
if(getTask(task.getId()).isPresent()) {
throw new TaskExistsException(task.getId(), e);
} else {
throw e;
}
}
}
@Override

View File

@ -21,15 +21,12 @@ package com.metamx.druid.merger.coordinator;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@ -54,7 +51,11 @@ public class LocalTaskStorage implements TaskStorage
task.getId(),
status.getId()
);
Preconditions.checkState(!tasks.containsKey(task.getId()), "Task ID must not already be present: %s", task.getId());
if(tasks.containsKey(task.getId())) {
throw new TaskExistsException(task.getId());
}
log.info("Inserting task %s with status: %s", task.getId(), status);
tasks.put(task.getId(), new TaskStuff(task, status));
}
@ -64,7 +65,7 @@ public class LocalTaskStorage implements TaskStorage
{
Preconditions.checkNotNull(taskid, "taskid");
if(tasks.containsKey(taskid)) {
return Optional.of(tasks.get(taskid).task);
return Optional.of(tasks.get(taskid).getTask());
} else {
return Optional.absent();
}
@ -85,7 +86,7 @@ public class LocalTaskStorage implements TaskStorage
{
Preconditions.checkNotNull(taskid, "taskid");
if(tasks.containsKey(taskid)) {
return Optional.of(tasks.get(taskid).status);
return Optional.of(tasks.get(taskid).getStatus());
} else {
return Optional.absent();
}
@ -106,7 +107,7 @@ public class LocalTaskStorage implements TaskStorage
{
Preconditions.checkNotNull(taskid, "taskid");
if(tasks.containsKey(taskid)) {
return tasks.get(taskid).version;
return tasks.get(taskid).getVersion();
} else {
return Optional.absent();
}
@ -117,8 +118,8 @@ public class LocalTaskStorage implements TaskStorage
{
final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
for(final TaskStuff taskStuff : tasks.values()) {
if(taskStuff.status.isRunnable()) {
listBuilder.add(taskStuff.task);
if(taskStuff.getStatus().isRunnable()) {
listBuilder.add(taskStuff.getTask());
}
}
@ -147,6 +148,21 @@ public class LocalTaskStorage implements TaskStorage
this.version = version;
}
public Task getTask()
{
return task;
}
public TaskStatus getStatus()
{
return status;
}
public Optional<String> getVersion()
{
return version;
}
private TaskStuff withStatus(TaskStatus _status)
{
return new TaskStuff(task, _status, version);

View File

@ -0,0 +1,22 @@
package com.metamx.druid.merger.coordinator;
public class TaskExistsException extends RuntimeException
{
private final String taskId;
public TaskExistsException(String taskId, Throwable t)
{
super(String.format("Task exists: %s", taskId), t);
this.taskId = taskId;
}
public TaskExistsException(String taskId)
{
this(taskId, null);
}
public String getTaskId()
{
return taskId;
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -184,9 +185,14 @@ public class TaskQueue
try {
Preconditions.checkState(active, "Queue is not active!");
// If this throws, we don't want to insert the task into our queue.
// (This is how we detect duplicates)
// If this throws with any sort of exception, including TaskExistsException, we don't want to
// insert the task into our queue.
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));
} catch(TaskExistsException e) {
log.warn("Attempt to add task twice: %s", task.getId());
throw Throwables.propagate(e);
}
queue.add(task);
workMayBeAvailable.signalAll();

View File

@ -29,7 +29,7 @@ public interface TaskStorage
{
/**
* Adds a task to the storage facility with a particular status. If the task ID already exists, this method
* will throw an exception.
* will throw a {@link TaskExistsException}.
*/
public void insert(Task task, TaskStatus status);

View File

@ -87,7 +87,7 @@ public class TaskQueueTest
thrown = null;
try {
tq.add(newTask("T5", "G5", "baz", new Interval("2013-02-01/PT1H")));
} catch(IllegalStateException e) {
} catch(TaskExistsException e) {
thrown = e;
}