Indexing service API and GUI improvements!

- New APIs: waitingTasks, completeTasks, task payload
- GUI for the above, and for task logs + status
This commit is contained in:
Gian Merlino 2013-12-13 11:36:36 -08:00
parent 407f8caf94
commit 6c993d87bf
9 changed files with 341 additions and 83 deletions

View File

@ -34,7 +34,6 @@ import com.metamx.common.RetryUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.mysql.jdbc.exceptions.MySQLTimeoutException;
import com.mysql.jdbc.exceptions.MySQLTransientException;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
@ -43,6 +42,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
@ -65,6 +65,7 @@ public class DbTaskStorage implements TaskStorage
private final DbTablesConfig dbTables;
private final IDBI dbi;
private static final long RECENCY_THRESHOLD = new Period("PT24H").toStandardDuration().getMillis();
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
@Inject
@ -271,6 +272,45 @@ public class DbTaskStorage implements TaskStorage
);
}
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
final DateTime recent = new DateTime().minus(RECENCY_THRESHOLD);
return retryingHandle(
new HandleCallback<List<TaskStatus>>()
{
@Override
public List<TaskStatus> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date",
dbTables.getTasksTable()
)
).bind("recent", recent.toString()).list();
final ImmutableList.Builder<TaskStatus> statuses = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isComplete()) {
statuses.add(status);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit();
}
}
return statuses.build();
}
}
);
}
@Override
public void addLock(final String taskid, final TaskLock taskLock)
{
@ -407,7 +447,8 @@ public class DbTaskStorage implements TaskStorage
for (final Map<String, Object> dbTaskLog : dbTaskLogs) {
try {
retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class));
} catch (Exception e) {
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskid)
.addData("logPayload", dbTaskLog)
@ -451,7 +492,8 @@ public class DbTaskStorage implements TaskStorage
/**
* Retry SQL operations
*/
private <T> T retryingHandle(final HandleCallback<T> callback) {
private <T> T retryingHandle(final HandleCallback<T> callback)
{
final Callable<T> call = new Callable<T>()
{
@Override
@ -471,9 +513,11 @@ public class DbTaskStorage implements TaskStorage
final int maxTries = 10;
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
} catch (RuntimeException e) {
}
catch (RuntimeException e) {
throw Throwables.propagate(e);
} catch (Exception e) {
}
catch (Exception e) {
throw new CallbackFailedException(e);
}
}

View File

@ -391,7 +391,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
if (offset > 0) {
raf.seek(offset);
} else if (offset < 0 && offset < rafLength) {
raf.seek(rafLength + offset);
raf.seek(Math.max(0, rafLength + offset));
}
return Channels.newInputStream(raf.getChannel());
}

View File

@ -31,6 +31,8 @@ import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.Period;
import java.util.List;
import java.util.Map;
@ -47,6 +49,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
private final Multimap<String, TaskLock> taskLocks = HashMultimap.create();
private final Multimap<String, TaskAction> taskActions = ArrayListMultimap.create();
private static final long RECENCY_THRESHOLD = new Period("PT24H").toStandardDuration().getMillis();
private static final Logger log = new Logger(HeapMemoryTaskStorage.class);
@Override
@ -69,7 +72,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
log.info("Inserting task %s with status: %s", task.getId(), status);
tasks.put(task.getId(), new TaskStuff(task, status));
tasks.put(task.getId(), new TaskStuff(task, status, new DateTime()));
} finally {
giant.unlock();
}
@ -139,7 +142,25 @@ public class HeapMemoryTaskStorage implements TaskStorage
listBuilder.add(taskStuff.getTask());
}
}
return listBuilder.build();
} finally {
giant.unlock();
}
}
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
giant.lock();
try {
final ImmutableList.Builder<TaskStatus> listBuilder = ImmutableList.builder();
final long recent = System.currentTimeMillis() - RECENCY_THRESHOLD;
for(final TaskStuff taskStuff : tasks.values()) {
if(taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().getMillis() > recent) {
listBuilder.add(taskStuff.getStatus());
}
}
return listBuilder.build();
} finally {
giant.unlock();
@ -212,8 +233,9 @@ public class HeapMemoryTaskStorage implements TaskStorage
{
final Task task;
final TaskStatus status;
final DateTime createdDate;
private TaskStuff(Task task, TaskStatus status)
private TaskStuff(Task task, TaskStatus status, DateTime createdDate)
{
Preconditions.checkNotNull(task);
Preconditions.checkNotNull(status);
@ -221,6 +243,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
this.task = task;
this.status = status;
this.createdDate = Preconditions.checkNotNull(createdDate, "createdDate");
}
public Task getTask()
@ -233,9 +256,14 @@ public class HeapMemoryTaskStorage implements TaskStorage
return status;
}
public DateTime getCreatedDate()
{
return createdDate;
}
private TaskStuff withStatus(TaskStatus _status)
{
return new TaskStuff(task, _status);
return new TaskStuff(task, _status, createdDate);
}
}
}

