Merger: Make json exceptions while bootstrapping non-fatal

This commit is contained in:
Gian Merlino 2013-03-13 22:32:12 -07:00
parent cf470b1ed4
commit a1c823402b
4 changed files with 44 additions and 32 deletions

View File

@ -36,6 +36,7 @@ import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
@ -52,7 +53,7 @@ public class DbTaskStorage implements TaskStorage
private final IndexerDbConnectorConfig dbConnectorConfig;
private final DBI dbi;
private static final Logger log = new Logger(DbTaskStorage.class);
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
public DbTaskStorage(ObjectMapper jsonMapper, IndexerDbConnectorConfig dbConnectorConfig, DBI dbi)
{
@ -203,18 +204,18 @@ public class DbTaskStorage implements TaskStorage
}
@Override
public List<Task> getRunningTasks()
public List<String> getRunningTaskIds()
{
return dbi.withHandle(
new HandleCallback<List<Task>>()
new HandleCallback<List<String>>()
{
@Override
public List<Task> withHandle(Handle handle) throws Exception
public List<String> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE status_code = :status_code",
"SELECT id FROM %s WHERE status_code = :status_code",
dbConnectorConfig.getTaskTable()
)
)
@ -222,16 +223,12 @@ public class DbTaskStorage implements TaskStorage
.list();
return Lists.transform(
dbTasks, new Function<Map<String, Object>, Task>()
dbTasks, new Function<Map<String, Object>, String>()
{
@Override
public Task apply(Map<String, Object> row)
public String apply(Map<String, Object> row)
{
try {
return jsonMapper.readValue(row.get("payload").toString(), Task.class);
} catch(Exception e) {
throw Throwables.propagate(e);
}
return row.get("id").toString();
}
}
);

View File

@ -128,15 +128,15 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
@Override
public List<Task> getRunningTasks()
public List<String> getRunningTaskIds()
{
giant.lock();
try {
final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
final ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
for(final TaskStuff taskStuff : tasks.values()) {
if(taskStuff.getStatus().isRunnable()) {
listBuilder.add(taskStuff.getTask());
listBuilder.add(taskStuff.getTask().getId());
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.merger.coordinator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
@ -28,8 +29,8 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.emitter.EmittingLogger;
@ -89,18 +90,32 @@ public class TaskQueue
queue.clear();
taskLockbox.clear();
// Add running tasks to the queue
final List<Task> runningTasks = taskStorage.getRunningTasks();
for(final Task task : runningTasks) {
queue.add(task);
}
// Get all locks, along with which tasks they belong to
// Get all running tasks and their locks
final Multimap<TaskLock, Task> tasksByLock = ArrayListMultimap.create();
for(final Task runningTask : runningTasks) {
for(final TaskLock taskLock : taskStorage.getLocks(runningTask.getId())) {
tasksByLock.put(taskLock, runningTask);
for (final String taskId : taskStorage.getRunningTaskIds()) {
try {
// .get since TaskStorage semantics should mean this task is always found
final Task task = taskStorage.getTask(taskId).get();
final List<TaskLock> taskLocks = taskStorage.getLocks(task.getId());
queue.add(task);
for (final TaskLock taskLock : taskLocks) {
tasksByLock.put(taskLock, task);
}
}
catch (Exception e) {
log.makeAlert("Failed to bootstrap task").addData("task", taskId).emit();
// A bit goofy to special-case JsonProcessingException, but we don't want to suppress bootstrap problems on
// any old Exception or even IOException...
if (e instanceof JsonProcessingException || e.getCause() instanceof JsonProcessingException) {
// Mark this task a failure, and continue bootstrapping
taskStorage.setStatus(TaskStatus.failure(taskId));
} else {
throw Throwables.propagate(e);
}
}
}
@ -150,7 +165,7 @@ public class TaskQueue
}
}
log.info("Bootstrapped %,d tasks. Ready to go!", runningTasks.size());
log.info("Bootstrapped %,d tasks with %,d locks. Ready to go!", queue.size(), tasksByLock.keySet().size());
} finally {
giant.unlock();
}
@ -214,7 +229,7 @@ public class TaskQueue
// insert the task into our queue.
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));
} catch(TaskExistsException e) {
} catch (TaskExistsException e) {
log.warn("Attempt to add task twice: %s", task.getId());
throw Throwables.propagate(e);
}

View File

@ -20,9 +20,9 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Optional;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.actions.TaskAction;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.task.Task;
import java.util.List;
@ -77,9 +77,9 @@ public interface TaskStorage
public List<TaskAction> getAuditLogs(String taskid);
/**
* Returns a list of currently-running tasks as stored in the storage facility, in no particular order.
* Returns a list of currently-running task IDs as stored in the storage facility, in no particular order.
*/
public List<Task> getRunningTasks();
public List<String> getRunningTaskIds();
/**
* Returns a list of locks for a particular task.