Adding task pending time in MSQ reports (#15966)

Added a new field pendingMs in MSQ task reports. This helps in figuring out the exact run time of the MSQ worker tasks.
    Fixed data races.
This commit is contained in:
Karan Kumar 2024-02-27 14:41:28 +05:30 committed by GitHub
parent 38ecf980d0
commit 5bb5b41b18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 119 additions and 66 deletions

View File

@ -287,6 +287,16 @@ The response shows an example report for a query.
"status": "SUCCESS",
"startTime": "2022-09-14T22:12:09.266Z",
"durationMs": 28227,
"workers": {
"0": [
{
"workerId": "query-3dc0c45d-34d7-4b15-86c9-cdb2d3ebfc4e-worker0_0",
"state": "SUCCESS",
"durationMs": 15511,
"pendingMs": 137
}
]
},
"pendingTasks": 0,
"runningTasks": 2,
"segmentLoadStatus": {
@ -607,7 +617,8 @@ The following table describes the response fields when you retrieve a report for
| `multiStageQuery.payload.status.workers.<workerNumber>` | Array of worker tasks including retries. |
| `multiStageQuery.payload.status.workers.<workerNumber>[].workerId` | Id of the worker task.| |
| `multiStageQuery.payload.status.workers.<workerNumber>[].status` | RUNNING, SUCCESS, or FAILED.|
| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | Milliseconds elapsed after the worker task started running. It is -1 for worker tasks with status RUNNING.|
| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | Milliseconds elapsed between when the worker task was first requested and when it finished. It is -1 for worker tasks with status RUNNING.|
| `multiStageQuery.payload.status.workers.<workerNumber>[].pendingMs` | Milliseconds elapsed between when the worker task was first requested and when it fully started RUNNING. Actual work time can be calculated using `actualWorkTimeMS = durationMs - pendingMs`.|
| `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not fully started. -1 denotes that the number is currently unknown. |
| `multiStageQuery.payload.status.runningTasks` | Number of currently running tasks. Should be at least 1 since the controller is included. |
| `multiStageQuery.payload.status.segmentLoadStatus` | Segment loading container. Only present after the segments have been published. |

View File

@ -306,7 +306,6 @@ public class MSQWorkerTaskLauncher
* Blocks the call untill the worker tasks are ready to be contacted for work.
*
* @param workerSet
*
* @throws InterruptedException
*/
public void waitUntilWorkersReady(Set<Integer> workerSet) throws InterruptedException
@ -353,45 +352,6 @@ public class MSQWorkerTaskLauncher
}
}
public static class WorkerStats
{
String workerId;
TaskState state;
long duration;
/**
* For JSON deserialization only
*/
public WorkerStats()
{
}
public WorkerStats(String workerId, TaskState state, long duration)
{
this.workerId = workerId;
this.state = state;
this.duration = duration;
}
@JsonProperty
public String getWorkerId()
{
return workerId;
}
@JsonProperty
public TaskState getState()
{
return state;
}
@JsonProperty("durationMs")
public long getDuration()
{
return duration;
}
}
public Map<Integer, List<WorkerStats>> getWorkerStats()
{
final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>();
@ -400,14 +360,17 @@ public class MSQWorkerTaskLauncher
TaskTracker taskTracker = taskEntry.getValue();
TaskStatus taskStatus = taskTracker.statusRef.get();
workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>())
.add(new WorkerStats(taskEntry.getKey(),
taskTracker.status.getStatusCode(),
// getDuration() returns -1 for running tasks.
// It's not calculated on-the-fly here since
// taskTracker.startTimeMillis marks task
// submission time rather than the actual start.
taskTracker.status.getDuration()
.add(new WorkerStats(
taskEntry.getKey(),
taskStatus.getStatusCode(),
// getDuration() returns -1 for running tasks.
// It's not calculated on-the-fly here since
// taskTracker.startTimeMillis marks task
// submission time rather than the actual start.
taskStatus.getDuration(),
taskTracker.taskPendingTimeInMs()
));
}
@ -576,18 +539,20 @@ public class MSQWorkerTaskLauncher
for (Map.Entry<String, TaskStatus> statusEntry : statuses.entrySet()) {
final String taskId = statusEntry.getKey();
final TaskTracker tracker = taskTrackers.get(taskId);
tracker.status = statusEntry.getValue();
tracker.updateStatus(statusEntry.getValue());
TaskStatus status = tracker.statusRef.get();
if (!tracker.status.getStatusCode().isComplete() && tracker.unknownLocation()) {
if (!status.getStatusCode().isComplete() && tracker.unknownLocation()) {
// Look up location if not known. Note: this location is not used to actually contact the task. For that,
// we have SpecificTaskServiceLocator. This location is only used to determine if a task has started up.
tracker.initialLocation = workerManager.location(taskId);
tracker.setLocation(workerManager.location(taskId));
}
if (tracker.status.getStatusCode() == TaskState.RUNNING && !tracker.unknownLocation()) {
if (status.getStatusCode() == TaskState.RUNNING && !tracker.unknownLocation()) {
synchronized (taskIds) {
if (fullyStartedTasks.add(tracker.workerNumber)) {
recentFullyStartedWorkerTimeInMillis.set(System.currentTimeMillis());
tracker.setFullyStartedTime(System.currentTimeMillis());
}
taskIds.notifyAll();
}
@ -616,7 +581,7 @@ public class MSQWorkerTaskLauncher
continue;
}
if (tracker.status == null) {
if (tracker.statusRef.get() == null) {
removeWorkerFromFullyStartedWorkers(tracker);
final String errorMessage = StringUtils.format("Task [%s] status missing", taskId);
log.info(errorMessage + ". Trying to relaunch the worker");
@ -635,9 +600,10 @@ public class MSQWorkerTaskLauncher
));
} else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
removeWorkerFromFullyStartedWorkers(tracker);
log.info("Task[%s] failed because %s. Trying to relaunch the worker", taskId, tracker.status.getErrorMsg());
TaskStatus taskStatus = tracker.statusRef.get();
log.info("Task[%s] failed because %s. Trying to relaunch the worker", taskId, taskStatus.getErrorMsg());
tracker.enableRetrying();
retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId, tracker.status.getErrorMsg()));
retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId, taskStatus.getErrorMsg()));
}
}
}
@ -717,7 +683,7 @@ public class MSQWorkerTaskLauncher
Limits.PER_WORKER_RELAUNCH_LIMIT,
relaunchTask.getId(),
relaunchTask.getWorkerNumber(),
tracker.status.getErrorMsg()
tracker.statusRef.get().getErrorMsg()
));
}
if (currentRelaunchCount > Limits.TOTAL_RELAUNCH_LIMIT) {
@ -725,7 +691,7 @@ public class MSQWorkerTaskLauncher
Limits.TOTAL_RELAUNCH_LIMIT,
currentRelaunchCount,
relaunchTask.getId(),
tracker.status.getErrorMsg()
tracker.statusRef.get().getErrorMsg()
));
}
}
@ -737,8 +703,9 @@ public class MSQWorkerTaskLauncher
for (final Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) {
final String taskId = taskEntry.getKey();
final TaskTracker tracker = taskEntry.getValue();
if (!canceledWorkerTasks.contains(taskId)
&& (tracker.status == null || !tracker.status.getStatusCode().isComplete())) {
if ((!canceledWorkerTasks.contains(taskId))
&&
(!tracker.isComplete())) {
canceledWorkerTasks.add(taskId);
context.workerManager().cancel(taskId);
}
@ -831,11 +798,12 @@ public class MSQWorkerTaskLauncher
{
private final int workerNumber;
private final long startTimeMillis = System.currentTimeMillis();
private final AtomicLong taskFullyStartedTimeRef = new AtomicLong();
private final MSQWorkerTask msqWorkerTask;
private TaskStatus status;
private TaskLocation initialLocation;
private final AtomicReference<TaskStatus> statusRef = new AtomicReference<>();
private final AtomicReference<TaskLocation> initialLocationRef = new AtomicReference<>();
private boolean isRetrying = false;
private final AtomicBoolean isRetryingRef = new AtomicBoolean(false);
public TaskTracker(int workerNumber, MSQWorkerTask msqWorkerTask)
{
@ -845,16 +813,19 @@ public class MSQWorkerTaskLauncher
public boolean unknownLocation()
{
TaskLocation initialLocation = initialLocationRef.get();
return initialLocation == null || TaskLocation.unknown().equals(initialLocation);
}
public boolean isComplete()
{
TaskStatus status = statusRef.get();
return status != null && status.getStatusCode().isComplete();
}
public boolean didFail()
{
TaskStatus status = statusRef.get();
return status != null && status.getStatusCode().isFailure();
}
@ -869,6 +840,7 @@ public class MSQWorkerTaskLauncher
public boolean didRunTimeOut(final long maxTaskStartDelayMillis)
{
long currentTimeMillis = System.currentTimeMillis();
TaskStatus status = statusRef.get();
return (status == null || status.getStatusCode() == TaskState.RUNNING)
&& unknownLocation()
&& currentTimeMillis - recentFullyStartedWorkerTimeInMillis.get() > maxTaskStartDelayMillis
@ -880,17 +852,87 @@ public class MSQWorkerTaskLauncher
*/
public void enableRetrying()
{
isRetrying = true;
isRetryingRef.set(true);
}
/**
* Checks is the task is retrying,
*
* @return
*/
public boolean isRetrying()
{
return isRetrying;
return isRetryingRef.get();
}
public void setLocation(TaskLocation taskLocation)
{
initialLocationRef.set(taskLocation);
}
public void updateStatus(TaskStatus taskStatus)
{
statusRef.set(taskStatus);
}
public void setFullyStartedTime(long currentTimeMillis)
{
taskFullyStartedTimeRef.set(currentTimeMillis);
}
public long taskPendingTimeInMs()
{
long currentFullyStartingTime = taskFullyStartedTimeRef.get();
if (currentFullyStartingTime == 0) {
return System.currentTimeMillis() - startTimeMillis;
} else {
return Math.max(0, currentFullyStartingTime - startTimeMillis);
}
}
}
public static class WorkerStats
{
String workerId;
TaskState state;
long duration;
long pendingTimeInMs;
/**
* For JSON deserialization only
*/
public WorkerStats()
{
}
public WorkerStats(String workerId, TaskState state, long duration, long pendingTimeInMs)
{
this.workerId = workerId;
this.state = state;
this.duration = duration;
this.pendingTimeInMs = pendingTimeInMs;
}
@JsonProperty
public String getWorkerId()
{
return workerId;
}
@JsonProperty
public TaskState getState()
{
return state;
}
@JsonProperty("durationMs")
public long getDuration()
{
return duration;
}
@JsonProperty("pendingMs")
public long getPendingTimeInMs()
{
return pendingTimeInMs;
}
}
}