From a8a16e6b08e7be9dd2a2c801b396067b5623c500 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 18 Jul 2019 11:04:05 +0200 Subject: [PATCH] Associate sub-requests to their parent task in multi search API (#44492) Multi search accepts multiple search requests and runs them as independent requests, each one as part of their own search task. Today they don't get associated though with their parent multi search task, which would be useful to monitor which msearch a certain search was part of, if any, and also to cancel all of the sub-requests in case the parent msearch gets cancelled (though this will also require making the multi search task cancellable as a follow-up). --- .../search/TransportMultiSearchAction.java | 1 + .../search/MultiSearchActionTookTests.java | 5 ++ .../TransportMultiSearchActionTests.java | 65 ++++++++++++++++++- 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index f03e1fd4dd1..66aa15c5692 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -86,6 +86,7 @@ public class TransportMultiSearchAction extends HandledTransportAction searchRequestSlots = new ConcurrentLinkedQueue<>(); for (int i = 0; i < request.requests().size(); i++) { SearchRequest searchRequest = request.requests().get(i); + searchRequest.setParentTask(client.getLocalNodeId(), task.getId()); searchRequestSlots.add(new SearchRequestSlot(searchRequest, i)); } diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index 01f1109ef3b..19b53e2f8d3 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -159,6 +159,11 @@ public class MultiSearchActionTookTests extends ESTestCase { ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY)); }); } + + @Override + public String getLocalNodeId() { + return "local_node_id"; + } }; if (controlledClock) { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index b405b0790de..1b8fea2e78f 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -35,6 +36,7 @@ import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -53,6 +55,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.action.support.PlainActionFuture.newFuture; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; @@ -76,7 +79,62 @@ public class TransportMultiSearchActionTests extends ESTestCase { super.tearDown(); } - public void testBatchExecute() throws Exception { + public void testParentTaskId() throws Exception { + // Initialize dependencies of TransportMultiSearchAction + Settings settings = Settings.builder() + .put("node.name", TransportMultiSearchActionTests.class.getSimpleName()) + .build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); + + String localNodeId = randomAlphaOfLengthBetween(3, 10); + int numSearchRequests = randomIntBetween(1, 100); + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + for (int i = 0; i < numSearchRequests; i++) { + multiSearchRequest.add(new SearchRequest()); + } + AtomicInteger counter = new AtomicInteger(0); + Task task = multiSearchRequest.createTask(randomLong(), "type", "action", null, Collections.emptyMap()); + NodeClient client = new NodeClient(settings, threadPool) { + @Override + public void search(final SearchRequest request, final ActionListener listener) { + assertEquals(task.getId(), request.getParentTask().getId()); + assertEquals(localNodeId, request.getParentTask().getNodeId()); + counter.incrementAndGet(); + listener.onResponse(SearchResponse.empty(() -> 1L, SearchResponse.Clusters.EMPTY)); + } + + @Override + public String getLocalNodeId() { + return localNodeId; + } + }; + TransportMultiSearchAction action = + new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, 10, System::nanoTime, client); + + PlainActionFuture future = newFuture(); + action.execute(task, multiSearchRequest, future); + future.get(); + assertEquals(numSearchRequests, counter.get()); + } finally { + assertTrue(ESTestCase.terminate(threadPool)); + } + } + + public void testBatchExecute() { // Initialize dependencies of TransportMultiSearchAction Settings settings = Settings.builder() .put("node.name", TransportMultiSearchActionTests.class.getSimpleName()) @@ -123,6 +181,11 @@ public class TransportMultiSearchActionTests extends ESTestCase { ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY)); }); } + + @Override + public String getLocalNodeId() { + return "local_node_id"; + } }; TransportMultiSearchAction action =