EQL: Avoid filtering on tiebreakers (#63415)
Do not filter by tiebreaker while searching sequence matches as it's not monotonic and thus can filter out valid data. Add handling for data 'near' the boundary that has the same timestamp but different tie-breaker and thus can be just outside the window. Fix #62781 Relates #63215 (cherry picked from commit 36f834600d4d9ded0fb7b1440274b2e597733770) (cherry picked from commit 72a2ce825f3bfd13f87423ba7f3c739ea64c57f6)
This commit is contained in:
parent
1f7b107ed1
commit
2ab5f226c4
|
@ -6,7 +6,7 @@ query = '''
|
|||
[ ERROR where true ]
|
||||
[ STAT where true ]
|
||||
'''
|
||||
expected_event_ids = [1,2,3]
|
||||
expected_event_ids = [1,2,3,1,2,3]
|
||||
|
||||
[[queries]]
|
||||
name = "basicWithFilter"
|
||||
|
|
|
@ -7,9 +7,7 @@
|
|||
package org.elasticsearch.xpack.eql;
|
||||
|
||||
import org.elasticsearch.test.eql.EqlExtraSpecTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
||||
@TestLogging(value = "org.elasticsearch.xpack.eql:TRACE", reason = "results logging")
|
||||
public class EqlExtraIT extends EqlExtraSpecTestCase {
|
||||
|
||||
public EqlExtraIT(String query, String name, long[] eventIds) {
|
||||
|
|
|
@ -6,13 +6,12 @@
|
|||
|
||||
package org.elasticsearch.xpack.eql.execution.assembler;
|
||||
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.RangeQueryBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
|
||||
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
|
||||
import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
|
||||
|
||||
/**
|
||||
|
@ -21,36 +20,23 @@ import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
|
|||
*
|
||||
* Note that the range is not set at once on purpose since each query tends to have
|
||||
* its own number of results separate from the others.
|
||||
* As such, each query starts where it lefts to reach the current in-progress window
|
||||
* As such, each query starts from where it left off to reach the current in-progress window
|
||||
* as oppose to always operating with the exact same window.
|
||||
*/
|
||||
public class BoxedQueryRequest implements QueryRequest {
|
||||
|
||||
private final RangeQueryBuilder timestampRange;
|
||||
private final RangeQueryBuilder tiebreakerRange;
|
||||
|
||||
private final SearchSourceBuilder searchSource;
|
||||
|
||||
private Ordinal from, to;
|
||||
private Ordinal after;
|
||||
|
||||
public BoxedQueryRequest(QueryRequest original, String timestamp, String tiebreaker) {
|
||||
public BoxedQueryRequest(QueryRequest original, String timestamp) {
|
||||
searchSource = original.searchSource();
|
||||
// setup range queries and preserve their reference to simplify the update
|
||||
timestampRange = rangeQuery(timestamp).timeZone("UTC").format("epoch_millis");
|
||||
BoolQueryBuilder filter = boolQuery().filter(timestampRange);
|
||||
if (tiebreaker != null) {
|
||||
tiebreakerRange = rangeQuery(tiebreaker);
|
||||
filter.filter(tiebreakerRange);
|
||||
} else {
|
||||
tiebreakerRange = null;
|
||||
}
|
||||
|
||||
searchSource = original.searchSource();
|
||||
// combine with existing query (if it exists)
|
||||
if (searchSource.query() != null) {
|
||||
filter = filter.must(searchSource.query());
|
||||
}
|
||||
searchSource.query(filter);
|
||||
RuntimeUtils.addFilter(timestampRange, searchSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -72,9 +58,6 @@ public class BoxedQueryRequest implements QueryRequest {
|
|||
public BoxedQueryRequest from(Ordinal begin) {
|
||||
from = begin;
|
||||
timestampRange.gte(begin != null ? begin.timestamp() : null);
|
||||
if (tiebreakerRange != null) {
|
||||
tiebreakerRange.gte(begin != null ? begin.tiebreaker() : null);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -88,14 +71,10 @@ public class BoxedQueryRequest implements QueryRequest {
|
|||
|
||||
/**
|
||||
* Sets the upper boundary for the query (inclusive).
|
||||
* Can be removed (when the query in unbounded) through null.
|
||||
*/
|
||||
public BoxedQueryRequest to(Ordinal end) {
|
||||
to = end;
|
||||
timestampRange.lte(end != null ? end.timestamp() : null);
|
||||
if (tiebreakerRange != null) {
|
||||
tiebreakerRange.lte(end != null ? end.tiebreaker() : null);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ public class ExecutionManager {
|
|||
if (query instanceof EsQueryExec) {
|
||||
SearchSourceBuilder source = ((EsQueryExec) query).source(session);
|
||||
QueryRequest original = () -> source;
|
||||
BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName);
|
||||
BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName);
|
||||
Criterion<BoxedQueryRequest> criterion =
|
||||
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i == 0 && descending);
|
||||
criteria.add(criterion);
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.Arrays;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.StringJoiner;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.idsQuery;
|
||||
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest;
|
||||
|
@ -37,7 +38,7 @@ public class BasicQueryClient implements QueryClient {
|
|||
|
||||
private static final Logger log = RuntimeUtils.QUERY_LOG;
|
||||
|
||||
private final EqlConfiguration cfg;
|
||||
final EqlConfiguration cfg;
|
||||
final Client client;
|
||||
final String[] indices;
|
||||
|
||||
|
@ -53,22 +54,37 @@ public class BasicQueryClient implements QueryClient {
|
|||
// set query timeout
|
||||
searchSource.timeout(cfg.requestTimeout());
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("About to execute query {} on {}", StringUtils.toString(searchSource), indices);
|
||||
}
|
||||
if (cfg.isCancelled()) {
|
||||
throw new TaskCancelledException("cancelled");
|
||||
}
|
||||
|
||||
SearchRequest search = prepareRequest(client, searchSource, false, indices);
|
||||
search(search, new BasicListener(listener));
|
||||
}
|
||||
|
||||
protected void search(SearchRequest search, ActionListener<SearchResponse> listener) {
|
||||
if (cfg.isCancelled()) {
|
||||
listener.onFailure(new TaskCancelledException("cancelled"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("About to execute query {} on {}", StringUtils.toString(search.source()), indices);
|
||||
}
|
||||
|
||||
client.search(search, listener);
|
||||
}
|
||||
|
||||
protected void search(MultiSearchRequest search, ActionListener<MultiSearchResponse> listener) {
|
||||
if (cfg.isCancelled()) {
|
||||
listener.onFailure(new TaskCancelledException("cancelled"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
StringJoiner sj = new StringJoiner("\n");
|
||||
for (SearchRequest request : search.requests()) {
|
||||
sj.add(StringUtils.toString(request.source()));
|
||||
}
|
||||
log.trace("About to execute multi-queries {} on {}", sj, indices);
|
||||
}
|
||||
|
||||
client.multiSearch(search, listener);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ public class Ordinal implements Comparable<Ordinal> {
|
|||
this.timestamp = timestamp;
|
||||
this.tiebreaker = tiebreaker;
|
||||
}
|
||||
|
||||
|
||||
public long timestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
@ -36,11 +36,11 @@ public class Ordinal implements Comparable<Ordinal> {
|
|||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
Ordinal other = (Ordinal) obj;
|
||||
return Objects.equals(timestamp, other.timestamp)
|
||||
&& Objects.equals(tiebreaker, other.tiebreaker);
|
||||
|
@ -81,7 +81,23 @@ public class Ordinal implements Comparable<Ordinal> {
|
|||
return (compareTo(left) <= 0 && compareTo(right) >= 0) || (compareTo(right) <= 0 && compareTo(left) >= 0);
|
||||
}
|
||||
|
||||
public boolean before(Ordinal other) {
|
||||
return compareTo(other) < 0;
|
||||
}
|
||||
|
||||
public boolean beforeOrAt(Ordinal other) {
|
||||
return compareTo(other) <= 0;
|
||||
}
|
||||
|
||||
public boolean after(Ordinal other) {
|
||||
return compareTo(other) > 0;
|
||||
}
|
||||
|
||||
public boolean afterOrAt(Ordinal other) {
|
||||
return compareTo(other) >= 0;
|
||||
}
|
||||
|
||||
public Object[] toArray() {
|
||||
return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,8 +15,6 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.CollectionUtils;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.builder.PointInTimeBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
|
||||
|
@ -30,7 +28,6 @@ import org.elasticsearch.xpack.ql.index.IndexResolver;
|
|||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.action.ActionListener.wrap;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
|
||||
import static org.elasticsearch.xpack.ql.util.ActionListeners.map;
|
||||
|
||||
|
@ -105,12 +102,7 @@ public class PITAwareQueryClient extends BasicQueryClient {
|
|||
String[] indices = request.indices();
|
||||
if (CollectionUtils.isEmpty(indices) == false) {
|
||||
request.indices(Strings.EMPTY_ARRAY);
|
||||
BoolQueryBuilder indexFilter = boolQuery().filter(termsQuery(GetResult._INDEX, indices));
|
||||
QueryBuilder query = source.query();
|
||||
if (query != null) {
|
||||
indexFilter.must(query);
|
||||
}
|
||||
source.query(indexFilter);
|
||||
RuntimeUtils.addFilter(termsQuery(GetResult._INDEX, indices), source);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -123,11 +115,11 @@ public class PITAwareQueryClient extends BasicQueryClient {
|
|||
},
|
||||
// always close PIT in case of exceptions
|
||||
e -> {
|
||||
if (pitId != null) {
|
||||
close(wrap(b -> {
|
||||
}, listener::onFailure));
|
||||
}
|
||||
listener.onFailure(e);
|
||||
if (pitId != null && cfg.isCancelled() == false) {
|
||||
// ignore any success/failure to avoid obfuscating the response
|
||||
close(wrap(b -> {}, ex -> {}));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,8 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.aggregations.Aggregation;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
|
@ -34,6 +36,8 @@ import java.util.LinkedHashSet;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
|
||||
|
||||
public final class RuntimeUtils {
|
||||
|
||||
static final Logger QUERY_LOG = LogManager.getLogger(QueryClient.class);
|
||||
|
@ -50,10 +54,10 @@ public final class RuntimeUtils {
|
|||
aggsNames.append(aggs.get(i).getName() + (i + 1 == aggs.size() ? "" : ", "));
|
||||
}
|
||||
|
||||
logger.trace("Got search response [hits {} {}, {} aggregations: [{}], {} failed shards, {} skipped shards, "
|
||||
+ "{} successful shards, {} total shards, took {}, timed out [{}]]", response.getHits().getTotalHits().relation.toString(),
|
||||
response.getHits().getTotalHits().value, aggs.size(), aggsNames, response.getFailedShards(), response.getSkippedShards(),
|
||||
response.getSuccessfulShards(), response.getTotalShards(), response.getTook(), response.isTimedOut());
|
||||
logger.trace("Got search response [hits {}, {} aggregations: [{}], {} failed shards, {} skipped shards, "
|
||||
+ "{} successful shards, {} total shards, took {}, timed out [{}]]", response.getHits().getTotalHits(), aggs.size(),
|
||||
aggsNames, response.getFailedShards(), response.getSkippedShards(), response.getSuccessfulShards(),
|
||||
response.getTotalShards(), response.getTook(), response.isTimedOut());
|
||||
}
|
||||
|
||||
public static List<HitExtractor> createExtractor(List<FieldExtraction> fields, EqlConfiguration cfg) {
|
||||
|
@ -111,4 +115,30 @@ public final class RuntimeUtils {
|
|||
public static List<SearchHit> searchHits(SearchResponse response) {
|
||||
return Arrays.asList(response.getHits().getHits());
|
||||
}
|
||||
|
||||
// optimized method that adds filter to existing bool queries without additional wrapping
|
||||
// additionally checks whether the given query exists for safe decoration
|
||||
public static SearchSourceBuilder addFilter(QueryBuilder filter, SearchSourceBuilder source) {
|
||||
BoolQueryBuilder bool = null;
|
||||
QueryBuilder query = source.query();
|
||||
|
||||
if (query instanceof BoolQueryBuilder) {
|
||||
bool = (BoolQueryBuilder) query;
|
||||
if (filter != null && bool.filter().contains(filter) == false) {
|
||||
bool.filter(filter);
|
||||
}
|
||||
}
|
||||
else {
|
||||
bool = boolQuery();
|
||||
if (query != null) {
|
||||
bool.filter(query);
|
||||
}
|
||||
if (filter != null) {
|
||||
bool.filter(filter);
|
||||
}
|
||||
|
||||
source.query(bool);
|
||||
}
|
||||
return source;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,6 +76,8 @@ public abstract class SourceGenerator {
|
|||
}
|
||||
}
|
||||
|
||||
optimize(container, source);
|
||||
|
||||
return source;
|
||||
}
|
||||
|
||||
|
@ -94,7 +96,7 @@ public abstract class SourceGenerator {
|
|||
sortBuilder = fieldSort(fa.name())
|
||||
.missing(as.missing().position())
|
||||
.unmappedType(fa.dataType().esType());
|
||||
|
||||
|
||||
if (fa.isNested()) {
|
||||
FieldSortBuilder fieldSort = fieldSort(fa.name())
|
||||
.missing(as.missing().position())
|
||||
|
@ -134,8 +136,6 @@ public abstract class SourceGenerator {
|
|||
}
|
||||
|
||||
private static void optimize(QueryContainer query, SearchSourceBuilder builder) {
|
||||
if (query.shouldTrackHits()) {
|
||||
builder.trackTotalHits(true);
|
||||
}
|
||||
builder.trackTotalHits(query.shouldTrackHits());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,10 +56,12 @@ public class TumblingWindow implements Executable {
|
|||
|
||||
private static class WindowInfo {
|
||||
private final int baseStage;
|
||||
private final Ordinal begin;
|
||||
private final Ordinal end;
|
||||
|
||||
WindowInfo(int baseStage, Ordinal end) {
|
||||
WindowInfo(int baseStage, Ordinal begin, Ordinal end) {
|
||||
this.baseStage = baseStage;
|
||||
this.begin = begin;
|
||||
this.end = end;
|
||||
}
|
||||
}
|
||||
|
@ -104,13 +106,21 @@ public class TumblingWindow implements Executable {
|
|||
|
||||
log.trace("Found [{}] hits", hits.size());
|
||||
|
||||
Ordinal begin = null, end = null;
|
||||
if (hits.isEmpty() == false) {
|
||||
if (matcher.match(baseStage, wrapValues(base, hits)) == false) {
|
||||
payload(listener);
|
||||
return;
|
||||
}
|
||||
|
||||
// get borders for the rest of the queries - but only when at least one result is found
|
||||
begin = base.ordinal(hits.get(0));
|
||||
end = base.ordinal(hits.get(hits.size() - 1));
|
||||
|
||||
log.trace("Found base [{}] window {}->{}", base.stage(), begin, end);
|
||||
}
|
||||
|
||||
|
||||
// only one result means there aren't going to be any matches
|
||||
// so move the window boxing to the next stage
|
||||
if (hits.size() < 2) {
|
||||
|
@ -121,7 +131,7 @@ public class TumblingWindow implements Executable {
|
|||
if (until != null && hits.size() == 1) {
|
||||
// find "until" ordinals - early on to discard data in-flight to avoid matching
|
||||
// hits that can occur in other documents
|
||||
untilCriterion(new WindowInfo(baseStage, base.ordinal(hits.get(0))), listener, next);
|
||||
untilCriterion(new WindowInfo(baseStage, begin, end), listener, next);
|
||||
} else {
|
||||
next.run();
|
||||
}
|
||||
|
@ -133,16 +143,10 @@ public class TumblingWindow implements Executable {
|
|||
return;
|
||||
}
|
||||
|
||||
// get borders for the rest of the queries
|
||||
Ordinal begin = base.ordinal(hits.get(0));
|
||||
Ordinal end = base.ordinal(hits.get(hits.size() - 1));
|
||||
|
||||
// update current query for the next request
|
||||
base.queryRequest().nextAfter(end);
|
||||
|
||||
log.trace("Found base [{}] window {}->{}", base.stage(), begin, end);
|
||||
|
||||
WindowInfo info = new WindowInfo(baseStage, end);
|
||||
WindowInfo info = new WindowInfo(baseStage, begin, end);
|
||||
|
||||
// no more queries to run
|
||||
if (baseStage + 1 < maxStages) {
|
||||
|
@ -212,7 +216,10 @@ public class TumblingWindow implements Executable {
|
|||
log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request);
|
||||
|
||||
client.query(request, wrap(r -> {
|
||||
Ordinal boundary = reversed ? window.begin : window.end;
|
||||
List<SearchHit> hits = searchHits(r);
|
||||
// filter hits that are escaping the window (same timestamp but different tiebreaker)
|
||||
hits = trim(hits, criterion, boundary, reversed);
|
||||
|
||||
log.trace("Found [{}] hits", hits.size());
|
||||
|
||||
|
@ -220,9 +227,9 @@ public class TumblingWindow implements Executable {
|
|||
if (hits.isEmpty()) {
|
||||
// put the markers in place before the next call
|
||||
if (reversed) {
|
||||
request.to(window.end);
|
||||
} else {
|
||||
request.from(window.end);
|
||||
} else {
|
||||
request.to(window.end);
|
||||
}
|
||||
|
||||
// if there are no candidates, advance the window
|
||||
|
@ -235,7 +242,19 @@ public class TumblingWindow implements Executable {
|
|||
}
|
||||
else {
|
||||
// prepare the query for the next search
|
||||
request.nextAfter(criterion.ordinal(hits.get(hits.size() - 1)));
|
||||
// however when dealing with tiebreakers the same timestamp can contain different values that might
|
||||
// be within or outside the window
|
||||
// to make sure one is not lost, check the minimum ordinal between the one found (which might just outside
|
||||
// the window - same timestamp but a higher tiebreaker) and the actual window end
|
||||
Ordinal next = criterion.ordinal(hits.get(hits.size() - 1));
|
||||
|
||||
log.trace("Found range [{}] -> [{}]", criterion.ordinal(hits.get(0)), next);
|
||||
|
||||
// if the searchAfter is outside the window, trim it down
|
||||
if (next.after(boundary)) {
|
||||
next = boundary;
|
||||
}
|
||||
request.nextAfter(next);
|
||||
|
||||
// if the limit has been reached, return what's available
|
||||
if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) {
|
||||
|
@ -245,7 +264,8 @@ public class TumblingWindow implements Executable {
|
|||
}
|
||||
|
||||
// keep running the query runs out of the results (essentially returns less than what we want)
|
||||
if (hits.size() == windowSize) {
|
||||
// however check if the window has been fully consumed
|
||||
if (hits.size() == windowSize && request.after().before(boundary)) {
|
||||
secondaryCriterion(window, currentStage, listener);
|
||||
}
|
||||
// looks like this stage is done, move on
|
||||
|
@ -259,10 +279,27 @@ public class TumblingWindow implements Executable {
|
|||
advance(window.baseStage, listener);
|
||||
}
|
||||
}
|
||||
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
/**
|
||||
* Trim hits outside the (upper) limit.
|
||||
*/
|
||||
private List<SearchHit> trim(List<SearchHit> searchHits, Criterion<BoxedQueryRequest> criterion, Ordinal boundary, boolean reversed) {
|
||||
int offset = 0;
|
||||
|
||||
for (int i = searchHits.size() - 1; i >=0 ; i--) {
|
||||
Ordinal ordinal = criterion.ordinal(searchHits.get(i));
|
||||
boolean withinBoundaries = reversed ? ordinal.afterOrAt(boundary) : ordinal.beforeOrAt(boundary);
|
||||
if (withinBoundaries == false) {
|
||||
offset++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return offset == 0 ? searchHits : searchHits.subList(0, searchHits.size() - offset);
|
||||
}
|
||||
|
||||
/**
|
||||
* Box the query for the given criterion based on the window information.
|
||||
* Returns a boolean indicating whether reversal has been applied or not.
|
||||
|
@ -285,6 +322,10 @@ public class TumblingWindow implements Executable {
|
|||
} else {
|
||||
// otherwise just the upper limit
|
||||
request.to(window.end);
|
||||
// and the lower limit if it hasn't been set
|
||||
if (request.after() == null) {
|
||||
request.nextAfter(window.begin);
|
||||
}
|
||||
}
|
||||
|
||||
return reverse;
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.xpack.ql.index.IndexResolver;
|
|||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
|
||||
|
||||
import static org.elasticsearch.action.ActionListener.wrap;
|
||||
import static org.elasticsearch.xpack.ql.util.ActionListeners.map;
|
||||
|
||||
public class EqlSession {
|
||||
|
||||
|
@ -65,8 +66,7 @@ public class EqlSession {
|
|||
}
|
||||
|
||||
public void eql(String eql, ParserParams params, ActionListener<Results> listener) {
|
||||
eqlExecutable(eql, params, wrap(e -> e.execute(this, wrap(p -> listener.onResponse(Results.fromPayload(p)), listener::onFailure)),
|
||||
listener::onFailure));
|
||||
eqlExecutable(eql, params, wrap(e -> e.execute(this, map(listener, Results::fromPayload)), listener::onFailure));
|
||||
}
|
||||
|
||||
public void eqlExecutable(String eql, ParserParams params, ActionListener<PhysicalPlan> listener) {
|
||||
|
@ -78,11 +78,11 @@ public class EqlSession {
|
|||
}
|
||||
|
||||
public void physicalPlan(LogicalPlan optimized, ActionListener<PhysicalPlan> listener) {
|
||||
optimizedPlan(optimized, wrap(o -> listener.onResponse(planner.plan(o)), listener::onFailure));
|
||||
optimizedPlan(optimized, map(listener, planner::plan));
|
||||
}
|
||||
|
||||
public void optimizedPlan(LogicalPlan verified, ActionListener<LogicalPlan> listener) {
|
||||
analyzedPlan(verified, wrap(v -> listener.onResponse(optimizer.optimize(v)), listener::onFailure));
|
||||
analyzedPlan(verified, map(listener, optimizer::optimize));
|
||||
}
|
||||
|
||||
public void analyzedPlan(LogicalPlan parsed, ActionListener<LogicalPlan> listener) {
|
||||
|
@ -91,18 +91,18 @@ public class EqlSession {
|
|||
return;
|
||||
}
|
||||
|
||||
preAnalyze(parsed, wrap(p -> listener.onResponse(postAnalyze(analyzer.analyze(p))), listener::onFailure));
|
||||
preAnalyze(parsed, map(listener, p -> postAnalyze(analyzer.analyze(p))));
|
||||
}
|
||||
|
||||
private <T> void preAnalyze(LogicalPlan parsed, ActionListener<LogicalPlan> listener) {
|
||||
String indexWildcard = configuration.indexAsWildcard();
|
||||
if(configuration.isCancelled()){
|
||||
throw new TaskCancelledException("cancelled");
|
||||
listener.onFailure(new TaskCancelledException("cancelled"));
|
||||
return;
|
||||
}
|
||||
indexResolver.resolveAsMergedMapping(indexWildcard, null, configuration.includeFrozen(), configuration.filter(),
|
||||
wrap(r -> {
|
||||
listener.onResponse(preAnalyzer.preAnalyze(parsed, r));
|
||||
}, listener::onFailure));
|
||||
map(listener, r -> preAnalyzer.preAnalyze(parsed, r))
|
||||
);
|
||||
}
|
||||
|
||||
private LogicalPlan postAnalyze(LogicalPlan verified) {
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
package org.elasticsearch.xpack.eql.execution.assembler;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
|
||||
|
||||
import org.apache.lucene.search.TotalHits;
|
||||
import org.apache.lucene.search.TotalHits.Relation;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
|
@ -95,7 +94,7 @@ public class SequenceSpecTests extends ESTestCase {
|
|||
|
||||
TestCriterion(final int ordinal) {
|
||||
super(ordinal,
|
||||
new BoxedQueryRequest(() -> SearchSourceBuilder.searchSource().query(matchAllQuery()).size(ordinal), "timestamp", null),
|
||||
new BoxedQueryRequest(() -> SearchSourceBuilder.searchSource().query(matchAllQuery()).size(ordinal), "timestamp"),
|
||||
keyExtractors,
|
||||
tsExtractor, tbExtractor, false);
|
||||
this.ordinal = ordinal;
|
||||
|
|
|
@ -4,12 +4,29 @@
|
|||
//
|
||||
// A spec is defined by its name, followed by lines describing the events for each criteria
|
||||
// followed by the expected result.
|
||||
//
|
||||
//
|
||||
|
||||
// events describe in one line per join/seq rule
|
||||
// the number following the string is used as a timestamp/order
|
||||
// while the optional prefix followed by | indicates the key
|
||||
// while the optional prefix followed by | indicates the key
|
||||
|
||||
// since the spec checks only the way matching is done it returns
|
||||
// all the results in the first call and then an empty list.
|
||||
// Since the runtime component can make multiple requests and thus discard
|
||||
// returned data, use X99 as an token to force results from other lines
|
||||
// to be read in (as it makes a larger window for the runtime)
|
||||
|
||||
// For example
|
||||
// A1 A2
|
||||
// B3
|
||||
// fails because B3 should not be read in its first call as it goes beyond A1/A2 window.
|
||||
// however the testing infrastructure doesn't know this so it returns the item
|
||||
// which is then discarded. On the next call, empty results are returned.
|
||||
// To avoid the runtime behavior leaking in, adding an extra token at the end forces
|
||||
// the runtime window to be expanded and thus for all results to be properly read:
|
||||
//
|
||||
// A1 A2 X99 (event occurring last in the base query)
|
||||
// B3
|
||||
|
||||
// spec name
|
||||
trivialSequence
|
||||
|
@ -54,7 +71,7 @@ A3 B4
|
|||
;
|
||||
|
||||
basicSequenceMergingPreviousStage
|
||||
A1 A2
|
||||
A1 A2 X99
|
||||
B3
|
||||
;
|
||||
A2 B3
|
||||
|
@ -68,7 +85,7 @@ A1 B2
|
|||
;
|
||||
|
||||
basicSequenceWithDualMergingPreviousStage
|
||||
A1 A2 A4 A5 A6 A8 A9
|
||||
A1 A2 A4 A5 A6 A8 A9
|
||||
B3 B7
|
||||
;
|
||||
A2 B3
|
||||
|
@ -91,15 +108,15 @@ A3 B4
|
|||
|
||||
|
||||
basicSequenceMergingCurrentStage
|
||||
A1 A2
|
||||
B3 B4
|
||||
A1 A2 X99
|
||||
B3 B4
|
||||
;
|
||||
A2 B3
|
||||
;
|
||||
|
||||
basicSequenceWithMergingCurrentStageAndLeftOverInPreviousStage
|
||||
A1 A2 A5
|
||||
B3 B4
|
||||
B3 B4
|
||||
;
|
||||
A2 B3
|
||||
;
|
||||
|
@ -120,47 +137,47 @@ A4 B5
|
|||
threeStageBasicSequence
|
||||
A1 A4
|
||||
B2 B5
|
||||
C3 C6
|
||||
C3 C6
|
||||
;
|
||||
A1 B2 C3
|
||||
A4 B5 C6
|
||||
;
|
||||
|
||||
threeStageWithOverridingSequence
|
||||
A1 A3
|
||||
A1 A3 X99
|
||||
B2 B4
|
||||
C5
|
||||
;
|
||||
A3 B4 C5
|
||||
;
|
||||
A3 B4 C5
|
||||
;
|
||||
|
||||
threeStageWithOverridingSequenceOnSecondStage
|
||||
A1
|
||||
A1
|
||||
B2 B4
|
||||
C3
|
||||
;
|
||||
A1 B2 C3
|
||||
;
|
||||
A1 B2 C3
|
||||
;
|
||||
|
||||
threeStageWithNoise
|
||||
A4 A6
|
||||
B2 B5
|
||||
A4 A6 X99
|
||||
B2 B5
|
||||
C1 C3 C7
|
||||
;
|
||||
|
||||
A4 B5 C7
|
||||
;
|
||||
|
||||
threeStageWithIntermitentUpdates
|
||||
A1 A4
|
||||
threeStageWithIntermittentUpdates
|
||||
A1 A4 X99
|
||||
B3 B5
|
||||
C2 C6
|
||||
;
|
||||
A4 B5 C6
|
||||
;
|
||||
|
||||
threeStageWithMoreIntermitentUpdates
|
||||
A1 A4 A6
|
||||
threeStageWithMoreIntermittentUpdates
|
||||
A1 A4 A6 X99
|
||||
B3 B5 B7 B8
|
||||
C2 C9
|
||||
;
|
||||
|
@ -168,15 +185,15 @@ A6 B7 C9
|
|||
;
|
||||
|
||||
threeStageWithReverseMatch
|
||||
A3 A5 A8
|
||||
B2 B7
|
||||
A3 A5 A8 X99
|
||||
B2 B7
|
||||
C1 C4 C6 C9
|
||||
;
|
||||
A5 B7 C9
|
||||
;
|
||||
|
||||
threeStageWithLeftoversOnTheLastStage
|
||||
A1 A3 A5
|
||||
A1 A3 A5 X99
|
||||
B2 B4 B6
|
||||
C7 C8 C9 C10
|
||||
;
|
||||
|
@ -184,7 +201,7 @@ A5 B6 C7
|
|||
;
|
||||
|
||||
threeStageWithOverlap
|
||||
A1 A3
|
||||
A1 A3 X99
|
||||
B2 B5
|
||||
C4 B6
|
||||
;
|
||||
|
@ -207,7 +224,7 @@ A5 B6 C7 D8
|
|||
;
|
||||
|
||||
fourStageIncomplete
|
||||
A1 A5
|
||||
A1 A5 X99
|
||||
B2 B6
|
||||
C3 C7
|
||||
D8
|
||||
|
@ -216,7 +233,7 @@ A5 B6 C7 D8
|
|||
;
|
||||
|
||||
fourStageOverlapping
|
||||
A1 A4
|
||||
A1 A4 X99
|
||||
B2 B5
|
||||
C3 C7
|
||||
D6 D8
|
||||
|
@ -233,10 +250,10 @@ A4 B5 C7 D8
|
|||
|
||||
exampleWithTwoStagesAndKeyFromTheDocs
|
||||
|
||||
r|W1 r|W2 u|W6 r|W7
|
||||
r|W1 r|W2 u|W6 r|W7 x|X99
|
||||
u|H3 r|H4 r|H5 u|H8
|
||||
r|I9 u|I10 r|I11
|
||||
;
|
||||
r|W2 H4 I9
|
||||
u|W6 H8 I10
|
||||
;
|
||||
;
|
||||
|
|
Loading…
Reference in New Issue