EQL: Introduce until functionality (#59292)

Sequences now support until conditional, which prevents a match from
occurring if the until matches a document while doing look-ups.
Thus a sequence must complete before the until condition matches - if
any document within the sequence occurs at, or after, the until hit, the
sequence is discarded.

(cherry picked from commit 1ba1b9f0661aee655aa48cf9475ac61aaee2bfda)
This commit is contained in:
Costin Leau 2020-07-09 15:55:14 +03:00 committed by Costin Leau
parent d07b11b86b
commit d9c1e531db
14 changed files with 508 additions and 237 deletions

View File

@ -25,6 +25,7 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -159,7 +160,10 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
protected void assertSearchHits(List<SearchHit> events) { protected void assertSearchHits(List<SearchHit> events) {
assertNotNull(events); assertNotNull(events);
assertArrayEquals("unexpected result for spec: [" + spec.toString() + "]", spec.expectedEventIds(), extractIds(events)); long[] expected = spec.expectedEventIds();
long[] actual = extractIds(events);
assertArrayEquals("unexpected result for spec: [" + spec.toString() + "]" + Arrays.toString(expected) + " vs " + Arrays.toString(
actual), expected, actual);
} }
private static long[] extractIds(List<SearchHit> events) { private static long[] extractIds(List<SearchHit> events) {

View File

@ -345,36 +345,9 @@ process where true
| sort serial_event_id | sort serial_event_id
''' '''
[[queries]] [[queries]]
name = "fourSequencesByPidWithUntil1" name = "fourSequencesByPidWithUntil1"
query = ''' query = '''
sequence
[process where opcode == 1] by unique_pid
[file where opcode == 0] by unique_pid
[file where opcode == 0] by unique_pid
[file where opcode == 0] by unique_pid
until
[file where opcode == 2] by unique_pid
'''
expected_event_ids = []
[[queries]]
name = "fourSequencesByPidWithUntil2"
query = '''
sequence
[process where opcode == 1] by unique_pid
[file where opcode == 0] by unique_pid
[file where opcode == 0] by unique_pid
[file where opcode == 0] by unique_pid
until
[file where opcode == 200] by unique_pid
'''
expected_event_ids = [54, 55, 61, 67]
[[queries]]
name = "doubleSameSequenceWithByAndFilter"
query = '''
sequence sequence
[file where opcode=0] by unique_pid [file where opcode=0] by unique_pid
[file where opcode=0] by unique_pid [file where opcode=0] by unique_pid
@ -385,17 +358,6 @@ expected_event_ids = [87, 92]
[[queries]] [[queries]]
name = "doubleSameSequenceWithByUntilAndHead2" name = "doubleSameSequenceWithByUntilAndHead2"
query = ''' query = '''
sequence
[file where opcode=0 and file_name="*.exe"] by unique_pid
[file where opcode=0 and file_name="*.exe"] by unique_pid
until [process where opcode=1] by unique_ppid
| head 1
'''
expected_event_ids = []
[[queries]]
name = "doubleJoinWithByUntilAndHead"
query = '''
join join
[file where opcode=0 and file_name="*.exe"] by unique_pid [file where opcode=0 and file_name="*.exe"] by unique_pid
[file where opcode=2 and file_name="*.exe"] by unique_pid [file where opcode=2 and file_name="*.exe"] by unique_pid
@ -698,29 +660,6 @@ expected_event_ids = [48, 50, 51, 54, 93]
[[queries]] [[queries]]
name = "twoSequencesWithTwoKeysAndUntil" name = "twoSequencesWithTwoKeysAndUntil"
query = ''' query = '''
sequence by user_name
[file where opcode=0] by pid,file_path
[file where opcode=2] by pid,file_path
until
[process where opcode == 2] by ppid,process_path
'''
expected_event_ids = []
[[queries]]
name = "twoSequencesWithUntil"
query = '''
sequence by user_name
[file where opcode=0] by pid,file_path
[file where opcode=2] by pid,file_path
until
[process where opcode == 5] by ppid,process_path
| head 2
'''
expected_event_ids = [55, 59, 61, 65]
[[queries]]
name = "twoSequencesWithHead"
query = '''
join by user_name join by user_name
[file where true] by pid,file_path [file where true] by pid,file_path
[process where true] by ppid,process_path [process where true] by ppid,process_path

View File

@ -75,6 +75,10 @@ public class BoxedQueryRequest implements QueryRequest {
return this; return this;
} }
public Ordinal after() {
return after;
}
public Ordinal from() { public Ordinal from() {
return from; return from;
} }

View File

@ -20,6 +20,14 @@ public class KeyAndOrdinal {
this.ordinal = ordinal; this.ordinal = ordinal;
} }
public SequenceKey key() {
return key;
}
public Ordinal ordinal() {
return ordinal;
}
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(key, ordinal); return Objects.hash(key, ordinal);

View File

@ -5,11 +5,12 @@
*/ */
package org.elasticsearch.xpack.eql.execution.assembler; package org.elasticsearch.xpack.eql.execution.assembler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.search.Limit; import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.eql.execution.sequence.Sequence; import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine; import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine;
import org.elasticsearch.xpack.eql.session.Payload; import org.elasticsearch.xpack.eql.session.Payload;
@ -21,6 +22,8 @@ import java.util.List;
*/ */
class Matcher { class Matcher {
private final Logger log = LogManager.getLogger(Matcher.class);
// NB: just like in a list, this represents the total number of stages yet counting starts at 0 // NB: just like in a list, this represents the total number of stages yet counting starts at 0
private final SequenceStateMachine stateMachine; private final SequenceStateMachine stateMachine;
private final int numberOfStages; private final int numberOfStages;
@ -48,27 +51,33 @@ class Matcher {
// early skip in case of reaching the limit // early skip in case of reaching the limit
// check the last stage to avoid calling the state machine in other stages // check the last stage to avoid calling the state machine in other stages
if (stateMachine.reachedLimit()) { if (stateMachine.reachedLimit()) {
log.trace("Limit reached {}", stateMachine.stats());
return false; return false;
} }
} }
} }
log.trace("{}", stateMachine.stats());
return true; return true;
} }
boolean until(Iterable<Ordinal> markers) { void until(Iterable<KeyAndOrdinal> markers) {
// no-op so far stateMachine.until(markers);
return false;
} }
public boolean hasCandidates(int stage) { boolean hasCandidates(int stage) {
return stateMachine.hasCandidates(stage); return stateMachine.hasCandidates(stage);
} }
void dropUntil() {
stateMachine.dropUntil();
}
Payload payload(long startTime) { Payload payload(long startTime) {
List<Sequence> completed = stateMachine.completeSequences(); List<Sequence> completed = stateMachine.completeSequences();
TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime); TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime);
return new SequencePayload(completed, false, tookTime); Payload p = new SequencePayload(completed, false, tookTime);
stateMachine.clear();
return p;
} }
@Override @Override

View File

@ -47,11 +47,10 @@ public class TumblingWindow implements Executable {
private static class WindowInfo { private static class WindowInfo {
private final int baseStage; private final int baseStage;
private final Ordinal begin, end; private final Ordinal end;
WindowInfo(int baseStage, Ordinal begin, Ordinal end) { WindowInfo(int baseStage, Ordinal end) {
this.baseStage = baseStage; this.baseStage = baseStage;
this.begin = begin;
this.end = end; this.end = end;
} }
} }
@ -93,18 +92,29 @@ public class TumblingWindow implements Executable {
Criterion<BoxedQueryRequest> base = criteria.get(baseStage); Criterion<BoxedQueryRequest> base = criteria.get(baseStage);
List<SearchHit> hits = p.values(); List<SearchHit> hits = p.values();
log.trace("Found [{}] hits", hits.size());
if (hits.isEmpty() == false) { if (hits.isEmpty() == false) {
if (matcher.match(baseStage, wrapValues(base, hits)) == false) { if (matcher.match(baseStage, wrapValues(base, hits)) == false) {
listener.onResponse(payload()); listener.onResponse(payload());
return; return;
} }
} }
// empty or only one result means there aren't going to be any matches
// only one result means there aren't going to be any matches
// so move the window boxing to the next stage // so move the window boxing to the next stage
if (hits.size() < 2) { if (hits.size() < 2) {
// if there are still candidates, advance the window base // if there are still candidates, advance the window base
if (matcher.hasCandidates(baseStage) && baseStage + 1 < maxStages) { if (matcher.hasCandidates(baseStage) && baseStage + 1 < maxStages) {
advance(baseStage + 1, listener); Runnable next = () -> advance(baseStage + 1, listener);
if (until != null && hits.size() == 1) {
// find "until" ordinals - early on to discard data in-flight to avoid matching
// hits that can occur in other documents
untilCriterion(new WindowInfo(baseStage, base.ordinal(hits.get(0))), listener, next);
} else {
next.run();
}
} }
// there aren't going to be any matches so cancel search // there aren't going to be any matches so cancel search
else { else {
@ -122,37 +132,72 @@ public class TumblingWindow implements Executable {
log.trace("Found base [{}] window {}->{}", base.stage(), begin, end); log.trace("Found base [{}] window {}->{}", base.stage(), begin, end);
// find until ordinals WindowInfo info = new WindowInfo(baseStage, end);
//NB: not currently implemented
// no more queries to run // no more queries to run
if (baseStage + 1 < maxStages) { if (baseStage + 1 < maxStages) {
secondaryCriterion(new WindowInfo(baseStage, begin, end), baseStage + 1, listener); Runnable next = () -> secondaryCriterion(info, baseStage + 1, listener);
if (until != null) {
// find "until" ordinals - early on to discard data in-flight to avoid matching
// hits that can occur in other documents
untilCriterion(info, listener, next);
} else {
next.run();
}
} else { } else {
advance(baseStage, listener); advance(baseStage, listener);
} }
} }
private void untilCriterion(WindowInfo window, ActionListener<Payload> listener, Runnable next) {
final BoxedQueryRequest request = until.queryRequest();
// before doing a new query, clean all previous until hits
// including dropping any in-flight sequences that were not dropped (because they did not match)
matcher.dropUntil();
final boolean reversed = boxQuery(window, until);
log.trace("Querying until stage {}", request);
client.query(request, wrap(p -> {
List<SearchHit> hits = p.values();
log.trace("Found [{}] hits", hits.size());
// no more results for until - let the other queries run
if (hits.isEmpty()) {
// put the markers in place before the next call
if (reversed) {
request.to(window.end);
} else {
request.from(window.end);
}
} else {
// prepare the query for the next search
request.nextAfter(until.ordinal(hits.get(hits.size() - 1)));
// if the limit has been reached, return what's available
matcher.until(wrapUntilValues(wrapValues(until, hits)));
}
// keep running the query runs out of the results (essentially returns less than what we want)
if (hits.size() == windowSize) {
untilCriterion(window, listener, next);
}
// looks like this stage is done, move on
else {
// to the next query
next.run();
}
}, listener::onFailure));
}
private void secondaryCriterion(WindowInfo window, int currentStage, ActionListener<Payload> listener) { private void secondaryCriterion(WindowInfo window, int currentStage, ActionListener<Payload> listener) {
final Criterion<BoxedQueryRequest> criterion = criteria.get(currentStage); final Criterion<BoxedQueryRequest> criterion = criteria.get(currentStage);
final BoxedQueryRequest request = criterion.queryRequest(); final BoxedQueryRequest request = criterion.queryRequest();
Criterion<BoxedQueryRequest> base = criteria.get(window.baseStage);
// first box the query final boolean reversed = boxQuery(window, criterion);
// only the first base can be descending
// all subsequence queries are ascending
if (criterion.reverse() != base.reverse()) {
if (window.end.equals(request.from()) == false) {
// if that's the case, set the starting point
request.from(window.end);
// reposition the pointer
request.nextAfter(window.end);
}
} else {
// otherwise just the upper limit
request.to(window.end);
}
log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request); log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request);
@ -160,10 +205,11 @@ public class TumblingWindow implements Executable {
List<SearchHit> hits = p.values(); List<SearchHit> hits = p.values();
log.trace("Found [{}] hits", hits.size()); log.trace("Found [{}] hits", hits.size());
// no more results for this query // no more results for this query
if (hits.isEmpty()) { if (hits.isEmpty()) {
// put the markers in place before the next call // put the markers in place before the next call
if (criterion.reverse() != base.reverse()) { if (reversed) {
request.to(window.end); request.to(window.end);
} else { } else {
request.from(window.end); request.from(window.end);
@ -207,6 +253,32 @@ public class TumblingWindow implements Executable {
}, listener::onFailure)); }, listener::onFailure));
} }
/**
* Box the query for the given criterion based on the window information.
* Returns a boolean indicating whether reversal has been applied or not.
*/
private boolean boxQuery(WindowInfo window, Criterion<BoxedQueryRequest> criterion) {
final BoxedQueryRequest request = criterion.queryRequest();
Criterion<BoxedQueryRequest> base = criteria.get(window.baseStage);
// first box the query
// only the first base can be descending
// all subsequence queries are ascending
if (criterion.reverse() != base.reverse()) {
if (window.end.equals(request.from()) == false) {
// if that's the case, set the starting point
request.from(window.end);
// reposition the pointer
request.nextAfter(window.end);
}
} else {
// otherwise just the upper limit
request.to(window.end);
}
return criterion.reverse() != base.reverse();
}
Iterable<Tuple<KeyAndOrdinal, SearchHit>> wrapValues(Criterion<?> criterion, List<SearchHit> hits) { Iterable<Tuple<KeyAndOrdinal, SearchHit>> wrapValues(Criterion<?> criterion, List<SearchHit> hits) {
return () -> { return () -> {
final Iterator<SearchHit> iter = criterion.reverse() ? new ReversedIterator<>(hits) : hits.iterator(); final Iterator<SearchHit> iter = criterion.reverse() ? new ReversedIterator<>(hits) : hits.iterator();
@ -229,6 +301,25 @@ public class TumblingWindow implements Executable {
}; };
} }
Iterable<KeyAndOrdinal> wrapUntilValues(Iterable<Tuple<KeyAndOrdinal, SearchHit>> iterable) {
return () -> {
final Iterator<Tuple<KeyAndOrdinal, SearchHit>> iter = iterable.iterator();
return new Iterator<KeyAndOrdinal>() {
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public KeyAndOrdinal next() {
return iter.next().v1();
}
};
};
}
Payload payload() { Payload payload() {
return matcher.payload(startTime); return matcher.payload(startTime);
} }

View File

@ -77,6 +77,10 @@ public class Ordinal implements Comparable<Ordinal> {
return 1; return 1;
} }
public boolean between(Ordinal left, Ordinal right) {
return (compareTo(left) <= 0 && compareTo(right) >= 0) || (compareTo(right) <= 0 && compareTo(left) >= 0);
}
public Object[] toArray() { public Object[] toArray() {
return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp }; return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp };
} }

View File

@ -6,8 +6,13 @@
package org.elasticsearch.xpack.eql.execution.sequence; package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.xpack.eql.execution.assembler.KeyAndOrdinal;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
/** Dedicated collection for mapping a key to a list of sequences */ /** Dedicated collection for mapping a key to a list of sequences */
/** The list represents the sequence for each stage (based on its index) and is fixed in size */ /** The list represents the sequence for each stage (based on its index) and is fixed in size */
@ -17,19 +22,16 @@ class KeyToSequences {
private final int listSize; private final int listSize;
/** for each key, associate the frame per state (determined by index) */ /** for each key, associate the frame per state (determined by index) */
private final Map<SequenceKey, SequenceGroup[]> keyToSequences; private final Map<SequenceKey, SequenceGroup[]> keyToSequences;
private final Map<SequenceKey, UntilGroup> keyToUntil;
KeyToSequences(int listSize) { KeyToSequences(int listSize) {
this.listSize = listSize; this.listSize = listSize;
this.keyToSequences = new LinkedHashMap<>(); this.keyToSequences = new LinkedHashMap<>();
this.keyToUntil = new LinkedHashMap<>();
} }
private SequenceGroup[] group(SequenceKey key) { private SequenceGroup[] group(SequenceKey key) {
SequenceGroup[] groups = keyToSequences.get(key); return keyToSequences.computeIfAbsent(key, k -> new SequenceGroup[listSize]);
if (groups == null) {
groups = new SequenceGroup[listSize];
keyToSequences.put(key, groups);
}
return groups;
} }
SequenceGroup groupIfPresent(int stage, SequenceKey key) { SequenceGroup groupIfPresent(int stage, SequenceKey key) {
@ -37,6 +39,10 @@ class KeyToSequences {
return groups == null ? null : groups[stage]; return groups == null ? null : groups[stage];
} }
UntilGroup untilIfPresent(SequenceKey key) {
return keyToUntil.get(key);
}
void add(int stage, Sequence sequence) { void add(int stage, Sequence sequence) {
SequenceKey key = sequence.key(); SequenceKey key = sequence.key();
SequenceGroup[] groups = group(key); SequenceGroup[] groups = group(key);
@ -47,7 +53,64 @@ class KeyToSequences {
groups[stage].add(sequence); groups[stage].add(sequence);
} }
void until(Iterable<KeyAndOrdinal> until) {
for (KeyAndOrdinal keyAndOrdinal : until) {
// ignore unknown keys
SequenceKey key = keyAndOrdinal.key();
if (keyToSequences.containsKey(key)) {
UntilGroup group = keyToUntil.computeIfAbsent(key, UntilGroup::new);
group.add(keyAndOrdinal);
}
}
}
void remove(int stage, SequenceGroup group) {
SequenceKey key = group.key();
SequenceGroup[] groups = keyToSequences.get(key);
groups[stage] = null;
// clean-up the key if all groups are empty
boolean shouldRemoveKey = true;
for (SequenceGroup gp : groups) {
if (gp != null && gp.isEmpty() == false) {
shouldRemoveKey = false;
break;
}
}
if (shouldRemoveKey) {
keyToSequences.remove(key);
}
}
void dropUntil() {
// clean-up all candidates that occur before until
for (Entry<SequenceKey, UntilGroup> entry : keyToUntil.entrySet()) {
SequenceGroup[] groups = keyToSequences.get(entry.getKey());
if (groups != null) {
for (Ordinal o : entry.getValue()) {
for (SequenceGroup group : groups) {
if (group != null) {
group.trimBefore(o);
}
}
}
}
}
keyToUntil.clear();
}
public void clear() {
keyToSequences.clear();
keyToUntil.clear();
}
int numberOfKeys() { int numberOfKeys() {
return keyToSequences.size(); return keyToSequences.size();
} }
@Override
public String toString() {
return LoggerMessageFormat.format(null, "Keys=[{}], Until=[{}]", keyToSequences.size(), keyToUntil.size());
}
} }

View File

@ -0,0 +1,155 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
/** List of in-flight ordinals for a given key. For fast lookup, typically associated with a stage. */
abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
private final SequenceKey key;
private final Function<E, Ordinal> extractor;
// NB: since the size varies significantly, use a LinkedList
// Considering the order it might make sense to use a B-Tree+ for faster lookups which should work well with
// timestamp compression (whose range is known for the current frame).
private final List<E> elements = new LinkedList<>();
private int hashCode = 0;
private Ordinal start, stop;
protected OrdinalGroup(SequenceKey key, Function<E, Ordinal> extractor) {
this.key = key;
hashCode = key.hashCode();
this.extractor = extractor;
}
public SequenceKey key() {
return key;
}
public void add(E element) {
elements.add(element);
hashCode = 31 * hashCode + Objects.hashCode(element);
Ordinal ordinal = extractor.apply(element);
if (start == null) {
start = ordinal;
} else if (stop == null) {
stop = ordinal;
} else {
if (start.compareTo(ordinal) > 0) {
start = ordinal;
}
if (stop.compareTo(ordinal) < 0) {
stop = ordinal;
}
}
}
/**
* Returns the latest element from the group that has its timestamp
* less than the given argument alongside its position in the list.
* The element and everything before it is removed.
*/
E trimBefore(Ordinal ordinal) {
Tuple<E, Integer> match = findBefore(ordinal);
// trim
if (match != null) {
elements.subList(0, match.v2() + 1).clear();
// update min time
if (elements.isEmpty() == false) {
start = extractor.apply(elements.get(0));
} else {
start = null;
stop = null;
}
}
return match != null ? match.v1() : null;
}
E before(Ordinal ordinal) {
Tuple<E, Integer> match = findBefore(ordinal);
return match != null ? match.v1() : null;
}
private Tuple<E, Integer> findBefore(Ordinal ordinal) {
E match = null;
int matchPos = -1;
int position = -1;
for (E element : elements) {
position++;
Ordinal o = extractor.apply(element);
if (o.compareTo(ordinal) < 0) {
match = element;
matchPos = position;
} else {
break;
}
}
return match != null ? new Tuple<>(match, matchPos) : null;
}
public boolean isEmpty() {
return elements.isEmpty();
}
@Override
public Iterator<Ordinal> iterator() {
return new Iterator<Ordinal>() {
final Iterator<E> iter = elements.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public Ordinal next() {
return extractor.apply(iter.next());
}
};
}
@Override
public int hashCode() {
return key.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
OrdinalGroup<?> other = (OrdinalGroup<?>) obj;
return Objects.equals(key, other.key)
&& Objects.equals(hashCode, other.hashCode);
}
@Override
public String toString() {
return format(null, "[{}][{}-{}]({} seqs)", key, start, stop, elements.size());
}
}

View File

@ -47,7 +47,7 @@ public class Sequence {
matches[currentStage] = new Match(ordinal, hit); matches[currentStage] = new Match(ordinal, hit);
return previousStage; return previousStage;
} }
throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage=]", stage, key, currentStage); throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage={}]", stage, key, currentStage);
} }
public SequenceKey key() { public SequenceKey key() {
@ -58,8 +58,8 @@ public class Sequence {
return matches[currentStage].ordinal(); return matches[currentStage].ordinal();
} }
public long startTimestamp() { public Ordinal startOrdinal() {
return matches[0].ordinal().timestamp(); return matches[0].ordinal();
} }
public List<SearchHit> hits() { public List<SearchHit> hits() {

View File

@ -6,122 +6,9 @@
package org.elasticsearch.xpack.eql.execution.sequence; package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.common.collect.Tuple; public class SequenceGroup extends OrdinalGroup<Sequence> {
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
/** List of in-flight sequences for a given key. For fast lookup, typically associated with a stage. */
public class SequenceGroup {
private final SequenceKey key;
// NB: since the size varies significantly, use a LinkedList
// Considering the order it might make sense to use a B-Tree+ for faster lookups which should work well with
// timestamp compression (whose range is known for the current frame).
private final List<Sequence> sequences = new LinkedList<>();
private Ordinal start, stop;
SequenceGroup(SequenceKey key) { SequenceGroup(SequenceKey key) {
this.key = key; super(key, Sequence::ordinal);
}
public void add(Sequence sequence) {
sequences.add(sequence);
Ordinal ordinal = sequence.ordinal();
if (start == null) {
start = ordinal;
} else if (stop == null) {
stop = ordinal;
} else {
if (start.compareTo(ordinal) > 0) {
start = ordinal;
}
if (stop.compareTo(ordinal) < 0) {
stop = ordinal;
}
}
}
/**
* Returns the latest Sequence from the group that has its timestamp
* less than the given argument alongside its position in the list.
*/
public Tuple<Sequence, Integer> before(Ordinal ordinal) {
return find(o -> o.compareTo(ordinal) < 0);
}
/**
* Returns the first Sequence from the group that has its timestamp
* greater than the given argument alongside its position in the list.
*/
public Tuple<Sequence, Integer> after(Ordinal ordinal) {
return find(o -> o.compareTo(ordinal) > 0);
}
private Tuple<Sequence, Integer> find(Predicate<Ordinal> predicate) {
Sequence matchSeq = null;
int matchPos = -1;
int position = -1;
for (Sequence sequence : sequences) {
position++;
if (predicate.test(sequence.ordinal())) {
matchSeq = sequence;
matchPos = position;
} else {
break;
}
}
return matchSeq != null ? new Tuple<>(matchSeq, matchPos) : null;
}
public boolean isEmpty() {
return sequences.isEmpty();
}
public void trim(int position) {
sequences.subList(0, position).clear();
// update min time
if (sequences.isEmpty() == false) {
start = sequences.get(0).ordinal();
} else {
start = null;
stop = null;
}
}
public List<Sequence> sequences() {
return sequences;
}
@Override
public int hashCode() {
return key.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
SequenceGroup other = (SequenceGroup) obj;
return Objects.equals(key, other.key);
}
@Override
public String toString() {
return format(null, "[{}][{}-{}]({} seqs)", key, start, stop, sequences.size());
} }
} }

View File

@ -6,10 +6,10 @@
package org.elasticsearch.xpack.eql.execution.sequence; package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.assembler.KeyAndOrdinal;
import org.elasticsearch.xpack.eql.execution.search.Limit; import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.execution.search.Ordinal; import org.elasticsearch.xpack.eql.execution.search.Ordinal;
@ -21,6 +21,32 @@ import java.util.List;
*/ */
public class SequenceStateMachine { public class SequenceStateMachine {
static class Stats {
long seen = 0;
long ignored = 0;
long until = 0;
long rejectionMaxspan = 0;
long rejectionUntil = 0;
@Override
public String toString() {
return LoggerMessageFormat.format(null, "Stats: Seen [{}]/Ignored [{}]/Until [{}]/Rejected {Maxspan [{}]/Until [{}]}",
seen,
ignored,
until,
rejectionMaxspan,
rejectionUntil);
}
public void clear() {
seen = 0;
ignored = 0;
until = 0;
rejectionMaxspan = 0;
rejectionUntil = 0;
}
}
/** Current sequences for each key */ /** Current sequences for each key */
/** Note will be multiple sequences for the same key and the same stage with different timestamps */ /** Note will be multiple sequences for the same key and the same stage with different timestamps */
private final KeyToSequences keyToSequences; private final KeyToSequences keyToSequences;
@ -37,6 +63,8 @@ public class SequenceStateMachine {
private int limit = -1; private int limit = -1;
private boolean limitReached = false; private boolean limitReached = false;
private final Stats stats = new Stats();
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public SequenceStateMachine(int stages, TimeValue maxSpan, Limit limit) { public SequenceStateMachine(int stages, TimeValue maxSpan, Limit limit) {
this.completionStage = stages - 1; this.completionStage = stages - 1;
@ -61,8 +89,10 @@ public class SequenceStateMachine {
public void trackSequence(Sequence sequence) { public void trackSequence(Sequence sequence) {
SequenceKey key = sequence.key(); SequenceKey key = sequence.key();
stageToKeys.keys(0).add(key); stageToKeys.add(0, key);
keyToSequences.add(0, sequence); keyToSequences.add(0, sequence);
stats.seen++;
} }
/** /**
@ -70,30 +100,52 @@ public class SequenceStateMachine {
* given stage. If that's the case, update the sequence and the rest of the references. * given stage. If that's the case, update the sequence and the rest of the references.
*/ */
public void match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) { public void match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) {
stats.seen++;
int previousStage = stage - 1; int previousStage = stage - 1;
// check key presence to avoid creating a collection // check key presence to avoid creating a collection
SequenceGroup group = keyToSequences.groupIfPresent(previousStage, key); SequenceGroup group = keyToSequences.groupIfPresent(previousStage, key);
if (group == null || group.isEmpty()) { if (group == null || group.isEmpty()) {
stats.ignored++;
return; return;
} }
Tuple<Sequence, Integer> before = group.before(ordinal);
if (before == null) {
return;
}
Sequence sequence = before.v1();
// eliminate the match and all previous values from the frame
group.trim(before.v2() + 1);
// remove the frame and keys early (as the key space is large) // eliminate the match and all previous values from the group
Sequence sequence = group.trimBefore(ordinal);
if (sequence == null) {
stats.ignored++;
return;
}
// remove the group early (as the key space is large)
if (group.isEmpty()) { if (group.isEmpty()) {
stageToKeys.keys(previousStage).remove(key); keyToSequences.remove(previousStage, group);
stageToKeys.remove(previousStage, key);
} }
// check maxspan before continuing the sequence //
if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startTimestamp() > maxSpanInMillis)) { // Conditional checks
//
// maxspan
if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startOrdinal().timestamp() > maxSpanInMillis)) {
stats.rejectionMaxspan++;
return; return;
} }
// until
UntilGroup until = keyToSequences.untilIfPresent(key);
if (until != null) {
KeyAndOrdinal nearestUntil = until.before(ordinal);
if (nearestUntil != null) {
// check if until matches
if (nearestUntil.ordinal().between(sequence.ordinal(), ordinal)) {
stats.rejectionUntil++;
return;
}
}
}
sequence.putMatch(stage, hit, ordinal); sequence.putMatch(stage, hit, ordinal);
// bump the stages // bump the stages
@ -109,7 +161,7 @@ public class SequenceStateMachine {
} }
} }
} else { } else {
stageToKeys.keys(stage).add(key); stageToKeys.add(stage, key);
keyToSequences.add(stage, sequence); keyToSequences.add(stage, sequence);
} }
} }
@ -127,17 +179,36 @@ public class SequenceStateMachine {
*/ */
public boolean hasCandidates(int stage) { public boolean hasCandidates(int stage) {
for (int i = stage; i < completionStage; i++) { for (int i = stage; i < completionStage; i++) {
if (stageToKeys.keys(i).isEmpty() == false) { if (stageToKeys.isEmpty(i) == false) {
return true; return true;
} }
} }
return false; return false;
} }
public void dropUntil() {
keyToSequences.dropUntil();
}
public void until(Iterable<KeyAndOrdinal> markers) {
keyToSequences.until(markers);
}
public Stats stats() {
return stats;
}
public void clear() {
stats.clear();
keyToSequences.clear();
stageToKeys.clear();
completed.clear();
}
@Override @Override
public String toString() { public String toString() {
return LoggerMessageFormat.format(null, "Tracking [{}] keys with [{}] completed and in-flight {}", return LoggerMessageFormat.format(null, "Tracking [{}] keys with [{}] completed and in-flight {}",
keyToSequences.numberOfKeys(), keyToSequences,
completed.size(), completed.size(),
stageToKeys); stageToKeys);
} }

View File

@ -23,14 +23,34 @@ class StageToKeys {
this.stageToKey = Arrays.asList(new Set[stages]); this.stageToKey = Arrays.asList(new Set[stages]);
} }
Set<SequenceKey> keys(int stage) { void add(int stage, SequenceKey key) {
Set<SequenceKey> set = stageToKey.get(stage); Set<SequenceKey> set = stageToKey.get(stage);
if (set == null) { if (set == null) {
// TODO: could we use an allocation strategy? // TODO: could we use an allocation strategy?
set = new LinkedHashSet<>(); set = new LinkedHashSet<>();
stageToKey.set(stage, set); stageToKey.set(stage, set);
} }
return set; set.add(key);
}
void remove(int stage, SequenceKey key) {
Set<SequenceKey> set = stageToKey.get(stage);
if (set != null) {
set.remove(key);
}
}
boolean isEmpty(int stage) {
Set<SequenceKey> set = stageToKey.get(stage);
return set == null || set.isEmpty();
}
void clear() {
for (Set<SequenceKey> set : stageToKey) {
if (set != null) {
set.clear();
}
}
} }
@Override @Override

View File

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.xpack.eql.execution.assembler.KeyAndOrdinal;
public class UntilGroup extends OrdinalGroup<KeyAndOrdinal> {
UntilGroup(SequenceKey key) {
super(key, KeyAndOrdinal::ordinal);
}
}