Adding a cancelled field to tell if a cancellable task is cancelled (#1732)

* Changes made by adding the Task indicator and the IllegalArgs.

Signed-off-by: meghasaik <kavmegha@amazon.com>

* Changes made by changing the String message for uncancellable task.

Signed-off-by: meghasaik <kavmegha@amazon.com>
This commit is contained in:
Megha Sai Kavikondala 2021-12-16 06:49:49 -08:00 committed by GitHub
parent e66ea2c4f3
commit 22bfadf324
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 215 additions and 7 deletions

View File

@ -53,6 +53,7 @@ public class TaskInfo {
private long startTime;
private long runningTimeNanos;
private boolean cancellable;
private boolean cancelled;
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
@ -117,6 +118,14 @@ public class TaskInfo {
this.cancellable = cancellable;
}
public boolean isCancelled() {
return cancelled;
}
void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
}
public TaskId getParentTaskId() {
return parentTaskId;
}
@ -158,6 +167,7 @@ public class TaskInfo {
parser.declareLong(TaskInfo::setStartTime, new ParseField("start_time_in_millis"));
parser.declareLong(TaskInfo::setRunningTimeNanos, new ParseField("running_time_in_nanos"));
parser.declareBoolean(TaskInfo::setCancellable, new ParseField("cancellable"));
parser.declareBoolean(TaskInfo::setCancelled, new ParseField("cancelled"));
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
@ -171,6 +181,7 @@ public class TaskInfo {
return getStartTime() == taskInfo.getStartTime()
&& getRunningTimeNanos() == taskInfo.getRunningTimeNanos()
&& isCancellable() == taskInfo.isCancellable()
&& isCancelled() == taskInfo.isCancelled()
&& Objects.equals(getTaskId(), taskInfo.getTaskId())
&& Objects.equals(getType(), taskInfo.getType())
&& Objects.equals(getAction(), taskInfo.getAction())
@ -190,6 +201,7 @@ public class TaskInfo {
getStartTime(),
getRunningTimeNanos(),
isCancellable(),
isCancelled(),
getParentTaskId(),
status,
getHeaders()
@ -216,6 +228,8 @@ public class TaskInfo {
+ runningTimeNanos
+ ", cancellable="
+ cancellable
+ ", cancelled="
+ cancelled
+ ", parentTaskId="
+ parentTaskId
+ ", status="

View File

@ -90,11 +90,24 @@ public class GetTaskResponseTests extends OpenSearchTestCase {
long startTime = randomLong();
long runningTimeNanos = randomLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable == true ? randomBoolean() : false;
TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
return new TaskInfo(
taskId,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers
);
}
private static TaskId randomTaskId() {

View File

@ -82,6 +82,8 @@ public class CancelTasksResponseTests extends AbstractResponseTestCase<
}
for (int i = 0; i < 4; i++) {
boolean cancellable = randomBoolean();
boolean cancelled = cancellable == true ? randomBoolean() : false;
tasks.add(
new org.opensearch.tasks.TaskInfo(
new TaskId(NODE_ID, (long) i),
@ -91,7 +93,8 @@ public class CancelTasksResponseTests extends AbstractResponseTestCase<
new FakeTaskStatus(randomAlphaOfLength(4), randomInt()),
randomLongBetween(1, 3),
randomIntBetween(5, 10),
false,
cancellable,
cancelled,
new TaskId("node1", randomLong()),
Collections.singletonMap("x-header-of", "some-value")
)
@ -128,6 +131,7 @@ public class CancelTasksResponseTests extends AbstractResponseTestCase<
assertEquals(ti.getStartTime(), taskInfo.getStartTime());
assertEquals(ti.getRunningTimeNanos(), taskInfo.getRunningTimeNanos());
assertEquals(ti.isCancellable(), taskInfo.isCancellable());
assertEquals(ti.isCancelled(), taskInfo.isCancelled());
assertEquals(ti.getParentTaskId().getNodeId(), taskInfo.getParentTaskId().getNodeId());
assertEquals(ti.getParentTaskId().getId(), taskInfo.getParentTaskId().getId());
FakeTaskStatus status = (FakeTaskStatus) ti.getStatus();

View File

@ -129,6 +129,7 @@ public class TransportRethrottleActionTests extends OpenSearchTestCase {
0,
0,
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()
)
@ -164,6 +165,7 @@ public class TransportRethrottleActionTests extends OpenSearchTestCase {
0,
0,
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()
)

View File

@ -901,7 +901,19 @@ public class TasksIT extends OpenSearchIntegTestCase {
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
resultsService.storeResult(
new TaskResult(
new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()),
new TaskInfo(
new TaskId("fake", 1),
"test",
"test",
"",
null,
0,
0,
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap()
),
new RuntimeException("test")
),
new ActionListener<Void>() {

View File

@ -130,6 +130,7 @@ public class Task {
startTime,
System.nanoTime() - startTimeNanos,
this instanceof CancellableTask,
this instanceof CancellableTask && ((CancellableTask) this).isCancelled(),
parentTask,
headers
);

View File

@ -32,6 +32,7 @@
package org.opensearch.tasks;
import org.opensearch.Version;
import org.opensearch.common.ParseField;
import org.opensearch.common.Strings;
import org.opensearch.common.bytes.BytesReference;
@ -79,6 +80,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
private final boolean cancellable;
private final boolean cancelled;
private final TaskId parentTaskId;
private final Map<String, String> headers;
@ -92,9 +95,13 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
long startTime,
long runningTimeNanos,
boolean cancellable,
boolean cancelled,
TaskId parentTaskId,
Map<String, String> headers
) {
if (cancellable == false && cancelled == true) {
throw new IllegalArgumentException("task cannot be cancelled");
}
this.taskId = taskId;
this.type = type;
this.action = action;
@ -103,6 +110,7 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
this.startTime = startTime;
this.runningTimeNanos = runningTimeNanos;
this.cancellable = cancellable;
this.cancelled = cancelled;
this.parentTaskId = parentTaskId;
this.headers = headers;
}
@ -119,6 +127,14 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
startTime = in.readLong();
runningTimeNanos = in.readLong();
cancellable = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
cancelled = in.readBoolean();
} else {
cancelled = false;
}
if (cancellable == false && cancelled == true) {
throw new IllegalArgumentException("task cannot be cancelled");
}
parentTaskId = TaskId.readFromStream(in);
headers = in.readMap(StreamInput::readString, StreamInput::readString);
}
@ -133,6 +149,9 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
out.writeLong(startTime);
out.writeLong(runningTimeNanos);
out.writeBoolean(cancellable);
if (out.getVersion().onOrAfter(Version.V_2_0_0)) {
out.writeBoolean(cancelled);
}
parentTaskId.writeTo(out);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
@ -186,6 +205,13 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
return cancellable;
}
/**
* Returns true if the task has been cancelled
*/
public boolean isCancelled() {
return cancelled;
}
/**
* Returns the parent task id
*/
@ -218,6 +244,7 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
}
builder.field("running_time_in_nanos", runningTimeNanos);
builder.field("cancellable", cancellable);
builder.field("cancelled", cancelled);
if (parentTaskId.isSet()) {
builder.field("parent_task_id", parentTaskId.toString());
}
@ -243,6 +270,7 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
long startTime = (Long) a[i++];
long runningTimeNanos = (Long) a[i++];
boolean cancellable = (Boolean) a[i++];
boolean cancelled = a[i++] == Boolean.TRUE;
String parentTaskIdString = (String) a[i++];
@SuppressWarnings("unchecked")
Map<String, String> headers = (Map<String, String>) a[i++];
@ -252,7 +280,19 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
}
RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes);
TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString);
return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
return new TaskInfo(
id,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers
);
});
static {
// Note for the future: this has to be backwards and forwards compatible with all changes to the task storage format
@ -266,6 +306,7 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
PARSER.declareLong(constructorArg(), new ParseField("start_time_in_millis"));
PARSER.declareLong(constructorArg(), new ParseField("running_time_in_nanos"));
PARSER.declareBoolean(constructorArg(), new ParseField("cancellable"));
PARSER.declareBoolean(optionalConstructorArg(), new ParseField("cancelled"));
PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers"));
}
@ -290,12 +331,25 @@ public final class TaskInfo implements Writeable, ToXContentFragment {
&& Objects.equals(runningTimeNanos, other.runningTimeNanos)
&& Objects.equals(parentTaskId, other.parentTaskId)
&& Objects.equals(cancellable, other.cancellable)
&& Objects.equals(cancelled, other.cancelled)
&& Objects.equals(status, other.status)
&& Objects.equals(headers, other.headers);
}
@Override
public int hashCode() {
return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status, headers);
return Objects.hash(
taskId,
type,
action,
description,
startTime,
runningTimeNanos,
parentTaskId,
cancellable,
cancelled,
status,
headers
);
}
}

