better abstraction for metadatastorage

This commit is contained in:
Xavier Léauté 2014-10-30 18:23:35 -07:00
parent 3cc1b2e690
commit 377151beda
7 changed files with 419 additions and 284 deletions

View File

@ -19,48 +19,51 @@
package io.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.metamx.common.Pair;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
public interface MetadataStorageActionHandler
public interface MetadataStorageActionHandler<TaskType, TaskStatusType, TaskActionType, TaskLockType>
{
/* Insert stuff on the table */
public void insert(
String tableName,
String id,
String createdDate,
DateTime createdDate,
String dataSource,
byte[] payload,
int active,
byte[] statusPayload
) throws Exception;
TaskType task,
boolean active,
TaskStatusType status
) throws TaskExistsException;
/* Insert stuff. @returns 1 if status of the task with the given id has been updated successfully */
public int setStatus(String tableName, String Id, int active, byte[] statusPayload);
public boolean setStatus(String taskId, boolean active, TaskStatusType statusPayload);
/* Retrieve a task with the given ID */
public List<Map<String, Object>> getTask(String tableName, String Id);
public Optional<TaskType> getTask(String taskId);
/* Retrieve a task status with the given ID */
public List<Map<String, Object>> getTaskStatus(String tableName, String Id);
public Optional<TaskStatusType> getTaskStatus(String taskId);
/* Retrieve active tasks */
public List<Map<String, Object>> getActiveTasks(String tableName);
public List<Pair<TaskType, TaskStatusType>> getActiveTasksWithStatus();
/* Retrieve task statuses that have been created sooner than the given time */
public List<Map<String, Object>> getRecentlyFinishedTaskStatuses(String tableName, String recent);
public List<TaskStatusType> getRecentlyFinishedTaskStatuses(DateTime start);
/* Add lock to the task with given ID */
public int addLock(String tableName, String Id, byte[] lock);
public int addLock(String taskId, TaskLockType lock);
/* Remove taskLock with given ID */
public int removeLock(String tableName, long lockId);
public int removeLock(long lockId);
public int addAuditLog(String tableName, String Id, byte[] taskAction);
public int addAuditLog(String taskId, TaskActionType taskAction);
/* Get logs for task with given ID */
public List<Map<String, Object>> getTaskLogs(String tableName, String Id);
public List<TaskActionType> getTaskLogs(String taskId);
/* Get locks for task with given ID */
public List<Map<String, Object>> getTaskLocks(String tableName, String Id);
public Map<Long, TaskLockType> getTaskLocks(String taskId);
}

View File

@ -56,8 +56,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.util.Collection;
import java.util.List;
import java.util.Map;

View File