View File

@ -19,6 +19,8 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexing.common.TaskStatus;
@ -56,21 +58,25 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
this.queueInsertionTime = queueInsertionTime;
}
@JsonProperty
public String getTaskId()
{
return taskId;
}
@JsonIgnore
public ListenableFuture<TaskStatus> getResult()
{
return result;
}
@JsonProperty
public DateTime getCreatedTime()
{
return createdTime;
}
@JsonProperty
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;

View File

@ -82,6 +82,13 @@ public interface TaskStorage
*/
public List<Task> getActiveTasks();
/**
* Returns a list of recently finished task statuses as stored in the storage facility. No particular order
* is guaranteed. No particular standard of "recent" is guaranteed, and in fact, this method is permitted to
* simply return nothing.
*/
public List<TaskStatus> getRecentlyFinishedTaskStatuses();
/**
* Returns a list of locks for a particular task.
*/

View File

@ -19,14 +19,19 @@
package io.druid.indexing.overlord;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
/**
@ -42,6 +47,21 @@ public class TaskStorageQueryAdapter
this.storage = storage;
}
public List<Task> getActiveTasks()
{
return storage.getActiveTasks();
}
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
return storage.getRecentlyFinishedTaskStatuses();
}
public Optional<Task> getTask(final String taskid)
{
return storage.getTask(taskid);
}
public Optional<TaskStatus> getStatus(final String taskid)
{
return storage.getStatus(taskid);

View File

@ -19,15 +19,20 @@
package io.druid.indexing.overlord.http;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
@ -40,6 +45,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
@ -52,6 +58,8 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@ -63,20 +71,6 @@ public class OverlordResource
{
private static final Logger log = new Logger(OverlordResource.class);
private static Function<TaskRunnerWorkItem, Map<String, Object>> simplifyTaskFn =
new Function<TaskRunnerWorkItem, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(TaskRunnerWorkItem input)
{
return new ImmutableMap.Builder<String, Object>()
.put("id", input.getTaskId())
.put("createdTime", input.getCreatedTime())
.put("queueInsertionTime", input.getQueueInsertionTime())
.build();
}
};
private final TaskMaster taskMaster;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogStreamer taskLogStreamer;
@ -139,6 +133,14 @@ public class OverlordResource
);
}
@GET
@Path("/task/{taskid}")
@Produces("application/json")
public Response getTaskPayload(@PathParam("taskid") String taskid)
{
return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid));
}
@GET
@Path("/task/{taskid}/status")
@Produces("application/json")
@ -238,39 +240,64 @@ public class OverlordResource
}
@GET
@Path("/pendingTasks")
@Path("/waitingTasks")
@Produces("application/json")
public Response getPendingTasks(
@QueryParam("full") String full
)
public Response getWaitingTasks()
{
if (full != null) {
return asLeaderWith(
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(taskRunner.getPendingTasks()).build();
}
}
);
}
return asLeaderWith(
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
return workItemsResponse(
new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>()
{
@Override
public Response apply(TaskRunner taskRunner)
public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
{
return Response.ok(
Collections2.transform(
taskRunner.getPendingTasks(),
simplifyTaskFn
// A bit roundabout, but works as a way of figuring out what tasks haven't been handed
// off to the runner yet:
final List<Task> activeTasks = taskStorageQueryAdapter.getActiveTasks();
final Set<String> runnersKnownTasks = Sets.newHashSet(
Iterables.transform(
taskRunner.getKnownTasks(),
new Function<TaskRunnerWorkItem, String>()
{
@Override
public String apply(final TaskRunnerWorkItem workItem)
{
return workItem.getTaskId();
}
}
)
).build();
);
final List<TaskRunnerWorkItem> waitingTasks = Lists.newArrayList();
for (final Task task : activeTasks) {
if (!runnersKnownTasks.contains(task.getId())) {
waitingTasks.add(
// Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it.
new TaskRunnerWorkItem(
task.getId(),
SettableFuture.<TaskStatus>create(),
new DateTime(0),
new DateTime(0)
)
);
}
}
return waitingTasks;
}
}
);
}
@GET
@Path("/pendingTasks")
@Produces("application/json")
public Response getPendingTasks()
{
return workItemsResponse(
new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>()
{
@Override
public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
{
return taskRunner.getPendingTasks();
}
}
);
@ -279,42 +306,45 @@ public class OverlordResource
@GET
@Path("/runningTasks")
@Produces("application/json")
public Response getRunningTasks(
@QueryParam("full") String full
)
public Response getRunningTasks()
{
if (full != null) {
return asLeaderWith(
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(taskRunner.getRunningTasks()).build();
}
}
);
}
return asLeaderWith(
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
return workItemsResponse(
new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>()
{
@Override
public Response apply(TaskRunner taskRunner)
public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
{
return Response.ok(
Collections2.transform(
taskRunner.getRunningTasks(),
simplifyTaskFn
)
).build();
return taskRunner.getRunningTasks();
}
}
);
}
@GET
@Path("/completeTasks")
@Produces("application/json")
public Response getCompleteTasks()
{
final List<TaskResponseObject> completeTasks = Lists.transform(
taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(),
new Function<TaskStatus, TaskResponseObject>()
{
@Override
public TaskResponseObject apply(TaskStatus taskStatus)
{
// Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it.
return new TaskResponseObject(
taskStatus.getId(),
new DateTime(0),
new DateTime(0),
Optional.of(taskStatus)
);
}
}
);
return Response.ok(completeTasks).build();
}
@GET
@Path("/workers")
@Produces("application/json")
@ -373,7 +403,39 @@ public class OverlordResource
}
}
public <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x)
private Response workItemsResponse(final Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>> fn)
{
return asLeaderWith(
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(
Lists.transform(
Lists.newArrayList(fn.apply(taskRunner)),
new Function<TaskRunnerWorkItem, TaskResponseObject>()
{
@Override
public TaskResponseObject apply(TaskRunnerWorkItem workItem)
{
return new TaskResponseObject(
workItem.getTaskId(),
workItem.getCreatedTime(),
workItem.getQueueInsertionTime(),
Optional.<TaskStatus>absent()
);
}
}
)
).build();
}
}
);
}
private <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x)
{
final Map<String, Object> results = Maps.newHashMap();
results.put("task", taskid);
@ -385,7 +447,7 @@ public class OverlordResource
}
}
public <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
private <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
{
if (x.isPresent()) {
return f.apply(x.get());
@ -394,4 +456,62 @@ public class OverlordResource
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
}
private static class TaskResponseObject
{
private final String id;
private final DateTime createdTime;
private final DateTime queueInsertionTime;
private final Optional<TaskStatus> status;
private TaskResponseObject(
String id,
DateTime createdTime,
DateTime queueInsertionTime,
Optional<TaskStatus> status
)
{
this.id = id;
this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
this.status = status;
}
public String getId()
{
return id;
}
public DateTime getCreatedTime()
{
return createdTime;
}
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;
}
public Optional<TaskStatus> getStatus()
{
return status;
}
@JsonValue
public Map<String, Object> toJson()
{
final Map<String, Object> data = Maps.newLinkedHashMap();
data.put("id", id);
if (createdTime.getMillis() > 0) {
data.put("createdTime", createdTime);
}
if (queueInsertionTime.getMillis() > 0) {
data.put("queueInsertionTime", queueInsertionTime);
}
if (status.isPresent()) {
data.put("statusCode", status.get().getStatusCode().toString());
}
return data;
}
}
}

View File

@ -47,6 +47,14 @@
<div class="pending_loading">Loading Pending Tasks... this may take a few minutes</div>
<table id="pendingTable"></table>
<h2>Waiting Tasks</h2>
<div class="waiting_loading">Loading Waiting Tasks... this may take a few minutes</div>
<table id="waitingTable"></table>
<h2>Complete Tasks</h2>
<div class="complete_loading">Loading Complete Tasks... this may take a few minutes</div>
<table id="completeTable"></table>
<h2>Workers</h2>
<div class="workers_loading">Loading Workers... this may take a few minutes</div>
<table id="workerTable"></table>

View File

@ -3,14 +3,39 @@
var oTable = [];
$(document).ready(function() {
var augment = function(data) {
for (i = 0 ; i < data.length ; i++) {
var taskId = encodeURIComponent(data[i].id)
data[i].more =
'<a href="/druid/indexer/v1/task/' + taskId + '">payload</a>' +
'<a href="/druid/indexer/v1/task/' + taskId + '/status">status</a>' +
'<a href="/druid/indexer/v1/task/' + taskId + '/log">log (all)</a>' +
'<a href="/druid/indexer/v1/task/' + taskId + '/log?offset=-8192">log (last 8kb)</a>'
}
}
$.get('/druid/indexer/v1/runningTasks', function(data) {
$('.running_loading').hide();
buildTable(data, $('#runningTable'), ["segments"]);
augment(data);
buildTable(data, $('#runningTable'));
});
$.get('/druid/indexer/v1/pendingTasks', function(data) {
$('.pending_loading').hide();
buildTable(data, $('#pendingTable'), ["segments"]);
augment(data);
buildTable(data, $('#pendingTable'));
});
$.get('/druid/indexer/v1/waitingTasks', function(data) {
$('.waiting_loading').hide();
augment(data);
buildTable(data, $('#waitingTable'));
});
$.get('/druid/indexer/v1/completeTasks', function(data) {
$('.complete_loading').hide();
augment(data);
buildTable(data, $('#completeTable'));
});
$.get('/druid/indexer/v1/workers', function(data) {
@ -22,4 +47,4 @@ $(document).ready(function() {
$('.events_loading').hide();
buildTable(data, $('#eventTable'));
});
});
});