View File

@ -16,6 +16,9 @@
"cancellable": {
"type": "boolean"
},
"cancelled": {
"type": "boolean"
},
"id": {
"type": "long"
},

View File

@ -48,7 +48,8 @@ public class TaskTests extends OpenSearchTestCase {
long taskId = randomIntBetween(0, 100000);
long startTime = randomNonNegativeLong();
long runningTime = randomNonNegativeLong();
boolean cancellable = randomBoolean();
boolean cancellable = false;
boolean cancelled = false;
TaskInfo taskInfo = new TaskInfo(
new TaskId(nodeId, taskId),
"test_type",
@ -58,6 +59,7 @@ public class TaskTests extends OpenSearchTestCase {
startTime,
runningTime,
cancellable,
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar")
);
@ -70,7 +72,85 @@ public class TaskTests extends OpenSearchTestCase {
assertEquals(((Number) map.get("start_time_in_millis")).longValue(), startTime);
assertEquals(((Number) map.get("running_time_in_nanos")).longValue(), runningTime);
assertEquals(map.get("cancellable"), cancellable);
assertEquals(map.get("cancelled"), cancelled);
assertEquals(map.get("headers"), Collections.singletonMap("foo", "bar"));
}
public void testCancellableOptionWhenCancelledTrue() {
String nodeId = randomAlphaOfLength(10);
long taskId = randomIntBetween(0, 100000);
long startTime = randomNonNegativeLong();
long runningTime = randomNonNegativeLong();
boolean cancellable = true;
boolean cancelled = true;
TaskInfo taskInfo = new TaskInfo(
new TaskId(nodeId, taskId),
"test_type",
"test_action",
"test_description",
null,
startTime,
runningTime,
cancellable,
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar")
);
String taskInfoString = taskInfo.toString();
Map<String, Object> map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2();
assertEquals(map.get("cancellable"), cancellable);
assertEquals(map.get("cancelled"), cancelled);
}
public void testCancellableOptionWhenCancelledFalse() {
String nodeId = randomAlphaOfLength(10);
long taskId = randomIntBetween(0, 100000);
long startTime = randomNonNegativeLong();
long runningTime = randomNonNegativeLong();
boolean cancellable = true;
boolean cancelled = false;
TaskInfo taskInfo = new TaskInfo(
new TaskId(nodeId, taskId),
"test_type",
"test_action",
"test_description",
null,
startTime,
runningTime,
cancellable,
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar")
);
String taskInfoString = taskInfo.toString();
Map<String, Object> map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2();
assertEquals(map.get("cancellable"), cancellable);
assertEquals(map.get("cancelled"), cancelled);
}
public void testNonCancellableOption() {
String nodeId = randomAlphaOfLength(10);
long taskId = randomIntBetween(0, 100000);
long startTime = randomNonNegativeLong();
long runningTime = randomNonNegativeLong();
boolean cancellable = false;
boolean cancelled = true;
Exception e = expectThrows(
IllegalArgumentException.class,
() -> new TaskInfo(
new TaskId(nodeId, taskId),
"test_type",
"test_action",
"test_description",
null,
startTime,
runningTime,
cancellable,
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar")
)
);
assertEquals(e.getMessage(), "task cannot be cancelled");
}
}

View File

@ -70,6 +70,7 @@ public class ListTasksResponseTests extends AbstractXContentTestCase<ListTasksRe
0,
1,
true,
false,
new TaskId("node1", 0),
Collections.singletonMap("foo", "bar")
);
@ -88,6 +89,7 @@ public class ListTasksResponseTests extends AbstractXContentTestCase<ListTasksRe
+ " \"running_time\" : \"1nanos\",\n"
+ " \"running_time_in_nanos\" : 1,\n"
+ " \"cancellable\" : true,\n"
+ " \"cancelled\" : false,\n"
+ " \"parent_task_id\" : \"node1:0\",\n"
+ " \"headers\" : {\n"
+ " \"foo\" : \"bar\"\n"

