Add resource stats to task framework (#2089)

* Add resource stats to task framework

Signed-off-by: sruti1312 <srutiparthiban@gmail.com>

* Update thread resource info and add tests

Signed-off-by: sruti1312 <srutiparthiban@gmail.com>
This commit is contained in:
Sruti Parthiban 2022-03-28 10:00:59 -07:00 committed by GitHub
parent 5dd75bb0aa
commit 8b997c1d84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 844 additions and 43 deletions

View File

@ -57,6 +57,7 @@ public class TaskInfo {
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
private final Map<String, Object> resourceStats = new HashMap<>();
public TaskInfo(TaskId taskId) {
this.taskId = taskId;
@ -150,6 +151,14 @@ public class TaskInfo {
return status;
}
void setResourceStats(Map<String, Object> resourceStats) {
this.resourceStats.putAll(resourceStats);
}
public Map<String, Object> getResourceStats() {
return resourceStats;
}
private void noOpParse(Object s) {}
public static final ObjectParser.NamedObjectParser<TaskInfo, Void> PARSER;
@ -170,6 +179,7 @@ public class TaskInfo {
parser.declareBoolean(TaskInfo::setCancelled, new ParseField("cancelled"));
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
parser.declareObject(TaskInfo::setResourceStats, (p, c) -> p.map(), new ParseField("resource_stats"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
}
@ -188,7 +198,8 @@ public class TaskInfo {
&& Objects.equals(getDescription(), taskInfo.getDescription())
&& Objects.equals(getParentTaskId(), taskInfo.getParentTaskId())
&& Objects.equals(status, taskInfo.status)
&& Objects.equals(getHeaders(), taskInfo.getHeaders());
&& Objects.equals(getHeaders(), taskInfo.getHeaders())
&& Objects.equals(getResourceStats(), taskInfo.getResourceStats());
}
@Override
@ -204,7 +215,8 @@ public class TaskInfo {
isCancelled(),
getParentTaskId(),
status,
getHeaders()
getHeaders(),
getResourceStats()
);
}
@ -236,6 +248,8 @@ public class TaskInfo {
+ status
+ ", headers="
+ headers
+ ", resource_stats="
+ resourceStats
+ '}';
}
}

View File

@ -38,6 +38,8 @@ import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.tasks.RawTaskStatus;
import org.opensearch.tasks.TaskResourceStats;
import org.opensearch.tasks.TaskResourceUsage;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
@ -45,6 +47,7 @@ import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.opensearch.test.AbstractXContentTestCase.xContentTester;
@ -57,7 +60,7 @@ public class GetTaskResponseTests extends OpenSearchTestCase {
)
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(true)
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status"))
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status") || field.contains("resource_stats"))
.test();
}
@ -106,7 +109,8 @@ public class GetTaskResponseTests extends OpenSearchTestCase {
cancellable,
cancelled,
parentTaskId,
headers
headers,
randomResourceStats()
);
}
@ -127,4 +131,14 @@ public class GetTaskResponseTests extends OpenSearchTestCase {
throw new IllegalStateException(e);
}
}
private static TaskResourceStats randomResourceStats() {
return randomBoolean() ? null : new TaskResourceStats(new HashMap<String, TaskResourceUsage>() {
{
for (int i = 0; i < randomInt(5); i++) {
put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong()));
}
}
});
}
}

View File

@ -96,7 +96,8 @@ public class CancelTasksResponseTests extends AbstractResponseTestCase<
cancellable,
cancelled,
new TaskId("node1", randomLong()),
Collections.singletonMap("x-header-of", "some-value")
Collections.singletonMap("x-header-of", "some-value"),
null
)
);
}

View File

@ -131,7 +131,8 @@ public class TransportRethrottleActionTests extends OpenSearchTestCase {
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()
Collections.emptyMap(),
null
)
);
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
@ -167,7 +168,8 @@ public class TransportRethrottleActionTests extends OpenSearchTestCase {
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()
Collections.emptyMap(),
null
)
);
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));

View File

@ -907,7 +907,8 @@ public class TasksIT extends OpenSearchIntegTestCase {
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap()
Collections.emptyMap(),
null
),
new RuntimeException("test")
),

View File

