EQL: Fix bug in skipping window (#59196)

Corrected condition that caused a sequence window to be skipped when a query
returns no results by checking not just the current stage but also following
ones as they can match with in-flight sequences.
Improve logging
Fix NPE when emptying a SequenceGroup
Increase randomization in testing
Make maxspan inclusive (up to and equal to value vs just up to)

(cherry picked from commit ad32c488688cb350c2934dfca03af86045e997b0)
This commit is contained in:
Costin Leau 2020-07-08 14:30:41 +03:00 committed by Costin Leau
parent 0b9eb210b8
commit 3e32d060bf
9 changed files with 60 additions and 25 deletions

View File

@ -191,8 +191,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
return this.fetchSize;
}
public EqlSearchRequest fetchSize(int size) {
this.fetchSize = size;
public EqlSearchRequest fetchSize(int fetchSize) {
this.fetchSize = fetchSize;
if (fetchSize < 2) {
throw new IllegalArgumentException("fetch size must be greater than 1");
}

View File

@ -146,7 +146,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
request.tiebreakerField("event.sequence");
// some queries return more than 10 results
request.size(50);
request.fetchSize(2);
request.fetchSize(randomIntBetween(2, 50));
return eqlClient().search(request, RequestOptions.DEFAULT);
}

View File

@ -63,16 +63,14 @@ public class BoxedQueryRequest implements QueryRequest {
}
/**
* Sets the lower boundary for the query (non-inclusive).
* Sets the lower boundary for the query (inclusive).
* Can be removed (when the query in unbounded) through null.
*/
public BoxedQueryRequest from(Ordinal begin) {
from = begin;
timestampRange.gte(begin != null ? begin.timestamp() : null);
if (tiebreakerRange != null) {
timestampRange.gte(begin != null ? begin.timestamp() : null);
tiebreakerRange.gt(begin != null ? begin.tiebreaker() : null);
} else {
timestampRange.gt(begin != null ? begin.timestamp() : null);
tiebreakerRange.gte(begin != null ? begin.tiebreaker() : null);
}
return this;
}

View File

@ -61,7 +61,6 @@ class Matcher {
return false;
}
public boolean hasCandidates(int stage) {
return stateMachine.hasCandidates(stage);
}
@ -71,4 +70,9 @@ class Matcher {
TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime);
return new SequencePayload(completed, false, tookTime);
}
@Override
public String toString() {
return stateMachine.toString();
}
}

View File

@ -33,7 +33,7 @@ import static org.elasticsearch.action.ActionListener.wrap;
*/
public class TumblingWindow implements Executable {
private final Logger log = LogManager.getLogger(Matcher.class);
private final Logger log = LogManager.getLogger(TumblingWindow.class);
private final QueryClient client;
private final List<Criterion<BoxedQueryRequest>> criteria;
@ -72,7 +72,7 @@ public class TumblingWindow implements Executable {
@Override
public void execute(ActionListener<Payload> listener) {
log.info("Starting sequence window...");
log.trace("Starting sequence window w/ fetch size [{}]", windowSize);
startTime = System.currentTimeMillis();
advance(0, listener);
}
@ -83,7 +83,8 @@ public class TumblingWindow implements Executable {
// remove any potential upper limit (if a criteria has been promoted)
base.queryRequest().to(null);
log.info("Querying base stage [{}] {}", base.stage(), base.queryRequest());
log.trace("{}", matcher);
log.trace("Querying base stage [{}] {}", base.stage(), base.queryRequest());
client.query(base.queryRequest(), wrap(p -> baseCriterion(baseStage, p, listener), listener::onFailure));
}
@ -119,7 +120,7 @@ public class TumblingWindow implements Executable {
// update current query for the next request
base.queryRequest().nextAfter(end);
log.info("Found base [{}] window {} {}", base.stage(), begin, end);
log.trace("Found base [{}] window {}->{}", base.stage(), begin, end);
// find until ordinals
//NB: not currently implemented
@ -153,11 +154,12 @@ public class TumblingWindow implements Executable {
request.to(window.end);
}
log.info("Querying (secondary) stage [{}] {}", criterion.stage(), request);
log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request);
client.query(request, wrap(p -> {
List<SearchHit> hits = p.values();
log.trace("Found [{}] hits", hits.size());
// no more results for this query
if (hits.isEmpty()) {
// put the markers in place before the next call
@ -169,7 +171,7 @@ public class TumblingWindow implements Executable {
// if there are no candidates, advance the window
if (matcher.hasCandidates(criterion.stage()) == false) {
log.info("Advancing window...");
log.trace("Advancing window...");
advance(window.baseStage, listener);
return;
}

View File

@ -46,4 +46,8 @@ class KeyToSequences {
}
groups[stage].add(sequence);
}
int numberOfKeys() {
return keyToSequences.size();
}
}

View File

@ -37,6 +37,7 @@ public class SequenceGroup {
Ordinal ordinal = sequence.ordinal();
if (start == null) {
start = ordinal;
} else if (stop == null) {
stop = ordinal;
} else {
if (start.compareTo(ordinal) > 0) {
@ -91,6 +92,7 @@ public class SequenceGroup {
if (sequences.isEmpty() == false) {
start = sequences.get(0).ordinal();
} else {
start = null;
stop = null;
}
}

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.search.Limit;
@ -82,18 +83,18 @@ public class SequenceStateMachine {
Sequence sequence = before.v1();
// eliminate the match and all previous values from the frame
group.trim(before.v2() + 1);
// check maxspan before continuing the sequence
if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startTimestamp() >= maxSpanInMillis)) {
return;
}
sequence.putMatch(stage, hit, ordinal);
// remove the frame and keys early (as the key space is large)
if (group.isEmpty()) {
stageToKeys.keys(previousStage).remove(key);
}
// check maxspan before continuing the sequence
if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startTimestamp() > maxSpanInMillis)) {
return;
}
sequence.putMatch(stage, hit, ordinal);
// bump the stages
if (stage == completionStage) {
@ -117,7 +118,27 @@ public class SequenceStateMachine {
return limitReached;
}
/**
* Checks whether the rest of the stages have in-flight data.
* This method is called when a query returns no data meaning
* sequences on previous stages cannot match this window (since there's no new data).
* However sequences on higher stages can, hence this check to know whether
* it's possible to advance the window early.
*/
public boolean hasCandidates(int stage) {
return stage < completionStage && stageToKeys.keys(stage).isEmpty() == false;
for (int i = stage; i < completionStage; i++) {
if (stageToKeys.keys(i).isEmpty() == false) {
return true;
}
}
return false;
}
@Override
public String toString() {
return LoggerMessageFormat.format(null, "Tracking [{}] keys with [{}] completed and in-flight {}",
keyToSequences.numberOfKeys(),
completed.size(),
stageToKeys);
}
}

View File

@ -10,6 +10,7 @@ import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;
/** Dedicated collection for mapping a stage (represented by the index collection) to a set of keys */
class StageToKeys {
@ -32,7 +33,10 @@ class StageToKeys {
return set;
}
Set<SequenceKey> completedKeys() {
return keys(stageToKey.size() - 1);
@Override
public String toString() {
StringJoiner sj = new StringJoiner(",", "[", "]");
stageToKey.forEach(s -> sj.add(s != null ? "" + s.size() : "0"));
return sj.toString();
}
}