View File

@ -95,6 +95,7 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.getStartTime(),
info.getRunningTimeNanos(),
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
);
@ -108,6 +109,7 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.getStartTime(),
info.getRunningTimeNanos(),
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
);
@ -121,6 +123,7 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.getStartTime(),
info.getRunningTimeNanos(),
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
);
@ -134,6 +137,7 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.getStartTime(),
info.getRunningTimeNanos(),
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
);
@ -148,6 +152,7 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.getStartTime(),
info.getRunningTimeNanos(),
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
);
@ -161,6 +166,7 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.getStartTime() + between(1, 100),
info.getRunningTimeNanos(),
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
);
@ -174,6 +180,7 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.getStartTime(),
info.getRunningTimeNanos() + between(1, 100),
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders()
);
@ -187,6 +194,7 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.getStartTime(),
info.getRunningTimeNanos(),
info.isCancellable() == false,
false,
info.getParentTaskId(),
info.getHeaders()
);
@ -201,6 +209,7 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.getStartTime(),
info.getRunningTimeNanos(),
info.isCancellable(),
info.isCancelled(),
parentId,
info.getHeaders()
);
@ -221,6 +230,7 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
info.getStartTime(),
info.getRunningTimeNanos(),
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
headers
);
@ -238,11 +248,24 @@ public class TaskInfoTests extends AbstractSerializingTestCase<TaskInfo> {
long startTime = randomLong();
long runningTimeNanos = randomLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable == true ? randomBoolean() : false;
TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
return new TaskInfo(
taskId,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers
);
}
private static TaskId randomTaskId() {