From ad8326134828b3490227a11c98671636992bddf5 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 8 Sep 2020 16:13:46 +0200 Subject: [PATCH] Print out search request as part of async search task description (#62057) Currently, the async search task is the task that will be running through the whole execution of an async search. While the submit async search task prints out the search as part of its description, async search task doesn't while it should. With this commit we address that while also making sure that the description highlights that the task is originated from an async search. Also, we streamline the way the description is printed out by SearchTask so that it does not get forgotten in the future. --- .../action/search/SearchRequest.java | 10 ++-------- .../action/search/SearchScrollRequest.java | 2 +- .../elasticsearch/action/search/SearchTask.java | 14 ++++++++++++-- .../action/search/MockSearchPhaseContext.java | 2 +- .../SearchQueryThenFetchAsyncActionTests.java | 2 +- .../xpack/search/AsyncSearchTask.java | 3 ++- .../TransportSubmitAsyncSearchAction.java | 2 +- .../xpack/search/AsyncSearchTaskTests.java | 17 +++++++++++++++-- 8 files changed, 35 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 7b15e9dd110..d7ee7d3558f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -645,16 +645,10 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla @Override public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - // generating description in a lazy way since source can be quite big - return new SearchTask(id, type, action, null, parentTaskId, headers) { - @Override - public String getDescription() { - return buildDescription(); - } - }; + return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers); } - public String buildDescription() { + public final String buildDescription() { StringBuilder sb = new StringBuilder(); sb.append("indices["); Strings.arrayToDelimitedString(indices, ",", sb); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java index 5815df7cb96..7a320d81f2d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java @@ -114,7 +114,7 @@ public class SearchScrollRequest extends ActionRequest implements ToXContentObje @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchTask(id, type, action, this::getDescription, parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java index c5a918c06f1..45f1fe95860 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java @@ -23,15 +23,25 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; import java.util.Map; +import java.util.function.Supplier; /** * Task storing information about a currently running {@link SearchRequest}. */ public class SearchTask extends CancellableTask { + // generating description in a lazy way since source can be quite big + private final Supplier descriptionSupplier; private SearchProgressListener progressListener = SearchProgressListener.NOOP; - public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { - super(id, type, action, description, parentTaskId, headers); + public SearchTask(long id, String type, String action, Supplier descriptionSupplier, + TaskId parentTaskId, Map headers) { + super(id, type, action, null, parentTaskId, headers); + this.descriptionSupplier = descriptionSupplier; + } + + @Override + public final String getDescription() { + return descriptionSupplier.get(); } /** diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index cd060c971a4..29099af8c2d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -74,7 +74,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext { @Override public SearchTask getTask() { - return new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap()); + return new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index cc48a60a725..c8f38c926f3 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -146,7 +146,7 @@ public class SearchQueryThenFetchAsyncActionTests extends ESTestCase { searchRequest.allowPartialSearchResults(false); SearchPhaseController controller = new SearchPhaseController( writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); - SearchTask task = new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap()); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 78bca8de805..8c1ecdd846a 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -84,6 +84,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { String type, String action, TaskId parentTaskId, + Supplier descriptionSupplier, TimeValue keepAlive, Map originHeaders, Map taskHeaders, @@ -91,7 +92,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { Client client, ThreadPool threadPool, Supplier aggReduceContextSupplier) { - super(id, type, action, "async_search", parentTaskId, taskHeaders); + super(id, type, action, () -> "async_search{" + descriptionSupplier.get() + "}", parentTaskId, taskHeaders); this.expirationTimeMillis = getStartTime() + keepAlive.getMillis(); this.originHeaders = originHeaders; this.searchId = searchId; diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index b31af656e2a..d12cd268e79 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -143,7 +143,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction aggReduceContextSupplier = () -> requestToAggReduceContextBuilder.apply(request.getSearchRequest()); - return new AsyncSearchTask(id, type, action, parentTaskId, keepAlive, + return new AsyncSearchTask(id, type, action, parentTaskId, this::buildDescription, keepAlive, originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier); } }; diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java index b0bad46e214..3cd2c682dd1 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchShard; import org.elasticsearch.action.search.ShardSearchFailure; @@ -18,6 +19,7 @@ import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHits; @@ -26,6 +28,7 @@ import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; @@ -71,13 +74,23 @@ public class AsyncSearchTaskTests extends ESTestCase { } private AsyncSearchTask createAsyncSearchTask() { - return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1), + return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), new NoOpClient(threadPool), threadPool, null); } + public void testTaskDescription() { + SearchRequest searchRequest = new SearchRequest("index1", "index2").source( + new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value"))); + AsyncSearchTask asyncSearchTask = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), searchRequest::buildDescription, + TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), + new NoOpClient(threadPool), threadPool, null); + assertEquals("async_search{indices[index1,index2], types[], search_type[QUERY_THEN_FETCH], " + + "source[{\"query\":{\"term\":{\"field\":{\"value\":\"value\",\"boost\":1.0}}}}]}", asyncSearchTask.getDescription()); + } + public void testWaitForInit() throws InterruptedException { - AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1), + AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), new NoOpClient(threadPool), threadPool, null); int numShards = randomIntBetween(0, 10);