diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index e9c9968ecb8..0243a872d61 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -65,6 +65,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -82,11 +83,25 @@ public class SearchPhaseController extends AbstractComponent { private final BigArrays bigArrays; private final ScriptService scriptService; + private final List > searchResponseListener; public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService) { + this(settings, bigArrays, scriptService, Collections.emptyList()); + } + + public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService, + List > searchResponseListener) { super(settings); this.bigArrays = bigArrays; this.scriptService = scriptService; + this.searchResponseListener = searchResponseListener; + } + + /** + * Returns the search response listeners registry + */ + public List > getSearchResponseListener() { + return searchResponseListener; } public AggregatedDfs aggregateDfs(AtomicArray results) { diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index f67fc543425..cfbd30f0217 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.function.BiConsumer; import java.util.function.Function; import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH; @@ -212,8 +213,23 @@ public class TransportSearchAction extends HandledTransportAction wrapper; + if (searchPhaseController.getSearchResponseListener().size() > 0) { + wrapper = ActionListener.wrap(searchResponse -> { + List> responseListeners = + searchPhaseController.getSearchResponseListener(); + for (BiConsumer respListener : responseListeners) { + respListener.accept(searchRequest, searchResponse); + } + listener.onResponse(searchResponse); + + }, listener::onFailure); + } else { + wrapper = listener; + } searchAsyncAction(task, searchRequest, shardIterators, startTimeInMillis, connectionLookup, clusterState.version(), - Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start(); + Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, wrapper).start(); } private static GroupShardsIterator mergeShardsIterators(GroupShardsIterator localShardsIterator, diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index e3fe68838d4..3671f7f331a 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -451,7 +451,7 @@ public class Node implements Closeable { b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings, settingsModule.getClusterSettings(), transportService)); b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays, - scriptModule.getScriptService())); + scriptModule.getScriptService(), searchModule.getSearchResponseListeners())); b.bind(Transport.class).toInstance(transport); b.bind(TransportService.class).toInstance(transportService); b.bind(NetworkService.class).toInstance(networkService); diff --git a/core/src/main/java/org/elasticsearch/plugins/SearchPlugin.java b/core/src/main/java/org/elasticsearch/plugins/SearchPlugin.java index b68176d8f0d..b6adbf893b5 100644 --- a/core/src/main/java/org/elasticsearch/plugins/SearchPlugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/SearchPlugin.java @@ -20,6 +20,8 @@ package org.elasticsearch.plugins; import org.apache.lucene.search.Query; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; @@ -51,6 +53,7 @@ import org.elasticsearch.search.suggest.SuggestionBuilder; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.function.BiConsumer; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -121,6 +124,15 @@ public interface SearchPlugin { default List getPipelineAggregations() { return emptyList(); } + /** + * The new search response listeners in the form of {@link BiConsumer}s added by this plugin. + * The listeners are invoked on the coordinating node, at the very end of the search request. + * This provides a convenient location if you wish to inspect/modify the final response (took time, etc). + * The BiConsumers are passed the original {@link SearchRequest} and the final {@link SearchResponse} + */ + default List> getSearchResponseListeners() { + return emptyList(); + } /** * Specification of custom {@link ScoreFunction}. diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index 048630dad20..92c0d1700e7 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -20,6 +20,8 @@ package org.elasticsearch.search; import org.apache.lucene.search.BooleanQuery; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.NamedRegistry; import org.elasticsearch.common.geo.ShapesAvailability; import org.elasticsearch.common.geo.builders.ShapeBuilders; @@ -251,6 +253,8 @@ import org.elasticsearch.search.suggest.term.TermSuggestionBuilder; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -272,6 +276,7 @@ public class SearchModule { "moving_avg_model"); private final List fetchSubPhases = new ArrayList<>(); + private final List > searchResponseListeners = new ArrayList<> (); private final Settings settings; private final List namedWriteables = new ArrayList<>(); @@ -293,6 +298,7 @@ public class SearchModule { registerPipelineAggregations(plugins); registerFetchSubPhases(plugins); registerSearchExts(plugins); + registerSearchResponseListeners(plugins); registerShapes(); } @@ -325,6 +331,13 @@ public class SearchModule { return movingAverageModelParserRegistry; } + /** + * Returns the search response listeners registry + */ + public List > getSearchResponseListeners() { + return searchResponseListeners; + } + private void registerAggregations(List plugins) { registerAggregation(new AggregationSpec(AvgAggregationBuilder.NAME, AvgAggregationBuilder::new, AvgAggregationBuilder::parse) .addResultReader(InternalAvg::new)); @@ -675,6 +688,10 @@ public class SearchModule { registerFromPlugin(plugins, p -> p.getFetchSubPhases(context), this::registerFetchSubPhase); } + private void registerSearchResponseListeners(List plugins) { + registerFromPlugin(plugins, p -> p.getSearchResponseListeners(), this::registerSearchResponseListener); + } + private void registerSearchExts(List plugins) { registerFromPlugin(plugins, SearchPlugin::getSearchExts, this::registerSearchExt); } @@ -761,6 +778,10 @@ public class SearchModule { (p, c) -> spec.getParser().fromXContent((QueryParseContext) c))); } + private void registerSearchResponseListener(BiConsumer listener) { + searchResponseListeners.add(requireNonNull(listener, "SearchResponseListener must not be null")); + } + public FetchPhase getFetchPhase() { return new FetchPhase(fetchSubPhases); } diff --git a/core/src/test/java/org/elasticsearch/search/ResponseListenerPluginIT.java b/core/src/test/java/org/elasticsearch/search/ResponseListenerPluginIT.java new file mode 100644 index 00000000000..0fc36a42b18 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/ResponseListenerPluginIT.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Before; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.greaterThan; + +public class ResponseListenerPluginIT extends ESIntegTestCase { + + static AtomicBoolean hookWasFired = new AtomicBoolean(false); + + public static class FooResponseListenerPlugin extends Plugin implements SearchPlugin { + @Override + public List> getSearchResponseListeners() { + return singletonList((searchRequest, response) -> { + assertThat(response.getTookInMillis(), greaterThan(0L)); + boolean alreadyFired = hookWasFired.getAndSet(true); + assertFalse(alreadyFired); + }); + } + } + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(FooResponseListenerPlugin.class); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + createIndex("test"); + ensureGreen(); + client().prepareIndex("index", "type", "1").setSource("field", "value").get(); + refresh(); + } + + public void testSearchResponseHook() { + assertHitCount(client().prepareSearch("index").setQuery(QueryBuilders.matchAllQuery()).get(), 1L); + boolean alreadyFired = hookWasFired.get(); + assertTrue(alreadyFired); + } +} diff --git a/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java b/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java index 2512dbdfeac..e5652845ab4 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java +++ b/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.search; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.inject.ModuleTestCase; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -69,6 +71,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiConsumer; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -256,6 +259,18 @@ public class SearchModuleTests extends ModuleTestCase { hasSize(1)); } + public void testRegisterSearchResponseListener() { + BiConsumer listener = (s, r) -> {}; + SearchModule module = new SearchModule(Settings.EMPTY, false, singletonList(new SearchPlugin() { + public List> getSearchResponseListeners() { + return singletonList(listener); + } + })); + List> listeners = module.getSearchResponseListeners(); + assertEquals(listeners.size(), 1); + assertEquals(listeners.get(0), listener); + } + private static final String[] NON_DEPRECATED_QUERIES = new String[] { "bool", "boosting",