EQL: Replace SearchHit in response with Event (#61428) (#61522)

The building block of the eql response is currently the SearchHit. This
is a problem since it is tied to an actual search, and thus has scoring,
highlighting, shard information and a lot of other things that are not
relevant for EQL.
This becomes a problem when doing sequence queries since the response is
not generated from one search query and thus there are no SearchHits to
speak of.
Emulating one is not just conceptually incorrect but also problematic
since most of the data is missed or made-up.

As such this PR introduces a simple class, Event, that maps nicely to
the terminology while hiding the ES internals (the use of SearchHit or
GetResult/GetResponse depending on the API used).

Fix #59764
Fix #59779

Co-authored-by: Igor Motov <igor@motovs.org>
(cherry picked from commit 997376fbe6ef2894038968842f5e0635731ede65)
This commit is contained in:
Costin Leau 2020-08-25 17:32:42 +03:00 committed by GitHub
parent f22ddf822e
commit bff3c7470e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 598 additions and 364 deletions

View File

@ -22,17 +22,22 @@ package org.elasticsearch.client.eql;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.InstantiatingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@ -133,6 +138,95 @@ public class EqlSearchResponse {
public int hashCode() {
return Objects.hash(hits, tookInMillis, isTimeout);
}
// Event
public static class Event {
private static final class Fields {
static final String INDEX = GetResult._INDEX;
static final String ID = GetResult._ID;
static final String SOURCE = SourceFieldMapper.NAME;
}
private static final ParseField INDEX = new ParseField(Fields.INDEX);
private static final ParseField ID = new ParseField(Fields.ID);
private static final ParseField SOURCE = new ParseField(Fields.SOURCE);
private static final ConstructingObjectParser<Event, Void> PARSER =
new ConstructingObjectParser<>("eql/search_response_event", true,
args -> new Event((String) args[0], (String) args[1], (BytesReference) args[2]));
static {
PARSER.declareString(constructorArg(), INDEX);
PARSER.declareString(constructorArg(), ID);
PARSER.declareObject(constructorArg(), (p, c) -> {
try (XContentBuilder builder = XContentBuilder.builder(p.contentType().xContent())) {
builder.copyCurrentStructure(p);
return BytesReference.bytes(builder);
}
}, SOURCE);
}
private final String index;
private final String id;
private final BytesReference source;
private Map<String, Object> sourceAsMap;
public Event(String index, String id, BytesReference source) {
this.index = index;
this.id = id;
this.source = source;
}
public static Event fromXContent(XContentParser parser) throws IOException {
return PARSER.apply(parser, null);
}
public String index() {
return index;
}
public String id() {
return id;
}
public BytesReference source() {
return source;
}
public Map<String, Object> sourceAsMap() {
if (source == null) {
return null;
}
if (sourceAsMap != null) {
return sourceAsMap;
}
sourceAsMap = SourceLookup.sourceAsMap(source);
return sourceAsMap;
}
@Override
public int hashCode() {
return Objects.hash(index, id, source);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
EqlSearchResponse.Event other = (EqlSearchResponse.Event) obj;
return Objects.equals(index, other.index)
&& Objects.equals(id, other.id)
&& Objects.equals(source, other.source);
}
}
// Sequence
public static class Sequence {
@ -149,20 +243,20 @@ public class EqlSearchResponse {
args -> {
int i = 0;
@SuppressWarnings("unchecked") List<Object> joinKeys = (List<Object>) args[i++];
@SuppressWarnings("unchecked") List<SearchHit> events = (List<SearchHit>) args[i];
@SuppressWarnings("unchecked") List<Event> events = (List<Event>) args[i];
return new EqlSearchResponse.Sequence(joinKeys, events);
});
static {
PARSER.declareFieldArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> XContentParserUtils.parseFieldsValue(p),
JOIN_KEYS, ObjectParser.ValueType.VALUE_ARRAY);
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p), EVENTS);
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p), EVENTS);
}
private final List<Object> joinKeys;
private final List<SearchHit> events;
private final List<Event> events;
public Sequence(List<Object> joinKeys, List<SearchHit> events) {
public Sequence(List<Object> joinKeys, List<Event> events) {
this.joinKeys = joinKeys == null ? Collections.emptyList() : joinKeys;
this.events = events == null ? Collections.emptyList() : events;
}
@ -171,6 +265,19 @@ public class EqlSearchResponse {
return PARSER.apply(parser, null);
}
public List<Object> joinKeys() {
return joinKeys;
}
public List<Event> events() {
return events;
}
@Override
public int hashCode() {
return Objects.hash(joinKeys, events);
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -183,19 +290,6 @@ public class EqlSearchResponse {
return Objects.equals(joinKeys, that.joinKeys)
&& Objects.equals(events, that.events);
}
@Override
public int hashCode() {
return Objects.hash(joinKeys, events);
}
public List<Object> joinKeys() {
return joinKeys;
}
public List<SearchHit> events() {
return events;
}
}
// Count
@ -241,6 +335,23 @@ public class EqlSearchResponse {
return PARSER.apply(parser, null);
}
public int count() {
return count;
}
public List<Object> keys() {
return keys;
}
public float percent() {
return percent;
}
@Override
public int hashCode() {
return Objects.hash(count, keys, percent);
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -254,30 +365,13 @@ public class EqlSearchResponse {
&& Objects.equals(keys, that.keys)
&& Objects.equals(percent, that.percent);
}
@Override
public int hashCode() {
return Objects.hash(count, keys, percent);
}
public int count() {
return count;
}
public List<Object> keys() {
return keys;
}
public float percent() {
return percent;
}
}
// Hits
public static class Hits {
public static final Hits EMPTY = new Hits(null, null, null, null);
private final List<SearchHit> events;
private final List<Event> events;
private final List<Sequence> sequences;
private final List<Count> counts;
private final TotalHits totalHits;
@ -289,7 +383,7 @@ public class EqlSearchResponse {
static final String COUNTS = "counts";
}
public Hits(@Nullable List<SearchHit> events, @Nullable List<Sequence> sequences, @Nullable List<Count> counts,
public Hits(@Nullable List<Event> events, @Nullable List<Sequence> sequences, @Nullable List<Count> counts,
@Nullable TotalHits totalHits) {
this.events = events;
this.sequences = sequences;
@ -301,15 +395,15 @@ public class EqlSearchResponse {
new ConstructingObjectParser<>("eql/search_response_count", true,
args -> {
int i = 0;
@SuppressWarnings("unchecked") List<SearchHit> searchHits = (List<SearchHit>) args[i++];
@SuppressWarnings("unchecked") List<Event> events = (List<Event>) args[i++];
@SuppressWarnings("unchecked") List<Sequence> sequences = (List<Sequence>) args[i++];
@SuppressWarnings("unchecked") List<Count> counts = (List<Count>) args[i++];
TotalHits totalHits = (TotalHits) args[i];
return new EqlSearchResponse.Hits(searchHits, sequences, counts, totalHits);
return new EqlSearchResponse.Hits(events, sequences, counts, totalHits);
});
static {
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p),
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p),
new ParseField(Fields.EVENTS));
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), Sequence.PARSER,
new ParseField(Fields.SEQUENCES));
@ -323,6 +417,27 @@ public class EqlSearchResponse {
return PARSER.parse(parser, null);
}
public List<Event> events() {
return this.events;
}
public List<Sequence> sequences() {
return this.sequences;
}
public List<Count> counts() {
return this.counts;
}
public TotalHits totalHits() {
return this.totalHits;
}
@Override
public int hashCode() {
return Objects.hash(events, sequences, counts, totalHits);
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -337,26 +452,5 @@ public class EqlSearchResponse {
&& Objects.equals(counts, that.counts)
&& Objects.equals(totalHits, that.totalHits);
}
@Override
public int hashCode() {
return Objects.hash(events, sequences, counts, totalHits);
}
public List<SearchHit> events() {
return this.events;
}
public List<Sequence> sequences() {
return this.sequences;
}
public List<Count> counts() {
return this.counts;
}
public TotalHits totalHits() {
return this.totalHits;
}
}
}