@ -19,57 +19,49 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.db.MetadataStorageConnector;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.StatementException;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
public class MetadataTaskStorage implements TaskStorage
{
private final ObjectMapper jsonMapper;
private final MetadataStorageConnector metadataStorageConnector;
private final MetadataStorageTablesConfig dbTables;
private final TaskStorageConfig config;
private final MetadataStorageActionHandler handler;
private final MetadataStorageActionHandler<Task, TaskStatus, TaskAction, TaskLock> handler;
private static final EmittingLogger log = new EmittingLogger(MetadataTaskStorage.class);
@Inject
public MetadataTaskStorage(
final ObjectMapper jsonMapper,
final MetadataStorageConnector metadataStorageConnector,
final MetadataStorageTablesConfig dbTables,
final TaskStorageConfig config,
final MetadataStorageActionHandler handler
)
{
this.jsonMapper = jsonMapper;
this.metadataStorageConnector = metadataStorageConnector;
this.dbTables = dbTables;
this.config = config;
this.handler = handler;
// this is a little janky but haven't figured out how to get Guice to do this yet.
this.handler = (MetadataStorageActionHandler<Task, TaskStatus, TaskAction, TaskLock>) handler;
}
@LifecycleStart
@ -100,23 +92,19 @@ public class MetadataTaskStorage implements TaskStorage
try {
handler.insert(
dbTables.getTasksTable(),
task.getId(),
new DateTime().toString(),
new DateTime(),
task.getDataSource(),
jsonMapper.writeValueAsBytes(task),
status.isRunnable() ? 1 : 0,
jsonMapper.writeValueAsBytes(status)
task,
status.isRunnable(),
status
);
}
catch (Exception e) {
final boolean isStatementException = e instanceof StatementException ||
(e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
if (isStatementException && getTask(task.getId()).isPresent()) {
throw new TaskExistsException(task.getId(), e);
if(e instanceof TaskExistsException) {
throw (TaskExistsException) e;
} else {
throw Throwables.propagate(e);
Throwables.propagate(e);
}
}
}
@ -128,103 +116,77 @@ public class MetadataTaskStorage implements TaskStorage
log.info("Updating task %s to status: %s", status.getId(), status);
try {
int updated = handler.setStatus(
dbTables.getTasksTable(),
status.getId(),
status.isRunnable() ? 1 : 0,
jsonMapper.writeValueAsBytes(status)
);
if (updated != 1) {
throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
final boolean set = handler.setStatus(
status.getId(),
status.isRunnable(),
status
);
if (!set) {
throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
}
}
@Override
public Optional<Task> getTask(final String taskid)
public Optional<Task> getTask(final String taskId)
{
try {
final List<Map<String, Object>> dbTasks = handler.getTask(dbTables.getTasksTable(), taskid);
if (dbTasks.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbTasks);
return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("payload"), Task.class));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return handler.getTask(taskId);
}
@Override
public Optional<TaskStatus> getStatus(final String taskid)
public Optional<TaskStatus> getStatus(final String taskId)
{
try {
final List<Map<String, Object>> dbStatuses = handler.getTaskStatus(dbTables.getTasksTable(), taskid);
if (dbStatuses.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbStatuses);
return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("status_payload"), TaskStatus.class));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return handler.getTaskStatus(taskId);
}
@Override
public List<Task> getActiveTasks()
{
final List<Map<String, Object>> dbTasks = handler.getActiveTasks(dbTables.getTasksTable());
final ImmutableList.Builder<Task> tasks = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final Task task = jsonMapper.readValue((byte[]) row.get("payload"), Task.class);
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isRunnable()) {
tasks.add(task);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit();
}
}
return tasks.build();
return ImmutableList.copyOf(
Iterables.transform(
Iterables.filter(
handler.getActiveTasksWithStatus(),
new Predicate<Pair<Task, TaskStatus>>()
{
@Override
public boolean apply(
@Nullable Pair<Task, TaskStatus> input
)
{
return input.rhs.isRunnable();
}
}
),
new Function<Pair<Task, TaskStatus>, Task>()
{
@Nullable
@Override
public Task apply(@Nullable Pair<Task, TaskStatus> input)
{
return input.lhs;
}
}
)
);
}
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold());
final DateTime start = new DateTime().minus(config.getRecentlyFinishedThreshold());
final List<Map<String, Object>> dbTasks = handler.getRecentlyFinishedTaskStatuses(dbTables.getTasksTable(), recent.toString());
final ImmutableList.Builder<TaskStatus> statuses = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isComplete()) {
statuses.add(status);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit();
}
}
return statuses.build();
return ImmutableList.copyOf(
Iterables.filter(
handler.getRecentlyFinishedTaskStatuses(start),
new Predicate<TaskStatus>()
{
@Override
public boolean apply(TaskStatus status)
{
return status.isComplete();
}
}
)
);
}
@Override
@ -240,14 +202,7 @@ public class MetadataTaskStorage implements TaskStorage
taskid
);
try {
handler.addLock(dbTables.getTaskLockTable(), taskid, jsonMapper.writeValueAsBytes(taskLock));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
handler.addLock(taskid, taskLock);
}
@Override
@ -264,7 +219,7 @@ public class MetadataTaskStorage implements TaskStorage
if (taskLock.equals(taskLockToRemove)) {
log.info("Deleting TaskLock with id[%d]: %s", id, taskLock);
handler.removeLock(dbTables.getTaskLockTable(), id);
handler.removeLock(id);
}
}
}
@ -293,52 +248,17 @@ public class MetadataTaskStorage implements TaskStorage
log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
try {
handler.addAuditLog(dbTables.getTaskLogTable(), task.getId(), jsonMapper.writeValueAsBytes(taskAction));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
handler.addAuditLog(task.getId(), taskAction);
}
@Override
public List<TaskAction> getAuditLogs(final String taskid)
public List<TaskAction> getAuditLogs(final String taskId)
{
final List<Map<String, Object>> dbTaskLogs = handler.getTaskLogs(dbTables.getTaskLogTable(), taskid);
final List<TaskAction> retList = Lists.newArrayList();
for (final Map<String, Object> dbTaskLog : dbTaskLogs) {
try {
retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class));
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskid)
.addData("logPayload", dbTaskLog)
.emit();
}
}
return retList;
return handler.getTaskLogs(taskId);
}
private Map<Long, TaskLock> getLocksWithIds(final String taskid)
{
final List<Map<String, Object>> dbTaskLocks = handler.getTaskLocks(dbTables.getTaskLockTable(), taskid);
final Map<Long, TaskLock> retMap = Maps.newHashMap();
for (final Map<String, Object> row : dbTaskLocks) {
try {
retMap.put(
(Long) row.get("id"),
jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class)
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLock")
.addData("task", taskid)
.addData("lockPayload", row)
.emit();
}
}
return retMap;
return handler.getTaskLocks(taskid);
}
}
}

