Add task start status to worker report (#13263)

* Add task start status to worker report

* Address review comments

* Address review comments

* Update documentation

* Update spelling checks
This commit is contained in:
Adarsh Sanjeev 2022-10-28 12:00:15 +05:30 committed by GitHub
parent 49e54a0ec6
commit 4775427e2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 66 additions and 8 deletions

View File

@ -553,6 +553,8 @@ The following table describes the response fields when you retrieve a report for
|multiStageQuery.payload.status.status|RUNNING, SUCCESS, or FAILED.|
|multiStageQuery.payload.status.startTime|Start time of the query in ISO format. Only present if the query has started running.|
|multiStageQuery.payload.status.durationMs|Milliseconds elapsed after the query has started running. -1 denotes that the query hasn't started running yet.|
|multiStageQuery.payload.status.pendingTasks|Number of tasks that are not fully started. -1 denotes that the number is currently unknown.|
|multiStageQuery.payload.status.runningTasks|Number of currently running tasks. Should be at least 1 since the controller is included.|
|multiStageQuery.payload.status.errorReport|Error object. Only present if there was an error.|
|multiStageQuery.payload.status.errorReport.taskId|The task that reported the error, if known. May be a controller task or a worker task.|
|multiStageQuery.payload.status.errorReport.host|The hostname and port of the task that reported the error, if known.|

View File

@ -447,7 +447,8 @@ public class ControllerImpl implements Controller
errorForReport,
workerWarnings,
queryStartTime,
new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis()
new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
workerTaskLauncher
),
stagesReport,
countersSnapshot,
@ -716,7 +717,8 @@ public class ControllerImpl implements Controller
null,
workerWarnings,
queryStartTime,
queryStartTime == null ? -1L : new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis()
queryStartTime == null ? -1L : new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
workerTaskLauncher
),
makeStageReport(
queryDef,
@ -1807,10 +1809,27 @@ public class ControllerImpl implements Controller
@Nullable final MSQErrorReport errorReport,
final Queue<MSQErrorReport> errorReports,
@Nullable final DateTime queryStartTime,
final long queryDuration
final long queryDuration,
MSQWorkerTaskLauncher taskLauncher
)
{
return new MSQStatusReport(taskState, errorReport, errorReports, queryStartTime, queryDuration);
int pendingTasks = -1;
int runningTasks = 1;
if (taskLauncher != null) {
Pair<Integer, Integer> workerTaskStatus = taskLauncher.getWorkerTaskStatus();
pendingTasks = workerTaskStatus.lhs;
runningTasks = workerTaskStatus.rhs + 1; // To account for controller.
}
return new MSQStatusReport(
taskState,
errorReport,
errorReports,
queryStartTime,
queryDuration,
pendingTasks,
runningTasks
);
}
private static InputSpecSlicerFactory makeInputSpecSlicerFactory(final DataSegmentTimelineView timelineView)

View File

@ -30,6 +30,7 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
@ -345,6 +346,19 @@ public class MSQWorkerTaskLauncher
}
}
/**
* Returns a pair which contains the number of currently running worker tasks and the number of worker tasks that are
* not yet fully started as left and right respectively.
*/
public Pair<Integer, Integer> getWorkerTaskStatus()
{
synchronized (taskIds) {
int runningTasks = fullyStartedTasks.size();
int pendingTasks = desiredTaskCount - runningTasks;
return Pair.of(runningTasks, pendingTasks);
}
}
/**
* Used by the main loop to update {@link #taskTrackers} and {@link #fullyStartedTasks}.
*/

View File

@ -46,6 +46,9 @@ public class MSQStatusReport
private final long durationMs;
private final int pendingTasks;
private final int runningTasks;
@JsonCreator
public MSQStatusReport(
@ -53,7 +56,9 @@ public class MSQStatusReport
@JsonProperty("errorReport") @Nullable MSQErrorReport errorReport,
@JsonProperty("warnings") Collection<MSQErrorReport> warningReports,
@JsonProperty("startTime") @Nullable DateTime startTime,
@JsonProperty("durationMs") long durationMs
@JsonProperty("durationMs") long durationMs,
@JsonProperty("pendingTasks") int pendingTasks,
@JsonProperty("runningTasks") int runningTasks
)
{
this.status = Preconditions.checkNotNull(status, "status");
@ -61,6 +66,8 @@ public class MSQStatusReport
this.warningReports = warningReports != null ? warningReports : Collections.emptyList();
this.startTime = startTime;
this.durationMs = durationMs;
this.pendingTasks = pendingTasks;
this.runningTasks = runningTasks;
}
@JsonProperty
@ -92,6 +99,18 @@ public class MSQStatusReport
return startTime;
}
@JsonProperty
public int getPendingTasks()
{
return pendingTasks;
}
@JsonProperty
public int getRunningTasks()
{
return runningTasks;
}
@JsonProperty
public long getDurationMs()
{

View File

@ -91,7 +91,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0),
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@ -119,6 +119,8 @@ public class MSQTaskReportTest
Assert.assertEquals(TASK_ID, report2.getTaskId());
Assert.assertEquals(report.getPayload().getStatus().getStatus(), report2.getPayload().getStatus().getStatus());
Assert.assertNull(report2.getPayload().getStatus().getErrorReport());
Assert.assertEquals(report.getPayload().getStatus().getRunningTasks(), report2.getPayload().getStatus().getRunningTasks());
Assert.assertEquals(report.getPayload().getStatus().getPendingTasks(), report2.getPayload().getStatus().getPendingTasks());
Assert.assertEquals(report.getPayload().getStages(), report2.getPayload().getStages());
Yielder<Object[]> yielder = report2.getPayload().getResults().getResultYielder();
@ -142,7 +144,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0),
new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, 1, 2),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@ -179,7 +181,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0),
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),

View File

@ -632,6 +632,8 @@ multiStageQuery.payload.status
multiStageQuery.payload.status.status
multiStageQuery.payload.status.startTime
multiStageQuery.payload.status.durationMs
multiStageQuery.payload.status.pendingTasks
multiStageQuery.payload.status.runningTasks
multiStageQuery.payload.status.errorReport
multiStageQuery.payload.status.errorReport.taskId
multiStageQuery.payload.status.errorReport.host