View File

@ -29,13 +29,13 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.eql.EqlSearchRequest;
import org.elasticsearch.client.eql.EqlSearchResponse;
import org.elasticsearch.client.eql.EqlSearchResponse.Event;
import org.elasticsearch.client.eql.EqlStatsRequest;
import org.elasticsearch.client.eql.EqlStatsResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.junit.Before;
import java.io.IOException;
@ -121,8 +121,8 @@ public class EqlIT extends ESRestHighLevelClientTestCase {
assertResponse(response, RECORD_COUNT / DIVIDER);
// test the content of the hits
for (SearchHit hit : response.hits().events()) {
final Map<String, Object> source = hit.getSourceAsMap();
for (Event hit : response.hits().events()) {
final Map<String, Object> source = hit.sourceAsMap();
final Map<String, Object> event = (Map<String, Object>) source.get("event");
assertThat(event.get("category"), equalTo("process"));
@ -147,8 +147,8 @@ public class EqlIT extends ESRestHighLevelClientTestCase {
assertResponse(response, 3);
// test the content of the hits
for (SearchHit hit : response.hits().events()) {
final Map<String, Object> source = hit.getSourceAsMap();
for (Event hit : response.hits().events()) {
final Map<String, Object> source = hit.sourceAsMap();
final Map<String, Object> event = (Map<String, Object>) source.get("event");
assertThat(event.get("category"), equalTo("process"));

View File

@ -21,15 +21,19 @@ package org.elasticsearch.client.eql;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo;
@ -38,13 +42,59 @@ import static org.hamcrest.Matchers.is;
public class EqlSearchResponseTests extends AbstractResponseTestCase<org.elasticsearch.xpack.eql.action.EqlSearchResponse,
EqlSearchResponse> {
static List<SearchHit> randomEvents() {
private static class RandomSource implements ToXContentObject {
private final String key;
private final String value;
RandomSource(Supplier<String> randomStringSupplier) {
this.key = randomStringSupplier.get();
this.value = randomStringSupplier.get();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(key, value);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(key, value);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
RandomSource other = (RandomSource) obj;
return Objects.equals(key, other.key) && Objects.equals(value, other.value);
}
public BytesReference toBytes(XContentType type) {
try (XContentBuilder builder = XContentBuilder.builder(type.xContent())) {
toXContent(builder, ToXContent.EMPTY_PARAMS);
return BytesReference.bytes(builder);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
static List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event> randomEvents(XContentType xType) {
int size = randomIntBetween(1, 10);
List<SearchHit> hits = null;
List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event> hits = null;
if (randomBoolean()) {
hits = new ArrayList<>();
for (int i = 0; i < size; i++) {
hits.add(new SearchHit(i, randomAlphaOfLength(10), null, new HashMap<>(), new HashMap<>()));
BytesReference bytes = new RandomSource(() -> randomAlphaOfLength(10)).toBytes(xType);
hits.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event(String.valueOf(i), randomAlphaOfLength(10), bytes));
}
}
if (randomBoolean()) {
@ -53,10 +103,10 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase<org.elastic
return hits;
}
public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomEventsResponse(TotalHits totalHits) {
public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomEventsResponse(TotalHits totalHits, XContentType xType) {
org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits hits = null;
if (randomBoolean()) {
hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(randomEvents(), null, null, totalHits);
hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(randomEvents(xType), null, null, totalHits);
}
if (randomBoolean()) {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
@ -66,7 +116,8 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase<org.elastic
}
}
public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomSequencesResponse(TotalHits totalHits) {
public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomSequencesResponse(TotalHits totalHits,
XContentType xType) {
int size = randomIntBetween(1, 10);
List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> seq = null;
if (randomBoolean()) {
@ -77,7 +128,7 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase<org.elastic
if (randomBoolean()) {
joins = Arrays.asList(randomFrom(randoms).get());
}
seq.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(joins, randomEvents()));
seq.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(joins, randomEvents(xType)));
}
}
org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits hits = null;
@ -128,13 +179,13 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase<org.elastic
}
}
public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomInstance(TotalHits totalHits) {
public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomInstance(TotalHits totalHits, XContentType xType) {
int type = between(0, 2);
switch (type) {
case 0:
return createRandomEventsResponse(totalHits);
return createRandomEventsResponse(totalHits, xType);
case 1:
return createRandomSequencesResponse(totalHits);
return createRandomSequencesResponse(totalHits, xType);
case 2:
return createRandomCountResponse(totalHits);
default:
@ -148,7 +199,7 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase<org.elastic
if (randomBoolean()) {
totalHits = new TotalHits(randomIntBetween(100, 1000), TotalHits.Relation.EQUAL_TO);
}
return createRandomInstance(totalHits);
return createRandomInstance(totalHits, xContentType);
}
@Override
@ -175,11 +226,8 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase<org.elastic
if (serverTestInstance.hits().events() == null) {
assertNull(clientInstance.hits().events());
} else {
assertThat(serverTestInstance.hits().events().size(), equalTo(clientInstance.hits().events().size()));
for (int i = 0; i < serverTestInstance.hits().events().size(); i++) {
assertThat(serverTestInstance.hits().events().get(i), is(clientInstance.hits().events().get(i)));
assertEvents(serverTestInstance.hits().events(), clientInstance.hits().events());
}
}
if (serverTestInstance.hits().sequences() == null) {
assertNull(clientInstance.hits().sequences());
} else {
@ -187,8 +235,19 @@ public class EqlSearchResponseTests extends AbstractResponseTestCase<org.elastic
for (int i = 0; i < serverTestInstance.hits().sequences().size(); i++) {
assertThat(serverTestInstance.hits().sequences().get(i).joinKeys(),
is(clientInstance.hits().sequences().get(i).joinKeys()));
assertThat(serverTestInstance.hits().sequences().get(i).events(), is(clientInstance.hits().sequences().get(i).events()));
assertEvents(serverTestInstance.hits().sequences().get(i).events(), clientInstance.hits().sequences().get(i).events());
}
}
}
private void assertEvents(
List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event> serverEvents,
List<EqlSearchResponse.Event> clientEvents
) {
assertThat(serverEvents.size(), equalTo(clientEvents.size()));
for (int j = 0; j < serverEvents.size(); j++) {
assertThat(
SourceLookup.sourceAsMap(serverEvents.get(j).source()), is(clientEvents.get(j).sourceAsMap()));
}
}
}

View File

@ -166,9 +166,7 @@ The response also includes other valuable information about how the
"events": [
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "gl5MJXMBMk1dGnErnBW8",
"_score": null,
"_source": {
"process": {
"parent": {
@ -246,9 +244,7 @@ The query matches an event, confirming `scrobj.dll` was later loaded by
"events": [
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "ol5MJXMBMk1dGnErnBW8",
"_score": null,
"_source": {
"process": {
"name": "regsvr32.exe",
@ -333,12 +329,7 @@ The query matches a sequence, indicating the attack likely succeeded.
"events": [
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "gl5MJXMBMk1dGnErnBW8",
"_version": 1,
"_seq_no": 3,
"_primary_term": 1,
"_score": null,
"_source": {
"process": {
"parent": {
@ -368,12 +359,7 @@ The query matches a sequence, indicating the attack likely succeeded.
},
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "ol5MJXMBMk1dGnErnBW8",
"_version": 1,
"_seq_no": 5,
"_primary_term": 1,
"_score": null,
"_source": {
"process": {
"name": "regsvr32.exe",
@ -393,12 +379,7 @@ The query matches a sequence, indicating the attack likely succeeded.
},
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "EF5MJXMBMk1dGnErnBa9",
"_version": 1,
"_seq_no": 24,
"_primary_term": 1,
"_score": null,
"_source": {
"process": {
"name": "regsvr32.exe",
@ -437,8 +418,5 @@ The query matches a sequence, indicating the attack likely succeeded.
----
// TESTRESPONSE[s/"took": 25/"took": $body.took/]
// TESTRESPONSE[s/"_id": "gl5MJXMBMk1dGnErnBW8"/"_id": $body.hits.sequences.0.events.0._id/]
// TESTRESPONSE[s/"_seq_no": 3/"_seq_no": $body.hits.sequences.0.events.0._seq_no/]
// TESTRESPONSE[s/"_id": "ol5MJXMBMk1dGnErnBW8"/"_id": $body.hits.sequences.0.events.1._id/]
// TESTRESPONSE[s/"_seq_no": 5/"_seq_no": $body.hits.sequences.0.events.1._seq_no/]
// TESTRESPONSE[s/"_id": "EF5MJXMBMk1dGnErnBa9"/"_id": $body.hits.sequences.0.events.2._id/]
// TESTRESPONSE[s/"_seq_no": 24/"_seq_no": $body.hits.sequences.0.events.2._seq_no/]

View File

@ -442,11 +442,6 @@ doesnt overwrite a newer version. See <<optimistic-concurrency-control>>.
(integer)
Primary term assigned to the document. See <<optimistic-concurrency-control>>.
`_score`::
(float)
Positive 32-bit floating point number used to determine the relevance of the
event. See <<relevance-scores>>.
`_source`::
(object)
Original JSON body passed for the event at index time.
@ -472,11 +467,6 @@ Name of the index containing the event.
Unique identifier for the event.
This ID is only unique within the index.
`_score`::
(float)
Positive 32-bit floating point number used to determine the relevance of the
event. See <<relevance-scores>>.
`_source`::
(object)
Original JSON body passed for the event at index time.
@ -531,9 +521,7 @@ the events in ascending, lexicographic order.
"events": [
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "babI3XMBI9IjHuIqU0S_",
"_score": null,
"_source": {
"@timestamp": "2099-12-06T11:04:05.000Z",
"event": {
@ -550,9 +538,7 @@ the events in ascending, lexicographic order.
},
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "b6bI3XMBI9IjHuIqU0S_",
"_score": null,
"_source": {
"@timestamp": "2099-12-07T11:06:07.000Z",
"event": {
@ -634,12 +620,7 @@ shared `process.pid` value for each matching event.
"events": [
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "AtOJ4UjUBAAx3XR5kcCM",
"_version": 1,
"_seq_no": 1,
"_primary_term": 1,
"_score": null,
"_source": {
"@timestamp": "2099-12-06T11:04:07.000Z",
"event": {
@ -663,12 +644,7 @@ shared `process.pid` value for each matching event.
},
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "OQmfCaduce8zoHT93o4H",
"_version": 1,
"_seq_no": 3,
"_primary_term": 1,
"_score": null,
"_source": {
"@timestamp": "2099-12-07T11:07:09.000Z",
"event": {

View File

@ -88,9 +88,7 @@ ascending order.
"events": [
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "OQmfCaduce8zoHT93o4H",
"_score": null,
"_source": {
"@timestamp": "2099-12-07T11:07:09.000Z",
"event": {
@ -108,9 +106,7 @@ ascending order.
},
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "xLkCaj4EujzdNSxfYLbO",
"_score": null,
"_source": {
"@timestamp": "2099-12-07T11:07:10.000Z",
"event": {
@ -189,12 +185,7 @@ The API returns the following response. Matching sequences are included in the
"events": [
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "OQmfCaduce8zoHT93o4H",
"_version": 1,
"_seq_no": 3,
"_primary_term": 1,
"_score": null,
"_source": {
"@timestamp": "2099-12-07T11:07:09.000Z",
"event": {
@ -212,12 +203,7 @@ The API returns the following response. Matching sequences are included in the
},
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "yDwnGIJouOYGBzP0ZE9n",
"_version": 1,
"_seq_no": 4,
"_primary_term": 1,
"_score": null,
"_source": {
"@timestamp": "2099-12-07T11:07:10.000Z",
"event": {
@ -325,12 +311,7 @@ contains the shared `process.pid` value for each matching event.
"events": [
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "OQmfCaduce8zoHT93o4H",
"_version": 1,
"_seq_no": 3,
"_primary_term": 1,
"_score": null,
"_source": {
"@timestamp": "2099-12-07T11:07:09.000Z",
"event": {
@ -348,12 +329,7 @@ contains the shared `process.pid` value for each matching event.
},
{
"_index": "my-index-000001",
"_type": "_doc",
"_id": "yDwnGIJouOYGBzP0ZE9n",
"_version": 1,
"_seq_no": 4,
"_primary_term": 1,
"_score": null,
"_source": {
"@timestamp": "2099-12-07T11:07:10.000Z",
"event": {

View File

@ -15,11 +15,11 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.eql.EqlSearchRequest;
import org.elasticsearch.client.eql.EqlSearchResponse;
import org.elasticsearch.client.eql.EqlSearchResponse.Event;
import org.elasticsearch.client.eql.EqlSearchResponse.Hits;
import org.elasticsearch.client.eql.EqlSearchResponse.Sequence;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.After;
import org.junit.Before;
@ -133,7 +133,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
protected void assertResponse(EqlSearchResponse response) {
Hits hits = response.hits();
if (hits.events() != null) {
assertSearchHits(hits.events());
assertEvents(hits.events());
}
else if (hits.sequences() != null) {
assertSequences(hits.sequences());
@ -157,7 +157,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
return highLevelClient().eql();
}
protected void assertSearchHits(List<SearchHit> events) {
protected void assertEvents(List<Event> events) {
assertNotNull(events);
long[] expected = eventIds;
long[] actual = extractIds(events);
@ -166,20 +166,20 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
expected, actual);
}
private static long[] extractIds(List<SearchHit> events) {
private static long[] extractIds(List<Event> events) {
final int len = events.size();
final long ids[] = new long[len];
for (int i = 0; i < len; i++) {
ids[i] = ((Number) events.get(i).getSourceAsMap().get("serial_event_id")).longValue();
ids[i] = ((Number) events.get(i).sourceAsMap().get("serial_event_id")).longValue();
}
return ids;
}
protected void assertSequences(List<Sequence> sequences) {
List<SearchHit> events = sequences.stream()
List<Event> events = sequences.stream()
.flatMap(s -> s.events().stream())
.collect(toList());
assertSearchHits(events);
assertEvents(events);
}
private RestHighLevelClient highLevelClient() {

View File

@ -7,18 +7,12 @@
package org.elasticsearch.xpack.eql;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.Build;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.junit.BeforeClass;
public class EqlRestIT extends ESClientYamlSuiteTestCase {
@BeforeClass
public static void checkForSnapshot() {
assumeTrue("Only works on snapshot builds for now", Build.CURRENT.isSnapshot());
}
public EqlRestIT(final ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -20,9 +21,11 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.search.SearchHits;
import java.io.IOException;
@ -179,6 +182,110 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
return Strings.toString(this);
}
// Event
public static class Event implements Writeable, ToXContentObject {
private static final class Fields {
static final String INDEX = GetResult._INDEX;
static final String ID = GetResult._ID;
static final String SOURCE = SourceFieldMapper.NAME;
}
private static final ParseField INDEX = new ParseField(Fields.INDEX);
private static final ParseField ID = new ParseField(Fields.ID);
private static final ParseField SOURCE = new ParseField(Fields.SOURCE);
private static final ConstructingObjectParser<Event, Void> PARSER =
new ConstructingObjectParser<>("eql/search_response_event", true,
args -> new Event((String) args[0], (String) args[1], (BytesReference) args[2]));
static {
PARSER.declareString(constructorArg(), INDEX);
PARSER.declareString(constructorArg(), ID);
PARSER.declareObject(constructorArg(), (p, c) -> {
try (XContentBuilder builder = XContentBuilder.builder(p.contentType().xContent())) {
builder.copyCurrentStructure(p);
return BytesReference.bytes(builder);
}
}, SOURCE);
}
private final String index;
private final String id;
private final BytesReference source;
public Event(String index, String id, BytesReference source) {
this.index = index;
this.id = id;
this.source = source;
}
public Event(StreamInput in) throws IOException {
index = in.readString();
id = in.readString();
source = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeString(id);
out.writeBytesReference(source);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Fields.INDEX, index);
builder.field(Fields.ID, id);
// We have to use the deprecated version since we don't know the content type of the original source
XContentHelper.writeRawField(Fields.SOURCE, source, builder, params);
builder.endObject();
return builder;
}
public static Event fromXContent(XContentParser parser) throws IOException {
return PARSER.apply(parser, null);
}
public String index() {
return index;
}
public String id() {
return id;
}
public BytesReference source() {
return source;
}
@Override
public int hashCode() {
return Objects.hash(index, id, source);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
EqlSearchResponse.Event other = (EqlSearchResponse.Event) obj;
return Objects.equals(index, other.index)
&& Objects.equals(id, other.id)
&& Objects.equals(source, other.source);
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
// Sequence
public static class Sequence implements Writeable, ToXContentObject {
@ -195,20 +302,20 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
args -> {
int i = 0;
@SuppressWarnings("unchecked") List<Object> joinKeys = (List<Object>) args[i++];
@SuppressWarnings("unchecked") List<SearchHit> events = (List<SearchHit>) args[i];
@SuppressWarnings("unchecked") List<Event> events = (List<Event>) args[i];
return new EqlSearchResponse.Sequence(joinKeys, events);
});
static {
PARSER.declareFieldArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> XContentParserUtils.parseFieldsValue(p),
JOIN_KEYS, ObjectParser.ValueType.VALUE_ARRAY);
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p), EVENTS);
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p), EVENTS);
}
private final List<Object> joinKeys;
private final List<SearchHit> events;
private final List<Event> events;
public Sequence(List<Object> joinKeys, List<SearchHit> events) {
public Sequence(List<Object> joinKeys, List<Event> events) {
this.joinKeys = joinKeys == null ? Collections.emptyList() : joinKeys;
this.events = events == null ? Collections.emptyList() : events;
}
@ -216,7 +323,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
@SuppressWarnings("unchecked")
public Sequence(StreamInput in) throws IOException {
this.joinKeys = (List<Object>) in.readGenericValue();
this.events = in.readList(SearchHit::new);
this.events = in.readList(Event::new);
}
public static Sequence fromXContent(XContentParser parser) {
@ -236,8 +343,8 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
builder.field(Fields.JOIN_KEYS, joinKeys);
}
if (events.isEmpty() == false) {
builder.startArray(EVENTS.getPreferredName());
for (SearchHit event : events) {
builder.startArray(Fields.EVENTS);
for (Event event : events) {
event.toXContent(builder, params);
}
builder.endArray();
@ -268,7 +375,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
return joinKeys;
}
public List<SearchHit> events() {
public List<Event> events() {
return events;
}
}
@ -372,11 +479,10 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
}
}
// Hits
public static class Hits implements Writeable, ToXContentFragment {
public static final Hits EMPTY = new Hits(null, null, null, null);
private final List<SearchHit> events;
private final List<Event> events;
private final List<Sequence> sequences;
private final List<Count> counts;
private final TotalHits totalHits;
@ -389,7 +495,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
static final String COUNTS = "counts";
}
public Hits(@Nullable List<SearchHit> events, @Nullable List<Sequence> sequences, @Nullable List<Count> counts,
public Hits(@Nullable List<Event> events, @Nullable List<Sequence> sequences, @Nullable List<Count> counts,
@Nullable TotalHits totalHits) {
this.events = events;
this.sequences = sequences;
@ -404,7 +510,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
} else {
totalHits = null;
}
events = in.readBoolean() ? in.readList(SearchHit::new) : null;
events = in.readBoolean() ? in.readList(Event::new) : null;
sequences = in.readBoolean() ? in.readList(Sequence::new) : null;
counts = in.readBoolean() ? in.readList(Count::new) : null;
}
@ -440,15 +546,15 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
new ConstructingObjectParser<>("eql/search_response_count", true,
args -> {
int i = 0;
@SuppressWarnings("unchecked") List<SearchHit> searchHits = (List<SearchHit>) args[i++];
@SuppressWarnings("unchecked") List<Event> events = (List<Event>) args[i++];
@SuppressWarnings("unchecked") List<Sequence> sequences = (List<Sequence>) args[i++];
@SuppressWarnings("unchecked") List<Count> counts = (List<Count>) args[i++];
TotalHits totalHits = (TotalHits) args[i];
return new EqlSearchResponse.Hits(searchHits, sequences, counts, totalHits);
return new EqlSearchResponse.Hits(events, sequences, counts, totalHits);
});
static {
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> SearchHit.fromXContent(p),
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Event.fromXContent(p),
new ParseField(Fields.EVENTS));
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), Sequence.PARSER,
new ParseField(Fields.SEQUENCES));
@ -473,7 +579,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
}
if (events != null) {
builder.startArray(Fields.EVENTS);
for (SearchHit event : events) {
for (Event event : events) {
event.toXContent(builder, params);
}
builder.endArray();
@ -509,7 +615,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
return Objects.hash(events, sequences, counts, totalHits);
}
public List<SearchHit> events() {
public List<Event> events() {
return this.events;
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.payload;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event;
import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;
import java.util.ArrayList;
import java.util.List;
public class EventPayload extends AbstractPayload {
private final List<Event> values;
public EventPayload(SearchResponse response) {
super(response.isTimedOut(), response.getTook());
List<SearchHit> hits = RuntimeUtils.searchHits(response);
values = new ArrayList<>(hits.size());
for (SearchHit hit : hits) {
values.add(new Event(hit.getIndex(), hit.getId(), hit.getSourceRef()));
}
}
@Override
public Type resultType() {
return Type.EVENT;
}
@Override
public List<Event> values() {
return values;
}
}

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.eql.execution.payload;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.session.Results.Type;
import java.util.Collections;
import java.util.List;
@ -38,7 +37,7 @@ public class ReversePayload implements Payload {
}
@Override
public <V> List<V> values() {
public List<?> values() {
return delegate.values();
}
}

View File

@ -1,42 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.payload;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchSortValues;
import org.elasticsearch.xpack.eql.session.Results.Type;
import java.util.Arrays;
import java.util.List;
public class SearchResponsePayload extends AbstractPayload {
private final List<SearchHit> hits;
public SearchResponsePayload(SearchResponse response) {
super(response.isTimedOut(), response.getTook());
hits = Arrays.asList(response.getHits().getHits());
// clean hits
SearchSortValues sortValues = new SearchSortValues(new Object[0], new DocValueFormat[0]);
for (SearchHit hit : hits) {
hit.sortValues(sortValues);
}
}
@Override
public Type resultType() {
return Type.SEARCH_HIT;
}
@SuppressWarnings("unchecked")
@Override
public <V> List<V> values() {
return (List<V>) hits;
}
}

View File

@ -1,22 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.payload;
import org.elasticsearch.search.SearchHit;
public class Sequence {
private final Iterable<SearchHit> events;
public Sequence(Iterable<SearchHit> event) {
this.events = event;
}
public Iterable<SearchHit> event() {
return events;
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.xpack.eql.execution.payload.EventPayload;
import org.elasticsearch.xpack.eql.session.Payload;
public class AsEventListener implements ActionListener<SearchResponse> {
private final ActionListener<Payload> listener;
public AsEventListener(ActionListener<Payload> listener) {
this.listener = listener;
}
@Override
public void onResponse(SearchResponse response) {
listener.onResponse(new EventPayload(response));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}

View File

@ -12,8 +12,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.payload.SearchResponsePayload;
import org.elasticsearch.xpack.eql.session.Payload;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.logSearchResponse;
@ -21,9 +19,9 @@ public class BasicListener implements ActionListener<SearchResponse> {
private static final Logger log = RuntimeUtils.QUERY_LOG;
private final ActionListener<Payload> listener;
private final ActionListener<SearchResponse> listener;
public BasicListener(ActionListener<Payload> listener) {
public BasicListener(ActionListener<SearchResponse> listener) {
this.listener = listener;
}
@ -37,7 +35,7 @@ public class BasicListener implements ActionListener<SearchResponse> {
if (log.isTraceEnabled()) {
logSearchResponse(response, log);
}
listener.onResponse(new SearchResponsePayload(response));
listener.onResponse(response);
}
} catch (Exception ex) {
onFailure(ex);

View File

@ -13,17 +13,13 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest.Item;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.ql.util.StringUtils;
import java.util.ArrayList;
@ -47,7 +43,7 @@ public class BasicQueryClient implements QueryClient {
}
@Override
public void query(QueryRequest request, ActionListener<Payload> listener) {
public void query(QueryRequest request, ActionListener<SearchResponse> listener) {
SearchSourceBuilder searchSource = request.searchSource();
// set query timeout
searchSource.timeout(cfg.requestTimeout());
@ -64,7 +60,7 @@ public class BasicQueryClient implements QueryClient {
}
@Override
public void get(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener) {
public void get(Iterable<List<HitReference>> refs, ActionListener<List<List<GetResponse>>> listener) {
MultiGetRequestBuilder requestBuilder = client.prepareMultiGet();
// no need for real-time
requestBuilder.setRealtime(false)
@ -84,29 +80,18 @@ public class BasicQueryClient implements QueryClient {
final int listSize = sz;
client.multiGet(requestBuilder.request(), wrap(r -> {
List<List<SearchHit>> hits = new ArrayList<>(r.getResponses().length / listSize);
List<List<GetResponse>> hits = new ArrayList<>(r.getResponses().length / listSize);
List<SearchHit> sequence = new ArrayList<>(listSize);
List<GetResponse> sequence = new ArrayList<>(listSize);
int counter = 0;
Text type = new Text("_doc");
for (MultiGetItemResponse mgr : r.getResponses()) {
if (mgr.isFailed()) {
listener.onFailure(mgr.getFailure().getFailure());
return;
}
GetResponse response = mgr.getResponse();
SearchHit hit = new SearchHit(-1, response.getId(), type, null, null);
hit.sourceRef(response.getSourceInternal());
// need to create these objects to set the index
hit.shard(new SearchShardTarget(null, new ShardId(response.getIndex(), "", -1), null, null));
hit.setSeqNo(response.getSeqNo());
hit.setPrimaryTerm(response.getPrimaryTerm());
hit.version(response.getVersion());
sequence.add(hit);
sequence.add(mgr.getResponse());
if (++counter == listSize) {
counter = 0;

View File

@ -7,8 +7,8 @@
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import java.util.List;
@ -17,7 +17,7 @@ import java.util.List;
*/
public interface QueryClient {
void query(QueryRequest request, ActionListener<Payload> listener);
void query(QueryRequest request, ActionListener<SearchResponse> listener);
void get(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener);
void get(Iterable<List<HitReference>> refs, ActionListener<List<List<GetResponse>>> listener);
}

View File

@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
@ -27,6 +28,7 @@ import org.elasticsearch.xpack.ql.expression.gen.pipeline.ReferenceInput;
import org.elasticsearch.xpack.ql.index.IndexResolver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
@ -105,4 +107,8 @@ public final class RuntimeUtils {
includeFrozen ? IndexResolver.FIELD_CAPS_FROZEN_INDICES_OPTIONS : IndexResolver.FIELD_CAPS_INDICES_OPTIONS)
.request();
}
public static List<SearchHit> searchHits(SearchResponse response) {
return Arrays.asList(response.getHits().getHits());
}
}

View File

@ -6,10 +6,10 @@
package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event;
import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload;
import org.elasticsearch.xpack.eql.session.Results.Type;
import java.util.ArrayList;
import java.util.List;
@ -18,14 +18,18 @@ class SequencePayload extends AbstractPayload {
private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> values;
SequencePayload(List<Sequence> sequences, List<List<SearchHit>> searchHits, boolean timedOut, TimeValue timeTook) {
SequencePayload(List<Sequence> sequences, List<List<GetResponse>> docs, boolean timedOut, TimeValue timeTook) {
super(timedOut, timeTook);
values = new ArrayList<>(sequences.size());
for (int i = 0; i < sequences.size(); i++) {
Sequence s = sequences.get(i);
List<SearchHit> hits = searchHits.get(i);
values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asList(), hits));
List<GetResponse> hits = docs.get(i);
List<Event> events = new ArrayList<>(hits.size());
for (GetResponse hit : hits) {
events.add(new Event(hit.getIndex(), hit.getId(), hit.getSourceAsBytesRef()));
}
values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asList(), events));
}
}
@ -34,9 +38,8 @@ class SequencePayload extends AbstractPayload {
return Type.SEQUENCE;
}
@SuppressWarnings("unchecked")
@Override
public <V> List<V> values() {
return (List<V>) values;
public List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> values() {
return values;
}
}

View File

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.eql.execution.sequence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
@ -20,13 +21,14 @@ import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.eql.execution.search.QueryClient;
import org.elasticsearch.xpack.eql.session.EmptyPayload;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.session.Results.Type;
import org.elasticsearch.xpack.eql.session.Payload.Type;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import java.util.Iterator;
import java.util.List;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.searchHits;
/**
* Time-based window encapsulating query creation and advancement.
@ -95,9 +97,9 @@ public class TumblingWindow implements Executable {
client.query(base.queryRequest(), wrap(p -> baseCriterion(baseStage, p, listener), listener::onFailure));
}
private void baseCriterion(int baseStage, Payload p, ActionListener<Payload> listener) {
private void baseCriterion(int baseStage, SearchResponse r, ActionListener<Payload> listener) {
Criterion<BoxedQueryRequest> base = criteria.get(baseStage);
List<SearchHit> hits = p.values();
List<SearchHit> hits = searchHits(r);
log.trace("Found [{}] hits", hits.size());
@ -167,8 +169,8 @@ public class TumblingWindow implements Executable {
log.trace("Querying until stage {}", request);
client.query(request, wrap(p -> {
List<SearchHit> hits = p.values();
client.query(request, wrap(r -> {
List<SearchHit> hits = searchHits(r);
log.trace("Found [{}] hits", hits.size());
// no more results for until - let the other queries run
@ -208,8 +210,8 @@ public class TumblingWindow implements Executable {
log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request);
client.query(request, wrap(p -> {
List<SearchHit> hits = p.values();
client.query(request, wrap(r -> {
List<SearchHit> hits = searchHits(r);
log.trace("Found [{}] hits", hits.size());
@ -298,8 +300,8 @@ public class TumblingWindow implements Executable {
return;
}
client.get(hits(completed), wrap(searchHits -> {
listener.onResponse(new SequencePayload(completed, searchHits, false, timeTook()));
client.get(hits(completed), wrap(hits -> {
listener.onResponse(new SequencePayload(completed, hits, false, timeTook()));
matcher.clear();
}, listener::onFailure));
}

View File

@ -11,7 +11,7 @@ import org.elasticsearch.xpack.eql.plan.logical.Join;
import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset;
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Payload.Type;
import org.elasticsearch.xpack.eql.util.MathUtils;
import org.elasticsearch.xpack.eql.util.StringUtils;
import org.elasticsearch.xpack.ql.expression.Expression;
@ -169,7 +169,7 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
@Override
protected LogicalPlan rule(UnaryPlan plan) {
if ((plan instanceof KeyedFilter) == false && plan.child() instanceof LocalRelation) {
return new LocalRelation(plan.source(), plan.output(), Results.Type.SEARCH_HIT);
return new LocalRelation(plan.source(), plan.output(), Type.EVENT);
}
return plan;
}
@ -335,7 +335,7 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
// check for empty filters
for (KeyedFilter filter : plan.queries()) {
if (filter.anyMatch(LocalRelation.class::isInstance)) {
return new LocalRelation(plan.source(), plan.output(), Results.Type.SEQUENCE);
return new LocalRelation(plan.source(), plan.output(), Type.SEQUENCE);
}
}
return plan;

View File

@ -10,6 +10,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.eql.execution.search.AsEventListener;
import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.search.ReverseListener;
@ -61,7 +62,7 @@ public class EsQueryExec extends LeafExec {
// endpoint - fetch all source
QueryRequest request = () -> source(session).fetchSource(FetchSourceContext.FETCH_SOURCE);
listener = shouldReverse(request) ? new ReverseListener(listener) : listener;
new BasicQueryClient(session).query(request, listener);
new BasicQueryClient(session).query(request, new AsEventListener(listener));
}
private boolean shouldReverse(QueryRequest query) {

View File

@ -10,7 +10,6 @@ import org.elasticsearch.xpack.eql.session.EmptyExecutable;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Executable;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
@ -27,10 +26,10 @@ public class LocalRelation extends LogicalPlan implements Executable {
private final Executable executable;
public LocalRelation(Source source, List<Attribute> output) {
this(source, output, Results.Type.SEARCH_HIT);
this(source, output, Payload.Type.EVENT);
}
public LocalRelation(Source source, List<Attribute> output, Results.Type resultType) {
public LocalRelation(Source source, List<Attribute> output, Payload.Type resultType) {
this(source, new EmptyExecutable(output, resultType));
}

View File

@ -125,7 +125,7 @@ public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRe
}
static EqlSearchResponse createResponse(Results results, AsyncExecutionId id) {
EqlSearchResponse.Hits hits = new EqlSearchResponse.Hits(results.searchHits(), results.sequences(), results.counts(), results
EqlSearchResponse.Hits hits = new EqlSearchResponse.Hits(results.events(), results.sequences(), results.counts(), results
.totalHits());
if (id != null) {
return new EqlSearchResponse(hits, results.tookTime().getMillis(), results.timedOut(), id.getEncoded(), false, false);

View File

@ -6,7 +6,7 @@
package org.elasticsearch.xpack.eql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.session.Results.Type;
import org.elasticsearch.xpack.eql.session.Payload.Type;
import org.elasticsearch.xpack.ql.expression.Attribute;
import java.util.List;

View File

@ -7,7 +7,6 @@
package org.elasticsearch.xpack.eql.session;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.session.Results.Type;
import java.util.List;
@ -43,7 +42,7 @@ public class EmptyPayload implements Payload {
}
@Override
public <V> List<V> values() {
public List<?> values() {
return emptyList();
}
}

View File

@ -11,16 +11,21 @@ import org.elasticsearch.common.unit.TimeValue;
import java.util.List;
/**
* Container for internal results. Can be low-level such as SearchHits or Sequences.
* Generalized to allow reuse and internal pluggability.
* Container for final results. Used for completed data, such as Events or Sequences.
*/
public interface Payload {
Results.Type resultType();
enum Type {
EVENT,
SEQUENCE,
COUNT;
}
Type resultType();
boolean timedOut();
TimeValue timeTook();
<V> List<V> values();
List<?> values();
}

View File

@ -9,20 +9,15 @@ package org.elasticsearch.xpack.eql.session;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Count;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence;
import org.elasticsearch.xpack.eql.session.Payload.Type;
import java.util.List;
public class Results {
public enum Type {
SEARCH_HIT,
SEQUENCE,
COUNT;
}
private final TotalHits totalHits;
private final List<?> results;
private final boolean timedOut;
@ -47,8 +42,8 @@ public class Results {
}
@SuppressWarnings("unchecked")
public List<SearchHit> searchHits() {
return type == Type.SEARCH_HIT ? (List<SearchHit>) results : null;
public List<Event> events() {
return type == Type.EVENT ? (List<Event>) results : null;
}
@SuppressWarnings("unchecked")

View File

@ -6,26 +6,78 @@
package org.elasticsearch.xpack.eql.action;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Event;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
public class EqlSearchResponseTests extends AbstractSerializingTestCase<EqlSearchResponse> {
static List<SearchHit> randomEvents() {
private static class RandomSource implements ToXContentObject {
private final String key;
private final String value;
RandomSource(Supplier<String> randomStringSupplier) {
this.key = randomStringSupplier.get();
this.value = randomStringSupplier.get();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(key, value);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(key, value);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
RandomSource other = (RandomSource) obj;
return Objects.equals(key, other.key) && Objects.equals(value, other.value);
}
public BytesReference toBytes(XContentType type) {
try (XContentBuilder builder = XContentBuilder.builder(type.xContent())) {
toXContent(builder, ToXContent.EMPTY_PARAMS);
return BytesReference.bytes(builder);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
static List<Event> randomEvents(XContentType xType) {
int size = randomIntBetween(1, 10);
List<SearchHit> hits = null;
List<Event> hits = null;
if (randomBoolean()) {
hits = new ArrayList<>();
for (int i = 0; i < size; i++) {
hits.add(new SearchHit(i, randomAlphaOfLength(10), null, new HashMap<>(), new HashMap<>()));
BytesReference bytes = new RandomSource(() -> randomAlphaOfLength(10)).toBytes(xType);
hits.add(new Event(String.valueOf(i), randomAlphaOfLength(10), bytes));
}
}
if (randomBoolean()) {
@ -34,9 +86,14 @@ public class EqlSearchResponseTests extends AbstractSerializingTestCase<EqlSearc
return null;
}
@Override
protected EqlSearchResponse createXContextTestInstance(XContentType xContentType) {
return randomEqlSearchResponse(xContentType);
}
@Override
protected EqlSearchResponse createTestInstance() {
return randomEqlSearchResponse();
return randomEqlSearchResponse(XContentType.JSON);
}
@Override
@ -44,18 +101,18 @@ public class EqlSearchResponseTests extends AbstractSerializingTestCase<EqlSearc
return EqlSearchResponse::new;
}
public static EqlSearchResponse randomEqlSearchResponse() {
public static EqlSearchResponse randomEqlSearchResponse(XContentType xContentType) {
TotalHits totalHits = null;
if (randomBoolean()) {
totalHits = new TotalHits(randomIntBetween(100, 1000), TotalHits.Relation.EQUAL_TO);
}
return createRandomInstance(totalHits);
return createRandomInstance(totalHits, xContentType);
}
public static EqlSearchResponse createRandomEventsResponse(TotalHits totalHits) {
public static EqlSearchResponse createRandomEventsResponse(TotalHits totalHits, XContentType xType) {
EqlSearchResponse.Hits hits = null;
if (randomBoolean()) {
hits = new EqlSearchResponse.Hits(randomEvents(), null, null, totalHits);
hits = new EqlSearchResponse.Hits(randomEvents(xType), null, null, totalHits);
}
if (randomBoolean()) {
return new EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
@ -65,7 +122,7 @@ public class EqlSearchResponseTests extends AbstractSerializingTestCase<EqlSearc
}
}
public static EqlSearchResponse createRandomSequencesResponse(TotalHits totalHits) {
public static EqlSearchResponse createRandomSequencesResponse(TotalHits totalHits, XContentType xType) {
int size = randomIntBetween(1, 10);
List<EqlSearchResponse.Sequence> seq = null;
if (randomBoolean()) {
@ -76,7 +133,7 @@ public class EqlSearchResponseTests extends AbstractSerializingTestCase<EqlSearc
if (randomBoolean()) {
joins = Arrays.asList(randomFrom(randoms).get());
}
seq.add(new EqlSearchResponse.Sequence(joins, randomEvents()));
seq.add(new EqlSearchResponse.Sequence(joins, randomEvents(xType)));
}
}
EqlSearchResponse.Hits hits = null;
@ -127,13 +184,13 @@ public class EqlSearchResponseTests extends AbstractSerializingTestCase<EqlSearc
}
}
public static EqlSearchResponse createRandomInstance(TotalHits totalHits) {
public static EqlSearchResponse createRandomInstance(TotalHits totalHits, XContentType xType) {
int type = between(0, 2);
switch(type) {
case 0:
return createRandomEventsResponse(totalHits);
return createRandomEventsResponse(totalHits, xType);
case 1:
return createRandomSequencesResponse(totalHits);
return createRandomSequencesResponse(totalHits, xType);
case 2:
return createRandomCountResponse(totalHits);
default:

View File

@ -8,12 +8,19 @@ package org.elasticsearch.xpack.eql.execution.assembler;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence;
@ -25,7 +32,6 @@ import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
import org.elasticsearch.xpack.eql.execution.sequence.TumblingWindow;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Results.Type;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import java.io.IOException;
@ -131,11 +137,11 @@ public class SequenceSpecTests extends ESTestCase {
}
}
static class TestPayload implements Payload {
static class EventsAsHits {
private final List<SearchHit> hits;
private final Map<Integer, Tuple<String, String>> events;
TestPayload(Map<Integer, Tuple<String, String>> events) {
EventsAsHits(Map<Integer, Tuple<String, String>> events) {
this.events = events;
this.hits = new ArrayList<>(events.size());
@ -147,25 +153,8 @@ public class SequenceSpecTests extends ESTestCase {
}
}
@Override
public Type resultType() {
return Type.SEARCH_HIT;
}
@Override
public boolean timedOut() {
return false;
}
@Override
public TimeValue timeTook() {
return TimeValue.ZERO;
}
@SuppressWarnings("unchecked")
@Override
public <V> List<V> values() {
return (List<V>) hits;
public List<SearchHit> hits() {
return hits;
}
@Override
@ -177,17 +166,23 @@ public class SequenceSpecTests extends ESTestCase {
class TestQueryClient implements QueryClient {
@Override
public void query(QueryRequest r, ActionListener<Payload> l) {
public void query(QueryRequest r, ActionListener<SearchResponse> l) {
int ordinal = r.searchSource().size();
if (ordinal != Integer.MAX_VALUE) {
r.searchSource().size(Integer.MAX_VALUE);
}
Map<Integer, Tuple<String, String>> evs = ordinal != Integer.MAX_VALUE ? events.get(ordinal) : emptyMap();
l.onResponse(new TestPayload(evs));
EventsAsHits eah = new EventsAsHits(evs);
SearchHits searchHits = new SearchHits(eah.hits.toArray(new SearchHit[0]), new TotalHits(eah.hits.size(), Relation.EQUAL_TO),
0.0f);
SearchResponseSections internal = new SearchResponseSections(searchHits, null, null, false, false, null, 0);
SearchResponse s = new SearchResponse(internal, null, 0, 1, 0, 0, null, Clusters.EMPTY);
l.onResponse(s);
}
@Override
public void get(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener) {
public void get(Iterable<List<HitReference>> refs, ActionListener<List<List<GetResponse>>> listener) {
//no-op
}
}
@ -240,7 +235,7 @@ public class SequenceSpecTests extends ESTestCase {
List<String> match = matches.get(i);
List<String> returned = new ArrayList<>();
for (int j = 0; j < match.size(); j++) {
int key = ((Number) TimestampExtractor.INSTANCE.extract(s.events().get(j))).intValue();
int key = Integer.parseInt(s.events().get(j).id());
returned.add(allEvents.get(key));
}