View File

@ -29,7 +29,6 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.indexing.common.IndexingServiceCondition;
@ -41,7 +40,6 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
@ -59,7 +57,6 @@ import org.junit.Test;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
public class RemoteTaskRunnerTest
{

View File

@ -19,17 +19,37 @@
package io.druid.db;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Maps;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.metamx.common.Pair;
import com.metamx.common.RetryUtils;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.MetadataStorageActionHandler;
import io.druid.indexing.overlord.TaskExistsException;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.StatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
@ -37,98 +57,148 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
public class SQLMetadataStorageActionHandler implements MetadataStorageActionHandler
public class SQLMetadataStorageActionHandler<TaskType, TaskStatusType, TaskActionType, TaskLockType>
implements MetadataStorageActionHandler<TaskType, TaskStatusType, TaskActionType, TaskLockType>
{
private static final EmittingLogger log = new EmittingLogger(SQLMetadataStorageActionHandler.class);
private final IDBI dbi;
private final SQLMetadataConnector connector;
private final MetadataStorageTablesConfig config;
private final ObjectMapper jsonMapper;
private final TypeReference taskType;
private final TypeReference taskStatusType;
private final TypeReference taskActionType;
private final TypeReference taskLockType;
@Inject
public SQLMetadataStorageActionHandler(
final IDBI dbi,
final SQLMetadataConnector connector
)
final SQLMetadataConnector connector,
final MetadataStorageTablesConfig config,
final ObjectMapper jsonMapper,
// we all love type erasure
final @Named("taskType") TypeReference taskType,
final @Named("taskStatusType") TypeReference taskStatusType,
final @Named("taskActionType") TypeReference taskActionType,
final @Named("taskLockType") TypeReference taskLockType
)
{
this.dbi = dbi;
this.connector = connector;
this.config = config;
this.jsonMapper = jsonMapper;
this.taskType = taskType;
this.taskStatusType = taskStatusType;
this.taskActionType = taskActionType;
this.taskLockType = taskLockType;
}
/* Insert stuff. @returns number of entries inserted on success */
/**
* Insert stuff
*
*/
public void insert(
final String tableName,
final String id,
final String createdDate,
final DateTime createdDate,
final String dataSource,
final byte[] payload,
final int active,
final byte[] statusPayload
)
final TaskType task,
final boolean active,
final TaskStatusType status
) throws TaskExistsException
{
retryingHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
try {
retryingHandle(
new HandleCallback<Void>()
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
tableName
)
)
.bind("id", id)
.bind("created_date", createdDate)
.bind("datasource", dataSource)
.bind("payload", payload)
.bind("active", active)
.bind("status_payload", statusPayload)
.execute();
return null;
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
config.getTasksTable()
)
)
.bind("id", id)
.bind("created_date", createdDate)
.bind("datasource", dataSource)
.bind("payload", jsonMapper.writeValueAsBytes(task))
.bind("active", active)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
return null;
}
}
}
);
);
} catch(Exception e) {
final boolean isStatementException = e instanceof StatementException ||
(e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
if (isStatementException && getTask(id).isPresent()) {
throw new TaskExistsException(id, e);
} else {
throw Throwables.propagate(e);
}
}
}
/* Insert stuff. @returns 1 if status of the task with the given id has been updated successfully */
public int setStatus(final String tableName, final String Id, final int active, final byte[] statusPayload)
/**
* Update task status.
*
* @return true if status of the task with the given id has been updated successfully
*/
public boolean setStatus(final String taskId, final boolean active, final TaskStatusType status)
{
return retryingHandle(
new HandleCallback<Integer>()
new HandleCallback<Boolean>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
public Boolean withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
tableName
config.getTasksTable()
)
)
.bind("id", Id)
.bind("id", taskId)
.bind("active", active)
.bind("status_payload", statusPayload)
.execute();
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute() == 1;
}
}
);
}
/* Retrieve a task with the given ID */
public List<Map<String, Object>> getTask(final String tableName, final String Id)
/* */
/**
* Retrieve a task with the given ID
*
* @param taskId task ID
* @return task if it exists
*/
public Optional<TaskType> getTask(final String taskId)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
new HandleCallback<Optional<TaskType>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
public Optional<TaskType> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE id = :id",
tableName
)
return Optional.fromNullable(
jsonMapper.<TaskType>readValue(
handle.createQuery(
String.format("SELECT payload FROM %s WHERE id = :id", config.getTasksTable())
)
.bind("id", Id)
.list();
.bind("id", taskId)
.map(ByteArrayMapper.FIRST)
.first(),
taskType
)
);
}
}
);
@ -136,42 +206,72 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan
}
/* Retrieve a task status with the given ID */
public List<Map<String, Object>> getTaskStatus(final String tableName, final String Id)
public Optional<TaskStatusType> getTaskStatus(final String taskId)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
new HandleCallback<Optional<TaskStatusType>>()
{
return handle.createQuery(
String.format(
"SELECT status_payload FROM %s WHERE id = :id",
tableName
)
)
.bind("id", Id)
.list();
@Override
public Optional<TaskStatusType> withHandle(Handle handle) throws Exception
{
return Optional.fromNullable(
jsonMapper.<TaskStatusType>readValue(
handle.createQuery(
String.format("SELECT status_payload FROM %s WHERE id = :id", config.getTasksTable())
)
.bind("id", taskId)
.map(ByteArrayMapper.FIRST)
.first(),
taskStatusType
)
);
}
}
}
);
}
/* Retrieve active tasks */
public List<Map<String, Object>> getActiveTasks(final String tableName)
public List<Pair<TaskType, TaskStatusType>> getActiveTasksWithStatus()
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
new HandleCallback<List<Pair<TaskType, TaskStatusType>>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
public List<Pair<TaskType, TaskStatusType>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT id, payload, status_payload FROM %s WHERE active = 1 ORDER BY created_date",
tableName
)
).list();
return handle
.createQuery(
String.format(
"SELECT id, payload, status_payload FROM %s WHERE active = TRUE ORDER BY created_date",
config.getTasksTable()
)
)
.map(
new ResultSetMapper<Pair<TaskType, TaskStatusType>>()
{
@Override
public Pair<TaskType, TaskStatusType> map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
try {
return Pair.of(
jsonMapper.<TaskType>readValue(
r.getBytes("payload"),
taskType
),
jsonMapper.<TaskStatusType>readValue(
r.getBytes("status_payload"),
taskStatusType
)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to parse task payload").addData("task", r.getString("id")).emit();
throw new SQLException(e);
}
}
}
).list();
}
}
);
@ -179,27 +279,49 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan
}
/* Retrieve task statuses that have been created sooner than the given time */
public List<Map<String, Object>> getRecentlyFinishedTaskStatuses(final String tableName, final String recent)
public List<TaskStatusType> getRecentlyFinishedTaskStatuses(final DateTime start)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
new HandleCallback<List<TaskStatusType>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
public List<TaskStatusType> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC",
tableName
)
).bind("recent", recent.toString()).list();
return handle
.createQuery(
String.format(
"SELECT id, status_payload FROM %s WHERE active = FALSE AND created_date >= :start ORDER BY created_date DESC",
config.getTasksTable()
)
).bind("start", start.toString())
.map(
new ResultSetMapper<TaskStatusType>()
{
@Override
public TaskStatusType map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
return jsonMapper.readValue(
r.getBytes("status_payload"),
taskStatusType
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to parse status payload")
.addData("task", r.getString("id"))
.emit();
throw new SQLException(e);
}
}
}
).list();
}
}
);
}
/* Add lock to the task with given ID */
public int addLock(final String tableName, final String Id, final byte[] lock)
public int addLock(final String taskId, final TaskLockType lock)
{
return retryingHandle(
new HandleCallback<Integer>()
@ -210,11 +332,11 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, lock_payload) VALUES (:task_id, :lock_payload)",
tableName
config.getTaskLockTable()
)
)
.bind("task_id", Id)
.bind("lock_payload", lock)
.bind("task_id", taskId)
.bind("lock_payload", jsonMapper.writeValueAsBytes(lock))
.execute();
}
}
@ -222,7 +344,7 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan
}
/* Remove taskLock with given ID */
public int removeLock(final String tableName, final long lockId)
public int removeLock(final long lockId)
{
return retryingHandle(
new HandleCallback<Integer>()
@ -233,7 +355,7 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan
return handle.createStatement(
String.format(
"DELETE FROM %s WHERE id = :id",
tableName
config.getTaskLockTable()
)
)
.bind("id", lockId)
@ -243,7 +365,7 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan
);
}
public int addAuditLog(final String tableName, final String Id, final byte[] taskAction)
public int addAuditLog(final String taskId, final TaskActionType taskAction)
{
return retryingHandle(
new HandleCallback<Integer>()
@ -254,11 +376,11 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, log_payload) VALUES (:task_id, :log_payload)",
tableName
config.getTaskLogTable()
)
)
.bind("task_id", Id)
.bind("log_payload", taskAction)
.bind("task_id", taskId)
.bind("log_payload", jsonMapper.writeValueAsBytes(taskAction))
.execute();
}
}
@ -266,44 +388,116 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan
}
/* Get logs for task with given ID */
public List<Map<String, Object>> getTaskLogs(final String tableName, final String Id)
public List<TaskActionType> getTaskLogs(final String taskId)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
new HandleCallback<List<TaskActionType>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
public List<TaskActionType> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT log_payload FROM %s WHERE task_id = :task_id",
tableName
)
return handle
.createQuery(
String.format(
"SELECT log_payload FROM %s WHERE task_id = :task_id",
config.getTaskLogTable()
)
.bind("task_id", Id)
.list();
)
.bind("task_id", taskId)
.map(ByteArrayMapper.FIRST)
.fold(
Lists.<TaskActionType>newLinkedList(),
new Folder3<List<TaskActionType>, byte[]>()
{
@Override
public List<TaskActionType> fold(
List<TaskActionType> list, byte[] bytes, FoldController control, StatementContext ctx
) throws SQLException
{
try {
list.add(
jsonMapper.<TaskActionType>readValue(
bytes, taskActionType
)
);
return list;
}
catch (IOException e) {
log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskId)
.addData("logPayload", new String(bytes, Charsets.UTF_8))
.emit();
throw new SQLException(e);
}
}
}
);
}
}
);
}
/* Get locks for task with given ID */
public List<Map<String, Object>> getTaskLocks(final String tableName, final String Id)
public Map<Long, TaskLockType> getTaskLocks(final String taskId)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
new HandleCallback<Map<Long, TaskLockType>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
public Map<Long, TaskLockType> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT id, lock_payload FROM %s WHERE task_id = :task_id",
tableName
config.getTaskLockTable()
)
)
.bind("task_id", Id)
.list();
.bind("task_id", taskId)
.map(
new ResultSetMapper<Pair<Long, TaskLockType>>()
{
@Override
public Pair<Long, TaskLockType> map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
try {
return Pair.of(
r.getLong("id"),
jsonMapper.<TaskLockType>readValue(
r.getBytes("lock_payload"),
taskLockType
)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to deserialize TaskLock")
.addData("task", r.getLong("id"))
.addData(
"lockPayload", new String(r.getBytes("lock_payload"), Charsets.UTF_8)
)
.emit();
throw new SQLException(e);
}
}
}
)
.fold(
Maps.<Long, TaskLockType>newLinkedHashMap(),
new Folder3<Map<Long, TaskLockType>, Pair<Long, TaskLockType>>()
{
@Override
public Map<Long, TaskLockType> fold(
Map<Long, TaskLockType> accumulator,
Pair<Long, TaskLockType> lock,
FoldController control,
StatementContext ctx
) throws SQLException
{
accumulator.put(lock.lhs, lock.rhs);
return accumulator;
}
}
);
}
}
);

