From 6c993d87bf24827cb13bed26612d8ba8fd43ebaf Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 13 Dec 2013 11:36:36 -0800 Subject: [PATCH 1/5] Indexing service API and GUI improvements! - New APIs: waitingTasks, completeTasks, task payload - GUI for the above, and for task logs + status --- .../indexing/overlord/DbTaskStorage.java | 54 +++- .../indexing/overlord/ForkingTaskRunner.java | 2 +- .../overlord/HeapMemoryTaskStorage.java | 34 ++- .../indexing/overlord/TaskRunnerWorkItem.java | 6 + .../druid/indexing/overlord/TaskStorage.java | 7 + .../overlord/TaskStorageQueryAdapter.java | 20 ++ .../overlord/http/OverlordResource.java | 262 +++++++++++++----- .../resources/indexer_static/console.html | 8 + .../indexer_static/js/console-0.0.1.js | 31 ++- 9 files changed, 341 insertions(+), 83 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java index 7d3ad05512e..ff2d82d03cf 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java @@ -34,7 +34,6 @@ import com.metamx.common.RetryUtils; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; -import com.mysql.jdbc.exceptions.MySQLTimeoutException; import com.mysql.jdbc.exceptions.MySQLTransientException; import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; @@ -43,6 +42,7 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.task.Task; import org.joda.time.DateTime; +import org.joda.time.Period; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.exceptions.CallbackFailedException; @@ -65,6 +65,7 @@ public class DbTaskStorage implements TaskStorage private final 
DbTablesConfig dbTables; private final IDBI dbi; + private static final long RECENCY_THRESHOLD = new Period("PT24H").toStandardDuration().getMillis(); private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class); @Inject @@ -271,6 +272,45 @@ public class DbTaskStorage implements TaskStorage ); } + @Override + public List getRecentlyFinishedTaskStatuses() + { + final DateTime recent = new DateTime().minus(RECENCY_THRESHOLD); + return retryingHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + final List> dbTasks = + handle.createQuery( + String.format( + "SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date", + dbTables.getTasksTable() + ) + ).bind("recent", recent.toString()).list(); + + final ImmutableList.Builder statuses = ImmutableList.builder(); + for (final Map row : dbTasks) { + final String id = row.get("id").toString(); + + try { + final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class); + if (status.isComplete()) { + statuses.add(status); + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit(); + } + } + + return statuses.build(); + } + } + ); + } + @Override public void addLock(final String taskid, final TaskLock taskLock) { @@ -407,7 +447,8 @@ public class DbTaskStorage implements TaskStorage for (final Map dbTaskLog : dbTaskLogs) { try { retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class)); - } catch (Exception e) { + } + catch (Exception e) { log.makeAlert(e, "Failed to deserialize TaskLog") .addData("task", taskid) .addData("logPayload", dbTaskLog) @@ -451,7 +492,8 @@ public class DbTaskStorage implements TaskStorage /** * Retry SQL operations */ - private T retryingHandle(final HandleCallback callback) { + private T retryingHandle(final HandleCallback callback) + { final Callable call = 
new Callable() { @Override @@ -471,9 +513,11 @@ public class DbTaskStorage implements TaskStorage final int maxTries = 10; try { return RetryUtils.retry(call, shouldRetry, maxTries); - } catch (RuntimeException e) { + } + catch (RuntimeException e) { throw Throwables.propagate(e); - } catch (Exception e) { + } + catch (Exception e) { throw new CallbackFailedException(e); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index fce401c6641..1099bddbcc6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -391,7 +391,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer if (offset > 0) { raf.seek(offset); } else if (offset < 0 && offset < rafLength) { - raf.seek(rafLength + offset); + raf.seek(Math.max(0, rafLength + offset)); } return Channels.newInputStream(raf.getChannel()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java index ef23972ebe4..79eb41f1fbc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -31,6 +31,8 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.task.Task; +import org.joda.time.DateTime; +import org.joda.time.Period; import java.util.List; import java.util.Map; @@ -47,6 +49,7 @@ public class HeapMemoryTaskStorage implements TaskStorage private final Multimap taskLocks = HashMultimap.create(); private final Multimap taskActions = ArrayListMultimap.create(); + private static 
final long RECENCY_THRESHOLD = new Period("PT24H").toStandardDuration().getMillis(); private static final Logger log = new Logger(HeapMemoryTaskStorage.class); @Override @@ -69,7 +72,7 @@ public class HeapMemoryTaskStorage implements TaskStorage } log.info("Inserting task %s with status: %s", task.getId(), status); - tasks.put(task.getId(), new TaskStuff(task, status)); + tasks.put(task.getId(), new TaskStuff(task, status, new DateTime())); } finally { giant.unlock(); } @@ -139,7 +142,25 @@ public class HeapMemoryTaskStorage implements TaskStorage listBuilder.add(taskStuff.getTask()); } } + return listBuilder.build(); + } finally { + giant.unlock(); + } + } + @Override + public List getRecentlyFinishedTaskStatuses() + { + giant.lock(); + + try { + final ImmutableList.Builder listBuilder = ImmutableList.builder(); + final long recent = System.currentTimeMillis() - RECENCY_THRESHOLD; + for(final TaskStuff taskStuff : tasks.values()) { + if(taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().getMillis() > recent) { + listBuilder.add(taskStuff.getStatus()); + } + } return listBuilder.build(); } finally { giant.unlock(); @@ -212,8 +233,9 @@ public class HeapMemoryTaskStorage implements TaskStorage { final Task task; final TaskStatus status; + final DateTime createdDate; - private TaskStuff(Task task, TaskStatus status) + private TaskStuff(Task task, TaskStatus status, DateTime createdDate) { Preconditions.checkNotNull(task); Preconditions.checkNotNull(status); @@ -221,6 +243,7 @@ public class HeapMemoryTaskStorage implements TaskStorage this.task = task; this.status = status; + this.createdDate = Preconditions.checkNotNull(createdDate, "createdDate"); } public Task getTask() @@ -233,9 +256,14 @@ public class HeapMemoryTaskStorage implements TaskStorage return status; } + public DateTime getCreatedDate() + { + return createdDate; + } + private TaskStuff withStatus(TaskStatus _status) { - return new TaskStuff(task, _status); + return new TaskStuff(task, 
_status, createdDate); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java index a78faa24d03..2963c875257 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java @@ -19,6 +19,8 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ComparisonChain; import com.google.common.util.concurrent.ListenableFuture; import io.druid.indexing.common.TaskStatus; @@ -56,21 +58,25 @@ public class TaskRunnerWorkItem implements Comparable this.queueInsertionTime = queueInsertionTime; } + @JsonProperty public String getTaskId() { return taskId; } + @JsonIgnore public ListenableFuture getResult() { return result; } + @JsonProperty public DateTime getCreatedTime() { return createdTime; } + @JsonProperty public DateTime getQueueInsertionTime() { return queueInsertionTime; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java index 3a2145627df..191307fa949 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java @@ -82,6 +82,13 @@ public interface TaskStorage */ public List getActiveTasks(); + /** + * Returns a list of recently finished task statuses as stored in the storage facility. No particular order + * is guaranteed. No particular standard of "recent" is guaranteed, and in fact, this method is permitted to + * simply return nothing. + */ + public List getRecentlyFinishedTaskStatuses(); + /** * Returns a list of locks for a particular task. 
*/ diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java index e9a2a8d5d7c..67ea11dcf33 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -19,14 +19,19 @@ package io.druid.indexing.overlord; +import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Inject; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.task.Task; import io.druid.timeline.DataSegment; +import javax.annotation.Nullable; +import java.util.List; import java.util.Set; /** @@ -42,6 +47,21 @@ public class TaskStorageQueryAdapter this.storage = storage; } + public List getActiveTasks() + { + return storage.getActiveTasks(); + } + + public List getRecentlyFinishedTaskStatuses() + { + return storage.getRecentlyFinishedTaskStatuses(); + } + + public Optional getTask(final String taskid) + { + return storage.getTask(taskid); + } + public Optional getStatus(final String taskid) { return storage.getStatus(taskid); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index d3d58bef03d..c3a8ab1224b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -19,15 +19,20 @@ package io.druid.indexing.overlord.http; +import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Function; import 
com.google.common.base.Optional; -import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.io.InputSupplier; +import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.common.config.JacksonConfigManager; +import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; @@ -40,6 +45,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler; import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.tasklogs.TaskLogStreamer; import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -52,6 +58,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -63,20 +71,6 @@ public class OverlordResource { private static final Logger log = new Logger(OverlordResource.class); - private static Function> simplifyTaskFn = - new Function>() - { - @Override - public Map apply(TaskRunnerWorkItem input) - { - return new ImmutableMap.Builder() - .put("id", input.getTaskId()) - .put("createdTime", input.getCreatedTime()) - .put("queueInsertionTime", input.getQueueInsertionTime()) - .build(); - } - }; - private final TaskMaster taskMaster; private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskLogStreamer taskLogStreamer; @@ -139,6 +133,14 @@ public class OverlordResource ); } + @GET + 
@Path("/task/{taskid}") + @Produces("application/json") + public Response getTaskPayload(@PathParam("taskid") String taskid) + { + return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid)); + } + @GET @Path("/task/{taskid}/status") @Produces("application/json") @@ -238,39 +240,64 @@ public class OverlordResource } @GET - @Path("/pendingTasks") + @Path("/waitingTasks") @Produces("application/json") - public Response getPendingTasks( - @QueryParam("full") String full - ) + public Response getWaitingTasks() { - if (full != null) { - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() - { - @Override - public Response apply(TaskRunner taskRunner) - { - return Response.ok(taskRunner.getPendingTasks()).build(); - } - } - ); - } - - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() + return workItemsResponse( + new Function>() { @Override - public Response apply(TaskRunner taskRunner) + public Collection apply(TaskRunner taskRunner) { - return Response.ok( - Collections2.transform( - taskRunner.getPendingTasks(), - simplifyTaskFn + // A bit roundabout, but works as a way of figuring out what tasks haven't been handed + // off to the runner yet: + final List activeTasks = taskStorageQueryAdapter.getActiveTasks(); + final Set runnersKnownTasks = Sets.newHashSet( + Iterables.transform( + taskRunner.getKnownTasks(), + new Function() + { + @Override + public String apply(final TaskRunnerWorkItem workItem) + { + return workItem.getTaskId(); + } + } ) - ).build(); + ); + final List waitingTasks = Lists.newArrayList(); + for (final Task task : activeTasks) { + if (!runnersKnownTasks.contains(task.getId())) { + waitingTasks.add( + // Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it. 
+ new TaskRunnerWorkItem( + task.getId(), + SettableFuture.create(), + new DateTime(0), + new DateTime(0) + ) + ); + } + } + return waitingTasks; + } + } + ); + } + + @GET + @Path("/pendingTasks") + @Produces("application/json") + public Response getPendingTasks() + { + return workItemsResponse( + new Function>() + { + @Override + public Collection apply(TaskRunner taskRunner) + { + return taskRunner.getPendingTasks(); } } ); @@ -279,42 +306,45 @@ public class OverlordResource @GET @Path("/runningTasks") @Produces("application/json") - public Response getRunningTasks( - @QueryParam("full") String full - ) + public Response getRunningTasks() { - if (full != null) { - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() - { - @Override - public Response apply(TaskRunner taskRunner) - { - return Response.ok(taskRunner.getRunningTasks()).build(); - } - } - ); - } - - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() + return workItemsResponse( + new Function>() { @Override - public Response apply(TaskRunner taskRunner) + public Collection apply(TaskRunner taskRunner) { - return Response.ok( - Collections2.transform( - taskRunner.getRunningTasks(), - simplifyTaskFn - ) - ).build(); + return taskRunner.getRunningTasks(); } } ); } + @GET + @Path("/completeTasks") + @Produces("application/json") + public Response getCompleteTasks() + { + final List completeTasks = Lists.transform( + taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(), + new Function() + { + @Override + public TaskResponseObject apply(TaskStatus taskStatus) + { + // Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it. 
+ return new TaskResponseObject( + taskStatus.getId(), + new DateTime(0), + new DateTime(0), + Optional.of(taskStatus) + ); + } + } + ); + return Response.ok(completeTasks).build(); + } + @GET @Path("/workers") @Produces("application/json") @@ -373,7 +403,39 @@ public class OverlordResource } } - public Response optionalTaskResponse(String taskid, String objectType, Optional x) + private Response workItemsResponse(final Function> fn) + { + return asLeaderWith( + taskMaster.getTaskRunner(), + new Function() + { + @Override + public Response apply(TaskRunner taskRunner) + { + return Response.ok( + Lists.transform( + Lists.newArrayList(fn.apply(taskRunner)), + new Function() + { + @Override + public TaskResponseObject apply(TaskRunnerWorkItem workItem) + { + return new TaskResponseObject( + workItem.getTaskId(), + workItem.getCreatedTime(), + workItem.getQueueInsertionTime(), + Optional.absent() + ); + } + } + ) + ).build(); + } + } + ); + } + + private Response optionalTaskResponse(String taskid, String objectType, Optional x) { final Map results = Maps.newHashMap(); results.put("task", taskid); @@ -385,7 +447,7 @@ public class OverlordResource } } - public Response asLeaderWith(Optional x, Function f) + private Response asLeaderWith(Optional x, Function f) { if (x.isPresent()) { return f.apply(x.get()); @@ -394,4 +456,62 @@ public class OverlordResource return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); } } + + private static class TaskResponseObject + { + private final String id; + private final DateTime createdTime; + private final DateTime queueInsertionTime; + private final Optional status; + + private TaskResponseObject( + String id, + DateTime createdTime, + DateTime queueInsertionTime, + Optional status + ) + { + this.id = id; + this.createdTime = createdTime; + this.queueInsertionTime = queueInsertionTime; + this.status = status; + } + + public String getId() + { + return id; + } + + public DateTime getCreatedTime() + { + return 
createdTime; + } + + public DateTime getQueueInsertionTime() + { + return queueInsertionTime; + } + + public Optional getStatus() + { + return status; + } + + @JsonValue + public Map toJson() + { + final Map data = Maps.newLinkedHashMap(); + data.put("id", id); + if (createdTime.getMillis() > 0) { + data.put("createdTime", createdTime); + } + if (queueInsertionTime.getMillis() > 0) { + data.put("queueInsertionTime", queueInsertionTime); + } + if (status.isPresent()) { + data.put("statusCode", status.get().getStatusCode().toString()); + } + return data; + } + } } diff --git a/indexing-service/src/main/resources/indexer_static/console.html b/indexing-service/src/main/resources/indexer_static/console.html index 6223eee6a1f..e782ff529ab 100644 --- a/indexing-service/src/main/resources/indexer_static/console.html +++ b/indexing-service/src/main/resources/indexer_static/console.html @@ -47,6 +47,14 @@
Loading Pending Tasks... this may take a few minutes
+

Waiting Tasks

+
Loading Waiting Tasks... this may take a few minutes
+
+ +

Complete Tasks

+
Loading Complete Tasks... this may take a few minutes
+
+

Workers

Loading Workers... this may take a few minutes
diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js index e3ce86c85c9..adaa1fba83f 100644 --- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js +++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js @@ -3,14 +3,39 @@ var oTable = []; $(document).ready(function() { + var augment = function(data) { + for (i = 0 ; i < data.length ; i++) { + var taskId = encodeURIComponent(data[i].id) + data[i].more = + 'payload' + + 'status' + + 'log (all)' + + 'log (last 8kb)' + } + } + $.get('/druid/indexer/v1/runningTasks', function(data) { $('.running_loading').hide(); - buildTable(data, $('#runningTable'), ["segments"]); + augment(data); + buildTable(data, $('#runningTable')); }); $.get('/druid/indexer/v1/pendingTasks', function(data) { $('.pending_loading').hide(); - buildTable(data, $('#pendingTable'), ["segments"]); + augment(data); + buildTable(data, $('#pendingTable')); + }); + + $.get('/druid/indexer/v1/waitingTasks', function(data) { + $('.waiting_loading').hide(); + augment(data); + buildTable(data, $('#waitingTable')); + }); + + $.get('/druid/indexer/v1/completeTasks', function(data) { + $('.complete_loading').hide(); + augment(data); + buildTable(data, $('#completeTable')); }); $.get('/druid/indexer/v1/workers', function(data) { @@ -22,4 +47,4 @@ $(document).ready(function() { $('.events_loading').hide(); buildTable(data, $('#eventTable')); }); -}); \ No newline at end of file +}); From e63c69dd57c33fe0fe169802e419b7b0cb5cfc8b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 13 Dec 2013 12:27:45 -0800 Subject: [PATCH 2/5] TaskStorage: Return recently complete tasks in reverse chronological order --- .../indexing/overlord/DbTaskStorage.java | 2 +- .../overlord/HeapMemoryTaskStorage.java | 19 +++++++++++++++---- .../druid/indexing/overlord/TaskStorage.java | 4 ++-- 3 files changed, 18 insertions(+), 7 deletions(-) diff 
--git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java index ff2d82d03cf..e39458ce0f1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java @@ -285,7 +285,7 @@ public class DbTaskStorage implements TaskStorage final List> dbTasks = handle.createQuery( String.format( - "SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date", + "SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC", dbTables.getTasksTable() ) ).bind("recent", recent.toString()).list(); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java index 79eb41f1fbc..4f2b72a0e63 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -19,6 +19,7 @@ package io.druid.indexing.overlord; +import com.google.api.client.util.Lists; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -26,6 +27,7 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; import com.metamx.common.logger.Logger; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; @@ -34,6 +36,7 @@ import io.druid.indexing.common.task.Task; import org.joda.time.DateTime; import org.joda.time.Period; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; import 
java.util.concurrent.locks.ReentrantLock; @@ -154,14 +157,22 @@ public class HeapMemoryTaskStorage implements TaskStorage giant.lock(); try { - final ImmutableList.Builder listBuilder = ImmutableList.builder(); + final List returns = Lists.newArrayList(); final long recent = System.currentTimeMillis() - RECENCY_THRESHOLD; - for(final TaskStuff taskStuff : tasks.values()) { + final Ordering createdDateDesc = new Ordering() + { + @Override + public int compare(TaskStuff a, TaskStuff b) + { + return a.getCreatedDate().compareTo(b.getCreatedDate()); + } + }.reverse(); + for(final TaskStuff taskStuff : createdDateDesc.sortedCopy(tasks.values())) { if(taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().getMillis() > recent) { - listBuilder.add(taskStuff.getStatus()); + returns.add(taskStuff.getStatus()); } } - return listBuilder.build(); + return returns; } finally { giant.unlock(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java index 191307fa949..fb289459256 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java @@ -84,8 +84,8 @@ public interface TaskStorage /** * Returns a list of recently finished task statuses as stored in the storage facility. No particular order - * is guaranteed. No particular standard of "recent" is guaranteed, and in fact, this method is permitted to - * simply return nothing. + * is guaranteed, but implementations are encouraged to return tasks in descending order of creation. No particular + * standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing. 
*/ public List getRecentlyFinishedTaskStatuses(); From 52cdb20f106cbe1fd3064ba09e3f4203bd827268 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 13 Dec 2013 15:01:07 -0800 Subject: [PATCH 3/5] add better messaging and error handling --- .../Tutorial:-Loading-Your-Data-Part-1.md | 15 +++++---- .../overlord/http/OverlordResource.java | 3 ++ .../resources/indexer_static/console.html | 4 +-- .../firehose/LocalFirehoseFactory.java | 32 +++++++++++-------- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md index 2c56c81a839..40cd012bccd 100644 --- a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md +++ b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md @@ -94,6 +94,7 @@ druid.db.connector.user=druid druid.db.connector.password=diurd druid.selectors.indexing.serviceName=overlord +druid.indexer.queue.startDelay=PT0M druid.indexer.runner.javaOpts="-server -Xmx1g" druid.indexer.runner.startPort=8088 druid.indexer.fork.property.druid.computation.buffer.size=268435456 @@ -246,17 +247,19 @@ Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) should yield: } ] ``` -Problems? ---------- +Console +-------- -If you decide to reuse the local firehose to ingest your own data and if you run into problems, you can read the individual task logs at: +The indexing service overlord has a console located at: ```bash -/log/.log - +localhost:8087/console.html +``` -One thing to note is that the log file will only exist once the task completes with either SUCCESS or FAILURE. +On this console, you can look at statuses and logs of recently submitted and completed tasks. + +If you decide to reuse the local firehose to ingest your own data and if you run into problems, you can use the console to read the individual task logs. + Task logs can be stored locally or uploaded to [Deep Storage](Deep-Storage.html). 
More information about how to configure this is [here](Configuration.html). Most common data ingestion problems are around timestamp formats and other malformed data issues. diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index c3a8ab1224b..a4ffeafb78c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -368,6 +368,9 @@ public class OverlordResource @Produces("application/json") public Response getScalingState() { + if (!taskMaster.getResourceManagementScheduler().isPresent()) { + return Response.ok().build(); + } return asLeaderWith( taskMaster.getResourceManagementScheduler(), new Function() diff --git a/indexing-service/src/main/resources/indexer_static/console.html b/indexing-service/src/main/resources/indexer_static/console.html index e782ff529ab..1a55daf7e95 100644 --- a/indexing-service/src/main/resources/indexer_static/console.html +++ b/indexing-service/src/main/resources/indexer_static/console.html @@ -59,8 +59,8 @@
Loading Workers... this may take a few minutes
-

Event Log

-
Loading Event Log... this may take a few minutes
+

Autoscaling Activity

+
Loading Autoscaling Activities... this may take a few minutes
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index f78bc0ac390..df96aa45f5e 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.collect.Lists; +import com.metamx.common.ISE; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.impl.FileIteratingFirehose; @@ -78,21 +79,26 @@ public class LocalFirehoseFactory implements FirehoseFactory @Override public Firehose connect() throws IOException { - final LinkedList files = Lists.newLinkedList( - Arrays.asList( - baseDir.listFiles( - new FilenameFilter() - { - @Override - public boolean accept(File file, String name) - { - return name.contains(filter); - } - } - ) - ) + File[] foundFiles = baseDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File file, String name) + { + return name.contains(filter); + } + } ); + if (foundFiles == null || foundFiles.length == 0) { + throw new ISE("Found no files to ingest! 
Check your schema."); + } + + final LinkedList files = Lists.newLinkedList( + Arrays.asList(foundFiles) + ); + + return new FileIteratingFirehose( new Iterator() { From 4a8140be811b6986cd357a05a9d7102966214edd Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 13 Dec 2013 15:04:25 -0800 Subject: [PATCH 4/5] better messaging to console again --- .../src/main/resources/indexer_static/console.html | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/resources/indexer_static/console.html b/indexing-service/src/main/resources/indexer_static/console.html index 1a55daf7e95..f51383c72c0 100644 --- a/indexing-service/src/main/resources/indexer_static/console.html +++ b/indexing-service/src/main/resources/indexer_static/console.html @@ -43,11 +43,11 @@
Loading Running Tasks... this may take a few minutes
-

Pending Tasks

+

Pending Tasks - Tasks waiting to be assigned to a worker

Loading Pending Tasks... this may take a few minutes
-

Waiting Tasks

+

Waiting Tasks - Tasks waiting on locks

Loading Waiting Tasks... this may take a few minutes
@@ -55,7 +55,7 @@
Loading Complete Tasks... this may take a few minutes
-

Workers

+

Remote Workers

Loading Workers... this may take a few minutes
From 600dc7546f8ef7d625b208d570f27eaee1219824 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 13 Dec 2013 16:02:54 -0800 Subject: [PATCH 5/5] Configurability of recency threshold --- .../common/config/TaskStorageConfig.java | 27 +++++++++++++++++++ .../indexing/overlord/DbTaskStorage.java | 14 +++++++--- .../overlord/HeapMemoryTaskStorage.java | 15 ++++++++--- .../overlord/http/OverlordResource.java | 17 ++++-------- .../indexing/overlord/TaskLifecycleTest.java | 9 ++++++- .../main/java/io/druid/cli/CliOverlord.java | 3 +++ 6 files changed, 65 insertions(+), 20 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java new file mode 100644 index 00000000000..fbbea09a8de --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java @@ -0,0 +1,27 @@ +package io.druid.indexing.common.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Duration; +import org.joda.time.Period; + +public class TaskStorageConfig +{ + @JsonProperty + private final Duration recentlyFinishedThreshold; + + @JsonCreator + public TaskStorageConfig( + @JsonProperty("recentlyFinishedThreshold") final Period recentlyFinishedThreshold + ) + { + this.recentlyFinishedThreshold = recentlyFinishedThreshold == null + ? 
new Period("PT24H").toStandardDuration() + : recentlyFinishedThreshold.toStandardDuration(); + } + + public Duration getRecentlyFinishedThreshold() + { + return recentlyFinishedThreshold; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java index e39458ce0f1..5dc6e1c6fff 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java @@ -40,6 +40,7 @@ import io.druid.db.DbTablesConfig; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.Task; import org.joda.time.DateTime; import org.joda.time.Period; @@ -64,17 +65,24 @@ public class DbTaskStorage implements TaskStorage private final DbConnector dbConnector; private final DbTablesConfig dbTables; private final IDBI dbi; + private final TaskStorageConfig config; - private static final long RECENCY_THRESHOLD = new Period("PT24H").toStandardDuration().getMillis(); private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class); @Inject - public DbTaskStorage(ObjectMapper jsonMapper, DbConnector dbConnector, DbTablesConfig dbTables, IDBI dbi) + public DbTaskStorage( + final ObjectMapper jsonMapper, + final DbConnector dbConnector, + final DbTablesConfig dbTables, + final IDBI dbi, + final TaskStorageConfig config + ) { this.jsonMapper = jsonMapper; this.dbConnector = dbConnector; this.dbTables = dbTables; this.dbi = dbi; + this.config = config; } @LifecycleStart @@ -275,7 +283,7 @@ public class DbTaskStorage implements TaskStorage @Override public List getRecentlyFinishedTaskStatuses() { - final DateTime recent = new DateTime().minus(RECENCY_THRESHOLD); + final DateTime recent = new 
DateTime().minus(config.getRecentlyFinishedThreshold()); return retryingHandle( new HandleCallback>() { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java index 4f2b72a0e63..ef942e5c12f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -28,15 +28,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.Task; import org.joda.time.DateTime; -import org.joda.time.Period; -import javax.annotation.Nullable; import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; @@ -47,14 +47,21 @@ import java.util.concurrent.locks.ReentrantLock; */ public class HeapMemoryTaskStorage implements TaskStorage { + private final TaskStorageConfig config; + private final ReentrantLock giant = new ReentrantLock(); private final Map tasks = Maps.newHashMap(); private final Multimap taskLocks = HashMultimap.create(); private final Multimap taskActions = ArrayListMultimap.create(); - private static final long RECENCY_THRESHOLD = new Period("PT24H").toStandardDuration().getMillis(); private static final Logger log = new Logger(HeapMemoryTaskStorage.class); + @Inject + public HeapMemoryTaskStorage(TaskStorageConfig config) + { + this.config = config; + } + @Override public void insert(Task task, TaskStatus status) { @@ -158,7 +165,7 @@ public class 
HeapMemoryTaskStorage implements TaskStorage try { final List returns = Lists.newArrayList(); - final long recent = System.currentTimeMillis() - RECENCY_THRESHOLD; + final long recent = System.currentTimeMillis() - config.getRecentlyFinishedThreshold().getMillis(); final Ordering createdDateDesc = new Ordering() { @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index a4ffeafb78c..f161cb3c278 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -368,20 +368,13 @@ public class OverlordResource @Produces("application/json") public Response getScalingState() { - if (!taskMaster.getResourceManagementScheduler().isPresent()) { + // Don't use asLeaderWith, since we want to return 200 instead of 503 when missing an autoscaler. 
+ final Optional rms = taskMaster.getResourceManagementScheduler(); + if (rms.isPresent()) { + return Response.ok(rms.get().getStats()).build(); + } else { return Response.ok().build(); } - return asLeaderWith( - taskMaster.getResourceManagementScheduler(), - new Function() - { - @Override - public Response apply(ResourceManagementScheduler resourceManagementScheduler) - { - return Response.ok(resourceManagementScheduler.getStats()).build(); - } - } - ); } @GET diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 21c349a69c8..51f20e830a0 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -54,6 +54,7 @@ import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.AbstractFixedIntervalTask; import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.KillTask; @@ -75,7 +76,9 @@ import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; import org.easymock.EasyMock; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Interval; +import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -120,7 +123,11 @@ public class TaskLifecycleTest "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}", TaskQueueConfig.class ); - ts = new HeapMemoryTaskStorage(); + ts = new HeapMemoryTaskStorage( + new TaskStorageConfig(new Period("PT24H")) + { + } + ); tsqa = new TaskStorageQueryAdapter(ts); tl = new TaskLockbox(ts); mdc = newMockMDC(); 
diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 9b869ce875e..166c6452eb9 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -45,6 +45,7 @@ import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; @@ -154,6 +155,8 @@ public class CliOverlord extends ServerRunnable private void configureTaskStorage(Binder binder) { + JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class); + PolyBind.createChoice( binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class) );