From 679619c798f5d03c478e03714a663a0a193bcb4a Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Tue, 14 Jul 2020 23:26:25 +0300 Subject: [PATCH] EQL: Improve retrieval of results (#59552) Instead of retrieving an entire SearchHit, get just a reference and postpone the document retrieval when assembling the final results. Remove sort information from results to make them consistent. Move TumblingWindow under the sequence package. Co-authored-by: James Rodewig (cherry picked from commit bccfbcd81f2f1d3552e95e4a9ee2618fb3059bd9) --- docs/reference/eql/eql-search-api.asciidoc | 21 ++--- docs/reference/eql/search.asciidoc | 66 ++++---------- .../eql/execution/assembler/Criterion.java | 2 +- .../execution/assembler/ExecutionManager.java | 10 +-- .../payload/SearchResponsePayload.java | 7 ++ .../execution/search/BasicQueryClient.java | 87 +++++++++++++++++- .../eql/execution/search/HitReference.java | 55 ++++++++++++ .../eql/execution/search/QueryClient.java | 5 ++ .../eql/execution/search/SourceGenerator.java | 10 ++- .../xpack/eql/execution/sequence/Match.java | 10 +-- .../eql/execution/sequence/Sequence.java | 10 +-- .../execution/sequence/SequenceMatcher.java | 22 ++--- .../execution/sequence/SequencePayload.java | 19 ++-- .../TumblingWindow.java | 89 +++++++++++++------ .../xpack/eql/plan/physical/EsQueryExec.java | 10 +-- .../xpack/eql/session/EmptyPayload.java | 8 +- .../assembler/SequenceSpecTests.java | 31 +++++-- .../xpack/eql/planner/QueryFolderOkTests.java | 3 +- 18 files changed, 316 insertions(+), 149 deletions(-) create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/HitReference.java rename x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/{assembler => sequence}/TumblingWindow.java (80%) diff --git a/docs/reference/eql/eql-search-api.asciidoc b/docs/reference/eql/eql-search-api.asciidoc index e7071f7db3e..882d3d6e94b 100644 --- a/docs/reference/eql/eql-search-api.asciidoc +++ 
b/docs/reference/eql/eql-search-api.asciidoc @@ -565,10 +565,7 @@ the events in ascending, lexicographic order. "name": "cmd.exe", "path": "C:\\Windows\\System32\\cmd.exe" } - }, - "sort": [ - 1607252647000 - ] + } }, { "_index": "my_index", @@ -596,10 +593,7 @@ the events in ascending, lexicographic order. "name": "cmd.exe", "path": "C:\\Windows\\System32\\cmd.exe" } - }, - "sort": [ - 1607339228000 - ] + } } ] } @@ -696,10 +690,7 @@ the events in ascending, lexicographic order. "name": "cmd.exe", "path": "C:\\Windows\\System32\\cmd.exe" } - }, - "sort": [ - 1607339228000 - ] + } }, { "_index": "my_index", @@ -720,10 +711,7 @@ the events in ascending, lexicographic order. "name": "regsvr32.exe", "path": "C:\\Windows\\System32\\regsvr32.exe" } - }, - "sort": [ - 1607339229000 - ] + } } ] } @@ -732,3 +720,4 @@ the events in ascending, lexicographic order. } ---- // TESTRESPONSE[s/"took": 6/"took": $body.took/] +// TESTRESPONSE[skip: response format updated] diff --git a/docs/reference/eql/search.asciidoc b/docs/reference/eql/search.asciidoc index 2908ded7ee9..75117d1bbfe 100644 --- a/docs/reference/eql/search.asciidoc +++ b/docs/reference/eql/search.asciidoc @@ -84,7 +84,7 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order. "relation": "eq" }, "events": [ - { + { "_index": "sec_logs", "_type": "_doc", "_id": "1", @@ -103,10 +103,7 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order. "name": "cmd.exe", "path": "C:\\Windows\\System32\\cmd.exe" } - }, - "sort": [ - 1607252645000 - ] + } }, { "_index": "sec_logs", @@ -127,10 +124,7 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order. "name": "cmd.exe", "path": "C:\\Windows\\System32\\cmd.exe" } - }, - "sort": [ - 1607339167000 - ] + } } ] } @@ -223,10 +217,7 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order. 
"name": "cmd.exe", "path": "C:\\Windows\\System32\\cmd.exe" } - }, - "sort": [ - 1607339228000 - ] + } }, { "_index": "sec_logs", @@ -247,10 +238,7 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order. "name": "regsvr32.exe", "path": "C:\\Windows\\System32\\regsvr32.exe" } - }, - "sort": [ - 1607339229000 - ] + } } ] } @@ -259,6 +247,7 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order. } ---- // TESTRESPONSE[s/"took": 60/"took": $body.took/] +// TESTRESPONSE[skip: response format updated] You can use the <> to constrain a sequence to a specified timespan. @@ -362,10 +351,7 @@ contains the shared `agent.id` value for each matching event. "name": "cmd.exe", "path": "C:\\Windows\\System32\\cmd.exe" } - }, - "sort": [ - 1607339228000 - ] + } }, { "_index": "sec_logs", @@ -386,10 +372,7 @@ contains the shared `agent.id` value for each matching event. "name": "regsvr32.exe", "path": "C:\\Windows\\System32\\regsvr32.exe" } - }, - "sort": [ - 1607339229000 - ] + } } ] } @@ -398,6 +381,7 @@ contains the shared `agent.id` value for each matching event. } ---- // TESTRESPONSE[s/"took": 60/"took": $body.took/] +// TESTRESPONSE[skip: response format updated] You can use the <> to specify an expiration event for sequences. Matching sequences must end before this event. @@ -501,15 +485,7 @@ GET /sec_logs/_eql/search ---- // TEST[s/search/search\?filter_path\=\-\*\.events\.\*fields/] -The API returns the following response. Note the `sort` property of each -matching event contains an array of two items: - -* The first item is the event's <>, -converted to milliseconds since the https://en.wikipedia.org/wiki/Unix_time[Unix -epoch]. - -* The second item is the event's `event.id` value. This value is used as a sort -tiebreaker for events with the same timestamp. +The API returns the following response. [source,console-result] ---- @@ -524,7 +500,7 @@ tiebreaker for events with the same timestamp. 
"relation": "eq" }, "events": [ - { + { "_index": "sec_logs", "_type": "_doc", "_id": "1", @@ -543,13 +519,9 @@ tiebreaker for events with the same timestamp. "name": "cmd.exe", "path": "C:\\Windows\\System32\\cmd.exe" } + } }, - "sort": [ - 1607252645000, <1> - "edwCRnyD" <2> - ] - }, - { + { "_index": "sec_logs", "_type": "_doc", "_id": "3", @@ -568,21 +540,13 @@ tiebreaker for events with the same timestamp. "name": "cmd.exe", "path": "C:\\Windows\\System32\\cmd.exe" } - }, - "sort": [ - 1607339167000, <1> - "cMyt5SZ2" <2> - ] - } + } + } ] } } ---- // TESTRESPONSE[s/"took": 34/"took": $body.took/] -<1> The event's <>, converted to -milliseconds since the https://en.wikipedia.org/wiki/Unix_time[Unix -epoch] -<2> The event's `event.id` value. ==== diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java index f3954332b82..c8aa3b42e6c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java @@ -44,7 +44,7 @@ public class Criterion { return stage; } - boolean reverse() { + public boolean reverse() { return reverse; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java index 343dc5f67ae..b4c343f4eb0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.execution.assembler; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.builder.SearchSourceBuilder; import 
org.elasticsearch.xpack.eql.EqlIllegalArgumentException; import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient; import org.elasticsearch.xpack.eql.execution.search.Limit; @@ -15,6 +16,7 @@ import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils; import org.elasticsearch.xpack.eql.execution.search.extractor.FieldHitExtractor; import org.elasticsearch.xpack.eql.execution.search.extractor.TimestampFieldHitExtractor; import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher; +import org.elasticsearch.xpack.eql.execution.sequence.TumblingWindow; import org.elasticsearch.xpack.eql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.eql.querydsl.container.FieldExtractorRegistry; @@ -57,7 +59,7 @@ public class ExecutionManager { String timestampName = Expressions.name(timestamp); String tiebreakerName = Expressions.isPresent(tiebreaker) ? Expressions.name(tiebreaker) : null; - // secondary criteria + // secondary criteria List> criteria = new ArrayList<>(plans.size() - 1); // build a criterion for each query @@ -68,10 +70,8 @@ public class ExecutionManager { PhysicalPlan query = plans.get(i); // search query if (query instanceof EsQueryExec) { - QueryRequest original = ((EsQueryExec) query).queryRequest(session); - - // increase the request size based on the fetch size (since size is applied already through limit) - + SearchSourceBuilder source = ((EsQueryExec) query).source(session); + QueryRequest original = () -> source; BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName); Criterion criterion = new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i > 0 && descending); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java index 
6d70c087e35..d97e882a20f 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java @@ -7,7 +7,9 @@ package org.elasticsearch.xpack.eql.execution.payload; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchSortValues; import org.elasticsearch.xpack.eql.session.Results.Type; import java.util.Arrays; @@ -20,6 +22,11 @@ public class SearchResponsePayload extends AbstractPayload { public SearchResponsePayload(SearchResponse response) { super(response.isTimedOut(), response.getTook()); hits = Arrays.asList(response.getHits().getHits()); + // clean hits + SearchSortValues sortValues = new SearchSortValues(new Object[0], new DocValueFormat[0]); + for (SearchHit hit : hits) { + hit.sortValues(sortValues); + } } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java index 7a4ee85dcfd..1e7930b0576 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java @@ -8,15 +8,34 @@ package org.elasticsearch.xpack.eql.execution.search; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.elasticsearch.action.get.MultiGetRequest.Item; +import org.elasticsearch.action.get.MultiGetRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; +import 
org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.xpack.eql.session.EqlConfiguration; import org.elasticsearch.xpack.eql.session.EqlSession; import org.elasticsearch.xpack.eql.session.Payload; import org.elasticsearch.xpack.ql.util.StringUtils; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.action.ActionListener.wrap; import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest; public class BasicQueryClient implements QueryClient { @@ -49,4 +68,70 @@ public class BasicQueryClient implements QueryClient { SearchRequest search = prepareRequest(client, searchSource, false, indices); client.search(search, new BasicListener(listener)); } -} + + @Override + public void get(Iterable> refs, ActionListener>> listener) { + MultiGetRequestBuilder requestBuilder = client.prepareMultiGet(); + // no need for real-time + requestBuilder.setRealtime(false) + .setRefresh(false); + + int sz = 0; + + for (List list : refs) { + sz = list.size(); + for (HitReference ref : list) { + Item item = new Item(ref.index(), ref.id()); + // make sure to get the whole source + item.fetchSourceContext(FetchSourceContext.FETCH_SOURCE); + requestBuilder.add(item); + } + } + + final int listSize = sz; + client.multiGet(requestBuilder.request(), wrap(r -> { + List> hits = new ArrayList<>(r.getResponses().length 
/ listSize); + + List sequence = new ArrayList<>(listSize); + + // copy streams - reused across the whole loop + PipedInputStream in = new PipedInputStream(); + PipedOutputStream out = new PipedOutputStream(in); + StreamOutput so = new OutputStreamStreamOutput(out); + StreamInput si = new InputStreamStreamInput(in); + + int counter = 0; + Text type = new Text("_doc"); + for (MultiGetItemResponse mgr : r.getResponses()) { + if (mgr.isFailed()) { + listener.onFailure(mgr.getFailure().getFailure()); + return; + } + // HACK: the only way to get GetResult is to serialize it and then load it back :( + mgr.getResponse().writeTo(so); + GetResult result = new GetResult(si); + + SearchHit hit = new SearchHit(-1, result.getId(), type, result.getDocumentFields(), result.getMetadataFields()); + hit.sourceRef(result.internalSourceRef()); + // need to create these objects to set the index + hit.shard(new SearchShardTarget(null, new ShardId(result.getIndex(), "", -1), null, null)); + + hit.setSeqNo(result.getSeqNo()); + hit.setPrimaryTerm(result.getPrimaryTerm()); + hit.version(result.getVersion()); + + + sequence.add(hit); + + if (++counter == listSize) { + counter = 0; + hits.add(sequence); + sequence = new ArrayList<>(listSize); + } + } + // send the results + listener.onResponse(hits); + + }, listener::onFailure)); + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/HitReference.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/HitReference.java new file mode 100644 index 00000000000..669b9209235 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/HitReference.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.eql.execution.search; + +import org.elasticsearch.search.SearchHit; + +import java.util.Objects; + +public class HitReference { + + private final String index; + private final String id; + + public HitReference(SearchHit hit) { + this.index = hit.getIndex(); + this.id = hit.getId(); + } + + public String index() { + return index; + } + + public String id() { + return id; + } + + @Override + public int hashCode() { + return Objects.hash(index, id); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + HitReference other = (HitReference) obj; + return Objects.equals(index, other.index) + && Objects.equals(id, other.id); + } + + @Override + public String toString() { + return "doc[" + index + "][" + id + "]"; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java index f05250dcab8..19e14d7b4be 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java @@ -7,12 +7,17 @@ package org.elasticsearch.xpack.eql.execution.search; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.session.Payload; +import java.util.List; + /** * Infrastructure interface used to decouple listener consumers from the stateful classes holding client-references and co. 
*/ public interface QueryClient { void query(QueryRequest request, ActionListener listener); + + void get(Iterable> refs, ActionListener>> listener); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java index c0a9bb07383..0a9ab105c2b 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.eql.execution.search; +import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; @@ -57,7 +58,14 @@ public abstract class SourceGenerator { sourceBuilder.build(source); sorting(container, source); - source.fetchSource(FetchSourceContext.FETCH_SOURCE); + + // disable the source if there are no includes + if (source.fetchSource() == null || CollectionUtils.isEmpty(source.fetchSource().includes())) { + source.fetchSource(FetchSourceContext.DO_NOT_FETCH_SOURCE); + } else { + // use true to fetch only the needed bits from the source + source.fetchSource(true); + } if (container.limit() != null) { // add size and from diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java index 533eebbc093..588ee2fadd0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java @@ -6,7 +6,7 @@ package org.elasticsearch.xpack.eql.execution.sequence; -import org.elasticsearch.search.SearchHit; 
+import org.elasticsearch.xpack.eql.execution.search.HitReference; import org.elasticsearch.xpack.eql.execution.search.Ordinal; import java.util.Objects; @@ -17,9 +17,9 @@ import java.util.Objects; class Match { private final Ordinal ordinal; - private final SearchHit hit; + private final HitReference hit; - Match(Ordinal ordinal, SearchHit hit) { + Match(Ordinal ordinal, HitReference hit) { this.ordinal = ordinal; this.hit = hit; } @@ -28,7 +28,7 @@ class Match { return ordinal; } - SearchHit hit() { + HitReference hit() { return hit; } @@ -54,6 +54,6 @@ class Match { @Override public String toString() { - return ordinal.toString() + "->" + hit.getId(); + return ordinal + "->" + hit; } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java index 0298adbad4a..1dbb900d92b 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java @@ -6,8 +6,8 @@ package org.elasticsearch.xpack.eql.execution.sequence; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; +import org.elasticsearch.xpack.eql.execution.search.HitReference; import org.elasticsearch.xpack.eql.execution.search.Ordinal; import org.elasticsearch.xpack.ql.util.Check; @@ -32,7 +32,7 @@ public class Sequence { private int currentStage = 0; - public Sequence(SequenceKey key, int stages, Ordinal ordinal, SearchHit firstHit) { + public Sequence(SequenceKey key, int stages, Ordinal ordinal, HitReference firstHit) { Check.isTrue(stages >= 2, "A sequence requires at least 2 criteria, given [{}]", stages); this.key = key; this.stages = stages; @@ -40,7 +40,7 @@ public class Sequence { this.matches[0] = new Match(ordinal, firstHit); } - public int putMatch(int stage, SearchHit 
hit, Ordinal ordinal) { + public int putMatch(int stage, Ordinal ordinal, HitReference hit) { if (stage == currentStage + 1) { int previousStage = currentStage; currentStage = stage; @@ -62,8 +62,8 @@ public class Sequence { return matches[0].ordinal(); } - public List hits() { - List hits = new ArrayList<>(matches.length); + public List hits() { + List hits = new ArrayList<>(matches.length); for (Match m : matches) { hits.add(m.hit()); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java index 45fa51fc9d3..6d56d986b27 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java @@ -11,10 +11,9 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.eql.execution.search.HitReference; import org.elasticsearch.xpack.eql.execution.search.Limit; import org.elasticsearch.xpack.eql.execution.search.Ordinal; -import org.elasticsearch.xpack.eql.session.Payload; import java.util.LinkedList; import java.util.List; @@ -98,10 +97,10 @@ public class SequenceMatcher { * Match hits for the given stage. * Returns false if the process needs to be stopped. 
*/ - public boolean match(int stage, Iterable> hits) { - for (Tuple tuple : hits) { + public boolean match(int stage, Iterable> hits) { + for (Tuple tuple : hits) { KeyAndOrdinal ko = tuple.v1(); - SearchHit hit = tuple.v2(); + HitReference hit = tuple.v2(); if (stage == 0) { Sequence seq = new Sequence(ko.key, numberOfStages, ko.ordinal, hit); @@ -125,7 +124,7 @@ public class SequenceMatcher { * Match the given hit (based on key and timestamp and potential tiebreaker) with any potential sequence from the previous * given stage. If that's the case, update the sequence and the rest of the references. */ - private void match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) { + private void match(int stage, SequenceKey key, Ordinal ordinal, HitReference hit) { stats.seen++; int previousStage = stage - 1; @@ -172,7 +171,7 @@ public class SequenceMatcher { } } - sequence.putMatch(stage, hit, ordinal); + sequence.putMatch(stage, ordinal, hit); // bump the stages if (stage == completionStage) { @@ -207,12 +206,9 @@ public class SequenceMatcher { return false; } - public Payload payload(long startTime) { - TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime); - List view = limit != null ? limit.view(completed) : completed; - Payload p = new SequencePayload(view, false, tookTime); - clear(); - return p; + + public List completed() { + return limit != null ? 
limit.view(completed) : completed; } public void dropUntil() { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java index aa02c91c4d2..699a33925e8 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java @@ -7,26 +7,25 @@ package org.elasticsearch.xpack.eql.execution.sequence; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload; import org.elasticsearch.xpack.eql.session.Results.Type; -import org.elasticsearch.xpack.eql.util.ReversedIterator; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; class SequencePayload extends AbstractPayload { - private final List sequences; + private final List values; - SequencePayload(List seq, boolean timedOut, TimeValue timeTook) { + SequencePayload(List sequences, List> searchHits, boolean timedOut, TimeValue timeTook) { super(timedOut, timeTook); - sequences = new ArrayList<>(seq.size()); - boolean needsReversal = seq.size() > 1 && (seq.get(0).ordinal().compareTo(seq.get(1).ordinal()) > 0); + values = new ArrayList<>(sequences.size()); - for (Iterator it = needsReversal ? 
new ReversedIterator<>(seq) : seq.iterator(); it.hasNext();) { - Sequence s = it.next(); - sequences.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), s.hits())); + for (int i = 0; i < sequences.size(); i++) { + Sequence s = sequences.get(i); + List hits = searchHits.get(i); + values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), hits)); } } @@ -38,6 +37,6 @@ class SequencePayload extends AbstractPayload { @SuppressWarnings("unchecked") @Override public List values() { - return (List) sequences; + return (List) values; } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java similarity index 80% rename from x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java rename to x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java index 9576f0dd0d3..848c3fe3526 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java @@ -4,19 +4,23 @@ * you may not use this file except in compliance with the Elastic License. 
*/ -package org.elasticsearch.xpack.eql.execution.assembler; +package org.elasticsearch.xpack.eql.execution.sequence; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.eql.execution.assembler.BoxedQueryRequest; +import org.elasticsearch.xpack.eql.execution.assembler.Criterion; +import org.elasticsearch.xpack.eql.execution.assembler.Executable; +import org.elasticsearch.xpack.eql.execution.search.HitReference; import org.elasticsearch.xpack.eql.execution.search.Ordinal; import org.elasticsearch.xpack.eql.execution.search.QueryClient; -import org.elasticsearch.xpack.eql.execution.sequence.KeyAndOrdinal; -import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey; -import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher; +import org.elasticsearch.xpack.eql.session.EmptyPayload; import org.elasticsearch.xpack.eql.session.Payload; +import org.elasticsearch.xpack.eql.session.Results.Type; import org.elasticsearch.xpack.eql.util.ReversedIterator; import java.util.Iterator; @@ -98,7 +102,7 @@ public class TumblingWindow implements Executable { if (hits.isEmpty() == false) { if (matcher.match(baseStage, wrapValues(base, hits)) == false) { - listener.onResponse(payload()); + payload(listener); return; } } @@ -120,7 +124,7 @@ public class TumblingWindow implements Executable { } // there aren't going to be any matches so cancel search else { - listener.onResponse(payload()); + payload(listener); } return; } @@ -231,7 +235,7 @@ public class TumblingWindow implements Executable { // if the limit has been reached, return what's available if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) { - listener.onResponse(payload()); + payload(listener); return; } } @@ -281,48 +285,83 @@ public 
class TumblingWindow implements Executable { return criterion.reverse() != base.reverse(); } - Iterable> wrapValues(Criterion criterion, List hits) { - return () -> { - final Iterator iter = criterion.reverse() ? new ReversedIterator<>(hits) : hits.iterator(); + private void payload(ActionListener listener) { + List completed = matcher.completed(); - return new Iterator>() { + if (completed.isEmpty()) { + listener.onResponse(new EmptyPayload(Type.SEQUENCE, timeTook())); + matcher.clear(); + return; + } + + client.get(hits(completed), wrap(searchHits -> { + listener.onResponse(new SequencePayload(completed, searchHits, false, timeTook())); + matcher.clear(); + }, listener::onFailure)); + } + + private TimeValue timeTook() { + return new TimeValue(System.currentTimeMillis() - startTime); + } + Iterable> hits(List sequences) { + return () -> { + final Iterator delegate = criteria.get(0).reverse() != criteria.get(1).reverse() ? + new ReversedIterator<>(sequences) : + sequences.iterator(); + + return new Iterator>() { @Override public boolean hasNext() { - return iter.hasNext(); + return delegate.hasNext(); } @Override - public Tuple next() { - SearchHit hit = iter.next(); - SequenceKey k = criterion.key(hit); - Ordinal o = criterion.ordinal(hit); - return new Tuple<>(new KeyAndOrdinal(k, o), hit); + public List next() { + return delegate.next().hits(); } }; }; } - Iterable wrapUntilValues(Iterable> iterable) { + Iterable> wrapValues(Criterion criterion, List hits) { return () -> { - final Iterator> iter = iterable.iterator(); + final Iterator delegate = criterion.reverse() ? 
new ReversedIterator<>(hits) : hits.iterator(); + + return new Iterator>() { + + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public Tuple next() { + SearchHit hit = delegate.next(); + SequenceKey k = criterion.key(hit); + Ordinal o = criterion.ordinal(hit); + return new Tuple<>(new KeyAndOrdinal(k, o), new HitReference(hit)); + } + }; + }; + } + + Iterable wrapUntilValues(Iterable> iterable) { + return () -> { + final Iterator> delegate = iterable.iterator(); return new Iterator() { @Override public boolean hasNext() { - return iter.hasNext(); + return delegate.hasNext(); } @Override public KeyAndOrdinal next() { - return iter.next().v1(); + return delegate.next().v1(); } }; }; } - - Payload payload() { - return matcher.payload(startTime); - } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java index 9686a1e5e1a..fdfd40ed790 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.plan.physical; import org.elasticsearch.action.ActionListener; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient; @@ -49,17 +50,16 @@ public class EsQueryExec extends LeafExec { return output; } - public QueryRequest queryRequest(EqlSession session) { + public SearchSourceBuilder source(EqlSession session) { EqlConfiguration cfg = session.configuration(); // by default use the configuration size - // join/sequence queries will want to 
override this - SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(queryContainer, cfg.filter()); - return () -> sourceBuilder; + return SourceGenerator.sourceBuilder(queryContainer, cfg.filter()); } @Override public void execute(EqlSession session, ActionListener listener) { - QueryRequest request = queryRequest(session); + // endpoint - fetch all source + QueryRequest request = () -> source(session).fetchSource(FetchSourceContext.FETCH_SOURCE); listener = shouldReverse(request) ? new ReverseListener(listener) : listener; new BasicQueryClient(session).query(request, listener); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java index 22e8405e5f9..70b1f9291c9 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java @@ -16,9 +16,15 @@ import static java.util.Collections.emptyList; public class EmptyPayload implements Payload { private final Type type; + private final TimeValue timeTook; public EmptyPayload(Type type) { + this(type, TimeValue.ZERO); + } + + public EmptyPayload(Type type, TimeValue timeTook) { this.type = type; + this.timeTook = timeTook; } @Override @@ -33,7 +39,7 @@ public class EmptyPayload implements Payload { @Override public TimeValue timeTook() { - return TimeValue.ZERO; + return timeTook; } @Override diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java index a4bde988d1b..c0f1c75446c 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java +++ 
b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.xpack.eql.execution.assembler; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; @@ -17,8 +18,11 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence; import org.elasticsearch.xpack.eql.execution.assembler.SeriesUtils.SeriesSpec; +import org.elasticsearch.xpack.eql.execution.search.HitReference; import org.elasticsearch.xpack.eql.execution.search.QueryClient; +import org.elasticsearch.xpack.eql.execution.search.QueryRequest; import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher; +import org.elasticsearch.xpack.eql.execution.sequence.TumblingWindow; import org.elasticsearch.xpack.eql.session.Payload; import org.elasticsearch.xpack.eql.session.Results; import org.elasticsearch.xpack.eql.session.Results.Type; @@ -170,6 +174,23 @@ public class SequenceSpecTests extends ESTestCase { } } + class TestQueryClient implements QueryClient { + + @Override + public void query(QueryRequest r, ActionListener l) { + int ordinal = r.searchSource().size(); + if (ordinal != Integer.MAX_VALUE) { + r.searchSource().size(Integer.MAX_VALUE); + } + Map> evs = ordinal != Integer.MAX_VALUE ? 
events.get(ordinal) : emptyMap(); + l.onResponse(new TestPayload(evs)); + } + + @Override + public void get(Iterable> refs, ActionListener>> listener) { + //no-op + } + } public SequenceSpecTests(String testName, int lineNumber, SeriesSpec spec) { this.lineNumber = lineNumber; @@ -199,15 +220,7 @@ public class SequenceSpecTests extends ESTestCase { // convert the results through a test specific payload SequenceMatcher matcher = new SequenceMatcher(stages, TimeValue.MINUS_ONE, null); - QueryClient testClient = (r, l) -> { - int ordinal = r.searchSource().size(); - if (ordinal != Integer.MAX_VALUE) { - r.searchSource().size(Integer.MAX_VALUE); - } - Map> evs = ordinal != Integer.MAX_VALUE ? events.get(ordinal) : emptyMap(); - l.onResponse(new TestPayload(evs)); - }; - + QueryClient testClient = new TestQueryClient(); TumblingWindow window = new TumblingWindow(testClient, criteria, null, matcher); // finally make the assertion at the end of the listener diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/planner/QueryFolderOkTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/planner/QueryFolderOkTests.java index d175e3b08fa..eb275337fda 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/planner/QueryFolderOkTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/planner/QueryFolderOkTests.java @@ -22,6 +22,7 @@ import java.util.Locale; import java.util.Map; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.not; public class QueryFolderOkTests extends AbstractQueryFolderTestCase { @@ -139,6 +140,6 @@ public class QueryFolderOkTests extends AbstractQueryFolderTestCase { assertThat(query, containsString("\"term\":{\"event.category\":{\"value\":\"process\"")); // test field source extraction - assertThat(query, containsString("\"_source\":{\"includes\":[],\"excludes\":[]")); + assertThat(query, 
not(containsString("\"_source\":{\"includes\":[],\"excludes\":[]"))); } }