diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java index 21d5a045f5a..7416803b834 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java @@ -191,8 +191,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { return this.fetchSize; } - public EqlSearchRequest fetchSize(int size) { - this.fetchSize = size; + public EqlSearchRequest fetchSize(int fetchSize) { + this.fetchSize = fetchSize; if (fetchSize < 2) { throw new IllegalArgumentException("fetch size must be greater than 1"); } diff --git a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java index 087afe21ef3..6b0c9700043 100644 --- a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java +++ b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java @@ -146,7 +146,7 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase { request.tiebreakerField("event.sequence"); // some queries return more than 10 results request.size(50); - request.fetchSize(2); + request.fetchSize(randomIntBetween(2, 50)); return eqlClient().search(request, RequestOptions.DEFAULT); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java index 3bdb60fbe56..99d9671f16e 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java @@ -63,16 +63,14 @@ public class BoxedQueryRequest implements QueryRequest { } /** - * Sets the lower boundary for the query (non-inclusive). + * Sets the lower boundary for the query (inclusive). * Can be removed (when the query in unbounded) through null. */ public BoxedQueryRequest from(Ordinal begin) { from = begin; + timestampRange.gte(begin != null ? begin.timestamp() : null); if (tiebreakerRange != null) { - timestampRange.gte(begin != null ? begin.timestamp() : null); - tiebreakerRange.gt(begin != null ? begin.tiebreaker() : null); - } else { - timestampRange.gt(begin != null ? begin.timestamp() : null); + tiebreakerRange.gte(begin != null ? begin.tiebreaker() : null); } return this; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java index 95e6840bca8..84c929a9ec2 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java @@ -61,7 +61,6 @@ class Matcher { return false; } - public boolean hasCandidates(int stage) { return stateMachine.hasCandidates(stage); } @@ -71,4 +70,9 @@ class Matcher { TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime); return new SequencePayload(completed, false, tookTime); } + + @Override + public String toString() { + return stateMachine.toString(); + } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java index 5afaa345d55..4e16bfa8286 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java @@ -33,7 +33,7 @@ import static org.elasticsearch.action.ActionListener.wrap; */ public class TumblingWindow implements Executable { - private final Logger log = LogManager.getLogger(Matcher.class); + private final Logger log = LogManager.getLogger(TumblingWindow.class); private final QueryClient client; private final List> criteria; @@ -72,7 +72,7 @@ public class TumblingWindow implements Executable { @Override public void execute(ActionListener listener) { - log.info("Starting sequence window..."); + log.trace("Starting sequence window w/ fetch size [{}]", windowSize); startTime = System.currentTimeMillis(); advance(0, listener); } @@ -83,7 +83,8 @@ public class TumblingWindow implements Executable { // remove any potential upper limit (if a criteria has been promoted) base.queryRequest().to(null); - log.info("Querying base stage [{}] {}", base.stage(), base.queryRequest()); + log.trace("{}", matcher); + log.trace("Querying base stage [{}] {}", base.stage(), base.queryRequest()); client.query(base.queryRequest(), wrap(p -> baseCriterion(baseStage, p, listener), listener::onFailure)); } @@ -119,7 +120,7 @@ public class TumblingWindow implements Executable { // update current query for the next request base.queryRequest().nextAfter(end); - log.info("Found base [{}] window {} {}", base.stage(), begin, end); + log.trace("Found base [{}] window {}->{}", base.stage(), begin, end); // find until ordinals //NB: not currently implemented @@ -153,11 +154,12 @@ public class TumblingWindow implements Executable { request.to(window.end); } - log.info("Querying (secondary) stage [{}] {}", criterion.stage(), request); + log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request); client.query(request, wrap(p -> { List hits = p.values(); + log.trace("Found [{}] hits", hits.size()); // no more results for this query if (hits.isEmpty()) { // put the markers in place before the next call @@ -169,7 +171,7 @@ public class TumblingWindow implements Executable { // if there are no candidates, advance the window if (matcher.hasCandidates(criterion.stage()) == false) { - log.info("Advancing window..."); + log.trace("Advancing window..."); advance(window.baseStage, listener); return; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java index 8173bbbaf58..fd0cd7fb2ec 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java @@ -46,4 +46,8 @@ class KeyToSequences { } groups[stage].add(sequence); } + + int numberOfKeys() { + return keyToSequences.size(); + } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java index 08a8e7badb8..1d48e8f6eb6 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java @@ -37,6 +37,7 @@ public class SequenceGroup { Ordinal ordinal = sequence.ordinal(); if (start == null) { start = ordinal; + } else if (stop == null) { stop = ordinal; } else { if (start.compareTo(ordinal) > 0) { @@ -91,6 +92,7 @@ public class SequenceGroup { if (sequences.isEmpty() == false) { start = sequences.get(0).ordinal(); } else { + start = null; stop = null; } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java index 597b2e2fee1..85a6b9119b6 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.execution.sequence; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.execution.search.Limit; @@ -82,18 +83,18 @@ public class SequenceStateMachine { Sequence sequence = before.v1(); // eliminate the match and all previous values from the frame group.trim(before.v2() + 1); - - // check maxspan before continuing the sequence - if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startTimestamp() >= maxSpanInMillis)) { - return; - } - - sequence.putMatch(stage, hit, ordinal); // remove the frame and keys early (as the key space is large) if (group.isEmpty()) { stageToKeys.keys(previousStage).remove(key); } + + // check maxspan before continuing the sequence + if (maxSpanInMillis > 0 && (ordinal.timestamp() - sequence.startTimestamp() > maxSpanInMillis)) { + return; + } + + sequence.putMatch(stage, hit, ordinal); // bump the stages if (stage == completionStage) { @@ -117,7 +118,27 @@ public class SequenceStateMachine { return limitReached; } + /** + * Checks whether the rest of the stages have in-flight data. + * This method is called when a query returns no data meaning + * sequences on previous stages cannot match this window (since there's no new data). + * However sequences on higher stages can, hence this check to know whether + * it's possible to advance the window early. + */ public boolean hasCandidates(int stage) { - return stage < completionStage && stageToKeys.keys(stage).isEmpty() == false; + for (int i = stage; i < completionStage; i++) { + if (stageToKeys.keys(i).isEmpty() == false) { + return true; + } + } + return false; + } + + @Override + public String toString() { + return LoggerMessageFormat.format(null, "Tracking [{}] keys with [{}] completed and in-flight {}", + keyToSequences.numberOfKeys(), + completed.size(), + stageToKeys); } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/StageToKeys.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/StageToKeys.java index 600f6febbb6..96ad5688c64 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/StageToKeys.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/StageToKeys.java @@ -10,6 +10,7 @@ import java.util.Arrays; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.StringJoiner; /** Dedicated collection for mapping a stage (represented by the index collection) to a set of keys */ class StageToKeys { @@ -32,7 +33,10 @@ class StageToKeys { return set; } - Set completedKeys() { - return keys(stageToKey.size() - 1); + @Override + public String toString() { + StringJoiner sj = new StringJoiner(",", "[", "]"); + stageToKey.forEach(s -> sj.add(s != null ? "" + s.size() : "0")); + return sj.toString(); } }