high level REST api: cancel task (#30745)
* Initial commit of rest high level exposure of cancel task * fix javadocs * address some code review comments * update branch to use tasks namespace instead of cluster * High-level client: list tasks failure to not lose nodeId This commit reworks testing for `ListTasksResponse` so that random fields insertion can be tested and xcontent equivalence can be checked too. Proper exclusions need to be configured, and failures need to be tested separately. This helped finding a little problem, whenever there is a node failure returned, the nodeId was lost as it was never printed out as part of the exception toXContent. * added comment * merge from master * re-work CancelTasksResponseTests to separate XContent failure cases from non-failure cases * remove duplication of logic in parser creation * code review changes * refactor TasksClient to support RequestOptions * add tests for parent task id * address final PR review comments, mostly formatting and such
This commit is contained in:
parent
e481b860a1
commit
e82e5cc2e8
|
@ -29,6 +29,7 @@ import org.apache.http.entity.ByteArrayEntity;
|
||||||
import org.apache.http.entity.ContentType;
|
import org.apache.http.entity.ContentType;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
|
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
|
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
|
||||||
|
@ -108,6 +109,17 @@ final class RequestConverters {
|
||||||
// Contains only status utility methods
|
// Contains only status utility methods
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Request cancelTasks(CancelTasksRequest cancelTasksRequest) {
|
||||||
|
Request request = new Request(HttpPost.METHOD_NAME, "/_tasks/_cancel");
|
||||||
|
Params params = new Params(request);
|
||||||
|
params.withTimeout(cancelTasksRequest.getTimeout())
|
||||||
|
.withTaskId(cancelTasksRequest.getTaskId())
|
||||||
|
.withNodes(cancelTasksRequest.getNodes())
|
||||||
|
.withParentTaskId(cancelTasksRequest.getParentTaskId())
|
||||||
|
.withActions(cancelTasksRequest.getActions());
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
static Request delete(DeleteRequest deleteRequest) {
|
static Request delete(DeleteRequest deleteRequest) {
|
||||||
String endpoint = endpoint(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
|
String endpoint = endpoint(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
|
||||||
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
|
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
|
||||||
|
@ -1070,6 +1082,13 @@ final class RequestConverters {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Params withTaskId(TaskId taskId) {
|
||||||
|
if (taskId != null && taskId.isSet()) {
|
||||||
|
return putParam("task_id", taskId.toString());
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
Params withParentTaskId(TaskId parentTaskId) {
|
Params withParentTaskId(TaskId parentTaskId) {
|
||||||
if (parentTaskId != null && parentTaskId.isSet()) {
|
if (parentTaskId != null && parentTaskId.isSet()) {
|
||||||
return putParam("parent_task_id", parentTaskId.toString());
|
return putParam("parent_task_id", parentTaskId.toString());
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.elasticsearch.client;
|
package org.elasticsearch.client;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
|
|
||||||
|
@ -65,4 +67,45 @@ public final class TasksClient {
|
||||||
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, options,
|
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, options,
|
||||||
ListTasksResponse::fromXContent, listener, emptySet());
|
ListTasksResponse::fromXContent, listener, emptySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancel one or more cluster tasks using the Task Management API.
|
||||||
|
*
|
||||||
|
* See
|
||||||
|
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
|
||||||
|
* @param cancelTasksRequest the request
|
||||||
|
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||||
|
* @return the response
|
||||||
|
* @throws IOException in case there is a problem sending the request or parsing back the response
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public CancelTasksResponse cancel(CancelTasksRequest cancelTasksRequest, RequestOptions options ) throws IOException {
|
||||||
|
return restHighLevelClient.performRequestAndParseEntity(
|
||||||
|
cancelTasksRequest,
|
||||||
|
RequestConverters::cancelTasks,
|
||||||
|
options,
|
||||||
|
parser -> CancelTasksResponse.fromXContent(parser),
|
||||||
|
emptySet()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asynchronously cancel one or more cluster tasks using the Task Management API.
|
||||||
|
*
|
||||||
|
* See
|
||||||
|
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
|
||||||
|
* @param cancelTasksRequest the request
|
||||||
|
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||||
|
* @param listener the listener to be notified upon request completion
|
||||||
|
*/
|
||||||
|
public void cancelAsync(CancelTasksRequest cancelTasksRequest, RequestOptions options, ActionListener<CancelTasksResponse> listener) {
|
||||||
|
restHighLevelClient.performRequestAsyncAndParseEntity(
|
||||||
|
cancelTasksRequest,
|
||||||
|
RequestConverters::cancelTasks,
|
||||||
|
options,
|
||||||
|
parser -> CancelTasksResponse.fromXContent(parser),
|
||||||
|
listener,
|
||||||
|
emptySet()
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.http.entity.ByteArrayEntity;
|
||||||
import org.apache.http.util.EntityUtils;
|
import org.apache.http.util.EntityUtils;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
|
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
|
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
|
||||||
|
@ -1587,6 +1589,23 @@ public class RequestConvertersTests extends ESTestCase {
|
||||||
assertEquals(expectedParams, request.getParameters());
|
assertEquals(expectedParams, request.getParameters());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCancelTasks() {
|
||||||
|
CancelTasksRequest request = new CancelTasksRequest();
|
||||||
|
Map<String, String> expectedParams = new HashMap<>();
|
||||||
|
TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
|
||||||
|
TaskId parentTaskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
|
||||||
|
request.setTaskId(taskId);
|
||||||
|
request.setParentTaskId(parentTaskId);
|
||||||
|
expectedParams.put("task_id", taskId.toString());
|
||||||
|
expectedParams.put("parent_task_id", parentTaskId.toString());
|
||||||
|
Request httpRequest = RequestConverters.cancelTasks(request);
|
||||||
|
assertThat(httpRequest, notNullValue());
|
||||||
|
assertThat(httpRequest.getMethod(), equalTo(HttpPost.METHOD_NAME));
|
||||||
|
assertThat(httpRequest.getEntity(), nullValue());
|
||||||
|
assertThat(httpRequest.getEndpoint(), equalTo("/_tasks/_cancel"));
|
||||||
|
assertThat(httpRequest.getParameters(), equalTo(expectedParams));
|
||||||
|
}
|
||||||
|
|
||||||
public void testListTasks() {
|
public void testListTasks() {
|
||||||
{
|
{
|
||||||
ListTasksRequest request = new ListTasksRequest();
|
ListTasksRequest request = new ListTasksRequest();
|
||||||
|
|
|
@ -19,9 +19,12 @@
|
||||||
|
|
||||||
package org.elasticsearch.client;
|
package org.elasticsearch.client;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
|
||||||
|
import org.elasticsearch.tasks.TaskId;
|
||||||
import org.elasticsearch.tasks.TaskInfo;
|
import org.elasticsearch.tasks.TaskInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -58,4 +61,26 @@ public class TasksIT extends ESRestHighLevelClientTestCase {
|
||||||
assertTrue("List tasks were not found", listTasksFound);
|
assertTrue("List tasks were not found", listTasksFound);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCancelTasks() throws IOException {
|
||||||
|
ListTasksRequest listRequest = new ListTasksRequest();
|
||||||
|
ListTasksResponse listResponse = execute(
|
||||||
|
listRequest,
|
||||||
|
highLevelClient().tasks()::list,
|
||||||
|
highLevelClient().tasks()::listAsync
|
||||||
|
);
|
||||||
|
// in this case, probably no task will actually be cancelled.
|
||||||
|
// this is ok, that case is covered in TasksIT.testTasksCancellation
|
||||||
|
TaskInfo firstTask = listResponse.getTasks().get(0);
|
||||||
|
String node = listResponse.getPerNodeTasks().keySet().iterator().next();
|
||||||
|
|
||||||
|
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
|
||||||
|
cancelTasksRequest.setTaskId(new TaskId(node, firstTask.getId()));
|
||||||
|
cancelTasksRequest.setReason("testreason");
|
||||||
|
CancelTasksResponse response = execute(cancelTasksRequest,
|
||||||
|
highLevelClient().tasks()::cancel,
|
||||||
|
highLevelClient().tasks()::cancelAsync);
|
||||||
|
// Since the task may or may not have been cancelled, assert that we received a response only
|
||||||
|
// The actual testing of task cancellation is covered by TasksIT.testTasksCancellation
|
||||||
|
assertThat(response, notNullValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -178,4 +178,5 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
|
||||||
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,8 @@ import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.LatchedActionListener;
|
import org.elasticsearch.action.LatchedActionListener;
|
||||||
import org.elasticsearch.action.TaskOperationFailure;
|
import org.elasticsearch.action.TaskOperationFailure;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
|
||||||
|
@ -146,4 +148,74 @@ public class TasksClientDocumentationIT extends ESRestHighLevelClientTestCase {
|
||||||
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCancelTasks() throws IOException {
|
||||||
|
RestHighLevelClient client = highLevelClient();
|
||||||
|
{
|
||||||
|
// tag::cancel-tasks-request
|
||||||
|
CancelTasksRequest request = new CancelTasksRequest();
|
||||||
|
// end::cancel-tasks-request
|
||||||
|
|
||||||
|
// tag::cancel-tasks-request-filter
|
||||||
|
request.setTaskId(new TaskId("nodeId1", 42)); //<1>
|
||||||
|
request.setActions("cluster:*"); // <2>
|
||||||
|
request.setNodes("nodeId1", "nodeId2"); // <3>
|
||||||
|
// end::cancel-tasks-request-filter
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
CancelTasksRequest request = new CancelTasksRequest();
|
||||||
|
request.setTaskId(TaskId.EMPTY_TASK_ID);
|
||||||
|
|
||||||
|
// tag::cancel-tasks-execute
|
||||||
|
CancelTasksResponse response = client.tasks().cancel(request, RequestOptions.DEFAULT);
|
||||||
|
// end::cancel-tasks-execute
|
||||||
|
|
||||||
|
assertThat(response, notNullValue());
|
||||||
|
|
||||||
|
// tag::cancel-tasks-response-tasks
|
||||||
|
List<TaskInfo> tasks = response.getTasks(); // <1>
|
||||||
|
// end::cancel-tasks-response-tasks
|
||||||
|
|
||||||
|
|
||||||
|
// tag::cancel-tasks-response-failures
|
||||||
|
List<ElasticsearchException> nodeFailures = response.getNodeFailures(); // <1>
|
||||||
|
List<TaskOperationFailure> taskFailures = response.getTaskFailures(); // <2>
|
||||||
|
// end::-tasks-response-failures
|
||||||
|
|
||||||
|
assertThat(response.getNodeFailures(), equalTo(emptyList()));
|
||||||
|
assertThat(response.getTaskFailures(), equalTo(emptyList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAsyncCancelTasks() throws InterruptedException {
|
||||||
|
|
||||||
|
RestHighLevelClient client = highLevelClient();
|
||||||
|
{
|
||||||
|
CancelTasksRequest request = new CancelTasksRequest();
|
||||||
|
|
||||||
|
// tag::cancel-tasks-execute-listener
|
||||||
|
ActionListener<CancelTasksResponse> listener =
|
||||||
|
new ActionListener<CancelTasksResponse>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(CancelTasksResponse response) {
|
||||||
|
// <1>
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
// <2>
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// end::cancel-tasks-execute-listener
|
||||||
|
|
||||||
|
// Replace the empty listener by a blocking listener in test
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
listener = new LatchedActionListener<>(listener, latch);
|
||||||
|
|
||||||
|
// tag::cancel-tasks-execute-async
|
||||||
|
client.tasks().cancelAsync(request, RequestOptions.DEFAULT, listener); // <1>
|
||||||
|
// end::cancel-tasks-execute-async
|
||||||
|
|
||||||
|
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,5 +140,7 @@ include::snapshot/verify_repository.asciidoc[]
|
||||||
The Java High Level REST Client supports the following Tasks APIs:
|
The Java High Level REST Client supports the following Tasks APIs:
|
||||||
|
|
||||||
* <<java-rest-high-tasks-list>>
|
* <<java-rest-high-tasks-list>>
|
||||||
|
* <<java-rest-high-cluster-cancel-tasks>>
|
||||||
|
|
||||||
include::tasks/list_tasks.asciidoc[]
|
include::tasks/list_tasks.asciidoc[]
|
||||||
|
include::tasks/cancel_tasks.asciidoc[]
|
||||||
|
|
|
@ -0,0 +1,82 @@
|
||||||
|
[[java-rest-high-cluster-cancel-tasks]]
|
||||||
|
=== Cancel Tasks API
|
||||||
|
|
||||||
|
The Cancel Tasks API allows cancellation of a currently running task.
|
||||||
|
|
||||||
|
==== Cancel Tasks Request
|
||||||
|
|
||||||
|
A `CancelTasksRequest`:
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-request]
|
||||||
|
--------------------------------------------------
|
||||||
|
There are no required parameters. The task cancellation command supports the same
|
||||||
|
task selection parameters as the list tasks command.
|
||||||
|
|
||||||
|
==== Parameters
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-request-filter]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> Cancel a task
|
||||||
|
<2> Cancel only cluster-related tasks
|
||||||
|
<3> Cancel all tasks running on nodes nodeId1 and nodeId2
|
||||||
|
|
||||||
|
==== Synchronous Execution
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-execute]
|
||||||
|
--------------------------------------------------
|
||||||
|
|
||||||
|
==== Asynchronous Execution
|
||||||
|
|
||||||
|
The asynchronous execution requires `CancelTasksRequest` instance and an
|
||||||
|
`ActionListener` instance to be passed to the asynchronous method:
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-execute-async]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> The `CancelTasksRequest` to execute and the `ActionListener` to use
|
||||||
|
when the execution completes
|
||||||
|
|
||||||
|
The asynchronous method does not block and returns immediately. Once it is
|
||||||
|
completed the `ActionListener` is called back using the `onResponse` method
|
||||||
|
if the execution successfully completed or using the `onFailure` method if
|
||||||
|
it failed.
|
||||||
|
|
||||||
|
A typical listener for `CancelTasksResponse` looks like:
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-execute-listener]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> Called when the execution is successfully completed. The response is
|
||||||
|
provided as an argument
|
||||||
|
<2> Called in case of a failure. The raised exception is provided as an argument
|
||||||
|
|
||||||
|
==== Cancel Tasks Response
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-tasks]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> List of cancelled tasks
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-calc]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> List of cancelled tasks grouped by a node
|
||||||
|
<2> List of cancelled tasks grouped by a parent task
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-failures]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> List of node failures
|
||||||
|
<2> List of task cancellation failures
|
||||||
|
|
|
@ -19,23 +19,48 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
|
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
|
||||||
|
|
||||||
import org.elasticsearch.action.FailedNodeException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.action.TaskOperationFailure;
|
import org.elasticsearch.action.TaskOperationFailure;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
|
import org.elasticsearch.common.ParseField;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.tasks.TaskInfo;
|
import org.elasticsearch.tasks.TaskInfo;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the list of tasks that were cancelled
|
* Returns the list of tasks that were cancelled
|
||||||
*/
|
*/
|
||||||
public class CancelTasksResponse extends ListTasksResponse {
|
public class CancelTasksResponse extends ListTasksResponse {
|
||||||
|
|
||||||
|
private static final ConstructingObjectParser<CancelTasksResponse, Void> PARSER =
|
||||||
|
setupParser("cancel_tasks_response", CancelTasksResponse::new);
|
||||||
|
|
||||||
public CancelTasksResponse() {
|
public CancelTasksResponse() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public CancelTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends FailedNodeException>
|
public CancelTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends ElasticsearchException>
|
||||||
nodeFailures) {
|
nodeFailures) {
|
||||||
super(tasks, taskFailures, nodeFailures);
|
super(tasks, taskFailures, nodeFailures);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
return super.toXContent(builder, params);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CancelTasksResponse fromXContent(XContentParser parser) {
|
||||||
|
return PARSER.apply(parser, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return Strings.toString(this, true, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.common.ParseField;
|
import org.elasticsearch.common.ParseField;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.TriFunction;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||||
|
@ -70,8 +71,14 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContentOb
|
||||||
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
|
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final ConstructingObjectParser<ListTasksResponse, Void> PARSER =
|
|
||||||
new ConstructingObjectParser<>("list_tasks_response", true,
|
protected static <T> ConstructingObjectParser<T, Void> setupParser(String name,
|
||||||
|
TriFunction<
|
||||||
|
List<TaskInfo>,
|
||||||
|
List<TaskOperationFailure>,
|
||||||
|
List<ElasticsearchException>,
|
||||||
|
T> ctor) {
|
||||||
|
ConstructingObjectParser<T, Void> parser = new ConstructingObjectParser<>(name, true,
|
||||||
constructingObjects -> {
|
constructingObjects -> {
|
||||||
int i = 0;
|
int i = 0;
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -80,16 +87,18 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContentOb
|
||||||
List<TaskOperationFailure> tasksFailures = (List<TaskOperationFailure>) constructingObjects[i++];
|
List<TaskOperationFailure> tasksFailures = (List<TaskOperationFailure>) constructingObjects[i++];
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
List<ElasticsearchException> nodeFailures = (List<ElasticsearchException>) constructingObjects[i];
|
List<ElasticsearchException> nodeFailures = (List<ElasticsearchException>) constructingObjects[i];
|
||||||
return new ListTasksResponse(tasks, tasksFailures, nodeFailures);
|
return ctor.apply(tasks,tasksFailures, nodeFailures);
|
||||||
});
|
});
|
||||||
|
parser.declareObjectArray(optionalConstructorArg(), TaskInfo.PARSER, new ParseField(TASKS));
|
||||||
static {
|
parser.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p), new ParseField(TASK_FAILURES));
|
||||||
PARSER.declareObjectArray(constructorArg(), TaskInfo.PARSER, new ParseField(TASKS));
|
parser.declareObjectArray(optionalConstructorArg(),
|
||||||
PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p), new ParseField(TASK_FAILURES));
|
(p, c) -> ElasticsearchException.fromXContent(p), new ParseField(NODE_FAILURES));
|
||||||
PARSER.declareObjectArray(optionalConstructorArg(),
|
return parser;
|
||||||
(parser, c) -> ElasticsearchException.fromXContent(parser), new ParseField(NODE_FAILURES));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final ConstructingObjectParser<ListTasksResponse, Void> PARSER =
|
||||||
|
setupParser("list_tasks_response", ListTasksResponse::new);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
super.readFrom(in);
|
super.readFrom(in);
|
||||||
|
|
|
@ -0,0 +1,116 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.tasks;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.FailedNodeException;
|
||||||
|
import org.elasticsearch.action.TaskOperationFailure;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.ConnectException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
|
public class CancelTasksResponseTests extends AbstractXContentTestCase<CancelTasksResponse> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected CancelTasksResponse createTestInstance() {
|
||||||
|
List<TaskInfo> randomTasks = randomTasks();
|
||||||
|
return new CancelTasksResponse(randomTasks, Collections.emptyList(), Collections.emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<TaskInfo> randomTasks() {
|
||||||
|
List<TaskInfo> randomTasks = new ArrayList<>();
|
||||||
|
for (int i = 0; i < randomInt(10); i++) {
|
||||||
|
randomTasks.add(TaskInfoTests.randomTaskInfo());
|
||||||
|
}
|
||||||
|
return randomTasks;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Predicate<String> getRandomFieldsExcludeFilter() {
|
||||||
|
//status and headers hold arbitrary content, we can't inject random fields in them
|
||||||
|
return field -> field.endsWith("status") || field.endsWith("headers");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void assertEqualInstances(CancelTasksResponse expectedInstance, CancelTasksResponse newInstance) {
|
||||||
|
assertNotSame(expectedInstance, newInstance);
|
||||||
|
assertThat(newInstance.getTasks(), equalTo(expectedInstance.getTasks()));
|
||||||
|
ListTasksResponseTests.assertOnNodeFailures(newInstance.getNodeFailures(), expectedInstance.getNodeFailures());
|
||||||
|
ListTasksResponseTests.assertOnTaskFailures(newInstance.getTaskFailures(), expectedInstance.getTaskFailures());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected CancelTasksResponse doParseInstance(XContentParser parser) {
|
||||||
|
return CancelTasksResponse.fromXContent(parser);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean supportsUnknownFields() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean assertToXContentEquivalence() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test parsing {@link ListTasksResponse} with inner failures as they don't support asserting on xcontent equivalence, given that
|
||||||
|
* exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()}
|
||||||
|
* without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end.
|
||||||
|
*/
|
||||||
|
public void testFromXContentWithFailures() throws IOException {
|
||||||
|
Supplier<CancelTasksResponse> instanceSupplier = CancelTasksResponseTests::createTestInstanceWithFailures;
|
||||||
|
//with random fields insertion in the inner exceptions, some random stuff may be parsed back as metadata,
|
||||||
|
//but that does not bother our assertions, as we only want to test that we don't break.
|
||||||
|
boolean supportsUnknownFields = true;
|
||||||
|
//exceptions are not of the same type whenever parsed back
|
||||||
|
boolean assertToXContentEquivalence = false;
|
||||||
|
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY,
|
||||||
|
getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
|
||||||
|
this::assertEqualInstances, assertToXContentEquivalence);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static CancelTasksResponse createTestInstanceWithFailures() {
|
||||||
|
int numNodeFailures = randomIntBetween(0, 3);
|
||||||
|
List<FailedNodeException> nodeFailures = new ArrayList<>(numNodeFailures);
|
||||||
|
for (int i = 0; i < numNodeFailures; i++) {
|
||||||
|
nodeFailures.add(new FailedNodeException(randomAlphaOfLength(5), "error message", new ConnectException()));
|
||||||
|
}
|
||||||
|
int numTaskFailures = randomIntBetween(0, 3);
|
||||||
|
List<TaskOperationFailure> taskFailures = new ArrayList<>(numTaskFailures);
|
||||||
|
for (int i = 0; i < numTaskFailures; i++) {
|
||||||
|
taskFailures.add(new TaskOperationFailure(randomAlphaOfLength(5), randomLong(), new IllegalStateException()));
|
||||||
|
}
|
||||||
|
return new CancelTasksResponse(randomTasks(), taskFailures, nodeFailures);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -109,20 +109,30 @@ public class ListTasksResponseTests extends AbstractXContentTestCase<ListTasksRe
|
||||||
protected void assertEqualInstances(ListTasksResponse expectedInstance, ListTasksResponse newInstance) {
|
protected void assertEqualInstances(ListTasksResponse expectedInstance, ListTasksResponse newInstance) {
|
||||||
assertNotSame(expectedInstance, newInstance);
|
assertNotSame(expectedInstance, newInstance);
|
||||||
assertThat(newInstance.getTasks(), equalTo(expectedInstance.getTasks()));
|
assertThat(newInstance.getTasks(), equalTo(expectedInstance.getTasks()));
|
||||||
assertThat(newInstance.getNodeFailures().size(), equalTo(expectedInstance.getNodeFailures().size()));
|
assertOnNodeFailures(newInstance.getNodeFailures(), expectedInstance.getNodeFailures());
|
||||||
for (int i = 0; i < newInstance.getNodeFailures().size(); i++) {
|
assertOnTaskFailures(newInstance.getTaskFailures(), expectedInstance.getTaskFailures());
|
||||||
ElasticsearchException newException = newInstance.getNodeFailures().get(i);
|
}
|
||||||
ElasticsearchException expectedException = expectedInstance.getNodeFailures().get(i);
|
|
||||||
|
protected static void assertOnNodeFailures(List<ElasticsearchException> nodeFailures,
|
||||||
|
List<ElasticsearchException> expectedFailures) {
|
||||||
|
assertThat(nodeFailures.size(), equalTo(expectedFailures.size()));
|
||||||
|
for (int i = 0; i < nodeFailures.size(); i++) {
|
||||||
|
ElasticsearchException newException = nodeFailures.get(i);
|
||||||
|
ElasticsearchException expectedException = expectedFailures.get(i);
|
||||||
assertThat(newException.getMetadata("es.node_id").get(0), equalTo(((FailedNodeException)expectedException).nodeId()));
|
assertThat(newException.getMetadata("es.node_id").get(0), equalTo(((FailedNodeException)expectedException).nodeId()));
|
||||||
assertThat(newException.getMessage(), equalTo("Elasticsearch exception [type=failed_node_exception, reason=error message]"));
|
assertThat(newException.getMessage(), equalTo("Elasticsearch exception [type=failed_node_exception, reason=error message]"));
|
||||||
assertThat(newException.getCause(), instanceOf(ElasticsearchException.class));
|
assertThat(newException.getCause(), instanceOf(ElasticsearchException.class));
|
||||||
ElasticsearchException cause = (ElasticsearchException) newException.getCause();
|
ElasticsearchException cause = (ElasticsearchException) newException.getCause();
|
||||||
assertThat(cause.getMessage(), equalTo("Elasticsearch exception [type=connect_exception, reason=null]"));
|
assertThat(cause.getMessage(), equalTo("Elasticsearch exception [type=connect_exception, reason=null]"));
|
||||||
}
|
}
|
||||||
assertThat(newInstance.getTaskFailures().size(), equalTo(expectedInstance.getTaskFailures().size()));
|
}
|
||||||
for (int i = 0; i < newInstance.getTaskFailures().size(); i++) {
|
|
||||||
TaskOperationFailure newFailure = newInstance.getTaskFailures().get(i);
|
protected static void assertOnTaskFailures(List<TaskOperationFailure> taskFailures,
|
||||||
TaskOperationFailure expectedFailure = expectedInstance.getTaskFailures().get(i);
|
List<TaskOperationFailure> expectedFailures) {
|
||||||
|
assertThat(taskFailures.size(), equalTo(expectedFailures.size()));
|
||||||
|
for (int i = 0; i < taskFailures.size(); i++) {
|
||||||
|
TaskOperationFailure newFailure = taskFailures.get(i);
|
||||||
|
TaskOperationFailure expectedFailure = expectedFailures.get(i);
|
||||||
assertThat(newFailure.getNodeId(), equalTo(expectedFailure.getNodeId()));
|
assertThat(newFailure.getNodeId(), equalTo(expectedFailure.getNodeId()));
|
||||||
assertThat(newFailure.getTaskId(), equalTo(expectedFailure.getTaskId()));
|
assertThat(newFailure.getTaskId(), equalTo(expectedFailure.getTaskId()));
|
||||||
assertThat(newFailure.getStatus(), equalTo(expectedFailure.getStatus()));
|
assertThat(newFailure.getStatus(), equalTo(expectedFailure.getStatus()));
|
||||||
|
|
Loading…
Reference in New Issue