Associate sub-requests to their parent task in multi search API (#44492)

Multi search accepts multiple search requests and runs them as
independent requests, each one as part of their own search task. Today
they don't get associated though with their parent multi search task,
which would be useful to monitor which msearch a certain search was part
of, if any, and also to cancel all of the sub-requests in case the
parent msearch gets cancelled (though this will also require making 
the multi search task cancellable as a follow-up).
This commit is contained in:
Luca Cavanna 2019-07-18 11:04:05 +02:00
parent e9f257b4d9
commit a8a16e6b08
3 changed files with 70 additions and 1 deletions

View File

@ -86,6 +86,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
Queue<SearchRequestSlot> searchRequestSlots = new ConcurrentLinkedQueue<>();
for (int i = 0; i < request.requests().size(); i++) {
SearchRequest searchRequest = request.requests().get(i);
searchRequest.setParentTask(client.getLocalNodeId(), task.getId());
searchRequestSlots.add(new SearchRequestSlot(searchRequest, i));
}

View File

@ -159,6 +159,11 @@ public class MultiSearchActionTookTests extends ESTestCase {
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY));
});
}
@Override
public String getLocalNodeId() {
return "local_node_id";
}
};
if (controlledClock) {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -35,6 +36,7 @@ import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
@ -53,6 +55,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
@ -76,7 +79,62 @@ public class TransportMultiSearchActionTests extends ESTestCase {
super.tearDown();
}
public void testBatchExecute() throws Exception {
public void testParentTaskId() throws Exception {
// Initialize dependencies of TransportMultiSearchAction
Settings settings = Settings.builder()
.put("node.name", TransportMultiSearchActionTests.class.getSimpleName())
.build();
ActionFilters actionFilters = mock(ActionFilters.class);
when(actionFilters.filters()).thenReturn(new ActionFilter[0]);
ThreadPool threadPool = new ThreadPool(settings);
try {
TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null,
Collections.emptySet()) {
@Override
public TaskManager getTaskManager() {
return taskManager;
}
};
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build());
String localNodeId = randomAlphaOfLengthBetween(3, 10);
int numSearchRequests = randomIntBetween(1, 100);
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
for (int i = 0; i < numSearchRequests; i++) {
multiSearchRequest.add(new SearchRequest());
}
AtomicInteger counter = new AtomicInteger(0);
Task task = multiSearchRequest.createTask(randomLong(), "type", "action", null, Collections.emptyMap());
NodeClient client = new NodeClient(settings, threadPool) {
@Override
public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
assertEquals(task.getId(), request.getParentTask().getId());
assertEquals(localNodeId, request.getParentTask().getNodeId());
counter.incrementAndGet();
listener.onResponse(SearchResponse.empty(() -> 1L, SearchResponse.Clusters.EMPTY));
}
@Override
public String getLocalNodeId() {
return localNodeId;
}
};
TransportMultiSearchAction action =
new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, 10, System::nanoTime, client);
PlainActionFuture<MultiSearchResponse> future = newFuture();
action.execute(task, multiSearchRequest, future);
future.get();
assertEquals(numSearchRequests, counter.get());
} finally {
assertTrue(ESTestCase.terminate(threadPool));
}
}
public void testBatchExecute() {
// Initialize dependencies of TransportMultiSearchAction
Settings settings = Settings.builder()
.put("node.name", TransportMultiSearchActionTests.class.getSimpleName())
@ -123,6 +181,11 @@ public class TransportMultiSearchActionTests extends ESTestCase {
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY));
});
}
@Override
public String getLocalNodeId() {
return "local_node_id";
}
};
TransportMultiSearchAction action =