@ -137,6 +137,7 @@ public class RestTasksAction extends AbstractCatAction {
// Task detailed info
if (detailed) {
table.addCell("description", "default:true;alias:desc;desc:task action");
table.addCell("resource_stats", "default:false;desc:resource consumption info of the task");
}
table.endHeaders();
return table;
@ -173,6 +174,7 @@ public class RestTasksAction extends AbstractCatAction {
if (detailed) {
table.addCell(taskInfo.getDescription());
table.addCell(taskInfo.getResourceStats());
}
table.endRow();
}

View File

@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.tasks;
/**
* Different resource stats are defined.
*/
public enum ResourceStats {
CPU("cpu_time_in_nanos"),
MEMORY("memory_in_bytes");
private final String statsName;
ResourceStats(String statsName) {
this.statsName = statsName;
}
@Override
public String toString() {
return statsName;
}
}

View File

@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.tasks;
/** Defines the different types of resource stats. */
public enum ResourceStatsType {
// resource stats of the worker thread reported directly from runnable.
WORKER_STATS("worker_stats", false);
private final String statsType;
private final boolean onlyForAnalysis;
ResourceStatsType(String statsType, boolean onlyForAnalysis) {
this.statsType = statsType;
this.onlyForAnalysis = onlyForAnalysis;
}
public boolean isOnlyForAnalysis() {
return onlyForAnalysis;
}
@Override
public String toString() {
return statsType;
}
}

View File

@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.tasks;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* Thread resource usage information for particular resource stats type.
* <p>
* It captures the resource usage information like memory, CPU about a particular execution of thread
* for a specific stats type.
*/
public class ResourceUsageInfo {
private static final Logger logger = LogManager.getLogger(ResourceUsageInfo.class);
private final EnumMap<ResourceStats, ResourceStatsInfo> statsInfo = new EnumMap<>(ResourceStats.class);
public ResourceUsageInfo(ResourceUsageMetric... resourceUsageMetrics) {
for (ResourceUsageMetric resourceUsageMetric : resourceUsageMetrics) {
this.statsInfo.put(resourceUsageMetric.getStats(), new ResourceStatsInfo(resourceUsageMetric.getValue()));
}
}
public void recordResourceUsageMetrics(ResourceUsageMetric... resourceUsageMetrics) {
for (ResourceUsageMetric resourceUsageMetric : resourceUsageMetrics) {
final ResourceStatsInfo resourceStatsInfo = statsInfo.get(resourceUsageMetric.getStats());
if (resourceStatsInfo != null) {
updateResourceUsageInfo(resourceStatsInfo, resourceUsageMetric);
} else {
throw new IllegalStateException(
"cannot update ["
+ resourceUsageMetric.getStats().toString()
+ "] entry as its not present current_stats_info:"
+ statsInfo
);
}
}
}
private void updateResourceUsageInfo(ResourceStatsInfo resourceStatsInfo, ResourceUsageMetric resourceUsageMetric) {
long currentEndValue;
long newEndValue;
do {
currentEndValue = resourceStatsInfo.endValue.get();
newEndValue = resourceUsageMetric.getValue();
if (currentEndValue > newEndValue) {
logger.debug(
"dropping resource usage update as the new value is lower than current value ["
+ "resource_stats=[{}], "
+ "current_end_value={}, "
+ "new_end_value={}]",
resourceUsageMetric.getStats(),
currentEndValue,
newEndValue
);
return;
}
} while (!resourceStatsInfo.endValue.compareAndSet(currentEndValue, newEndValue));
logger.debug(
"updated resource usage info [resource_stats=[{}], " + "old_end_value={}, new_end_value={}]",
resourceUsageMetric.getStats(),
currentEndValue,
newEndValue
);
}
public Map<ResourceStats, ResourceStatsInfo> getStatsInfo() {
return Collections.unmodifiableMap(statsInfo);
}
@Override
public String toString() {
return statsInfo.toString();
}
/**
* Defines resource stats information.
*/
static class ResourceStatsInfo {
private final long startValue;
private final AtomicLong endValue;
private ResourceStatsInfo(long startValue) {
this.startValue = startValue;
this.endValue = new AtomicLong(startValue);
}
public long getTotalValue() {
return endValue.get() - startValue;
}
@Override
public String toString() {
return String.valueOf(getTotalValue());
}
}
}

View File

@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.tasks;
public class ResourceUsageMetric {
private final ResourceStats stats;
private final long value;
public ResourceUsageMetric(ResourceStats stats, long value) {
this.stats = stats;
this.value = value;
}
public ResourceStats getStats() {
return stats;
}
public long getValue() {
return value;
}
}

View File

@ -32,6 +32,8 @@
package org.opensearch.tasks;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.NamedWriteable;
@ -39,18 +41,27 @@ import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentObject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Current task information
*/
public class Task {
private static final Logger logger = LogManager.getLogger(Task.class);
/**
* The request header to mark tasks with specific ids
*/
public static final String X_OPAQUE_ID = "X-Opaque-Id";
private static final String TOTAL = "total";
private final long id;
private final String type;
@ -63,6 +74,8 @@ public class Task {
private final Map<String, String> headers;
private final Map<Long, List<ThreadResourceInfo>> resourceStats;
/**
* The task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style).
*/
@ -74,7 +87,7 @@ public class Task {
private final long startTimeNanos;
public Task(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
this(id, type, action, description, parentTask, System.currentTimeMillis(), System.nanoTime(), headers);
this(id, type, action, description, parentTask, System.currentTimeMillis(), System.nanoTime(), headers, new ConcurrentHashMap<>());
}
public Task(
@ -85,7 +98,8 @@ public class Task {
TaskId parentTask,
long startTime,
long startTimeNanos,
Map<String, String> headers
Map<String, String> headers,
ConcurrentHashMap<Long, List<ThreadResourceInfo>> resourceStats
) {
this.id = id;
this.type = type;
@ -95,6 +109,7 @@ public class Task {
this.startTime = startTime;
this.startTimeNanos = startTimeNanos;
this.headers = headers;
this.resourceStats = resourceStats;
}
/**
@ -108,19 +123,48 @@ public class Task {
* generate data?
*/
public final TaskInfo taskInfo(String localNodeId, boolean detailed) {
return taskInfo(localNodeId, detailed, detailed == false);
}
/**
* Build a version of the task status you can throw over the wire and back
* with the option to include resource stats or not.
* This method is only used during creating TaskResult to avoid storing resource information into the task index.
*
* @param excludeStats should information exclude resource stats.
* By default, detailed flag is used to control including resource information.
* But inorder to avoid storing resource stats into task index as strict mapping is enforced and breaks when adding this field.
* In the future, task-index-mapping.json can be modified to add resource stats.
*/
private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeStats) {
String description = null;
Task.Status status = null;
TaskResourceStats resourceStats = null;
if (detailed) {
description = getDescription();
status = getStatus();
}
return taskInfo(localNodeId, description, status);
if (excludeStats == false) {
resourceStats = new TaskResourceStats(new HashMap<>() {
{
put(TOTAL, getTotalResourceStats());
}
});
}
return taskInfo(localNodeId, description, status, resourceStats);
}
/**
* Build a {@link TaskInfo} for this task without resource stats.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status) {
return taskInfo(localNodeId, description, status, null);
}
/**
* Build a proper {@link TaskInfo} for this task.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status) {
protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) {
return new TaskInfo(
new TaskId(localNodeId, getId()),
getType(),
@ -132,7 +176,8 @@ public class Task {
this instanceof CancellableTask,
this instanceof CancellableTask && ((CancellableTask) this).isCancelled(),
parentTask,
headers
headers,
resourceStats
);
}
@ -195,6 +240,102 @@ public class Task {
return null;
}
/**
* Returns thread level resource consumption of the task
*/
public Map<Long, List<ThreadResourceInfo>> getResourceStats() {
return Collections.unmodifiableMap(resourceStats);
}
/**
* Returns current total resource usage of the task.
* Currently, this method is only called on demand, during get and listing of tasks.
* In the future, these values can be cached as an optimization.
*/
public TaskResourceUsage getTotalResourceStats() {
return new TaskResourceUsage(getTotalResourceUtilization(ResourceStats.CPU), getTotalResourceUtilization(ResourceStats.MEMORY));
}
/**
* Returns total resource consumption for a specific task stat.
*/
public long getTotalResourceUtilization(ResourceStats stats) {
long totalResourceConsumption = 0L;
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
totalResourceConsumption += statsInfo.getTotalValue();
}
}
}
return totalResourceConsumption;
}
/**
* Adds thread's starting resource consumption information
* @param threadId ID of the thread
* @param statsType stats type
* @param resourceUsageMetrics resource consumption metrics of the thread
* @throws IllegalStateException matching active thread entry was found which is not expected.
*/
public void startThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) {
final List<ThreadResourceInfo> threadResourceInfoList = resourceStats.computeIfAbsent(threadId, k -> new ArrayList<>());
// active thread entry should not be present in the list
for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) {
if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) {
throw new IllegalStateException(
"unexpected active thread resource entry present [" + threadId + "]:[" + threadResourceInfo + "]"
);
}
}
threadResourceInfoList.add(new ThreadResourceInfo(statsType, resourceUsageMetrics));
}
/**
* This method is used to update the resource consumption stats so that the data isn't too stale for long-running task.
* If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception.
* @param threadId ID of the thread
* @param statsType stats type
* @param resourceUsageMetrics resource consumption metrics of the thread
* @throws IllegalStateException if no matching active thread entry was found.
*/
public void updateThreadResourceStats(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) {
final List<ThreadResourceInfo> threadResourceInfoList = resourceStats.get(threadId);
if (threadResourceInfoList != null) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) {
// the active entry present in the list is updated
if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) {
threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics);
return;
}
}
}
throw new IllegalStateException("cannot update if active thread resource entry is not present");
}
/**
* Record the thread's final resource consumption values.
* If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception.
* @param threadId ID of the thread
* @param statsType stats type
* @param resourceUsageMetrics resource consumption metrics of the thread
* @throws IllegalStateException if no matching active thread entry was found.
*/
public void stopThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) {
final List<ThreadResourceInfo> threadResourceInfoList = resourceStats.get(threadId);
if (threadResourceInfoList != null) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) {
if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) {
threadResourceInfo.setActive(false);
threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics);
return;
}
}
}
throw new IllegalStateException("cannot update final values if active thread resource entry is not present");
}
/**
* Report of the internal status of a task. These can vary wildly from task
* to task because each task is implemented differently but we should try
@ -217,12 +358,12 @@ public class Task {
}
public TaskResult result(DiscoveryNode node, Exception error) throws IOException {
return new TaskResult(taskInfo(node.getId(), true), error);
return new TaskResult(taskInfo(node.getId(), true, true), error);
}
public TaskResult result(DiscoveryNode node, ActionResponse response) throws IOException {
if (response instanceof ToXContent) {
return new TaskResult(taskInfo(node.getId(), true), (ToXContent) response);
return new TaskResult(taskInfo(node.getId(), true, true), (ToXContent) response);
} else {
throw new IllegalStateException("response has to implement ToXContent to be able to store the results");
}

View File

@ -86,6 +86,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
private final Map<String, String> headers;
private final TaskResourceStats resourceStats;
public TaskInfo(
TaskId taskId,
String type,
@ -97,7 +99,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
boolean cancellable,
boolean cancelled,
TaskId parentTaskId,
Map<String, String> headers
Map<String, String> headers,
TaskResourceStats resourceStats
) {
if (cancellable == false && cancelled == true) {
throw new IllegalArgumentException("task cannot be cancelled");
@ -113,11 +116,13 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
this.cancelled = cancelled;
this.parentTaskId = parentTaskId;
this.headers = headers;
this.resourceStats = resourceStats;
}
/**
* Read from a stream.
*/
@SuppressWarnings("unchecked")
public TaskInfo(StreamInput in) throws IOException {
taskId = TaskId.readFromStream(in);
type = in.readString();
@ -137,6 +142,11 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
}
parentTaskId = TaskId.readFromStream(in);
headers = in.readMap(StreamInput::readString, StreamInput::readString);
if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
resourceStats = in.readOptionalWriteable(TaskResourceStats::new);
} else {
resourceStats = null;
}
}
@Override
@ -154,6 +164,9 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
}
parentTaskId.writeTo(out);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
if (out.getVersion().onOrAfter(Version.V_2_0_0)) {
out.writeOptionalWriteable(resourceStats);
}
}
public TaskId getTaskId() {
@ -226,6 +239,13 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
return headers;
}
/**
* Returns the task resource information
*/
public TaskResourceStats getResourceStats() {
return resourceStats;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node", taskId.getNodeId());
@ -253,6 +273,11 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
builder.field(attribute.getKey(), attribute.getValue());
}
builder.endObject();
if (resourceStats != null) {
builder.startObject("resource_stats");
resourceStats.toXContent(builder, params);
builder.endObject();
}
return builder;
}
@ -278,6 +303,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
// This might happen if we are reading an old version of task info
headers = Collections.emptyMap();
}
@SuppressWarnings("unchecked")
TaskResourceStats resourceStats = (TaskResourceStats) a[i++];
RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes);
TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString);
return new TaskInfo(
@ -291,7 +318,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
cancellable,
cancelled,
parentTaskId,
headers
headers,
resourceStats
);
});
static {
@ -309,6 +337,7 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
PARSER.declareBoolean(optionalConstructorArg(), new ParseField("cancelled"));
PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> TaskResourceStats.fromXContent(p), new ParseField("resource_stats"));
}
@Override
@ -333,7 +362,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
&& Objects.equals(cancellable, other.cancellable)
&& Objects.equals(cancelled, other.cancelled)
&& Objects.equals(status, other.status)
&& Objects.equals(headers, other.headers);
&& Objects.equals(headers, other.headers)
&& Objects.equals(resourceStats, other.resourceStats);
}
@Override
@ -349,7 +379,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
cancellable,
cancelled,
status,
headers
headers,
resourceStats
);
}
}

