Rewrite multi search template api to delegate to multi search api instead of to search template api.
The max concurrent searches logic is complex and we shouldn't duplicate that in multi search template api, so we should template each individual template search request and then delegate to multi search api.
This commit is contained in:
parent
bd559d96d4
commit
760e5fce77
|
@ -34,7 +34,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
|
|||
|
||||
public class MultiSearchTemplateRequest extends ActionRequest implements CompositeIndicesRequest {
|
||||
|
||||
private int maxConcurrentSearchRequests = 0;
|
||||
private int maxConcurrentSearchRequests = 1;
|
||||
private List<SearchTemplateRequest> requests = new ArrayList<>();
|
||||
|
||||
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
|
||||
|
|
|
@ -20,59 +20,81 @@
|
|||
package org.elasticsearch.script.mustache;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.search.MultiSearchRequest;
|
||||
import org.elasticsearch.action.search.MultiSearchResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.TransportMultiSearchAction;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.script.mustache.TransportSearchTemplateAction.convert;
|
||||
|
||||
public class TransportMultiSearchTemplateAction extends HandledTransportAction<MultiSearchTemplateRequest, MultiSearchTemplateResponse> {
|
||||
|
||||
private final TransportSearchTemplateAction searchTemplateAction;
|
||||
private final ScriptService scriptService;
|
||||
private final NamedXContentRegistry xContentRegistry;
|
||||
private final TransportMultiSearchAction multiSearchAction;
|
||||
|
||||
@Inject
|
||||
public TransportMultiSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver resolver,
|
||||
TransportSearchTemplateAction searchTemplateAction) {
|
||||
ScriptService scriptService, NamedXContentRegistry xContentRegistry,
|
||||
TransportMultiSearchAction multiSearchAction) {
|
||||
super(settings, MultiSearchTemplateAction.NAME, threadPool, transportService, actionFilters, resolver,
|
||||
MultiSearchTemplateRequest::new);
|
||||
this.searchTemplateAction = searchTemplateAction;
|
||||
this.scriptService = scriptService;
|
||||
this.xContentRegistry = xContentRegistry;
|
||||
this.multiSearchAction = multiSearchAction;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(MultiSearchTemplateRequest request, ActionListener<MultiSearchTemplateResponse> listener) {
|
||||
final AtomicArray<MultiSearchTemplateResponse.Item> responses = new AtomicArray<>(request.requests().size());
|
||||
final AtomicInteger counter = new AtomicInteger(responses.length());
|
||||
List<Integer> originalSlots = new ArrayList<>();
|
||||
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
|
||||
multiSearchRequest.indicesOptions(request.indicesOptions());
|
||||
if (request.maxConcurrentSearchRequests() != 0) {
|
||||
multiSearchRequest.maxConcurrentSearchRequests(request.maxConcurrentSearchRequests());
|
||||
}
|
||||
|
||||
for (int i = 0; i < responses.length(); i++) {
|
||||
final int index = i;
|
||||
searchTemplateAction.execute(request.requests().get(i), new ActionListener<SearchTemplateResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchTemplateResponse searchTemplateResponse) {
|
||||
responses.set(index, new MultiSearchTemplateResponse.Item(searchTemplateResponse, null));
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
finishHim();
|
||||
MultiSearchTemplateResponse.Item[] items = new MultiSearchTemplateResponse.Item[request.requests().size()];
|
||||
for (int i = 0; i < items.length; i++) {
|
||||
SearchTemplateRequest searchTemplateRequest = request.requests().get(i);
|
||||
SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse();
|
||||
SearchRequest searchRequest;
|
||||
try {
|
||||
searchRequest = convert(searchTemplateRequest, searchTemplateResponse, scriptService, xContentRegistry);
|
||||
} catch (Exception e) {
|
||||
items[i] = new MultiSearchTemplateResponse.Item(null, e);
|
||||
continue;
|
||||
}
|
||||
items[i] = new MultiSearchTemplateResponse.Item(searchTemplateResponse, null);
|
||||
if (searchRequest != null) {
|
||||
multiSearchRequest.add(searchRequest);
|
||||
originalSlots.add(i);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
responses.set(index, new MultiSearchTemplateResponse.Item(null, e));
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
finishHim();
|
||||
multiSearchAction.execute(multiSearchRequest, ActionListener.wrap(r -> {
|
||||
for (int i = 0; i < r.getResponses().length; i++) {
|
||||
MultiSearchResponse.Item item = r.getResponses()[i];
|
||||
int originalSlot = originalSlots.get(i);
|
||||
if (item.isFailure()) {
|
||||
items[originalSlot] = new MultiSearchTemplateResponse.Item(null, item.getFailure());
|
||||
} else {
|
||||
items[originalSlot].getResponse().setResponse(item.getResponse());
|
||||
}
|
||||
}
|
||||
|
||||
private void finishHim() {
|
||||
MultiSearchTemplateResponse.Item[] items = responses.toArray(new MultiSearchTemplateResponse.Item[responses.length()]);
|
||||
listener.onResponse(new MultiSearchTemplateResponse(items));
|
||||
}
|
||||
});
|
||||
}
|
||||
}, listener::onFailure));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,9 +38,11 @@ import org.elasticsearch.script.Script;
|
|||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.template.CompiledTemplate;
|
||||
import org.elasticsearch.template.CompiledTemplate;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.script.ScriptContext.Standard.SEARCH;
|
||||
|
@ -69,27 +71,8 @@ public class TransportSearchTemplateAction extends HandledTransportAction<Search
|
|||
protected void doExecute(SearchTemplateRequest request, ActionListener<SearchTemplateResponse> listener) {
|
||||
final SearchTemplateResponse response = new SearchTemplateResponse();
|
||||
try {
|
||||
Script script = new Script(request.getScriptType(), TEMPLATE_LANG, request.getScript(),
|
||||
request.getScriptParams() == null ? Collections.emptyMap() : request.getScriptParams());
|
||||
CompiledTemplate compiledScript = scriptService.compileTemplate(script, SEARCH);
|
||||
BytesReference source = compiledScript.run(script.getParams());
|
||||
response.setSource(source);
|
||||
|
||||
if (request.isSimulate()) {
|
||||
listener.onResponse(response);
|
||||
return;
|
||||
}
|
||||
|
||||
// Executes the search
|
||||
SearchRequest searchRequest = request.getRequest();
|
||||
//we can assume the template is always json as we convert it before compiling it
|
||||
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(xContentRegistry, source)) {
|
||||
SearchSourceBuilder builder = SearchSourceBuilder.searchSource();
|
||||
builder.parseXContent(new QueryParseContext(parser));
|
||||
builder.explain(request.isExplain());
|
||||
builder.profile(request.isProfile());
|
||||
searchRequest.source(builder);
|
||||
|
||||
SearchRequest searchRequest = convert(request, response, scriptService, xContentRegistry);
|
||||
if (searchRequest != null) {
|
||||
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchResponse searchResponse) {
|
||||
|
@ -106,9 +89,35 @@ public class TransportSearchTemplateAction extends HandledTransportAction<Search
|
|||
listener.onFailure(t);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
} catch (Exception t) {
|
||||
listener.onFailure(t);
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
static SearchRequest convert(SearchTemplateRequest searchTemplateRequest, SearchTemplateResponse response, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry) throws IOException {
|
||||
Script script = new Script(searchTemplateRequest.getScriptType(), TEMPLATE_LANG, searchTemplateRequest.getScript(),
|
||||
searchTemplateRequest.getScriptParams() == null ? Collections.emptyMap() : searchTemplateRequest.getScriptParams());
|
||||
CompiledTemplate compiledScript = scriptService.compileTemplate(script, SEARCH);
|
||||
BytesReference source = compiledScript.run(script.getParams());
|
||||
response.setSource(source);
|
||||
|
||||
SearchRequest searchRequest = searchTemplateRequest.getRequest();
|
||||
response.setSource(source);
|
||||
if (searchTemplateRequest.isSimulate()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(xContentRegistry, source)) {
|
||||
SearchSourceBuilder builder = SearchSourceBuilder.searchSource();
|
||||
builder.parseXContent(new QueryParseContext(parser));
|
||||
builder.explain(searchTemplateRequest.isExplain());
|
||||
builder.profile(searchTemplateRequest.isProfile());
|
||||
searchRequest.source(builder);
|
||||
}
|
||||
return searchRequest;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue