Add group_id to the sys.tasks table (#8304)

* Add group_id to overlord tasks API and sys.tasks table

* adjust test

* modify docs

* Make groupId nullable

* fix integration test

* fix toString

* Remove groupId from TaskInfo

* Modify docs and tests

* modify TaskMonitorTest
This commit is contained in:
Surekha 2019-08-22 15:28:23 -07:00 committed by GitHub
parent fba92ae469
commit cf2a2dd917
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 95 additions and 13 deletions

View File

@ -42,9 +42,12 @@ public class TaskStatusPlus
@Nullable
private final String errorMsg;
@Nullable
private final String groupId;
public TaskStatusPlus(
String id,
@Nullable String groupId,
String type, // nullable for backward compatibility
DateTime createdTime,
DateTime queueInsertionTime,
@ -58,6 +61,7 @@ public class TaskStatusPlus
{
this(
id,
groupId,
type,
createdTime,
queueInsertionTime,
@ -74,6 +78,7 @@ public class TaskStatusPlus
@JsonCreator
public TaskStatusPlus(
@JsonProperty("id") String id,
@JsonProperty("groupId") @Nullable String groupId,
@JsonProperty("type") @Nullable String type, // nullable for backward compatibility
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@ -90,6 +95,7 @@ public class TaskStatusPlus
Preconditions.checkNotNull(duration, "duration");
}
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = groupId;
this.type = type;
this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime");
this.queueInsertionTime = Preconditions.checkNotNull(queueInsertionTime, "queueInsertionTime");
@ -117,6 +123,13 @@ public class TaskStatusPlus
return id;
}
@Nullable
@JsonProperty
public String getGroupId()
{
return groupId;
}
@Nullable
@JsonProperty
public String getType()
@ -195,6 +208,7 @@ public class TaskStatusPlus
}
TaskStatusPlus that = (TaskStatusPlus) o;
return Objects.equals(getId(), that.getId()) &&
Objects.equals(getGroupId(), that.getGroupId()) &&
Objects.equals(getType(), that.getType()) &&
Objects.equals(getCreatedTime(), that.getCreatedTime()) &&
Objects.equals(getQueueInsertionTime(), that.getQueueInsertionTime()) &&
@ -210,6 +224,7 @@ public class TaskStatusPlus
{
return Objects.hash(
getId(),
getGroupId(),
getType(),
getCreatedTime(),
getQueueInsertionTime(),
@ -226,6 +241,7 @@ public class TaskStatusPlus
{
return "TaskStatusPlus{" +
"id='" + id + '\'' +
", groupId='" + groupId + '\'' +
", type='" + type + '\'' +
", createdTime=" + createdTime +
", queueInsertionTime=" + queueInsertionTime +

View File

@ -46,6 +46,7 @@ public class TaskStatusPlusTest
);
final TaskStatusPlus status = new TaskStatusPlus(
"testId",
"testGroupId",
"testType",
DateTimes.nowUtc(),
DateTimes.nowUtc(),
@ -71,6 +72,7 @@ public class TaskStatusPlusTest
);
final String json = "{\n"
+ "\"id\": \"testId\",\n"
+ "\"groupId\": \"testGroupId\",\n"
+ "\"type\": \"testType\",\n"
+ "\"createdTime\": \"2018-09-17T06:35:17.392Z\",\n"
+ "\"queueInsertionTime\": \"2018-09-17T06:35:17.392Z\",\n"

View File

@ -751,7 +751,8 @@ check out the documentation for [ingestion tasks](../ingestion/tasks.html).
|Column|Type|Notes|
|------|-----|-----|
|task_id|STRING|Unique task identifier|
|type|STRING|Task type, for example this value is "index" for indexing tasks. See [tasks-overview](../ingestion/tasks.md)|
|group_id|STRING|Task group ID for this task, the value depends on the task `type`. For example, for native index tasks, it's same as `task_id`, for sub tasks, this value is the parent task's ID|
|type|STRING|Task type, for example this value is "index" for indexing tasks. See [tasks-overview](../ingestion/tasks.html)|
|datasource|STRING|Datasource name being indexed|
|created_time|STRING|Timestamp in ISO8601 format corresponding to when the ingestion task was created. Note that this value is populated for completed and waiting tasks. For running and pending tasks this value is set to 1970-01-01T00:00:00Z|
|queue_insertion_time|STRING|Timestamp in ISO8601 format corresponding to when this task was added to the queue on the Overlord|

View File

@ -203,7 +203,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
for (final TaskStuff taskStuff : tasks.values()) {
if (taskStuff.getStatus().isRunnable()) {
TaskInfo t = new TaskInfo(
TaskInfo t = new TaskInfo<>(
taskStuff.getTask().getId(),
taskStuff.getCreatedDate(),
taskStuff.getStatus(),
@ -267,7 +267,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
for (final TaskStuff taskStuff : list) {
String id = taskStuff.getTask().getId();
TaskInfo t = new TaskInfo(
TaskInfo t = new TaskInfo<>(
id,
taskStuff.getCreatedDate(),
taskStuff.getStatus(),

View File

@ -215,7 +215,8 @@ public class MetadataTaskStorage implements TaskStorage
{
return ImmutableList.copyOf(
handler.getCompletedTaskInfo(
DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
DateTimes.nowUtc()
.minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
maxTaskStatuses,
datasource
)

View File

@ -264,6 +264,7 @@ public class OverlordResource
workItem.getTaskId(),
new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
// Would be nice to include the real queue insertion time, but the
@ -285,6 +286,7 @@ public class OverlordResource
taskid,
new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
// Would be nice to include the real queue insertion time, but the
@ -577,6 +579,7 @@ public class OverlordResource
List<TaskStatusPlus> finalTaskList = new ArrayList<>();
Function<AnyTask, TaskStatusPlus> activeTaskTransformFunc = workItem -> new TaskStatusPlus(
workItem.getTaskId(),
workItem.getTaskGroupId(),
workItem.getTaskType(),
workItem.getCreatedTime(),
workItem.getQueueInsertionTime(),
@ -590,6 +593,7 @@ public class OverlordResource
Function<TaskInfo<Task, TaskStatus>, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
// Would be nice to include the real queue insertion time, but the
@ -626,6 +630,7 @@ public class OverlordResource
allActiveTasks.add(
new AnyTask(
task.getId(),
task.getTask() == null ? null : task.getTask().getGroupId(),
task.getTask() == null ? null : task.getTask().getType(),
SettableFuture.create(),
task.getDataSource(),
@ -990,6 +995,7 @@ public class OverlordResource
private static class AnyTask extends TaskRunnerWorkItem
{
private final String taskGroupId;
private final String taskType;
private final String dataSource;
private final TaskState taskState;
@ -1000,6 +1006,7 @@ public class OverlordResource
AnyTask(
String taskId,
String taskGroupId,
String taskType,
ListenableFuture<TaskStatus> result,
String dataSource,
@ -1011,6 +1018,7 @@ public class OverlordResource
)
{
super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH);
this.taskGroupId = taskGroupId;
this.taskType = taskType;
this.dataSource = dataSource;
this.taskState = state;
@ -1038,6 +1046,11 @@ public class OverlordResource
return dataSource;
}
public String getTaskGroupId()
{
return taskGroupId;
}
public TaskState getTaskState()
{
return taskState;
@ -1070,6 +1083,7 @@ public class OverlordResource
{
return new AnyTask(
getTaskId(),
getTaskGroupId(),
getTaskType(),
getResult(),
getDataSource(),

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -52,6 +53,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
@ -138,16 +140,24 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
public String runTask(Object taskObject)
{
final Task subTask = (Task) taskObject;
try {
getTaskStorage().insert(subTask, TaskStatus.running(subTask.getId()));
}
catch (EntryExistsException e) {
throw new RuntimeException(e);
}
tasks.put(subTask.getId(), service.submit(() -> {
try {
final TaskToolbox toolbox = createTaskToolbox(subTask);
if (subTask.isReady(toolbox.getTaskActionClient())) {
return subTask.run(toolbox);
} else {
getTaskStorage().setStatus(TaskStatus.failure(subTask.getId()));
throw new ISE("task[%s] is not ready", subTask.getId());
}
}
catch (Exception e) {
getTaskStorage().setStatus(TaskStatus.failure(subTask.getId(), e.getMessage()));
throw new RuntimeException(e);
}
}));
@ -158,6 +168,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
public TaskStatusResponse getTaskStatus(String taskId)
{
final Future<TaskStatus> taskStatusFuture = tasks.get(taskId);
final Optional<Task> task = getTaskStorage().getTask(taskId);
final String groupId = task.isPresent() ? task.get().getGroupId() : null;
if (taskStatusFuture != null) {
try {
if (taskStatusFuture.isDone()) {
@ -166,6 +178,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
taskId,
new TaskStatusPlus(
taskId,
groupId,
SinglePhaseSubTask.TYPE,
DateTimes.EPOCH,
DateTimes.EPOCH,
@ -182,6 +195,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
taskId,
new TaskStatusPlus(
taskId,
groupId,
SinglePhaseSubTask.TYPE,
DateTimes.EPOCH,
DateTimes.EPOCH,
@ -203,6 +217,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
taskId,
new TaskStatusPlus(
taskId,
groupId,
SinglePhaseSubTask.TYPE,
DateTimes.EPOCH,
DateTimes.EPOCH,

View File

@ -606,6 +606,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
getId(),
new TaskStatusPlus(
subTask.getId(),
subTask.getGroupId(),
subTask.getType(),
DateTimes.EPOCH,
DateTimes.EPOCH,
@ -708,6 +709,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
taskHistories.computeIfAbsent(specId, k -> new ArrayList<>()).add(
new TaskStatusPlus(
getId(),
getGroupId(),
getType(),
DateTimes.EPOCH,
DateTimes.EPOCH,

View File

@ -196,6 +196,7 @@ public class TaskMonitorTest
taskId,
new TaskStatusPlus(
taskId,
"groupId",
"testTask",
DateTimes.EPOCH,
DateTimes.EPOCH,

View File

@ -1001,7 +1001,13 @@ public class OverlordResourceTest
final TaskStatus status = TaskStatus.running("mytask");
EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("mytask"))
.andReturn(new TaskInfo<>(task.getId(), DateTimes.of("2018-01-01"), status, task.getDataSource(), task));
.andReturn(new TaskInfo(
task.getId(),
DateTimes.of("2018-01-01"),
status,
task.getDataSource(),
task
));
EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("othertask"))
.andReturn(null);
@ -1029,6 +1035,7 @@ public class OverlordResourceTest
new TaskStatusResponse(
"mytask",
new TaskStatusPlus(
"mytask",
"mytask",
"noop",
DateTimes.of("2018-01-01"),

View File

@ -2,6 +2,7 @@
{
"task_id": "index_auth_test_2030-04-30T01:13:31.893Z",
"type": null,
"group_id": null,
"datasource": "auth_test",
"created_time": "2030-04-30T01:13:31.893Z",
"queue_insertion_time": "1970-01-01T00:00:00.000Z",

View File

@ -165,6 +165,7 @@ public class SystemSchema extends AbstractSchema
static final RowSignature TASKS_SIGNATURE = RowSignature
.builder()
.add("task_id", ValueType.STRING)
.add("group_id", ValueType.STRING)
.add("type", ValueType.STRING)
.add("datasource", ValueType.STRING)
.add("created_time", ValueType.STRING)
@ -649,6 +650,7 @@ public class SystemSchema extends AbstractSchema
}
return new Object[]{
task.getId(),
task.getGroupId(),
task.getType(),
task.getDataSource(),
toStringOrNull(task.getCreatedTime()),

View File

@ -466,7 +466,7 @@ public class SystemSchemaTest extends CalciteTestBase
final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
final List<RelDataTypeField> sysFields = sysRowType.getFieldList();
Assert.assertEquals(13, sysFields.size());
Assert.assertEquals(14, sysFields.size());
Assert.assertEquals("task_id", sysFields.get(0).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, sysFields.get(0).getType().getSqlTypeName());
@ -1012,6 +1012,7 @@ public class SystemSchemaTest extends CalciteTestBase
String json = "[{\n"
+ "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
+ "\t\"groupId\": \"group_index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
+ "\t\"type\": \"index\",\n"
+ "\t\"createdTime\": \"2018-09-20T22:33:44.922Z\",\n"
+ "\t\"queueInsertionTime\": \"1970-01-01T00:00:00.000Z\",\n"
@ -1027,6 +1028,7 @@ public class SystemSchemaTest extends CalciteTestBase
+ "\t\"errorMsg\": null\n"
+ "}, {\n"
+ "\t\"id\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n"
+ "\t\"groupId\": \"group_index_wikipedia_2018-09-21T18:38:47.773Z\",\n"
+ "\t\"type\": \"index\",\n"
+ "\t\"createdTime\": \"2018-09-21T18:38:47.873Z\",\n"
+ "\t\"queueInsertionTime\": \"2018-09-21T18:38:47.910Z\",\n"
@ -1077,17 +1079,35 @@ public class SystemSchemaTest extends CalciteTestBase
Object[] row0 = rows.get(0);
Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row0[0].toString());
Assert.assertEquals("FAILED", row0[5].toString());
Assert.assertEquals("NONE", row0[6].toString());
Assert.assertEquals(-1L, row0[7]);
Assert.assertEquals("testHost:1234", row0[8]);
Assert.assertEquals("group_index_wikipedia_2018-09-20T22:33:44.911Z", row0[1].toString());
Assert.assertEquals("index", row0[2].toString());
Assert.assertEquals("wikipedia", row0[3].toString());
Assert.assertEquals("2018-09-20T22:33:44.922Z", row0[4].toString());
Assert.assertEquals("1970-01-01T00:00:00.000Z", row0[5].toString());
Assert.assertEquals("FAILED", row0[6].toString());
Assert.assertEquals("NONE", row0[7].toString());
Assert.assertEquals(-1L, row0[8]);
Assert.assertEquals("testHost:1234", row0[9]);
Assert.assertEquals("testHost", row0[10]);
Assert.assertEquals(1234L, row0[11]);
Assert.assertEquals(-1L, row0[12]);
Assert.assertEquals(null, row0[13]);
Object[] row1 = rows.get(1);
Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[0].toString());
Assert.assertEquals("RUNNING", row1[5].toString());
Assert.assertEquals("group_index_wikipedia_2018-09-21T18:38:47.773Z", row1[1].toString());
Assert.assertEquals("index", row1[2].toString());
Assert.assertEquals("wikipedia", row1[3].toString());
Assert.assertEquals("2018-09-21T18:38:47.873Z", row1[4].toString());
Assert.assertEquals("2018-09-21T18:38:47.910Z", row1[5].toString());
Assert.assertEquals("RUNNING", row1[6].toString());
Assert.assertEquals(0L, row1[7]);
Assert.assertEquals("192.168.1.6:8100", row1[8]);
Assert.assertEquals("RUNNING", row1[7].toString());
Assert.assertEquals(0L, row1[8]);
Assert.assertEquals("192.168.1.6:8100", row1[9]);
Assert.assertEquals("192.168.1.6", row1[10]);
Assert.assertEquals(8100L, row1[11]);
Assert.assertEquals(-1L, row1[12]);
Assert.assertEquals(null, row1[13]);
// Verify value types.
verifyTypes(rows, SystemSchema.TASKS_SIGNATURE);