View File

@ -0,0 +1,106 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.tasks;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Resource information about a currently running task.
* <p>
* Writeable TaskResourceStats objects are used to represent resource
* snapshot information about currently running task.
*/
public class TaskResourceStats implements Writeable, ToXContentFragment {
private final Map<String, TaskResourceUsage> resourceUsage;
public TaskResourceStats(Map<String, TaskResourceUsage> resourceUsage) {
this.resourceUsage = Objects.requireNonNull(resourceUsage, "resource usage is required");
}
/**
* Read from a stream.
*/
public TaskResourceStats(StreamInput in) throws IOException {
resourceUsage = in.readMap(StreamInput::readString, TaskResourceUsage::readFromStream);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(resourceUsage, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
}
public Map<String, TaskResourceUsage> getResourceUsageInfo() {
return resourceUsage;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (Map.Entry<String, TaskResourceUsage> resourceUsageEntry : resourceUsage.entrySet()) {
builder.startObject(resourceUsageEntry.getKey());
if (resourceUsageEntry.getValue() != null) {
resourceUsageEntry.getValue().toXContent(builder, params);
}
builder.endObject();
}
return builder;
}
public static TaskResourceStats fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token == null) {
token = parser.nextToken();
}
if (token == XContentParser.Token.START_OBJECT) {
token = parser.nextToken();
}
final Map<String, TaskResourceUsage> resourceStats = new HashMap<>();
if (token == XContentParser.Token.FIELD_NAME) {
assert parser.currentToken() == XContentParser.Token.FIELD_NAME : "Expected field name but saw [" + parser.currentToken() + "]";
do {
// Must point to field name
String fieldName = parser.currentName();
// And then the value
TaskResourceUsage value = TaskResourceUsage.fromXContent(parser);
resourceStats.put(fieldName, value);
} while (parser.nextToken() == XContentParser.Token.FIELD_NAME);
}
return new TaskResourceStats(resourceStats);
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
// Implements equals and hashcode for testing
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != TaskResourceStats.class) {
return false;
}
TaskResourceStats other = (TaskResourceStats) obj;
return Objects.equals(resourceUsage, other.resourceUsage);
}
@Override
public int hashCode() {
return Objects.hash(resourceUsage);
}
}

