diff --git a/x-pack/plugin/eql/qa/common/src/main/resources/additional_test_queries.toml b/x-pack/plugin/eql/qa/common/src/main/resources/additional_test_queries.toml index 25e478adabe..2ffc81eed9c 100644 --- a/x-pack/plugin/eql/qa/common/src/main/resources/additional_test_queries.toml +++ b/x-pack/plugin/eql/qa/common/src/main/resources/additional_test_queries.toml @@ -262,3 +262,19 @@ sequence [any where true] by unique_ppid ''' expected_event_ids = [1, 2, 2, 3] + +[[queries]] +name = "sequenceWithMoreThan10Results" +query = ''' +sequence by unique_pid + [any where true] + [any where true] + [any where serial_event_id < 72] +''' +expected_event_ids = [54, 55, 59, + 55, 59, 61, + 59, 61, 65, + 16, 60, 66, + 61, 65, 67, + 65, 67, 70, + 60, 66, 71] diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java index ee4d9581be6..f715e02c2c1 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java @@ -8,6 +8,9 @@ package org.elasticsearch.xpack.eql.execution.search; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.MultiSearchRequestBuilder; +import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; @@ -16,18 +19,16 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; import org.elasticsearch.xpack.eql.session.EqlConfiguration; import org.elasticsearch.xpack.eql.session.EqlSession; -import org.elasticsearch.xpack.ql.util.ActionListeners; import org.elasticsearch.xpack.ql.util.StringUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import static org.elasticsearch.index.query.QueryBuilders.idsQuery; import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest; @@ -67,53 +68,84 @@ public class BasicQueryClient implements QueryClient { client.search(search, listener); } + protected void search(MultiSearchRequest search, ActionListener listener) { + client.multiSearch(search, listener); + } + @Override public void fetchHits(Iterable> refs, ActionListener>> listener) { - IdsQueryBuilder idsQuery = idsQuery(); - int innerListSize = 0; - Set indices = new HashSet<>(); - // associate each reference with its own + Map queries = new HashMap<>(); + + // associate each reference with its positions inside the matrix final Map> referenceToPosition = new HashMap<>(); int counter = 0; for (List list : refs) { innerListSize = list.size(); for (HitReference ref : list) { - idsQuery.addIds(ref.id()); - indices.add(ref.index()); - // remember the reference position + // keep ids per index + IdsQueryBuilder query = queries.computeIfAbsent(ref.index(), v -> idsQuery()); + query.addIds(ref.id()); + // save the ref position inside the matrix List positions = referenceToPosition.computeIfAbsent(ref, v -> new ArrayList<>(1)); positions.add(counter++); } } - SearchSourceBuilder builder = SearchSourceBuilder.searchSource() - // make sure to fetch the whole source - .fetchSource(FetchSourceContext.FETCH_SOURCE) - .trackTotalHits(false) - .trackScores(false) - .query(idsQuery); - final int listSize = innerListSize; final int topListSize = counter / listSize; + // pre-allocate the response matrix @SuppressWarnings({"rawtypes", "unchecked"}) List[] hits = new List[topListSize]; for (int i = 0; i < hits.length; i++) { hits[i] = Arrays.asList(new SearchHit[listSize]); - } + } final List> seq = Arrays.asList(hits); - SearchRequest search = prepareRequest(client, builder, false, indices.toArray(new String[0])); + // create a multi-search + MultiSearchRequestBuilder multiSearchBuilder = client.prepareMultiSearch(); + for (Map.Entry entry : queries.entrySet()) { + IdsQueryBuilder idQuery = entry.getValue(); + SearchSourceBuilder builder = SearchSourceBuilder.searchSource() + // make sure to fetch the whole source + .fetchSource(FetchSourceContext.FETCH_SOURCE) + .trackTotalHits(false) + .trackScores(false) + .query(idQuery) + // the default size is 10 so be sure to change it + // NB:this is different from mget + .size(idQuery.ids().size()); - search(search, ActionListeners.map(listener, r -> { - for (SearchHit hit : RuntimeUtils.searchHits(r)) { - List positions = referenceToPosition.get(new HitReference(hit)); - positions.forEach(pos -> seq.get(pos / listSize).set(pos % listSize, hit)); + SearchRequest search = prepareRequest(client, builder, false, entry.getKey()); + multiSearchBuilder.add(search); + } + + search(multiSearchBuilder.request(), ActionListener.wrap(r -> { + for (MultiSearchResponse.Item item : r.getResponses()) { + // check for failures + if (item.isFailure()) { + listener.onFailure(item.getFailure()); + return; + } + // otherwise proceed + List docs = RuntimeUtils.searchHits(item.getResponse()); + // for each doc, find its reference and its position inside the matrix + for (SearchHit doc : docs) { + HitReference docRef = new HitReference(doc); + List positions = referenceToPosition.get(docRef); + positions.forEach(pos -> { + SearchHit previous = seq.get(pos / listSize).set(pos % listSize, doc); + if (previous != null) { + throw new EqlIllegalArgumentException("Overriding sequence match [{}] with [{}]", + new HitReference(previous), docRef); + } + }); } - return seq; - })); } + listener.onResponse(seq); + }, listener::onFailure)); } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClient.java index 6dfa5cb32b0..aef783afec2 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClient.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.eql.execution.search; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.unit.TimeValue; @@ -16,10 +18,11 @@ import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeResponse; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; -import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse; import org.elasticsearch.xpack.eql.session.EqlSession; import org.elasticsearch.xpack.ql.index.IndexResolver; +import java.util.function.Function; + import static org.elasticsearch.action.ActionListener.wrap; import static org.elasticsearch.xpack.ql.util.ActionListeners.map; @@ -28,6 +31,9 @@ import static org.elasticsearch.xpack.ql.util.ActionListeners.map; * Opens a point-in-time, uses it for all queries and closes it when disposed, * freeing consumer from doing any special management for it. */ + +// NB: cannot simplify the template further since client has different request/response types and different methods between +// search and multi-search hence the code repetition public class PITAwareQueryClient extends BasicQueryClient { private String pitId; @@ -42,12 +48,8 @@ public class PITAwareQueryClient extends BasicQueryClient { protected void search(SearchRequest search, ActionListener listener) { // no pitId, ask for one if (pitId == null) { - openPIT(wrap(r -> { - pitId = r; - searchWithPIT(search, listener); - }, listener::onFailure)); - } - else { + openPIT(listener, () -> searchWithPIT(search, listener)); + } else { searchWithPIT(search, listener); } } @@ -55,21 +57,58 @@ public class PITAwareQueryClient extends BasicQueryClient { private void searchWithPIT(SearchRequest search, ActionListener listener) { // don't increase the keep alive search.source().pointInTimeBuilder(new PointInTimeBuilder(pitId)); + // get the pid on each response + super.search(search, pitListener(SearchResponse::pointInTimeId, listener)); + } + + @Override + protected void search(MultiSearchRequest search, ActionListener listener) { + // no pitId, ask for one + if (pitId == null) { + openPIT(listener, () -> searchWithPIT(search, listener)); + } else { + searchWithPIT(search, listener); + } + } + + private void searchWithPIT(MultiSearchRequest search, ActionListener listener) { + // don't increase the keep alive + for (SearchRequest request : search.requests()) { + request.source().pointInTimeBuilder(new PointInTimeBuilder(pitId)); + } + // get the pid on each request - super.search(search, wrap(r -> { - pitId = r.pointInTimeId(); + super.search(search, pitListener(r -> { + // get pid + for (MultiSearchResponse.Item item : r.getResponses()) { + // pick the first non-failing response + if (item.isFailure() == false) { + return item.getResponse().pointInTimeId(); + } + } + // no results or successful responses, preserve the current pid + return pitId; + }, listener)); + } + + // listener handing the extraction of new PIT and closing in case of exceptions + private ActionListener pitListener(Function pitIdExtractor, ActionListener listener) { + return wrap(r -> { + // get pid + pitId = pitIdExtractor.apply(r); listener.onResponse(r); }, // always close PIT in case of exceptions e -> { if (pitId != null) { - close(wrap(b -> {}, listener::onFailure)); + close(wrap(b -> { + }, listener::onFailure)); } listener.onFailure(e); - })); + }); } - private void openPIT(ActionListener listener) { + private void openPIT(ActionListener listener, Runnable runnable) { OpenPointInTimeRequest request = new OpenPointInTimeRequest( indices, IndexResolver.FIELD_CAPS_INDICES_OPTIONS, @@ -77,11 +116,15 @@ public class PITAwareQueryClient extends BasicQueryClient { null, null ); - client.execute(OpenPointInTimeAction.INSTANCE, request, map(listener, OpenPointInTimeResponse::getSearchContextId)); + client.execute(OpenPointInTimeAction.INSTANCE, request, wrap(r -> { + pitId = r.getSearchContextId(); + runnable.run(); + }, + listener::onFailure)); } @Override - public void close(ActionListener listener) { + public void close(ActionListener listener) { client.execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId), map(listener, ClosePointInTimeResponse::isSucceeded)); pitId = null;