diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java index 51aa4472ba8..d83d10ff1c6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java @@ -22,17 +22,22 @@ package org.elasticsearch.client.eql; import org.apache.lucene.search.TotalHits; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.InstantiatingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParserUtils; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.lookup.SourceLookup; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; @@ -133,6 +138,95 @@ public class EqlSearchResponse { public int hashCode() { return Objects.hash(hits, tookInMillis, isTimeout); } + + // Event + public static class Event { + + private static final class Fields { + static final String INDEX = GetResult._INDEX; + static final String ID = GetResult._ID; + static final String SOURCE = SourceFieldMapper.NAME; + } + + private static final ParseField INDEX = new ParseField(Fields.INDEX); + private static final ParseField ID = new ParseField(Fields.ID); + private static final ParseField SOURCE = new ParseField(Fields.SOURCE); + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("eql/search_response_event", true, + args -> new Event((String) args[0], (String) args[1], (BytesReference) args[2])); + + static { + PARSER.declareString(constructorArg(), INDEX); + PARSER.declareString(constructorArg(), ID); + PARSER.declareObject(constructorArg(), (p, c) -> { + try (XContentBuilder builder = XContentBuilder.builder(p.contentType().xContent())) { + builder.copyCurrentStructure(p); + return BytesReference.bytes(builder); + } + }, SOURCE); + } + + private final String index; + private final String id; + private final BytesReference source; + private Map sourceAsMap; + + public Event(String index, String id, BytesReference source) { + this.index = index; + this.id = id; + this.source = source; + } + + public static Event fromXContent(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } + + public String index() { + return index; + } + + public String id() { + return id; + } + + public BytesReference source() { + return source; + } + + public Map sourceAsMap() { + if (source == null) { + return null; + } + if (sourceAsMap != null) { + return sourceAsMap; + } + + sourceAsMap = SourceLookup.sourceAsMap(source); + return sourceAsMap; + } + + @Override + public int hashCode() { + return Objects.hash(index, id, source); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + EqlSearchResponse.Event other = (EqlSearchResponse.Event) obj; + return Objects.equals(index, other.index) + && Objects.equals(id, other.id) + && Objects.equals(source, other.source); + } + } // Sequence public static class Sequence { @@ -149,20 +243,20 @@ public class EqlSearchResponse { args -> { int i = 0; @SuppressWarnings("unchecked") List joinKeys = (List) args[i++]; - @SuppressWarnings("unchecked") List events = (List) args[i]; + @SuppressWarnings("unchecked") List events = (List) args[i]; return new EqlSearchResponse.Sequence(joinKeys, events); }); static { PARSER.declareFieldArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> XContentParserUtils.parseFieldsValue(p), JOIN_KEYS, ObjectParser.ValueType.VALUE_ARRAY); - PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p), EVENTS); + PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p), EVENTS); } private final List joinKeys; - private final List events; + private final List events; - public Sequence(List joinKeys, List events) { + public Sequence(List joinKeys, List events) { this.joinKeys = joinKeys == null ? Collections.emptyList() : joinKeys; this.events = events == null ? Collections.emptyList() : events; } @@ -171,6 +265,19 @@ public class EqlSearchResponse { return PARSER.apply(parser, null); } + public List joinKeys() { + return joinKeys; + } + + public List events() { + return events; + } + + @Override + public int hashCode() { + return Objects.hash(joinKeys, events); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -183,19 +290,6 @@ public class EqlSearchResponse { return Objects.equals(joinKeys, that.joinKeys) && Objects.equals(events, that.events); } - - @Override - public int hashCode() { - return Objects.hash(joinKeys, events); - } - - public List joinKeys() { - return joinKeys; - } - - public List events() { - return events; - } } // Count @@ -241,6 +335,23 @@ public class EqlSearchResponse { return PARSER.apply(parser, null); } + public int count() { + return count; + } + + public List keys() { + return keys; + } + + public float percent() { + return percent; + } + + @Override + public int hashCode() { + return Objects.hash(count, keys, percent); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -254,30 +365,13 @@ public class EqlSearchResponse { && Objects.equals(keys, that.keys) && Objects.equals(percent, that.percent); } - - @Override - public int hashCode() { - return Objects.hash(count, keys, percent); - } - - public int count() { - return count; - } - - public List keys() { - return keys; - } - - public float percent() { - return percent; - } } // Hits public static class Hits { public static final Hits EMPTY = new Hits(null, null, null, null); - private final List events; + private final List events; private final List sequences; private final List counts; private final TotalHits totalHits; @@ -289,7 +383,7 @@ public class EqlSearchResponse { static final String COUNTS = "counts"; } - public Hits(@Nullable List events, @Nullable List sequences, @Nullable List counts, + public Hits(@Nullable List events, @Nullable List sequences, @Nullable List counts, @Nullable TotalHits totalHits) { this.events = events; this.sequences = sequences; @@ -301,15 +395,15 @@ public class EqlSearchResponse { new ConstructingObjectParser<>("eql/search_response_count", true, args -> { int i = 0; - @SuppressWarnings("unchecked") List searchHits = (List) args[i++]; + @SuppressWarnings("unchecked") List events = (List) args[i++]; @SuppressWarnings("unchecked") List sequences = (List) args[i++]; @SuppressWarnings("unchecked") List counts = (List) args[i++]; TotalHits totalHits = (TotalHits) args[i]; - return new EqlSearchResponse.Hits(searchHits, sequences, counts, totalHits); + return new EqlSearchResponse.Hits(events, sequences, counts, totalHits); }); static { - PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p), + PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p), new ParseField(Fields.EVENTS)); PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), Sequence.PARSER, new ParseField(Fields.SEQUENCES)); @@ -323,6 +417,27 @@ public class EqlSearchResponse { return PARSER.parse(parser, null); } + public List events() { + return this.events; + } + + public List sequences() { + return this.sequences; + } + + public List counts() { + return this.counts; + } + + public TotalHits totalHits() { + return this.totalHits; + } + + @Override + public int hashCode() { + return Objects.hash(events, sequences, counts, totalHits); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -337,26 +452,5 @@ public class EqlSearchResponse { && Objects.equals(counts, that.counts) && Objects.equals(totalHits, that.totalHits); } - - @Override - public int hashCode() { - return Objects.hash(events, sequences, counts, totalHits); - } - - public List events() { - return this.events; - } - - public List sequences() { - return this.sequences; - } - - public List counts() { - return this.counts; - } - - public TotalHits totalHits() { - return this.totalHits; - } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java index bacf3acfa90..146788fa26a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java @@ -29,13 +29,13 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.eql.EqlSearchRequest; import org.elasticsearch.client.eql.EqlSearchResponse; +import org.elasticsearch.client.eql.EqlSearchResponse.Event; import org.elasticsearch.client.eql.EqlStatsRequest; import org.elasticsearch.client.eql.EqlStatsResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.search.SearchHit; import org.junit.Before; import java.io.IOException; @@ -121,8 +121,8 @@ public class EqlIT extends ESRestHighLevelClientTestCase { assertResponse(response, RECORD_COUNT / DIVIDER); // test the content of the hits - for (SearchHit hit : response.hits().events()) { - final Map source = hit.getSourceAsMap(); + for (Event hit : response.hits().events()) { + final Map source = hit.sourceAsMap(); final Map event = (Map) source.get("event"); assertThat(event.get("category"), equalTo("process")); @@ -147,8 +147,8 @@ public class EqlIT extends ESRestHighLevelClientTestCase { assertResponse(response, 3); // test the content of the hits - for (SearchHit hit : response.hits().events()) { - final Map source = hit.getSourceAsMap(); + for (Event hit : response.hits().events()) { + final Map source = hit.sourceAsMap(); final Map event = (Map) source.get("event"); assertThat(event.get("category"), equalTo("process")); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java index 6cb1b8895af..41239e786ed 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java @@ -21,15 +21,19 @@ package org.elasticsearch.client.eql; import org.apache.lucene.search.TotalHits; import org.elasticsearch.client.AbstractResponseTestCase; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.lookup.SourceLookup; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; @@ -38,13 +42,59 @@ import static org.hamcrest.Matchers.is; public class EqlSearchResponseTests extends AbstractResponseTestCase { - static List randomEvents() { + private static class RandomSource implements ToXContentObject { + + private final String key; + private final String value; + + RandomSource(Supplier randomStringSupplier) { + this.key = randomStringSupplier.get(); + this.value = randomStringSupplier.get(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(key, value); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RandomSource other = (RandomSource) obj; + return Objects.equals(key, other.key) && Objects.equals(value, other.value); + } + + public BytesReference toBytes(XContentType type) { + try (XContentBuilder builder = XContentBuilder.builder(type.xContent())) { + toXContent(builder, ToXContent.EMPTY_PARAMS); + return BytesReference.bytes(builder); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + + static List randomEvents(XContentType xType) { int size = randomIntBetween(1, 10); - List hits = null; + List hits = null; if (randomBoolean()) { hits = new ArrayList<>(); for (int i = 0; i < size; i++) { - hits.add(new SearchHit(i, randomAlphaOfLength(10), null, new HashMap<>(), new HashMap<>())); + BytesReference bytes = new RandomSource(() -> randomAlphaOfLength(10)).toBytes(xType); + hits.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event(String.valueOf(i), randomAlphaOfLength(10), bytes)); } } if (randomBoolean()) { @@ -53,10 +103,10 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase seq = null; if (randomBoolean()) { @@ -77,7 +128,7 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase serverEvents, + List clientEvents + ) { + assertThat(serverEvents.size(), equalTo(clientEvents.size())); + for (int j = 0; j < serverEvents.size(); j++) { + assertThat( + SourceLookup.sourceAsMap(serverEvents.get(j).source()), is(clientEvents.get(j).sourceAsMap())); +} + } } diff --git a/docs/reference/eql/detect-threats-with-eql.asciidoc b/docs/reference/eql/detect-threats-with-eql.asciidoc index d175a3bcd2d..86d68774321 100644 --- a/docs/reference/eql/detect-threats-with-eql.asciidoc +++ b/docs/reference/eql/detect-threats-with-eql.asciidoc @@ -166,9 +166,7 @@ The response also includes other valuable information about how the "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "gl5MJXMBMk1dGnErnBW8", - "_score": null, "_source": { "process": { "parent": { @@ -246,9 +244,7 @@ The query matches an event, confirming `scrobj.dll` was later loaded by "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "ol5MJXMBMk1dGnErnBW8", - "_score": null, "_source": { "process": { "name": "regsvr32.exe", @@ -333,12 +329,7 @@ The query matches a sequence, indicating the attack likely succeeded. "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "gl5MJXMBMk1dGnErnBW8", - "_version": 1, - "_seq_no": 3, - "_primary_term": 1, - "_score": null, "_source": { "process": { "parent": { @@ -368,12 +359,7 @@ The query matches a sequence, indicating the attack likely succeeded. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "ol5MJXMBMk1dGnErnBW8", - "_version": 1, - "_seq_no": 5, - "_primary_term": 1, - "_score": null, "_source": { "process": { "name": "regsvr32.exe", @@ -393,12 +379,7 @@ The query matches a sequence, indicating the attack likely succeeded. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "EF5MJXMBMk1dGnErnBa9", - "_version": 1, - "_seq_no": 24, - "_primary_term": 1, - "_score": null, "_source": { "process": { "name": "regsvr32.exe", @@ -437,8 +418,5 @@ The query matches a sequence, indicating the attack likely succeeded. ---- // TESTRESPONSE[s/"took": 25/"took": $body.took/] // TESTRESPONSE[s/"_id": "gl5MJXMBMk1dGnErnBW8"/"_id": $body.hits.sequences.0.events.0._id/] -// TESTRESPONSE[s/"_seq_no": 3/"_seq_no": $body.hits.sequences.0.events.0._seq_no/] // TESTRESPONSE[s/"_id": "ol5MJXMBMk1dGnErnBW8"/"_id": $body.hits.sequences.0.events.1._id/] -// TESTRESPONSE[s/"_seq_no": 5/"_seq_no": $body.hits.sequences.0.events.1._seq_no/] // TESTRESPONSE[s/"_id": "EF5MJXMBMk1dGnErnBa9"/"_id": $body.hits.sequences.0.events.2._id/] -// TESTRESPONSE[s/"_seq_no": 24/"_seq_no": $body.hits.sequences.0.events.2._seq_no/] diff --git a/docs/reference/eql/eql-search-api.asciidoc b/docs/reference/eql/eql-search-api.asciidoc index e2e494df1c4..d87e0b96b56 100644 --- a/docs/reference/eql/eql-search-api.asciidoc +++ b/docs/reference/eql/eql-search-api.asciidoc @@ -442,11 +442,6 @@ doesn’t overwrite a newer version. See <>. (integer) Primary term assigned to the document. See <>. -`_score`:: -(float) -Positive 32-bit floating point number used to determine the relevance of the - event. See <>. - `_source`:: (object) Original JSON body passed for the event at index time. @@ -472,11 +467,6 @@ Name of the index containing the event. Unique identifier for the event. This ID is only unique within the index. -`_score`:: -(float) -Positive 32-bit floating point number used to determine the relevance of the - event. See <>. - `_source`:: (object) Original JSON body passed for the event at index time. @@ -531,9 +521,7 @@ the events in ascending, lexicographic order. "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "babI3XMBI9IjHuIqU0S_", - "_score": null, "_source": { "@timestamp": "2099-12-06T11:04:05.000Z", "event": { @@ -550,9 +538,7 @@ the events in ascending, lexicographic order. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "b6bI3XMBI9IjHuIqU0S_", - "_score": null, "_source": { "@timestamp": "2099-12-07T11:06:07.000Z", "event": { @@ -634,12 +620,7 @@ shared `process.pid` value for each matching event. "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "AtOJ4UjUBAAx3XR5kcCM", - "_version": 1, - "_seq_no": 1, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-06T11:04:07.000Z", "event": { @@ -663,12 +644,7 @@ shared `process.pid` value for each matching event. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "OQmfCaduce8zoHT93o4H", - "_version": 1, - "_seq_no": 3, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:09.000Z", "event": { diff --git a/docs/reference/eql/eql.asciidoc b/docs/reference/eql/eql.asciidoc index e8b8fb4ccae..30cf57f83ca 100644 --- a/docs/reference/eql/eql.asciidoc +++ b/docs/reference/eql/eql.asciidoc @@ -88,9 +88,7 @@ ascending order. "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "OQmfCaduce8zoHT93o4H", - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:09.000Z", "event": { @@ -108,9 +106,7 @@ ascending order. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "xLkCaj4EujzdNSxfYLbO", - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:10.000Z", "event": { @@ -189,12 +185,7 @@ The API returns the following response. Matching sequences are included in the "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "OQmfCaduce8zoHT93o4H", - "_version": 1, - "_seq_no": 3, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:09.000Z", "event": { @@ -212,12 +203,7 @@ The API returns the following response. Matching sequences are included in the }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "yDwnGIJouOYGBzP0ZE9n", - "_version": 1, - "_seq_no": 4, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:10.000Z", "event": { @@ -325,12 +311,7 @@ contains the shared `process.pid` value for each matching event. "events": [ { "_index": "my-index-000001", - "_type": "_doc", "_id": "OQmfCaduce8zoHT93o4H", - "_version": 1, - "_seq_no": 3, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:09.000Z", "event": { @@ -348,12 +329,7 @@ contains the shared `process.pid` value for each matching event. }, { "_index": "my-index-000001", - "_type": "_doc", "_id": "yDwnGIJouOYGBzP0ZE9n", - "_version": 1, - "_seq_no": 4, - "_primary_term": 1, - "_score": null, "_source": { "@timestamp": "2099-12-07T11:07:10.000Z", "event": { diff --git a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java index 84b82b4c8f6..87307a2b5d2 100644 --- a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java +++ b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java @@ -15,11 +15,11 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.eql.EqlSearchRequest; import org.elasticsearch.client.eql.EqlSearchResponse; +import org.elasticsearch.client.eql.EqlSearchResponse.Event; import org.elasticsearch.client.eql.EqlSearchResponse.Hits; import org.elasticsearch.client.eql.EqlSearchResponse.Sequence; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.LoggerMessageFormat; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.rest.ESRestTestCase; import org.junit.After; import org.junit.Before; @@ -133,7 +133,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase { protected void assertResponse(EqlSearchResponse response) { Hits hits = response.hits(); if (hits.events() != null) { - assertSearchHits(hits.events()); + assertEvents(hits.events()); } else if (hits.sequences() != null) { assertSequences(hits.sequences()); @@ -157,7 +157,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase { return highLevelClient().eql(); } - protected void assertSearchHits(List events) { + protected void assertEvents(List events) { assertNotNull(events); long[] expected = eventIds; long[] actual = extractIds(events); @@ -166,20 +166,20 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase { expected, actual); } - private static long[] extractIds(List events) { + private static long[] extractIds(List events) { final int len = events.size(); final long ids[] = new long[len]; for (int i = 0; i < len; i++) { - ids[i] = ((Number) events.get(i).getSourceAsMap().get("serial_event_id")).longValue(); + ids[i] = ((Number) events.get(i).sourceAsMap().get("serial_event_id")).longValue(); } return ids; } protected void assertSequences(List sequences) { - List events = sequences.stream() + List events = sequences.stream() .flatMap(s -> s.events().stream()) .collect(toList()); - assertSearchHits(events); + assertEvents(events); } private RestHighLevelClient highLevelClient() { diff --git a/x-pack/plugin/eql/qa/rest/src/test/java/org/elasticsearch/xpack/eql/EqlRestIT.java b/x-pack/plugin/eql/qa/rest/src/test/java/org/elasticsearch/xpack/eql/EqlRestIT.java index aa1df16c585..5443715d735 100644 --- a/x-pack/plugin/eql/qa/rest/src/test/java/org/elasticsearch/xpack/eql/EqlRestIT.java +++ b/x-pack/plugin/eql/qa/rest/src/test/java/org/elasticsearch/xpack/eql/EqlRestIT.java @@ -7,18 +7,12 @@ package org.elasticsearch.xpack.eql; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; -import org.elasticsearch.Build; + import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; -import org.junit.BeforeClass; public class EqlRestIT extends ESClientYamlSuiteTestCase { - @BeforeClass - public static void checkForSnapshot() { - assumeTrue("Only works on snapshot builds for now", Build.CURRENT.isSnapshot()); - } - public EqlRestIT(final ClientYamlTestCandidate testCandidate) { super(testCandidate); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java index 6b494f6d128..43036658441 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -20,9 +21,11 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParserUtils; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.search.SearchHits; import java.io.IOException; @@ -179,6 +182,110 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec return Strings.toString(this); } + // Event + public static class Event implements Writeable, ToXContentObject { + + private static final class Fields { + static final String INDEX = GetResult._INDEX; + static final String ID = GetResult._ID; + static final String SOURCE = SourceFieldMapper.NAME; + } + + private static final ParseField INDEX = new ParseField(Fields.INDEX); + private static final ParseField ID = new ParseField(Fields.ID); + private static final ParseField SOURCE = new ParseField(Fields.SOURCE); + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("eql/search_response_event", true, + args -> new Event((String) args[0], (String) args[1], (BytesReference) args[2])); + + static { + PARSER.declareString(constructorArg(), INDEX); + PARSER.declareString(constructorArg(), ID); + PARSER.declareObject(constructorArg(), (p, c) -> { + try (XContentBuilder builder = XContentBuilder.builder(p.contentType().xContent())) { + builder.copyCurrentStructure(p); + return BytesReference.bytes(builder); + } + }, SOURCE); + } + + private final String index; + private final String id; + private final BytesReference source; + + public Event(String index, String id, BytesReference source) { + this.index = index; + this.id = id; + this.source = source; + } + + public Event(StreamInput in) throws IOException { + index = in.readString(); + id = in.readString(); + source = in.readBytesReference(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeString(id); + out.writeBytesReference(source); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Fields.INDEX, index); + builder.field(Fields.ID, id); + // We have to use the deprecated version since we don't know the content type of the original source + XContentHelper.writeRawField(Fields.SOURCE, source, builder, params); + builder.endObject(); + return builder; + } + + public static Event fromXContent(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } + + public String index() { + return index; + } + + public String id() { + return id; + } + + public BytesReference source() { + return source; + } + + @Override + public int hashCode() { + return Objects.hash(index, id, source); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + EqlSearchResponse.Event other = (EqlSearchResponse.Event) obj; + return Objects.equals(index, other.index) + && Objects.equals(id, other.id) + && Objects.equals(source, other.source); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } + } // Sequence public static class Sequence implements Writeable, ToXContentObject { @@ -195,20 +302,20 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec args -> { int i = 0; @SuppressWarnings("unchecked") List joinKeys = (List) args[i++]; - @SuppressWarnings("unchecked") List events = (List) args[i]; + @SuppressWarnings("unchecked") List events = (List) args[i]; return new EqlSearchResponse.Sequence(joinKeys, events); }); static { PARSER.declareFieldArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> XContentParserUtils.parseFieldsValue(p), JOIN_KEYS, ObjectParser.ValueType.VALUE_ARRAY); - PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p), EVENTS); + PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p), EVENTS); } private final List joinKeys; - private final List events; + private final List events; - public Sequence(List joinKeys, List events) { + public Sequence(List joinKeys, List events) { this.joinKeys = joinKeys == null ? Collections.emptyList() : joinKeys; this.events = events == null ? Collections.emptyList() : events; } @@ -216,7 +323,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec @SuppressWarnings("unchecked") public Sequence(StreamInput in) throws IOException { this.joinKeys = (List) in.readGenericValue(); - this.events = in.readList(SearchHit::new); + this.events = in.readList(Event::new); } public static Sequence fromXContent(XContentParser parser) { @@ -236,8 +343,8 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec builder.field(Fields.JOIN_KEYS, joinKeys); } if (events.isEmpty() == false) { - builder.startArray(EVENTS.getPreferredName()); - for (SearchHit event : events) { + builder.startArray(Fields.EVENTS); + for (Event event : events) { event.toXContent(builder, params); } builder.endArray(); @@ -268,7 +375,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec return joinKeys; } - public List events() { + public List events() { return events; } } @@ -372,11 +479,10 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec } } - // Hits public static class Hits implements Writeable, ToXContentFragment { public static final Hits EMPTY = new Hits(null, null, null, null); - private final List events; + private final List events; private final List sequences; private final List counts; private final TotalHits totalHits; @@ -389,7 +495,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec static final String COUNTS = "counts"; } - public Hits(@Nullable List events, @Nullable List sequences, @Nullable List counts, + public Hits(@Nullable List events, @Nullable List sequences, @Nullable List counts, @Nullable TotalHits totalHits) { this.events = events; this.sequences = sequences; @@ -404,7 +510,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec } else { totalHits = null; } - events = in.readBoolean() ? in.readList(SearchHit::new) : null; + events = in.readBoolean() ? in.readList(Event::new) : null; sequences = in.readBoolean() ? in.readList(Sequence::new) : null; counts = in.readBoolean() ? in.readList(Count::new) : null; } @@ -440,15 +546,15 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec new ConstructingObjectParser<>("eql/search_response_count", true, args -> { int i = 0; - @SuppressWarnings("unchecked") List searchHits = (List) args[i++]; + @SuppressWarnings("unchecked") List events = (List) args[i++]; @SuppressWarnings("unchecked") List sequences = (List) args[i++]; @SuppressWarnings("unchecked") List counts = (List) args[i++]; TotalHits totalHits = (TotalHits) args[i]; - return new EqlSearchResponse.Hits(searchHits, sequences, counts, totalHits); + return new EqlSearchResponse.Hits(events, sequences, counts, totalHits); }); static { - PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p), + PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p), new ParseField(Fields.EVENTS)); PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), Sequence.PARSER, new ParseField(Fields.SEQUENCES)); @@ -473,7 +579,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec } if (events != null) { builder.startArray(Fields.EVENTS); - for (SearchHit event : events) { + for (Event event : events) { event.toXContent(builder, params); } builder.endArray(); @@ -509,7 +615,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec return Objects.hash(events, sequences, counts, totalHits); } - public List events() { + public List events() { return this.events; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/EventPayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/EventPayload.java new file mode 100644 index 00000000000..6f10f45cd39 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/EventPayload.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.execution.payload; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event; +import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils; + +import java.util.ArrayList; +import java.util.List; + +public class EventPayload extends AbstractPayload { + + private final List values; + + public EventPayload(SearchResponse response) { + super(response.isTimedOut(), response.getTook()); + + List hits = RuntimeUtils.searchHits(response); + values = new ArrayList<>(hits.size()); + for (SearchHit hit : hits) { + values.add(new Event(hit.getIndex(), hit.getId(), hit.getSourceRef())); + } + } + + @Override + public Type resultType() { + return Type.EVENT; + } + + @Override + public List values() { + return values; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java index 8b40d178351..36c18e24432 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.eql.execution.payload; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.eql.session.Payload; -import org.elasticsearch.xpack.eql.session.Results.Type; import java.util.Collections; import java.util.List; @@ -38,7 +37,7 @@ public class ReversePayload implements Payload { } @Override - public List values() { + public List values() { return delegate.values(); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java deleted file mode 100644 index d97e882a20f..00000000000 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.eql.execution.payload; - -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchSortValues; -import org.elasticsearch.xpack.eql.session.Results.Type; - -import java.util.Arrays; -import java.util.List; - -public class SearchResponsePayload extends AbstractPayload { - - private final List hits; - - public SearchResponsePayload(SearchResponse response) { - super(response.isTimedOut(), response.getTook()); - hits = Arrays.asList(response.getHits().getHits()); - // clean hits - SearchSortValues sortValues = new SearchSortValues(new Object[0], new DocValueFormat[0]); - for (SearchHit hit : hits) { - hit.sortValues(sortValues); - } - } - - @Override - public Type resultType() { - return Type.SEARCH_HIT; - } - - @SuppressWarnings("unchecked") - @Override - public List values() { - return (List) hits; - } -} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/Sequence.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/Sequence.java deleted file mode 100644 index 09a398137ac..00000000000 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/Sequence.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.eql.execution.payload; - -import org.elasticsearch.search.SearchHit; - -public class Sequence { - - private final Iterable events; - - public Sequence(Iterable event) { - this.events = event; - } - - public Iterable event() { - return events; - } -} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/AsEventListener.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/AsEventListener.java new file mode 100644 index 00000000000..c7cc695df6f --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/AsEventListener.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.execution.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.xpack.eql.execution.payload.EventPayload; +import org.elasticsearch.xpack.eql.session.Payload; + +public class AsEventListener implements ActionListener { + + private final ActionListener listener; + + public AsEventListener(ActionListener listener) { + this.listener = listener; + } + + @Override + public void onResponse(SearchResponse response) { + listener.onResponse(new EventPayload(response)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java index ab68cb56af0..e6529613500 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java @@ -12,8 +12,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; -import org.elasticsearch.xpack.eql.execution.payload.SearchResponsePayload; -import org.elasticsearch.xpack.eql.session.Payload; import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.logSearchResponse; @@ -21,9 +19,9 @@ public class BasicListener implements ActionListener { private static final Logger log = RuntimeUtils.QUERY_LOG; - private final ActionListener listener; + private final ActionListener listener; - public BasicListener(ActionListener listener) { + public BasicListener(ActionListener listener) { this.listener = listener; } @@ -37,7 +35,7 @@ public class BasicListener implements ActionListener { if (log.isTraceEnabled()) { logSearchResponse(response, log); } - listener.onResponse(new SearchResponsePayload(response)); + listener.onResponse(response); } } catch (Exception ex) { onFailure(ex); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java index 671966ef071..b59852f6ff5 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java @@ -13,17 +13,13 @@ import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetRequest.Item; import org.elasticsearch.action.get.MultiGetRequestBuilder; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.text.Text; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.xpack.eql.session.EqlConfiguration; import org.elasticsearch.xpack.eql.session.EqlSession; -import org.elasticsearch.xpack.eql.session.Payload; import org.elasticsearch.xpack.ql.util.StringUtils; import java.util.ArrayList; @@ -47,7 +43,7 @@ public class BasicQueryClient implements QueryClient { } @Override - public void query(QueryRequest request, ActionListener listener) { + public void query(QueryRequest request, ActionListener listener) { SearchSourceBuilder searchSource = request.searchSource(); // set query timeout searchSource.timeout(cfg.requestTimeout()); @@ -64,7 +60,7 @@ public class BasicQueryClient implements QueryClient { } @Override - public void get(Iterable> refs, ActionListener>> listener) { + public void get(Iterable> refs, ActionListener>> listener) { MultiGetRequestBuilder requestBuilder = client.prepareMultiGet(); // no need for real-time requestBuilder.setRealtime(false) @@ -84,29 +80,18 @@ public class BasicQueryClient implements QueryClient { final int listSize = sz; client.multiGet(requestBuilder.request(), wrap(r -> { - List> hits = new ArrayList<>(r.getResponses().length / listSize); + List> hits = new ArrayList<>(r.getResponses().length / listSize); - List sequence = new ArrayList<>(listSize); + List sequence = new ArrayList<>(listSize); int counter = 0; - Text type = new Text("_doc"); for (MultiGetItemResponse mgr : r.getResponses()) { if (mgr.isFailed()) { listener.onFailure(mgr.getFailure().getFailure()); return; } - GetResponse response = mgr.getResponse(); - SearchHit hit = new SearchHit(-1, response.getId(), type, null, null); - hit.sourceRef(response.getSourceInternal()); - // need to create these objects to set the index - hit.shard(new SearchShardTarget(null, new ShardId(response.getIndex(), "", -1), null, null)); - hit.setSeqNo(response.getSeqNo()); - hit.setPrimaryTerm(response.getPrimaryTerm()); - hit.version(response.getVersion()); - - - sequence.add(hit); + sequence.add(mgr.getResponse()); if (++counter == listSize) { counter = 0; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java index 19e14d7b4be..c9855e2a34d 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryClient.java @@ -7,8 +7,8 @@ package org.elasticsearch.xpack.eql.execution.search; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.xpack.eql.session.Payload; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchResponse; import java.util.List; @@ -17,7 +17,7 @@ import java.util.List; */ public interface QueryClient { - void query(QueryRequest request, ActionListener listener); + void query(QueryRequest request, ActionListener listener); - void get(Iterable> refs, ActionListener>> listener); + void get(Iterable> refs, ActionListener>> listener); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java index 2bfa6a3ba13..82957ebcd3c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; @@ -27,6 +28,7 @@ import org.elasticsearch.xpack.ql.expression.gen.pipeline.ReferenceInput; import org.elasticsearch.xpack.ql.index.IndexResolver; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; @@ -105,4 +107,8 @@ public final class RuntimeUtils { includeFrozen ? IndexResolver.FIELD_CAPS_FROZEN_INDICES_OPTIONS : IndexResolver.FIELD_CAPS_INDICES_OPTIONS) .request(); } + + public static List searchHits(SearchResponse response) { + return Arrays.asList(response.getHits().getHits()); + } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java index 783f6c6607b..f8fbb57686c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java @@ -6,10 +6,10 @@ package org.elasticsearch.xpack.eql.execution.sequence; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event; import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload; -import org.elasticsearch.xpack.eql.session.Results.Type; import java.util.ArrayList; import java.util.List; @@ -18,14 +18,18 @@ class SequencePayload extends AbstractPayload { private final List values; - SequencePayload(List sequences, List> searchHits, boolean timedOut, TimeValue timeTook) { + SequencePayload(List sequences, List> docs, boolean timedOut, TimeValue timeTook) { super(timedOut, timeTook); values = new ArrayList<>(sequences.size()); for (int i = 0; i < sequences.size(); i++) { Sequence s = sequences.get(i); - List hits = searchHits.get(i); - values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asList(), hits)); + List hits = docs.get(i); + List events = new ArrayList<>(hits.size()); + for (GetResponse hit : hits) { + events.add(new Event(hit.getIndex(), hit.getId(), hit.getSourceAsBytesRef())); + } + values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asList(), events)); } } @@ -34,9 +38,8 @@ class SequencePayload extends AbstractPayload { return Type.SEQUENCE; } - @SuppressWarnings("unchecked") @Override - public List values() { - return (List) values; + public List values() { + return values; } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java index ef5af17f9d1..0a249ddd836 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java @@ -9,6 +9,7 @@ package org.elasticsearch.xpack.eql.execution.sequence; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; @@ -20,13 +21,14 @@ import org.elasticsearch.xpack.eql.execution.search.Ordinal; import org.elasticsearch.xpack.eql.execution.search.QueryClient; import org.elasticsearch.xpack.eql.session.EmptyPayload; import org.elasticsearch.xpack.eql.session.Payload; -import org.elasticsearch.xpack.eql.session.Results.Type; +import org.elasticsearch.xpack.eql.session.Payload.Type; import org.elasticsearch.xpack.eql.util.ReversedIterator; import java.util.Iterator; import java.util.List; import static org.elasticsearch.action.ActionListener.wrap; +import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.searchHits; /** * Time-based window encapsulating query creation and advancement. @@ -95,9 +97,9 @@ public class TumblingWindow implements Executable { client.query(base.queryRequest(), wrap(p -> baseCriterion(baseStage, p, listener), listener::onFailure)); } - private void baseCriterion(int baseStage, Payload p, ActionListener listener) { + private void baseCriterion(int baseStage, SearchResponse r, ActionListener listener) { Criterion base = criteria.get(baseStage); - List hits = p.values(); + List hits = searchHits(r); log.trace("Found [{}] hits", hits.size()); @@ -167,8 +169,8 @@ public class TumblingWindow implements Executable { log.trace("Querying until stage {}", request); - client.query(request, wrap(p -> { - List hits = p.values(); + client.query(request, wrap(r -> { + List hits = searchHits(r); log.trace("Found [{}] hits", hits.size()); // no more results for until - let the other queries run @@ -208,8 +210,8 @@ public class TumblingWindow implements Executable { log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request); - client.query(request, wrap(p -> { - List hits = p.values(); + client.query(request, wrap(r -> { + List hits = searchHits(r); log.trace("Found [{}] hits", hits.size()); @@ -298,8 +300,8 @@ public class TumblingWindow implements Executable { return; } - client.get(hits(completed), wrap(searchHits -> { - listener.onResponse(new SequencePayload(completed, searchHits, false, timeTook())); + client.get(hits(completed), wrap(hits -> { + listener.onResponse(new SequencePayload(completed, hits, false, timeTook())); matcher.clear(); }, listener::onFailure)); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java index 5054d3c9e66..1291669f130 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java @@ -11,7 +11,7 @@ import org.elasticsearch.xpack.eql.plan.logical.Join; import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter; import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset; import org.elasticsearch.xpack.eql.plan.physical.LocalRelation; -import org.elasticsearch.xpack.eql.session.Results; +import org.elasticsearch.xpack.eql.session.Payload.Type; import org.elasticsearch.xpack.eql.util.MathUtils; import org.elasticsearch.xpack.eql.util.StringUtils; import org.elasticsearch.xpack.ql.expression.Expression; @@ -169,7 +169,7 @@ public class Optimizer extends RuleExecutor { @Override protected LogicalPlan rule(UnaryPlan plan) { if ((plan instanceof KeyedFilter) == false && plan.child() instanceof LocalRelation) { - return new LocalRelation(plan.source(), plan.output(), Results.Type.SEARCH_HIT); + return new LocalRelation(plan.source(), plan.output(), Type.EVENT); } return plan; } @@ -335,7 +335,7 @@ public class Optimizer extends RuleExecutor { // check for empty filters for (KeyedFilter filter : plan.queries()) { if (filter.anyMatch(LocalRelation.class::isInstance)) { - return new LocalRelation(plan.source(), plan.output(), Results.Type.SEQUENCE); + return new LocalRelation(plan.source(), plan.output(), Type.SEQUENCE); } } return plan; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java index fdfd40ed790..e9fb0545cf6 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java @@ -10,6 +10,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.xpack.eql.execution.search.AsEventListener; import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient; import org.elasticsearch.xpack.eql.execution.search.QueryRequest; import org.elasticsearch.xpack.eql.execution.search.ReverseListener; @@ -61,7 +62,7 @@ public class EsQueryExec extends LeafExec { // endpoint - fetch all source QueryRequest request = () -> source(session).fetchSource(FetchSourceContext.FETCH_SOURCE); listener = shouldReverse(request) ? new ReverseListener(listener) : listener; - new BasicQueryClient(session).query(request, listener); + new BasicQueryClient(session).query(request, new AsEventListener(listener)); } private boolean shouldReverse(QueryRequest query) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java index 3ef33912c33..084d21c4349 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java @@ -10,7 +10,6 @@ import org.elasticsearch.xpack.eql.session.EmptyExecutable; import org.elasticsearch.xpack.eql.session.EqlSession; import org.elasticsearch.xpack.eql.session.Executable; import org.elasticsearch.xpack.eql.session.Payload; -import org.elasticsearch.xpack.eql.session.Results; import org.elasticsearch.xpack.ql.expression.Attribute; import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.ql.tree.NodeInfo; @@ -27,10 +26,10 @@ public class LocalRelation extends LogicalPlan implements Executable { private final Executable executable; public LocalRelation(Source source, List output) { - this(source, output, Results.Type.SEARCH_HIT); + this(source, output, Payload.Type.EVENT); } - public LocalRelation(Source source, List output, Results.Type resultType) { + public LocalRelation(Source source, List output, Payload.Type resultType) { this(source, new EmptyExecutable(output, resultType)); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index 3280652f47f..9bfeac2b654 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -125,7 +125,7 @@ public class TransportEqlSearchAction extends HandledTransportAction List values() { + public List values() { return emptyList(); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java index 920b1146525..49cf430d761 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java @@ -11,16 +11,21 @@ import org.elasticsearch.common.unit.TimeValue; import java.util.List; /** - * Container for internal results. Can be low-level such as SearchHits or Sequences. - * Generalized to allow reuse and internal pluggability. + * Container for final results. Used for completed data, such as Events or Sequences. */ public interface Payload { - Results.Type resultType(); + enum Type { + EVENT, + SEQUENCE, + COUNT; + } + + Type resultType(); boolean timedOut(); TimeValue timeTook(); - List values(); + List values(); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java index cd49b2749a3..9b420251268 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java @@ -9,20 +9,15 @@ package org.elasticsearch.xpack.eql.session; import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits.Relation; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Count; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event; import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence; +import org.elasticsearch.xpack.eql.session.Payload.Type; import java.util.List; public class Results { - public enum Type { - SEARCH_HIT, - SEQUENCE, - COUNT; - } - private final TotalHits totalHits; private final List results; private final boolean timedOut; @@ -47,8 +42,8 @@ public class Results { } @SuppressWarnings("unchecked") - public List searchHits() { - return type == Type.SEARCH_HIT ? (List) results : null; + public List events() { + return type == Type.EVENT ? (List) results : null; } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java index b1346c4d66d..cd1bc13bd7e 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java @@ -6,26 +6,78 @@ package org.elasticsearch.xpack.eql.action; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; public class EqlSearchResponseTests extends AbstractSerializingTestCase { - static List randomEvents() { + private static class RandomSource implements ToXContentObject { + + private final String key; + private final String value; + + RandomSource(Supplier randomStringSupplier) { + this.key = randomStringSupplier.get(); + this.value = randomStringSupplier.get(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(key, value); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RandomSource other = (RandomSource) obj; + return Objects.equals(key, other.key) && Objects.equals(value, other.value); + } + + public BytesReference toBytes(XContentType type) { + try (XContentBuilder builder = XContentBuilder.builder(type.xContent())) { + toXContent(builder, ToXContent.EMPTY_PARAMS); + return BytesReference.bytes(builder); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + + static List randomEvents(XContentType xType) { int size = randomIntBetween(1, 10); - List hits = null; + List hits = null; if (randomBoolean()) { hits = new ArrayList<>(); for (int i = 0; i < size; i++) { - hits.add(new SearchHit(i, randomAlphaOfLength(10), null, new HashMap<>(), new HashMap<>())); + BytesReference bytes = new RandomSource(() -> randomAlphaOfLength(10)).toBytes(xType); + hits.add(new Event(String.valueOf(i), randomAlphaOfLength(10), bytes)); } } if (randomBoolean()) { @@ -34,9 +86,14 @@ public class EqlSearchResponseTests extends AbstractSerializingTestCase seq = null; if (randomBoolean()) { @@ -76,7 +133,7 @@ public class EqlSearchResponseTests extends AbstractSerializingTestCase hits; private final Map> events; - TestPayload(Map> events) { + EventsAsHits(Map> events) { this.events = events; this.hits = new ArrayList<>(events.size()); @@ -147,25 +153,8 @@ public class SequenceSpecTests extends ESTestCase { } } - @Override - public Type resultType() { - return Type.SEARCH_HIT; - } - - @Override - public boolean timedOut() { - return false; - } - - @Override - public TimeValue timeTook() { - return TimeValue.ZERO; - } - - @SuppressWarnings("unchecked") - @Override - public List values() { - return (List) hits; + public List hits() { + return hits; } @Override @@ -177,17 +166,23 @@ public class SequenceSpecTests extends ESTestCase { class TestQueryClient implements QueryClient { @Override - public void query(QueryRequest r, ActionListener l) { + public void query(QueryRequest r, ActionListener l) { int ordinal = r.searchSource().size(); if (ordinal != Integer.MAX_VALUE) { r.searchSource().size(Integer.MAX_VALUE); } Map> evs = ordinal != Integer.MAX_VALUE ? events.get(ordinal) : emptyMap(); - l.onResponse(new TestPayload(evs)); + + EventsAsHits eah = new EventsAsHits(evs); + SearchHits searchHits = new SearchHits(eah.hits.toArray(new SearchHit[0]), new TotalHits(eah.hits.size(), Relation.EQUAL_TO), + 0.0f); + SearchResponseSections internal = new SearchResponseSections(searchHits, null, null, false, false, null, 0); + SearchResponse s = new SearchResponse(internal, null, 0, 1, 0, 0, null, Clusters.EMPTY); + l.onResponse(s); } @Override - public void get(Iterable> refs, ActionListener>> listener) { + public void get(Iterable> refs, ActionListener>> listener) { //no-op } } @@ -240,7 +235,7 @@ public class SequenceSpecTests extends ESTestCase { List match = matches.get(i); List returned = new ArrayList<>(); for (int j = 0; j < match.size(); j++) { - int key = ((Number) TimestampExtractor.INSTANCE.extract(s.events().get(j))).intValue(); + int key = Integer.parseInt(s.events().get(j).id()); returned.add(allEvents.get(key)); }