Add description to submit and get async search, as well as cancel tasks (#57745)

This makes it easier to debug where such tasks come from in case they are returned from the get tasks API.

Also renamed the last occurrence of waitForCompletion to waitForCompletionTimeout in get async search request.
This commit is contained in:
Luca Cavanna 2020-06-08 11:09:53 +02:00
parent 06ef3042c1
commit 7a06a13d99
10 changed files with 124 additions and 41 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import java.io.IOException;
import java.util.Arrays;
/**
* A request to cancel tasks
@ -89,4 +90,14 @@ public class CancelTasksRequest extends BaseTasksRequest<CancelTasksRequest> {
public boolean waitForCompletion() {
return waitForCompletion;
}
@Override
public String getDescription() {
return "reason[" + reason +
"], waitForCompletion[" + waitForCompletion +
"], taskId[" + getTaskId() +
"], parentTaskId[" + getParentTaskId() +
"], nodes" + Arrays.toString(getNodes()) +
", actions" + Arrays.toString(getActions());
}
}

View File

@ -644,27 +644,31 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
return new SearchTask(id, type, action, null, parentTaskId, headers) {
@Override
public String getDescription() {
StringBuilder sb = new StringBuilder();
sb.append("indices[");
Strings.arrayToDelimitedString(indices, ",", sb);
sb.append("], ");
sb.append("types[");
Strings.arrayToDelimitedString(types, ",", sb);
sb.append("], ");
sb.append("search_type[").append(searchType).append("], ");
if (scroll != null) {
sb.append("scroll[").append(scroll.keepAlive()).append("], ");
}
if (source != null) {
sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]");
} else {
sb.append("source[]");
}
return sb.toString();
return buildDescription();
}
};
}
public String buildDescription() {
StringBuilder sb = new StringBuilder();
sb.append("indices[");
Strings.arrayToDelimitedString(indices, ",", sb);
sb.append("], ");
sb.append("types[");
Strings.arrayToDelimitedString(types, ",", sb);
sb.append("], ");
sb.append("search_type[").append(searchType).append("], ");
if (scroll != null) {
sb.append("scroll[").append(scroll.keepAlive()).append("], ");
}
if (source != null) {
sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]");
} else {
sb.append("source[]");
}
return sb.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
public class CancelTasksRequestTests extends ESTestCase {
public void testGetDescription() {
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setActions("action1", "action2");
cancelTasksRequest.setNodes("node1", "node2");
cancelTasksRequest.setTaskId(new TaskId("node1", 1));
cancelTasksRequest.setParentTaskId(new TaskId("node1", 0));
assertEquals("reason[by user request], waitForCompletion[false], taskId[node1:1], " +
"parentTaskId[node1:0], nodes[node1, node2], actions[action1, action2]", cancelTasksRequest.getDescription());
Task task = cancelTasksRequest.createTask(1, "type", "action", null, Collections.emptyMap());
assertEquals(cancelTasksRequest.getDescription(), task.getDescription());
}
}

View File

@ -35,7 +35,7 @@ public class RestGetAsyncSearchAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
GetAsyncSearchAction.Request get = new GetAsyncSearchAction.Request(request.param("id"));
if (request.hasParam("wait_for_completion_timeout")) {
get.setWaitForCompletion(request.paramAsTime("wait_for_completion_timeout", get.getWaitForCompletion()));
get.setWaitForCompletionTimeout(request.paramAsTime("wait_for_completion_timeout", get.getWaitForCompletionTimeout()));
}
if (request.hasParam("keep_alive")) {
get.setKeepAlive(request.paramAsTime("keep_alive", get.getKeepAlive()));

View File

@ -117,7 +117,7 @@ public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsy
public void onFailure(Exception exc) {
listener.onFailure(exc);
}
}, request.getWaitForCompletion());
}, request.getWaitForCompletionTimeout());
} catch (Exception exc) {
listener.onFailure(exc);
}

View File

@ -250,7 +250,7 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
queryLatch.countDownAndReset();
AsyncSearchResponse newResponse = client().execute(GetAsyncSearchAction.INSTANCE,
new GetAsyncSearchAction.Request(response.getId())
.setWaitForCompletion(TimeValue.timeValueMillis(10))).get();
.setWaitForCompletionTimeout(TimeValue.timeValueMillis(10))).get();
if (newResponse.isRunning()) {
assertThat(newResponse.status(), equalTo(RestStatus.OK));

View File

@ -8,11 +8,14 @@ package org.elasticsearch.xpack.search;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import java.util.Collections;
public class GetAsyncSearchRequestTests extends AbstractWireSerializingTestCase<GetAsyncSearchAction.Request> {
@Override
protected Writeable.Reader<GetAsyncSearchAction.Request> instanceReader() {
@ -23,7 +26,7 @@ public class GetAsyncSearchRequestTests extends AbstractWireSerializingTestCase<
protected GetAsyncSearchAction.Request createTestInstance() {
GetAsyncSearchAction.Request req = new GetAsyncSearchAction.Request(randomSearchId());
if (randomBoolean()) {
req.setWaitForCompletion(TimeValue.timeValueMillis(randomIntBetween(1, 10000)));
req.setWaitForCompletionTimeout(TimeValue.timeValueMillis(randomIntBetween(1, 10000)));
}
if (randomBoolean()) {
req.setKeepAlive(TimeValue.timeValueMillis(randomIntBetween(1, 10000)));
@ -35,4 +38,10 @@ public class GetAsyncSearchRequestTests extends AbstractWireSerializingTestCase<
return AsyncExecutionId.encode(UUIDs.randomBase64UUID(),
new TaskId(randomAlphaOfLengthBetween(10, 20), randomLongBetween(0, Long.MAX_VALUE)));
}
public void testTaskDescription() {
GetAsyncSearchAction.Request request = new GetAsyncSearchAction.Request("abcdef");
Task task = request.createTask(1, "type", "action", null, Collections.emptyMap());
assertEquals("id[abcdef], waitForCompletionTimeout[-1], keepAlive[-1]", task.getDescription());
}
}

View File

@ -10,13 +10,17 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
import org.elasticsearch.xpack.core.transform.action.AbstractWireSerializingTransformTestCase;
import java.util.Collections;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -118,4 +122,12 @@ public class SubmitAsyncSearchRequestTests extends AbstractWireSerializingTransf
assertThat(exc.validationErrors().size(), equalTo(1));
assertThat(exc.validationErrors().get(0), containsString("[pre_filter_shard_size]"));
}
public void testTaskDescription() {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(
new SearchSourceBuilder().query(new MatchAllQueryBuilder()), "index");
Task task = request.createTask(1, "type", "action", null, Collections.emptyMap());
assertEquals("waitForCompletionTimeout[1s], keepOnCompletion[false] keepAlive[5d], request=indices[index], " +
"types[], search_type[QUERY_THEN_FETCH], source[{\"query\":{\"match_all\":{\"boost\":1.0}}}]", task.getDescription());
}
}

View File

@ -31,7 +31,7 @@ public class GetAsyncSearchAction extends ActionType<AsyncSearchResponse> {
public static class Request extends ActionRequest {
private final String id;
private TimeValue waitForCompletion = TimeValue.MINUS_ONE;
private TimeValue waitForCompletionTimeout = TimeValue.MINUS_ONE;
private TimeValue keepAlive = TimeValue.MINUS_ONE;
/**
@ -46,7 +46,7 @@ public class GetAsyncSearchAction extends ActionType<AsyncSearchResponse> {
public Request(StreamInput in) throws IOException {
super(in);
this.id = in.readString();
this.waitForCompletion = TimeValue.timeValueMillis(in.readLong());
this.waitForCompletionTimeout = TimeValue.timeValueMillis(in.readLong());
this.keepAlive = in.readTimeValue();
}
@ -54,7 +54,7 @@ public class GetAsyncSearchAction extends ActionType<AsyncSearchResponse> {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeLong(waitForCompletion.millis());
out.writeLong(waitForCompletionTimeout.millis());
out.writeTimeValue(keepAlive);
}
@ -73,13 +73,13 @@ public class GetAsyncSearchAction extends ActionType<AsyncSearchResponse> {
/**
* Sets the minimum time that the request should wait before returning a partial result (defaults to no wait).
*/
public Request setWaitForCompletion(TimeValue timeValue) {
this.waitForCompletion = timeValue;
public Request setWaitForCompletionTimeout(TimeValue timeValue) {
this.waitForCompletionTimeout = timeValue;
return this;
}
public TimeValue getWaitForCompletion() {
return waitForCompletion;
public TimeValue getWaitForCompletionTimeout() {
return waitForCompletionTimeout;
}
/**
@ -100,13 +100,20 @@ public class GetAsyncSearchAction extends ActionType<AsyncSearchResponse> {
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(id, request.id) &&
waitForCompletion.equals(request.waitForCompletion) &&
waitForCompletionTimeout.equals(request.waitForCompletionTimeout) &&
keepAlive.equals(request.keepAlive);
}
@Override
public int hashCode() {
return Objects.hash(id, waitForCompletion, keepAlive);
return Objects.hash(id, waitForCompletionTimeout, keepAlive);
}
@Override
public String getDescription() {
return "id[" + id +
"], waitForCompletionTimeout[" + waitForCompletionTimeout +
"], keepAlive[" + keepAlive + "]";
}
}
}

View File

@ -151,12 +151,21 @@ public class SubmitAsyncSearchRequest extends ActionRequest {
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, toString(), parentTaskId, headers) {
return new CancellableTask(id, type, action, null, parentTaskId, headers) {
@Override
public boolean shouldCancelChildrenOnCancellation() {
// we cancel the underlying search action explicitly in the submit action
return false;
}
@Override
public String getDescription() {
// generating description in a lazy way since source can be quite big
return "waitForCompletionTimeout[" + waitForCompletionTimeout +
"], keepOnCompletion[" + keepOnCompletion +
"] keepAlive[" + keepAlive +
"], request=" + request.buildDescription();
}
};
}
@ -179,14 +188,4 @@ public class SubmitAsyncSearchRequest extends ActionRequest {
public int hashCode() {
return Objects.hash(waitForCompletionTimeout, keepOnCompletion, keepAlive, request);
}
@Override
public String toString() {
return "SubmitAsyncSearchRequest{" +
"waitForCompletionTimeout=" + waitForCompletionTimeout +
", keepOnCompletion=" + keepOnCompletion +
", keepAlive=" + keepAlive +
", request=" + request +
'}';
}
}