diff --git a/common/src/main/java/io/druid/indexing/overlord/MetadataStorageActionHandler.java b/common/src/main/java/io/druid/indexing/overlord/MetadataStorageActionHandler.java index a734a9bbec1..b9d7c501978 100644 --- a/common/src/main/java/io/druid/indexing/overlord/MetadataStorageActionHandler.java +++ b/common/src/main/java/io/druid/indexing/overlord/MetadataStorageActionHandler.java @@ -19,48 +19,51 @@ package io.druid.indexing.overlord; +import com.google.common.base.Optional; +import com.metamx.common.Pair; +import org.joda.time.DateTime; + import java.util.List; import java.util.Map; -public interface MetadataStorageActionHandler +public interface MetadataStorageActionHandler { /* Insert stuff on the table */ public void insert( - String tableName, String id, - String createdDate, + DateTime createdDate, String dataSource, - byte[] payload, - int active, - byte[] statusPayload - ) throws Exception; + TaskType task, + boolean active, + TaskStatusType status + ) throws TaskExistsException; /* Insert stuff. @returns 1 if status of the task with the given id has been updated successfully */ - public int setStatus(String tableName, String Id, int active, byte[] statusPayload); + public boolean setStatus(String taskId, boolean active, TaskStatusType statusPayload); /* Retrieve a task with the given ID */ - public List> getTask(String tableName, String Id); + public Optional getTask(String taskId); /* Retrieve a task status with the given ID */ - public List> getTaskStatus(String tableName, String Id); + public Optional getTaskStatus(String taskId); /* Retrieve active tasks */ - public List> getActiveTasks(String tableName); + public List> getActiveTasksWithStatus(); /* Retrieve task statuses that have been created sooner than the given time */ - public List> getRecentlyFinishedTaskStatuses(String tableName, String recent); + public List getRecentlyFinishedTaskStatuses(DateTime start); /* Add lock to the task with given ID */ - public int addLock(String tableName, String Id, byte[] lock); + public int addLock(String taskId, TaskLockType lock); /* Remove taskLock with given ID */ - public int removeLock(String tableName, long lockId); + public int removeLock(long lockId); - public int addAuditLog(String tableName, String Id, byte[] taskAction); + public int addAuditLog(String taskId, TaskActionType taskAction); /* Get logs for task with given ID */ - public List> getTaskLogs(String tableName, String Id); + public List getTaskLogs(String taskId); /* Get locks for task with given ID */ - public List> getTaskLocks(String tableName, String Id); + public Map getTaskLocks(String taskId); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskExistsException.java b/common/src/main/java/io/druid/indexing/overlord/TaskExistsException.java similarity index 100% rename from indexing-service/src/main/java/io/druid/indexing/overlord/TaskExistsException.java rename to common/src/main/java/io/druid/indexing/overlord/TaskExistsException.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index d7c68491e4d..1de7d11aaec 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -56,8 +56,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.RandomAccessFile; -import java.nio.channels.Channels; import java.util.Collection; import java.util.List; import java.util.Map; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java index a3bfc4eb8a1..72f5d3b1c15 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java @@ -19,57 +19,49 @@ package io.druid.indexing.overlord; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.inject.Inject; +import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; -import io.druid.db.MetadataStorageConnector; -import io.druid.db.MetadataStorageTablesConfig; -import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; +import io.druid.db.MetadataStorageConnector; +import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.Task; import org.joda.time.DateTime; -import org.skife.jdbi.v2.exceptions.CallbackFailedException; -import org.skife.jdbi.v2.exceptions.StatementException; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; public class MetadataTaskStorage implements TaskStorage { - private final ObjectMapper jsonMapper; private final MetadataStorageConnector metadataStorageConnector; - private final MetadataStorageTablesConfig dbTables; private final TaskStorageConfig config; - private final MetadataStorageActionHandler handler; + private final MetadataStorageActionHandler handler; private static final EmittingLogger log = new EmittingLogger(MetadataTaskStorage.class); @Inject public MetadataTaskStorage( - final ObjectMapper jsonMapper, final MetadataStorageConnector metadataStorageConnector, - final MetadataStorageTablesConfig dbTables, final TaskStorageConfig config, final MetadataStorageActionHandler handler ) { - this.jsonMapper = jsonMapper; this.metadataStorageConnector = metadataStorageConnector; - this.dbTables = dbTables; this.config = config; - this.handler = handler; + // this is a little janky but haven't figured out how to get Guice to do this yet. + this.handler = (MetadataStorageActionHandler) handler; } @LifecycleStart @@ -100,23 +92,19 @@ public class MetadataTaskStorage implements TaskStorage try { handler.insert( - dbTables.getTasksTable(), task.getId(), - new DateTime().toString(), + new DateTime(), task.getDataSource(), - jsonMapper.writeValueAsBytes(task), - status.isRunnable() ? 1 : 0, - jsonMapper.writeValueAsBytes(status) + task, + status.isRunnable(), + status ); } catch (Exception e) { - final boolean isStatementException = e instanceof StatementException || - (e instanceof CallbackFailedException - && e.getCause() instanceof StatementException); - if (isStatementException && getTask(task.getId()).isPresent()) { - throw new TaskExistsException(task.getId(), e); + if(e instanceof TaskExistsException) { + throw (TaskExistsException) e; } else { - throw Throwables.propagate(e); + Throwables.propagate(e); } } } @@ -128,103 +116,77 @@ public class MetadataTaskStorage implements TaskStorage log.info("Updating task %s to status: %s", status.getId(), status); - try { - int updated = handler.setStatus( - dbTables.getTasksTable(), - status.getId(), - status.isRunnable() ? 1 : 0, - jsonMapper.writeValueAsBytes(status) - ); - if (updated != 1) { - throw new IllegalStateException(String.format("Active task not found: %s", status.getId())); - } - } - catch (Exception e) { - throw Throwables.propagate(e); + final boolean set = handler.setStatus( + status.getId(), + status.isRunnable(), + status + ); + if (!set) { + throw new IllegalStateException(String.format("Active task not found: %s", status.getId())); } } @Override - public Optional getTask(final String taskid) + public Optional getTask(final String taskId) { - try { - final List> dbTasks = handler.getTask(dbTables.getTasksTable(), taskid); - if (dbTasks.size() == 0) { - return Optional.absent(); - } else { - final Map dbStatus = Iterables.getOnlyElement(dbTasks); - return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("payload"), Task.class)); - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } + return handler.getTask(taskId); } @Override - public Optional getStatus(final String taskid) + public Optional getStatus(final String taskId) { - try { - final List> dbStatuses = handler.getTaskStatus(dbTables.getTasksTable(), taskid); - if (dbStatuses.size() == 0) { - return Optional.absent(); - } else { - final Map dbStatus = Iterables.getOnlyElement(dbStatuses); - return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("status_payload"), TaskStatus.class)); - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } + return handler.getTaskStatus(taskId); } @Override public List getActiveTasks() { - final List> dbTasks = handler.getActiveTasks(dbTables.getTasksTable()); - - final ImmutableList.Builder tasks = ImmutableList.builder(); - for (final Map row : dbTasks) { - final String id = row.get("id").toString(); - - try { - final Task task = jsonMapper.readValue((byte[]) row.get("payload"), Task.class); - final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class); - - if (status.isRunnable()) { - tasks.add(task); - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit(); - } - } - - return tasks.build(); + return ImmutableList.copyOf( + Iterables.transform( + Iterables.filter( + handler.getActiveTasksWithStatus(), + new Predicate>() + { + @Override + public boolean apply( + @Nullable Pair input + ) + { + return input.rhs.isRunnable(); + } + } + ), + new Function, Task>() + { + @Nullable + @Override + public Task apply(@Nullable Pair input) + { + return input.lhs; + } + } + ) + ); } @Override public List getRecentlyFinishedTaskStatuses() { - final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold()); + final DateTime start = new DateTime().minus(config.getRecentlyFinishedThreshold()); - final List> dbTasks = handler.getRecentlyFinishedTaskStatuses(dbTables.getTasksTable(), recent.toString()); - final ImmutableList.Builder statuses = ImmutableList.builder(); - for (final Map row : dbTasks) { - final String id = row.get("id").toString(); - - try { - final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class); - if (status.isComplete()) { - statuses.add(status); - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit(); - } - } - - return statuses.build(); + return ImmutableList.copyOf( + Iterables.filter( + handler.getRecentlyFinishedTaskStatuses(start), + new Predicate() + { + @Override + public boolean apply(TaskStatus status) + { + return status.isComplete(); + } + } + ) + ); } @Override @@ -240,14 +202,7 @@ public class MetadataTaskStorage implements TaskStorage taskid ); - try { - handler.addLock(dbTables.getTaskLockTable(), taskid, jsonMapper.writeValueAsBytes(taskLock)); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - - + handler.addLock(taskid, taskLock); } @Override @@ -264,7 +219,7 @@ public class MetadataTaskStorage implements TaskStorage if (taskLock.equals(taskLockToRemove)) { log.info("Deleting TaskLock with id[%d]: %s", id, taskLock); - handler.removeLock(dbTables.getTaskLockTable(), id); + handler.removeLock(id); } } } @@ -293,52 +248,17 @@ public class MetadataTaskStorage implements TaskStorage log.info("Logging action for task[%s]: %s", task.getId(), taskAction); - try { - handler.addAuditLog(dbTables.getTaskLogTable(), task.getId(), jsonMapper.writeValueAsBytes(taskAction)); - } - catch (Exception e) { - throw Throwables.propagate(e); - } + handler.addAuditLog(task.getId(), taskAction); } @Override - public List getAuditLogs(final String taskid) + public List getAuditLogs(final String taskId) { - final List> dbTaskLogs = handler.getTaskLogs(dbTables.getTaskLogTable(), taskid); - final List retList = Lists.newArrayList(); - for (final Map dbTaskLog : dbTaskLogs) { - try { - retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class)); - } - catch (Exception e) { - log.makeAlert(e, "Failed to deserialize TaskLog") - .addData("task", taskid) - .addData("logPayload", dbTaskLog) - .emit(); - } - } - return retList; + return handler.getTaskLogs(taskId); } private Map getLocksWithIds(final String taskid) { - final List> dbTaskLocks = handler.getTaskLocks(dbTables.getTaskLockTable(), taskid); - - final Map retMap = Maps.newHashMap(); - for (final Map row : dbTaskLocks) { - try { - retMap.put( - (Long) row.get("id"), - jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class) - ); - } - catch (Exception e) { - log.makeAlert(e, "Failed to deserialize TaskLock") - .addData("task", taskid) - .addData("lockPayload", row) - .emit(); - } - } - return retMap; + return handler.getTaskLocks(taskid); } -} \ No newline at end of file +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index b5f86ea7494..7cb95135e03 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -29,7 +29,6 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.common.guava.DSuppliers; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.indexing.common.IndexingServiceCondition; @@ -41,7 +40,6 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy; -import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; import io.druid.jackson.DefaultObjectMapper; @@ -59,7 +57,6 @@ import org.junit.Test; import java.util.Set; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicReference; public class RemoteTaskRunnerTest { diff --git a/server/src/main/java/io/druid/db/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/db/SQLMetadataStorageActionHandler.java index df726926f2a..49e798474d7 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/io/druid/db/SQLMetadataStorageActionHandler.java @@ -19,17 +19,37 @@ package io.druid.db; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Maps; +import com.google.common.base.Charsets; +import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Throwables; +import com.google.common.collect.Lists; import com.google.inject.Inject; +import com.google.inject.name.Named; +import com.metamx.common.Pair; import com.metamx.common.RetryUtils; +import com.metamx.emitter.EmittingLogger; import io.druid.indexing.overlord.MetadataStorageActionHandler; +import io.druid.indexing.overlord.TaskExistsException; +import org.joda.time.DateTime; +import org.skife.jdbi.v2.FoldController; +import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.exceptions.DBIException; +import org.skife.jdbi.v2.exceptions.StatementException; import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; import org.skife.jdbi.v2.tweak.HandleCallback; +import org.skife.jdbi.v2.tweak.ResultSetMapper; +import org.skife.jdbi.v2.util.ByteArrayMapper; +import java.io.IOException; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLRecoverableException; import java.sql.SQLTransientException; @@ -37,98 +57,148 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -public class SQLMetadataStorageActionHandler implements MetadataStorageActionHandler +public class SQLMetadataStorageActionHandler + implements MetadataStorageActionHandler { + private static final EmittingLogger log = new EmittingLogger(SQLMetadataStorageActionHandler.class); + private final IDBI dbi; private final SQLMetadataConnector connector; + private final MetadataStorageTablesConfig config; + private final ObjectMapper jsonMapper; + private final TypeReference taskType; + private final TypeReference taskStatusType; + private final TypeReference taskActionType; + private final TypeReference taskLockType; + @Inject public SQLMetadataStorageActionHandler( final IDBI dbi, - final SQLMetadataConnector connector - ) + final SQLMetadataConnector connector, + final MetadataStorageTablesConfig config, + final ObjectMapper jsonMapper, + // we all love type erasure + final @Named("taskType") TypeReference taskType, + final @Named("taskStatusType") TypeReference taskStatusType, + final @Named("taskActionType") TypeReference taskActionType, + final @Named("taskLockType") TypeReference taskLockType + ) { this.dbi = dbi; this.connector = connector; + this.config = config; + this.jsonMapper = jsonMapper; + this.taskType = taskType; + this.taskStatusType = taskStatusType; + this.taskActionType = taskActionType; + this.taskLockType = taskLockType; } - /* Insert stuff. @returns number of entries inserted on success */ + /** + * Insert stuff + * + */ public void insert( - final String tableName, final String id, - final String createdDate, + final DateTime createdDate, final String dataSource, - final byte[] payload, - final int active, - final byte[] statusPayload - ) + final TaskType task, + final boolean active, + final TaskStatusType status + ) throws TaskExistsException { - retryingHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception + try { + retryingHandle( + new HandleCallback() { - handle.createStatement( - String.format( - "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", - tableName - ) - ) - .bind("id", id) - .bind("created_date", createdDate) - .bind("datasource", dataSource) - .bind("payload", payload) - .bind("active", active) - .bind("status_payload", statusPayload) - .execute(); - return null; + @Override + public Void withHandle(Handle handle) throws Exception + { + handle.createStatement( + String.format( + "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", + config.getTasksTable() + ) + ) + .bind("id", id) + .bind("created_date", createdDate) + .bind("datasource", dataSource) + .bind("payload", jsonMapper.writeValueAsBytes(task)) + .bind("active", active) + .bind("status_payload", jsonMapper.writeValueAsBytes(status)) + .execute(); + return null; + } } - } - ); + ); + } catch(Exception e) { + final boolean isStatementException = e instanceof StatementException || + (e instanceof CallbackFailedException + && e.getCause() instanceof StatementException); + if (isStatementException && getTask(id).isPresent()) { + throw new TaskExistsException(id, e); + } else { + throw Throwables.propagate(e); + } + } } - /* Insert stuff. @returns 1 if status of the task with the given id has been updated successfully */ - public int setStatus(final String tableName, final String Id, final int active, final byte[] statusPayload) + /** + * Update task status. + * + * @return true if status of the task with the given id has been updated successfully + */ + public boolean setStatus(final String taskId, final boolean active, final TaskStatusType status) { return retryingHandle( - new HandleCallback() + new HandleCallback() { @Override - public Integer withHandle(Handle handle) throws Exception + public Boolean withHandle(Handle handle) throws Exception { return handle.createStatement( String.format( "UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1", - tableName + config.getTasksTable() ) ) - .bind("id", Id) + .bind("id", taskId) .bind("active", active) - .bind("status_payload", statusPayload) - .execute(); + .bind("status_payload", jsonMapper.writeValueAsBytes(status)) + .execute() == 1; } } ); } - /* Retrieve a task with the given ID */ - public List> getTask(final String tableName, final String Id) + /* */ + + /** + * Retrieve a task with the given ID + * + * @param taskId task ID + * @return task if it exists + */ + public Optional getTask(final String taskId) { return retryingHandle( - new HandleCallback>>() + new HandleCallback>() { @Override - public List> withHandle(Handle handle) throws Exception + public Optional withHandle(Handle handle) throws Exception { - return handle.createQuery( - String.format( - "SELECT payload FROM %s WHERE id = :id", - tableName - ) + return Optional.fromNullable( + jsonMapper.readValue( + handle.createQuery( + String.format("SELECT payload FROM %s WHERE id = :id", config.getTasksTable()) ) - .bind("id", Id) - .list(); + .bind("id", taskId) + .map(ByteArrayMapper.FIRST) + .first(), + taskType + ) + ); } } ); @@ -136,42 +206,72 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan } /* Retrieve a task status with the given ID */ - public List> getTaskStatus(final String tableName, final String Id) + public Optional getTaskStatus(final String taskId) { return retryingHandle( - new HandleCallback>>() - { - @Override - public List> withHandle(Handle handle) throws Exception + new HandleCallback>() { - return handle.createQuery( - String.format( - "SELECT status_payload FROM %s WHERE id = :id", - tableName - ) - ) - .bind("id", Id) - .list(); + @Override + public Optional withHandle(Handle handle) throws Exception + { + return Optional.fromNullable( + jsonMapper.readValue( + handle.createQuery( + String.format("SELECT status_payload FROM %s WHERE id = :id", config.getTasksTable()) + ) + .bind("id", taskId) + .map(ByteArrayMapper.FIRST) + .first(), + taskStatusType + ) + ); + } } - } ); } /* Retrieve active tasks */ - public List> getActiveTasks(final String tableName) + public List> getActiveTasksWithStatus() { return retryingHandle( - new HandleCallback>>() + new HandleCallback>>() { @Override - public List> withHandle(Handle handle) throws Exception + public List> withHandle(Handle handle) throws Exception { - return handle.createQuery( - String.format( - "SELECT id, payload, status_payload FROM %s WHERE active = 1 ORDER BY created_date", - tableName - ) - ).list(); + return handle + .createQuery( + String.format( + "SELECT id, payload, status_payload FROM %s WHERE active = TRUE ORDER BY created_date", + config.getTasksTable() + ) + ) + .map( + new ResultSetMapper>() + { + @Override + public Pair map(int index, ResultSet r, StatementContext ctx) + throws SQLException + { + try { + return Pair.of( + jsonMapper.readValue( + r.getBytes("payload"), + taskType + ), + jsonMapper.readValue( + r.getBytes("status_payload"), + taskStatusType + ) + ); + } + catch (IOException e) { + log.makeAlert(e, "Failed to parse task payload").addData("task", r.getString("id")).emit(); + throw new SQLException(e); + } + } + } + ).list(); } } ); @@ -179,27 +279,49 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan } /* Retrieve task statuses that have been created sooner than the given time */ - public List> getRecentlyFinishedTaskStatuses(final String tableName, final String recent) + public List getRecentlyFinishedTaskStatuses(final DateTime start) { return retryingHandle( - new HandleCallback>>() + new HandleCallback>() { @Override - public List> withHandle(Handle handle) throws Exception + public List withHandle(Handle handle) throws Exception { - return handle.createQuery( - String.format( - "SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC", - tableName - ) - ).bind("recent", recent.toString()).list(); + return handle + .createQuery( + String.format( + "SELECT id, status_payload FROM %s WHERE active = FALSE AND created_date >= :start ORDER BY created_date DESC", + config.getTasksTable() + ) + ).bind("start", start.toString()) + .map( + new ResultSetMapper() + { + @Override + public TaskStatusType map(int index, ResultSet r, StatementContext ctx) throws SQLException + { + try { + return jsonMapper.readValue( + r.getBytes("status_payload"), + taskStatusType + ); + } + catch (IOException e) { + log.makeAlert(e, "Failed to parse status payload") + .addData("task", r.getString("id")) + .emit(); + throw new SQLException(e); + } + } + } + ).list(); } } ); } /* Add lock to the task with given ID */ - public int addLock(final String tableName, final String Id, final byte[] lock) + public int addLock(final String taskId, final TaskLockType lock) { return retryingHandle( new HandleCallback() @@ -210,11 +332,11 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan return handle.createStatement( String.format( "INSERT INTO %s (task_id, lock_payload) VALUES (:task_id, :lock_payload)", - tableName + config.getTaskLockTable() ) ) - .bind("task_id", Id) - .bind("lock_payload", lock) + .bind("task_id", taskId) + .bind("lock_payload", jsonMapper.writeValueAsBytes(lock)) .execute(); } } @@ -222,7 +344,7 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan } /* Remove taskLock with given ID */ - public int removeLock(final String tableName, final long lockId) + public int removeLock(final long lockId) { return retryingHandle( new HandleCallback() @@ -233,7 +355,7 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan return handle.createStatement( String.format( "DELETE FROM %s WHERE id = :id", - tableName + config.getTaskLockTable() ) ) .bind("id", lockId) @@ -243,7 +365,7 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan ); } - public int addAuditLog(final String tableName, final String Id, final byte[] taskAction) + public int addAuditLog(final String taskId, final TaskActionType taskAction) { return retryingHandle( new HandleCallback() @@ -254,11 +376,11 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan return handle.createStatement( String.format( "INSERT INTO %s (task_id, log_payload) VALUES (:task_id, :log_payload)", - tableName + config.getTaskLogTable() ) ) - .bind("task_id", Id) - .bind("log_payload", taskAction) + .bind("task_id", taskId) + .bind("log_payload", jsonMapper.writeValueAsBytes(taskAction)) .execute(); } } @@ -266,44 +388,116 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan } /* Get logs for task with given ID */ - public List> getTaskLogs(final String tableName, final String Id) + public List getTaskLogs(final String taskId) { return retryingHandle( - new HandleCallback>>() + new HandleCallback>() { @Override - public List> withHandle(Handle handle) throws Exception + public List withHandle(Handle handle) throws Exception { - return handle.createQuery( - String.format( - "SELECT log_payload FROM %s WHERE task_id = :task_id", - tableName - ) + return handle + .createQuery( + String.format( + "SELECT log_payload FROM %s WHERE task_id = :task_id", + config.getTaskLogTable() ) - .bind("task_id", Id) - .list(); + ) + .bind("task_id", taskId) + .map(ByteArrayMapper.FIRST) + .fold( + Lists.newLinkedList(), + new Folder3, byte[]>() + { + @Override + public List fold( + List list, byte[] bytes, FoldController control, StatementContext ctx + ) throws SQLException + { + try { + list.add( + jsonMapper.readValue( + bytes, taskActionType + ) + ); + return list; + } + catch (IOException e) { + log.makeAlert(e, "Failed to deserialize TaskLog") + .addData("task", taskId) + .addData("logPayload", new String(bytes, Charsets.UTF_8)) + .emit(); + throw new SQLException(e); + } + } + } + ); } } ); } /* Get locks for task with given ID */ - public List> getTaskLocks(final String tableName, final String Id) + public Map getTaskLocks(final String taskId) { return retryingHandle( - new HandleCallback>>() + new HandleCallback>() { @Override - public List> withHandle(Handle handle) throws Exception + public Map withHandle(Handle handle) throws Exception { return handle.createQuery( String.format( "SELECT id, lock_payload FROM %s WHERE task_id = :task_id", - tableName + config.getTaskLockTable() ) ) - .bind("task_id", Id) - .list(); + .bind("task_id", taskId) + .map( + new ResultSetMapper>() + { + @Override + public Pair map(int index, ResultSet r, StatementContext ctx) + throws SQLException + { + try { + return Pair.of( + r.getLong("id"), + jsonMapper.readValue( + r.getBytes("lock_payload"), + taskLockType + ) + ); + } + catch (IOException e) { + log.makeAlert(e, "Failed to deserialize TaskLock") + .addData("task", r.getLong("id")) + .addData( + "lockPayload", new String(r.getBytes("lock_payload"), Charsets.UTF_8) + ) + .emit(); + throw new SQLException(e); + } + } + } + ) + .fold( + Maps.newLinkedHashMap(), + new Folder3, Pair>() + { + @Override + public Map fold( + Map accumulator, + Pair lock, + FoldController control, + StatementContext ctx + ) throws SQLException + { + accumulator.put(lock.lhs, lock.rhs); + return accumulator; + } + } + ); } } ); diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index e4ee9410072..ef5a44c45ed 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -19,6 +19,7 @@ package io.druid.cli; +import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Injector; @@ -31,6 +32,7 @@ import com.google.inject.servlet.GuiceFilter; import com.google.inject.util.Providers; import com.metamx.common.logger.Logger; import io.airlift.command.Command; +import io.druid.indexing.common.TaskStatus; import io.druid.db.IndexerSQLMetadataStorageCoordinator; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceModuleHelper; @@ -43,17 +45,20 @@ import io.druid.guice.LifecycleModule; import io.druid.guice.ListProvider; import io.druid.guice.ManageLifecycle; import io.druid.guice.PolyBind; +import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; +import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.common.task.Task; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; -import io.druid.indexing.overlord.MetadataTaskStorage; import io.druid.indexing.overlord.ForkingTaskRunnerFactory; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.indexing.overlord.MetadataTaskStorage; import io.druid.indexing.overlord.RemoteTaskRunnerFactory; import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.TaskMaster; @@ -180,6 +185,24 @@ public class CliOverlord extends ServerRunnable storageBinder.addBinding("db").to(MetadataTaskStorage.class).in(ManageLifecycle.class); binder.bind(MetadataTaskStorage.class).in(LazySingleton.class); + + // gotta love type erasure + binder.bind(TypeReference.class).annotatedWith(Names.named("taskType")).toInstance( + new TypeReference() + { + } + ); + binder.bind(TypeReference.class).annotatedWith(Names.named("taskStatusType")).toInstance(new TypeReference(){}); + binder.bind(TypeReference.class).annotatedWith(Names.named("taskActionType")).toInstance( + new TypeReference() + { + } + ); + binder.bind(TypeReference.class).annotatedWith(Names.named("taskLockType")).toInstance( + new TypeReference() + { + } + ); } private void configureRunners(Binder binder)