EQL: Fix early trimming of in-flight data (#66493)

Rework trimToLast to take into account an ordinal for last trimming so
instead of keeping the last entry in a stage, it keeps the last entry
before the given ordinal.
This takes care of the case where a dense stage that requires several
passes does not discard valid data from a previous sparse stage that go
beyond the current stage point.

(cherry picked from commit 4f55749072b39f89822bdd52c67998f7bed890a9)
(cherry picked from commit 6b61dfead88a144c6e85e384d47a24f0c1480c6b)
(cherry picked from commit cece81b5dee88b18e3e7ea189fc342ef53ea19f2)
This commit is contained in:
Costin Leau 2020-12-17 17:19:30 +02:00 committed by Costin Leau
parent 499f4a09e7
commit 4cb3ee5b4e
4 changed files with 41 additions and 48 deletions

View File

@ -95,27 +95,30 @@ class KeyToSequences {
} }
/** /**
* Remove all matches expect the latest. * Remove all matches except the latest occurring _before_ the given ordinal.
*/ */
void trimToTail() { void trimToTail(Ordinal ordinal) {
for (Iterator<SequenceEntry> it = keyToSequences.values().iterator(); it.hasNext(); ) { for (Iterator<SequenceEntry> it = keyToSequences.values().iterator(); it.hasNext(); ) {
SequenceEntry seqs = it.next(); SequenceEntry seqs = it.next();
// first remove the sequences // remember the last item found (will be ascending)
// and remember the last item from the first // to trim unneeded until that occur before it
// initialized stage to be used with until
Sequence firstTail = null; Sequence firstTail = null;
// remove any empty keys
boolean keyIsEmpty = true;
for (SequenceGroup group : seqs.groups) { for (SequenceGroup group : seqs.groups) {
if (group != null) { if (group != null) {
Sequence sequence = group.trimToLast(); Sequence sequence = group.trimBeforeLast(ordinal);
if (firstTail == null) { if (firstTail == null) {
firstTail = sequence; firstTail = sequence;
} }
keyIsEmpty &= group.isEmpty();
} }
} }
// there are no sequences on any stage for this key, drop it // there are no sequences on any stage for this key, drop it
if (firstTail == null) { if (keyIsEmpty) {
it.remove(); it.remove();
} else { }
if (firstTail != null) {
// drop any possible UNTIL that occurs before the last tail // drop any possible UNTIL that occurs before the last tail
UntilGroup until = seqs.until; UntilGroup until = seqs.until;
if (until != null) { if (until != null) {

View File

@ -52,11 +52,27 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
* The element and everything before it is removed. * The element and everything before it is removed.
*/ */
E trimBefore(Ordinal ordinal) { E trimBefore(Ordinal ordinal) {
return trimBefore(ordinal, true);
}
/**
* Returns the latest element from the group that has its timestamp
* less than the given argument alongside its position in the list.
* Everything before the element it is removed. The element is kept.
*/
E trimBeforeLast(Ordinal ordinal) {
return trimBefore(ordinal, false);
}
private E trimBefore(Ordinal ordinal, boolean removeMatch) {
Tuple<E, Integer> match = findBefore(ordinal); Tuple<E, Integer> match = findBefore(ordinal);
// trim // trim
if (match != null) { if (match != null) {
int pos = match.v2() + 1; int pos = match.v2();
if (removeMatch) {
pos = pos + 1;
}
elements.subList(0, pos).clear(); elements.subList(0, pos).clear();
// update min time // update min time
@ -76,17 +92,6 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
return match != null ? match.v1() : null; return match != null ? match.v1() : null;
} }
E trimToLast() {
E last = elements.peekLast();
if (last != null) {
elements.clear();
start = null;
stop = null;
add(last);
}
return last;
}
private Tuple<E, Integer> findBefore(Ordinal ordinal) { private Tuple<E, Integer> findBefore(Ordinal ordinal) {
E match = null; E match = null;
int matchPos = -1; int matchPos = -1;

View File

@ -253,19 +253,15 @@ public class SequenceMatcher {
* This allows the matcher to keep only the last match per stage * This allows the matcher to keep only the last match per stage
* and adjust insertion positions. * and adjust insertion positions.
*/ */
void trim(boolean everything) { void trim(Ordinal ordinal) {
// for descending sequences, remove all in-flight sequences // for descending sequences, remove all in-flight sequences
// since the windows moves head and thus there is no chance // since the windows moves head and thus there is no chance
// of new results coming in // of new results coming in
if (ordinal == null) {
// however this needs to be indicated from outside since
// the same window can be only ASC trimmed during a loop
// and fully once the DESC query moves
if (everything) {
keyToSequences.clear(); keyToSequences.clear();
} else { } else {
// keep only the tail // keep only the tail
keyToSequences.trimToTail(); keyToSequences.trimToTail(ordinal);
} }
} }

View File

@ -60,7 +60,6 @@ public class TumblingWindow implements Executable {
// flag used for DESC sequences to indicate whether // flag used for DESC sequences to indicate whether
// the window needs to restart (since the DESC query still has results) // the window needs to restart (since the DESC query still has results)
private boolean restartWindowFromTailQuery; private boolean restartWindowFromTailQuery;
private final boolean earlyUntil;
private long startTime; private long startTime;
@ -90,7 +89,6 @@ public class TumblingWindow implements Executable {
Criterion<BoxedQueryRequest> baseRequest = criteria.get(0); Criterion<BoxedQueryRequest> baseRequest = criteria.get(0);
this.windowSize = baseRequest.queryRequest().searchSource().size(); this.windowSize = baseRequest.queryRequest().searchSource().size();
this.restartWindowFromTailQuery = baseRequest.descending(); this.restartWindowFromTailQuery = baseRequest.descending();
this.earlyUntil = baseRequest.descending();
} }
@Override @Override
@ -120,27 +118,19 @@ public class TumblingWindow implements Executable {
// for descending queries clean everything // for descending queries clean everything
if (restartWindowFromTailQuery) { if (restartWindowFromTailQuery) {
if (currentStage == 0) { if (currentStage == 0) {
matcher.trim(true); matcher.trim(null);
} }
} }
// trim to last
else { else {
// check case when a rebase occurred and the current query // trim to last until the current window
// has a lot more results than the first once and hasn't // that's because some stages can be sparse, other dense
// covered the whole window. Running a trim early data before // and results from the sparse stage can be after those in the dense one
// the whole window is matched // trimming to last removes these results
boolean trimToLast = false; // same applies for rebase
if (currentStage == 0) { Ordinal marker = criteria.get(currentStage).queryRequest().after();
trimToLast = true; if (marker != null) {
} matcher.trim(marker);
else { }
Ordinal current = criteria.get(currentStage).queryRequest().after();
Ordinal previous = criteria.get(currentStage - 1).queryRequest().after();
trimToLast = current.after(previous);
}
if (trimToLast) {
matcher.trim(false);
}
} }
advance(currentStage, listener); advance(currentStage, listener);
@ -183,7 +173,6 @@ public class TumblingWindow implements Executable {
// get borders for the rest of the queries - but only when at least one result is found // get borders for the rest of the queries - but only when at least one result is found
begin = headOrdinal(hits, base); begin = headOrdinal(hits, base);
end = tailOrdinal(hits, base); end = tailOrdinal(hits, base);
boolean desc = base.descending();
// always create an ASC window // always create an ASC window
info = new WindowInfo(baseStage, begin, end); info = new WindowInfo(baseStage, begin, end);