View File

@ -0,0 +1,105 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.tasks;
import org.opensearch.common.ParseField;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ConstructingObjectParser;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg;
/**
* Task resource usage information
* <p>
* Writeable TaskResourceUsage objects are used to represent resource usage
* information of running tasks.
*/
public class TaskResourceUsage implements Writeable, ToXContentFragment {
private static final ParseField CPU_TIME_IN_NANOS = new ParseField("cpu_time_in_nanos");
private static final ParseField MEMORY_IN_BYTES = new ParseField("memory_in_bytes");
private final long cpuTimeInNanos;
private final long memoryInBytes;
public TaskResourceUsage(long cpuTimeInNanos, long memoryInBytes) {
this.cpuTimeInNanos = cpuTimeInNanos;
this.memoryInBytes = memoryInBytes;
}
/**
* Read from a stream.
*/
public static TaskResourceUsage readFromStream(StreamInput in) throws IOException {
return new TaskResourceUsage(in.readVLong(), in.readVLong());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(cpuTimeInNanos);
out.writeVLong(memoryInBytes);
}
public long getCpuTimeInNanos() {
return cpuTimeInNanos;
}
public long getMemoryInBytes() {
return memoryInBytes;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(CPU_TIME_IN_NANOS.getPreferredName(), cpuTimeInNanos);
builder.field(MEMORY_IN_BYTES.getPreferredName(), memoryInBytes);
return builder;
}
public static final ConstructingObjectParser<TaskResourceUsage, Void> PARSER = new ConstructingObjectParser<>(
"task_resource_usage",
a -> new TaskResourceUsage((Long) a[0], (Long) a[1])
);
static {
PARSER.declareLong(constructorArg(), CPU_TIME_IN_NANOS);
PARSER.declareLong(constructorArg(), MEMORY_IN_BYTES);
}
public static TaskResourceUsage fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
// Implements equals and hashcode for testing
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != TaskResourceUsage.class) {
return false;
}
TaskResourceUsage other = (TaskResourceUsage) obj;
return Objects.equals(cpuTimeInNanos, other.cpuTimeInNanos) && Objects.equals(memoryInBytes, other.memoryInBytes);
}
@Override
public int hashCode() {
return Objects.hash(cpuTimeInNanos, memoryInBytes);
}
}

