diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java index c0d5afa9c6b..8ac77d2da10 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java @@ -95,27 +95,30 @@ class KeyToSequences { } /** - * Remove all matches expect the latest. + * Remove all matches except the latest occurring _before_ the given ordinal. */ - void trimToTail() { + void trimToTail(Ordinal ordinal) { for (Iterator it = keyToSequences.values().iterator(); it.hasNext(); ) { SequenceEntry seqs = it.next(); - // first remove the sequences - // and remember the last item from the first - // initialized stage to be used with until + // remember the last item found (will be ascending) + // to trim unneeded until that occur before it Sequence firstTail = null; + // remove any empty keys + boolean keyIsEmpty = true; for (SequenceGroup group : seqs.groups) { if (group != null) { - Sequence sequence = group.trimToLast(); + Sequence sequence = group.trimBeforeLast(ordinal); if (firstTail == null) { firstTail = sequence; } + keyIsEmpty &= group.isEmpty(); } } // there are no sequences on any stage for this key, drop it - if (firstTail == null) { + if (keyIsEmpty) { it.remove(); - } else { + } + if (firstTail != null) { // drop any possible UNTIL that occurs before the last tail UntilGroup until = seqs.until; if (until != null) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/OrdinalGroup.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/OrdinalGroup.java index 19211b4e0c2..496d83443f4 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/OrdinalGroup.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/OrdinalGroup.java @@ -52,11 +52,27 @@ abstract class OrdinalGroup implements Iterable { * The element and everything before it is removed. */ E trimBefore(Ordinal ordinal) { + return trimBefore(ordinal, true); + } + + /** + * Returns the latest element from the group that has its timestamp + * less than the given argument alongside its position in the list. + * Everything before the element it is removed. The element is kept. + */ + E trimBeforeLast(Ordinal ordinal) { + return trimBefore(ordinal, false); + } + + private E trimBefore(Ordinal ordinal, boolean removeMatch) { Tuple match = findBefore(ordinal); // trim if (match != null) { - int pos = match.v2() + 1; + int pos = match.v2(); + if (removeMatch) { + pos = pos + 1; + } elements.subList(0, pos).clear(); // update min time @@ -76,17 +92,6 @@ abstract class OrdinalGroup implements Iterable { return match != null ? match.v1() : null; } - E trimToLast() { - E last = elements.peekLast(); - if (last != null) { - elements.clear(); - start = null; - stop = null; - add(last); - } - return last; - } - private Tuple findBefore(Ordinal ordinal) { E match = null; int matchPos = -1; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java index 26083902272..f1962fe20ab 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java @@ -253,19 +253,15 @@ public class SequenceMatcher { * This allows the matcher to keep only the last match per stage * and adjust insertion positions. */ - void trim(boolean everything) { + void trim(Ordinal ordinal) { // for descending sequences, remove all in-flight sequences // since the windows moves head and thus there is no chance // of new results coming in - - // however this needs to be indicated from outside since - // the same window can be only ASC trimmed during a loop - // and fully once the DESC query moves - if (everything) { + if (ordinal == null) { keyToSequences.clear(); } else { // keep only the tail - keyToSequences.trimToTail(); + keyToSequences.trimToTail(ordinal); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java index 0d6ac5e7bcd..9a35130a65a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java @@ -60,7 +60,6 @@ public class TumblingWindow implements Executable { // flag used for DESC sequences to indicate whether // the window needs to restart (since the DESC query still has results) private boolean restartWindowFromTailQuery; - private final boolean earlyUntil; private long startTime; @@ -90,7 +89,6 @@ public class TumblingWindow implements Executable { Criterion baseRequest = criteria.get(0); this.windowSize = baseRequest.queryRequest().searchSource().size(); this.restartWindowFromTailQuery = baseRequest.descending(); - this.earlyUntil = baseRequest.descending(); } @Override @@ -120,27 +118,19 @@ public class TumblingWindow implements Executable { // for descending queries clean everything if (restartWindowFromTailQuery) { if (currentStage == 0) { - matcher.trim(true); + matcher.trim(null); } } - // trim to last else { - // check case when a rebase occurred and the current query - // has a lot more results than the first once and hasn't - // covered the whole window. Running a trim early data before - // the whole window is matched - boolean trimToLast = false; - if (currentStage == 0) { - trimToLast = true; - } - else { - Ordinal current = criteria.get(currentStage).queryRequest().after(); - Ordinal previous = criteria.get(currentStage - 1).queryRequest().after(); - trimToLast = current.after(previous); - } - if (trimToLast) { - matcher.trim(false); - } + // trim to last until the current window + // that's because some stages can be sparse, other dense + // and results from the sparse stage can be after those in the dense one + // trimming to last removes these results + // same applies for rebase + Ordinal marker = criteria.get(currentStage).queryRequest().after(); + if (marker != null) { + matcher.trim(marker); + } } advance(currentStage, listener); @@ -183,7 +173,6 @@ public class TumblingWindow implements Executable { // get borders for the rest of the queries - but only when at least one result is found begin = headOrdinal(hits, base); end = tailOrdinal(hits, base); - boolean desc = base.descending(); // always create an ASC window info = new WindowInfo(baseStage, begin, end);