Add the ability to define search response listeners in plugins (#22682)

This change is a simple adaptation of https://github.com/elastic/elasticsearch/pull/19587 for the current state of master.
It allows to define search response listener in the form of `BiConsumer<SearchRequest, SearchResponse>`s in a search plugin.
This commit is contained in:
Jim Ferenczi 2017-01-19 12:48:45 +01:00 committed by GitHub
parent 652cb7dbf7
commit 21dae1924f
7 changed files with 156 additions and 2 deletions

View File

@ -65,6 +65,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -82,11 +83,25 @@ public class SearchPhaseController extends AbstractComponent {
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final List<BiConsumer<SearchRequest, SearchResponse> > searchResponseListener;
public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService) {
this(settings, bigArrays, scriptService, Collections.emptyList());
}
public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService,
List<BiConsumer<SearchRequest, SearchResponse> > searchResponseListener) {
super(settings);
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.searchResponseListener = searchResponseListener;
}
/**
* Returns the search response listeners registry
*/
public List<BiConsumer<SearchRequest, SearchResponse> > getSearchResponseListener() {
return searchResponseListener;
}
public AggregatedDfs aggregateDfs(AtomicArray<DfsSearchResult> results) {

View File

@ -51,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Function;
import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH;
@ -212,8 +213,23 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
return connection;
};
final ActionListener<SearchResponse> wrapper;
if (searchPhaseController.getSearchResponseListener().size() > 0) {
wrapper = ActionListener.wrap(searchResponse -> {
List<BiConsumer<SearchRequest, SearchResponse>> responseListeners =
searchPhaseController.getSearchResponseListener();
for (BiConsumer<SearchRequest, SearchResponse> respListener : responseListeners) {
respListener.accept(searchRequest, searchResponse);
}
listener.onResponse(searchResponse);
}, listener::onFailure);
} else {
wrapper = listener;
}
searchAsyncAction(task, searchRequest, shardIterators, startTimeInMillis, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start();
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, wrapper).start();
}
private static GroupShardsIterator mergeShardsIterators(GroupShardsIterator localShardsIterator,

View File

@ -451,7 +451,7 @@ public class Node implements Closeable {
b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings,
settingsModule.getClusterSettings(), transportService));
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
scriptModule.getScriptService()));
scriptModule.getScriptService(), searchModule.getSearchResponseListeners()));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);

View File

@ -20,6 +20,8 @@
package org.elasticsearch.plugins;
import org.apache.lucene.search.Query;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
@ -51,6 +53,7 @@ import org.elasticsearch.search.suggest.SuggestionBuilder;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@ -121,6 +124,15 @@ public interface SearchPlugin {
default List<PipelineAggregationSpec> getPipelineAggregations() {
return emptyList();
}
/**
* The new search response listeners in the form of {@link BiConsumer}s added by this plugin.
* The listeners are invoked on the coordinating node, at the very end of the search request.
* This provides a convenient location if you wish to inspect/modify the final response (took time, etc).
* The BiConsumers are passed the original {@link SearchRequest} and the final {@link SearchResponse}
*/
default List<BiConsumer<SearchRequest, SearchResponse>> getSearchResponseListeners() {
return emptyList();
}
/**
* Specification of custom {@link ScoreFunction}.

View File

@ -20,6 +20,8 @@
package org.elasticsearch.search;
import org.apache.lucene.search.BooleanQuery;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.geo.builders.ShapeBuilders;
@ -251,6 +253,8 @@ import org.elasticsearch.search.suggest.term.TermSuggestionBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@ -272,6 +276,7 @@ public class SearchModule {
"moving_avg_model");
private final List<FetchSubPhase> fetchSubPhases = new ArrayList<>();
private final List<BiConsumer<SearchRequest, SearchResponse> > searchResponseListeners = new ArrayList<> ();
private final Settings settings;
private final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
@ -293,6 +298,7 @@ public class SearchModule {
registerPipelineAggregations(plugins);
registerFetchSubPhases(plugins);
registerSearchExts(plugins);
registerSearchResponseListeners(plugins);
registerShapes();
}
@ -325,6 +331,13 @@ public class SearchModule {
return movingAverageModelParserRegistry;
}
/**
* Returns the search response listeners registry
*/
public List<BiConsumer<SearchRequest, SearchResponse> > getSearchResponseListeners() {
return searchResponseListeners;
}
private void registerAggregations(List<SearchPlugin> plugins) {
registerAggregation(new AggregationSpec(AvgAggregationBuilder.NAME, AvgAggregationBuilder::new, AvgAggregationBuilder::parse)
.addResultReader(InternalAvg::new));
@ -675,6 +688,10 @@ public class SearchModule {
registerFromPlugin(plugins, p -> p.getFetchSubPhases(context), this::registerFetchSubPhase);
}
private void registerSearchResponseListeners(List<SearchPlugin> plugins) {
registerFromPlugin(plugins, p -> p.getSearchResponseListeners(), this::registerSearchResponseListener);
}
private void registerSearchExts(List<SearchPlugin> plugins) {
registerFromPlugin(plugins, SearchPlugin::getSearchExts, this::registerSearchExt);
}
@ -761,6 +778,10 @@ public class SearchModule {
(p, c) -> spec.getParser().fromXContent((QueryParseContext) c)));
}
private void registerSearchResponseListener(BiConsumer<SearchRequest, SearchResponse> listener) {
searchResponseListeners.add(requireNonNull(listener, "SearchResponseListener must not be null"));
}
public FetchPhase getFetchPhase() {
return new FetchPhase(fetchSubPhases);
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Before;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import static java.util.Collections.singletonList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThan;
public class ResponseListenerPluginIT extends ESIntegTestCase {
static AtomicBoolean hookWasFired = new AtomicBoolean(false);
public static class FooResponseListenerPlugin extends Plugin implements SearchPlugin {
@Override
public List<BiConsumer<SearchRequest, SearchResponse>> getSearchResponseListeners() {
return singletonList((searchRequest, response) -> {
assertThat(response.getTookInMillis(), greaterThan(0L));
boolean alreadyFired = hookWasFired.getAndSet(true);
assertFalse(alreadyFired);
});
}
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(FooResponseListenerPlugin.class);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
createIndex("test");
ensureGreen();
client().prepareIndex("index", "type", "1").setSource("field", "value").get();
refresh();
}
public void testSearchResponseHook() {
assertHitCount(client().prepareSearch("index").setQuery(QueryBuilders.matchAllQuery()).get(), 1L);
boolean alreadyFired = hookWasFired.get();
assertTrue(alreadyFired);
}
}

View File

@ -18,6 +18,8 @@
*/
package org.elasticsearch.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.inject.ModuleTestCase;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -69,6 +71,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
@ -256,6 +259,18 @@ public class SearchModuleTests extends ModuleTestCase {
hasSize(1));
}
public void testRegisterSearchResponseListener() {
BiConsumer<SearchRequest, SearchResponse> listener = (s, r) -> {};
SearchModule module = new SearchModule(Settings.EMPTY, false, singletonList(new SearchPlugin() {
public List<BiConsumer<SearchRequest, SearchResponse>> getSearchResponseListeners() {
return singletonList(listener);
}
}));
List<BiConsumer<SearchRequest, SearchResponse>> listeners = module.getSearchResponseListeners();
assertEquals(listeners.size(), 1);
assertEquals(listeners.get(0), listener);
}
private static final String[] NON_DEPRECATED_QUERIES = new String[] {
"bool",
"boosting",