diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index 4d42ad334a9..560379a6ce2 100644 --- a/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -21,12 +21,14 @@ package org.elasticsearch.action.search; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -112,11 +114,14 @@ public class MultiSearchResponse extends ActionResponse implements Iterable { private final int availableProcessors; private final ClusterService clusterService; private final TransportAction searchAction; + private final LongSupplier relativeTimeProvider; @Inject public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, @@ -53,19 +55,23 @@ public class TransportMultiSearchAction extends HandledTransportAction searchAction, - IndexNameExpressionResolver resolver, int availableProcessors) { + IndexNameExpressionResolver resolver, int availableProcessors, LongSupplier relativeTimeProvider) { super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new); this.clusterService = clusterService; this.searchAction = searchAction; this.availableProcessors = availableProcessors; + this.relativeTimeProvider = relativeTimeProvider; } @Override protected void doExecute(MultiSearchRequest request, ActionListener listener) { + final long relativeStartTime = relativeTimeProvider.getAsLong(); + ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); @@ -85,7 +91,7 @@ public class TransportMultiSearchAction extends HandledTransportAction requests, final AtomicArray responses, final AtomicInteger responseCounter, - final ActionListener listener) { + final ActionListener listener, + final long relativeStartTime) { SearchRequestSlot request = requests.poll(); if (request == null) { /* @@ -155,16 +162,25 @@ public class TransportMultiSearchAction extends HandledTransportAction executeSearch(requests, responses, responseCounter, listener)); + threadPool.generic() + .execute(() -> executeSearch(requests, responses, responseCounter, listener, relativeStartTime)); } else { // we are on a different thread (we went asynchronous), it's safe to recurse - executeSearch(requests, responses, responseCounter, listener); + executeSearch(requests, responses, responseCounter, listener, relativeStartTime); } } } private void finish() { - listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]))); + listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]), + buildTookInMillis())); + } + + /** + * Builds how long it took to execute the msearch. + */ + private long buildTookInMillis() { + return TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - relativeStartTime); } }); } @@ -178,7 +194,5 @@ public class TransportMultiSearchAction extends HandledTransportAction() { + @Override + public void onResponse(MultiSearchResponse multiSearchResponse) { + if (controlledClock) { + assertThat(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS), + equalTo(multiSearchResponse.getTook().getMillis())); + } else { + assertThat(multiSearchResponse.getTook().getMillis(), + greaterThanOrEqualTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS))); + } + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }); + } + + private TransportMultiSearchAction createTransportMultiSearchAction(boolean controlledClock, AtomicLong expected) { + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + TaskManager taskManager = mock(TaskManager.class); + TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + ActionFilters actionFilters = new ActionFilters(new HashSet<>()); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); + IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY); + + final int availableProcessors = Runtime.getRuntime().availableProcessors(); + AtomicInteger counter = new AtomicInteger(); + final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME); + Randomness.shuffle(threadPoolNames); + final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0)); + final Set requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); + + TransportAction searchAction = new TransportAction(Settings.EMPTY, + "action", threadPool, actionFilters, resolver, taskManager) { + @Override + protected void doExecute(SearchRequest request, ActionListener listener) { + requests.add(request); + commonExecutor.execute(() -> { + counter.decrementAndGet(); + listener.onResponse(new SearchResponse()); + }); + } + }; + + if (controlledClock) { + return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, + availableProcessors, expected::get) { + @Override + void executeSearch(final Queue requests, final AtomicArray responses, + final AtomicInteger responseCounter, final ActionListener listener, long startTimeInNanos) { + expected.set(1000000); + super.executeSearch(requests, responses, responseCounter, listener, startTimeInNanos); + } + }; + } else { + return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, + availableProcessors, System::nanoTime) { + + @Override + void executeSearch(final Queue requests, final AtomicArray responses, + final AtomicInteger responseCounter, final ActionListener listener, long startTimeInNanos) { + long elapsed = spinForAtLeastNMilliseconds(randomIntBetween(0, 10)); + expected.set(elapsed); + super.executeSearch(requests, responses, responseCounter, listener, startTimeInNanos); + } + }; + } + } + + static class Resolver extends IndexNameExpressionResolver { + + Resolver(Settings settings) { + super(settings); + } + + @Override + public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { + return request.indices(); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index 3a162f302bc..e6de1d859d8 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -146,13 +146,16 @@ public class MultiSearchRequestTests extends ESTestCase { } public void testResponseErrorToXContent() throws IOException { + long tookInMillis = randomIntBetween(1, 1000); MultiSearchResponse response = new MultiSearchResponse( - new MultiSearchResponse.Item[]{ - new MultiSearchResponse.Item(null, new IllegalStateException("foobar")), - new MultiSearchResponse.Item(null, new IllegalStateException("baaaaaazzzz")) - }); + new MultiSearchResponse.Item[] { + new MultiSearchResponse.Item(null, new IllegalStateException("foobar")), + new MultiSearchResponse.Item(null, new IllegalStateException("baaaaaazzzz")) + }, tookInMillis); - assertEquals("{\"responses\":[" + assertEquals("{\"took\":" + + tookInMillis + + ",\"responses\":[" + "{" + "\"error\":{\"root_cause\":[{\"type\":\"illegal_state_exception\",\"reason\":\"foobar\"}]," + "\"type\":\"illegal_state_exception\",\"reason\":\"foobar\"},\"status\":500" diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index e811da82c47..4410507eef9 100644 --- a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -102,8 +102,10 @@ public class TransportMultiSearchActionTests extends ESTestCase { }); } }; - TransportMultiSearchAction action = - new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10); + + TransportMultiSearchAction action = + new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10, + System::nanoTime); // Execute the multi search api and fail if we find an error after executing: try {