EQL: Improve retrieval of results (#59552)

Instead of retrieving an entire SearchHit, get just a reference and
postpone the document retrieval when assembling the final results.
Remove sort information from results to make them consistent.
Move TumblingWindow under the sequence package.

Co-authored-by: James Rodewig <james.rodewig@elastic.co>
(cherry picked from commit bccfbcd81f2f1d3552e95e4a9ee2618fb3059bd9)
This commit is contained in:
Costin Leau 2020-07-14 23:26:25 +03:00 committed by Costin Leau
parent 6d6d565eeb
commit 679619c798
18 changed files with 316 additions and 149 deletions

View File

@ -565,10 +565,7 @@ the events in ascending, lexicographic order.
"name": "cmd.exe",
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"sort": [
1607252647000
]
}
},
{
"_index": "my_index",
@ -596,10 +593,7 @@ the events in ascending, lexicographic order.
"name": "cmd.exe",
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"sort": [
1607339228000
]
}
}
]
}
@ -696,10 +690,7 @@ the events in ascending, lexicographic order.
"name": "cmd.exe",
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"sort": [
1607339228000
]
}
},
{
"_index": "my_index",
@ -720,10 +711,7 @@ the events in ascending, lexicographic order.
"name": "regsvr32.exe",
"path": "C:\\Windows\\System32\\regsvr32.exe"
}
},
"sort": [
1607339229000
]
}
}
]
}
@ -732,3 +720,4 @@ the events in ascending, lexicographic order.
}
----
// TESTRESPONSE[s/"took": 6/"took": $body.took/]
// TESTRESPONSE[skip: response format updated]

View File

@ -84,7 +84,7 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
"relation": "eq"
},
"events": [
{
{
"_index": "sec_logs",
"_type": "_doc",
"_id": "1",
@ -103,10 +103,7 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
"name": "cmd.exe",
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"sort": [
1607252645000
]
}
},
{
"_index": "sec_logs",
@ -127,10 +124,7 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
"name": "cmd.exe",
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"sort": [
1607339167000
]
}
}
]
}
@ -223,10 +217,7 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
"name": "cmd.exe",
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"sort": [
1607339228000
]
}
},
{
"_index": "sec_logs",
@ -247,10 +238,7 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
"name": "regsvr32.exe",
"path": "C:\\Windows\\System32\\regsvr32.exe"
}
},
"sort": [
1607339229000
]
}
}
]
}
@ -259,6 +247,7 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
}
----
// TESTRESPONSE[s/"took": 60/"took": $body.took/]
// TESTRESPONSE[skip: response format updated]
You can use the <<eql-with-maxspan-keywords,`with maxspan` keywords>> to
constrain a sequence to a specified timespan.
@ -362,10 +351,7 @@ contains the shared `agent.id` value for each matching event.
"name": "cmd.exe",
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"sort": [
1607339228000
]
}
},
{
"_index": "sec_logs",
@ -386,10 +372,7 @@ contains the shared `agent.id` value for each matching event.
"name": "regsvr32.exe",
"path": "C:\\Windows\\System32\\regsvr32.exe"
}
},
"sort": [
1607339229000
]
}
}
]
}
@ -398,6 +381,7 @@ contains the shared `agent.id` value for each matching event.
}
----
// TESTRESPONSE[s/"took": 60/"took": $body.took/]
// TESTRESPONSE[skip: response format updated]
You can use the <<eql-until-keyword,`until` keyword>> to specify an expiration
event for sequences. Matching sequences must end before this event.
@ -501,15 +485,7 @@ GET /sec_logs/_eql/search
----
// TEST[s/search/search\?filter_path\=\-\*\.events\.\*fields/]
The API returns the following response. Note the `sort` property of each
matching event contains an array of two items:
* The first item is the event's <<eql-search-api-timestamp-field,timestamp>>,
converted to milliseconds since the https://en.wikipedia.org/wiki/Unix_time[Unix
epoch].
* The second item is the event's `event.id` value. This value is used as a sort
tiebreaker for events with the same timestamp.
The API returns the following response.
[source,console-result]
----
@ -524,7 +500,7 @@ tiebreaker for events with the same timestamp.
"relation": "eq"
},
"events": [
{
{
"_index": "sec_logs",
"_type": "_doc",
"_id": "1",
@ -543,13 +519,9 @@ tiebreaker for events with the same timestamp.
"name": "cmd.exe",
"path": "C:\\Windows\\System32\\cmd.exe"
}
}
},
"sort": [
1607252645000, <1>
"edwCRnyD" <2>
]
},
{
{
"_index": "sec_logs",
"_type": "_doc",
"_id": "3",
@ -568,21 +540,13 @@ tiebreaker for events with the same timestamp.
"name": "cmd.exe",
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"sort": [
1607339167000, <1>
"cMyt5SZ2" <2>
]
}
}
}
]
}
}
----
// TESTRESPONSE[s/"took": 34/"took": $body.took/]
<1> The event's <<eql-search-api-timestamp-field,timestamp>>, converted to
milliseconds since the https://en.wikipedia.org/wiki/Unix_time[Unix
epoch]
<2> The event's `event.id` value.
====

