Print out search request as part of async search task description (#62057)

Currently, the async search task is the task that will be running through the whole execution of an async search. While the submit async search task prints out the search as part of its description, async search task doesn't while it should.

With this commit we address that while also making sure that the description highlights that the task is originated from an async search.

Also, we streamline the way the description is printed out by SearchTask so that it does not get forgotten in the future.
This commit is contained in:
Luca Cavanna 2020-09-08 16:13:46 +02:00
parent b7fd7cf154
commit ad83261348
8 changed files with 35 additions and 17 deletions

View File

@ -645,16 +645,10 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// generating description in a lazy way since source can be quite big
return new SearchTask(id, type, action, null, parentTaskId, headers) {
@Override
public String getDescription() {
return buildDescription();
}
};
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers);
}
public String buildDescription() {
public final String buildDescription() {
StringBuilder sb = new StringBuilder();
sb.append("indices[");
Strings.arrayToDelimitedString(indices, ",", sb);

View File

@ -114,7 +114,7 @@ public class SearchScrollRequest extends ActionRequest implements ToXContentObje
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
return new SearchTask(id, type, action, this::getDescription, parentTaskId, headers);
}
@Override

View File

@ -23,15 +23,25 @@ import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import java.util.Map;
import java.util.function.Supplier;
/**
* Task storing information about a currently running {@link SearchRequest}.
*/
public class SearchTask extends CancellableTask {
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, description, parentTaskId, headers);
public SearchTask(long id, String type, String action, Supplier<String> descriptionSupplier,
TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, null, parentTaskId, headers);
this.descriptionSupplier = descriptionSupplier;
}
@Override
public final String getDescription() {
return descriptionSupplier.get();
}
/**

View File

@ -74,7 +74,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
@Override
public SearchTask getTask() {
return new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap());
return new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
}
@Override

View File

@ -146,7 +146,7 @@ public class SearchQueryThenFetchAsyncActionTests extends ESTestCase {
searchRequest.allowPartialSearchResults(false);
SearchPhaseController controller = new SearchPhaseController(
writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchTask task = new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap());
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger,
searchTransportService, (clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),

View File

@ -84,6 +84,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
String type,
String action,
TaskId parentTaskId,
Supplier<String> descriptionSupplier,
TimeValue keepAlive,
Map<String, String> originHeaders,
Map<String, String> taskHeaders,
@ -91,7 +92,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
Client client,
ThreadPool threadPool,
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier) {
super(id, type, action, "async_search", parentTaskId, taskHeaders);
super(id, type, action, () -> "async_search{" + descriptionSupplier.get() + "}", parentTaskId, taskHeaders);
this.expirationTimeMillis = getStartTime() + keepAlive.getMillis();
this.originHeaders = originHeaders;
this.searchId = searchId;

View File

@ -143,7 +143,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id));
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
return new AsyncSearchTask(id, type, action, parentTaskId, keepAlive,
return new AsyncSearchTask(id, type, action, parentTaskId, this::buildDescription, keepAlive,
originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier);
}
};

View File

@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchShard;
import org.elasticsearch.action.search.ShardSearchFailure;
@ -18,6 +19,7 @@ import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHits;
@ -26,6 +28,7 @@ import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
@ -71,13 +74,23 @@ public class AsyncSearchTaskTests extends ESTestCase {
}
private AsyncSearchTask createAsyncSearchTask() {
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
new NoOpClient(threadPool), threadPool, null);
}
public void testTaskDescription() {
SearchRequest searchRequest = new SearchRequest("index1", "index2").source(
new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value")));
AsyncSearchTask asyncSearchTask = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), searchRequest::buildDescription,
TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
new NoOpClient(threadPool), threadPool, null);
assertEquals("async_search{indices[index1,index2], types[], search_type[QUERY_THEN_FETCH], " +
"source[{\"query\":{\"term\":{\"field\":{\"value\":\"value\",\"boost\":1.0}}}}]}", asyncSearchTask.getDescription());
}
public void testWaitForInit() throws InterruptedException {
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
new NoOpClient(threadPool), threadPool, null);
int numShards = randomIntBetween(0, 10);