View File

@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.tasks;
/**
* Resource consumption information about a particular execution of thread.
* <p>
* It captures the resource usage information about a particular execution of thread
* for a specific stats type like worker_stats or response_stats etc.,
*/
public class ThreadResourceInfo {
private volatile boolean isActive = true;
private final ResourceStatsType statsType;
private final ResourceUsageInfo resourceUsageInfo;
public ThreadResourceInfo(ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) {
this.statsType = statsType;
this.resourceUsageInfo = new ResourceUsageInfo(resourceUsageMetrics);
}
/**
* Updates thread's resource consumption information.
*/
public void recordResourceUsageMetrics(ResourceUsageMetric... resourceUsageMetrics) {
resourceUsageInfo.recordResourceUsageMetrics(resourceUsageMetrics);
}
public void setActive(boolean isActive) {
this.isActive = isActive;
}
public boolean isActive() {
return isActive;
}
public ResourceStatsType getStatsType() {
return statsType;
}
public ResourceUsageInfo getResourceUsageInfo() {
return resourceUsageInfo;
}
@Override
public String toString() {
return resourceUsageInfo + ", stats_type=" + statsType + ", is_active=" + isActive;
}
}

View File

@ -31,16 +31,23 @@
package org.opensearch.action.admin.cluster.node.tasks;
import org.opensearch.action.search.SearchAction;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.ResourceUsageMetric;
import org.opensearch.tasks.ResourceStats;
import org.opensearch.tasks.ResourceStatsType;
import org.opensearch.test.OpenSearchTestCase;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import static org.opensearch.tasks.TaskInfoTests.randomResourceStats;
public class TaskTests extends OpenSearchTestCase {
public void testTaskInfoToString() {
@ -61,7 +68,8 @@ public class TaskTests extends OpenSearchTestCase {
cancellable,
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar")
Collections.singletonMap("foo", "bar"),
randomResourceStats(randomBoolean())
);
String taskInfoString = taskInfo.toString();
Map<String, Object> map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2();
@ -94,7 +102,8 @@ public class TaskTests extends OpenSearchTestCase {
cancellable,
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar")
Collections.singletonMap("foo", "bar"),
randomResourceStats(randomBoolean())
);
String taskInfoString = taskInfo.toString();
Map<String, Object> map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2();
@ -120,7 +129,8 @@ public class TaskTests extends OpenSearchTestCase {
cancellable,
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar")
Collections.singletonMap("foo", "bar"),
randomResourceStats(randomBoolean())
);
String taskInfoString = taskInfo.toString();
Map<String, Object> map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2();
@ -148,9 +158,75 @@ public class TaskTests extends OpenSearchTestCase {
cancellable,
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar")
Collections.singletonMap("foo", "bar"),
randomResourceStats(randomBoolean())
)
);
assertEquals(e.getMessage(), "task cannot be cancelled");
}
public void testTaskResourceStats() {
final Task task = new Task(
randomLong(),
"transport",
SearchAction.NAME,
"description",
new TaskId(randomLong() + ":" + randomLong()),
Collections.emptyMap()
);
long totalMemory = 0L;
long totalCPU = 0L;
// reporting resource consumption events and checking total consumption values
for (int i = 0; i < randomInt(10); i++) {
long initial_memory = randomLongBetween(1, 100);
long initial_cpu = randomLongBetween(1, 100);
ResourceUsageMetric[] initialTaskResourceMetrics = new ResourceUsageMetric[] {
new ResourceUsageMetric(ResourceStats.MEMORY, initial_memory),
new ResourceUsageMetric(ResourceStats.CPU, initial_cpu) };
task.startThreadResourceTracking(i, ResourceStatsType.WORKER_STATS, initialTaskResourceMetrics);
long memory = initial_memory + randomLongBetween(1, 10000);
long cpu = initial_cpu + randomLongBetween(1, 10000);
totalMemory += memory - initial_memory;
totalCPU += cpu - initial_cpu;
ResourceUsageMetric[] taskResourceMetrics = new ResourceUsageMetric[] {
new ResourceUsageMetric(ResourceStats.MEMORY, memory),
new ResourceUsageMetric(ResourceStats.CPU, cpu) };
task.updateThreadResourceStats(i, ResourceStatsType.WORKER_STATS, taskResourceMetrics);
task.stopThreadResourceTracking(i, ResourceStatsType.WORKER_STATS);
}
assertEquals(task.getTotalResourceStats().getMemoryInBytes(), totalMemory);
assertEquals(task.getTotalResourceStats().getCpuTimeInNanos(), totalCPU);
// updating should throw an IllegalStateException when active entry is not present.
try {
task.updateThreadResourceStats(randomInt(), ResourceStatsType.WORKER_STATS);
fail("update should not be successful as active entry is not present!");
} catch (IllegalStateException e) {
// pass
}
// re-adding a thread entry that is already present, should throw an exception
int threadId = randomInt();
task.startThreadResourceTracking(threadId, ResourceStatsType.WORKER_STATS, new ResourceUsageMetric(ResourceStats.MEMORY, 100));
try {
task.startThreadResourceTracking(threadId, ResourceStatsType.WORKER_STATS);
fail("add/start should not be successful as active entry is already present!");
} catch (IllegalStateException e) {
// pass
}
// existing active entry is present only for memory, update cannot be called with cpu values.
try {
task.updateThreadResourceStats(threadId, ResourceStatsType.WORKER_STATS, new ResourceUsageMetric(ResourceStats.CPU, 200));
fail("update should not be successful as entry for CPU is not present!");
} catch (IllegalStateException e) {
// pass
}
}
}

