From 2ab5f226c498aad8307b8e9a096ae42bb8e4b900 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 8 Oct 2020 12:17:37 +0300 Subject: [PATCH] EQL: Avoid filtering on tiebreakers (#63415) Do not filter by tiebreaker while searching sequence matches as it's not monotonic and thus can filter out valid data. Add handling for data 'near' the boundary that has the same timestamp but different tie-breaker and thus can be just outside the window. Fix #62781 Relates #63215 (cherry picked from commit 36f834600d4d9ded0fb7b1440274b2e597733770) (cherry picked from commit 72a2ce825f3bfd13f87423ba7f3c739ea64c57f6) --- .../common/src/main/resources/test_extra.toml | 2 +- .../elasticsearch/xpack/eql/EqlExtraIT.java | 2 - .../assembler/BoxedQueryRequest.java | 31 ++------ .../execution/assembler/ExecutionManager.java | 2 +- .../execution/search/BasicQueryClient.java | 32 ++++++-- .../xpack/eql/execution/search/Ordinal.java | 24 +++++- .../execution/search/PITAwareQueryClient.java | 18 ++--- .../eql/execution/search/RuntimeUtils.java | 38 +++++++++- .../eql/execution/search/SourceGenerator.java | 10 +-- .../execution/sequence/TumblingWindow.java | 69 ++++++++++++++---- .../xpack/eql/session/EqlSession.java | 18 ++--- .../assembler/SequenceSpecTests.java | 3 +- .../src/test/resources/sequences.series-spec | 73 ++++++++++++------- 13 files changed, 205 insertions(+), 117 deletions(-) diff --git a/x-pack/plugin/eql/qa/common/src/main/resources/test_extra.toml b/x-pack/plugin/eql/qa/common/src/main/resources/test_extra.toml index 21f2f26604c..66de63b4a2b 100644 --- a/x-pack/plugin/eql/qa/common/src/main/resources/test_extra.toml +++ b/x-pack/plugin/eql/qa/common/src/main/resources/test_extra.toml @@ -6,7 +6,7 @@ query = ''' [ ERROR where true ] [ STAT where true ] ''' -expected_event_ids = [1,2,3] +expected_event_ids = [1,2,3,1,2,3] [[queries]] name = "basicWithFilter" diff --git a/x-pack/plugin/eql/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/eql/EqlExtraIT.java b/x-pack/plugin/eql/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/eql/EqlExtraIT.java index 313571b415d..517bf981421 100644 --- a/x-pack/plugin/eql/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/eql/EqlExtraIT.java +++ b/x-pack/plugin/eql/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/eql/EqlExtraIT.java @@ -7,9 +7,7 @@ package org.elasticsearch.xpack.eql; import org.elasticsearch.test.eql.EqlExtraSpecTestCase; -import org.elasticsearch.test.junit.annotations.TestLogging; -@TestLogging(value = "org.elasticsearch.xpack.eql:TRACE", reason = "results logging") public class EqlExtraIT extends EqlExtraSpecTestCase { public EqlExtraIT(String query, String name, long[] eventIds) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java index 62b324636a9..8ed4bd4972b 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java @@ -6,13 +6,12 @@ package org.elasticsearch.xpack.eql.execution.assembler; -import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.eql.execution.search.Ordinal; import org.elasticsearch.xpack.eql.execution.search.QueryRequest; +import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils; -import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; /** @@ -21,36 +20,23 @@ import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; * * Note that the range is not set at once on purpose since each query tends to have * its own number of results separate from the others. - * As such, each query starts where it lefts to reach the current in-progress window + * As such, each query starts from where it left off to reach the current in-progress window * as oppose to always operating with the exact same window. */ public class BoxedQueryRequest implements QueryRequest { private final RangeQueryBuilder timestampRange; - private final RangeQueryBuilder tiebreakerRange; private final SearchSourceBuilder searchSource; private Ordinal from, to; private Ordinal after; - public BoxedQueryRequest(QueryRequest original, String timestamp, String tiebreaker) { + public BoxedQueryRequest(QueryRequest original, String timestamp) { + searchSource = original.searchSource(); // setup range queries and preserve their reference to simplify the update timestampRange = rangeQuery(timestamp).timeZone("UTC").format("epoch_millis"); - BoolQueryBuilder filter = boolQuery().filter(timestampRange); - if (tiebreaker != null) { - tiebreakerRange = rangeQuery(tiebreaker); - filter.filter(tiebreakerRange); - } else { - tiebreakerRange = null; - } - - searchSource = original.searchSource(); - // combine with existing query (if it exists) - if (searchSource.query() != null) { - filter = filter.must(searchSource.query()); - } - searchSource.query(filter); + RuntimeUtils.addFilter(timestampRange, searchSource); } @Override @@ -72,9 +58,6 @@ public class BoxedQueryRequest implements QueryRequest { public BoxedQueryRequest from(Ordinal begin) { from = begin; timestampRange.gte(begin != null ? begin.timestamp() : null); - if (tiebreakerRange != null) { - tiebreakerRange.gte(begin != null ? begin.tiebreaker() : null); - } return this; } @@ -88,14 +71,10 @@ public class BoxedQueryRequest implements QueryRequest { /** * Sets the upper boundary for the query (inclusive). - * Can be removed (when the query in unbounded) through null. */ public BoxedQueryRequest to(Ordinal end) { to = end; timestampRange.lte(end != null ? end.timestamp() : null); - if (tiebreakerRange != null) { - tiebreakerRange.lte(end != null ? end.tiebreaker() : null); - } return this; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java index 55cc9616ef4..0831fb9048b 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java @@ -72,7 +72,7 @@ public class ExecutionManager { if (query instanceof EsQueryExec) { SearchSourceBuilder source = ((EsQueryExec) query).source(session); QueryRequest original = () -> source; - BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName); + BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName); Criterion criterion = new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i == 0 && descending); criteria.add(criterion); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java index f715e02c2c1..9cc62e83ed3 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicQueryClient.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.StringJoiner; import static org.elasticsearch.index.query.QueryBuilders.idsQuery; import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest; @@ -37,7 +38,7 @@ public class BasicQueryClient implements QueryClient { private static final Logger log = RuntimeUtils.QUERY_LOG; - private final EqlConfiguration cfg; + final EqlConfiguration cfg; final Client client; final String[] indices; @@ -53,22 +54,37 @@ public class BasicQueryClient implements QueryClient { // set query timeout searchSource.timeout(cfg.requestTimeout()); - if (log.isTraceEnabled()) { - log.trace("About to execute query {} on {}", StringUtils.toString(searchSource), indices); - } - if (cfg.isCancelled()) { - throw new TaskCancelledException("cancelled"); - } - SearchRequest search = prepareRequest(client, searchSource, false, indices); search(search, new BasicListener(listener)); } protected void search(SearchRequest search, ActionListener listener) { + if (cfg.isCancelled()) { + listener.onFailure(new TaskCancelledException("cancelled")); + return; + } + + if (log.isTraceEnabled()) { + log.trace("About to execute query {} on {}", StringUtils.toString(search.source()), indices); + } + client.search(search, listener); } protected void search(MultiSearchRequest search, ActionListener listener) { + if (cfg.isCancelled()) { + listener.onFailure(new TaskCancelledException("cancelled")); + return; + } + + if (log.isTraceEnabled()) { + StringJoiner sj = new StringJoiner("\n"); + for (SearchRequest request : search.requests()) { + sj.add(StringUtils.toString(request.source())); + } + log.trace("About to execute multi-queries {} on {}", sj, indices); + } + client.multiSearch(search, listener); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java index dc0fc5827b1..ff72d89580d 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/Ordinal.java @@ -17,7 +17,7 @@ public class Ordinal implements Comparable { this.timestamp = timestamp; this.tiebreaker = tiebreaker; } - + public long timestamp() { return timestamp; } @@ -36,11 +36,11 @@ public class Ordinal implements Comparable { if (this == obj) { return true; } - + if (obj == null || getClass() != obj.getClass()) { return false; } - + Ordinal other = (Ordinal) obj; return Objects.equals(timestamp, other.timestamp) && Objects.equals(tiebreaker, other.tiebreaker); @@ -81,7 +81,23 @@ public class Ordinal implements Comparable { return (compareTo(left) <= 0 && compareTo(right) >= 0) || (compareTo(right) <= 0 && compareTo(left) >= 0); } + public boolean before(Ordinal other) { + return compareTo(other) < 0; + } + + public boolean beforeOrAt(Ordinal other) { + return compareTo(other) <= 0; + } + + public boolean after(Ordinal other) { + return compareTo(other) > 0; + } + + public boolean afterOrAt(Ordinal other) { + return compareTo(other) >= 0; + } + public Object[] toArray() { return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp }; } -} \ No newline at end of file +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClient.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClient.java index 502efc047e9..bb420208b68 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClient.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClient.java @@ -15,8 +15,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.index.get.GetResult; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; @@ -30,7 +28,6 @@ import org.elasticsearch.xpack.ql.index.IndexResolver; import java.util.function.Function; import static org.elasticsearch.action.ActionListener.wrap; -import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.termsQuery; import static org.elasticsearch.xpack.ql.util.ActionListeners.map; @@ -105,12 +102,7 @@ public class PITAwareQueryClient extends BasicQueryClient { String[] indices = request.indices(); if (CollectionUtils.isEmpty(indices) == false) { request.indices(Strings.EMPTY_ARRAY); - BoolQueryBuilder indexFilter = boolQuery().filter(termsQuery(GetResult._INDEX, indices)); - QueryBuilder query = source.query(); - if (query != null) { - indexFilter.must(query); - } - source.query(indexFilter); + RuntimeUtils.addFilter(termsQuery(GetResult._INDEX, indices), source); } } @@ -123,11 +115,11 @@ public class PITAwareQueryClient extends BasicQueryClient { }, // always close PIT in case of exceptions e -> { - if (pitId != null) { - close(wrap(b -> { - }, listener::onFailure)); - } listener.onFailure(e); + if (pitId != null && cfg.isCancelled() == false) { + // ignore any success/failure to avoid obfuscating the response + close(wrap(b -> {}, ex -> {})); + } }); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java index 4f33d5465ac..bbdd4279ba9 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/RuntimeUtils.java @@ -11,6 +11,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -34,6 +36,8 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; + public final class RuntimeUtils { static final Logger QUERY_LOG = LogManager.getLogger(QueryClient.class); @@ -50,10 +54,10 @@ public final class RuntimeUtils { aggsNames.append(aggs.get(i).getName() + (i + 1 == aggs.size() ? "" : ", ")); } - logger.trace("Got search response [hits {} {}, {} aggregations: [{}], {} failed shards, {} skipped shards, " - + "{} successful shards, {} total shards, took {}, timed out [{}]]", response.getHits().getTotalHits().relation.toString(), - response.getHits().getTotalHits().value, aggs.size(), aggsNames, response.getFailedShards(), response.getSkippedShards(), - response.getSuccessfulShards(), response.getTotalShards(), response.getTook(), response.isTimedOut()); + logger.trace("Got search response [hits {}, {} aggregations: [{}], {} failed shards, {} skipped shards, " + + "{} successful shards, {} total shards, took {}, timed out [{}]]", response.getHits().getTotalHits(), aggs.size(), + aggsNames, response.getFailedShards(), response.getSkippedShards(), response.getSuccessfulShards(), + response.getTotalShards(), response.getTook(), response.isTimedOut()); } public static List createExtractor(List fields, EqlConfiguration cfg) { @@ -111,4 +115,30 @@ public final class RuntimeUtils { public static List searchHits(SearchResponse response) { return Arrays.asList(response.getHits().getHits()); } + + // optimized method that adds filter to existing bool queries without additional wrapping + // additionally checks whether the given query exists for safe decoration + public static SearchSourceBuilder addFilter(QueryBuilder filter, SearchSourceBuilder source) { + BoolQueryBuilder bool = null; + QueryBuilder query = source.query(); + + if (query instanceof BoolQueryBuilder) { + bool = (BoolQueryBuilder) query; + if (filter != null && bool.filter().contains(filter) == false) { + bool.filter(filter); + } + } + else { + bool = boolQuery(); + if (query != null) { + bool.filter(query); + } + if (filter != null) { + bool.filter(filter); + } + + source.query(bool); + } + return source; + } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java index 0a9ab105c2b..b783605216c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java @@ -76,6 +76,8 @@ public abstract class SourceGenerator { } } + optimize(container, source); + return source; } @@ -94,7 +96,7 @@ public abstract class SourceGenerator { sortBuilder = fieldSort(fa.name()) .missing(as.missing().position()) .unmappedType(fa.dataType().esType()); - + if (fa.isNested()) { FieldSortBuilder fieldSort = fieldSort(fa.name()) .missing(as.missing().position()) @@ -134,8 +136,6 @@ public abstract class SourceGenerator { } private static void optimize(QueryContainer query, SearchSourceBuilder builder) { - if (query.shouldTrackHits()) { - builder.trackTotalHits(true); - } + builder.trackTotalHits(query.shouldTrackHits()); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java index 61cd9404d66..a2fdf484a89 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java @@ -56,10 +56,12 @@ public class TumblingWindow implements Executable { private static class WindowInfo { private final int baseStage; + private final Ordinal begin; private final Ordinal end; - WindowInfo(int baseStage, Ordinal end) { + WindowInfo(int baseStage, Ordinal begin, Ordinal end) { this.baseStage = baseStage; + this.begin = begin; this.end = end; } } @@ -104,13 +106,21 @@ public class TumblingWindow implements Executable { log.trace("Found [{}] hits", hits.size()); + Ordinal begin = null, end = null; if (hits.isEmpty() == false) { if (matcher.match(baseStage, wrapValues(base, hits)) == false) { payload(listener); return; } + + // get borders for the rest of the queries - but only when at least one result is found + begin = base.ordinal(hits.get(0)); + end = base.ordinal(hits.get(hits.size() - 1)); + + log.trace("Found base [{}] window {}->{}", base.stage(), begin, end); } + // only one result means there aren't going to be any matches // so move the window boxing to the next stage if (hits.size() < 2) { @@ -121,7 +131,7 @@ public class TumblingWindow implements Executable { if (until != null && hits.size() == 1) { // find "until" ordinals - early on to discard data in-flight to avoid matching // hits that can occur in other documents - untilCriterion(new WindowInfo(baseStage, base.ordinal(hits.get(0))), listener, next); + untilCriterion(new WindowInfo(baseStage, begin, end), listener, next); } else { next.run(); } @@ -133,16 +143,10 @@ public class TumblingWindow implements Executable { return; } - // get borders for the rest of the queries - Ordinal begin = base.ordinal(hits.get(0)); - Ordinal end = base.ordinal(hits.get(hits.size() - 1)); - // update current query for the next request base.queryRequest().nextAfter(end); - log.trace("Found base [{}] window {}->{}", base.stage(), begin, end); - - WindowInfo info = new WindowInfo(baseStage, end); + WindowInfo info = new WindowInfo(baseStage, begin, end); // no more queries to run if (baseStage + 1 < maxStages) { @@ -212,7 +216,10 @@ public class TumblingWindow implements Executable { log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request); client.query(request, wrap(r -> { + Ordinal boundary = reversed ? window.begin : window.end; List hits = searchHits(r); + // filter hits that are escaping the window (same timestamp but different tiebreaker) + hits = trim(hits, criterion, boundary, reversed); log.trace("Found [{}] hits", hits.size()); @@ -220,9 +227,9 @@ public class TumblingWindow implements Executable { if (hits.isEmpty()) { // put the markers in place before the next call if (reversed) { - request.to(window.end); - } else { request.from(window.end); + } else { + request.to(window.end); } // if there are no candidates, advance the window @@ -235,7 +242,19 @@ public class TumblingWindow implements Executable { } else { // prepare the query for the next search - request.nextAfter(criterion.ordinal(hits.get(hits.size() - 1))); + // however when dealing with tiebreakers the same timestamp can contain different values that might + // be within or outside the window + // to make sure one is not lost, check the minimum ordinal between the one found (which might just outside + // the window - same timestamp but a higher tiebreaker) and the actual window end + Ordinal next = criterion.ordinal(hits.get(hits.size() - 1)); + + log.trace("Found range [{}] -> [{}]", criterion.ordinal(hits.get(0)), next); + + // if the searchAfter is outside the window, trim it down + if (next.after(boundary)) { + next = boundary; + } + request.nextAfter(next); // if the limit has been reached, return what's available if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) { @@ -245,7 +264,8 @@ public class TumblingWindow implements Executable { } // keep running the query runs out of the results (essentially returns less than what we want) - if (hits.size() == windowSize) { + // however check if the window has been fully consumed + if (hits.size() == windowSize && request.after().before(boundary)) { secondaryCriterion(window, currentStage, listener); } // looks like this stage is done, move on @@ -259,10 +279,27 @@ public class TumblingWindow implements Executable { advance(window.baseStage, listener); } } - }, listener::onFailure)); } + /** + * Trim hits outside the (upper) limit. + */ + private List trim(List searchHits, Criterion criterion, Ordinal boundary, boolean reversed) { + int offset = 0; + + for (int i = searchHits.size() - 1; i >=0 ; i--) { + Ordinal ordinal = criterion.ordinal(searchHits.get(i)); + boolean withinBoundaries = reversed ? ordinal.afterOrAt(boundary) : ordinal.beforeOrAt(boundary); + if (withinBoundaries == false) { + offset++; + } else { + break; + } + } + return offset == 0 ? searchHits : searchHits.subList(0, searchHits.size() - offset); + } + /** * Box the query for the given criterion based on the window information. * Returns a boolean indicating whether reversal has been applied or not. @@ -285,6 +322,10 @@ public class TumblingWindow implements Executable { } else { // otherwise just the upper limit request.to(window.end); + // and the lower limit if it hasn't been set + if (request.after() == null) { + request.nextAfter(window.begin); + } } return reverse; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java index 38e20d4b8c8..1b5fa574f1d 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.ql.index.IndexResolver; import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; import static org.elasticsearch.action.ActionListener.wrap; +import static org.elasticsearch.xpack.ql.util.ActionListeners.map; public class EqlSession { @@ -65,8 +66,7 @@ public class EqlSession { } public void eql(String eql, ParserParams params, ActionListener listener) { - eqlExecutable(eql, params, wrap(e -> e.execute(this, wrap(p -> listener.onResponse(Results.fromPayload(p)), listener::onFailure)), - listener::onFailure)); + eqlExecutable(eql, params, wrap(e -> e.execute(this, map(listener, Results::fromPayload)), listener::onFailure)); } public void eqlExecutable(String eql, ParserParams params, ActionListener listener) { @@ -78,11 +78,11 @@ public class EqlSession { } public void physicalPlan(LogicalPlan optimized, ActionListener listener) { - optimizedPlan(optimized, wrap(o -> listener.onResponse(planner.plan(o)), listener::onFailure)); + optimizedPlan(optimized, map(listener, planner::plan)); } public void optimizedPlan(LogicalPlan verified, ActionListener listener) { - analyzedPlan(verified, wrap(v -> listener.onResponse(optimizer.optimize(v)), listener::onFailure)); + analyzedPlan(verified, map(listener, optimizer::optimize)); } public void analyzedPlan(LogicalPlan parsed, ActionListener listener) { @@ -91,18 +91,18 @@ public class EqlSession { return; } - preAnalyze(parsed, wrap(p -> listener.onResponse(postAnalyze(analyzer.analyze(p))), listener::onFailure)); + preAnalyze(parsed, map(listener, p -> postAnalyze(analyzer.analyze(p)))); } private void preAnalyze(LogicalPlan parsed, ActionListener listener) { String indexWildcard = configuration.indexAsWildcard(); if(configuration.isCancelled()){ - throw new TaskCancelledException("cancelled"); + listener.onFailure(new TaskCancelledException("cancelled")); + return; } indexResolver.resolveAsMergedMapping(indexWildcard, null, configuration.includeFrozen(), configuration.filter(), - wrap(r -> { - listener.onResponse(preAnalyzer.preAnalyze(parsed, r)); - }, listener::onFailure)); + map(listener, r -> preAnalyzer.preAnalyze(parsed, r)) + ); } private LogicalPlan postAnalyze(LogicalPlan verified) { diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java index 86e222c8873..3f59c1a0cbb 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceSpecTests.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.eql.execution.assembler; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; - import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits.Relation; import org.elasticsearch.ExceptionsHelper; @@ -95,7 +94,7 @@ public class SequenceSpecTests extends ESTestCase { TestCriterion(final int ordinal) { super(ordinal, - new BoxedQueryRequest(() -> SearchSourceBuilder.searchSource().query(matchAllQuery()).size(ordinal), "timestamp", null), + new BoxedQueryRequest(() -> SearchSourceBuilder.searchSource().query(matchAllQuery()).size(ordinal), "timestamp"), keyExtractors, tsExtractor, tbExtractor, false); this.ordinal = ordinal; diff --git a/x-pack/plugin/eql/src/test/resources/sequences.series-spec b/x-pack/plugin/eql/src/test/resources/sequences.series-spec index 156932fd796..a05f2c40cb6 100644 --- a/x-pack/plugin/eql/src/test/resources/sequences.series-spec +++ b/x-pack/plugin/eql/src/test/resources/sequences.series-spec @@ -4,12 +4,29 @@ // // A spec is defined by its name, followed by lines describing the events for each criteria // followed by the expected result. -// +// // events describe in one line per join/seq rule // the number following the string is used as a timestamp/order -// while the optional prefix followed by | indicates the key +// while the optional prefix followed by | indicates the key +// since the spec checks only the way matching is done it returns +// all the results in the first call and then an empty list. +// Since the runtime component can make multiple requests and thus discard +// returned data, use X99 as an token to force results from other lines +// to be read in (as it makes a larger window for the runtime) + +// For example +// A1 A2 +// B3 +// fails because B3 should not be read in its first call as it goes beyond A1/A2 window. +// however the testing infrastructure doesn't know this so it returns the item +// which is then discarded. On the next call, empty results are returned. +// To avoid the runtime behavior leaking in, adding an extra token at the end forces +// the runtime window to be expanded and thus for all results to be properly read: +// +// A1 A2 X99 (event occurring last in the base query) +// B3 // spec name trivialSequence @@ -54,7 +71,7 @@ A3 B4 ; basicSequenceMergingPreviousStage -A1 A2 +A1 A2 X99 B3 ; A2 B3 @@ -68,7 +85,7 @@ A1 B2 ; basicSequenceWithDualMergingPreviousStage -A1 A2 A4 A5 A6 A8 A9 +A1 A2 A4 A5 A6 A8 A9 B3 B7 ; A2 B3 @@ -91,15 +108,15 @@ A3 B4 basicSequenceMergingCurrentStage -A1 A2 - B3 B4 +A1 A2 X99 + B3 B4 ; A2 B3 ; basicSequenceWithMergingCurrentStageAndLeftOverInPreviousStage A1 A2 A5 - B3 B4 + B3 B4 ; A2 B3 ; @@ -120,47 +137,47 @@ A4 B5 threeStageBasicSequence A1 A4 B2 B5 - C3 C6 + C3 C6 ; A1 B2 C3 A4 B5 C6 ; threeStageWithOverridingSequence -A1 A3 +A1 A3 X99 B2 B4 C5 -; -A3 B4 C5 +; +A3 B4 C5 ; threeStageWithOverridingSequenceOnSecondStage -A1 +A1 B2 B4 C3 -; -A1 B2 C3 +; +A1 B2 C3 ; threeStageWithNoise - A4 A6 - B2 B5 + A4 A6 X99 + B2 B5 C1 C3 C7 ; A4 B5 C7 ; -threeStageWithIntermitentUpdates -A1 A4 +threeStageWithIntermittentUpdates +A1 A4 X99 B3 B5 C2 C6 ; A4 B5 C6 ; -threeStageWithMoreIntermitentUpdates -A1 A4 A6 +threeStageWithMoreIntermittentUpdates +A1 A4 A6 X99 B3 B5 B7 B8 C2 C9 ; @@ -168,15 +185,15 @@ A6 B7 C9 ; threeStageWithReverseMatch - A3 A5 A8 - B2 B7 + A3 A5 A8 X99 + B2 B7 C1 C4 C6 C9 ; A5 B7 C9 ; threeStageWithLeftoversOnTheLastStage -A1 A3 A5 +A1 A3 A5 X99 B2 B4 B6 C7 C8 C9 C10 ; @@ -184,7 +201,7 @@ A5 B6 C7 ; threeStageWithOverlap -A1 A3 +A1 A3 X99 B2 B5 C4 B6 ; @@ -207,7 +224,7 @@ A5 B6 C7 D8 ; fourStageIncomplete -A1 A5 +A1 A5 X99 B2 B6 C3 C7 D8 @@ -216,7 +233,7 @@ A5 B6 C7 D8 ; fourStageOverlapping -A1 A4 +A1 A4 X99 B2 B5 C3 C7 D6 D8 @@ -233,10 +250,10 @@ A4 B5 C7 D8 exampleWithTwoStagesAndKeyFromTheDocs -r|W1 r|W2 u|W6 r|W7 +r|W1 r|W2 u|W6 r|W7 x|X99 u|H3 r|H4 r|H5 u|H8 r|I9 u|I10 r|I11 ; r|W2 H4 I9 u|W6 H8 I10 -; +;