Merge pull request #327 from metamx/index-service-api-gui

Indexing service API and GUI improvements!
fjy 2013-12-13 16:17:10 -08:00
commit 196d0ac614
14 changed files with 446 additions and 120 deletions

View File

@ -94,6 +94,7 @@ druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.selectors.indexing.serviceName=overlord
druid.indexer.queue.startDelay=PT0M
druid.indexer.runner.javaOpts="-server -Xmx1g"
druid.indexer.runner.startPort=8088
druid.indexer.fork.property.druid.computation.buffer.size=268435456
@ -246,17 +247,19 @@ Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) should yield:
} ]
```
Console
-------
The indexing service overlord has a console located at:
```bash
localhost:8087/console.html
```
On this console, you can view the status and logs of recently submitted and completed tasks.
If you reuse the local firehose to ingest your own data and run into problems, you can use the console to read the individual task logs.
Task logs can be stored locally or uploaded to [Deep Storage](Deep-Storage.html). More information about how to configure this is [here](Configuration.html).
The most common data ingestion problems involve timestamp formats and other malformed data issues.
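As a side note, the console's tables are filled from the overlord's HTTP API (the endpoints added in this pull request, such as `/druid/indexer/v1/completeTasks` used by console.js below). Here is a minimal Java sketch of polling one of them, assuming an overlord listening on localhost:8087 as in this tutorial; the host, port, and endpoint are assumptions to adjust for your deployment:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class OverlordTaskPoller
{
  public static void main(String[] args) throws Exception
  {
    // Assumes a local overlord on port 8087, as in the console URL above.
    final URL url = new URL("http://localhost:8087/druid/indexer/v1/completeTasks");
    final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestProperty("Accept", "application/json");
    final BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line); // JSON array of recently completed tasks
      }
    }
    finally {
      reader.close();
      conn.disconnect();
    }
  }
}
```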

View File

@ -0,0 +1,27 @@
package io.druid.indexing.common.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
import org.joda.time.Period;
public class TaskStorageConfig
{
@JsonProperty
private final Duration recentlyFinishedThreshold;
@JsonCreator
public TaskStorageConfig(
@JsonProperty("recentlyFinishedThreshold") final Period recentlyFinishedThreshold
)
{
this.recentlyFinishedThreshold = recentlyFinishedThreshold == null
? new Period("PT24H").toStandardDuration()
: recentlyFinishedThreshold.toStandardDuration();
}
public Duration getRecentlyFinishedThreshold()
{
return recentlyFinishedThreshold;
}
}
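A small sketch of how the two constructor paths above resolve, assuming this class and joda-time are on the classpath; the threshold values are only illustrative:

```java
import io.druid.indexing.common.config.TaskStorageConfig;
import org.joda.time.Period;

public class TaskStorageConfigDefaults
{
  public static void main(String[] args)
  {
    // No threshold configured: the constructor falls back to 24 hours.
    final TaskStorageConfig defaults = new TaskStorageConfig(null);
    System.out.println(defaults.getRecentlyFinishedThreshold()); // PT86400S

    // Explicit threshold: the ISO-8601 period is converted to a standard Duration.
    final TaskStorageConfig oneHour = new TaskStorageConfig(new Period("PT1H"));
    System.out.println(oneHour.getRecentlyFinishedThreshold()); // PT3600S
  }
}
```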

View File

@ -34,15 +34,16 @@ import com.metamx.common.RetryUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.mysql.jdbc.exceptions.MySQLTimeoutException;
import com.mysql.jdbc.exceptions.MySQLTransientException;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
@ -64,16 +65,24 @@ public class DbTaskStorage implements TaskStorage
private final DbConnector dbConnector;
private final DbTablesConfig dbTables;
private final IDBI dbi;
private final TaskStorageConfig config;
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
@Inject
public DbTaskStorage(
final ObjectMapper jsonMapper,
final DbConnector dbConnector,
final DbTablesConfig dbTables,
final IDBI dbi,
final TaskStorageConfig config
)
{
this.jsonMapper = jsonMapper;
this.dbConnector = dbConnector;
this.dbTables = dbTables;
this.dbi = dbi;
this.config = config;
}
@LifecycleStart
@ -271,6 +280,45 @@ public class DbTaskStorage implements TaskStorage
);
}
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold());
return retryingHandle(
new HandleCallback<List<TaskStatus>>()
{
@Override
public List<TaskStatus> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC",
dbTables.getTasksTable()
)
).bind("recent", recent.toString()).list();
final ImmutableList.Builder<TaskStatus> statuses = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isComplete()) {
statuses.add(status);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit();
}
}
return statuses.build();
}
}
);
}
@Override
public void addLock(final String taskid, final TaskLock taskLock)
{
@ -407,7 +455,8 @@ public class DbTaskStorage implements TaskStorage
for (final Map<String, Object> dbTaskLog : dbTaskLogs) {
try {
retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class));
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskid)
.addData("logPayload", dbTaskLog)
@ -451,7 +500,8 @@ public class DbTaskStorage implements TaskStorage
/**
* Retry SQL operations
*/
private <T> T retryingHandle(final HandleCallback<T> callback)
{
final Callable<T> call = new Callable<T>()
{
@Override
@ -471,9 +521,11 @@ public class DbTaskStorage implements TaskStorage
final int maxTries = 10;
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
}
catch (RuntimeException e) {
throw Throwables.propagate(e);
}
catch (Exception e) {
throw new CallbackFailedException(e);
}
}
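retryingHandle wraps every DB call in RetryUtils so transient MySQL hiccups don't fail a task action outright. Below is a self-contained sketch of the same retry-with-predicate pattern, written without the Metamx helper so it runs standalone; the exception type, backoff, and stand-in callback are illustrative, not what DbTaskStorage actually uses:

```java
import com.google.common.base.Predicate;
import java.sql.SQLTransientException;
import java.util.concurrent.Callable;

public class RetryingCall
{
  // Retries the callable up to maxTries times while the predicate says the failure looks transient.
  static <T> T retry(final Callable<T> call, final Predicate<Throwable> shouldRetry, final int maxTries) throws Exception
  {
    for (int attempt = 1; ; attempt++) {
      try {
        return call.call();
      }
      catch (Exception e) {
        if (attempt >= maxTries || !shouldRetry.apply(e)) {
          throw e;
        }
        Thread.sleep(100L * attempt); // simple linear backoff between attempts
      }
    }
  }

  public static void main(String[] args) throws Exception
  {
    final String result = retry(
        new Callable<String>()
        {
          @Override
          public String call()
          {
            return "ok"; // stand-in for the JDBI callback body
          }
        },
        new Predicate<Throwable>()
        {
          @Override
          public boolean apply(Throwable t)
          {
            return t instanceof SQLTransientException; // stand-in for the MySQL-specific checks
          }
        },
        10
    );
    System.out.println(result);
  }
}
```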

View File

@ -391,7 +391,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
if (offset > 0) {
raf.seek(offset);
} else if (offset < 0 && offset < rafLength) {
raf.seek(Math.max(0, rafLength + offset));
}
return Channels.newInputStream(raf.getChannel());
}
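The `Math.max(0, ...)` clamp is what makes a negative offset (the console's `log?offset=-8192` link) safe for logs shorter than the requested tail. A standalone sketch of the same seek logic follows; the file path is hypothetical:

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;

public class LogTail
{
  // Positions the stream the way the runner does: positive offsets seek forward,
  // negative offsets mean "last |offset| bytes", clamped so short files do not underflow.
  static InputStream openLog(File logFile, long offset) throws IOException
  {
    final RandomAccessFile raf = new RandomAccessFile(logFile, "r");
    final long rafLength = raf.length();
    if (offset > 0) {
      raf.seek(offset);
    } else if (offset < 0 && offset < rafLength) {
      raf.seek(Math.max(0, rafLength + offset));
    }
    return Channels.newInputStream(raf.getChannel());
  }

  public static void main(String[] args) throws IOException
  {
    // Hypothetical path; point this at any local file to try it.
    try (InputStream in = openLog(new File("/tmp/task.log"), -8192)) {
      int b;
      while ((b = in.read()) != -1) {
        System.out.write(b);
      }
      System.out.flush();
    }
  }
}
```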

View File

@ -19,6 +19,7 @@
package io.druid.indexing.overlord;
import com.google.api.client.util.Lists;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
@ -26,11 +27,15 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
@ -42,6 +47,8 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class HeapMemoryTaskStorage implements TaskStorage
{
private final TaskStorageConfig config;
private final ReentrantLock giant = new ReentrantLock();
private final Map<String, TaskStuff> tasks = Maps.newHashMap();
private final Multimap<String, TaskLock> taskLocks = HashMultimap.create();
@ -49,6 +56,12 @@ public class HeapMemoryTaskStorage implements TaskStorage
private static final Logger log = new Logger(HeapMemoryTaskStorage.class);
@Inject
public HeapMemoryTaskStorage(TaskStorageConfig config)
{
this.config = config;
}
@Override
public void insert(Task task, TaskStatus status)
{
@ -69,7 +82,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
log.info("Inserting task %s with status: %s", task.getId(), status);
tasks.put(task.getId(), new TaskStuff(task, status, new DateTime()));
} finally {
giant.unlock();
}
@ -139,13 +152,39 @@ public class HeapMemoryTaskStorage implements TaskStorage
listBuilder.add(taskStuff.getTask());
}
}
return listBuilder.build();
} finally {
giant.unlock();
}
}
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
giant.lock();
try {
final List<TaskStatus> returns = Lists.newArrayList();
final long recent = System.currentTimeMillis() - config.getRecentlyFinishedThreshold().getMillis();
final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
{
@Override
public int compare(TaskStuff a, TaskStuff b)
{
return a.getCreatedDate().compareTo(b.getCreatedDate());
}
}.reverse();
for(final TaskStuff taskStuff : createdDateDesc.sortedCopy(tasks.values())) {
if(taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().getMillis() > recent) {
returns.add(taskStuff.getStatus());
}
}
return returns;
} finally {
giant.unlock();
}
}
@Override
public void addLock(final String taskid, final TaskLock taskLock)
{
@ -212,8 +251,9 @@ public class HeapMemoryTaskStorage implements TaskStorage
{
final Task task;
final TaskStatus status;
final DateTime createdDate;
private TaskStuff(Task task, TaskStatus status, DateTime createdDate)
{
Preconditions.checkNotNull(task);
Preconditions.checkNotNull(status);
@ -221,6 +261,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
this.task = task;
this.status = status;
this.createdDate = Preconditions.checkNotNull(createdDate, "createdDate");
}
public Task getTask()
@ -233,9 +274,14 @@ public class HeapMemoryTaskStorage implements TaskStorage
return status;
}
public DateTime getCreatedDate()
{
return createdDate;
}
private TaskStuff withStatus(TaskStatus _status)
{
return new TaskStuff(task, _status, createdDate);
}
}
}
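The `createdDateDesc` ordering above leans on Guava's `Ordering` to sort a copy of the map values rather than mutating them. Here is a standalone sketch of the same newest-first-plus-threshold pattern; the `Item` type and the sample values are made up for illustration:

```java
import com.google.common.collect.Ordering;
import java.util.Arrays;
import java.util.List;
import org.joda.time.DateTime;

public class RecentItems
{
  static class Item
  {
    final String id;
    final DateTime createdDate;

    Item(String id, DateTime createdDate)
    {
      this.id = id;
      this.createdDate = createdDate;
    }
  }

  public static void main(String[] args)
  {
    final List<Item> items = Arrays.asList(
        new Item("a", new DateTime().minusHours(30)),
        new Item("b", new DateTime().minusHours(2)),
        new Item("c", new DateTime().minusMinutes(5))
    );

    // Newest first, the same shape as createdDateDesc in the diff above.
    final Ordering<Item> createdDateDesc = new Ordering<Item>()
    {
      @Override
      public int compare(Item a, Item b)
      {
        return a.createdDate.compareTo(b.createdDate);
      }
    }.reverse();

    // Keep only items newer than a 24-hour threshold, mirroring getRecentlyFinishedTaskStatuses.
    final long cutoff = System.currentTimeMillis() - 24L * 60 * 60 * 1000;
    for (Item item : createdDateDesc.sortedCopy(items)) {
      if (item.createdDate.getMillis() > cutoff) {
        System.out.println(item.id); // prints c then b; a is older than the cutoff
      }
    }
  }
}
```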

View File

@ -19,6 +19,8 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexing.common.TaskStatus;
@ -56,21 +58,25 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
this.queueInsertionTime = queueInsertionTime;
}
@JsonProperty
public String getTaskId()
{
return taskId;
}
@JsonIgnore
public ListenableFuture<TaskStatus> getResult()
{
return result;
}
@JsonProperty
public DateTime getCreatedTime()
{
return createdTime;
}
@JsonProperty
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;

View File

@ -82,6 +82,13 @@ public interface TaskStorage
*/
public List<Task> getActiveTasks();
/**
* Returns a list of recently finished task statuses as stored in the storage facility. No particular order
* is guaranteed, but implementations are encouraged to return tasks in descending order of creation. No particular
* standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing.
*/
public List<TaskStatus> getRecentlyFinishedTaskStatuses();
/**
* Returns a list of locks for a particular task.
*/

View File

@ -19,14 +19,19 @@
package io.druid.indexing.overlord;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
/**
@ -42,6 +47,21 @@ public class TaskStorageQueryAdapter
this.storage = storage;
}
public List<Task> getActiveTasks()
{
return storage.getActiveTasks();
}
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
return storage.getRecentlyFinishedTaskStatuses();
}
public Optional<Task> getTask(final String taskid)
{
return storage.getTask(taskid);
}
public Optional<TaskStatus> getStatus(final String taskid)
{
return storage.getStatus(taskid);

View File

@ -19,15 +19,20 @@
package io.druid.indexing.overlord.http;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
@ -40,6 +45,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
@ -52,6 +58,8 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@ -63,20 +71,6 @@ public class OverlordResource
{
private static final Logger log = new Logger(OverlordResource.class);
private static Function<TaskRunnerWorkItem, Map<String, Object>> simplifyTaskFn =
new Function<TaskRunnerWorkItem, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(TaskRunnerWorkItem input)
{
return new ImmutableMap.Builder<String, Object>()
.put("id", input.getTaskId())
.put("createdTime", input.getCreatedTime())
.put("queueInsertionTime", input.getQueueInsertionTime())
.build();
}
};
private final TaskMaster taskMaster;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogStreamer taskLogStreamer;
@ -139,6 +133,14 @@ public class OverlordResource
);
}
@GET
@Path("/task/{taskid}")
@Produces("application/json")
public Response getTaskPayload(@PathParam("taskid") String taskid)
{
return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid));
}
@GET
@Path("/task/{taskid}/status")
@Produces("application/json")
@ -238,39 +240,64 @@ public class OverlordResource
}
@GET
@Path("/waitingTasks")
@Produces("application/json")
public Response getWaitingTasks()
{
return workItemsResponse(
new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>()
{
@Override
public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
{
// A bit roundabout, but works as a way of figuring out what tasks haven't been handed
// off to the runner yet:
final List<Task> activeTasks = taskStorageQueryAdapter.getActiveTasks();
final Set<String> runnersKnownTasks = Sets.newHashSet(
Iterables.transform(
taskRunner.getKnownTasks(),
new Function<TaskRunnerWorkItem, String>()
{
@Override
public String apply(final TaskRunnerWorkItem workItem)
{
return workItem.getTaskId();
}
}
)
);
final List<TaskRunnerWorkItem> waitingTasks = Lists.newArrayList();
for (final Task task : activeTasks) {
if (!runnersKnownTasks.contains(task.getId())) {
waitingTasks.add(
// Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it.
new TaskRunnerWorkItem(
task.getId(),
SettableFuture.<TaskStatus>create(),
new DateTime(0),
new DateTime(0)
)
);
}
}
return waitingTasks;
}
}
);
}
@GET
@Path("/pendingTasks")
@Produces("application/json")
public Response getPendingTasks()
{
return workItemsResponse(
new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>()
{
@Override
public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
{
return taskRunner.getPendingTasks();
}
}
);
@ -279,40 +306,43 @@ public class OverlordResource
@GET
@Path("/runningTasks")
@Produces("application/json")
public Response getRunningTasks()
{
return workItemsResponse(
new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>()
{
@Override
public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
{
return taskRunner.getRunningTasks();
}
}
);
}
@GET
@Path("/completeTasks")
@Produces("application/json")
public Response getCompleteTasks()
{
final List<TaskResponseObject> completeTasks = Lists.transform(
taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(),
new Function<TaskStatus, TaskResponseObject>()
{
@Override
public TaskResponseObject apply(TaskStatus taskStatus)
{
// Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it.
return new TaskResponseObject(
taskStatus.getId(),
new DateTime(0),
new DateTime(0),
Optional.of(taskStatus)
);
}
}
);
return Response.ok(completeTasks).build();
}
@GET
@ -338,18 +368,14 @@ public class OverlordResource
@Produces("application/json") @Produces("application/json")
public Response getScalingState() public Response getScalingState()
{ {
return asLeaderWith( // Don't use asLeaderWith, since we want to return 200 instead of 503 when missing an autoscaler.
taskMaster.getResourceManagementScheduler(), final Optional<ResourceManagementScheduler> rms = taskMaster.getResourceManagementScheduler();
new Function<ResourceManagementScheduler, Response>() if (rms.isPresent()) {
{ return Response.ok(rms.get().getStats()).build();
@Override } else {
public Response apply(ResourceManagementScheduler resourceManagementScheduler) return Response.ok().build();
{
return Response.ok(resourceManagementScheduler.getStats()).build();
} }
} }
);
}
@GET
@Path("/task/{taskid}/log")
@ -373,7 +399,39 @@ public class OverlordResource
}
}
private Response workItemsResponse(final Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>> fn)
{
return asLeaderWith(
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(
Lists.transform(
Lists.newArrayList(fn.apply(taskRunner)),
new Function<TaskRunnerWorkItem, TaskResponseObject>()
{
@Override
public TaskResponseObject apply(TaskRunnerWorkItem workItem)
{
return new TaskResponseObject(
workItem.getTaskId(),
workItem.getCreatedTime(),
workItem.getQueueInsertionTime(),
Optional.<TaskStatus>absent()
);
}
}
)
).build();
}
}
);
}
private <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x)
{
final Map<String, Object> results = Maps.newHashMap();
results.put("task", taskid);
@ -385,7 +443,7 @@ public class OverlordResource
}
}
private <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
{
if (x.isPresent()) {
return f.apply(x.get());
@ -394,4 +452,62 @@ public class OverlordResource
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
}
private static class TaskResponseObject
{
private final String id;
private final DateTime createdTime;
private final DateTime queueInsertionTime;
private final Optional<TaskStatus> status;
private TaskResponseObject(
String id,
DateTime createdTime,
DateTime queueInsertionTime,
Optional<TaskStatus> status
)
{
this.id = id;
this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
this.status = status;
}
public String getId()
{
return id;
}
public DateTime getCreatedTime()
{
return createdTime;
}
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;
}
public Optional<TaskStatus> getStatus()
{
return status;
}
@JsonValue
public Map<String, Object> toJson()
{
final Map<String, Object> data = Maps.newLinkedHashMap();
data.put("id", id);
if (createdTime.getMillis() > 0) {
data.put("createdTime", createdTime);
}
if (queueInsertionTime.getMillis() > 0) {
data.put("queueInsertionTime", queueInsertionTime);
}
if (status.isPresent()) {
data.put("statusCode", status.get().getStatusCode().toString());
}
return data;
}
}
}
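`TaskResponseObject` relies on Jackson's `@JsonValue`: the object serializes as whatever map `toJson()` returns, which is why the placeholder `new DateTime(0)` timestamps simply drop out of the console's JSON. A stripped-down sketch of that mechanism follows; the `Row` class and its values are illustrative, not Druid's:

```java
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonValueSketch
{
  static class Row
  {
    private final String id;
    private final long createdMillis;

    Row(String id, long createdMillis)
    {
      this.id = id;
      this.createdMillis = createdMillis;
    }

    @JsonValue
    public Map<String, Object> toJson()
    {
      // Only emit fields that carry real information, like TaskResponseObject does.
      final Map<String, Object> data = new LinkedHashMap<>();
      data.put("id", id);
      if (createdMillis > 0) {
        data.put("createdTime", createdMillis);
      }
      return data;
    }
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    System.out.println(mapper.writeValueAsString(new Row("index_task_1", 0)));
    // {"id":"index_task_1"}  -- the zero timestamp is omitted entirely
    System.out.println(mapper.writeValueAsString(new Row("index_task_2", 1386979200000L)));
    // {"id":"index_task_2","createdTime":1386979200000}
  }
}
```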

View File

@ -43,16 +43,24 @@
<div class="running_loading">Loading Running Tasks... this may take a few minutes</div> <div class="running_loading">Loading Running Tasks... this may take a few minutes</div>
<table id="runningTable"></table> <table id="runningTable"></table>
<h2>Pending Tasks</h2> <h2>Pending Tasks - Tasks waiting to be assigned to a worker</h2>
<div class="pending_loading">Loading Pending Tasks... this may take a few minutes</div> <div class="pending_loading">Loading Pending Tasks... this may take a few minutes</div>
<table id="pendingTable"></table> <table id="pendingTable"></table>
<h2>Workers</h2> <h2>Waiting Tasks - Tasks waiting on locks</h2>
<div class="waiting_loading">Loading Waiting Tasks... this may take a few minutes</div>
<table id="waitingTable"></table>
<h2>Complete Tasks</h2>
<div class="complete_loading">Loading Complete Tasks... this may take a few minutes</div>
<table id="completeTable"></table>
<h2>Remote Workers</h2>
<div class="workers_loading">Loading Workers... this may take a few minutes</div> <div class="workers_loading">Loading Workers... this may take a few minutes</div>
<table id="workerTable"></table> <table id="workerTable"></table>
<h2>Event Log</h2> <h2>Autoscaling Activity</h2>
<div class="events_loading">Loading Event Log... this may take a few minutes</div> <div class="events_loading">Loading Autoscaling Activities... this may take a few minutes</div>
<table id="eventTable"></table> <table id="eventTable"></table>
</div> </div>
</body> </body>

View File

@ -3,14 +3,39 @@
var oTable = [];
$(document).ready(function() {
var augment = function(data) {
for (i = 0 ; i < data.length ; i++) {
var taskId = encodeURIComponent(data[i].id)
data[i].more =
'<a href="/druid/indexer/v1/task/' + taskId + '">payload</a>' +
'<a href="/druid/indexer/v1/task/' + taskId + '/status">status</a>' +
'<a href="/druid/indexer/v1/task/' + taskId + '/log">log (all)</a>' +
'<a href="/druid/indexer/v1/task/' + taskId + '/log?offset=-8192">log (last 8kb)</a>'
}
}
$.get('/druid/indexer/v1/runningTasks', function(data) {
$('.running_loading').hide();
augment(data);
buildTable(data, $('#runningTable'));
});
$.get('/druid/indexer/v1/pendingTasks', function(data) {
$('.pending_loading').hide();
augment(data);
buildTable(data, $('#pendingTable'));
});
$.get('/druid/indexer/v1/waitingTasks', function(data) {
$('.waiting_loading').hide();
augment(data);
buildTable(data, $('#waitingTable'));
});
$.get('/druid/indexer/v1/completeTasks', function(data) {
$('.complete_loading').hide();
augment(data);
buildTable(data, $('#completeTable'));
});
$.get('/druid/indexer/v1/workers', function(data) {

View File

@ -54,6 +54,7 @@ import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.AbstractFixedIntervalTask;
import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.KillTask;
@ -75,7 +76,9 @@ import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -120,7 +123,11 @@ public class TaskLifecycleTest
"{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}", "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}",
TaskQueueConfig.class TaskQueueConfig.class
); );
ts = new HeapMemoryTaskStorage(); ts = new HeapMemoryTaskStorage(
new TaskStorageConfig(new Period("PT24H"))
{
}
);
tsqa = new TaskStorageQueryAdapter(ts); tsqa = new TaskStorageQueryAdapter(ts);
tl = new TaskLockbox(ts); tl = new TaskLockbox(ts);
mdc = newMockMDC(); mdc = newMockMDC();

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.FileIteratingFirehose;
@ -78,9 +79,7 @@ public class LocalFirehoseFactory implements FirehoseFactory
@Override
public Firehose connect() throws IOException
{
File[] foundFiles = baseDir.listFiles(
new FilenameFilter()
{
@Override
@ -89,10 +88,17 @@ public class LocalFirehoseFactory implements FirehoseFactory
return name.contains(filter);
}
}
);
if (foundFiles == null || foundFiles.length == 0) {
throw new ISE("Found no files to ingest! Check your schema.");
}
final LinkedList<File> files = Lists.<File>newLinkedList(
Arrays.asList(foundFiles)
);
return new FileIteratingFirehose(
new Iterator<LineIterator>()
{
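The new `foundFiles == null || foundFiles.length == 0` guard matters because `File.listFiles()` returns null, not an empty array, when the base directory is missing or unreadable. Here is a tiny sketch of the failure mode the ISE protects against; the path is hypothetical:

```java
import java.io.File;

public class ListFilesGuard
{
  public static void main(String[] args)
  {
    // A directory that does not exist: listFiles() returns null, not an empty array.
    final File missing = new File("/tmp/no-such-dir");
    System.out.println(missing.listFiles()); // null

    // Guarding the way the firehose now does avoids a NullPointerException later on.
    final File[] found = missing.listFiles();
    if (found == null || found.length == 0) {
      System.out.println("Found no files to ingest! Check your schema.");
    }
  }
}
```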

View File

@ -45,6 +45,7 @@ import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
@ -154,6 +155,8 @@ public class CliOverlord extends ServerRunnable
private void configureTaskStorage(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
PolyBind.createChoice(
binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class)
);
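With `JsonConfigProvider.bind` in place, properties under the `druid.indexer.storage` prefix should populate `TaskStorageConfig`. As a rough stand-in for that wiring (the real binding goes through Guice and runtime properties, not raw JSON), the sketch below feeds the equivalent JSON to Jackson directly; it assumes the jackson-datatype-joda module is available, and the one-hour value is only an example:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import io.druid.indexing.common.config.TaskStorageConfig;

public class TaskStorageConfigBinding
{
  public static void main(String[] args) throws Exception
  {
    // Register joda-time support so the "PT1H" period string deserializes.
    final ObjectMapper mapper = new ObjectMapper().registerModule(new JodaModule());
    final TaskStorageConfig config =
        mapper.readValue("{\"recentlyFinishedThreshold\":\"PT1H\"}", TaskStorageConfig.class);
    System.out.println(config.getRecentlyFinishedThreshold()); // PT3600S
  }
}
```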