View File

@ -62,7 +62,7 @@ public class CancelTasksResponseTests extends AbstractXContentTestCase<CancelTas
private static List<TaskInfo> randomTasks() {
List<TaskInfo> randomTasks = new ArrayList<>();
for (int i = 0; i < randomInt(10); i++) {
randomTasks.add(TaskInfoTests.randomTaskInfo());
randomTasks.add(TaskInfoTests.randomTaskInfo(false));
}
return randomTasks;
}

View File

@ -45,6 +45,7 @@ import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -72,7 +73,12 @@ public class ListTasksResponseTests extends AbstractXContentTestCase<ListTasksRe
true,
false,
new TaskId("node1", 0),
Collections.singletonMap("foo", "bar")
Collections.singletonMap("foo", "bar"),
new TaskResourceStats(new HashMap<String, TaskResourceUsage>() {
{
put("dummy-type1", new TaskResourceUsage(100, 100));
}
})
);
ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList());
assertEquals(
@ -93,6 +99,12 @@ public class ListTasksResponseTests extends AbstractXContentTestCase<ListTasksRe
+ " \"parent_task_id\" : \"node1:0\",\n"
+ " \"headers\" : {\n"
+ " \"foo\" : \"bar\"\n"
+ " },\n"
+ " \"resource_stats\" : {\n"
+ " \"dummy-type1\" : {\n"
+ " \"cpu_time_in_nanos\" : 100,\n"
+ " \"memory_in_bytes\" : 100\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " ]\n"
@ -127,8 +139,8 @@ public class ListTasksResponseTests extends AbstractXContentTestCase<ListTasksRe
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
// status and headers hold arbitrary content, we can't inject random fields in them
return field -> field.endsWith("status") || field.endsWith("headers");
// status, headers and resource_stats hold arbitrary content, we can't inject random fields in them
return field -> field.endsWith("status") || field.endsWith("headers") || field.contains("resource_stats");
}
@Override