View File

@ -44,7 +44,7 @@ public class Criterion<Q extends QueryRequest> {
return stage;
}
boolean reverse() {
public boolean reverse() {
return reverse;
}

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.eql.execution.assembler;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
import org.elasticsearch.xpack.eql.execution.search.Limit;
@ -15,6 +16,7 @@ import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;
import org.elasticsearch.xpack.eql.execution.search.extractor.FieldHitExtractor;
import org.elasticsearch.xpack.eql.execution.search.extractor.TimestampFieldHitExtractor;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
import org.elasticsearch.xpack.eql.execution.sequence.TumblingWindow;
import org.elasticsearch.xpack.eql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.eql.querydsl.container.FieldExtractorRegistry;
@ -57,7 +59,7 @@ public class ExecutionManager {
String timestampName = Expressions.name(timestamp);
String tiebreakerName = Expressions.isPresent(tiebreaker) ? Expressions.name(tiebreaker) : null;
// secondary criteria
// secondary criteriam
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(plans.size() - 1);
// build a criterion for each query
@ -68,10 +70,8 @@ public class ExecutionManager {
PhysicalPlan query = plans.get(i);
// search query
if (query instanceof EsQueryExec) {
QueryRequest original = ((EsQueryExec) query).queryRequest(session);
// increase the request size based on the fetch size (since size is applied already through limit)
SearchSourceBuilder source = ((EsQueryExec) query).source(session);
QueryRequest original = () -> source;
BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName);
Criterion<BoxedQueryRequest> criterion =
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i > 0 && descending);

View File

@ -7,7 +7,9 @@
package org.elasticsearch.xpack.eql.execution.payload;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchSortValues;
import org.elasticsearch.xpack.eql.session.Results.Type;
import java.util.Arrays;
@ -20,6 +22,11 @@ public class SearchResponsePayload extends AbstractPayload {
public SearchResponsePayload(SearchResponse response) {
super(response.isTimedOut(), response.getTook());
hits = Arrays.asList(response.getHits().getHits());
// clean hits
SearchSortValues sortValues = new SearchSortValues(new Object[0], new DocValueFormat[0]);
for (SearchHit hit : hits) {
hit.sortValues(sortValues);
}
}
@Override

View File

@ -8,15 +8,34 @@ package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest.Item;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.ql.util.StringUtils;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest;
public class BasicQueryClient implements QueryClient {
@ -49,4 +68,70 @@ public class BasicQueryClient implements QueryClient {
SearchRequest search = prepareRequest(client, searchSource, false, indices);
client.search(search, new BasicListener(listener));
}
}
@Override
public void get(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener) {
MultiGetRequestBuilder requestBuilder = client.prepareMultiGet();
// no need for real-time
requestBuilder.setRealtime(false)
.setRefresh(false);
int sz = 0;
for (List<HitReference> list : refs) {
sz = list.size();
for (HitReference ref : list) {
Item item = new Item(ref.index(), ref.id());
// make sure to get the whole source
item.fetchSourceContext(FetchSourceContext.FETCH_SOURCE);
requestBuilder.add(item);
}
}
final int listSize = sz;
client.multiGet(requestBuilder.request(), wrap(r -> {
List<List<SearchHit>> hits = new ArrayList<>(r.getResponses().length / listSize);
List<SearchHit> sequence = new ArrayList<>(listSize);
// copy streams - reused across the whole loop
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
StreamOutput so = new OutputStreamStreamOutput(out);
StreamInput si = new InputStreamStreamInput(in);
int counter = 0;
Text type = new Text("_doc");
for (MultiGetItemResponse mgr : r.getResponses()) {
if (mgr.isFailed()) {
listener.onFailure(mgr.getFailure().getFailure());
return;
}
// HACK: the only way to get GetResult is to serialize it and then load it back :(
mgr.getResponse().writeTo(so);
GetResult result = new GetResult(si);
SearchHit hit = new SearchHit(-1, result.getId(), type, result.getDocumentFields(), result.getMetadataFields());
hit.sourceRef(result.internalSourceRef());
// need to create these objects to set the index
hit.shard(new SearchShardTarget(null, new ShardId(result.getIndex(), "", -1), null, null));
hit.setSeqNo(result.getSeqNo());
hit.setPrimaryTerm(result.getPrimaryTerm());
hit.version(result.getVersion());
sequence.add(hit);
if (++counter == listSize) {
counter = 0;
hits.add(sequence);
sequence = new ArrayList<>(listSize);
}
}
// send the results
listener.onResponse(hits);
}, listener::onFailure));
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.search.SearchHit;
import java.util.Objects;
public class HitReference {
private final String index;
private final String id;
public HitReference(SearchHit hit) {
this.index = hit.getIndex();
this.id = hit.getId();
}
public String index() {
return index;
}
public String id() {
return id;
}
@Override
public int hashCode() {
return Objects.hash(index, id);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
HitReference other = (HitReference) obj;
return Objects.equals(index, other.index)
&& Objects.equals(id, other.id);
}
@Override
public String toString() {
return "doc[" + index + "][" + id + "]";
}
}

View File

@ -7,12 +7,17 @@
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.session.Payload;
import java.util.List;
/**
* Infrastructure interface used to decouple listener consumers from the stateful classes holding client-references and co.
*/
public interface QueryClient {
void query(QueryRequest request, ActionListener<Payload> listener);
void get(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener);
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
@ -57,7 +58,14 @@ public abstract class SourceGenerator {
sourceBuilder.build(source);
sorting(container, source);
source.fetchSource(FetchSourceContext.FETCH_SOURCE);
// disable the source if there are no includes
if (source.fetchSource() == null || CollectionUtils.isEmpty(source.fetchSource().includes())) {
source.fetchSource(FetchSourceContext.DO_NOT_FETCH_SOURCE);
} else {
// use true to fetch only the needed bits from the source
source.fetchSource(true);
}
if (container.limit() != null) {
// add size and from

View File

@ -6,7 +6,7 @@
package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import java.util.Objects;
@ -17,9 +17,9 @@ import java.util.Objects;
class Match {
private final Ordinal ordinal;
private final SearchHit hit;
private final HitReference hit;
Match(Ordinal ordinal, SearchHit hit) {
Match(Ordinal ordinal, HitReference hit) {
this.ordinal = ordinal;
this.hit = hit;
}
@ -28,7 +28,7 @@ class Match {
return ordinal;
}
SearchHit hit() {
HitReference hit() {
return hit;
}
@ -54,6 +54,6 @@ class Match {
@Override
public String toString() {
return ordinal.toString() + "->" + hit.getId();
return ordinal + "->" + hit;
}
}

View File

@ -6,8 +6,8 @@
package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.ql.util.Check;
@ -32,7 +32,7 @@ public class Sequence {
private int currentStage = 0;
public Sequence(SequenceKey key, int stages, Ordinal ordinal, SearchHit firstHit) {
public Sequence(SequenceKey key, int stages, Ordinal ordinal, HitReference firstHit) {
Check.isTrue(stages >= 2, "A sequence requires at least 2 criteria, given [{}]", stages);
this.key = key;
this.stages = stages;
@ -40,7 +40,7 @@ public class Sequence {
this.matches[0] = new Match(ordinal, firstHit);
}
public int putMatch(int stage, SearchHit hit, Ordinal ordinal) {
public int putMatch(int stage, Ordinal ordinal, HitReference hit) {
if (stage == currentStage + 1) {
int previousStage = currentStage;
currentStage = stage;
@ -62,8 +62,8 @@ public class Sequence {
return matches[0].ordinal();
}
public List<SearchHit> hits() {
List<SearchHit> hits = new ArrayList<>(matches.length);
public List<HitReference> hits() {
List<HitReference> hits = new ArrayList<>(matches.length);
for (Match m : matches) {
hits.add(m.hit());
}

View File

@ -11,10 +11,9 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.eql.session.Payload;
import java.util.LinkedList;
import java.util.List;
@ -98,10 +97,10 @@ public class SequenceMatcher {
* Match hits for the given stage.
* Returns false if the process needs to be stopped.
*/
public boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, SearchHit>> hits) {
for (Tuple<KeyAndOrdinal, SearchHit> tuple : hits) {
public boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, HitReference>> hits) {
for (Tuple<KeyAndOrdinal, HitReference> tuple : hits) {
KeyAndOrdinal ko = tuple.v1();
SearchHit hit = tuple.v2();
HitReference hit = tuple.v2();
if (stage == 0) {
Sequence seq = new Sequence(ko.key, numberOfStages, ko.ordinal, hit);
@ -125,7 +124,7 @@ public class SequenceMatcher {
* Match the given hit (based on key and timestamp and potential tiebreaker) with any potential sequence from the previous
* given stage. If that's the case, update the sequence and the rest of the references.
*/
private void match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) {
private void match(int stage, SequenceKey key, Ordinal ordinal, HitReference hit) {
stats.seen++;
int previousStage = stage - 1;
@ -172,7 +171,7 @@ public class SequenceMatcher {
}
}
sequence.putMatch(stage, hit, ordinal);
sequence.putMatch(stage, ordinal, hit);
// bump the stages
if (stage == completionStage) {
@ -207,12 +206,9 @@ public class SequenceMatcher {
return false;
}
public Payload payload(long startTime) {
TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime);
List<Sequence> view = limit != null ? limit.view(completed) : completed;
Payload p = new SequencePayload(view, false, tookTime);
clear();
return p;
public List<Sequence> completed() {
return limit != null ? limit.view(completed) : completed;
}
public void dropUntil() {

View File

@ -7,26 +7,25 @@
package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload;
import org.elasticsearch.xpack.eql.session.Results.Type;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
class SequencePayload extends AbstractPayload {
private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> sequences;
private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> values;
SequencePayload(List<Sequence> seq, boolean timedOut, TimeValue timeTook) {
SequencePayload(List<Sequence> sequences, List<List<SearchHit>> searchHits, boolean timedOut, TimeValue timeTook) {
super(timedOut, timeTook);
sequences = new ArrayList<>(seq.size());
boolean needsReversal = seq.size() > 1 && (seq.get(0).ordinal().compareTo(seq.get(1).ordinal()) > 0);
values = new ArrayList<>(sequences.size());
for (Iterator<Sequence> it = needsReversal ? new ReversedIterator<>(seq) : seq.iterator(); it.hasNext();) {
Sequence s = it.next();
sequences.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), s.hits()));
for (int i = 0; i < sequences.size(); i++) {
Sequence s = sequences.get(i);
List<SearchHit> hits = searchHits.get(i);
values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), hits));
}
}
@ -38,6 +37,6 @@ class SequencePayload extends AbstractPayload {
@SuppressWarnings("unchecked")
@Override
public <V> List<V> values() {
return (List<V>) sequences;
return (List<V>) values;
}
}

View File

@ -4,19 +4,23 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.assembler;
package org.elasticsearch.xpack.eql.execution.sequence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.assembler.BoxedQueryRequest;
import org.elasticsearch.xpack.eql.execution.assembler.Criterion;
import org.elasticsearch.xpack.eql.execution.assembler.Executable;
import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.eql.execution.search.QueryClient;
import org.elasticsearch.xpack.eql.execution.sequence.KeyAndOrdinal;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
import org.elasticsearch.xpack.eql.session.EmptyPayload;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.session.Results.Type;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import java.util.Iterator;
@ -98,7 +102,7 @@ public class TumblingWindow implements Executable {
if (hits.isEmpty() == false) {
if (matcher.match(baseStage, wrapValues(base, hits)) == false) {
listener.onResponse(payload());
payload(listener);
return;
}
}
@ -120,7 +124,7 @@ public class TumblingWindow implements Executable {
}
// there aren't going to be any matches so cancel search
else {
listener.onResponse(payload());
payload(listener);
}
return;
}
@ -231,7 +235,7 @@ public class TumblingWindow implements Executable {
// if the limit has been reached, return what's available
if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) {
listener.onResponse(payload());
payload(listener);
return;
}
}
@ -281,48 +285,83 @@ public class TumblingWindow implements Executable {
return criterion.reverse() != base.reverse();
}
Iterable<Tuple<KeyAndOrdinal, SearchHit>> wrapValues(Criterion<?> criterion, List<SearchHit> hits) {
return () -> {
final Iterator<SearchHit> iter = criterion.reverse() ? new ReversedIterator<>(hits) : hits.iterator();
private void payload(ActionListener<Payload> listener) {
List<Sequence> completed = matcher.completed();
return new Iterator<Tuple<KeyAndOrdinal, SearchHit>>() {
if (completed.isEmpty()) {
listener.onResponse(new EmptyPayload(Type.SEQUENCE, timeTook()));
matcher.clear();
return;
}
client.get(hits(completed), wrap(searchHits -> {
listener.onResponse(new SequencePayload(completed, searchHits, false, timeTook()));
matcher.clear();
}, listener::onFailure));
}
private TimeValue timeTook() {
return new TimeValue(System.currentTimeMillis() - startTime);
}
Iterable<List<HitReference>> hits(List<Sequence> sequences) {
return () -> {
final Iterator<Sequence> delegate = criteria.get(0).reverse() != criteria.get(1).reverse() ?
new ReversedIterator<>(sequences) :
sequences.iterator();
return new Iterator<List<HitReference>>() {
@Override
public boolean hasNext() {
return iter.hasNext();
return delegate.hasNext();
}
@Override
public Tuple<KeyAndOrdinal, SearchHit> next() {
SearchHit hit = iter.next();
SequenceKey k = criterion.key(hit);
Ordinal o = criterion.ordinal(hit);
return new Tuple<>(new KeyAndOrdinal(k, o), hit);
public List<HitReference> next() {
return delegate.next().hits();
}
};
};
}
Iterable<KeyAndOrdinal> wrapUntilValues(Iterable<Tuple<KeyAndOrdinal, SearchHit>> iterable) {
Iterable<Tuple<KeyAndOrdinal, HitReference>> wrapValues(Criterion<?> criterion, List<SearchHit> hits) {
return () -> {
final Iterator<Tuple<KeyAndOrdinal, SearchHit>> iter = iterable.iterator();
final Iterator<SearchHit> delegate = criterion.reverse() ? new ReversedIterator<>(hits) : hits.iterator();
return new Iterator<Tuple<KeyAndOrdinal, HitReference>>() {
@Override
public boolean hasNext() {
return delegate.hasNext();
}
@Override
public Tuple<KeyAndOrdinal, HitReference> next() {
SearchHit hit = delegate.next();
SequenceKey k = criterion.key(hit);
Ordinal o = criterion.ordinal(hit);
return new Tuple<>(new KeyAndOrdinal(k, o), new HitReference(hit));
}
};
};
}
<E> Iterable<KeyAndOrdinal> wrapUntilValues(Iterable<Tuple<KeyAndOrdinal, E>> iterable) {
return () -> {
final Iterator<Tuple<KeyAndOrdinal, E>> delegate = iterable.iterator();
return new Iterator<KeyAndOrdinal>() {
@Override
public boolean hasNext() {
return iter.hasNext();
return delegate.hasNext();
}
@Override
public KeyAndOrdinal next() {
return iter.next().v1();
return delegate.next().v1();
}
};
};
}
Payload payload() {
return matcher.payload(startTime);
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.plan.physical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
@ -49,17 +50,16 @@ public class EsQueryExec extends LeafExec {
return output;
}
public QueryRequest queryRequest(EqlSession session) {
public SearchSourceBuilder source(EqlSession session) {
EqlConfiguration cfg = session.configuration();
// by default use the configuration size
// join/sequence queries will want to override this
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(queryContainer, cfg.filter());
return () -> sourceBuilder;
return SourceGenerator.sourceBuilder(queryContainer, cfg.filter());
}
@Override
public void execute(EqlSession session, ActionListener<Payload> listener) {
QueryRequest request = queryRequest(session);
// endpoint - fetch all source
QueryRequest request = () -> source(session).fetchSource(FetchSourceContext.FETCH_SOURCE);
listener = shouldReverse(request) ? new ReverseListener(listener) : listener;
new BasicQueryClient(session).query(request, listener);
}

View File

@ -16,9 +16,15 @@ import static java.util.Collections.emptyList;
public class EmptyPayload implements Payload {
private final Type type;
private final TimeValue timeTook;
public EmptyPayload(Type type) {
this(type, TimeValue.ZERO);
}
public EmptyPayload(Type type, TimeValue timeTook) {
this.type = type;
this.timeTook = timeTook;
}
@Override
@ -33,7 +39,7 @@ public class EmptyPayload implements Payload {
@Override
public TimeValue timeTook() {
return TimeValue.ZERO;
return timeTook;
}
@Override

View File

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.eql.execution.assembler;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
@ -17,8 +18,11 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence;
import org.elasticsearch.xpack.eql.execution.assembler.SeriesUtils.SeriesSpec;
import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.QueryClient;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
import org.elasticsearch.xpack.eql.execution.sequence.TumblingWindow;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Results.Type;
@ -170,6 +174,23 @@ public class SequenceSpecTests extends ESTestCase {
}
}
class TestQueryClient implements QueryClient {
@Override
public void query(QueryRequest r, ActionListener<Payload> l) {
int ordinal = r.searchSource().size();
if (ordinal != Integer.MAX_VALUE) {
r.searchSource().size(Integer.MAX_VALUE);
}
Map<Integer, Tuple<String, String>> evs = ordinal != Integer.MAX_VALUE ? events.get(ordinal) : emptyMap();
l.onResponse(new TestPayload(evs));
}
@Override
public void get(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener) {
//no-op
}
}
public SequenceSpecTests(String testName, int lineNumber, SeriesSpec spec) {
this.lineNumber = lineNumber;
@ -199,15 +220,7 @@ public class SequenceSpecTests extends ESTestCase {
// convert the results through a test specific payload
SequenceMatcher matcher = new SequenceMatcher(stages, TimeValue.MINUS_ONE, null);
QueryClient testClient = (r, l) -> {
int ordinal = r.searchSource().size();
if (ordinal != Integer.MAX_VALUE) {
r.searchSource().size(Integer.MAX_VALUE);
}
Map<Integer, Tuple<String, String>> evs = ordinal != Integer.MAX_VALUE ? events.get(ordinal) : emptyMap();
l.onResponse(new TestPayload(evs));
};
QueryClient testClient = new TestQueryClient();
TumblingWindow window = new TumblingWindow(testClient, criteria, null, matcher);
// finally make the assertion at the end of the listener

View File

@ -22,6 +22,7 @@ import java.util.Locale;
import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
public class QueryFolderOkTests extends AbstractQueryFolderTestCase {
@ -139,6 +140,6 @@ public class QueryFolderOkTests extends AbstractQueryFolderTestCase {
assertThat(query, containsString("\"term\":{\"event.category\":{\"value\":\"process\""));
// test field source extraction
assertThat(query, containsString("\"_source\":{\"includes\":[],\"excludes\":[]"));
assertThat(query, not(containsString("\"_source\":{\"includes\":[],\"excludes\":[]")));
}
}