From bff3c7470e6929c4ebae295749b9bb9c88cf2312 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Tue, 25 Aug 2020 17:32:42 +0300 Subject: [PATCH] EQL: Replace SearchHit in response with Event (#61428) (#61522) The building block of the eql response is currently the SearchHit. This is a problem since it is tied to an actual search, and thus has scoring, highlighting, shard information and a lot of other things that are not relevant for EQL. This becomes a problem when doing sequence queries since the response is not generated from one search query and thus there are no SearchHits to speak of. Emulating one is not just conceptually incorrect but also problematic since most of the data is missed or made-up. As such this PR introduces a simple class, Event, that maps nicely to the terminology while hiding the ES internals (the use of SearchHit or GetResult/GetResponse depending on the API used). Fix #59764 Fix #59779 Co-authored-by: Igor Motov (cherry picked from commit 997376fbe6ef2894038968842f5e0635731ede65) --- .../client/eql/EqlSearchResponse.java | 216 +++++++++++++----- .../java/org/elasticsearch/client/EqlIT.java | 10 +- .../client/eql/EqlSearchResponseTests.java | 95 ++++++-- .../eql/detect-threats-with-eql.asciidoc | 22 -- docs/reference/eql/eql-search-api.asciidoc | 24 -- docs/reference/eql/eql.asciidoc | 24 -- .../test/eql/CommonEqlActionTestCase.java | 14 +- .../elasticsearch/xpack/eql/EqlRestIT.java | 8 +- .../xpack/eql/action/EqlSearchResponse.java | 142 ++++++++++-- .../eql/execution/payload/EventPayload.java | 40 ++++ .../eql/execution/payload/ReversePayload.java | 3 +- .../payload/SearchResponsePayload.java | 42 ---- .../xpack/eql/execution/payload/Sequence.java | 22 -- .../eql/execution/search/AsEventListener.java | 31 +++ .../eql/execution/search/BasicListener.java | 8 +- .../execution/search/BasicQueryClient.java | 27 +-- .../eql/execution/search/QueryClient.java | 8 +- .../eql/execution/search/RuntimeUtils.java | 6 + .../execution/sequence/SequencePayload.java | 19 +- .../execution/sequence/TumblingWindow.java | 20 +- .../xpack/eql/optimizer/Optimizer.java | 6 +- .../xpack/eql/plan/physical/EsQueryExec.java | 3 +- .../eql/plan/physical/LocalRelation.java | 5 +- .../eql/plugin/TransportEqlSearchAction.java | 2 +- .../xpack/eql/session/EmptyExecutable.java | 2 +- .../xpack/eql/session/EmptyPayload.java | 3 +- .../xpack/eql/session/Payload.java | 13 +- .../xpack/eql/session/Results.java | 13 +- .../eql/action/EqlSearchResponseTests.java | 87 +++++-- .../assembler/SequenceSpecTests.java | 47 ++-- 30 files changed, 598 insertions(+), 364 deletions(-) create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/EventPayload.java delete mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java delete mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/Sequence.java create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/AsEventListener.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java index 51aa4472ba8..d83d10ff1c6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java @@ -22,17 +22,22 @@ package org.elasticsearch.client.eql; import org.apache.lucene.search.TotalHits; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.InstantiatingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParserUtils; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.lookup.SourceLookup; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; @@ -133,6 +138,95 @@ public class EqlSearchResponse { public int hashCode() { return Objects.hash(hits, tookInMillis, isTimeout); } + + // Event + public static class Event { + + private static final class Fields { + static final String INDEX = GetResult._INDEX; + static final String ID = GetResult._ID; + static final String SOURCE = SourceFieldMapper.NAME; + } + + private static final ParseField INDEX = new ParseField(Fields.INDEX); + private static final ParseField ID = new ParseField(Fields.ID); + private static final ParseField SOURCE = new ParseField(Fields.SOURCE); + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("eql/search_response_event", true, + args -> new Event((String) args[0], (String) args[1], (BytesReference) args[2])); + + static { + PARSER.declareString(constructorArg(), INDEX); + PARSER.declareString(constructorArg(), ID); + PARSER.declareObject(constructorArg(), (p, c) -> { + try (XContentBuilder builder = XContentBuilder.builder(p.contentType().xContent())) { + builder.copyCurrentStructure(p); + return BytesReference.bytes(builder); + } + }, SOURCE); + } + + private final String index; + private final String id; + private final BytesReference source; + private Map sourceAsMap; + + public Event(String index, String id, BytesReference source) { + this.index = index; + this.id = id; + this.source = source; + } + + public static Event fromXContent(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } + + public String index() { + return index; + } + + public String id() { + return id; + } + + public BytesReference source() { + return source; + } + + public Map sourceAsMap() { + if (source == null) { + return null; + } + if (sourceAsMap != null) { + return sourceAsMap; + } + + sourceAsMap = SourceLookup.sourceAsMap(source); + return sourceAsMap; + } + + @Override + public int hashCode() { + return Objects.hash(index, id, source); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + EqlSearchResponse.Event other = (EqlSearchResponse.Event) obj; + return Objects.equals(index, other.index) + && Objects.equals(id, other.id) + && Objects.equals(source, other.source); + } + } // Sequence public static class Sequence { @@ -149,20 +243,20 @@ public class EqlSearchResponse { args -> { int i = 0; @SuppressWarnings("unchecked") List joinKeys = (List) args[i++]; - @SuppressWarnings("unchecked") List events = (List) args[i]; + @SuppressWarnings("unchecked") List events = (List) args[i]; return new EqlSearchResponse.Sequence(joinKeys, events); }); static { PARSER.declareFieldArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> XContentParserUtils.parseFieldsValue(p), JOIN_KEYS, ObjectParser.ValueType.VALUE_ARRAY); - PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p), EVENTS); + PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p), EVENTS); } private final List joinKeys; - private final List events; + private final List events; - public Sequence(List joinKeys, List events) { + public Sequence(List joinKeys, List events) { this.joinKeys = joinKeys == null ? Collections.emptyList() : joinKeys; this.events = events == null ? Collections.emptyList() : events; } @@ -171,6 +265,19 @@ public class EqlSearchResponse { return PARSER.apply(parser, null); } + public List joinKeys() { + return joinKeys; + } + + public List events() { + return events; + } + + @Override + public int hashCode() { + return Objects.hash(joinKeys, events); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -183,19 +290,6 @@ public class EqlSearchResponse { return Objects.equals(joinKeys, that.joinKeys) && Objects.equals(events, that.events); } - - @Override - public int hashCode() { - return Objects.hash(joinKeys, events); - } - - public List joinKeys() { - return joinKeys; - } - - public List events() { - return events; - } } // Count @@ -241,6 +335,23 @@ public class EqlSearchResponse { return PARSER.apply(parser, null); } + public int count() { + return count; + } + + public List keys() { + return keys; + } + + public float percent() { + return percent; + } + + @Override + public int hashCode() { + return Objects.hash(count, keys, percent); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -254,30 +365,13 @@ public class EqlSearchResponse { && Objects.equals(keys, that.keys) && Objects.equals(percent, that.percent); } - - @Override - public int hashCode() { - return Objects.hash(count, keys, percent); - } - - public int count() { - return count; - } - - public List keys() { - return keys; - } - - public float percent() { - return percent; - } } // Hits public static class Hits { public static final Hits EMPTY = new Hits(null, null, null, null); - private final List events; + private final List events; private final List sequences; private final List counts; private final TotalHits totalHits; @@ -289,7 +383,7 @@ public class EqlSearchResponse { static final String COUNTS = "counts"; } - public Hits(@Nullable List events, @Nullable List sequences, @Nullable List counts, + public Hits(@Nullable List events, @Nullable List sequences, @Nullable List counts, @Nullable TotalHits totalHits) { this.events = events; this.sequences = sequences; @@ -301,15 +395,15 @@ public class EqlSearchResponse { new ConstructingObjectParser<>("eql/search_response_count", true, args -> { int i = 0; - @SuppressWarnings("unchecked") List searchHits = (List) args[i++]; + @SuppressWarnings("unchecked") List events = (List) args[i++]; @SuppressWarnings("unchecked") List sequences = (List) args[i++]; @SuppressWarnings("unchecked") List counts = (List) args[i++]; TotalHits totalHits = (TotalHits) args[i]; - return new EqlSearchResponse.Hits(searchHits, sequences, counts, totalHits); + return new EqlSearchResponse.Hits(events, sequences, counts, totalHits); }); static { - PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p), + PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p), new ParseField(Fields.EVENTS)); PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), Sequence.PARSER, new ParseField(Fields.SEQUENCES)); @@ -323,6 +417,27 @@ public class EqlSearchResponse { return PARSER.parse(parser, null); } + public List events() { + return this.events; + } + + public List sequences() { + return this.sequences; + } + + public List counts() { + return this.counts; + } + + public TotalHits totalHits() { + return this.totalHits; + } + + @Override + public int hashCode() { + return Objects.hash(events, sequences, counts, totalHits); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -337,26 +452,5 @@ public class EqlSearchResponse { && Objects.equals(counts, that.counts) && Objects.equals(totalHits, that.totalHits); } - - @Override - public int hashCode() { - return Objects.hash(events, sequences, counts, totalHits); - } - - public List events() { - return this.events; - } - - public List sequences() { - return this.sequences; - } - - public List counts() { - return this.counts; - } - - public TotalHits totalHits() { - return this.totalHits; - } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java index bacf3acfa90..146788fa26a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java @@ -29,13 +29,13 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.eql.EqlSearchRequest; import org.elasticsearch.client.eql.EqlSearchResponse; +import org.elasticsearch.client.eql.EqlSearchResponse.Event; import org.elasticsearch.client.eql.EqlStatsRequest; import org.elasticsearch.client.eql.EqlStatsResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.search.SearchHit; import org.junit.Before; import java.io.IOException; @@ -121,8 +121,8 @@ public class EqlIT extends ESRestHighLevelClientTestCase { assertResponse(response, RECORD_COUNT / DIVIDER); // test the content of the hits - for (SearchHit hit : response.hits().events()) { - final Map source = hit.getSourceAsMap(); + for (Event hit : response.hits().events()) { + final Map source = hit.sourceAsMap(); final Map event = (Map) source.get("event"); assertThat(event.get("category"), equalTo("process")); @@ -147,8 +147,8 @@ public class EqlIT extends ESRestHighLevelClientTestCase { assertResponse(response, 3); // test the content of the hits - for (SearchHit hit : response.hits().events()) { - final Map source = hit.getSourceAsMap(); + for (Event hit : response.hits().events()) { + final Map source = hit.sourceAsMap(); final Map event = (Map) source.get("event"); assertThat(event.get("category"), equalTo("process")); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java index 6cb1b8895af..41239e786ed 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java @@ -21,15 +21,19 @@ package org.elasticsearch.client.eql; import org.apache.lucene.search.TotalHits; import org.elasticsearch.client.AbstractResponseTestCase; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.lookup.SourceLookup; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; @@ -38,13 +42,59 @@ import static org.hamcrest.Matchers.is; public class EqlSearchResponseTests extends AbstractResponseTestCase { - static List randomEvents() { + private static class RandomSource implements ToXContentObject { + + private final String key; + private final String value; + + RandomSource(Supplier randomStringSupplier) { + this.key = randomStringSupplier.get(); + this.value = randomStringSupplier.get(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(key, value); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RandomSource other = (RandomSource) obj; + return Objects.equals(key, other.key) && Objects.equals(value, other.value); + } + + public BytesReference toBytes(XContentType type) { + try (XContentBuilder builder = XContentBuilder.builder(type.xContent())) { + toXContent(builder, ToXContent.EMPTY_PARAMS); + return BytesReference.bytes(builder); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + + static List randomEvents(XContentType xType) { int size = randomIntBetween(1, 10); - List hits = null; + List hits = null; if (randomBoolean()) { hits = new ArrayList<>(); for (int i = 0; i < size; i++) { - hits.add(new SearchHit(i, randomAlphaOfLength(10), null, new HashMap<>(), new HashMap<>())); + BytesReference bytes = new RandomSource(() -> randomAlphaOfLength(10)).toBytes(xType); + hits.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event(String.valueOf(i), randomAlphaOfLength(10), bytes)); } } if (randomBoolean()) { @@ -53,10 +103,10 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase seq = null; if (randomBoolean()) { @@ -77,7 +128,7 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase serverEvents, + List clientEvents + ) { + assertThat(serverEvents.size(), equalTo(clientEvents.size())); + for (int j = 0; j < serverEvents.size(); j++) { + assertThat( + SourceLookup.sourceAsMap(serverEvents.get(j).source()), is(clientEvents.get(j).sourceAsMap())); +} + } } diff --git a/docs/reference/eql/detect-threats-with-eql.asciidoc b/docs/reference/eql/detect-threats-with-eql.asciidoc index d175a3bcd2d..86d68774321 100644 --- a/docs/reference/eql/detect-threats-with-eql.asciidoc +++ b/docs/reference/eql/detect-threats-with-eql.asciidoc @@ -166,9 +166,7 @@ The response also includes other valuable information about how the "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "gl5MJXMBMk1dGnErnBW8", - "_score": null, "_source": { "process": { "parent": { @@ -246,9 +244,7 @@ The query matches an event, confirming `scrobj.dll` was later loaded by "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "ol5MJXMBMk1dGnErnBW8", - "_score": null, "_source": { "process": { "name": "regsvr32.exe", @@ -333,12 +329,7 @@ The query matches a sequence, indicating the attack likely succeeded. "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "gl5MJXMBMk1dGnErnBW8", - "_version": 1, - "_seq_no": 3, - "_primary_term": 1, - "_score": null, "_source": { "process": { "parent": { @@ -368,12 +359,7 @@ The query matches a sequence, indicating the attack likely succeeded. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "ol5MJXMBMk1dGnErnBW8", - "_version": 1, - "_seq_no": 5, - "_primary_term": 1, - "_score": null, "_source": { "process": { "name": "regsvr32.exe", @@ -393,12 +379,7 @@ The query matches a sequence, indicating the attack likely succeeded. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "EF5MJXMBMk1dGnErnBa9", - "_version": 1, - "_seq_no": 24, - "_primary_term": 1, - "_score": null, "_source": { "process": { "name": "regsvr32.exe", @@ -437,8 +418,5 @@ The query matches a sequence, indicating the attack likely succeeded. ---- // TESTRESPONSE[s/"took": 25/"took": $body.took/] // TESTRESPONSE[s/"_id": "gl5MJXMBMk1dGnErnBW8"/"_id": $body.hits.sequences.0.events.0._id/] -// TESTRESPONSE[s/"_seq_no": 3/"_seq_no": $body.hits.sequences.0.events.0._seq_no/] // TESTRESPONSE[s/"_id": "ol5MJXMBMk1dGnErnBW8"/"_id": $body.hits.sequences.0.events.1._id/] -// TESTRESPONSE[s/"_seq_no": 5/"_seq_no": $body.hits.sequences.0.events.1._seq_no/] // TESTRESPONSE[s/"_id": "EF5MJXMBMk1dGnErnBa9"/"_id": $body.hits.sequences.0.events.2._id/] -// TESTRESPONSE[s/"_seq_no": 24/"_seq_no": $body.hits.sequences.0.events.2._seq_no/] diff --git a/docs/reference/eql/eql-search-api.asciidoc b/docs/reference/eql/eql-search-api.asciidoc index e2e494df1c4..d87e0b96b56 100644 --- a/docs/reference/eql/eql-search-api.asciidoc +++ b/docs/reference/eql/eql-search-api.asciidoc @@ -442,11 +442,6 @@ doesn’t overwrite a newer version. See <>. (integer) Primary term assigned to the document. See <>. -`_score`:: -(float) -Positive 32-bit floating point number used to determine the relevance of the - event. See <>. - `_source`:: (object) Original JSON body passed for the event at index time. @@ -472,11 +467,6 @@ Name of the index containing the event. Unique identifier for the event. This ID is only unique within the index. -`_score`:: -(float) -Positive 32-bit floating point number used to determine the relevance of the - event. See <>. - `_source`:: (object) Original JSON body passed for the event at index time. @@ -531,9 +521,7 @@ the events in ascending, lexicographic order. "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "babI3XMBI9IjHuIqU0S_", - "_score": null, "_source": { "@timestamp": "2099-12-06T11:04:05.000Z", "event": { @@ -550,9 +538,7 @@ the events in ascending, lexicographic order. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "b6bI3XMBI9IjHuIqU0S_", - "_score": null, "_source": { "@timestamp": "2099-12-07T11:06:07.000Z", "event": { @@ -634,12 +620,7 @@ shared `process.pid` value for each matching event. "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "AtOJ4UjUBAAx3XR5kcCM", - "_version": 1, - "_seq_no": 1, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-06T11:04:07.000Z", "event": { @@ -663,12 +644,7 @@ shared `process.pid` value for each matching event. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "OQmfCaduce8zoHT93o4H", - "_version": 1, - "_seq_no": 3, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:09.000Z", "event": { diff --git a/docs/reference/eql/eql.asciidoc b/docs/reference/eql/eql.asciidoc index e8b8fb4ccae..30cf57f83ca 100644 --- a/docs/reference/eql/eql.asciidoc +++ b/docs/reference/eql/eql.asciidoc @@ -88,9 +88,7 @@ ascending order. "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "OQmfCaduce8zoHT93o4H", - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:09.000Z", "event": { @@ -108,9 +106,7 @@ ascending order. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "xLkCaj4EujzdNSxfYLbO", - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:10.000Z", "event": { @@ -189,12 +185,7 @@ The API returns the following response. Matching sequences are included in the "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "OQmfCaduce8zoHT93o4H", - "_version": 1, - "_seq_no": 3, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:09.000Z", "event": { @@ -212,12 +203,7 @@ The API returns the following response. Matching sequences are included in the }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "yDwnGIJouOYGBzP0ZE9n", - "_version": 1, - "_seq_no": 4, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:10.000Z", "event": { @@ -325,12 +311,7 @@ contains the shared `process.pid` value for each matching event. "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "OQmfCaduce8zoHT93o4H", - "_version": 1, - "_seq_no": 3, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:09.000Z", "event": { @@ -348,12 +329,7 @@ contains the shared `process.pid` value for each matching event. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "yDwnGIJouOYGBzP0ZE9n", - "_version": 1, - "_seq_no": 4, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:10.000Z", "event": { diff --git a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java index 84b82b4c8f6..87307a2b5d2 100644 --- a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java +++ b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java @@ -15,11 +15,11 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.eql.EqlSearchRequest; import org.elasticsearch.client.eql.EqlSearchResponse; +import org.elasticsearch.client.eql.EqlSearchResponse.Event; import org.elasticsearch.client.eql.EqlSearchResponse.Hits; import org.elasticsearch.client.eql.EqlSearchResponse.Sequence; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.LoggerMessageFormat; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.rest.ESRestTestCase; import org.junit.After; import org.junit.Before; @@ -133,7 +133,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase { protected void assertResponse(EqlSearchResponse response) { Hits hits = response.hits(); if (hits.events() != null) { - assertSearchHits(hits.events()); + assertEvents(hits.events()); } else if (hits.sequences() != null) { assertSequences(hits.sequences()); @@ -157,7 +157,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase { return highLevelClient().eql(); } - protected void assertSearchHits(List events) { + protected void assertEvents(List events) { assertNotNull(events); long[] expected = eventIds; long[] actual = extractIds(events); @@ -166,20 +166,20 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase { expected, actual); } - private static long[] extractIds(List events) { + private static long[] extractIds(List events) { final int len = events.size(); final long ids[] = new long[len]; for (int i = 0; i < len; i++) { - ids[i] = ((Number) events.get(i).getSourceAsMap().get("serial_event_id")).longValue(); + ids[i] = ((Number) events.get(i).sourceAsMap().get("serial_event_id")).longValue(); } return ids; } protected void assertSequences(List sequences) { - List events = sequences.stream() + List events = sequences.stream() .flatMap(s -> s.events().stream()) .collect(toList()); - assertSearchHits(events); + assertEvents(events); } private RestHighLevelClient highLevelClient() { diff --git a/x-pack/plugin/eql/qa/rest/src/test/java/org/elasticsearch/xpack/eql/EqlRestIT.java b/x-pack/plugin/eql/qa/rest/src/test/java/org/elasticsearch/xpack/eql/EqlRestIT.java index aa1df16c585..5443715d735 100644 --- a/x-pack/plugin/eql/qa/rest/src/test/java/org/elasticsearch/xpack/eql/EqlRestIT.java +++ b/x-pack/plugin/eql/qa/rest/src/test/java/org/elasticsearch/xpack/eql/EqlRestIT.java @@ -7,18 +7,12 @@ package org.elasticsearch.xpack.eql; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; -import org.elasticsearch.Build; + import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; -import org.junit.BeforeClass; public class EqlRestIT extends ESClientYamlSuiteTestCase { - @BeforeClass - public static void checkForSnapshot() { - assumeTrue("Only works on snapshot builds for now", Build.CURRENT.isSnapshot()); - } - public EqlRestIT(final ClientYamlTestCandidate testCandidate) { super(testCandidate); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java index 6b494f6d128..43036658441 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -20,9 +21,11 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParserUtils; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.search.SearchHits; import java.io.IOException; @@ -179,6 +182,110 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec return Strings.toString(this); } + // Event + public static class Event implements Writeable, ToXContentObject { + + private static final class Fields { + static final String INDEX = GetResult._INDEX; + static final String ID = GetResult._ID; + static final String SOURCE = SourceFieldMapper.NAME; + } + + private static final ParseField INDEX = new ParseField(Fields.INDEX); + private static final ParseField ID = new ParseField(Fields.ID); + private static final ParseField SOURCE = new ParseField(Fields.SOURCE); + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("eql/search_response_event", true, + args -> new Event((String) args[0], (String) args[1], (BytesReference) args[2])); + + static { + PARSER.declareString(constructorArg(), INDEX); + PARSER.declareString(constructorArg(), ID); + PARSER.declareObject(constructorArg(), (p, c) -> { + try (XContentBuilder builder = XContentBuilder.builder(p.contentType().xContent())) { + builder.copyCurrentStructure(p); + return BytesReference.bytes(builder); + } + }, SOURCE); + } + + private final String index; + private final String id; + private final BytesReference source; + + public Event(String index, String id, BytesReference source) { + this.index = index; + this.id = id; + this.source = source; + } + + public Event(StreamInput in) throws IOException { + index = in.readString(); + id = in.readString(); + source = in.readBytesReference(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeString(id); + out.writeBytesReference(source); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Fields.INDEX, index); + builder.field(Fields.ID, id); + // We have to use the deprecated version since we don't know the content type of the original source + XContentHelper.writeRawField(Fields.SOURCE, source, builder, params); + builder.endObject(); + return builder; + } + + public static Event fromXContent(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } + + public String index() { + return index; + } + + public String id() { + return id; + } + + public BytesReference source() { + return source; + } + + @Override + public int hashCode() { + return Objects.hash(index, id, source); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + EqlSearchResponse.Event other = (EqlSearchResponse.Event) obj; + return Objects.equals(index, other.index) + && Objects.equals(id, other.id) + && Objects.equals(source, other.source); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } + } // Sequence public static class Sequence implements Writeable, ToXContentObject { @@ -195,20 +302,20 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec args -> { int i = 0; @SuppressWarnings("unchecked") List joinKeys = (List) args[i++]; - @SuppressWarnings("unchecked") List events = (List) args[i]; + @SuppressWarnings("unchecked") List events = (List) args[i]; return new EqlSearchResponse.Sequence(joinKeys, events); }); static { PARSER.declareFieldArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> XContentParserUtils.parseFieldsValue(p), JOIN_KEYS, ObjectParser.ValueType.VALUE_ARRAY); - PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p), EVENTS); + PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p), EVENTS); } private final List joinKeys; - private final List events; + private final List events; - public Sequence(List joinKeys, List events) { + public Sequence(List joinKeys, List events) { this.joinKeys = joinKeys == null ? Collections.emptyList() : joinKeys; this.events = events == null ? Collections.emptyList() : events; } @@ -216,7 +323,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec @SuppressWarnings("unchecked") public Sequence(StreamInput in) throws IOException { this.joinKeys = (List) in.readGenericValue(); - this.events = in.readList(SearchHit::new); + this.events = in.readList(Event::new); } public static Sequence fromXContent(XContentParser parser) { @@ -236,8 +343,8 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec builder.field(Fields.JOIN_KEYS, joinKeys); } if (events.isEmpty() == false) { - builder.startArray(EVENTS.getPreferredName()); - for (SearchHit event : events) { + builder.startArray(Fields.EVENTS); + for (Event event : events) { event.toXContent(builder, params); } builder.endArray(); @@ -268,7 +375,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec return joinKeys; } - public List events() { + public List events() { return events; } } @@ -372,11 +479,10 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec } } - // Hits public static class Hits implements Writeable, ToXContentFragment { public static final Hits EMPTY = new Hits(null, null, null, null); - private final List events; + private final List events; private final List sequences; private final List counts; private final TotalHits totalHits; @@ -389,7 +495,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec static final String COUNTS = "counts"; } - public Hits(@Nullable List events, @Nullable List sequences, @Nullable List counts, + public Hits(@Nullable List events, @Nullable List sequences, @Nullable List counts, @Nullable TotalHits totalHits) { this.events = events; this.sequences = sequences; @@ -404,7 +510,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec } else { totalHits = null; } - events = in.readBoolean() ? in.readList(SearchHit::new) : null; + events = in.readBoolean() ? in.readList(Event::new) : null; sequences = in.readBoolean() ? in.readList(Sequence::new) : null; counts = in.readBoolean() ? in.readList(Count::new) : null; } @@ -440,15 +546,15 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec new ConstructingObjectParser<>("eql/search_response_count", true, args -> { int i = 0; - @SuppressWarnings("unchecked") List searchHits = (List) args[i++]; + @SuppressWarnings("unchecked") List events = (List) args[i++]; @SuppressWarnings("unchecked") List sequences = (List) args[i++]; @SuppressWarnings("unchecked") List counts = (List) args[i++]; TotalHits totalHits = (TotalHits) args[i]; - return new EqlSearchResponse.Hits(searchHits, sequences, counts, totalHits); + return new EqlSearchResponse.Hits(events, sequences, counts, totalHits); }); static { - PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p), + PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p), new ParseField(Fields.EVENTS)); PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), Sequence.PARSER, new ParseField(Fields.SEQUENCES)); @@ -473,7 +579,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec } if (events != null) { builder.startArray(Fields.EVENTS); - for (SearchHit event : events) { + for (Event event : events) { event.toXContent(builder, params); } builder.endArray(); @@ -509,7 +615,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec return Objects.hash(events, sequences, counts, totalHits); } - public List events() { + public List events() { return this.events; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/EventPayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/EventPayload.java new file mode 100644 index 00000000000..6f10f45cd39 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/EventPayload.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.execution.payload; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event; +import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils; + +import java.util.ArrayList; +import java.util.List; + +public class EventPayload extends AbstractPayload { + + private final List values; + + public EventPayload(SearchResponse response) { + super(response.isTimedOut(), response.getTook()); + + List hits = RuntimeUtils.searchHits(response); + values = new ArrayList<>(hits.size()); + for (SearchHit hit : hits) { + values.add(new Event(hit.getIndex(), hit.getId(), hit.getSourceRef())); + } + } + + @Override + public Type resultType() { + return Type.EVENT; + } + + @Override + public List values() { + return values; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java index 8b40d178351..36c18e24432 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.eql.execution.payload; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.eql.session.Payload; -import org.elasticsearch.xpack.eql.session.Results.Type; import java.util.Collections; import java.util.List; @@ -38,7 +37,7 @@ public class ReversePayload implements Payload { } @Override - public List values() { + public List values() { return delegate.values(); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java deleted file mode 100644 index d97e882a20f..00000000000 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.eql.execution.payload; - -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchSortValues; -import org.elasticsearch.xpack.eql.session.Results.Type; - -import java.util.Arrays; -import java.util.List; - -public class SearchResponsePayload extends AbstractPayload { - - private final List hits; - - public SearchResponsePayload(SearchResponse response) { - super(response.isTimedOut(), response.getTook()); - hits = Arrays.asList(response.getHits().getHits()); - // clean hits - SearchSortValues sortValues = new SearchSortValues(new Object[0], new DocValueFormat[0]); - for (SearchHit hit : hits) { - hit.sortValues(sortValues); - } - } - - @Override - public Type resultType() { - return Type.SEARCH_HIT; - } - - @SuppressWarnings("unchecked") - @Override - public List values() { - return (List) hits; - } -} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/Sequence.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/Sequence.java deleted file mode 100644 index 09a398137ac..00000000000 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/Sequence.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.eql.execution.payload; - -import org.elasticsearch.search.SearchHit; - -public class Sequence { - - private final Iterable events; - - public Sequence(Iterable event) { - this.events = event; - } - - public Iterable event() { - return events; - } -} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/AsEventListener.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/AsEventListener.java new file mode 100644 index 00000000000..c7cc695df6f --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/AsEventListener.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.execution.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.xpack.eql.execution.payload.EventPayload; +import org.elasticsearch.xpack.eql.session.Payload; + +public class AsEventListener implements ActionListener { + + private final ActionListener listener; + + public AsEventListener(ActionListener listener) { + this.listener = listener; + } + + @Override + public void onResponse(SearchResponse response) { + listener.onResponse(new EventPayload(response)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java index ab68cb56af0..e6529613500 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java @@ -12,8 +12,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; -import org.elasticsearch.xpack.eql.execution.payload.SearchResponsePayload; -import org.elasticsearch.xpack.eql.session.Payload; import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.logSearchResponse; @@ -21,9 +19,9 @@ public class BasicListener implements ActionListener { private static final Logger log = RuntimeUtils.QUERY_LOG; - private final ActionListener listener; + private final ActionListener listener; - public BasicListener(ActionListener listener) { + public BasicListener(ActionListener listener) { this.listener = listener; } @@ -37,7 +35,7 @@ public class BasicListener implements ActionListener { if (log.isTraceEnabled()) { logSearchResponse(response, log); } - listener.onResponse(new SearchResponsePayload(response)); + listener.onResponse(response); } } catch (Exception ex) { onFailure(ex); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java index 671966ef071..b59852f6ff5 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java @@ -13,17 +13,13 @@ import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetRequest.Item; import org.elasticsearch.action.get.MultiGetRequestBuilder; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.text.Text; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.xpack.eql.session.EqlConfiguration; import org.elasticsearch.xpack.eql.session.EqlSession; -import org.elasticsearch.xpack.eql.session.Payload; import org.elasticsearch.xpack.ql.util.StringUtils; import java.util.ArrayList; @@ -47,7 +43,7 @@ public class BasicQueryClient implements QueryClient { } @Override - public void query(QueryRequest request, ActionListener listener) { + public void query(QueryRequest request, ActionListener listener) { SearchSourceBuilder searchSource = request.searchSource(); // set query timeout searchSource.timeout(cfg.requestTimeout()); @@ -64,7 +60,7 @@ public class BasicQueryClient implements QueryClient { } @Override - public void get(Iterable> refs, ActionListener>> listener) { + public void get(Iterable> refs, ActionListener>> listener) { MultiGetRequestBuilder requestBuilder = client.prepareMultiGet(); // no need for real-time requestBuilder.setRealtime(false) @@ -84,29 +80,18 @@ public class BasicQueryClient implements QueryClient { final int listSize = sz; client.multiGet(requestBuilder.request(), wrap(r -> { - List> hits = new ArrayList<>(r.getResponses().length / listSize); + List> hits = new ArrayList<>(r.getResponses().length / listSize); - List sequence = new ArrayList<>(listSize); + List sequence = new ArrayList<>(listSize); int counter = 0; - Text type = new Text("_doc"); for (MultiGetItemResponse mgr : r.getResponses()) { if (mgr.isFailed()) { listener.onFailure(mgr.getFailure().getFailure()); return; } - GetResponse response = mgr.getResponse(); - SearchHit hit = new SearchHit(-1, response.getId(), type, null, null); - hit.sourceRef(response.getSourceInternal()); - // need to create these objects to set the index - hit.shard(new SearchShardTarget(null, new ShardId(response.getIndex(), "", -1), null, null)); - hit.setSeqNo(response.getSeqNo()); - hit.setPrimaryTerm(response.getPrimaryTerm()); - hit.version(response.getVersion()); - - - sequence.add(hit); + sequence.add(mgr.getResponse()); if (++counter == listSize) { counter = 0; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java index 19e14d7b4be..c9855e2a34d 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java @@ -7,8 +7,8 @@ package org.elasticsearch.xpack.eql.execution.search; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.xpack.eql.session.Payload; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchResponse; import java.util.List; @@ -17,7 +17,7 @@ import java.util.List; */ public interface QueryClient { - void query(QueryRequest request, ActionListener listener); + void query(QueryRequest request, ActionListener listener); - void get(Iterable> refs, ActionListener>> listener); + void get(Iterable> refs, ActionListener>> listener); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java index 2bfa6a3ba13..82957ebcd3c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; @@ -27,6 +28,7 @@ import org.elasticsearch.xpack.ql.expression.gen.pipeline.ReferenceInput; import org.elasticsearch.xpack.ql.index.IndexResolver; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; @@ -105,4 +107,8 @@ public final class RuntimeUtils { includeFrozen ? IndexResolver.FIELD_CAPS_FROZEN_INDICES_OPTIONS : IndexResolver.FIELD_CAPS_INDICES_OPTIONS) .request(); } + + public static List searchHits(SearchResponse response) { + return Arrays.asList(response.getHits().getHits()); + } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java index 783f6c6607b..f8fbb57686c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java @@ -6,10 +6,10 @@ package org.elasticsearch.xpack.eql.execution.sequence; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event; import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload; -import org.elasticsearch.xpack.eql.session.Results.Type; import java.util.ArrayList; import java.util.List; @@ -18,14 +18,18 @@ class SequencePayload extends AbstractPayload { private final List values; - SequencePayload(List sequences, List> searchHits, boolean timedOut, TimeValue timeTook) { + SequencePayload(List sequences, List> docs, boolean timedOut, TimeValue timeTook) { super(timedOut, timeTook); values = new ArrayList<>(sequences.size()); for (int i = 0; i < sequences.size(); i++) { Sequence s = sequences.get(i); - List hits = searchHits.get(i); - values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asList(), hits)); + List hits = docs.get(i); + List events = new ArrayList<>(hits.size()); + for (GetResponse hit : hits) { + events.add(new Event(hit.getIndex(), hit.getId(), hit.getSourceAsBytesRef())); + } + values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asList(), events)); } } @@ -34,9 +38,8 @@ class SequencePayload extends AbstractPayload { return Type.SEQUENCE; } - @SuppressWarnings("unchecked") @Override - public List values() { - return (List) values; + public List values() { + return values; } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java index ef5af17f9d1..0a249ddd836 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java @@ -9,6 +9,7 @@ package org.elasticsearch.xpack.eql.execution.sequence; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; @@ -20,13 +21,14 @@ import org.elasticsearch.xpack.eql.execution.search.Ordinal; import org.elasticsearch.xpack.eql.execution.search.QueryClient; import org.elasticsearch.xpack.eql.session.EmptyPayload; import org.elasticsearch.xpack.eql.session.Payload; -import org.elasticsearch.xpack.eql.session.Results.Type; +import org.elasticsearch.xpack.eql.session.Payload.Type; import org.elasticsearch.xpack.eql.util.ReversedIterator; import java.util.Iterator; import java.util.List; import static org.elasticsearch.action.ActionListener.wrap; +import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.searchHits; /** * Time-based window encapsulating query creation and advancement. @@ -95,9 +97,9 @@ public class TumblingWindow implements Executable { client.query(base.queryRequest(), wrap(p -> baseCriterion(baseStage, p, listener), listener::onFailure)); } - private void baseCriterion(int baseStage, Payload p, ActionListener listener) { + private void baseCriterion(int baseStage, SearchResponse r, ActionListener listener) { Criterion base = criteria.get(baseStage); - List hits = p.values(); + List hits = searchHits(r); log.trace("Found [{}] hits", hits.size()); @@ -167,8 +169,8 @@ public class TumblingWindow implements Executable { log.trace("Querying until stage {}", request); - client.query(request, wrap(p -> { - List hits = p.values(); + client.query(request, wrap(r -> { + List hits = searchHits(r); log.trace("Found [{}] hits", hits.size()); // no more results for until - let the other queries run @@ -208,8 +210,8 @@ public class TumblingWindow implements Executable { log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request); - client.query(request, wrap(p -> { - List hits = p.values(); + client.query(request, wrap(r -> { + List hits = searchHits(r); log.trace("Found [{}] hits", hits.size()); @@ -298,8 +300,8 @@ public class TumblingWindow implements Executable { return; } - client.get(hits(completed), wrap(searchHits -> { - listener.onResponse(new SequencePayload(completed, searchHits, false, timeTook())); + client.get(hits(completed), wrap(hits -> { + listener.onResponse(new SequencePayload(completed, hits, false, timeTook())); matcher.clear(); }, listener::onFailure)); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java index 5054d3c9e66..1291669f130 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java @@ -11,7 +11,7 @@ import org.elasticsearch.xpack.eql.plan.logical.Join; import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter; import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset; import org.elasticsearch.xpack.eql.plan.physical.LocalRelation; -import org.elasticsearch.xpack.eql.session.Results; +import org.elasticsearch.xpack.eql.session.Payload.Type; import org.elasticsearch.xpack.eql.util.MathUtils; import org.elasticsearch.xpack.eql.util.StringUtils; import org.elasticsearch.xpack.ql.expression.Expression; @@ -169,7 +169,7 @@ public class Optimizer extends RuleExecutor { @Override protected LogicalPlan rule(UnaryPlan plan) { if ((plan instanceof KeyedFilter) == false && plan.child() instanceof LocalRelation) { - return new LocalRelation(plan.source(), plan.output(), Results.Type.SEARCH_HIT); + return new LocalRelation(plan.source(), plan.output(), Type.EVENT); } return plan; } @@ -335,7 +335,7 @@ public class Optimizer extends RuleExecutor { // check for empty filters for (KeyedFilter filter : plan.queries()) { if (filter.anyMatch(LocalRelation.class::isInstance)) { - return new LocalRelation(plan.source(), plan.output(), Results.Type.SEQUENCE); + return new LocalRelation(plan.source(), plan.output(), Type.SEQUENCE); } } return plan; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java index fdfd40ed790..e9fb0545cf6 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java @@ -10,6 +10,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.xpack.eql.execution.search.AsEventListener; import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient; import org.elasticsearch.xpack.eql.execution.search.QueryRequest; import org.elasticsearch.xpack.eql.execution.search.ReverseListener; @@ -61,7 +62,7 @@ public class EsQueryExec extends LeafExec { // endpoint - fetch all source QueryRequest request = () -> source(session).fetchSource(FetchSourceContext.FETCH_SOURCE); listener = shouldReverse(request) ? new ReverseListener(listener) : listener; - new BasicQueryClient(session).query(request, listener); + new BasicQueryClient(session).query(request, new AsEventListener(listener)); } private boolean shouldReverse(QueryRequest query) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java index 3ef33912c33..084d21c4349 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java @@ -10,7 +10,6 @@ import org.elasticsearch.xpack.eql.session.EmptyExecutable; import org.elasticsearch.xpack.eql.session.EqlSession; import org.elasticsearch.xpack.eql.session.Executable; import org.elasticsearch.xpack.eql.session.Payload; -import org.elasticsearch.xpack.eql.session.Results; import org.elasticsearch.xpack.ql.expression.Attribute; import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.ql.tree.NodeInfo; @@ -27,10 +26,10 @@ public class LocalRelation extends LogicalPlan implements Executable { private final Executable executable; public LocalRelation(Source source, List output) { - this(source, output, Results.Type.SEARCH_HIT); + this(source, output, Payload.Type.EVENT); } - public LocalRelation(Source source, List output, Results.Type resultType) { + public LocalRelation(Source source, List output, Payload.Type resultType) { this(source, new EmptyExecutable(output, resultType)); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index 3280652f47f..9bfeac2b654 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -125,7 +125,7 @@ public class TransportEqlSearchAction extends HandledTransportAction List values() { + public List values() { return emptyList(); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java index 920b1146525..49cf430d761 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java @@ -11,16 +11,21 @@ import org.elasticsearch.common.unit.TimeValue; import java.util.List; /** - * Container for internal results. Can be low-level such as SearchHits or Sequences. - * Generalized to allow reuse and internal pluggability. + * Container for final results. Used for completed data, such as Events or Sequences. */ public interface Payload { - Results.Type resultType(); + enum Type { + EVENT, + SEQUENCE, + COUNT; + } + + Type resultType(); boolean timedOut(); TimeValue timeTook(); - List values(); + List values(); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java index cd49b2749a3..9b420251268 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java @@ -9,20 +9,15 @@ package org.elasticsearch.xpack.eql.session; import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits.Relation; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Count; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event; import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence; +import org.elasticsearch.xpack.eql.session.Payload.Type; import java.util.List; public class Results { - public enum Type { - SEARCH_HIT, - SEQUENCE, - COUNT; - } - private final TotalHits totalHits; private final List results; private final boolean timedOut; @@ -47,8 +42,8 @@ public class Results { } @SuppressWarnings("unchecked") - public List searchHits() { - return type == Type.SEARCH_HIT ? (List) results : null; + public List events() { + return type == Type.EVENT ? (List) results : null; } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java index b1346c4d66d..cd1bc13bd7e 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java @@ -6,26 +6,78 @@ package org.elasticsearch.xpack.eql.action; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; public class EqlSearchResponseTests extends AbstractSerializingTestCase { - static List randomEvents() { + private static class RandomSource implements ToXContentObject { + + private final String key; + private final String value; + + RandomSource(Supplier randomStringSupplier) { + this.key = randomStringSupplier.get(); + this.value = randomStringSupplier.get(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(key, value); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RandomSource other = (RandomSource) obj; + return Objects.equals(key, other.key) && Objects.equals(value, other.value); + } + + public BytesReference toBytes(XContentType type) { + try (XContentBuilder builder = XContentBuilder.builder(type.xContent())) { + toXContent(builder, ToXContent.EMPTY_PARAMS); + return BytesReference.bytes(builder); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + + static List randomEvents(XContentType xType) { int size = randomIntBetween(1, 10); - List hits = null; + List hits = null; if (randomBoolean()) { hits = new ArrayList<>(); for (int i = 0; i < size; i++) { - hits.add(new SearchHit(i, randomAlphaOfLength(10), null, new HashMap<>(), new HashMap<>())); + BytesReference bytes = new RandomSource(() -> randomAlphaOfLength(10)).toBytes(xType); + hits.add(new Event(String.valueOf(i), randomAlphaOfLength(10), bytes)); } } if (randomBoolean()) { @@ -34,9 +86,14 @@ public class EqlSearchResponseTests extends AbstractSerializingTestCase seq = null; if (randomBoolean()) { @@ -76,7 +133,7 @@ public class EqlSearchResponseTests extends AbstractSerializingTestCase hits; private final Map> events; - TestPayload(Map> events) { + EventsAsHits(Map> events) { this.events = events; this.hits = new ArrayList<>(events.size()); @@ -147,25 +153,8 @@ public class SequenceSpecTests extends ESTestCase { } } - @Override - public Type resultType() { - return Type.SEARCH_HIT; - } - - @Override - public boolean timedOut() { - return false; - } - - @Override - public TimeValue timeTook() { - return TimeValue.ZERO; - } - - @SuppressWarnings("unchecked") - @Override - public List values() { - return (List) hits; + public List hits() { + return hits; } @Override @@ -177,17 +166,23 @@ public class SequenceSpecTests extends ESTestCase { class TestQueryClient implements QueryClient { @Override - public void query(QueryRequest r, ActionListener l) { + public void query(QueryRequest r, ActionListener l) { int ordinal = r.searchSource().size(); if (ordinal != Integer.MAX_VALUE) { r.searchSource().size(Integer.MAX_VALUE); } Map> evs = ordinal != Integer.MAX_VALUE ? events.get(ordinal) : emptyMap(); - l.onResponse(new TestPayload(evs)); + + EventsAsHits eah = new EventsAsHits(evs); + SearchHits searchHits = new SearchHits(eah.hits.toArray(new SearchHit[0]), new TotalHits(eah.hits.size(), Relation.EQUAL_TO), + 0.0f); + SearchResponseSections internal = new SearchResponseSections(searchHits, null, null, false, false, null, 0); + SearchResponse s = new SearchResponse(internal, null, 0, 1, 0, 0, null, Clusters.EMPTY); + l.onResponse(s); } @Override - public void get(Iterable> refs, ActionListener>> listener) { + public void get(Iterable> refs, ActionListener>> listener) { //no-op } } @@ -240,7 +235,7 @@ public class SequenceSpecTests extends ESTestCase { List match = matches.get(i); List returned = new ArrayList<>(); for (int j = 0; j < match.size(); j++) { - int key = ((Number) TimestampExtractor.INSTANCE.extract(s.events().get(j))).intValue(); + int key = Integer.parseInt(s.events().get(j).id()); returned.add(allEvents.get(key)); }