diff --git a/docs/api-reference/sql-ingestion-api.md b/docs/api-reference/sql-ingestion-api.md index 5492c3ea46d..988f860a85d 100644 --- a/docs/api-reference/sql-ingestion-api.md +++ b/docs/api-reference/sql-ingestion-api.md @@ -287,6 +287,16 @@ The response shows an example report for a query. "status": "SUCCESS", "startTime": "2022-09-14T22:12:09.266Z", "durationMs": 28227, + "workers": { + "0": [ + { + "workerId": "query-3dc0c45d-34d7-4b15-86c9-cdb2d3ebfc4e-worker0_0", + "state": "SUCCESS", + "durationMs": 15511, + "pendingMs": 137 + } + ] + }, "pendingTasks": 0, "runningTasks": 2, "segmentLoadStatus": { @@ -607,7 +617,8 @@ The following table describes the response fields when you retrieve a report for | `multiStageQuery.payload.status.workers.<workerNumber>` | Array of worker tasks including retries. | | `multiStageQuery.payload.status.workers.<workerNumber>[].workerId` | Id of the worker task.| | | `multiStageQuery.payload.status.workers.<workerNumber>[].status` | RUNNING, SUCCESS, or FAILED.| -| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | Milliseconds elapsed after the worker task started running. It is -1 for worker tasks with status RUNNING.| +| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | Milliseconds elapsed between when the worker task was first requested and when it finished. It is -1 for worker tasks with status RUNNING.| +| `multiStageQuery.payload.status.workers.<workerNumber>[].pendingMs` | Milliseconds elapsed between when the worker task was first requested and when it fully started RUNNING. Actual work time can be calculated using `actualWorkTimeMS = durationMs - pendingMs`.| | `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not fully started. -1 denotes that the number is currently unknown. | | `multiStageQuery.payload.status.runningTasks` | Number of currently running tasks. Should be at least 1 since the controller is included. | | `multiStageQuery.payload.status.segmentLoadStatus` | Segment loading container. 
Only present after the segments have been published. | diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java index c2092e7f24a..a485831532f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java @@ -306,7 +306,6 @@ public class MSQWorkerTaskLauncher * Blocks the call untill the worker tasks are ready to be contacted for work. * * @param workerSet - * * @throws InterruptedException */ public void waitUntilWorkersReady(Set<Integer> workerSet) throws InterruptedException @@ -353,45 +352,6 @@ public class MSQWorkerTaskLauncher } } - public static class WorkerStats - { - String workerId; - TaskState state; - long duration; - - /** - * For JSON deserialization only - */ - public WorkerStats() - { - } - - public WorkerStats(String workerId, TaskState state, long duration) - { - this.workerId = workerId; - this.state = state; - this.duration = duration; - } - - @JsonProperty - public String getWorkerId() - { - return workerId; - } - - @JsonProperty - public TaskState getState() - { - return state; - } - - @JsonProperty("durationMs") - public long getDuration() - { - return duration; - } - } - public Map<Integer, List<WorkerStats>> getWorkerStats() { final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>(); @@ -400,14 +360,17 @@ public class MSQWorkerTaskLauncher TaskTracker taskTracker = taskEntry.getValue(); + TaskStatus taskStatus = taskTracker.statusRef.get(); workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>()) - .add(new WorkerStats(taskEntry.getKey(), - taskTracker.status.getStatusCode(), - // getDuration() returns -1 for running tasks. 
- // It's not calculated on-the-fly here since - // taskTracker.startTimeMillis marks task - // submission time rather than the actual start. - taskTracker.status.getDuration() + .add(new WorkerStats( + taskEntry.getKey(), + taskStatus.getStatusCode(), + // getDuration() returns -1 for running tasks. + // It's not calculated on-the-fly here since + // taskTracker.startTimeMillis marks task + // submission time rather than the actual start. + taskStatus.getDuration(), + taskTracker.taskPendingTimeInMs() )); } @@ -576,18 +539,20 @@ public class MSQWorkerTaskLauncher for (Map.Entry<String, TaskStatus> statusEntry : statuses.entrySet()) { final String taskId = statusEntry.getKey(); final TaskTracker tracker = taskTrackers.get(taskId); - tracker.status = statusEntry.getValue(); + tracker.updateStatus(statusEntry.getValue()); + TaskStatus status = tracker.statusRef.get(); - if (!tracker.status.getStatusCode().isComplete() && tracker.unknownLocation()) { + if (!status.getStatusCode().isComplete() && tracker.unknownLocation()) { // Look up location if not known. Note: this location is not used to actually contact the task. For that, // we have SpecificTaskServiceLocator. This location is only used to determine if a task has started up. 
- tracker.initialLocation = workerManager.location(taskId); + tracker.setLocation(workerManager.location(taskId)); } - if (tracker.status.getStatusCode() == TaskState.RUNNING && !tracker.unknownLocation()) { + if (status.getStatusCode() == TaskState.RUNNING && !tracker.unknownLocation()) { synchronized (taskIds) { if (fullyStartedTasks.add(tracker.workerNumber)) { recentFullyStartedWorkerTimeInMillis.set(System.currentTimeMillis()); + tracker.setFullyStartedTime(System.currentTimeMillis()); } taskIds.notifyAll(); } @@ -616,7 +581,7 @@ public class MSQWorkerTaskLauncher continue; } - if (tracker.status == null) { + if (tracker.statusRef.get() == null) { removeWorkerFromFullyStartedWorkers(tracker); final String errorMessage = StringUtils.format("Task [%s] status missing", taskId); log.info(errorMessage + ". Trying to relaunch the worker"); @@ -635,9 +600,10 @@ public class MSQWorkerTaskLauncher )); } else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) { removeWorkerFromFullyStartedWorkers(tracker); - log.info("Task[%s] failed because %s. Trying to relaunch the worker", taskId, tracker.status.getErrorMsg()); + TaskStatus taskStatus = tracker.statusRef.get(); + log.info("Task[%s] failed because %s. 
Trying to relaunch the worker", taskId, taskStatus.getErrorMsg()); tracker.enableRetrying(); - retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId, tracker.status.getErrorMsg())); + retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId, taskStatus.getErrorMsg())); } } } @@ -717,7 +683,7 @@ public class MSQWorkerTaskLauncher Limits.PER_WORKER_RELAUNCH_LIMIT, relaunchTask.getId(), relaunchTask.getWorkerNumber(), - tracker.status.getErrorMsg() + tracker.statusRef.get().getErrorMsg() )); } if (currentRelaunchCount > Limits.TOTAL_RELAUNCH_LIMIT) { @@ -725,7 +691,7 @@ public class MSQWorkerTaskLauncher Limits.TOTAL_RELAUNCH_LIMIT, currentRelaunchCount, relaunchTask.getId(), - tracker.status.getErrorMsg() + tracker.statusRef.get().getErrorMsg() )); } } @@ -737,8 +703,9 @@ public class MSQWorkerTaskLauncher for (final Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) { final String taskId = taskEntry.getKey(); final TaskTracker tracker = taskEntry.getValue(); - if (!canceledWorkerTasks.contains(taskId) - && (tracker.status == null || !tracker.status.getStatusCode().isComplete())) { + if ((!canceledWorkerTasks.contains(taskId)) + && + (!tracker.isComplete())) { canceledWorkerTasks.add(taskId); context.workerManager().cancel(taskId); } @@ -831,11 +798,12 @@ public class MSQWorkerTaskLauncher { private final int workerNumber; private final long startTimeMillis = System.currentTimeMillis(); + private final AtomicLong taskFullyStartedTimeRef = new AtomicLong(); private final MSQWorkerTask msqWorkerTask; - private TaskStatus status; - private TaskLocation initialLocation; + private final AtomicReference<TaskStatus> statusRef = new AtomicReference<>(); + private final AtomicReference<TaskLocation> initialLocationRef = new AtomicReference<>(); - private boolean isRetrying = false; + private final AtomicBoolean isRetryingRef = new AtomicBoolean(false); public TaskTracker(int workerNumber, MSQWorkerTask msqWorkerTask) { @@ -845,16 +813,19 @@ public class MSQWorkerTaskLauncher 
public boolean unknownLocation() { + TaskLocation initialLocation = initialLocationRef.get(); return initialLocation == null || TaskLocation.unknown().equals(initialLocation); } public boolean isComplete() { + TaskStatus status = statusRef.get(); return status != null && status.getStatusCode().isComplete(); } public boolean didFail() { + TaskStatus status = statusRef.get(); return status != null && status.getStatusCode().isFailure(); } @@ -869,6 +840,7 @@ public class MSQWorkerTaskLauncher public boolean didRunTimeOut(final long maxTaskStartDelayMillis) { long currentTimeMillis = System.currentTimeMillis(); + TaskStatus status = statusRef.get(); return (status == null || status.getStatusCode() == TaskState.RUNNING) && unknownLocation() && currentTimeMillis - recentFullyStartedWorkerTimeInMillis.get() > maxTaskStartDelayMillis @@ -880,17 +852,87 @@ public class MSQWorkerTaskLauncher */ public void enableRetrying() { - isRetrying = true; + isRetryingRef.set(true); } /** * Checks is the task is retrying, - * - * @return */ public boolean isRetrying() { - return isRetrying; + return isRetryingRef.get(); + } + + public void setLocation(TaskLocation taskLocation) + { + initialLocationRef.set(taskLocation); + } + + public void updateStatus(TaskStatus taskStatus) + { + statusRef.set(taskStatus); + } + + public void setFullyStartedTime(long currentTimeMillis) + { + taskFullyStartedTimeRef.set(currentTimeMillis); + } + + public long taskPendingTimeInMs() + { + long currentFullyStartingTime = taskFullyStartedTimeRef.get(); + if (currentFullyStartingTime == 0) { + return System.currentTimeMillis() - startTimeMillis; + } else { + return Math.max(0, currentFullyStartingTime - startTimeMillis); + } + } + } + + public static class WorkerStats + { + String workerId; + TaskState state; + long duration; + long pendingTimeInMs; + + /** + * For JSON deserialization only + */ + public WorkerStats() + { + } + + public WorkerStats(String workerId, TaskState state, long duration, long 
pendingTimeInMs) + { + this.workerId = workerId; + this.state = state; + this.duration = duration; + this.pendingTimeInMs = pendingTimeInMs; + } + + @JsonProperty + public String getWorkerId() + { + return workerId; + } + + @JsonProperty + public TaskState getState() + { + return state; + } + + @JsonProperty("durationMs") + public long getDuration() + { + return duration; + } + + @JsonProperty("pendingMs") + public long getPendingTimeInMs() + { + return pendingTimeInMs; } } }