Add worker status and duration metrics in live and task reports (#15180)

Add worker status and duration metrics to live and task reports so that per-worker progress can be tracked.
Vishesh Garg 2023-10-30 09:43:22 +05:30 committed by GitHub
parent f4a74710e6
commit 039b05585c
7 changed files with 105 additions and 7 deletions

View File

@@ -603,6 +603,11 @@ The following table describes the response fields when you retrieve a report for
| `multiStageQuery.payload.status.status` | RUNNING, SUCCESS, or FAILED. |
| `multiStageQuery.payload.status.startTime` | Start time of the query in ISO format. Only present if the query has started running. |
| `multiStageQuery.payload.status.durationMs` | Milliseconds elapsed after the query has started running. -1 denotes that the query hasn't started running yet. |
| `multiStageQuery.payload.status.workers` | Map of worker number to worker task information for the controller task (see the example after this table). |
| `multiStageQuery.payload.status.workers.<workerNumber>` | Array of worker tasks for that worker number, including retries. |
| `multiStageQuery.payload.status.workers.<workerNumber>[].workerId` | ID of the worker task. |
| `multiStageQuery.payload.status.workers.<workerNumber>[].state` | RUNNING, SUCCESS, or FAILED. |
| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | Milliseconds elapsed after the worker task started running. It is -1 for worker tasks that are still running. |
| `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not fully started. -1 denotes that the number is currently unknown. |
| `multiStageQuery.payload.status.runningTasks` | Number of currently running tasks. Should be at least 1 since the controller is included. |
| `multiStageQuery.payload.status.segmentLoadStatus` | Segment loading container. Only present after the segments have been published. |
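For orientation, here is a hypothetical fragment of the `workers` object in a status payload; the worker IDs, states, and durations below are made up for illustration:

```json
"workers": {
  "0": [
    {
      "workerId": "query-abc-worker0_0",
      "state": "SUCCESS",
      "durationMs": 1200
    }
  ],
  "1": [
    {
      "workerId": "query-abc-worker1_0",
      "state": "RUNNING",
      "durationMs": -1
    }
  ]
}
```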

View File

@@ -2262,11 +2262,13 @@ public class ControllerImpl implements Controller
{
int pendingTasks = -1;
int runningTasks = 1;
Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStatsMap = new HashMap<>();
if (taskLauncher != null) {
WorkerCount workerTaskCount = taskLauncher.getWorkerTaskCount();
pendingTasks = workerTaskCount.getPendingWorkerCount();
runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To account for controller.
workerStatsMap = taskLauncher.getWorkerStats();
}
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status = segmentLoadWaiter == null ? null : segmentLoadWaiter.status();
@@ -2277,6 +2279,7 @@
errorReports,
queryStartTime,
queryDuration,
workerStatsMap,
pendingTasks,
runningTasks,
status

View File

@@ -19,6 +19,7 @@
package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -47,15 +48,18 @@ import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -108,10 +112,11 @@ public class MSQWorkerTaskLauncher
@GuardedBy("taskIds")
private final IntSet fullyStartedTasks = new IntOpenHashSet();
// Mutable state accessible only to the main loop. LinkedHashMap since order of key set matters. Tasks are added
// here once they are submitted for running, but before they are fully started up.
// Mutable state accessed by mainLoop, ControllerImpl, and jetty (/liveReports) threads.
// Tasks are added here once they are submitted for running, but before they are fully started up.
// taskId -> taskTracker
private final Map<String, TaskTracker> taskTrackers = new LinkedHashMap<>();
private final ConcurrentMap<String, TaskTracker> taskTrackers = new ConcurrentSkipListMap<>(Comparator.comparingInt(
MSQTasks::workerFromTaskId));
// Set of tasks which are issued a cancel request by the controller.
private final Set<String> canceledWorkerTasks = ConcurrentHashMap.newKeySet();
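A minimal standalone sketch (not part of this commit) of what the switch to an ordered concurrent map buys; the task-ID format and the parsing helper below are hypothetical stand-ins for the real MSQTasks::workerFromTaskId:

```java
import java.util.Comparator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class WorkerOrderingSketch
{
  // Hypothetical stand-in for MSQTasks::workerFromTaskId: pulls the worker
  // number out of a task id shaped like "query-abc-worker3_0".
  static int workerFromTaskId(String taskId)
  {
    int start = taskId.lastIndexOf("worker") + "worker".length();
    int end = taskId.indexOf('_', start);
    return Integer.parseInt(taskId.substring(start, end < 0 ? taskId.length() : end));
  }

  public static void main(String[] args)
  {
    // Iteration order follows worker number regardless of insertion order, and the
    // map can be read safely from other threads (e.g. a thread serving /liveReports)
    // while the main loop keeps mutating it.
    ConcurrentMap<String, String> trackers = new ConcurrentSkipListMap<>(
        Comparator.comparingInt(WorkerOrderingSketch::workerFromTaskId)
    );
    trackers.put("query-abc-worker2_0", "tracker for worker 2");
    trackers.put("query-abc-worker0_0", "tracker for worker 0");
    trackers.put("query-abc-worker1_0", "tracker for worker 1");
    trackers.keySet().forEach(System.out::println); // worker 0, then 1, then 2
  }
}
```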
@@ -348,6 +353,70 @@ public class MSQWorkerTaskLauncher
}
}
public static class WorkerStats
{
String workerId;
TaskState state;
long duration;
/**
* For JSON deserialization only
*/
public WorkerStats()
{
}
public WorkerStats(String workerId, TaskState state, long duration)
{
this.workerId = workerId;
this.state = state;
this.duration = duration;
}
@JsonProperty
public String getWorkerId()
{
return workerId;
}
@JsonProperty
public TaskState getState()
{
return state;
}
@JsonProperty("durationMs")
public long getDuration()
{
return duration;
}
}
public Map<Integer, List<WorkerStats>> getWorkerStats()
{
final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>();
for (Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) {
TaskTracker taskTracker = taskEntry.getValue();
workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>())
.add(new WorkerStats(taskEntry.getKey(),
taskTracker.status.getStatusCode(),
// getDuration() returns -1 for running tasks.
// It's not calculated on-the-fly here since
// taskTracker.startTimeMillis marks task
// submission time rather than the actual start.
taskTracker.status.getDuration()
));
}
for (List<WorkerStats> workerStatsList : workerStats.values()) {
workerStatsList.sort(Comparator.comparing(WorkerStats::getWorkerId));
}
return workerStats;
}
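For reference only, a standalone sketch (again, not part of the commit) of how the map returned by getWorkerStats() serializes; the task IDs and durations are invented:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.WorkerStats;

import java.util.List;
import java.util.Map;

public class WorkerStatsShapeSketch
{
  public static void main(String[] args) throws Exception
  {
    // Worker number -> stats for each launched task (including retries),
    // mirroring what getWorkerStats() builds from the task trackers.
    Map<Integer, List<WorkerStats>> workers = ImmutableMap.of(
        0, ImmutableList.of(new WorkerStats("query-abc-worker0_0", TaskState.SUCCESS, 1200L)),
        1, ImmutableList.of(new WorkerStats("query-abc-worker1_0", TaskState.RUNNING, -1L))
    );
    // Prints JSON using the documented field names: workerId, state, durationMs.
    System.out.println(new ObjectMapper().writeValueAsString(workers));
  }
}
```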
private void mainLoop()
{
try {

View File

@@ -25,12 +25,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.msq.exec.SegmentLoadStatusFetcher;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class MSQStatusReport
@@ -47,6 +50,8 @@ public class MSQStatusReport
private final long durationMs;
private final Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStats;
private final int pendingTasks;
private final int runningTasks;
@@ -61,6 +66,7 @@ public class MSQStatusReport
@JsonProperty("warnings") Collection<MSQErrorReport> warningReports,
@JsonProperty("startTime") @Nullable DateTime startTime,
@JsonProperty("durationMs") long durationMs,
@JsonProperty("workers") Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStats,
@JsonProperty("pendingTasks") int pendingTasks,
@JsonProperty("runningTasks") int runningTasks,
@JsonProperty("segmentLoadWaiterStatus") @Nullable SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus
@@ -71,6 +77,7 @@ public class MSQStatusReport
this.warningReports = warningReports != null ? warningReports : Collections.emptyList();
this.startTime = startTime;
this.durationMs = durationMs;
this.workerStats = workerStats;
this.pendingTasks = pendingTasks;
this.runningTasks = runningTasks;
this.segmentLoadWaiterStatus = segmentLoadWaiterStatus;
@@ -123,6 +130,12 @@ public class MSQStatusReport
return durationMs;
}
@JsonProperty("workers")
public Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> getWorkerStats()
{
return workerStats;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)

View File

@@ -55,6 +55,7 @@ import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -107,7 +108,7 @@ public class MSQTaskReportTest
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2, status),
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -172,7 +173,7 @@
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, 1, 2, status),
new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -220,7 +221,7 @@
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2, status),
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),

View File

@@ -244,6 +244,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -310,6 +311,7 @@
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null

View File

@@ -67,6 +67,7 @@ public class SqlStatementResourceHelperTest
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -105,6 +106,7 @@
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -144,6 +146,7 @@
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -181,6 +184,7 @@
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -220,6 +224,7 @@
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null