From 6c993d87bf24827cb13bed26612d8ba8fd43ebaf Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 13 Dec 2013 11:36:36 -0800 Subject: [PATCH] Indexing service API and GUI improvements! - New APIs: waitingTasks, completeTasks, task payload - GUI for the above, and for task logs + status --- .../indexing/overlord/DbTaskStorage.java | 54 +++- .../indexing/overlord/ForkingTaskRunner.java | 2 +- .../overlord/HeapMemoryTaskStorage.java | 34 ++- .../indexing/overlord/TaskRunnerWorkItem.java | 6 + .../druid/indexing/overlord/TaskStorage.java | 7 + .../overlord/TaskStorageQueryAdapter.java | 20 ++ .../overlord/http/OverlordResource.java | 262 +++++++++++++----- .../resources/indexer_static/console.html | 8 + .../indexer_static/js/console-0.0.1.js | 31 ++- 9 files changed, 341 insertions(+), 83 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java index 7d3ad05512e..ff2d82d03cf 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java @@ -34,7 +34,6 @@ import com.metamx.common.RetryUtils; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; -import com.mysql.jdbc.exceptions.MySQLTimeoutException; import com.mysql.jdbc.exceptions.MySQLTransientException; import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; @@ -43,6 +42,7 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.task.Task; import org.joda.time.DateTime; +import org.joda.time.Period; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.exceptions.CallbackFailedException; @@ -65,6 +65,7 @@ public class DbTaskStorage implements TaskStorage private final DbTablesConfig dbTables; private final IDBI dbi; + private static final long RECENCY_THRESHOLD = new Period("PT24H").toStandardDuration().getMillis(); private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class); @Inject @@ -271,6 +272,45 @@ public class DbTaskStorage implements TaskStorage ); } + @Override + public List getRecentlyFinishedTaskStatuses() + { + final DateTime recent = new DateTime().minus(RECENCY_THRESHOLD); + return retryingHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + final List> dbTasks = + handle.createQuery( + String.format( + "SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date", + dbTables.getTasksTable() + ) + ).bind("recent", recent.toString()).list(); + + final ImmutableList.Builder statuses = ImmutableList.builder(); + for (final Map row : dbTasks) { + final String id = row.get("id").toString(); + + try { + final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class); + if (status.isComplete()) { + statuses.add(status); + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit(); + } + } + + return statuses.build(); + } + } + ); + } + @Override public void addLock(final String taskid, final TaskLock taskLock) { @@ -407,7 +447,8 @@ public class DbTaskStorage implements TaskStorage for (final Map dbTaskLog : dbTaskLogs) { try { retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class)); - } catch (Exception e) { + } + catch (Exception e) { log.makeAlert(e, "Failed to deserialize TaskLog") .addData("task", taskid) .addData("logPayload", dbTaskLog) @@ -451,7 +492,8 @@ public class DbTaskStorage implements TaskStorage /** * Retry SQL operations */ - private T retryingHandle(final HandleCallback callback) { + private T retryingHandle(final HandleCallback callback) + { final Callable call = new Callable() { @Override @@ -471,9 +513,11 @@ public class DbTaskStorage implements TaskStorage final int maxTries = 10; try { return RetryUtils.retry(call, shouldRetry, maxTries); - } catch (RuntimeException e) { + } + catch (RuntimeException e) { throw Throwables.propagate(e); - } catch (Exception e) { + } + catch (Exception e) { throw new CallbackFailedException(e); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index fce401c6641..1099bddbcc6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -391,7 +391,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer if (offset > 0) { raf.seek(offset); } else if (offset < 0 && offset < rafLength) { - raf.seek(rafLength + offset); + raf.seek(Math.max(0, rafLength + offset)); } return Channels.newInputStream(raf.getChannel()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java index ef23972ebe4..79eb41f1fbc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -31,6 +31,8 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.task.Task; +import org.joda.time.DateTime; +import org.joda.time.Period; import java.util.List; import java.util.Map; @@ -47,6 +49,7 @@ public class HeapMemoryTaskStorage implements TaskStorage private final Multimap taskLocks = HashMultimap.create(); private final Multimap taskActions = ArrayListMultimap.create(); + private static final long RECENCY_THRESHOLD = new Period("PT24H").toStandardDuration().getMillis(); private static final Logger log = new Logger(HeapMemoryTaskStorage.class); @Override @@ -69,7 +72,7 @@ public class HeapMemoryTaskStorage implements TaskStorage } log.info("Inserting task %s with status: %s", task.getId(), status); - tasks.put(task.getId(), new TaskStuff(task, status)); + tasks.put(task.getId(), new TaskStuff(task, status, new DateTime())); } finally { giant.unlock(); } @@ -139,7 +142,25 @@ public class HeapMemoryTaskStorage implements TaskStorage listBuilder.add(taskStuff.getTask()); } } + return listBuilder.build(); + } finally { + giant.unlock(); + } + } + @Override + public List getRecentlyFinishedTaskStatuses() + { + giant.lock(); + + try { + final ImmutableList.Builder listBuilder = ImmutableList.builder(); + final long recent = System.currentTimeMillis() - RECENCY_THRESHOLD; + for(final TaskStuff taskStuff : tasks.values()) { + if(taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().getMillis() > recent) { + listBuilder.add(taskStuff.getStatus()); + } + } return listBuilder.build(); } finally { giant.unlock(); @@ -212,8 +233,9 @@ public class HeapMemoryTaskStorage implements TaskStorage { final Task task; final TaskStatus status; + final DateTime createdDate; - private TaskStuff(Task task, TaskStatus status) + private TaskStuff(Task task, TaskStatus status, DateTime createdDate) { Preconditions.checkNotNull(task); Preconditions.checkNotNull(status); @@ -221,6 +243,7 @@ public class HeapMemoryTaskStorage implements TaskStorage this.task = task; this.status = status; + this.createdDate = Preconditions.checkNotNull(createdDate, "createdDate"); } public Task getTask() @@ -233,9 +256,14 @@ public class HeapMemoryTaskStorage implements TaskStorage return status; } + public DateTime getCreatedDate() + { + return createdDate; + } + private TaskStuff withStatus(TaskStatus _status) { - return new TaskStuff(task, _status); + return new TaskStuff(task, _status, createdDate); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java index a78faa24d03..2963c875257 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java @@ -19,6 +19,8 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ComparisonChain; import com.google.common.util.concurrent.ListenableFuture; import io.druid.indexing.common.TaskStatus; @@ -56,21 +58,25 @@ public class TaskRunnerWorkItem implements Comparable this.queueInsertionTime = queueInsertionTime; } + @JsonProperty public String getTaskId() { return taskId; } + @JsonIgnore public ListenableFuture getResult() { return result; } + @JsonProperty public DateTime getCreatedTime() { return createdTime; } + @JsonProperty public DateTime getQueueInsertionTime() { return queueInsertionTime; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java index 3a2145627df..191307fa949 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java @@ -82,6 +82,13 @@ public interface TaskStorage */ public List getActiveTasks(); + /** + * Returns a list of recently finished task statuses as stored in the storage facility. No particular order + * is guaranteed. No particular standard of "recent" is guaranteed, and in fact, this method is permitted to + * simply return nothing. + */ + public List getRecentlyFinishedTaskStatuses(); + /** * Returns a list of locks for a particular task. */ diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java index e9a2a8d5d7c..67ea11dcf33 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -19,14 +19,19 @@ package io.druid.indexing.overlord; +import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Inject; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.task.Task; import io.druid.timeline.DataSegment; +import javax.annotation.Nullable; +import java.util.List; import java.util.Set; /** @@ -42,6 +47,21 @@ public class TaskStorageQueryAdapter this.storage = storage; } + public List getActiveTasks() + { + return storage.getActiveTasks(); + } + + public List getRecentlyFinishedTaskStatuses() + { + return storage.getRecentlyFinishedTaskStatuses(); + } + + public Optional getTask(final String taskid) + { + return storage.getTask(taskid); + } + public Optional getStatus(final String taskid) { return storage.getStatus(taskid); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index d3d58bef03d..c3a8ab1224b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -19,15 +19,20 @@ package io.druid.indexing.overlord.http; +import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.io.InputSupplier; +import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.common.config.JacksonConfigManager; +import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; @@ -40,6 +45,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler; import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.tasklogs.TaskLogStreamer; import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -52,6 +58,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -63,20 +71,6 @@ public class OverlordResource { private static final Logger log = new Logger(OverlordResource.class); - private static Function> simplifyTaskFn = - new Function>() - { - @Override - public Map apply(TaskRunnerWorkItem input) - { - return new ImmutableMap.Builder() - .put("id", input.getTaskId()) - .put("createdTime", input.getCreatedTime()) - .put("queueInsertionTime", input.getQueueInsertionTime()) - .build(); - } - }; - private final TaskMaster taskMaster; private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskLogStreamer taskLogStreamer; @@ -139,6 +133,14 @@ public class OverlordResource ); } + @GET + @Path("/task/{taskid}") + @Produces("application/json") + public Response getTaskPayload(@PathParam("taskid") String taskid) + { + return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid)); + } + @GET @Path("/task/{taskid}/status") @Produces("application/json") @@ -238,39 +240,64 @@ public class OverlordResource } @GET - @Path("/pendingTasks") + @Path("/waitingTasks") @Produces("application/json") - public Response getPendingTasks( - @QueryParam("full") String full - ) + public Response getWaitingTasks() { - if (full != null) { - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() - { - @Override - public Response apply(TaskRunner taskRunner) - { - return Response.ok(taskRunner.getPendingTasks()).build(); - } - } - ); - } - - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() + return workItemsResponse( + new Function>() { @Override - public Response apply(TaskRunner taskRunner) + public Collection apply(TaskRunner taskRunner) { - return Response.ok( - Collections2.transform( - taskRunner.getPendingTasks(), - simplifyTaskFn + // A bit roundabout, but works as a way of figuring out what tasks haven't been handed + // off to the runner yet: + final List activeTasks = taskStorageQueryAdapter.getActiveTasks(); + final Set runnersKnownTasks = Sets.newHashSet( + Iterables.transform( + taskRunner.getKnownTasks(), + new Function() + { + @Override + public String apply(final TaskRunnerWorkItem workItem) + { + return workItem.getTaskId(); + } + } ) - ).build(); + ); + final List waitingTasks = Lists.newArrayList(); + for (final Task task : activeTasks) { + if (!runnersKnownTasks.contains(task.getId())) { + waitingTasks.add( + // Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it. + new TaskRunnerWorkItem( + task.getId(), + SettableFuture.create(), + new DateTime(0), + new DateTime(0) + ) + ); + } + } + return waitingTasks; + } + } + ); + } + + @GET + @Path("/pendingTasks") + @Produces("application/json") + public Response getPendingTasks() + { + return workItemsResponse( + new Function>() + { + @Override + public Collection apply(TaskRunner taskRunner) + { + return taskRunner.getPendingTasks(); } } ); @@ -279,42 +306,45 @@ public class OverlordResource @GET @Path("/runningTasks") @Produces("application/json") - public Response getRunningTasks( - @QueryParam("full") String full - ) + public Response getRunningTasks() { - if (full != null) { - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() - { - @Override - public Response apply(TaskRunner taskRunner) - { - return Response.ok(taskRunner.getRunningTasks()).build(); - } - } - ); - } - - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() + return workItemsResponse( + new Function>() { @Override - public Response apply(TaskRunner taskRunner) + public Collection apply(TaskRunner taskRunner) { - return Response.ok( - Collections2.transform( - taskRunner.getRunningTasks(), - simplifyTaskFn - ) - ).build(); + return taskRunner.getRunningTasks(); } } ); } + @GET + @Path("/completeTasks") + @Produces("application/json") + public Response getCompleteTasks() + { + final List completeTasks = Lists.transform( + taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(), + new Function() + { + @Override + public TaskResponseObject apply(TaskStatus taskStatus) + { + // Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it. + return new TaskResponseObject( + taskStatus.getId(), + new DateTime(0), + new DateTime(0), + Optional.of(taskStatus) + ); + } + } + ); + return Response.ok(completeTasks).build(); + } + @GET @Path("/workers") @Produces("application/json") @@ -373,7 +403,39 @@ public class OverlordResource } } - public Response optionalTaskResponse(String taskid, String objectType, Optional x) + private Response workItemsResponse(final Function> fn) + { + return asLeaderWith( + taskMaster.getTaskRunner(), + new Function() + { + @Override + public Response apply(TaskRunner taskRunner) + { + return Response.ok( + Lists.transform( + Lists.newArrayList(fn.apply(taskRunner)), + new Function() + { + @Override + public TaskResponseObject apply(TaskRunnerWorkItem workItem) + { + return new TaskResponseObject( + workItem.getTaskId(), + workItem.getCreatedTime(), + workItem.getQueueInsertionTime(), + Optional.absent() + ); + } + } + ) + ).build(); + } + } + ); + } + + private Response optionalTaskResponse(String taskid, String objectType, Optional x) { final Map results = Maps.newHashMap(); results.put("task", taskid); @@ -385,7 +447,7 @@ public class OverlordResource } } - public Response asLeaderWith(Optional x, Function f) + private Response asLeaderWith(Optional x, Function f) { if (x.isPresent()) { return f.apply(x.get()); @@ -394,4 +456,62 @@ public class OverlordResource return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); } } + + private static class TaskResponseObject + { + private final String id; + private final DateTime createdTime; + private final DateTime queueInsertionTime; + private final Optional status; + + private TaskResponseObject( + String id, + DateTime createdTime, + DateTime queueInsertionTime, + Optional status + ) + { + this.id = id; + this.createdTime = createdTime; + this.queueInsertionTime = queueInsertionTime; + this.status = status; + } + + public String getId() + { + return id; + } + + public DateTime getCreatedTime() + { + return createdTime; + } + + public DateTime getQueueInsertionTime() + { + return queueInsertionTime; + } + + public Optional getStatus() + { + return status; + } + + @JsonValue + public Map toJson() + { + final Map data = Maps.newLinkedHashMap(); + data.put("id", id); + if (createdTime.getMillis() > 0) { + data.put("createdTime", createdTime); + } + if (queueInsertionTime.getMillis() > 0) { + data.put("queueInsertionTime", queueInsertionTime); + } + if (status.isPresent()) { + data.put("statusCode", status.get().getStatusCode().toString()); + } + return data; + } + } } diff --git a/indexing-service/src/main/resources/indexer_static/console.html b/indexing-service/src/main/resources/indexer_static/console.html index 6223eee6a1f..e782ff529ab 100644 --- a/indexing-service/src/main/resources/indexer_static/console.html +++ b/indexing-service/src/main/resources/indexer_static/console.html @@ -47,6 +47,14 @@
Loading Pending Tasks... this may take a few minutes
+

Waiting Tasks

+
Loading Waiting Tasks... this may take a few minutes
+
+ +

Complete Tasks

+
Loading Complete Tasks... this may take a few minutes
+
+

Workers

Loading Workers... this may take a few minutes
diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js index e3ce86c85c9..adaa1fba83f 100644 --- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js +++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js @@ -3,14 +3,39 @@ var oTable = []; $(document).ready(function() { + var augment = function(data) { + for (i = 0 ; i < data.length ; i++) { + var taskId = encodeURIComponent(data[i].id) + data[i].more = + 'payload' + + 'status' + + 'log (all)' + + 'log (last 8kb)' + } + } + $.get('/druid/indexer/v1/runningTasks', function(data) { $('.running_loading').hide(); - buildTable(data, $('#runningTable'), ["segments"]); + augment(data); + buildTable(data, $('#runningTable')); }); $.get('/druid/indexer/v1/pendingTasks', function(data) { $('.pending_loading').hide(); - buildTable(data, $('#pendingTable'), ["segments"]); + augment(data); + buildTable(data, $('#pendingTable')); + }); + + $.get('/druid/indexer/v1/waitingTasks', function(data) { + $('.waiting_loading').hide(); + augment(data); + buildTable(data, $('#waitingTable')); + }); + + $.get('/druid/indexer/v1/completeTasks', function(data) { + $('.complete_loading').hide(); + augment(data); + buildTable(data, $('#completeTable')); }); $.get('/druid/indexer/v1/workers', function(data) { @@ -22,4 +47,4 @@ $(document).ready(function() { $('.events_loading').hide(); buildTable(data, $('#eventTable')); }); -}); \ No newline at end of file +});