EQL: Fix NPE from incorrect use of ids search (#63032)

This fixes a bug introduced when moving from mget to an ids query. While
mget returns an entry for every id given, the ids query is a regular search
query and thus, by default, returns only 10 documents.
The fix sets the request size to the number of ids being looked up so that
all the requested documents are returned inside the response.

Fix #63030

(cherry picked from commit 09ba85548a0142a1fe8376efea9cc4e7764a207c)
This commit is contained in:
Costin Leau 2020-09-30 15:54:35 +03:00 committed by Costin Leau
parent e001b4c021
commit c2992ea287
3 changed files with 130 additions and 39 deletions

View File

@ -262,3 +262,19 @@ sequence
[any where true] by unique_ppid
'''
expected_event_ids = [1, 2, 2, 3]
[[queries]]
name = "sequenceWithMoreThan10Results"
query = '''
sequence by unique_pid
[any where true]
[any where true]
[any where serial_event_id < 72]
'''
expected_event_ids = [54, 55, 59,
55, 59, 61,
59, 61, 65,
16, 60, 66,
61, 65, 67,
65, 67, 70,
60, 66, 71]

View File

@ -8,6 +8,9 @@ package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
@ -16,18 +19,16 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.ql.util.ActionListeners;
import org.elasticsearch.xpack.ql.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.index.query.QueryBuilders.idsQuery;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest;
@ -67,37 +68,35 @@ public class BasicQueryClient implements QueryClient {
client.search(search, listener);
}
/**
 * Executes a multi-search request by handing it, together with the listener, to the underlying client.
 * Being {@code protected} and non-final, this acts as a hook that subclasses can override to adjust the
 * request before it is sent.
 *
 * @param search   the multi-search request to execute
 * @param listener the listener passed through to the client call unchanged
 */
protected void search(MultiSearchRequest search, ActionListener<MultiSearchResponse> listener) {
client.multiSearch(search, listener);
}
@Override
public void fetchHits(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener) {
IdsQueryBuilder idsQuery = idsQuery();
int innerListSize = 0;
Set<String> indices = new HashSet<>();
// associate each index with its own ids query
Map<String, IdsQueryBuilder> queries = new HashMap<>();
// associate each reference with its positions inside the matrix
final Map<HitReference, List<Integer>> referenceToPosition = new HashMap<>();
int counter = 0;
for (List<HitReference> list : refs) {
innerListSize = list.size();
for (HitReference ref : list) {
idsQuery.addIds(ref.id());
indices.add(ref.index());
// remember the reference position
// keep ids per index
IdsQueryBuilder query = queries.computeIfAbsent(ref.index(), v -> idsQuery());
query.addIds(ref.id());
// save the ref position inside the matrix
List<Integer> positions = referenceToPosition.computeIfAbsent(ref, v -> new ArrayList<>(1));
positions.add(counter++);
}
}
SearchSourceBuilder builder = SearchSourceBuilder.searchSource()
// make sure to fetch the whole source
.fetchSource(FetchSourceContext.FETCH_SOURCE)
.trackTotalHits(false)
.trackScores(false)
.query(idsQuery);
final int listSize = innerListSize;
final int topListSize = counter / listSize;
// pre-allocate the response matrix
@SuppressWarnings({"rawtypes", "unchecked"})
List<SearchHit>[] hits = new List[topListSize];
@ -106,14 +105,47 @@ public class BasicQueryClient implements QueryClient {
}
final List<List<SearchHit>> seq = Arrays.asList(hits);
SearchRequest search = prepareRequest(client, builder, false, indices.toArray(new String[0]));
// create a multi-search
MultiSearchRequestBuilder multiSearchBuilder = client.prepareMultiSearch();
for (Map.Entry<String, IdsQueryBuilder> entry : queries.entrySet()) {
IdsQueryBuilder idQuery = entry.getValue();
SearchSourceBuilder builder = SearchSourceBuilder.searchSource()
// make sure to fetch the whole source
.fetchSource(FetchSourceContext.FETCH_SOURCE)
.trackTotalHits(false)
.trackScores(false)
.query(idQuery)
// the default size is 10 so be sure to change it
// NB:this is different from mget
.size(idQuery.ids().size());
search(search, ActionListeners.map(listener, r -> {
for (SearchHit hit : RuntimeUtils.searchHits(r)) {
List<Integer> positions = referenceToPosition.get(new HitReference(hit));
positions.forEach(pos -> seq.get(pos / listSize).set(pos % listSize, hit));
SearchRequest search = prepareRequest(client, builder, false, entry.getKey());
multiSearchBuilder.add(search);
}
return seq;
}));
search(multiSearchBuilder.request(), ActionListener.wrap(r -> {
for (MultiSearchResponse.Item item : r.getResponses()) {
// check for failures
if (item.isFailure()) {
listener.onFailure(item.getFailure());
return;
}
// otherwise proceed
List<SearchHit> docs = RuntimeUtils.searchHits(item.getResponse());
// for each doc, find its reference and its position inside the matrix
for (SearchHit doc : docs) {
HitReference docRef = new HitReference(doc);
List<Integer> positions = referenceToPosition.get(docRef);
positions.forEach(pos -> {
SearchHit previous = seq.get(pos / listSize).set(pos % listSize, doc);
if (previous != null) {
throw new EqlIllegalArgumentException("Overriding sequence match [{}] with [{}]",
new HitReference(previous), docRef);
}
});
}
}
listener.onResponse(seq);
}, listener::onFailure));
}
}

View File

@ -7,6 +7,8 @@
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
@ -16,10 +18,11 @@ import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeResponse;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.ql.index.IndexResolver;
import java.util.function.Function;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.ql.util.ActionListeners.map;
@ -28,6 +31,9 @@ import static org.elasticsearch.xpack.ql.util.ActionListeners.map;
* Opens a point-in-time, uses it for all queries and closes it when disposed,
* freeing consumer from doing any special management for it.
*/
// NB: the template cannot be simplified further since the client has different request/response types and different methods
// for search and multi-search, hence the code repetition
public class PITAwareQueryClient extends BasicQueryClient {
private String pitId;
@ -42,12 +48,8 @@ public class PITAwareQueryClient extends BasicQueryClient {
protected void search(SearchRequest search, ActionListener<SearchResponse> listener) {
// no pitId, ask for one
if (pitId == null) {
openPIT(wrap(r -> {
pitId = r;
searchWithPIT(search, listener);
}, listener::onFailure));
}
else {
openPIT(listener, () -> searchWithPIT(search, listener));
} else {
searchWithPIT(search, listener);
}
}
@ -55,21 +57,58 @@ public class PITAwareQueryClient extends BasicQueryClient {
// Runs a single search bound to the currently held point-in-time (PIT) id.
// The request's source is mutated in place to carry the PIT before the request is passed to the parent class.
private void searchWithPIT(SearchRequest search, ActionListener<SearchResponse> listener) {
// don't increase the keep alive
search.source().pointInTimeBuilder(new PointInTimeBuilder(pitId));
// read the PIT id reported by each response (extracted through SearchResponse::pointInTimeId)
super.search(search, pitListener(SearchResponse::pointInTimeId, listener));
}
/**
 * Multi-search entry point that makes sure a point-in-time (PIT) id is available before searching.
 * When no id is held yet, {@code openPIT} is asked for one and is given the actual search as a callback;
 * otherwise the search is issued right away with the existing id.
 *
 * @param search   the multi-search request to run against the PIT
 * @param listener the listener forwarded to both the PIT opening and the search itself
 */
@Override
protected void search(MultiSearchRequest search, ActionListener<MultiSearchResponse> listener) {
// no pitId, ask for one
if (pitId == null) {
// the search is deferred: it is passed to openPIT as a Runnable instead of being executed here
openPIT(listener, () -> searchWithPIT(search, listener));
} else {
searchWithPIT(search, listener);
}
}
private void searchWithPIT(MultiSearchRequest search, ActionListener<MultiSearchResponse> listener) {
// don't increase the keep alive
for (SearchRequest request : search.requests()) {
request.source().pointInTimeBuilder(new PointInTimeBuilder(pitId));
}
// get the pid on each request
super.search(search, wrap(r -> {
pitId = r.pointInTimeId();
super.search(search, pitListener(r -> {
// get pid
for (MultiSearchResponse.Item item : r.getResponses()) {
// pick the first non-failing response
if (item.isFailure() == false) {
return item.getResponse().pointInTimeId();
}
}
// no results or successful responses, preserve the current pid
return pitId;
}, listener));
}
// listener handling the extraction of the new PIT id and closing the PIT in case of exceptions
private <Response> ActionListener<Response> pitListener(Function<Response, String> pitIdExtractor, ActionListener<Response> listener) {
return wrap(r -> {
// get pid
pitId = pitIdExtractor.apply(r);
listener.onResponse(r);
},
// always close PIT in case of exceptions
e -> {
if (pitId != null) {
close(wrap(b -> {}, listener::onFailure));
close(wrap(b -> {
}, listener::onFailure));
}
listener.onFailure(e);
}));
});
}
private void openPIT(ActionListener<String> listener) {
private <Response> void openPIT(ActionListener<Response> listener, Runnable runnable) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(
indices,
IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
@ -77,7 +116,11 @@ public class PITAwareQueryClient extends BasicQueryClient {
null,
null
);
client.execute(OpenPointInTimeAction.INSTANCE, request, map(listener, OpenPointInTimeResponse::getSearchContextId));
client.execute(OpenPointInTimeAction.INSTANCE, request, wrap(r -> {
pitId = r.getSearchContextId();
runnable.run();
},
listener::onFailure));
}
@Override