mirror of https://github.com/apache/druid.git
Missing Overlord and MiddleManager api docs (#7042)
* document middle manager api * re-arrange * correction * document more missing overlord api calls, minor re-arrange of some code i was referencing * fix it * this will fix it * fixup * link to other docs
This commit is contained in:
parent
80a2ef7be4
commit
cadb6c5280
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.java.util.common.RE;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -31,8 +30,6 @@ import java.util.Objects;
|
|||
|
||||
public class TaskStatusPlus
|
||||
{
|
||||
private static final Logger log = new Logger(TaskStatusPlus.class);
|
||||
|
||||
private final String id;
|
||||
private final String type;
|
||||
private final DateTime createdTime;
|
||||
|
@ -74,7 +71,6 @@ public class TaskStatusPlus
|
|||
);
|
||||
}
|
||||
|
||||
|
||||
@JsonCreator
|
||||
public TaskStatusPlus(
|
||||
@JsonProperty("id") String id,
|
||||
|
|
|
@ -143,14 +143,17 @@ Returns full segment metadata for a specific segment as stored in the metadata s
|
|||
|
||||
* `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments`
|
||||
|
||||
Returns a list of all segments, overlapping with any of given intervals, for a datasource as stored in the metadata store. Request body is array of string intervals like [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
|
||||
Returns a list of all segments, overlapping with any of given intervals, for a datasource as stored in the metadata store. Request body is array of string IS0 8601 intervals like [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
|
||||
|
||||
* `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full`
|
||||
|
||||
Returns a list of all segments, overlapping with any of given intervals, for a datasource with the full segment metadata as stored in the metadata store. Request body is array of string intervals like [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
|
||||
Returns a list of all segments, overlapping with any of given intervals, for a datasource with the full segment metadata as stored in the metadata store. Request body is array of string ISO 8601 intervals like [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
|
||||
|
||||
#### Datasources
|
||||
|
||||
Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/`
|
||||
(e.g., 2016-06-27_2016-06-28).
|
||||
|
||||
##### GET
|
||||
|
||||
* `/druid/coordinator/v1/datasources`
|
||||
|
@ -187,7 +190,7 @@ Returns a map of an interval to a map of segment metadata to a set of server nam
|
|||
|
||||
* `/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}`
|
||||
|
||||
Returns a set of segment ids for an ISO8601 interval. Note that {interval} parameters are delimited by a `_` instead of a `/` (e.g., 2016-06-27_2016-06-28).
|
||||
Returns a set of segment ids for an interval.
|
||||
|
||||
* `/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}?simple`
|
||||
|
||||
|
@ -234,18 +237,19 @@ Enables a segment of a datasource.
|
|||
Disables a datasource.
|
||||
|
||||
* `/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}`
|
||||
* `@Deprecated. /druid/coordinator/v1/datasources/{dataSourceName}?kill=true&interval={myISO8601Interval}`
|
||||
* `@Deprecated. /druid/coordinator/v1/datasources/{dataSourceName}?kill=true&interval={myInterval}`
|
||||
|
||||
Runs a [Kill task](../ingestion/tasks.html) for a given interval and datasource.
|
||||
|
||||
Note that {interval} parameters are delimited by a `_` instead of a `/` (e.g., 2016-06-27_2016-06-28).
|
||||
|
||||
* `/druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}`
|
||||
|
||||
Disables a segment.
|
||||
|
||||
#### Retention Rules
|
||||
|
||||
Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/`
|
||||
(e.g., 2016-06-27_2016-06-28).
|
||||
|
||||
##### GET
|
||||
|
||||
* `/druid/coordinator/v1/rules`
|
||||
|
@ -292,9 +296,10 @@ Optional Header Parameters for auditing the config change can also be specified.
|
|||
|
||||
#### Intervals
|
||||
|
||||
##### GET
|
||||
Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/`
|
||||
(e.g., 2016-06-27_2016-06-28).
|
||||
|
||||
Note that {interval} parameters are delimited by a `_` instead of a `/` (e.g., 2016-06-27_2016-06-28).
|
||||
##### GET
|
||||
|
||||
* `/druid/coordinator/v1/intervals`
|
||||
|
||||
|
@ -338,6 +343,7 @@ will be set for them.
|
|||
|
||||
Creates or updates the compaction config for a dataSource. See [Compaction Configuration](../configuration/index.html#compaction-dynamic-configuration) for configuration details.
|
||||
|
||||
|
||||
##### DELETE
|
||||
|
||||
* `/druid/coordinator/v1/config/compaction/{dataSource}`
|
||||
|
@ -357,12 +363,12 @@ ports.
|
|||
* `/druid/coordinator/v1/servers?simple`
|
||||
|
||||
Returns a list of server data objects in which each object has the following keys:
|
||||
- `host`: host URL include (`{hostname}:{port}`)
|
||||
- `type`: node type (`indexer-executor`, `historical`)
|
||||
- `currSize`: storage size currently used
|
||||
- `maxSize`: maximum storage size
|
||||
- `priority`
|
||||
- `tier`
|
||||
* `host`: host URL include (`{hostname}:{port}`)
|
||||
* `type`: node type (`indexer-executor`, `historical`)
|
||||
* `currSize`: storage size currently used
|
||||
* `maxSize`: maximum storage size
|
||||
* `priority`
|
||||
* `tier`
|
||||
|
||||
### Overlord
|
||||
|
||||
|
@ -382,8 +388,44 @@ only want the active leader to be considered in-service at the load balancer.
|
|||
|
||||
#### Tasks<a name="overlord-tasks"></a>
|
||||
|
||||
Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/`
|
||||
(e.g., 2016-06-27_2016-06-28).
|
||||
|
||||
##### GET
|
||||
|
||||
* `/druid/indexer/v1/tasks`
|
||||
|
||||
Retrieve list of tasks. Accepts query string parameters `state`, `datasource`, `createdTimeInterval`, `max`, and `type`.
|
||||
|
||||
|Query Parameter |Description |
|
||||
|---|---|
|
||||
|`state`|filter list of tasks by task state, valid options are `running`, `complete`, `waiting`, and `pending`.|
|
||||
| `datasource`| return tasks filtered by Druid datasource.|
|
||||
| `createdTimeInterval`| return tasks created within the specified interval. |
|
||||
| `max`| maximum number of `"complete"` tasks to return. Only applies when `state` is set to `"complete"`.|
|
||||
| `type`| filter tasks by task type. See [task documentation](../ingestion/tasks.html) for more details.|
|
||||
|
||||
|
||||
* `/druid/indexer/v1/completeTasks`
|
||||
|
||||
Retrieve list of complete tasks. Equivalent to `/druid/indexer/v1/tasks?state=complete`.
|
||||
|
||||
* `/druid/indexer/v1/runningTasks`
|
||||
|
||||
Retrieve list of running tasks. Equivalent to `/druid/indexer/v1/tasks?state=running`.
|
||||
|
||||
* `/druid/indexer/v1/waitingTasks`
|
||||
|
||||
Retrieve list of waiting tasks. Equivalent to `/druid/indexer/v1/tasks?state=waiting`.
|
||||
|
||||
* `/druid/indexer/v1/pendingTasks`
|
||||
|
||||
Retrieve list of pending tasks. Equivalent to `/druid/indexer/v1/tasks?state=pending`.
|
||||
|
||||
* `/druid/indexer/v1/task/{taskId}`
|
||||
|
||||
Retrieve the 'payload' of a task.
|
||||
|
||||
* `/druid/indexer/v1/task/{taskId}/status`
|
||||
|
||||
Retrieve the status of a task.
|
||||
|
@ -406,14 +448,27 @@ Retrieve a [task completion report](../ingestion/reports.html) for a task. Only
|
|||
|
||||
Endpoint for submitting tasks and supervisor specs to the Overlord. Returns the taskId of the submitted task.
|
||||
|
||||
* `druid/indexer/v1/task/{taskId}/shutdown`
|
||||
* `/druid/indexer/v1/task/{taskId}/shutdown`
|
||||
|
||||
Shuts down a task.
|
||||
|
||||
* `druid/indexer/v1/datasources/{dataSource}/shutdownAllTasks`
|
||||
* `/druid/indexer/v1/datasources/{dataSource}/shutdownAllTasks`
|
||||
|
||||
Shuts down all tasks for a dataSource.
|
||||
|
||||
* `/druid/indexer/v1/taskStatus`
|
||||
|
||||
Retrieve list of task status objects for list of task id strings in request body.
|
||||
|
||||
##### DELETE
|
||||
|
||||
* `/druid/indexer/v1/pendingSegments/{dataSource}`
|
||||
|
||||
Manually clean up pending segments table in metadata storage for `datasource`. Returns a JSON object response with
|
||||
`numDeleted` and count of rows deleted from the pending segments table. This API is used by the
|
||||
`druid.coordinator.kill.pendingSegments.on` [coordinator setting](../configuration/index.html#coordinator-operation)
|
||||
which automates this operation to perform periodically.
|
||||
|
||||
#### Supervisors
|
||||
|
||||
##### GET
|
||||
|
@ -490,13 +545,94 @@ This API is deprecated and will be removed in future releases.
|
|||
Please use the equivalent 'terminate' instead.
|
||||
</div>
|
||||
|
||||
#### Dynamic Configuration
|
||||
See [Overlord Dynamic Configuration](../configuration/index.html#overlord-dynamic-configuration) for details.
|
||||
|
||||
Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/`
|
||||
(e.g., 2016-06-27_2016-06-28).
|
||||
|
||||
##### GET
|
||||
|
||||
* `/druid/indexer/v1/worker`
|
||||
|
||||
Retreives current overlord dynamic configuration.
|
||||
|
||||
* `/druid/indexer/v1/worker/history?interval={interval}&counter={count}`
|
||||
|
||||
Retrieves history of changes to overlord dynamic configuration. Accepts `interval` and `count` query string parameters
|
||||
to filter by interval and limit the number of results respectively.
|
||||
|
||||
* `/druid/indexer/v1/scaling`
|
||||
|
||||
Retrieves overlord scaling events if auto-scaling runners are in use.
|
||||
|
||||
##### POST
|
||||
|
||||
* /druid/indexer/v1/worker
|
||||
|
||||
Update overlord dynamic worker configuration.
|
||||
|
||||
## Data Server
|
||||
|
||||
This section documents the API endpoints for the processes that reside on Data servers (MiddleManagers/Peons and Historicals) in the suggested [three-server configuration](../design/processes.html#server-types).
|
||||
This section documents the API endpoints for the processes that reside on Data servers (MiddleManagers/Peons and Historicals)
|
||||
in the suggested [three-server configuration](../design/processes.html#server-types).
|
||||
|
||||
### MiddleManager
|
||||
|
||||
The MiddleManager does not have any API endpoints beyond the [common endpoints](#common).
|
||||
##### GET
|
||||
|
||||
* `/druid/worker/v1/enabled`
|
||||
|
||||
Check whether a MiddleManager is in an enabled or disabled state. Returns JSON object keyed by the combined `druid.host`
|
||||
and `druid.port` with the boolean state as the value.
|
||||
|
||||
```json
|
||||
{"localhost:8091":true}
|
||||
```
|
||||
|
||||
* `/druid/worker/v1/tasks`
|
||||
|
||||
Retrieve a list of active tasks being run on MiddleManager. Returns JSON list of taskid strings. Normal usage should
|
||||
prefer to use the `/druid/indexer/v1/tasks` [Overlord API](#overlord) or one of it's task state specific variants instead.
|
||||
|
||||
```json
|
||||
["index_wikiticker_2019-02-11T02:20:15.316Z"]
|
||||
```
|
||||
|
||||
* `/druid/worker/v1/task/{taskid}/log`
|
||||
|
||||
Retrieve task log output stream by task id. Normal usage should prefer to use the `/druid/indexer/v1/task/{taskId}/log`
|
||||
[Overlord API](#overlord) instead.
|
||||
|
||||
##### POST
|
||||
|
||||
* `/druid/worker/v1/disable`
|
||||
|
||||
'Disable' a MiddleManager, causing it to stop accepting new tasks but complete all existing tasks. Returns JSON object
|
||||
keyed by the combined `druid.host` and `druid.port`:
|
||||
|
||||
```json
|
||||
{"localhost:8091":"disabled"}
|
||||
```
|
||||
|
||||
* `/druid/worker/v1/enable`
|
||||
|
||||
'Enable' a MiddleManager, allowing it to accept new tasks again if it was previously disabled. Returns JSON object
|
||||
keyed by the combined `druid.host` and `druid.port`:
|
||||
|
||||
```json
|
||||
{"localhost:8091":"enabled"}
|
||||
```
|
||||
|
||||
* `/druid/worker/v1/task/{taskid}/shutdown`
|
||||
|
||||
Shutdown a running task by `taskid`. Normal usage should prefer to use the `/druid/indexer/v1/task/{taskId}/shutdown`
|
||||
[Overlord API](#overlord) instead. Returns JSON:
|
||||
|
||||
```json
|
||||
{"task":"index_kafka_wikiticker_f7011f8ffba384b_fpeclode"}
|
||||
```
|
||||
|
||||
|
||||
### Peon
|
||||
|
||||
|
@ -536,6 +672,9 @@ This section documents the API endpoints for the processes that reside on Query
|
|||
|
||||
#### Datasource Information
|
||||
|
||||
Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/`
|
||||
(e.g., 2016-06-27_2016-06-28).
|
||||
|
||||
##### GET
|
||||
|
||||
* `/druid/v2/datasources`
|
||||
|
@ -546,7 +685,7 @@ Returns a list of queryable datasources.
|
|||
|
||||
Returns the dimensions and metrics of the datasource. Optionally, you can provide request parameter "full" to get list of served intervals with dimensions and metrics being served for those intervals. You can also provide request param "interval" explicitly to refer to a particular interval.
|
||||
|
||||
If no interval is specified, a default interval spanning a configurable period before the current time will be used. The duration of this interval is specified in ISO8601 format via:
|
||||
If no interval is specified, a default interval spanning a configurable period before the current time will be used. The default duration of this interval is specified in ISO 8601 duration format via:
|
||||
|
||||
druid.query.segmentMetadata.defaultHistory
|
||||
|
||||
|
@ -555,7 +694,7 @@ druid.query.segmentMetadata.defaultHistory
|
|||
Returns the dimensions of the datasource.
|
||||
|
||||
<div class="note caution">
|
||||
This API is deprecated and will be removed in future releases. Please use [SegmentMetadataQuery](../querying/segmentmetadataquery.html) instead
|
||||
This API is deprecated and will be removed in future releases. Please use <a href="../querying/segmentmetadataquery.html">SegmentMetadataQuery</a> instead
|
||||
which provides more comprehensive information and supports all dataSource types including streaming dataSources. It's also encouraged to use [INFORMATION_SCHEMA tables](../querying/sql.html#retrieving-metadata)
|
||||
if you're using SQL.
|
||||
</div>
|
||||
|
@ -565,12 +704,12 @@ if you're using SQL.
|
|||
Returns the metrics of the datasource.
|
||||
|
||||
<div class="note caution">
|
||||
This API is deprecated and will be removed in future releases. Please use [SegmentMetadataQuery](../querying/segmentmetadataquery.html) instead
|
||||
This API is deprecated and will be removed in future releases. Please use <a href="../querying/segmentmetadataquery.html">SegmentMetadataQuery</a> instead
|
||||
which provides more comprehensive information and supports all dataSource types including streaming dataSources. It's also encouraged to use [INFORMATION_SCHEMA tables](../querying/sql.html#retrieving-metadata)
|
||||
if you're using SQL.
|
||||
</div>
|
||||
|
||||
* `/druid/v2/datasources/{dataSourceName}/candidates?intervals={comma-separated-intervals-in-ISO8601-format}&numCandidates={numCandidates}`
|
||||
* `/druid/v2/datasources/{dataSourceName}/candidates?intervals={comma-separated-intervals}&numCandidates={numCandidates}`
|
||||
|
||||
Returns segment information lists including server locations for the given datasource and intervals. If "numCandidates" is not specified, it will return all servers for each interval.
|
||||
|
||||
|
|
|
@ -101,7 +101,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
|
@ -122,7 +121,6 @@ public class OverlordResource
|
|||
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
|
||||
private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete");
|
||||
|
||||
|
||||
@Inject
|
||||
public OverlordResource(
|
||||
TaskMaster taskMaster,
|
||||
|
@ -503,100 +501,6 @@ public class OverlordResource
|
|||
return getTasks("waiting", null, null, null, null, req);
|
||||
}
|
||||
|
||||
private static class AnyTask extends TaskRunnerWorkItem
|
||||
{
|
||||
private final String taskType;
|
||||
private final String dataSource;
|
||||
private final TaskState taskState;
|
||||
private final RunnerTaskState runnerTaskState;
|
||||
private final DateTime createdTime;
|
||||
private final DateTime queueInsertionTime;
|
||||
private final TaskLocation taskLocation;
|
||||
|
||||
AnyTask(
|
||||
String taskId,
|
||||
String taskType,
|
||||
ListenableFuture<TaskStatus> result,
|
||||
String dataSource,
|
||||
TaskState state,
|
||||
RunnerTaskState runnerState,
|
||||
DateTime createdTime,
|
||||
DateTime queueInsertionTime,
|
||||
TaskLocation taskLocation
|
||||
)
|
||||
{
|
||||
super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH);
|
||||
this.taskType = taskType;
|
||||
this.dataSource = dataSource;
|
||||
this.taskState = state;
|
||||
this.runnerTaskState = runnerState;
|
||||
this.createdTime = createdTime;
|
||||
this.queueInsertionTime = queueInsertionTime;
|
||||
this.taskLocation = taskLocation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskLocation getLocation()
|
||||
{
|
||||
return taskLocation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTaskType()
|
||||
{
|
||||
return taskType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDataSource()
|
||||
{
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
public TaskState getTaskState()
|
||||
{
|
||||
return taskState;
|
||||
}
|
||||
|
||||
public RunnerTaskState getRunnerTaskState()
|
||||
{
|
||||
return runnerTaskState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateTime getCreatedTime()
|
||||
{
|
||||
return createdTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateTime getQueueInsertionTime()
|
||||
{
|
||||
return queueInsertionTime;
|
||||
}
|
||||
|
||||
public AnyTask withTaskState(
|
||||
TaskState newTaskState,
|
||||
RunnerTaskState runnerState,
|
||||
DateTime createdTime,
|
||||
DateTime queueInsertionTime,
|
||||
TaskLocation taskLocation
|
||||
)
|
||||
{
|
||||
return new AnyTask(
|
||||
getTaskId(),
|
||||
getTaskType(),
|
||||
getResult(),
|
||||
getDataSource(),
|
||||
newTaskState,
|
||||
runnerState,
|
||||
createdTime,
|
||||
queueInsertionTime,
|
||||
taskLocation
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/pendingTasks")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
|
@ -760,120 +664,6 @@ public class OverlordResource
|
|||
return Response.ok(authorizedList).build();
|
||||
}
|
||||
|
||||
private static BiFunction<TaskInfo<Task, TaskStatus>, RunnerTaskState, TaskStatusPlus> newTaskInfo2TaskStatusPlusFn()
|
||||
{
|
||||
return (taskInfo, runnerTaskState) -> new TaskStatusPlus(
|
||||
taskInfo.getId(),
|
||||
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
|
||||
taskInfo.getCreatedTime(),
|
||||
// Would be nice to include the real queue insertion time, but the
|
||||
// TaskStorage API doesn't yet allow it.
|
||||
DateTimes.EPOCH,
|
||||
taskInfo.getStatus().getStatusCode(),
|
||||
runnerTaskState,
|
||||
taskInfo.getStatus().getDuration(),
|
||||
TaskLocation.unknown(),
|
||||
taskInfo.getDataSource(),
|
||||
taskInfo.getStatus().getErrorMsg()
|
||||
);
|
||||
}
|
||||
|
||||
private List<AnyTask> filterActiveTasks(
|
||||
RunnerTaskState state,
|
||||
List<AnyTask> allTasks
|
||||
)
|
||||
{
|
||||
//divide active tasks into 3 lists : running, pending, waiting
|
||||
Optional<TaskRunner> taskRunnerOpt = taskMaster.getTaskRunner();
|
||||
if (!taskRunnerOpt.isPresent()) {
|
||||
throw new WebApplicationException(
|
||||
Response.serverError().entity("No task runner found").build()
|
||||
);
|
||||
}
|
||||
TaskRunner runner = taskRunnerOpt.get();
|
||||
// the order of tasks below is waiting, pending, running to prevent
|
||||
// skipping a task, it's the order in which tasks will change state
|
||||
// if they do while this is code is executing, so a task might be
|
||||
// counted twice but never skipped
|
||||
if (RunnerTaskState.WAITING.equals(state)) {
|
||||
Collection<? extends TaskRunnerWorkItem> runnersKnownTasks = runner.getKnownTasks();
|
||||
Set<String> runnerKnownTaskIds = runnersKnownTasks
|
||||
.stream()
|
||||
.map(TaskRunnerWorkItem::getTaskId)
|
||||
.collect(Collectors.toSet());
|
||||
final List<AnyTask> waitingTasks = new ArrayList<>();
|
||||
for (TaskRunnerWorkItem task : allTasks) {
|
||||
if (!runnerKnownTaskIds.contains(task.getTaskId())) {
|
||||
waitingTasks.add(((AnyTask) task).withTaskState(
|
||||
TaskState.RUNNING,
|
||||
RunnerTaskState.WAITING,
|
||||
task.getCreatedTime(),
|
||||
task.getQueueInsertionTime(),
|
||||
task.getLocation()
|
||||
));
|
||||
}
|
||||
}
|
||||
return waitingTasks;
|
||||
}
|
||||
|
||||
if (RunnerTaskState.PENDING.equals(state)) {
|
||||
Collection<? extends TaskRunnerWorkItem> knownPendingTasks = runner.getPendingTasks();
|
||||
Set<String> pendingTaskIds = knownPendingTasks
|
||||
.stream()
|
||||
.map(TaskRunnerWorkItem::getTaskId)
|
||||
.collect(Collectors.toSet());
|
||||
Map<String, TaskRunnerWorkItem> workItemIdMap = knownPendingTasks
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
TaskRunnerWorkItem::getTaskId,
|
||||
java.util.function.Function.identity(),
|
||||
(previousWorkItem, newWorkItem) -> newWorkItem
|
||||
));
|
||||
final List<AnyTask> pendingTasks = new ArrayList<>();
|
||||
for (TaskRunnerWorkItem task : allTasks) {
|
||||
if (pendingTaskIds.contains(task.getTaskId())) {
|
||||
pendingTasks.add(((AnyTask) task).withTaskState(
|
||||
TaskState.RUNNING,
|
||||
RunnerTaskState.PENDING,
|
||||
workItemIdMap.get(task.getTaskId()).getCreatedTime(),
|
||||
workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(),
|
||||
workItemIdMap.get(task.getTaskId()).getLocation()
|
||||
));
|
||||
}
|
||||
}
|
||||
return pendingTasks;
|
||||
}
|
||||
|
||||
if (RunnerTaskState.RUNNING.equals(state)) {
|
||||
Collection<? extends TaskRunnerWorkItem> knownRunningTasks = runner.getRunningTasks();
|
||||
Set<String> runningTaskIds = knownRunningTasks
|
||||
.stream()
|
||||
.map(TaskRunnerWorkItem::getTaskId)
|
||||
.collect(Collectors.toSet());
|
||||
Map<String, TaskRunnerWorkItem> workItemIdMap = knownRunningTasks
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
TaskRunnerWorkItem::getTaskId,
|
||||
java.util.function.Function.identity(),
|
||||
(previousWorkItem, newWorkItem) -> newWorkItem
|
||||
));
|
||||
final List<AnyTask> runningTasks = new ArrayList<>();
|
||||
for (TaskRunnerWorkItem task : allTasks) {
|
||||
if (runningTaskIds.contains(task.getTaskId())) {
|
||||
runningTasks.add(((AnyTask) task).withTaskState(
|
||||
TaskState.RUNNING,
|
||||
RunnerTaskState.RUNNING,
|
||||
workItemIdMap.get(task.getTaskId()).getCreatedTime(),
|
||||
workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(),
|
||||
workItemIdMap.get(task.getTaskId()).getLocation()
|
||||
));
|
||||
}
|
||||
}
|
||||
return runningTasks;
|
||||
}
|
||||
return allTasks;
|
||||
}
|
||||
|
||||
@DELETE
|
||||
@Path("/pendingSegments/{dataSource}")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
|
@ -1016,6 +806,102 @@ public class OverlordResource
|
|||
}
|
||||
}
|
||||
|
||||
private List<AnyTask> filterActiveTasks(
|
||||
RunnerTaskState state,
|
||||
List<AnyTask> allTasks
|
||||
)
|
||||
{
|
||||
//divide active tasks into 3 lists : running, pending, waiting
|
||||
Optional<TaskRunner> taskRunnerOpt = taskMaster.getTaskRunner();
|
||||
if (!taskRunnerOpt.isPresent()) {
|
||||
throw new WebApplicationException(
|
||||
Response.serverError().entity("No task runner found").build()
|
||||
);
|
||||
}
|
||||
TaskRunner runner = taskRunnerOpt.get();
|
||||
// the order of tasks below is waiting, pending, running to prevent
|
||||
// skipping a task, it's the order in which tasks will change state
|
||||
// if they do while this is code is executing, so a task might be
|
||||
// counted twice but never skipped
|
||||
if (RunnerTaskState.WAITING.equals(state)) {
|
||||
Collection<? extends TaskRunnerWorkItem> runnersKnownTasks = runner.getKnownTasks();
|
||||
Set<String> runnerKnownTaskIds = runnersKnownTasks
|
||||
.stream()
|
||||
.map(TaskRunnerWorkItem::getTaskId)
|
||||
.collect(Collectors.toSet());
|
||||
final List<AnyTask> waitingTasks = new ArrayList<>();
|
||||
for (TaskRunnerWorkItem task : allTasks) {
|
||||
if (!runnerKnownTaskIds.contains(task.getTaskId())) {
|
||||
waitingTasks.add(((AnyTask) task).withTaskState(
|
||||
TaskState.RUNNING,
|
||||
RunnerTaskState.WAITING,
|
||||
task.getCreatedTime(),
|
||||
task.getQueueInsertionTime(),
|
||||
task.getLocation()
|
||||
));
|
||||
}
|
||||
}
|
||||
return waitingTasks;
|
||||
}
|
||||
|
||||
if (RunnerTaskState.PENDING.equals(state)) {
|
||||
Collection<? extends TaskRunnerWorkItem> knownPendingTasks = runner.getPendingTasks();
|
||||
Set<String> pendingTaskIds = knownPendingTasks
|
||||
.stream()
|
||||
.map(TaskRunnerWorkItem::getTaskId)
|
||||
.collect(Collectors.toSet());
|
||||
Map<String, TaskRunnerWorkItem> workItemIdMap = knownPendingTasks
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
TaskRunnerWorkItem::getTaskId,
|
||||
java.util.function.Function.identity(),
|
||||
(previousWorkItem, newWorkItem) -> newWorkItem
|
||||
));
|
||||
final List<AnyTask> pendingTasks = new ArrayList<>();
|
||||
for (TaskRunnerWorkItem task : allTasks) {
|
||||
if (pendingTaskIds.contains(task.getTaskId())) {
|
||||
pendingTasks.add(((AnyTask) task).withTaskState(
|
||||
TaskState.RUNNING,
|
||||
RunnerTaskState.PENDING,
|
||||
workItemIdMap.get(task.getTaskId()).getCreatedTime(),
|
||||
workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(),
|
||||
workItemIdMap.get(task.getTaskId()).getLocation()
|
||||
));
|
||||
}
|
||||
}
|
||||
return pendingTasks;
|
||||
}
|
||||
|
||||
if (RunnerTaskState.RUNNING.equals(state)) {
|
||||
Collection<? extends TaskRunnerWorkItem> knownRunningTasks = runner.getRunningTasks();
|
||||
Set<String> runningTaskIds = knownRunningTasks
|
||||
.stream()
|
||||
.map(TaskRunnerWorkItem::getTaskId)
|
||||
.collect(Collectors.toSet());
|
||||
Map<String, TaskRunnerWorkItem> workItemIdMap = knownRunningTasks
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
TaskRunnerWorkItem::getTaskId,
|
||||
java.util.function.Function.identity(),
|
||||
(previousWorkItem, newWorkItem) -> newWorkItem
|
||||
));
|
||||
final List<AnyTask> runningTasks = new ArrayList<>();
|
||||
for (TaskRunnerWorkItem task : allTasks) {
|
||||
if (runningTaskIds.contains(task.getTaskId())) {
|
||||
runningTasks.add(((AnyTask) task).withTaskState(
|
||||
TaskState.RUNNING,
|
||||
RunnerTaskState.RUNNING,
|
||||
workItemIdMap.get(task.getTaskId()).getCreatedTime(),
|
||||
workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(),
|
||||
workItemIdMap.get(task.getTaskId()).getLocation()
|
||||
));
|
||||
}
|
||||
}
|
||||
return runningTasks;
|
||||
}
|
||||
return allTasks;
|
||||
}
|
||||
|
||||
private List<TaskStatusPlus> securedTaskStatusPlus(
|
||||
List<TaskStatusPlus> collectionToFilter,
|
||||
@Nullable String dataSource,
|
||||
|
@ -1057,4 +943,98 @@ public class OverlordResource
|
|||
)
|
||||
);
|
||||
}
|
||||
|
||||
private static class AnyTask extends TaskRunnerWorkItem
|
||||
{
|
||||
private final String taskType;
|
||||
private final String dataSource;
|
||||
private final TaskState taskState;
|
||||
private final RunnerTaskState runnerTaskState;
|
||||
private final DateTime createdTime;
|
||||
private final DateTime queueInsertionTime;
|
||||
private final TaskLocation taskLocation;
|
||||
|
||||
AnyTask(
|
||||
String taskId,
|
||||
String taskType,
|
||||
ListenableFuture<TaskStatus> result,
|
||||
String dataSource,
|
||||
TaskState state,
|
||||
RunnerTaskState runnerState,
|
||||
DateTime createdTime,
|
||||
DateTime queueInsertionTime,
|
||||
TaskLocation taskLocation
|
||||
)
|
||||
{
|
||||
super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH);
|
||||
this.taskType = taskType;
|
||||
this.dataSource = dataSource;
|
||||
this.taskState = state;
|
||||
this.runnerTaskState = runnerState;
|
||||
this.createdTime = createdTime;
|
||||
this.queueInsertionTime = queueInsertionTime;
|
||||
this.taskLocation = taskLocation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskLocation getLocation()
|
||||
{
|
||||
return taskLocation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTaskType()
|
||||
{
|
||||
return taskType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDataSource()
|
||||
{
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
public TaskState getTaskState()
|
||||
{
|
||||
return taskState;
|
||||
}
|
||||
|
||||
public RunnerTaskState getRunnerTaskState()
|
||||
{
|
||||
return runnerTaskState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateTime getCreatedTime()
|
||||
{
|
||||
return createdTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateTime getQueueInsertionTime()
|
||||
{
|
||||
return queueInsertionTime;
|
||||
}
|
||||
|
||||
public AnyTask withTaskState(
|
||||
TaskState newTaskState,
|
||||
RunnerTaskState runnerState,
|
||||
DateTime createdTime,
|
||||
DateTime queueInsertionTime,
|
||||
TaskLocation taskLocation
|
||||
)
|
||||
{
|
||||
return new AnyTask(
|
||||
getTaskId(),
|
||||
getTaskType(),
|
||||
getResult(),
|
||||
getDataSource(),
|
||||
newTaskState,
|
||||
runnerState,
|
||||
createdTime,
|
||||
queueInsertionTime,
|
||||
taskLocation
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue