diff --git a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java index 023365ffa64..e3954c540d9 100644 --- a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java +++ b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java @@ -25,6 +25,7 @@ import org.junit.Before; import org.junit.BeforeClass; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -159,7 +160,10 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase { protected void assertSearchHits(List events) { assertNotNull(events); - assertArrayEquals("unexpected result for spec: [" + spec.toString() + "]", spec.expectedEventIds(), extractIds(events)); + long[] expected = spec.expectedEventIds(); + long[] actual = extractIds(events); + assertArrayEquals("unexpected result for spec: [" + spec.toString() + "]" + Arrays.toString(expected) + " vs " + Arrays.toString( + actual), expected, actual); } private static long[] extractIds(List events) { diff --git a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml index a2e59bed940..f307d8dc4c2 100644 --- a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml +++ b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml @@ -345,36 +345,9 @@ process where true | sort serial_event_id ''' - [[queries]] name = "fourSequencesByPidWithUntil1" query = ''' -sequence - [process where opcode == 1] by unique_pid - [file where opcode == 0] by unique_pid - [file where opcode == 0] by unique_pid - [file where opcode == 0] by unique_pid -until - [file where opcode == 2] by unique_pid -''' -expected_event_ids = [] - -[[queries]] -name = "fourSequencesByPidWithUntil2" -query = ''' -sequence - [process where opcode == 1] by unique_pid - [file where opcode == 0] by unique_pid - [file where opcode == 0] by unique_pid - [file where opcode == 0] by unique_pid -until - [file where opcode == 200] by unique_pid -''' -expected_event_ids = [54, 55, 61, 67] - -[[queries]] -name = "doubleSameSequenceWithByAndFilter" -query = ''' sequence [file where opcode=0] by unique_pid [file where opcode=0] by unique_pid @@ -385,17 +358,6 @@ expected_event_ids = [87, 92] [[queries]] name = "doubleSameSequenceWithByUntilAndHead2" query = ''' -sequence - [file where opcode=0 and file_name="*.exe"] by unique_pid - [file where opcode=0 and file_name="*.exe"] by unique_pid -until [process where opcode=1] by unique_ppid -| head 1 -''' -expected_event_ids = [] - -[[queries]] -name = "doubleJoinWithByUntilAndHead" -query = ''' join [file where opcode=0 and file_name="*.exe"] by unique_pid [file where opcode=2 and file_name="*.exe"] by unique_pid @@ -698,29 +660,6 @@ expected_event_ids = [48, 50, 51, 54, 93] [[queries]] name = "twoSequencesWithTwoKeysAndUntil" query = ''' -sequence by user_name - [file where opcode=0] by pid,file_path - [file where opcode=2] by pid,file_path -until - [process where opcode == 2] by ppid,process_path -''' -expected_event_ids = [] - -[[queries]] -name = "twoSequencesWithUntil" -query = ''' -sequence by user_name - [file where opcode=0] by pid,file_path - [file where opcode=2] by pid,file_path -until - [process where opcode == 5] by ppid,process_path -| head 2 -''' -expected_event_ids = [55, 59, 61, 65] - -[[queries]] -name = "twoSequencesWithHead" -query = ''' join by user_name [file where true] by pid,file_path [process where true] by ppid,process_path diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java index 99d9671f16e..5908e1ec91f 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java @@ -75,6 +75,10 @@ public class BoxedQueryRequest implements QueryRequest { return this; } + public Ordinal after() { + return after; + } + public Ordinal from() { return from; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java index 69d00486794..4a47705dcda 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java @@ -20,6 +20,14 @@ public class KeyAndOrdinal { this.ordinal = ordinal; } + public SequenceKey key() { + return key; + } + + public Ordinal ordinal() { + return ordinal; + } + @Override public int hashCode() { return Objects.hash(key, ordinal); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java index 84c929a9ec2..0ab1fec7e18 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java @@ -5,11 +5,12 @@ */ package org.elasticsearch.xpack.eql.execution.assembler; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.execution.search.Limit; -import org.elasticsearch.xpack.eql.execution.search.Ordinal; import org.elasticsearch.xpack.eql.execution.sequence.Sequence; import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine; import org.elasticsearch.xpack.eql.session.Payload; @@ -21,6 +22,8 @@ import java.util.List; */ class Matcher { + private final Logger log = LogManager.getLogger(Matcher.class); + // NB: just like in a list, this represents the total number of stages yet counting starts at 0 private final SequenceStateMachine stateMachine; private final int numberOfStages; @@ -48,27 +51,33 @@ class Matcher { // early skip in case of reaching the limit // check the last stage to avoid calling the state machine in other stages if (stateMachine.reachedLimit()) { + log.trace("Limit reached {}", stateMachine.stats()); return false; } } } + log.trace("{}", stateMachine.stats()); return true; } - boolean until(Iterable markers) { - // no-op so far - - return false; + void until(Iterable markers) { + stateMachine.until(markers); } - public boolean hasCandidates(int stage) { + boolean hasCandidates(int stage) { return stateMachine.hasCandidates(stage); } + void dropUntil() { + stateMachine.dropUntil(); + } + Payload payload(long startTime) { List completed = stateMachine.completeSequences(); TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime); - return new SequencePayload(completed, false, tookTime); + Payload p = new SequencePayload(completed, false, tookTime); + stateMachine.clear(); + return p; } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java index 4e16bfa8286..1a2197a651f 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java @@ -47,11 +47,10 @@ public class TumblingWindow implements Executable { private static class WindowInfo { private final int baseStage; - private final Ordinal begin, end; + private final Ordinal end; - WindowInfo(int baseStage, Ordinal begin, Ordinal end) { + WindowInfo(int baseStage, Ordinal end) { this.baseStage = baseStage; - this.begin = begin; this.end = end; } } @@ -93,18 +92,29 @@ public class TumblingWindow implements Executable { Criterion base = criteria.get(baseStage); List hits = p.values(); + log.trace("Found [{}] hits", hits.size()); + if (hits.isEmpty() == false) { if (matcher.match(baseStage, wrapValues(base, hits)) == false) { listener.onResponse(payload()); return; } } - // empty or only one result means there aren't going to be any matches + + // only one result means there aren't going to be any matches // so move the window boxing to the next stage if (hits.size() < 2) { // if there are still candidates, advance the window base if (matcher.hasCandidates(baseStage) && baseStage + 1 < maxStages) { - advance(baseStage + 1, listener); + Runnable next = () -> advance(baseStage + 1, listener); + + if (until != null && hits.size() == 1) { + // find "until" ordinals - early on to discard data in-flight to avoid matching + // hits that can occur in other documents + untilCriterion(new WindowInfo(baseStage, base.ordinal(hits.get(0))), listener, next); + } else { + next.run(); + } } // there aren't going to be any matches so cancel search else { @@ -122,37 +132,72 @@ public class TumblingWindow implements Executable { log.trace("Found base [{}] window {}->{}", base.stage(), begin, end); - // find until ordinals - //NB: not currently implemented + WindowInfo info = new WindowInfo(baseStage, end); // no more queries to run if (baseStage + 1 < maxStages) { - secondaryCriterion(new WindowInfo(baseStage, begin, end), baseStage + 1, listener); + Runnable next = () -> secondaryCriterion(info, baseStage + 1, listener); + if (until != null) { + // find "until" ordinals - early on to discard data in-flight to avoid matching + // hits that can occur in other documents + untilCriterion(info, listener, next); + } else { + next.run(); + } } else { advance(baseStage, listener); } } + private void untilCriterion(WindowInfo window, ActionListener listener, Runnable next) { + final BoxedQueryRequest request = until.queryRequest(); + + // before doing a new query, clean all previous until hits + // including dropping any in-flight sequences that were not dropped (because they did not match) + matcher.dropUntil(); + + final boolean reversed = boxQuery(window, until); + + log.trace("Querying until stage {}", request); + + client.query(request, wrap(p -> { + List hits = p.values(); + + log.trace("Found [{}] hits", hits.size()); + // no more results for until - let the other queries run + if (hits.isEmpty()) { + // put the markers in place before the next call + if (reversed) { + request.to(window.end); + } else { + request.from(window.end); + } + } else { + // prepare the query for the next search + request.nextAfter(until.ordinal(hits.get(hits.size() - 1))); + + // if the limit has been reached, return what's available + matcher.until(wrapUntilValues(wrapValues(until, hits))); + } + + // keep running the query runs out of the results (essentially returns less than what we want) + if (hits.size() == windowSize) { + untilCriterion(window, listener, next); + } + // looks like this stage is done, move on + else { + // to the next query + next.run(); + } + + }, listener::onFailure)); + } + private void secondaryCriterion(WindowInfo window, int currentStage, ActionListener listener) { final Criterion criterion = criteria.get(currentStage); - final BoxedQueryRequest request = criterion.queryRequest(); - Criterion base = criteria.get(window.baseStage); - // first box the query - // only the first base can be descending - // all subsequence queries are ascending - if (criterion.reverse() != base.reverse()) { - if (window.end.equals(request.from()) == false) { - // if that's the case, set the starting point - request.from(window.end); - // reposition the pointer - request.nextAfter(window.end); - } - } else { - // otherwise just the upper limit - request.to(window.end); - } + final boolean reversed = boxQuery(window, criterion); log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request); @@ -160,10 +205,11 @@ public class TumblingWindow implements Executable { List hits = p.values(); log.trace("Found [{}] hits", hits.size()); + // no more results for this query if (hits.isEmpty()) { // put the markers in place before the next call - if (criterion.reverse() != base.reverse()) { + if (reversed) { request.to(window.end); } else { request.from(window.end); @@ -207,6 +253,32 @@ public class TumblingWindow implements Executable { }, listener::onFailure)); } + /** + * Box the query for the given criterion based on the window information. + * Returns a boolean indicating whether reversal has been applied or not. + */ + private boolean boxQuery(WindowInfo window, Criterion criterion) { + final BoxedQueryRequest request = criterion.queryRequest(); + Criterion base = criteria.get(window.baseStage); + + // first box the query + // only the first base can be descending + // all subsequence queries are ascending + if (criterion.reverse() != base.reverse()) { + if (window.end.equals(request.from()) == false) { + // if that's the case, set the starting point + request.from(window.end); + // reposition the pointer + request.nextAfter(window.end); + } + } else { + // otherwise just the upper limit + request.to(window.end); + } + + return criterion.reverse() != base.reverse(); + } + Iterable> wrapValues(Criterion criterion, List hits) { return () -> { final Iterator iter = criterion.reverse() ? new ReversedIterator<>(hits) : hits.iterator(); @@ -229,6 +301,25 @@ public class TumblingWindow implements Executable { }; } + Iterable wrapUntilValues(Iterable> iterable) { + return () -> { + final Iterator> iter = iterable.iterator(); + + return new Iterator() { + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyAndOrdinal next() { + return iter.next().v1(); + } + }; + }; + } + Payload payload() { return matcher.payload(startTime); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java index b6111163813..dc0fc5827b1 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java @@ -77,6 +77,10 @@ public class Ordinal implements Comparable { return 1; } + public boolean between(Ordinal left, Ordinal right) { + return (compareTo(left) <= 0 && compareTo(right) >= 0) || (compareTo(right) <= 0 && compareTo(left) >= 0); + } + public Object[] toArray() { return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp }; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java index fd0cd7fb2ec..8aa5452d6ba 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java @@ -6,8 +6,13 @@ package org.elasticsearch.xpack.eql.execution.sequence; +import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.xpack.eql.execution.assembler.KeyAndOrdinal; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; + import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; /** Dedicated collection for mapping a key to a list of sequences */ /** The list represents the sequence for each stage (based on its index) and is fixed in size */ @@ -17,19 +22,16 @@ class KeyToSequences { private final int listSize; /** for each key, associate the frame per state (determined by index) */ private final Map keyToSequences; + private final Map keyToUntil; KeyToSequences(int listSize) { this.listSize = listSize; this.keyToSequences = new LinkedHashMap<>(); + this.keyToUntil = new LinkedHashMap<>(); } private SequenceGroup[] group(SequenceKey key) { - SequenceGroup[] groups = keyToSequences.get(key); - if (groups == null) { - groups = new SequenceGroup[listSize]; - keyToSequences.put(key, groups); - } - return groups; + return keyToSequences.computeIfAbsent(key, k -> new SequenceGroup[listSize]); } SequenceGroup groupIfPresent(int stage, SequenceKey key) { @@ -37,6 +39,10 @@ class KeyToSequences { return groups == null ? null : groups[stage]; } + UntilGroup untilIfPresent(SequenceKey key) { + return keyToUntil.get(key); + } + void add(int stage, Sequence sequence) { SequenceKey key = sequence.key(); SequenceGroup[] groups = group(key); @@ -47,7 +53,64 @@ class KeyToSequences { groups[stage].add(sequence); } + void until(Iterable until) { + for (KeyAndOrdinal keyAndOrdinal : until) { + // ignore unknown keys + SequenceKey key = keyAndOrdinal.key(); + if (keyToSequences.containsKey(key)) { + UntilGroup group = keyToUntil.computeIfAbsent(key, UntilGroup::new); + group.add(keyAndOrdinal); + } + } + } + + void remove(int stage, SequenceGroup group) { + SequenceKey key = group.key(); + SequenceGroup[] groups = keyToSequences.get(key); + groups[stage] = null; + // clean-up the key if all groups are empty + boolean shouldRemoveKey = true; + for (SequenceGroup gp : groups) { + if (gp != null && gp.isEmpty() == false) { + shouldRemoveKey = false; + break; + } + } + if (shouldRemoveKey) { + keyToSequences.remove(key); + } + } + + void dropUntil() { + // clean-up all candidates that occur before until + for (Entry entry : keyToUntil.entrySet()) { + SequenceGroup[] groups = keyToSequences.get(entry.getKey()); + if (groups != null) { + for (Ordinal o : entry.getValue()) { + for (SequenceGroup group : groups) { + if (group != null) { + group.trimBefore(o); + } + } + } + } + } + + keyToUntil.clear(); + } + + public void clear() { + keyToSequences.clear(); + keyToUntil.clear(); + } + int numberOfKeys() { return keyToSequences.size(); } -} + + @Override + public String toString() { + return LoggerMessageFormat.format(null, "Keys=[{}], Until=[{}]", keyToSequences.size(), keyToUntil.size()); + } + +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/OrdinalGroup.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/OrdinalGroup.java new file mode 100644 index 00000000000..a2220772223 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/OrdinalGroup.java @@ -0,0 +1,155 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.execution.sequence; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.xpack.eql.execution.search.Ordinal; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +import static org.elasticsearch.common.logging.LoggerMessageFormat.format; + +/** List of in-flight ordinals for a given key. For fast lookup, typically associated with a stage. */ +abstract class OrdinalGroup implements Iterable { + + private final SequenceKey key; + private final Function extractor; + + // NB: since the size varies significantly, use a LinkedList + // Considering the order it might make sense to use a B-Tree+ for faster lookups which should work well with + // timestamp compression (whose range is known for the current frame). + private final List elements = new LinkedList<>(); + + private int hashCode = 0; + + private Ordinal start, stop; + + protected OrdinalGroup(SequenceKey key, Function extractor) { + this.key = key; + hashCode = key.hashCode(); + + this.extractor = extractor; + } + + public SequenceKey key() { + return key; + } + + public void add(E element) { + elements.add(element); + hashCode = 31 * hashCode + Objects.hashCode(element); + + Ordinal ordinal = extractor.apply(element); + if (start == null) { + start = ordinal; + } else if (stop == null) { + stop = ordinal; + } else { + if (start.compareTo(ordinal) > 0) { + start = ordinal; + } + if (stop.compareTo(ordinal) < 0) { + stop = ordinal; + } + } + } + + /** + * Returns the latest element from the group that has its timestamp + * less than the given argument alongside its position in the list. + * The element and everything before it is removed. + */ + E trimBefore(Ordinal ordinal) { + Tuple match = findBefore(ordinal); + + // trim + if (match != null) { + elements.subList(0, match.v2() + 1).clear(); + + // update min time + if (elements.isEmpty() == false) { + start = extractor.apply(elements.get(0)); + } else { + start = null; + stop = null; + } + } + return match != null ? match.v1() : null; + } + + E before(Ordinal ordinal) { + Tuple match = findBefore(ordinal); + return match != null ? match.v1() : null; + } + + private Tuple findBefore(Ordinal ordinal) { + E match = null; + int matchPos = -1; + int position = -1; + for (E element : elements) { + position++; + Ordinal o = extractor.apply(element); + if (o.compareTo(ordinal) < 0) { + match = element; + matchPos = position; + } else { + break; + } + } + return match != null ? new Tuple<>(match, matchPos) : null; + } + + public boolean isEmpty() { + return elements.isEmpty(); + } + + @Override + public Iterator iterator() { + return new Iterator() { + final Iterator iter = elements.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public Ordinal next() { + return extractor.apply(iter.next()); + } + }; + } + + @Override + public int hashCode() { + return key.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + OrdinalGroup other = (OrdinalGroup) obj; + return Objects.equals(key, other.key) + && Objects.equals(hashCode, other.hashCode); + } + + @Override + public String toString() { + return format(null, "[{}][{}-{}]({} seqs)", key, start, stop, elements.size()); + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java index b0f41c0aead..0298adbad4a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java @@ -47,7 +47,7 @@ public class Sequence { matches[currentStage] = new Match(ordinal, hit); return previousStage; } - throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage=]", stage, key, currentStage); + throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage={}]", stage, key, currentStage); } public SequenceKey key() { @@ -58,8 +58,8 @@ public class Sequence { return matches[currentStage].ordinal(); } - public long startTimestamp() { - return matches[0].ordinal().timestamp(); + public Ordinal startOrdinal() { + return matches[0].ordinal(); } public List hits() { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java index 1d48e8f6eb6..c7bd5a6d1f6 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java @@ -6,122 +6,9 @@ package org.elasticsearch.xpack.eql.execution.sequence; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.xpack.eql.execution.search.Ordinal; - -import java.util.LinkedList; -import java.util.List; -import java.util.Objects; -import java.util.function.Predicate; - -import static org.elasticsearch.common.logging.LoggerMessageFormat.format; - -/** List of in-flight sequences for a given key. For fast lookup, typically associated with a stage. */ -public class SequenceGroup { - - private final SequenceKey key; - - // NB: since the size varies significantly, use a LinkedList - // Considering the order it might make sense to use a B-Tree+ for faster lookups which should work well with - // timestamp compression (whose range is known for the current frame). - private final List sequences = new LinkedList<>(); - - private Ordinal start, stop; +public class SequenceGroup extends OrdinalGroup { SequenceGroup(SequenceKey key) { - this.key = key; - } - - public void add(Sequence sequence) { - sequences.add(sequence); - Ordinal ordinal = sequence.ordinal(); - if (start == null) { - start = ordinal; - } else if (stop == null) { - stop = ordinal; - } else { - if (start.compareTo(ordinal) > 0) { - start = ordinal; - } - if (stop.compareTo(ordinal) < 0) { - stop = ordinal; - } - } - } - - /** - * Returns the latest Sequence from the group that has its timestamp - * less than the given argument alongside its position in the list. - */ - public Tuple before(Ordinal ordinal) { - return find(o -> o.compareTo(ordinal) < 0); - } - - /** - * Returns the first Sequence from the group that has its timestamp - * greater than the given argument alongside its position in the list. - */ - public Tuple after(Ordinal ordinal) { - return find(o -> o.compareTo(ordinal) > 0); - } - - private Tuple find(Predicate predicate) { - Sequence matchSeq = null; - int matchPos = -1; - int position = -1; - for (Sequence sequence : sequences) { - position++; - if (predicate.test(sequence.ordinal())) { - matchSeq = sequence; - matchPos = position; - } else { - break; - } - } - return matchSeq != null ? new Tuple<>(matchSeq, matchPos) : null; - } - - public boolean isEmpty() { - return sequences.isEmpty(); - } - - public void trim(int position) { - sequences.subList(0, position).clear(); - - // update min time - if (sequences.isEmpty() == false) { - start = sequences.get(0).ordinal(); - } else { - start = null; - stop = null; - } - } - - public List sequences() { - return sequences; - } - - @Override - public int hashCode() { - return key.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (obj == null || getClass() != obj.getClass()) { - return false; - } - - SequenceGroup other = (SequenceGroup) obj; - return Objects.equals(key, other.key); - } - - @Override - public String toString() { - return format(null, "[{}][{}-{}]({} seqs)", key, start, stop, sequences.size()); + super(key, Sequence::ordinal); } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java index 85a6b9119b6..f721efd6d69 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java @@ -6,10 +6,10 @@ package org.elasticsearch.xpack.eql.execution.sequence; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.eql.execution.assembler.KeyAndOrdinal; import org.elasticsearch.xpack.eql.execution.search.Limit; import org.elasticsearch.xpack.eql.execution.search.Ordinal; @@ -21,6 +21,32 @@ import java.util.List; */ public class SequenceStateMachine { + static class Stats { + long seen = 0; + long ignored = 0; + long until = 0; + long rejectionMaxspan = 0; + long rejectionUntil = 0; + + @Override + public String toString() { + return LoggerMessageFormat.format(null, "Stats: Seen [{}]/Ignored [{}]/Until [{}]/Rejected {Maxspan [{}]/Until [{}]}", + seen, + ignored, + until, + rejectionMaxspan, + rejectionUntil); + } + + public void clear() { + seen = 0; + ignored = 0; + until = 0; + rejectionMaxspan = 0; + rejectionUntil = 0; + } + } + /** Current sequences for each key */ /** Note will be multiple sequences for the same key and the same stage with different timestamps */ private final KeyToSequences keyToSequences; @@ -37,6 +63,8 @@ public class SequenceStateMachine { private int limit = -1; private boolean limitReached = false; + private final Stats stats = new Stats(); + @SuppressWarnings("rawtypes") public SequenceStateMachine(int stages, TimeValue maxSpan, Limit limit) { this.completionStage = stages - 1; @@ -61,8 +89,10 @@ public class SequenceStateMachine { public void trackSequence(Sequence sequence) { SequenceKey key = sequence.key(); - stageToKeys.keys(0).add(key); + stageToKeys.add(0, key); keyToSequences.add(0, sequence); + + stats.seen++; } /** @@ -70,30 +100,52 @@ public class SequenceStateMachine { * given stage. If that's the case, update the sequence and the rest of the references. */ public void match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) { + stats.seen++; + int previousStage = stage - 1; // check key presence to avoid creating a collection SequenceGroup group = keyToSequences.groupIfPresent(previousStage, key); if (group == null || group.isEmpty()) { + stats.ignored++; return; } - Tuple before = group.before(ordinal); - if (before == null) { - return; - } - Sequence sequence = before.v1(); - // eliminate the match and all previous values from the frame - group.trim(before.v2() + 1); - // remove the frame and keys early (as the key space is large) + // eliminate the match and all previous values from the group + Sequence sequence = group.trimBefore(ordinal); + if (sequence == null) { + stats.ignored++; + return; + } + + // remove the group early (as the key space is large) if (group.isEmpty()) { - stageToKeys.keys(previousStage).remove(key); + keyToSequences.remove(previousStage, group); + stageToKeys.remove(previousStage, key); } - // check maxspan before continuing the sequence - if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startTimestamp() > maxSpanInMillis)) { + // + // Conditional checks + // + + // maxspan + if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startOrdinal().timestamp() > maxSpanInMillis)) { + stats.rejectionMaxspan++; return; } + // until + UntilGroup until = keyToSequences.untilIfPresent(key); + if (until != null) { + KeyAndOrdinal nearestUntil = until.before(ordinal); + if (nearestUntil != null) { + // check if until matches + if (nearestUntil.ordinal().between(sequence.ordinal(), ordinal)) { + stats.rejectionUntil++; + return; + } + } + } + sequence.putMatch(stage, hit, ordinal); // bump the stages @@ -109,7 +161,7 @@ public class SequenceStateMachine { } } } else { - stageToKeys.keys(stage).add(key); + stageToKeys.add(stage, key); keyToSequences.add(stage, sequence); } } @@ -127,17 +179,36 @@ public class SequenceStateMachine { */ public boolean hasCandidates(int stage) { for (int i = stage; i < completionStage; i++) { - if (stageToKeys.keys(i).isEmpty() == false) { + if (stageToKeys.isEmpty(i) == false) { return true; } } return false; } + public void dropUntil() { + keyToSequences.dropUntil(); + } + + public void until(Iterable markers) { + keyToSequences.until(markers); + } + + public Stats stats() { + return stats; + } + + public void clear() { + stats.clear(); + keyToSequences.clear(); + stageToKeys.clear(); + completed.clear(); + } + @Override public String toString() { return LoggerMessageFormat.format(null, "Tracking [{}] keys with [{}] completed and in-flight {}", - keyToSequences.numberOfKeys(), + keyToSequences, completed.size(), stageToKeys); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/StageToKeys.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/StageToKeys.java index 96ad5688c64..c14d7e511c7 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/StageToKeys.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/StageToKeys.java @@ -23,14 +23,34 @@ class StageToKeys { this.stageToKey = Arrays.asList(new Set[stages]); } - Set keys(int stage) { + void add(int stage, SequenceKey key) { Set set = stageToKey.get(stage); if (set == null) { // TODO: could we use an allocation strategy? set = new LinkedHashSet<>(); stageToKey.set(stage, set); } - return set; + set.add(key); + } + + void remove(int stage, SequenceKey key) { + Set set = stageToKey.get(stage); + if (set != null) { + set.remove(key); + } + } + + boolean isEmpty(int stage) { + Set set = stageToKey.get(stage); + return set == null || set.isEmpty(); + } + + void clear() { + for (Set set : stageToKey) { + if (set != null) { + set.clear(); + } + } } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/UntilGroup.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/UntilGroup.java new file mode 100644 index 00000000000..aa198ab929f --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/UntilGroup.java @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.execution.sequence; + +import org.elasticsearch.xpack.eql.execution.assembler.KeyAndOrdinal; + +public class UntilGroup extends OrdinalGroup { + + UntilGroup(SequenceKey key) { + super(key, KeyAndOrdinal::ordinal); + } +}