View File

@ -19,6 +19,7 @@
package io.druid.cli;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
@ -31,6 +32,7 @@ import com.google.inject.servlet.GuiceFilter;
import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.indexing.common.TaskStatus;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper;
@ -43,17 +45,20 @@ import io.druid.guice.LifecycleModule;
import io.druid.guice.ListProvider;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.PolyBind;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import io.druid.indexing.overlord.MetadataTaskStorage;
import io.druid.indexing.overlord.ForkingTaskRunnerFactory;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataTaskStorage;
import io.druid.indexing.overlord.RemoteTaskRunnerFactory;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskMaster;
@ -180,6 +185,24 @@ public class CliOverlord extends ServerRunnable
storageBinder.addBinding("db").to(MetadataTaskStorage.class).in(ManageLifecycle.class);
binder.bind(MetadataTaskStorage.class).in(LazySingleton.class);
// gotta love type erasure
binder.bind(TypeReference.class).annotatedWith(Names.named("taskType")).toInstance(
new TypeReference<Task>()
{
}
);
binder.bind(TypeReference.class).annotatedWith(Names.named("taskStatusType")).toInstance(new TypeReference<TaskStatus>(){});
binder.bind(TypeReference.class).annotatedWith(Names.named("taskActionType")).toInstance(
new TypeReference<TaskAction>()
{
}
);
binder.bind(TypeReference.class).annotatedWith(Names.named("taskLockType")).toInstance(
new TypeReference<TaskLock>()
{
}
);
}
private void configureRunners(Binder binder)