From 8b997c1d84fcc3ee99dec915982bdd642a124ea3 Mon Sep 17 00:00:00 2001 From: Sruti Parthiban Date: Mon, 28 Mar 2022 10:00:59 -0700 Subject: [PATCH] Add resource stats to task framework (#2089) * Add resource stats to task framework Signed-off-by: sruti1312 * Update thread resource info and add tests Signed-off-by: sruti1312 --- .../org/opensearch/client/tasks/TaskInfo.java | 18 +- .../core/tasks/GetTaskResponseTests.java | 18 +- .../tasks/CancelTasksResponseTests.java | 3 +- .../TransportRethrottleActionTests.java | 6 +- .../admin/cluster/node/tasks/TasksIT.java | 3 +- .../rest/action/cat/RestTasksAction.java | 2 + .../org/opensearch/tasks/ResourceStats.java | 28 ++++ .../opensearch/tasks/ResourceStatsType.java | 32 ++++ .../opensearch/tasks/ResourceUsageInfo.java | 108 ++++++++++++ .../opensearch/tasks/ResourceUsageMetric.java | 27 +++ .../main/java/org/opensearch/tasks/Task.java | 155 +++++++++++++++++- .../java/org/opensearch/tasks/TaskInfo.java | 39 ++++- .../opensearch/tasks/TaskResourceStats.java | 106 ++++++++++++ .../opensearch/tasks/TaskResourceUsage.java | 105 ++++++++++++ .../opensearch/tasks/ThreadResourceInfo.java | 54 ++++++ .../admin/cluster/node/tasks/TaskTests.java | 84 +++++++++- .../tasks/CancelTasksResponseTests.java | 2 +- .../tasks/ListTasksResponseTests.java | 18 +- .../org/opensearch/tasks/TaskInfoTests.java | 79 +++++++-- 19 files changed, 844 insertions(+), 43 deletions(-) create mode 100644 server/src/main/java/org/opensearch/tasks/ResourceStats.java create mode 100644 server/src/main/java/org/opensearch/tasks/ResourceStatsType.java create mode 100644 server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java create mode 100644 server/src/main/java/org/opensearch/tasks/ResourceUsageMetric.java create mode 100644 server/src/main/java/org/opensearch/tasks/TaskResourceStats.java create mode 100644 server/src/main/java/org/opensearch/tasks/TaskResourceUsage.java create mode 100644 server/src/main/java/org/opensearch/tasks/ThreadResourceInfo.java diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/tasks/TaskInfo.java b/client/rest-high-level/src/main/java/org/opensearch/client/tasks/TaskInfo.java index de8374b283e..375f004dc30 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/tasks/TaskInfo.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/tasks/TaskInfo.java @@ -57,6 +57,7 @@ public class TaskInfo { private TaskId parentTaskId; private final Map status = new HashMap<>(); private final Map headers = new HashMap<>(); + private final Map resourceStats = new HashMap<>(); public TaskInfo(TaskId taskId) { this.taskId = taskId; @@ -150,6 +151,14 @@ public class TaskInfo { return status; } + void setResourceStats(Map resourceStats) { + this.resourceStats.putAll(resourceStats); + } + + public Map getResourceStats() { + return resourceStats; + } + private void noOpParse(Object s) {} public static final ObjectParser.NamedObjectParser PARSER; @@ -170,6 +179,7 @@ public class TaskInfo { parser.declareBoolean(TaskInfo::setCancelled, new ParseField("cancelled")); parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id")); parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers")); + parser.declareObject(TaskInfo::setResourceStats, (p, c) -> p.map(), new ParseField("resource_stats")); PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null); } @@ -188,7 +198,8 @@ public class TaskInfo { && Objects.equals(getDescription(), taskInfo.getDescription()) && Objects.equals(getParentTaskId(), taskInfo.getParentTaskId()) && Objects.equals(status, taskInfo.status) - && Objects.equals(getHeaders(), taskInfo.getHeaders()); + && Objects.equals(getHeaders(), taskInfo.getHeaders()) + && Objects.equals(getResourceStats(), taskInfo.getResourceStats()); } @Override @@ -204,7 +215,8 @@ public class TaskInfo { isCancelled(), getParentTaskId(), status, - getHeaders() + getHeaders(), + getResourceStats() ); } @@ -236,6 +248,8 @@ public class TaskInfo { + status + ", headers=" + headers + + ", resource_stats=" + + resourceStats + '}'; } } diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java index 403e2953037..07ee0bedd47 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java @@ -38,6 +38,8 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.xcontent.ToXContent; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.tasks.RawTaskStatus; +import org.opensearch.tasks.TaskResourceStats; +import org.opensearch.tasks.TaskResourceUsage; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.tasks.TaskInfo; @@ -45,6 +47,7 @@ import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.opensearch.test.AbstractXContentTestCase.xContentTester; @@ -57,7 +60,7 @@ public class GetTaskResponseTests extends OpenSearchTestCase { ) .assertEqualsConsumer(this::assertEqualInstances) .assertToXContentEquivalence(true) - .randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status")) + .randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status") || field.contains("resource_stats")) .test(); } @@ -106,7 +109,8 @@ public class GetTaskResponseTests extends OpenSearchTestCase { cancellable, cancelled, parentTaskId, - headers + headers, + randomResourceStats() ); } @@ -127,4 +131,14 @@ public class GetTaskResponseTests extends OpenSearchTestCase { throw new IllegalStateException(e); } } + + private static TaskResourceStats randomResourceStats() { + return randomBoolean() ? null : new TaskResourceStats(new HashMap() { + { + for (int i = 0; i < randomInt(5); i++) { + put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong())); + } + } + }); + } } diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/tasks/CancelTasksResponseTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/tasks/CancelTasksResponseTests.java index 552a3712eea..26be36b7162 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/tasks/CancelTasksResponseTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/tasks/CancelTasksResponseTests.java @@ -96,7 +96,8 @@ public class CancelTasksResponseTests extends AbstractResponseTestCase< cancellable, cancelled, new TaskId("node1", randomLong()), - Collections.singletonMap("x-header-of", "some-value") + Collections.singletonMap("x-header-of", "some-value"), + null ) ); } diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/TransportRethrottleActionTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/TransportRethrottleActionTests.java index a9e1a59b7e4..6456aa0af9a 100644 --- a/modules/reindex/src/test/java/org/opensearch/index/reindex/TransportRethrottleActionTests.java +++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/TransportRethrottleActionTests.java @@ -131,7 +131,8 @@ public class TransportRethrottleActionTests extends OpenSearchTestCase { true, false, new TaskId("test", task.getId()), - Collections.emptyMap() + Collections.emptyMap(), + null ) ); sliceStatuses.add(new BulkByScrollTask.StatusOrException(status)); @@ -167,7 +168,8 @@ public class TransportRethrottleActionTests extends OpenSearchTestCase { true, false, new TaskId("test", task.getId()), - Collections.emptyMap() + Collections.emptyMap(), + null ) ); sliceStatuses.add(new BulkByScrollTask.StatusOrException(status)); diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java index fbac2f7dbff..ac0ae44eb73 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java @@ -907,7 +907,8 @@ public class TasksIT extends OpenSearchIntegTestCase { false, false, TaskId.EMPTY_TASK_ID, - Collections.emptyMap() + Collections.emptyMap(), + null ), new RuntimeException("test") ), diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestTasksAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestTasksAction.java index b87205593ce..a6624c2f8cf 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestTasksAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestTasksAction.java @@ -137,6 +137,7 @@ public class RestTasksAction extends AbstractCatAction { // Task detailed info if (detailed) { table.addCell("description", "default:true;alias:desc;desc:task action"); + table.addCell("resource_stats", "default:false;desc:resource consumption info of the task"); } table.endHeaders(); return table; @@ -173,6 +174,7 @@ public class RestTasksAction extends AbstractCatAction { if (detailed) { table.addCell(taskInfo.getDescription()); + table.addCell(taskInfo.getResourceStats()); } table.endRow(); } diff --git a/server/src/main/java/org/opensearch/tasks/ResourceStats.java b/server/src/main/java/org/opensearch/tasks/ResourceStats.java new file mode 100644 index 00000000000..aab103ad08d --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ResourceStats.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +/** + * Different resource stats are defined. + */ +public enum ResourceStats { + CPU("cpu_time_in_nanos"), + MEMORY("memory_in_bytes"); + + private final String statsName; + + ResourceStats(String statsName) { + this.statsName = statsName; + } + + @Override + public String toString() { + return statsName; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ResourceStatsType.java b/server/src/main/java/org/opensearch/tasks/ResourceStatsType.java new file mode 100644 index 00000000000..c670ac5ba68 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ResourceStatsType.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +/** Defines the different types of resource stats. */ +public enum ResourceStatsType { + // resource stats of the worker thread reported directly from runnable. + WORKER_STATS("worker_stats", false); + + private final String statsType; + private final boolean onlyForAnalysis; + + ResourceStatsType(String statsType, boolean onlyForAnalysis) { + this.statsType = statsType; + this.onlyForAnalysis = onlyForAnalysis; + } + + public boolean isOnlyForAnalysis() { + return onlyForAnalysis; + } + + @Override + public String toString() { + return statsType; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java b/server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java new file mode 100644 index 00000000000..ae58f712b63 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Thread resource usage information for particular resource stats type. + *

+ * It captures the resource usage information like memory, CPU about a particular execution of thread + * for a specific stats type. + */ +public class ResourceUsageInfo { + private static final Logger logger = LogManager.getLogger(ResourceUsageInfo.class); + private final EnumMap statsInfo = new EnumMap<>(ResourceStats.class); + + public ResourceUsageInfo(ResourceUsageMetric... resourceUsageMetrics) { + for (ResourceUsageMetric resourceUsageMetric : resourceUsageMetrics) { + this.statsInfo.put(resourceUsageMetric.getStats(), new ResourceStatsInfo(resourceUsageMetric.getValue())); + } + } + + public void recordResourceUsageMetrics(ResourceUsageMetric... resourceUsageMetrics) { + for (ResourceUsageMetric resourceUsageMetric : resourceUsageMetrics) { + final ResourceStatsInfo resourceStatsInfo = statsInfo.get(resourceUsageMetric.getStats()); + if (resourceStatsInfo != null) { + updateResourceUsageInfo(resourceStatsInfo, resourceUsageMetric); + } else { + throw new IllegalStateException( + "cannot update [" + + resourceUsageMetric.getStats().toString() + + "] entry as its not present current_stats_info:" + + statsInfo + ); + } + } + } + + private void updateResourceUsageInfo(ResourceStatsInfo resourceStatsInfo, ResourceUsageMetric resourceUsageMetric) { + long currentEndValue; + long newEndValue; + do { + currentEndValue = resourceStatsInfo.endValue.get(); + newEndValue = resourceUsageMetric.getValue(); + if (currentEndValue > newEndValue) { + logger.debug( + "dropping resource usage update as the new value is lower than current value [" + + "resource_stats=[{}], " + + "current_end_value={}, " + + "new_end_value={}]", + resourceUsageMetric.getStats(), + currentEndValue, + newEndValue + ); + return; + } + } while (!resourceStatsInfo.endValue.compareAndSet(currentEndValue, newEndValue)); + logger.debug( + "updated resource usage info [resource_stats=[{}], " + "old_end_value={}, new_end_value={}]", + resourceUsageMetric.getStats(), + currentEndValue, + newEndValue + ); + } + + public Map getStatsInfo() { + return Collections.unmodifiableMap(statsInfo); + } + + @Override + public String toString() { + return statsInfo.toString(); + } + + /** + * Defines resource stats information. + */ + static class ResourceStatsInfo { + private final long startValue; + private final AtomicLong endValue; + + private ResourceStatsInfo(long startValue) { + this.startValue = startValue; + this.endValue = new AtomicLong(startValue); + } + + public long getTotalValue() { + return endValue.get() - startValue; + } + + @Override + public String toString() { + return String.valueOf(getTotalValue()); + } + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ResourceUsageMetric.java b/server/src/main/java/org/opensearch/tasks/ResourceUsageMetric.java new file mode 100644 index 00000000000..0d13ffe6ec0 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ResourceUsageMetric.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +public class ResourceUsageMetric { + private final ResourceStats stats; + private final long value; + + public ResourceUsageMetric(ResourceStats stats, long value) { + this.stats = stats; + this.value = value; + } + + public ResourceStats getStats() { + return stats; + } + + public long getValue() { + return value; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/Task.java b/server/src/main/java/org/opensearch/tasks/Task.java index ad9d5c3f044..62453d08724 100644 --- a/server/src/main/java/org/opensearch/tasks/Task.java +++ b/server/src/main/java/org/opensearch/tasks/Task.java @@ -32,6 +32,8 @@ package org.opensearch.tasks; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.NamedWriteable; @@ -39,18 +41,27 @@ import org.opensearch.common.xcontent.ToXContent; import org.opensearch.common.xcontent.ToXContentObject; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Current task information */ public class Task { + private static final Logger logger = LogManager.getLogger(Task.class); + /** * The request header to mark tasks with specific ids */ public static final String X_OPAQUE_ID = "X-Opaque-Id"; + private static final String TOTAL = "total"; + private final long id; private final String type; @@ -63,6 +74,8 @@ public class Task { private final Map headers; + private final Map> resourceStats; + /** * The task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style). */ @@ -74,7 +87,7 @@ public class Task { private final long startTimeNanos; public Task(long id, String type, String action, String description, TaskId parentTask, Map headers) { - this(id, type, action, description, parentTask, System.currentTimeMillis(), System.nanoTime(), headers); + this(id, type, action, description, parentTask, System.currentTimeMillis(), System.nanoTime(), headers, new ConcurrentHashMap<>()); } public Task( @@ -85,7 +98,8 @@ public class Task { TaskId parentTask, long startTime, long startTimeNanos, - Map headers + Map headers, + ConcurrentHashMap> resourceStats ) { this.id = id; this.type = type; @@ -95,6 +109,7 @@ public class Task { this.startTime = startTime; this.startTimeNanos = startTimeNanos; this.headers = headers; + this.resourceStats = resourceStats; } /** @@ -108,19 +123,48 @@ public class Task { * generate data? */ public final TaskInfo taskInfo(String localNodeId, boolean detailed) { + return taskInfo(localNodeId, detailed, detailed == false); + } + + /** + * Build a version of the task status you can throw over the wire and back + * with the option to include resource stats or not. + * This method is only used during creating TaskResult to avoid storing resource information into the task index. + * + * @param excludeStats should information exclude resource stats. + * By default, detailed flag is used to control including resource information. + * But inorder to avoid storing resource stats into task index as strict mapping is enforced and breaks when adding this field. + * In the future, task-index-mapping.json can be modified to add resource stats. + */ + private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeStats) { String description = null; Task.Status status = null; + TaskResourceStats resourceStats = null; if (detailed) { description = getDescription(); status = getStatus(); } - return taskInfo(localNodeId, description, status); + if (excludeStats == false) { + resourceStats = new TaskResourceStats(new HashMap<>() { + { + put(TOTAL, getTotalResourceStats()); + } + }); + } + return taskInfo(localNodeId, description, status, resourceStats); + } + + /** + * Build a {@link TaskInfo} for this task without resource stats. + */ + protected final TaskInfo taskInfo(String localNodeId, String description, Status status) { + return taskInfo(localNodeId, description, status, null); } /** * Build a proper {@link TaskInfo} for this task. */ - protected final TaskInfo taskInfo(String localNodeId, String description, Status status) { + protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) { return new TaskInfo( new TaskId(localNodeId, getId()), getType(), @@ -132,7 +176,8 @@ public class Task { this instanceof CancellableTask, this instanceof CancellableTask && ((CancellableTask) this).isCancelled(), parentTask, - headers + headers, + resourceStats ); } @@ -195,6 +240,102 @@ public class Task { return null; } + /** + * Returns thread level resource consumption of the task + */ + public Map> getResourceStats() { + return Collections.unmodifiableMap(resourceStats); + } + + /** + * Returns current total resource usage of the task. + * Currently, this method is only called on demand, during get and listing of tasks. + * In the future, these values can be cached as an optimization. + */ + public TaskResourceUsage getTotalResourceStats() { + return new TaskResourceUsage(getTotalResourceUtilization(ResourceStats.CPU), getTotalResourceUtilization(ResourceStats.MEMORY)); + } + + /** + * Returns total resource consumption for a specific task stat. + */ + public long getTotalResourceUtilization(ResourceStats stats) { + long totalResourceConsumption = 0L; + for (List threadResourceInfosList : resourceStats.values()) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) { + final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats); + if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) { + totalResourceConsumption += statsInfo.getTotalValue(); + } + } + } + return totalResourceConsumption; + } + + /** + * Adds thread's starting resource consumption information + * @param threadId ID of the thread + * @param statsType stats type + * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException matching active thread entry was found which is not expected. + */ + public void startThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { + final List threadResourceInfoList = resourceStats.computeIfAbsent(threadId, k -> new ArrayList<>()); + // active thread entry should not be present in the list + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + throw new IllegalStateException( + "unexpected active thread resource entry present [" + threadId + "]:[" + threadResourceInfo + "]" + ); + } + } + threadResourceInfoList.add(new ThreadResourceInfo(statsType, resourceUsageMetrics)); + } + + /** + * This method is used to update the resource consumption stats so that the data isn't too stale for long-running task. + * If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception. + * @param threadId ID of the thread + * @param statsType stats type + * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException if no matching active thread entry was found. + */ + public void updateThreadResourceStats(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { + final List threadResourceInfoList = resourceStats.get(threadId); + if (threadResourceInfoList != null) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + // the active entry present in the list is updated + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics); + return; + } + } + } + throw new IllegalStateException("cannot update if active thread resource entry is not present"); + } + + /** + * Record the thread's final resource consumption values. + * If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception. + * @param threadId ID of the thread + * @param statsType stats type + * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException if no matching active thread entry was found. + */ + public void stopThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { + final List threadResourceInfoList = resourceStats.get(threadId); + if (threadResourceInfoList != null) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + threadResourceInfo.setActive(false); + threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics); + return; + } + } + } + throw new IllegalStateException("cannot update final values if active thread resource entry is not present"); + } + /** * Report of the internal status of a task. These can vary wildly from task * to task because each task is implemented differently but we should try @@ -217,12 +358,12 @@ public class Task { } public TaskResult result(DiscoveryNode node, Exception error) throws IOException { - return new TaskResult(taskInfo(node.getId(), true), error); + return new TaskResult(taskInfo(node.getId(), true, true), error); } public TaskResult result(DiscoveryNode node, ActionResponse response) throws IOException { if (response instanceof ToXContent) { - return new TaskResult(taskInfo(node.getId(), true), (ToXContent) response); + return new TaskResult(taskInfo(node.getId(), true, true), (ToXContent) response); } else { throw new IllegalStateException("response has to implement ToXContent to be able to store the results"); } diff --git a/server/src/main/java/org/opensearch/tasks/TaskInfo.java b/server/src/main/java/org/opensearch/tasks/TaskInfo.java index cf77eaf540e..e6ba94a71b6 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskInfo.java +++ b/server/src/main/java/org/opensearch/tasks/TaskInfo.java @@ -86,6 +86,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment { private final Map headers; + private final TaskResourceStats resourceStats; + public TaskInfo( TaskId taskId, String type, @@ -97,7 +99,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment { boolean cancellable, boolean cancelled, TaskId parentTaskId, - Map headers + Map headers, + TaskResourceStats resourceStats ) { if (cancellable == false && cancelled == true) { throw new IllegalArgumentException("task cannot be cancelled"); @@ -113,11 +116,13 @@ public final class TaskInfo implements Writeable, ToXContentFragment { this.cancelled = cancelled; this.parentTaskId = parentTaskId; this.headers = headers; + this.resourceStats = resourceStats; } /** * Read from a stream. */ + @SuppressWarnings("unchecked") public TaskInfo(StreamInput in) throws IOException { taskId = TaskId.readFromStream(in); type = in.readString(); @@ -137,6 +142,11 @@ public final class TaskInfo implements Writeable, ToXContentFragment { } parentTaskId = TaskId.readFromStream(in); headers = in.readMap(StreamInput::readString, StreamInput::readString); + if (in.getVersion().onOrAfter(Version.V_2_0_0)) { + resourceStats = in.readOptionalWriteable(TaskResourceStats::new); + } else { + resourceStats = null; + } } @Override @@ -154,6 +164,9 @@ public final class TaskInfo implements Writeable, ToXContentFragment { } parentTaskId.writeTo(out); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); + if (out.getVersion().onOrAfter(Version.V_2_0_0)) { + out.writeOptionalWriteable(resourceStats); + } } public TaskId getTaskId() { @@ -226,6 +239,13 @@ public final class TaskInfo implements Writeable, ToXContentFragment { return headers; } + /** + * Returns the task resource information + */ + public TaskResourceStats getResourceStats() { + return resourceStats; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("node", taskId.getNodeId()); @@ -253,6 +273,11 @@ public final class TaskInfo implements Writeable, ToXContentFragment { builder.field(attribute.getKey(), attribute.getValue()); } builder.endObject(); + if (resourceStats != null) { + builder.startObject("resource_stats"); + resourceStats.toXContent(builder, params); + builder.endObject(); + } return builder; } @@ -278,6 +303,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment { // This might happen if we are reading an old version of task info headers = Collections.emptyMap(); } + @SuppressWarnings("unchecked") + TaskResourceStats resourceStats = (TaskResourceStats) a[i++]; RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes); TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString); return new TaskInfo( @@ -291,7 +318,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment { cancellable, cancelled, parentTaskId, - headers + headers, + resourceStats ); }); static { @@ -309,6 +337,7 @@ public final class TaskInfo implements Writeable, ToXContentFragment { PARSER.declareBoolean(optionalConstructorArg(), new ParseField("cancelled")); PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id")); PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers")); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> TaskResourceStats.fromXContent(p), new ParseField("resource_stats")); } @Override @@ -333,7 +362,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment { && Objects.equals(cancellable, other.cancellable) && Objects.equals(cancelled, other.cancelled) && Objects.equals(status, other.status) - && Objects.equals(headers, other.headers); + && Objects.equals(headers, other.headers) + && Objects.equals(resourceStats, other.resourceStats); } @Override @@ -349,7 +379,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment { cancellable, cancelled, status, - headers + headers, + resourceStats ); } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceStats.java b/server/src/main/java/org/opensearch/tasks/TaskResourceStats.java new file mode 100644 index 00000000000..c35e08ebb34 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceStats.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Resource information about a currently running task. + *

+ * Writeable TaskResourceStats objects are used to represent resource + * snapshot information about currently running task. + */ +public class TaskResourceStats implements Writeable, ToXContentFragment { + private final Map resourceUsage; + + public TaskResourceStats(Map resourceUsage) { + this.resourceUsage = Objects.requireNonNull(resourceUsage, "resource usage is required"); + } + + /** + * Read from a stream. + */ + public TaskResourceStats(StreamInput in) throws IOException { + resourceUsage = in.readMap(StreamInput::readString, TaskResourceUsage::readFromStream); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(resourceUsage, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream)); + } + + public Map getResourceUsageInfo() { + return resourceUsage; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + for (Map.Entry resourceUsageEntry : resourceUsage.entrySet()) { + builder.startObject(resourceUsageEntry.getKey()); + if (resourceUsageEntry.getValue() != null) { + resourceUsageEntry.getValue().toXContent(builder, params); + } + builder.endObject(); + } + return builder; + } + + public static TaskResourceStats fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token == null) { + token = parser.nextToken(); + } + if (token == XContentParser.Token.START_OBJECT) { + token = parser.nextToken(); + } + final Map resourceStats = new HashMap<>(); + if (token == XContentParser.Token.FIELD_NAME) { + assert parser.currentToken() == XContentParser.Token.FIELD_NAME : "Expected field name but saw [" + parser.currentToken() + "]"; + do { + // Must point to field name + String fieldName = parser.currentName(); + // And then the value + TaskResourceUsage value = TaskResourceUsage.fromXContent(parser); + resourceStats.put(fieldName, value); + } while (parser.nextToken() == XContentParser.Token.FIELD_NAME); + } + return new TaskResourceStats(resourceStats); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } + + // Implements equals and hashcode for testing + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != TaskResourceStats.class) { + return false; + } + TaskResourceStats other = (TaskResourceStats) obj; + return Objects.equals(resourceUsage, other.resourceUsage); + } + + @Override + public int hashCode() { + return Objects.hash(resourceUsage); + } +} diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceUsage.java b/server/src/main/java/org/opensearch/tasks/TaskResourceUsage.java new file mode 100644 index 00000000000..6af3de2b78c --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceUsage.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import org.opensearch.common.ParseField; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ConstructingObjectParser; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg; + +/** + * Task resource usage information + *

+ * Writeable TaskResourceUsage objects are used to represent resource usage + * information of running tasks. + */ +public class TaskResourceUsage implements Writeable, ToXContentFragment { + private static final ParseField CPU_TIME_IN_NANOS = new ParseField("cpu_time_in_nanos"); + private static final ParseField MEMORY_IN_BYTES = new ParseField("memory_in_bytes"); + + private final long cpuTimeInNanos; + private final long memoryInBytes; + + public TaskResourceUsage(long cpuTimeInNanos, long memoryInBytes) { + this.cpuTimeInNanos = cpuTimeInNanos; + this.memoryInBytes = memoryInBytes; + } + + /** + * Read from a stream. + */ + public static TaskResourceUsage readFromStream(StreamInput in) throws IOException { + return new TaskResourceUsage(in.readVLong(), in.readVLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(cpuTimeInNanos); + out.writeVLong(memoryInBytes); + } + + public long getCpuTimeInNanos() { + return cpuTimeInNanos; + } + + public long getMemoryInBytes() { + return memoryInBytes; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(CPU_TIME_IN_NANOS.getPreferredName(), cpuTimeInNanos); + builder.field(MEMORY_IN_BYTES.getPreferredName(), memoryInBytes); + return builder; + } + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "task_resource_usage", + a -> new TaskResourceUsage((Long) a[0], (Long) a[1]) + ); + + static { + PARSER.declareLong(constructorArg(), CPU_TIME_IN_NANOS); + PARSER.declareLong(constructorArg(), MEMORY_IN_BYTES); + } + + public static TaskResourceUsage fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } + + // Implements equals and hashcode for testing + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != TaskResourceUsage.class) { + return false; + } + TaskResourceUsage other = (TaskResourceUsage) obj; + return Objects.equals(cpuTimeInNanos, other.cpuTimeInNanos) && Objects.equals(memoryInBytes, other.memoryInBytes); + } + + @Override + public int hashCode() { + return Objects.hash(cpuTimeInNanos, memoryInBytes); + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ThreadResourceInfo.java b/server/src/main/java/org/opensearch/tasks/ThreadResourceInfo.java new file mode 100644 index 00000000000..8b45c38c8fb --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ThreadResourceInfo.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +/** + * Resource consumption information about a particular execution of thread. + *

+ * It captures the resource usage information about a particular execution of thread + * for a specific stats type like worker_stats or response_stats etc., + */ +public class ThreadResourceInfo { + private volatile boolean isActive = true; + private final ResourceStatsType statsType; + private final ResourceUsageInfo resourceUsageInfo; + + public ThreadResourceInfo(ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { + this.statsType = statsType; + this.resourceUsageInfo = new ResourceUsageInfo(resourceUsageMetrics); + } + + /** + * Updates thread's resource consumption information. + */ + public void recordResourceUsageMetrics(ResourceUsageMetric... resourceUsageMetrics) { + resourceUsageInfo.recordResourceUsageMetrics(resourceUsageMetrics); + } + + public void setActive(boolean isActive) { + this.isActive = isActive; + } + + public boolean isActive() { + return isActive; + } + + public ResourceStatsType getStatsType() { + return statsType; + } + + public ResourceUsageInfo getResourceUsageInfo() { + return resourceUsageInfo; + } + + @Override + public String toString() { + return resourceUsageInfo + ", stats_type=" + statsType + ", is_active=" + isActive; + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java index 5f8d5992c9f..45db94577f1 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java @@ -31,16 +31,23 @@ package org.opensearch.action.admin.cluster.node.tasks; +import org.opensearch.action.search.SearchAction; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.tasks.TaskInfo; +import org.opensearch.tasks.ResourceUsageMetric; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.ResourceStatsType; import org.opensearch.test.OpenSearchTestCase; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Map; +import static org.opensearch.tasks.TaskInfoTests.randomResourceStats; + public class TaskTests extends OpenSearchTestCase { public void testTaskInfoToString() { @@ -61,7 +68,8 @@ public class TaskTests extends OpenSearchTestCase { cancellable, cancelled, TaskId.EMPTY_TASK_ID, - Collections.singletonMap("foo", "bar") + Collections.singletonMap("foo", "bar"), + randomResourceStats(randomBoolean()) ); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); @@ -94,7 +102,8 @@ public class TaskTests extends OpenSearchTestCase { cancellable, cancelled, TaskId.EMPTY_TASK_ID, - Collections.singletonMap("foo", "bar") + Collections.singletonMap("foo", "bar"), + randomResourceStats(randomBoolean()) ); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); @@ -120,7 +129,8 @@ public class TaskTests extends OpenSearchTestCase { cancellable, cancelled, TaskId.EMPTY_TASK_ID, - Collections.singletonMap("foo", "bar") + Collections.singletonMap("foo", "bar"), + randomResourceStats(randomBoolean()) ); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); @@ -148,9 +158,75 @@ public class TaskTests extends OpenSearchTestCase { cancellable, cancelled, TaskId.EMPTY_TASK_ID, - Collections.singletonMap("foo", "bar") + Collections.singletonMap("foo", "bar"), + randomResourceStats(randomBoolean()) ) ); assertEquals(e.getMessage(), "task cannot be cancelled"); } + + public void testTaskResourceStats() { + final Task task = new Task( + randomLong(), + "transport", + SearchAction.NAME, + "description", + new TaskId(randomLong() + ":" + randomLong()), + Collections.emptyMap() + ); + + long totalMemory = 0L; + long totalCPU = 0L; + + // reporting resource consumption events and checking total consumption values + for (int i = 0; i < randomInt(10); i++) { + long initial_memory = randomLongBetween(1, 100); + long initial_cpu = randomLongBetween(1, 100); + + ResourceUsageMetric[] initialTaskResourceMetrics = new ResourceUsageMetric[] { + new ResourceUsageMetric(ResourceStats.MEMORY, initial_memory), + new ResourceUsageMetric(ResourceStats.CPU, initial_cpu) }; + task.startThreadResourceTracking(i, ResourceStatsType.WORKER_STATS, initialTaskResourceMetrics); + + long memory = initial_memory + randomLongBetween(1, 10000); + long cpu = initial_cpu + randomLongBetween(1, 10000); + + totalMemory += memory - initial_memory; + totalCPU += cpu - initial_cpu; + + ResourceUsageMetric[] taskResourceMetrics = new ResourceUsageMetric[] { + new ResourceUsageMetric(ResourceStats.MEMORY, memory), + new ResourceUsageMetric(ResourceStats.CPU, cpu) }; + task.updateThreadResourceStats(i, ResourceStatsType.WORKER_STATS, taskResourceMetrics); + task.stopThreadResourceTracking(i, ResourceStatsType.WORKER_STATS); + } + assertEquals(task.getTotalResourceStats().getMemoryInBytes(), totalMemory); + assertEquals(task.getTotalResourceStats().getCpuTimeInNanos(), totalCPU); + + // updating should throw an IllegalStateException when active entry is not present. + try { + task.updateThreadResourceStats(randomInt(), ResourceStatsType.WORKER_STATS); + fail("update should not be successful as active entry is not present!"); + } catch (IllegalStateException e) { + // pass + } + + // re-adding a thread entry that is already present, should throw an exception + int threadId = randomInt(); + task.startThreadResourceTracking(threadId, ResourceStatsType.WORKER_STATS, new ResourceUsageMetric(ResourceStats.MEMORY, 100)); + try { + task.startThreadResourceTracking(threadId, ResourceStatsType.WORKER_STATS); + fail("add/start should not be successful as active entry is already present!"); + } catch (IllegalStateException e) { + // pass + } + + // existing active entry is present only for memory, update cannot be called with cpu values. + try { + task.updateThreadResourceStats(threadId, ResourceStatsType.WORKER_STATS, new ResourceUsageMetric(ResourceStats.CPU, 200)); + fail("update should not be successful as entry for CPU is not present!"); + } catch (IllegalStateException e) { + // pass + } + } } diff --git a/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java b/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java index 64d2979c2c5..c0ec4ca3d31 100644 --- a/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java +++ b/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java @@ -62,7 +62,7 @@ public class CancelTasksResponseTests extends AbstractXContentTestCase randomTasks() { List randomTasks = new ArrayList<>(); for (int i = 0; i < randomInt(10); i++) { - randomTasks.add(TaskInfoTests.randomTaskInfo()); + randomTasks.add(TaskInfoTests.randomTaskInfo(false)); } return randomTasks; } diff --git a/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java b/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java index 4d5feb46de1..0201509d03a 100644 --- a/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java +++ b/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.net.ConnectException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.function.Predicate; import java.util.function.Supplier; @@ -72,7 +73,12 @@ public class ListTasksResponseTests extends AbstractXContentTestCase() { + { + put("dummy-type1", new TaskResourceUsage(100, 100)); + } + }) ); ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList()); assertEquals( @@ -93,6 +99,12 @@ public class ListTasksResponseTests extends AbstractXContentTestCase getRandomFieldsExcludeFilter() { - // status and headers hold arbitrary content, we can't inject random fields in them - return field -> field.endsWith("status") || field.endsWith("headers"); + // status, headers and resource_stats hold arbitrary content, we can't inject random fields in them + return field -> field.endsWith("status") || field.endsWith("headers") || field.contains("resource_stats"); } @Override diff --git a/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java b/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java index 89b690d81a4..7c8cb323065 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java @@ -77,13 +77,13 @@ public class TaskInfoTests extends AbstractSerializingTestCase { @Override protected Predicate getRandomFieldsExcludeFilter() { - // status and headers hold arbitrary content, we can't inject random fields in them - return field -> "status".equals(field) || "headers".equals(field); + // status, headers and resource_stats hold arbitrary content, we can't inject random fields in them + return field -> "status".equals(field) || "headers".equals(field) || field.contains("resource_stats"); } @Override protected TaskInfo mutateInstance(TaskInfo info) { - switch (between(0, 9)) { + switch (between(0, 10)) { case 0: TaskId taskId = new TaskId(info.getTaskId().getNodeId() + randomAlphaOfLength(5), info.getTaskId().getId()); return new TaskInfo( @@ -97,7 +97,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase { info.isCancellable(), info.isCancelled(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getResourceStats() ); case 1: return new TaskInfo( @@ -111,7 +112,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase { info.isCancellable(), info.isCancelled(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getResourceStats() ); case 2: return new TaskInfo( @@ -125,7 +127,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase { info.isCancellable(), info.isCancelled(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getResourceStats() ); case 3: return new TaskInfo( @@ -139,7 +142,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase { info.isCancellable(), info.isCancelled(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getResourceStats() ); case 4: Task.Status newStatus = randomValueOtherThan(info.getStatus(), TaskInfoTests::randomRawTaskStatus); @@ -154,7 +158,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase { info.isCancellable(), info.isCancelled(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getResourceStats() ); case 5: return new TaskInfo( @@ -168,7 +173,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase { info.isCancellable(), info.isCancelled(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getResourceStats() ); case 6: return new TaskInfo( @@ -182,7 +188,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase { info.isCancellable(), info.isCancelled(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getResourceStats() ); case 7: return new TaskInfo( @@ -196,7 +203,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase { info.isCancellable() == false, false, info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getResourceStats() ); case 8: TaskId parentId = new TaskId(info.getParentTaskId().getNodeId() + randomAlphaOfLength(5), info.getParentTaskId().getId()); @@ -211,7 +219,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase { info.isCancellable(), info.isCancelled(), parentId, - info.getHeaders() + info.getHeaders(), + info.getResourceStats() ); case 9: Map headers = info.getHeaders(); @@ -232,7 +241,30 @@ public class TaskInfoTests extends AbstractSerializingTestCase { info.isCancellable(), info.isCancelled(), info.getParentTaskId(), - headers + headers, + info.getResourceStats() + ); + case 10: + Map resourceUsageMap; + if (info.getResourceStats() == null) { + resourceUsageMap = new HashMap<>(1); + } else { + resourceUsageMap = new HashMap<>(info.getResourceStats().getResourceUsageInfo()); + } + resourceUsageMap.put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong())); + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + info.isCancellable(), + info.isCancelled(), + info.getParentTaskId(), + info.getHeaders(), + new TaskResourceStats(resourceUsageMap) ); default: throw new IllegalStateException(); @@ -240,11 +272,15 @@ public class TaskInfoTests extends AbstractSerializingTestCase { } static TaskInfo randomTaskInfo() { + return randomTaskInfo(randomBoolean()); + } + + static TaskInfo randomTaskInfo(boolean detailed) { TaskId taskId = randomTaskId(); String type = randomAlphaOfLength(5); String action = randomAlphaOfLength(5); - Task.Status status = randomBoolean() ? randomRawTaskStatus() : null; - String description = randomBoolean() ? randomAlphaOfLength(5) : null; + Task.Status status = detailed ? randomRawTaskStatus() : null; + String description = detailed ? randomAlphaOfLength(5) : null; long startTime = randomLong(); long runningTimeNanos = randomLong(); boolean cancellable = randomBoolean(); @@ -264,7 +300,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase { cancellable, cancelled, parentTaskId, - headers + headers, + randomResourceStats(detailed) ); } @@ -285,4 +322,14 @@ public class TaskInfoTests extends AbstractSerializingTestCase { throw new IllegalStateException(e); } } + + public static TaskResourceStats randomResourceStats(boolean detailed) { + return detailed ? new TaskResourceStats(new HashMap() { + { + for (int i = 0; i < randomInt(5); i++) { + put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong())); + } + } + }) : null; + } }