View File

@ -77,13 +77,13 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
// status and headers hold arbitrary content, we can't inject random fields in them
return field -> "status".equals(field) || "headers".equals(field);
// status, headers and resource_stats hold arbitrary content, we can't inject random fields in them
return field -> "status".equals(field) || "headers".equals(field) || field.contains("resource_stats");
}
@Override
protected TaskInfo mutateInstance(TaskInfo info) {
switch (between(0, 9)) {
switch (between(0, 10)) {
case 0:
TaskId taskId = new TaskId(info.getTaskId().getNodeId() + randomAlphaOfLength(5), info.getTaskId().getId());
return new TaskInfo(
@ -97,7 +97,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
info.getHeaders(),
info.getResourceStats()
);
case 1:
return new TaskInfo(
@ -111,7 +112,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
info.getHeaders(),
info.getResourceStats()
);
case 2:
return new TaskInfo(
@ -125,7 +127,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
info.getHeaders(),
info.getResourceStats()
);
case 3:
return new TaskInfo(
@ -139,7 +142,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
info.getHeaders(),
info.getResourceStats()
);
case 4:
Task.Status newStatus = randomValueOtherThan(info.getStatus(), TaskInfoTests::randomRawTaskStatus);
@ -154,7 +158,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
info.getHeaders(),
info.getResourceStats()
);
case 5:
return new TaskInfo(
@ -168,7 +173,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
info.getHeaders(),
info.getResourceStats()
);
case 6:
return new TaskInfo(
@ -182,7 +188,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
info.getHeaders(),
info.getResourceStats()
);
case 7:
return new TaskInfo(
@ -196,7 +203,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.isCancellable() == false,
false,
info.getParentTaskId(),
info.getHeaders()
info.getHeaders(),
info.getResourceStats()
);
case 8:
TaskId parentId = new TaskId(info.getParentTaskId().getNodeId() + randomAlphaOfLength(5), info.getParentTaskId().getId());
@ -211,7 +219,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.isCancellable(),
info.isCancelled(),
parentId,
info.getHeaders()
info.getHeaders(),
info.getResourceStats()
);
case 9:
Map<String, String> headers = info.getHeaders();
@ -232,7 +241,30 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
headers
headers,
info.getResourceStats()
);
case 10:
Map<String, TaskResourceUsage> resourceUsageMap;
if (info.getResourceStats() == null) {
resourceUsageMap = new HashMap<>(1);
} else {
resourceUsageMap = new HashMap<>(info.getResourceStats().getResourceUsageInfo());
}
resourceUsageMap.put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong()));
return new TaskInfo(
info.getTaskId(),
info.getType(),
info.getAction(),
info.getDescription(),
info.getStatus(),
info.getStartTime(),
info.getRunningTimeNanos(),
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders(),
new TaskResourceStats(resourceUsageMap)
);
default:
throw new IllegalStateException();
@ -240,11 +272,15 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
}
static TaskInfo randomTaskInfo() {
return randomTaskInfo(randomBoolean());
}
static TaskInfo randomTaskInfo(boolean detailed) {
TaskId taskId = randomTaskId();
String type = randomAlphaOfLength(5);
String action = randomAlphaOfLength(5);
Task.Status status = randomBoolean() ? randomRawTaskStatus() : null;
String description = randomBoolean() ? randomAlphaOfLength(5) : null;
Task.Status status = detailed ? randomRawTaskStatus() : null;
String description = detailed ? randomAlphaOfLength(5) : null;
long startTime = randomLong();
long runningTimeNanos = randomLong();
boolean cancellable = randomBoolean();
@ -264,7 +300,8 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
cancellable,
cancelled,
parentTaskId,
headers
headers,
randomResourceStats(detailed)
);
}
@ -285,4 +322,14 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
throw new IllegalStateException(e);
}
}
public static TaskResourceStats randomResourceStats(boolean detailed) {
return detailed ? new TaskResourceStats(new HashMap<String, TaskResourceUsage>() {
{
for (int i = 0; i < randomInt(5); i++) {
put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong()));
}
}
}) : null;
}
}