EQL: Introduce support for sequence maxspan (#58635)

EQL sequences can specify now a maximum time allowed for their span
(computed between the first and the last matching event).

(cherry picked from commit 747c3592244192a2e25a092f62aec91a899afc83)
This commit is contained in:
Costin Leau 2020-06-29 21:30:15 +03:00 committed by Costin Leau
parent 773f3574a9
commit 3a546f1f51
22 changed files with 289 additions and 306 deletions

View File

@ -31,6 +31,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
public class DataLoader {
private static final String TEST_DATA = "/test_data.json";
@ -64,7 +67,10 @@ public class DataLoader {
try (XContentParser parser = p.apply(JsonXContent.jsonXContent, DataLoader.class.getResourceAsStream(TEST_DATA))) {
List<Object> list = parser.list();
for (Object item : list) {
bulk.add(new IndexRequest(testIndexName).source((Map<String, Object>) item, XContentType.JSON));
assertThat(item, instanceOf(Map.class));
Map<String, Object> entry = (Map<String, Object>) item;
transformDataset(entry);
bulk.add(new IndexRequest(testIndexName).source(entry, XContentType.JSON));
}
}
@ -78,6 +84,23 @@ public class DataLoader {
}
}
private static void transformDataset(Map<String, Object> entry) {
Object object = entry.get("timestamp");
assertThat(object, instanceOf(Long.class));
Long ts = (Long) object;
// currently this is windows filetime
entry.put("@timestamp", winFileTimeToUnix(ts));
}
private static final long FILETIME_EPOCH_DIFF = 11644473600000L;
private static final long FILETIME_ONE_MILLISECOND = 10 * 1000;
public static long winFileTimeToUnix(final long filetime) {
long ts = (filetime / FILETIME_ONE_MILLISECOND);
return ts - FILETIME_EPOCH_DIFF;
}
private static XContentParser createParser(XContent xContent, InputStream data) throws IOException {
NamedXContentRegistry contentRegistry = new NamedXContentRegistry(ClusterModule.getNamedXWriteables());
return xContent.createParser(contentRegistry, LoggingDeprecationHandler.INSTANCE, data);

View File

@ -46,8 +46,7 @@
"type" : "date"
},
"@timestamp" : {
"type" : "alias",
"path" : "timestamp"
"type" : "date"
},
"user" : {
"type" : "keyword"

View File

@ -336,51 +336,6 @@ until
'''
expected_event_ids = [54, 55, 61, 67]
[[queries]]
query = '''
sequence
[process where opcode == 1] by unique_pid, process_path
[file where opcode == 0] by unique_pid, process_path
[file where opcode == 0] by unique_pid, process_path
[file where opcode == 0] by unique_pid, process_path
until
[file where opcode == 200] by unique_pid, process_path
'''
[[queries]]
note = "Sequence: non-field based join."
query = '''
sequence
[process where serial_event_id<3] by unique_pid * 2
[process where true] by unique_ppid * 2
'''
expected_event_ids = [1, 2,
2, 3]
[[queries]]
note = "Sequence: multiple non-field based joins."
query = '''
sequence
[process where serial_event_id<3] by unique_pid * 2, length(unique_pid), string(unique_pid)
[process where true] by unique_ppid * 2, length(unique_ppid), string(unique_ppid)
'''
expected_event_ids = [1, 2,
2, 3]
[[queries]]
query = '''
sequence with maxspan=500ms
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2
'''
expected_event_ids = []
[[queries]]
query = '''
sequence
@ -1026,6 +981,28 @@ query = '''
registry where arrayContains(bytes_written_string_list, "missing", "en-US")
'''
[[queries]]
note = "Sequence: non-field based join."
query = '''
sequence
[process where serial_event_id<3] by unique_pid * 2
[process where true] by unique_ppid * 2
'''
expected_event_ids = [1, 2,
2, 3]
[[queries]]
note = "Sequence: multiple non-field based joins."
query = '''
sequence
[process where serial_event_id<3] by unique_pid * 2, length(unique_pid), string(unique_pid)
[process where true] by unique_ppid * 2, length(unique_ppid), string(unique_ppid)
'''
expected_event_ids = [1, 2,
2, 3]
# TODO: update toggles for this function
[[queries]]
case_sensitive = true

View File

@ -10,6 +10,8 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.sequence.Ordinal;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import java.util.List;
@ -22,12 +24,17 @@ public class Criterion implements QueryRequest {
private final HitExtractor tiebreakerExtractor;
// search after markers
private Object[] startMarker;
private Object[] stopMarker;
private Ordinal startMarker;
private Ordinal stopMarker;
private boolean reverse;
//TODO: should accept QueryRequest instead of another SearchSourceBuilder
public Criterion(SearchSourceBuilder searchSource, List<HitExtractor> searchAfterExractors, HitExtractor timestampExtractor,
HitExtractor tiebreakerExtractor) {
public Criterion(SearchSourceBuilder searchSource,
List<HitExtractor> searchAfterExractors,
HitExtractor timestampExtractor,
HitExtractor tiebreakerExtractor,
boolean reverse) {
this.searchSource = searchSource;
this.keyExtractors = searchAfterExractors;
this.timestampExtractor = timestampExtractor;
@ -35,6 +42,7 @@ public class Criterion implements QueryRequest {
this.startMarker = null;
this.stopMarker = null;
this.reverse = reverse;
}
@Override
@ -54,54 +62,45 @@ public class Criterion implements QueryRequest {
return tiebreakerExtractor;
}
public long timestamp(SearchHit hit) {
Object ts = timestampExtractor.extract(hit);
if (ts instanceof Number) {
return ((Number) ts).longValue();
}
throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts);
}
@SuppressWarnings({ "unchecked" })
public Comparable<Object> tiebreaker(SearchHit hit) {
if (tiebreakerExtractor == null) {
return null;
}
Object tb = tiebreakerExtractor.extract(hit);
if (tb instanceof Comparable) {
return (Comparable<Object>) tb;
}
throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb);
}
public Ordinal ordinal(SearchHit hit) {
public Object[] startMarker() {
return startMarker;
}
public Object[] stopMarker() {
return stopMarker;
}
private Object[] marker(SearchHit hit) {
long timestamp = timestamp(hit);
Object tiebreaker = null;
if (tiebreakerExtractor() != null) {
tiebreaker = tiebreaker(hit);
Object ts = timestampExtractor.extract(hit);
if (ts instanceof Number == false) {
throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts);
}
return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp };
long timestamp = ((Number) ts).longValue();
Comparable<Object> tiebreaker = null;
if (tiebreakerExtractor != null) {
Object tb = tiebreakerExtractor.extract(hit);
if (tb instanceof Comparable == false) {
throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb);
}
tiebreaker = (Comparable<Object>) tb;
}
return new Ordinal(timestamp, tiebreaker);
}
public void startMarker(SearchHit hit) {
startMarker = marker(hit);
public void startMarker(Ordinal ordinal) {
startMarker = ordinal;
}
public void stopMarker(SearchHit hit) {
stopMarker = marker(hit);
public void stopMarker(Ordinal ordinal) {
stopMarker = ordinal;
}
public Criterion useMarker(Object[] marker) {
searchSource.searchAfter(marker);
public Ordinal nextMarker() {
return startMarker.compareTo(stopMarker) < 1 ? startMarker : stopMarker;
}
public Criterion useMarker(Ordinal marker) {
searchSource.searchAfter(marker.toArray());
return this;
}
public Iterable<SearchHit> iterable(List<SearchHit> hits) {
return () -> reverse ? new ReversedIterator<>(hits) : hits.iterator();
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.eql.execution.assembler;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
import org.elasticsearch.xpack.eql.execution.search.Limit;
@ -43,11 +44,14 @@ public class ExecutionManager {
Attribute timestamp,
Attribute tiebreaker,
OrderDirection direction,
TimeValue maxSpan,
Limit limit) {
FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry();
List<Criterion> criteria = new ArrayList<>(plans.size() - 1);
boolean descending = direction == OrderDirection.DESC;
// build a criterion for each query
for (int i = 0; i < plans.size() - 1; i++) {
List<Attribute> keys = listOfKeys.get(i);
@ -61,9 +65,10 @@ public class ExecutionManager {
// TODO: this could be generalized into an exec only query
Check.isTrue(query instanceof EsQueryExec, "Expected a query but got [{}]", query.getClass());
QueryRequest request = ((EsQueryExec) query).queryRequest(session);
criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor));
// base query remains descending, the rest need to flip
criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor, i > 0 && descending));
}
return new SequenceRuntime(criteria, new BasicQueryClient(session), direction == OrderDirection.DESC, limit);
return new SequenceRuntime(criteria, new BasicQueryClient(session), maxSpan, limit);
}
private HitExtractor timestampExtractor(HitExtractor hitExtractor) {

View File

@ -6,24 +6,23 @@
package org.elasticsearch.xpack.eql.execution.assembler;
import org.elasticsearch.xpack.eql.execution.sequence.Ordinal;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
import java.util.Objects;
class KeyAndOrdinal {
final SequenceKey key;
final long timestamp;
final Comparable<Object> tiebreaker;
final Ordinal ordinal;
KeyAndOrdinal(SequenceKey key, long timestamp, Comparable<Object> tiebreaker) {
KeyAndOrdinal(SequenceKey key, Ordinal ordinal) {
this.key = key;
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
this.ordinal = ordinal;
}
@Override
public int hashCode() {
return Objects.hash(key, timestamp, tiebreaker);
return Objects.hash(key, ordinal);
}
@Override
@ -38,12 +37,11 @@ class KeyAndOrdinal {
KeyAndOrdinal other = (KeyAndOrdinal) obj;
return Objects.equals(key, other.key)
&& Objects.equals(timestamp, other.timestamp)
&& Objects.equals(tiebreaker, other.tiebreaker);
&& Objects.equals(ordinal, other.ordinal);
}
@Override
public String toString() {
return key + "[" + timestamp + "][" + (tiebreaker != null ? Objects.toString(tiebreaker) : "") + "]";
return key.toString() + ordinal.toString();
}
}

View File

@ -10,18 +10,23 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload;
import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
import org.elasticsearch.xpack.eql.session.Results.Type;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
class SequencePayload extends AbstractPayload {
private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> sequences;
SequencePayload(List<Sequence> seq, boolean timedOut, TimeValue timeTook, Object[] nextKeys) {
super(timedOut, timeTook, nextKeys);
SequencePayload(List<Sequence> seq, boolean timedOut, TimeValue timeTook) {
super(timedOut, timeTook);
sequences = new ArrayList<>(seq.size());
for (Sequence s : seq) {
boolean needsReversal = seq.size() > 1 && (seq.get(0).ordinal().compareTo(seq.get(1).ordinal()) > 0);
for (Iterator<Sequence> it = needsReversal ? new ReversedIterator<>(seq) : seq.iterator(); it.hasNext();) {
Sequence s = it.next();
sequences.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), s.hits()));
}
}

View File

@ -11,17 +11,15 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.payload.ReversePayload;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.execution.search.QueryClient;
import org.elasticsearch.xpack.eql.execution.sequence.Ordinal;
import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import java.util.Iterator;
import java.util.List;
import static org.elasticsearch.action.ActionListener.wrap;
@ -38,18 +36,14 @@ class SequenceRuntime implements Executable {
private final int numberOfStages;
private final SequenceStateMachine stateMachine;
private final QueryClient queryClient;
private final boolean descending;
private long startTime;
SequenceRuntime(List<Criterion> criteria, QueryClient queryClient, boolean descending, Limit limit) {
SequenceRuntime(List<Criterion> criteria, QueryClient queryClient, TimeValue maxSpan, Limit limit) {
this.criteria = criteria;
this.numberOfStages = criteria.size();
this.queryClient = queryClient;
boolean hasTiebreaker = criteria.get(0).tiebreakerExtractor() != null;
this.stateMachine = new SequenceStateMachine(numberOfStages, hasTiebreaker, limit);
this.descending = descending;
this.stateMachine = new SequenceStateMachine(numberOfStages, maxSpan, limit);
}
@Override
@ -73,9 +67,8 @@ class SequenceRuntime implements Executable {
// narrow by the previous stage timestamp marker
Criterion previous = criteria.get(stage - 1);
// if DESC, flip the markers (the stop becomes the start due to the reverse order), otherwise keep it accordingly
Object[] marker = descending && stage == 1 ? previous.stopMarker() : previous.startMarker();
currentCriterion.useMarker(marker);
// pass the next marker along
currentCriterion.useMarker(previous.nextMarker());
}
log.info("Querying stage {}", stage);
@ -95,34 +88,39 @@ class SequenceRuntime implements Executable {
}
// hits are guaranteed to be non-empty
private void findMatches(int currentStage, List<SearchHit> hits) {
private void findMatches(int stage, List<SearchHit> hits) {
// update criterion
Criterion criterion = criteria.get(currentStage);
criterion.startMarker(hits.get(0));
criterion.stopMarker(hits.get(hits.size() - 1));
Criterion criterion = criteria.get(stage);
// break the results per key
// when dealing with descending order, queries outside the base are ASC (search_before)
// so look at the data in reverse (that is DESC)
for (Iterator<SearchHit> it = descending ? new ReversedIterator<>(hits) : hits.iterator(); it.hasNext();) {
SearchHit hit = it.next();
Ordinal firstOrdinal = null, ordinal = null;
for (SearchHit hit : criterion.iterable(hits)) {
KeyAndOrdinal ko = key(hit, criterion);
if (currentStage == 0) {
Sequence seq = new Sequence(ko.key, numberOfStages, ko.timestamp, ko.tiebreaker, hit);
long tStart = (long) criterion.startMarker()[0];
long tStop = (long) criterion.stopMarker()[0];
stateMachine.trackSequence(seq, tStart, tStop);
ordinal = ko.ordinal;
if (firstOrdinal == null) {
firstOrdinal = ordinal;
}
if (stage == 0) {
Sequence seq = new Sequence(ko.key, numberOfStages, ordinal, hit);
stateMachine.trackSequence(seq);
} else {
stateMachine.match(currentStage, ko.key, ko.timestamp, ko.tiebreaker, hit);
stateMachine.match(stage, ko.key, ordinal, hit);
// early skip in case of reaching the limit
// check the last stage to avoid calling the state machine in other stages
if (stateMachine.reachedLimit()) {
return;
break;
}
}
}
criterion.startMarker(firstOrdinal);
criterion.stopMarker(ordinal);
}
private KeyAndOrdinal key(SearchHit hit, Criterion criterion) {
@ -139,14 +137,13 @@ class SequenceRuntime implements Executable {
key = new SequenceKey(docKeys);
}
return new KeyAndOrdinal(key, criterion.timestamp(hit), criterion.tiebreaker(hit));
return new KeyAndOrdinal(key, criterion.ordinal(hit));
}
private Payload sequencePayload() {
List<Sequence> completed = stateMachine.completeSequences();
TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime);
SequencePayload payload = new SequencePayload(completed, false, tookTime, null);
return descending ? new ReversePayload(payload) : payload;
return new SequencePayload(completed, false, tookTime);
}
private boolean hasFinished(int stage) {

View File

@ -13,12 +13,10 @@ public abstract class AbstractPayload implements Payload {
private final boolean timedOut;
private final TimeValue timeTook;
private final Object[] nextKeys;
protected AbstractPayload(boolean timedOut, TimeValue timeTook, Object[] nextKeys) {
protected AbstractPayload(boolean timedOut, TimeValue timeTook) {
this.timedOut = timedOut;
this.timeTook = timeTook;
this.nextKeys = nextKeys;
}
@Override
@ -30,9 +28,4 @@ public abstract class AbstractPayload implements Payload {
public TimeValue timeTook() {
return timeTook;
}
@Override
public Object[] nextKeys() {
return nextKeys;
}
}

View File

@ -37,11 +37,6 @@ public class ReversePayload implements Payload {
return delegate.timeTook();
}
@Override
public Object[] nextKeys() {
return delegate.nextKeys();
}
@Override
public <V> List<V> values() {
return delegate.values();

View File

@ -18,7 +18,7 @@ public class SearchResponsePayload extends AbstractPayload {
private final List<SearchHit> hits;
public SearchResponsePayload(SearchResponse response) {
super(response.isTimedOut(), response.getTook(), null);
super(response.isTimedOut(), response.getTook());
hits = Arrays.asList(response.getHits().getHits());
}

View File

@ -15,22 +15,16 @@ import java.util.Objects;
*/
class Match {
private final long timestamp;
private final Comparable<Object> tiebreaker;
private final Ordinal ordinal;
private final SearchHit hit;
Match(long timestamp, Comparable<Object> tiebreaker, SearchHit hit) {
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
Match(Ordinal ordinal, SearchHit hit) {
this.ordinal = ordinal;
this.hit = hit;
}
long timestamp() {
return timestamp;
}
Comparable<Object> tiebreaker() {
return tiebreaker;
Ordinal ordinal() {
return ordinal;
}
SearchHit hit() {
@ -39,7 +33,7 @@ class Match {
@Override
public int hashCode() {
return Objects.hash(timestamp, tiebreaker, hit);
return Objects.hash(ordinal, hit);
}
@Override
@ -53,13 +47,12 @@ class Match {
}
Match other = (Match) obj;
return Objects.equals(timestamp, other.timestamp)
&& Objects.equals(tiebreaker, other.tiebreaker)
return Objects.equals(ordinal, other.ordinal)
&& Objects.equals(hit, other.hit);
}
@Override
public String toString() {
return timestamp + "[" + (tiebreaker != null ? tiebreaker : "") + "]->" + hit.getId();
return ordinal.toString() + "->" + hit.getId();
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.sequence;
import java.util.Objects;
public class Ordinal implements Comparable<Ordinal> {
final long timestamp;
final Comparable<Object> tiebreaker;
public Ordinal(long timestamp, Comparable<Object> tiebreaker) {
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
}
@Override
public int hashCode() {
return Objects.hash(timestamp, tiebreaker);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Ordinal other = (Ordinal) obj;
return Objects.equals(timestamp, other.timestamp)
&& Objects.equals(tiebreaker, other.tiebreaker);
}
@Override
public String toString() {
return "[" + timestamp + "][" + (tiebreaker != null ? tiebreaker.toString() : "") + "]";
}
@Override
public int compareTo(Ordinal o) {
if (timestamp < o.timestamp) {
return -1;
}
if (timestamp == o.timestamp) {
if (tiebreaker != null) {
if (o.tiebreaker != null) {
return tiebreaker.compareTo(o.tiebreaker);
}
// nulls are first - lower than any other value
// other tiebreaker is null this one isn't, fall through 1
}
// null tiebreaker
else {
if (o.tiebreaker != null) {
return -1;
} else {
return 0;
}
}
}
// if none of the branches above matched, this ordinal is greater than o
return 1;
}
public Object[] toArray() {
return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp };
}
}

View File

@ -31,19 +31,19 @@ public class Sequence {
private int currentStage = 0;
public Sequence(SequenceKey key, int stages, long timestamp, Comparable<Object> tiebreaker, SearchHit firstHit) {
public Sequence(SequenceKey key, int stages, Ordinal ordinal, SearchHit firstHit) {
Check.isTrue(stages >= 2, "A sequence requires at least 2 criteria, given [{}]", stages);
this.key = key;
this.stages = stages;
this.matches = new Match[stages];
this.matches[0] = new Match(timestamp, tiebreaker, firstHit);
this.matches[0] = new Match(ordinal, firstHit);
}
public int putMatch(int stage, SearchHit hit, long timestamp, Comparable<Object> tiebreaker) {
public int putMatch(int stage, SearchHit hit, Ordinal ordinal) {
if (stage == currentStage + 1) {
int previousStage = currentStage;
currentStage = stage;
matches[currentStage] = new Match(timestamp, tiebreaker, hit);
matches[currentStage] = new Match(ordinal, hit);
return previousStage;
}
throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage=]", stage, key, currentStage);
@ -53,24 +53,12 @@ public class Sequence {
return key;
}
public int currentStage() {
return currentStage;
public Ordinal ordinal() {
return matches[currentStage].ordinal();
}
public long currentTimestamp() {
return matches[currentStage].timestamp();
}
public Comparable<Object> currentTiebreaker() {
return matches[currentStage].tiebreaker();
}
public long timestamp(int stage) {
// stages not initialized yet return an out-of-band value to have no impact on the interval range
if (stage > currentStage) {
return Long.MAX_VALUE;
}
return matches[stage].timestamp();
public long startTimestamp() {
return matches[0].ordinal().timestamp;
}
public List<SearchHit> hits() {
@ -110,7 +98,7 @@ public class Sequence {
StringBuilder sb = new StringBuilder();
sb.append(format(null, "[Seq<{}>[{}/{}]]",
key,
nf.format(currentStage()),
nf.format(currentStage),
nf.format(stages - 1)));
for (int i = 0; i < matches.length; i++) {

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.collect.Tuple;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Predicate;
import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
@ -21,27 +22,21 @@ public class SequenceFrame {
// timestamp compression (whose range is known for the current frame).
private final List<Sequence> sequences = new LinkedList<>();
// time frame being/end
private long tBegin = Long.MAX_VALUE, tEnd = Long.MIN_VALUE;
private long min = tBegin, max = tEnd;
private Ordinal start, stop;
public void add(Sequence sequence) {
sequences.add(sequence);
long ts = sequence.currentTimestamp();
if (min > ts) {
min = ts;
}
if (max < ts) {
max = ts;
}
}
public void setTimeFrame(long begin, long end) {
if (tBegin > begin) {
tBegin = begin;
}
if (tEnd < end) {
tEnd = end;
Ordinal ordinal = sequence.ordinal();
if (start == null) {
start = ordinal;
stop = ordinal;
} else {
if (start.compareTo(ordinal) > 0) {
start = ordinal;
}
if (stop.compareTo(ordinal) < 0) {
stop = ordinal;
}
}
}
@ -49,53 +44,27 @@ public class SequenceFrame {
* Returns the latest Sequence from the group that has its timestamp
* less than the given argument alongside its position in the list.
*/
public Tuple<Sequence, Integer> before(long timestamp, Comparable<Object> tiebreaker) {
Sequence matchSeq = null;
int matchPos = -1;
int position = -1;
for (Sequence sequence : sequences) {
position++;
// ts only comparison
if (sequence.currentTimestamp() < timestamp) {
matchSeq = sequence;
matchPos = position;
}
// apply tiebreaker (null first, that is null is less than any value)
else if (tiebreaker != null && sequence.currentTimestamp() == timestamp) {
Comparable<Object> tb = sequence.currentTiebreaker();
if (tb == null || tb.compareTo(tiebreaker) < 0) {
matchSeq = sequence;
matchPos = position;
}
} else {
break;
}
}
return matchSeq != null ? new Tuple<>(matchSeq, matchPos) : null;
public Tuple<Sequence, Integer> before(Ordinal ordinal) {
return find(o -> o.compareTo(ordinal) < 0);
}
/**
* Returns the first Sequence from the group that has its timestamp
* greater than the given argument alongside its position in the list.
*/
public Tuple<Sequence, Integer> after(long timestamp, Comparable<Object> tiebreaker) {
public Tuple<Sequence, Integer> after(Ordinal ordinal) {
return find(o -> o.compareTo(ordinal) > 0);
}
private Tuple<Sequence, Integer> find(Predicate<Ordinal> predicate) {
Sequence matchSeq = null;
int matchPos = -1;
int position = -1;
for (Sequence sequence : sequences) {
position++;
// ts only comparison
if (sequence.currentTimestamp() > timestamp) {
if (predicate.test(sequence.ordinal())) {
matchSeq = sequence;
matchPos = position;
}
// apply tiebreaker (null first, that is null is less than any value)
else if (tiebreaker != null && sequence.currentTimestamp() == timestamp) {
Comparable<Object> tb = sequence.currentTiebreaker();
if (tb == null || tb.compareTo(tiebreaker) > 0) {
matchSeq = sequence;
matchPos = position;
}
} else {
break;
}
@ -112,9 +81,9 @@ public class SequenceFrame {
// update min time
if (sequences.isEmpty() == false) {
min = sequences.get(0).currentTimestamp();
start = sequences.get(0).ordinal();
} else {
min = Long.MAX_VALUE;
stop = null;
}
}
@ -124,6 +93,6 @@ public class SequenceFrame {
@Override
public String toString() {
return format(null, "[{}-{}]({} seqs)", tBegin, tEnd, sequences.size());
return format(null, "[{}-{}]({} seqs)", start, stop, sequences.size());
}
}

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.search.Limit;
@ -24,33 +25,25 @@ public class SequenceStateMachine {
/** Current keys on each stage */
private final StageToKeys stageToKeys;
/** minimum timestamp per stage */
/** this ignores the key */
private final long[] timestampMarkers;
private final Comparable<?>[] tiebreakerMarkers;
private final boolean hasTieBreaker;
private final int completionStage;
/** list of completed sequences - separate to avoid polluting the other stages */
private final List<Sequence> completed;
private final long maxSpanInMillis;
private int offset = 0;
private int limit = -1;
private boolean limitReached = false;
@SuppressWarnings("rawtypes")
public SequenceStateMachine(int stages, boolean hasTiebreaker, Limit limit) {
public SequenceStateMachine(int stages, TimeValue maxSpan, Limit limit) {
this.completionStage = stages - 1;
this.stageToKeys = new StageToKeys(completionStage);
this.keyToSequences = new KeyToSequences(completionStage);
this.timestampMarkers = new long[completionStage];
this.tiebreakerMarkers = new Comparable[completionStage];
this.completed = new LinkedList<>();
this.hasTieBreaker = hasTiebreaker;
this.maxSpanInMillis = maxSpan.millis();
// limit && offset
if (limit != null) {
@ -63,34 +56,11 @@ public class SequenceStateMachine {
return completed;
}
public long getTimestampMarker(int stage) {
return timestampMarkers[stage];
}
public Comparable<?> getTiebreakerMarker(int stage) {
return tiebreakerMarkers[stage];
}
public void setTimestampMarker(int stage, long timestamp) {
timestampMarkers[stage] = timestamp;
}
public void setTiebreakerMarker(int stage, Comparable<Object> tiebreaker) {
tiebreakerMarkers[stage] = tiebreaker;
}
public Object[] getMarkers(int stage) {
long ts = timestampMarkers[stage];
Comparable<?> tb = tiebreakerMarkers[stage];
return hasTieBreaker ? new Object[] { ts, tb } : new Object[] { ts };
}
public void trackSequence(Sequence sequence, long tStart, long tStop) {
public void trackSequence(Sequence sequence) {
SequenceKey key = sequence.key();
stageToKeys.keys(0).add(key);
SequenceFrame frame = keyToSequences.frame(0, key);
frame.setTimeFrame(tStart, tStop);
frame.add(sequence);
}
@ -98,23 +68,27 @@ public class SequenceStateMachine {
* Match the given hit (based on key and timestamp and potential tiebreaker) with any potential sequence from the previous
* given stage. If that's the case, update the sequence and the rest of the references.
*/
public boolean match(int stage, SequenceKey key, long timestamp, Comparable<Object> tiebreaker, SearchHit hit) {
public boolean match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) {
int previousStage = stage - 1;
// check key presence to avoid creating a collection
SequenceFrame frame = keyToSequences.frameIfPresent(previousStage, key);
if (frame == null || frame.isEmpty()) {
return false;
}
// pick the sequence with the highest (for ASC) / lowest (for DESC) timestamp lower than current match timestamp
Tuple<Sequence, Integer> neighbour = frame.before(timestamp, tiebreaker);
if (neighbour == null) {
Tuple<Sequence, Integer> before = frame.before(ordinal);
if (before == null) {
return false;
}
Sequence sequence = neighbour.v1();
Sequence sequence = before.v1();
// eliminate the match and all previous values from the frame
frame.trim(neighbour.v2() + 1);
// update sequence
sequence.putMatch(stage, hit, timestamp, tiebreaker);
frame.trim(before.v2() + 1);
// check maxspan before continuing the sequence
if (maxSpanInMillis > 0 && (ordinal.timestamp - sequence.startTimestamp() >= maxSpanInMillis)) {
return false;
}
sequence.putMatch(stage, hit, ordinal);
// remove the frame and keys early (as the key space is large)
if (frame.isEmpty()) {

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.eql.plan.physical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.assembler.ExecutionManager;
import org.elasticsearch.xpack.eql.execution.search.Limit;
@ -33,6 +34,7 @@ public class SequenceExec extends PhysicalPlan {
private final Attribute tiebreaker;
private final Limit limit;
private final OrderDirection direction;
private final TimeValue maxSpan;
public SequenceExec(Source source,
List<List<Attribute>> keys,
@ -41,8 +43,9 @@ public class SequenceExec extends PhysicalPlan {
PhysicalPlan until,
Attribute timestamp,
Attribute tiebreaker,
OrderDirection direction) {
this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tiebreaker, null, direction);
OrderDirection direction,
TimeValue maxSpan) {
this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tiebreaker, null, direction, maxSpan);
}
private SequenceExec(Source source,
@ -51,18 +54,20 @@ public class SequenceExec extends PhysicalPlan {
Attribute ts,
Attribute tb,
Limit limit,
OrderDirection direction) {
OrderDirection direction,
TimeValue maxSpan) {
super(source, children);
this.keys = keys;
this.timestamp = ts;
this.tiebreaker = tb;
this.limit = limit;
this.direction = direction;
this.maxSpan = maxSpan;
}
@Override
protected NodeInfo<SequenceExec> info() {
return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tiebreaker, limit, direction);
return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tiebreaker, limit, direction, maxSpan);
}
@Override
@ -72,7 +77,7 @@ public class SequenceExec extends PhysicalPlan {
children().size(),
newChildren.size());
}
return new SequenceExec(source(), newChildren, keys, timestamp, tiebreaker, limit, direction);
return new SequenceExec(source(), newChildren, keys, timestamp, tiebreaker, limit, direction, maxSpan);
}
@Override
@ -109,12 +114,13 @@ public class SequenceExec extends PhysicalPlan {
}
public SequenceExec with(Limit limit) {
return new SequenceExec(source(), children(), keys(), timestamp(), tiebreaker(), limit, direction);
return new SequenceExec(source(), children(), keys(), timestamp(), tiebreaker(), limit, direction, maxSpan);
}
@Override
public void execute(EqlSession session, ActionListener<Payload> listener) {
new ExecutionManager(session).assemble(keys(), children(), timestamp(), tiebreaker(), direction, limit()).execute(listener);
new ExecutionManager(session).assemble(keys(), children(), timestamp(), tiebreaker(), direction, maxSpan, limit()).execute(
listener);
}
@Override

View File

@ -78,7 +78,7 @@ class Mapper extends RuleExecutor<PhysicalPlan> {
map(s.until().child()),
s.timestamp(),
s.tiebreaker(),
s.direction());
s.direction(), s.maxSpan());
}
if (p instanceof LocalRelation) {

View File

@ -36,11 +36,6 @@ public class EmptyPayload implements Payload {
return TimeValue.ZERO;
}
@Override
public Object[] nextKeys() {
return null;
}
@Override
public <V> List<V> values() {
return emptyList();

View File

@ -22,7 +22,5 @@ public interface Payload {
TimeValue timeTook();
Object[] nextKeys();
<V> List<V> values();
}

View File

@ -80,7 +80,7 @@ public class SequenceRuntimeTests extends ESTestCase {
private final int ordinal;
TestCriterion(int ordinal) {
super(SearchSourceBuilder.searchSource().size(ordinal), keyExtractors, tsExtractor, tbExtractor);
super(SearchSourceBuilder.searchSource().size(ordinal), keyExtractors, tsExtractor, tbExtractor, false);
this.ordinal = ordinal;
}
@ -144,11 +144,6 @@ public class SequenceRuntimeTests extends ESTestCase {
return TimeValue.ZERO;
}
@Override
public Object[] nextKeys() {
return new Object[0];
}
@SuppressWarnings("unchecked")
@Override
public <V> List<V> values() {
@ -191,7 +186,7 @@ public class SequenceRuntimeTests extends ESTestCase {
SequenceRuntime runtime = new SequenceRuntime(criteria, (r, l) -> {
Map<Integer, Tuple<String, String>> evs = events.get(r.searchSource().size());
l.onResponse(new TestPayload(evs));
}, false, null);
}, TimeValue.MINUS_ONE, null);
// finally make the assertion at the end of the listener
runtime.execute(wrap(this::checkResults, ex -> {

View File

@ -145,6 +145,5 @@ public class LogicalPlanTests extends ESTestCase {
TimeValue maxSpan = seq.maxSpan();
assertEquals(new TimeValue(2, TimeUnit.SECONDS), maxSpan);
}
}
}