EQL: Add option for returning results from the tail of the stream (#64869) (#65040)

Introduce option for specifying whether the results are returned from
the tail (end) of the stream or the head (beginning).
Improve sequencing algorithm by significantly eliminating the number
of in-flight sequences for spare datasets.
Refactor the sequence class by eliminating some of the redundant code.
Change matching behavior for tail sequences.
Return results based on their first entry ordinal instead of
insertion order (which was ordered on the last match ordinal).
Randomize results position inside test suite.

Close #58646

(cherry picked from commit e85d9d1bbee13ad408e789fd62efb30bc8d223f2)
(cherry picked from commit 452c674a10cdc16dced3cde7babf5d5a9d64a6d9)
This commit is contained in:
Costin Leau 2020-11-14 13:44:17 +02:00 committed by GitHub
parent 38b5b59862
commit 76e73fec79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 525 additions and 293 deletions

View File

@ -39,6 +39,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
private QueryBuilder filter = null;
private String timestampField = "@timestamp";
private String eventCategoryField = "event.category";
private String resultPosition = "head";
private int size = 10;
private int fetchSize = 1000;
@ -57,6 +58,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
static final String KEY_SIZE = "size";
static final String KEY_FETCH_SIZE = "fetch_size";
static final String KEY_QUERY = "query";
static final String KEY_RESULT_POSITION = "result_position";
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
static final String KEY_KEEP_ALIVE = "keep_alive";
static final String KEY_KEEP_ON_COMPLETION = "keep_on_completion";
@ -79,6 +81,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
builder.field(KEY_EVENT_CATEGORY_FIELD, eventCategoryField());
builder.field(KEY_SIZE, size());
builder.field(KEY_FETCH_SIZE, fetchSize());
builder.field(KEY_RESULT_POSITION, resultPosition());
builder.field(KEY_QUERY, query);
if (waitForCompletionTimeout != null) {
@ -140,6 +143,19 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
return this;
}
public String resultPosition() {
return resultPosition;
}
public EqlSearchRequest resultPosition(String position) {
if ("head".equals(position) || "tail".equals(position)) {
resultPosition = position;
} else {
throw new IllegalArgumentException("result position needs to be 'head' or 'tail', received '" + position + "'");
}
return this;
}
public int size() {
return this.size;
}
@ -211,6 +227,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
EqlSearchRequest that = (EqlSearchRequest) o;
return size == that.size &&
fetchSize == that.fetchSize &&
resultPosition == that.resultPosition &&
Arrays.equals(indices, that.indices) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(filter, that.filter) &&
@ -237,6 +254,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
tiebreakerField,
eventCategoryField,
query,
resultPosition,
waitForCompletionTimeout,
keepAlive,
keepOnCompletion);

View File

@ -7,6 +7,7 @@
package org.elasticsearch.test.eql;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.elasticsearch.client.EqlClient;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
@ -118,11 +119,19 @@ public abstract class BaseEqlSpecTestCase extends ESRestTestCase {
// some queries return more than 10 results
request.size(50);
request.fetchSize(randomIntBetween(2, 50));
request.resultPosition(randomBoolean() ? "head" : "tail");
return runRequest(eqlClient(), request);
}
protected EqlSearchResponse runRequest(EqlClient eqlClient, EqlSearchRequest request) throws IOException {
return eqlClient.search(request, RequestOptions.DEFAULT);
int timeout = Math.toIntExact(timeout().millis());
RequestConfig config = RequestConfig.copy(RequestConfig.DEFAULT)
.setConnectionRequestTimeout(timeout)
.setConnectTimeout(timeout)
.setSocketTimeout(timeout)
.build();
return eqlClient.search(request, RequestOptions.DEFAULT.toBuilder().setRequestConfig(config).build());
}
protected EqlClient eqlClient() {

View File

@ -268,7 +268,8 @@ sequence by unique_pid
[any where true]
[any where serial_event_id < 72]
'''
expected_event_ids = [54, 55, 59,
expected_event_ids = [
54, 55, 59,
55, 59, 61,
59, 61, 65,
16, 60, 66,

View File

@ -49,6 +49,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
private int size = RequestDefaults.SIZE;
private int fetchSize = RequestDefaults.FETCH_SIZE;
private String query;
private String resultPosition = "head";
// Async settings
private TimeValue waitForCompletionTimeout = null;
@ -65,6 +66,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
static final String KEY_KEEP_ALIVE = "keep_alive";
static final String KEY_KEEP_ON_COMPLETION = "keep_on_completion";
static final String KEY_RESULT_POSITION = "result_position";
static final ParseField FILTER = new ParseField(KEY_FILTER);
static final ParseField TIMESTAMP_FIELD = new ParseField(KEY_TIMESTAMP_FIELD);
@ -76,6 +78,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField(KEY_WAIT_FOR_COMPLETION_TIMEOUT);
static final ParseField KEEP_ALIVE = new ParseField(KEY_KEEP_ALIVE);
static final ParseField KEEP_ON_COMPLETION = new ParseField(KEY_KEEP_ON_COMPLETION);
static final ParseField RESULT_POSITION = new ParseField(KEY_RESULT_POSITION);
private static final ObjectParser<EqlSearchRequest, Void> PARSER = objectParser(EqlSearchRequest::new);
@ -171,6 +174,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
builder.field(KEY_KEEP_ALIVE, keepAlive);
}
builder.field(KEY_KEEP_ON_COMPLETION, keepOnCompletion);
builder.field(KEY_RESULT_POSITION, resultPosition);
return builder;
}
@ -195,6 +199,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
parser.declareField(EqlSearchRequest::keepAlive,
(p, c) -> TimeValue.parseTimeValue(p.text(), KEY_KEEP_ALIVE), KEEP_ALIVE, ObjectParser.ValueType.VALUE);
parser.declareBoolean(EqlSearchRequest::keepOnCompletion, KEEP_ON_COMPLETION);
parser.declareString(EqlSearchRequest::resultPosition, RESULT_POSITION);
return parser;
}
@ -284,6 +289,19 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
return this;
}
public String resultPosition() {
return resultPosition;
}
public EqlSearchRequest resultPosition(String position) {
if ("head".equals(position) || "tail".equals(position)) {
resultPosition = position;
} else {
throw new IllegalArgumentException("result position needs to be 'head' or 'tail', received '" + position + "'");
}
return this;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -327,7 +345,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
Objects.equals(eventCategoryField, that.eventCategoryField) &&
Objects.equals(query, that.query) &&
Objects.equals(waitForCompletionTimeout, that.waitForCompletionTimeout) &&
Objects.equals(keepAlive, that.keepAlive);
Objects.equals(keepAlive, that.keepAlive) &&
Objects.equals(resultPosition, that.resultPosition);
}
@ -344,7 +363,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
eventCategoryField,
query,
waitForCompletionTimeout,
keepAlive);
keepAlive,
resultPosition);
}
@Override

View File

@ -26,7 +26,6 @@ import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
public class BoxedQueryRequest implements QueryRequest {
private final RangeQueryBuilder timestampRange;
private final SearchSourceBuilder searchSource;
private Ordinal from, to;
@ -61,6 +60,16 @@ public class BoxedQueryRequest implements QueryRequest {
return this;
}
/**
* Sets the upper boundary for the query (inclusive).
* Can be removed through null.
*/
public BoxedQueryRequest to(Ordinal end) {
to = end;
timestampRange.lte(end != null ? end.timestamp() : null);
return this;
}
public Ordinal after() {
return after;
}
@ -69,13 +78,8 @@ public class BoxedQueryRequest implements QueryRequest {
return from;
}
/**
* Sets the upper boundary for the query (inclusive).
*/
public BoxedQueryRequest to(Ordinal end) {
to = end;
timestampRange.lte(end != null ? end.timestamp() : null);
return this;
public Ordinal to() {
return to;
}
@Override

View File

@ -23,35 +23,39 @@ public class Criterion<Q extends QueryRequest> {
private final HitExtractor timestamp;
private final HitExtractor tiebreaker;
private final boolean reverse;
private final boolean descending;
Criterion(int stage,
Q queryRequest,
List<HitExtractor> keys,
HitExtractor timestamp,
HitExtractor tiebreaker,
boolean reverse) {
boolean descending) {
this.stage = stage;
this.queryRequest = queryRequest;
this.keys = keys;
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
this.reverse = reverse;
this.descending = descending;
}
public int stage() {
return stage;
}
public boolean reverse() {
return reverse;
public boolean descending() {
return descending;
}
public Q queryRequest() {
return queryRequest;
}
public int keySize() {
return keys.size();
}
public SequenceKey key(SearchHit hit) {
SequenceKey key;
if (keys.isEmpty()) {
@ -89,6 +93,6 @@ public class Criterion<Q extends QueryRequest> {
@Override
public String toString() {
return "[" + stage + "][" + reverse + "]";
return "[" + stage + "][" + descending + "]";
}
}
}

View File

@ -57,7 +57,6 @@ public class ExecutionManager {
HitExtractor tbExtractor = Expressions.isPresent(tiebreaker) ? hitExtractor(tiebreaker, extractorRegistry) : null;
// NB: since there's no aliasing inside EQL, the attribute name is the same as the underlying field name
String timestampName = Expressions.name(timestamp);
String tiebreakerName = Expressions.isPresent(tiebreaker) ? Expressions.name(tiebreaker) : null;
// secondary criteriam
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(plans.size() - 1);

View File

@ -13,9 +13,10 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
/** Dedicated collection for mapping a key to a list of sequences */
/** The list represents the sequence for each stage (based on its index) and is fixed in size */
/**
* Dedicated collection for mapping a key to a list of sequences
* The list represents the sequence for each stage (based on its index) and is fixed in size
*/
class KeyToSequences {
private final int listSize;
@ -52,24 +53,6 @@ class KeyToSequences {
groups[stage].add(sequence);
}
void resetGroupInsertPosition() {
for (SequenceGroup[] groups : keyToSequences.values()) {
for (SequenceGroup group : groups) {
if (group != null) {
group.resetInsertPosition();
}
}
}
}
void resetUntilInsertPosition() {
for (UntilGroup until : keyToUntil.values()) {
if (until != null) {
until.resetInsertPosition();
}
}
}
void until(Iterable<KeyAndOrdinal> until) {
for (KeyAndOrdinal keyAndOrdinal : until) {
// ignore unknown keys
@ -116,17 +99,26 @@ class KeyToSequences {
keyToUntil.clear();
}
/**
* Remove all matches expect the latest.
*/
void trimToTail() {
for (SequenceGroup[] groups : keyToSequences.values()) {
for (SequenceGroup group : groups) {
if (group != null) {
group.trimToLast();
}
}
}
}
public void clear() {
keyToSequences.clear();
keyToUntil.clear();
}
int numberOfKeys() {
return keyToSequences.size();
}
@Override
public String toString() {
return LoggerMessageFormat.format(null, "Keys=[{}], Until=[{}]", keyToSequences.size(), keyToUntil.size());
}
}
}

View File

@ -12,7 +12,7 @@ import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import java.util.Objects;
/**
* A match within a sequence, holding the result and occurrance time.
* A match within a sequence, holding the result and occurrence time.
*/
class Match {

View File

@ -11,13 +11,15 @@ import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
/** List of in-flight ordinals for a given key. For fast lookup, typically associated with a stage. */
/**
* List of in-flight ordinals for a given key. For fast lookup, typically associated with a stage.
* this class expects the insertion to be ordered
*/
abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
private final SequenceKey key;
@ -26,23 +28,12 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
// NB: since the size varies significantly, use a LinkedList
// Considering the order it might make sense to use a B-Tree+ for faster lookups which should work well with
// timestamp compression (whose range is known for the current frame).
private final List<E> elements = new LinkedList<>();
/**
* index in the list used for resetting the insertion point
* it gets reset when dealing with descending queries since the data inserted is ascending in a page
* but descending compared to the previous stages.
*/
private int insertPosition = 0;
private int hashCode = 0;
private final LinkedList<E> elements = new LinkedList<>();
private Ordinal start, stop;
protected OrdinalGroup(SequenceKey key, Function<E, Ordinal> extractor) {
this.key = key;
hashCode = key.hashCode();
this.extractor = extractor;
}
@ -51,27 +42,14 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
}
void add(E element) {
hashCode = 31 * hashCode + Objects.hashCode(element);
Ordinal ordinal = extractor.apply(element);
if (start == null) {
if (start == null || start.compareTo(ordinal) > 0) {
start = ordinal;
} else if (stop == null) {
stop = ordinal;
} else {
if (start.compareTo(ordinal) > 0) {
start = ordinal;
}
if (stop.compareTo(ordinal) < 0) {
stop = ordinal;
}
}
// add element at the current position
elements.add(insertPosition++, element);
}
void resetInsertPosition() {
insertPosition = 0;
if (stop == null || stop.compareTo(ordinal) < 0) {
stop = ordinal;
}
elements.add(element);
}
/**
@ -87,15 +65,10 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
int pos = match.v2() + 1;
elements.subList(0, pos).clear();
// update insert position
insertPosition = insertPosition - pos;
if (insertPosition < 0) {
insertPosition = 0;
}
// update min time
if (elements.isEmpty() == false) {
start = extractor.apply(elements.get(0));
start = extractor.apply(elements.peekFirst());
stop = extractor.apply(elements.peekLast());
} else {
start = null;
stop = null;
@ -109,6 +82,16 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
return match != null ? match.v1() : null;
}
void trimToLast() {
E last = elements.peekLast();
if (last != null) {
elements.clear();
start = null;
stop = null;
add(last);
}
}
private Tuple<E, Integer> findBefore(Ordinal ordinal) {
E match = null;
int matchPos = -1;
@ -151,7 +134,7 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
public int hashCode() {
return key.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
@ -164,11 +147,11 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
OrdinalGroup<?> other = (OrdinalGroup<?>) obj;
return Objects.equals(key, other.key)
&& Objects.equals(hashCode, other.hashCode);
&& Objects.equals(elements, other.elements);
}
@Override
public String toString() {
return format(null, "[{}][{}-{}]({} seqs)", key, start, stop, elements.size());
}
}
}

View File

@ -24,7 +24,7 @@ import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
* Defined by its key and stage.
* This class is NOT immutable (to optimize memory) which means its associations need to be managed.
*/
public class Sequence {
public class Sequence implements Comparable<Sequence> {
private final SequenceKey key;
private final int stages;
@ -40,14 +40,13 @@ public class Sequence {
this.matches[0] = new Match(ordinal, firstHit);
}
public int putMatch(int stage, Ordinal ordinal, HitReference hit) {
public void putMatch(int stage, Ordinal ordinal, HitReference hit) {
if (stage == currentStage + 1) {
int previousStage = currentStage;
currentStage = stage;
matches[currentStage] = new Match(ordinal, hit);
return previousStage;
} else {
throw new EqlIllegalArgumentException("Invalid stage [{}] specified for sequence[key={}, stage={}]", stage, key, currentStage);
}
throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage={}]", stage, key, currentStage);
}
public SequenceKey key() {
@ -70,26 +69,31 @@ public class Sequence {
return hits;
}
@Override
public int compareTo(Sequence o) {
return ordinal().compareTo(o.ordinal());
}
@Override
public int hashCode() {
return Objects.hash(currentStage, key);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Sequence other = (Sequence) obj;
return Objects.equals(currentStage, other.currentStage)
&& Objects.equals(key, other.key);
}
@Override
public String toString() {
int numberOfDigits = stages > 100 ? 3 : stages > 10 ? 2 : 1;
@ -105,7 +109,7 @@ public class Sequence {
for (int i = 0; i < matches.length; i++) {
sb.append(format(null, "\n [{}]={{}}", nf.format(i), matches[i]));
}
return sb.toString();
}
}
}

View File

@ -12,6 +12,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import static java.util.Collections.emptyList;
public class SequenceKey {
public static final SequenceKey NONE = new SequenceKey();
@ -25,7 +27,7 @@ public class SequenceKey {
}
public List<Object> asList() {
return Arrays.asList(keys);
return keys == null ? emptyList() : Arrays.asList(keys);
}
@Override
@ -44,7 +46,7 @@ public class SequenceKey {
}
SequenceKey other = (SequenceKey) obj;
return Arrays.deepEquals(keys, other.keys);
return Arrays.equals(keys, other.keys);
}
@Override

View File

@ -15,8 +15,10 @@ import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import java.util.LinkedList;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
/**
* Matcher of sequences. Keeps track of on-going sequences and advancing them through each stage.
@ -31,7 +33,7 @@ public class SequenceMatcher {
long until = 0;
long rejectionMaxspan = 0;
long rejectionUntil = 0;
@Override
public String toString() {
return LoggerMessageFormat.format(null, "Stats: Seen [{}]/Ignored [{}]/Until [{}]/Rejected {Maxspan [{}]/Until [{}]}",
@ -51,24 +53,23 @@ public class SequenceMatcher {
}
}
/** Current sequences for each key */
/** Note will be multiple sequences for the same key and the same stage with different timestamps */
// Current sequences for each key
// Note will be multiple sequences for the same key and the same stage with different timestamps
private final KeyToSequences keyToSequences;
/** Current keys on each stage */
// Current keys on each stage
private final StageToKeys stageToKeys;
private final int numberOfStages;
private final int completionStage;
/** list of completed sequences - separate to avoid polluting the other stages */
private final List<Sequence> completed;
private int completedInsertPosition = 0;
// Set of completed sequences - separate to avoid polluting the other stages
// It is a set since matches are ordered at insertion time based on the ordinal of the first entry
private final Set<Sequence> completed;
private final long maxSpanInMillis;
private final boolean descending;
private Limit limit;
private final Limit limit;
private boolean headLimit = false;
private final Stats stats = new Stats();
@ -81,7 +82,7 @@ public class SequenceMatcher {
this.descending = descending;
this.stageToKeys = new StageToKeys(completionStage);
this.keyToSequences = new KeyToSequences(completionStage);
this.completed = new LinkedList<>();
this.completed = new TreeSet<>();
this.maxSpanInMillis = maxSpan.millis();
@ -109,9 +110,6 @@ public class SequenceMatcher {
if (stage == 0) {
Sequence seq = new Sequence(ko.key, numberOfStages, ko.ordinal, hit);
// descending queries return descending blocks of ASC data
// to avoid sorting things during insertion,
trackSequence(seq);
} else {
match(stage, ko.key, ko.ordinal, hit);
@ -144,7 +142,7 @@ public class SequenceMatcher {
*/
private void match(int stage, SequenceKey key, Ordinal ordinal, HitReference hit) {
stats.seen++;
int previousStage = stage - 1;
// check key presence to avoid creating a collection
SequenceGroup group = keyToSequences.groupIfPresent(previousStage, key);
@ -165,7 +163,7 @@ public class SequenceMatcher {
keyToSequences.remove(previousStage, group);
stageToKeys.remove(previousStage, key);
}
//
// Conditional checks
//
@ -188,7 +186,7 @@ public class SequenceMatcher {
}
}
}
sequence.putMatch(stage, ordinal, hit);
// bump the stages
@ -203,26 +201,13 @@ public class SequenceMatcher {
}
}
completed.add(completedInsertPosition++, sequence);
completed.add(sequence);
// update the bool lazily
// only consider positive limits / negative ones imply tail which means having to go
// through the whole page of results before selecting the last ones
// doing a limit early returns the 'head' not 'tail'
headLimit = limit != null && limit.limit() > 0 && completed.size() == limit.totalLimit();
} else {
if (descending) {
// when dealing with descending queries
// avoid duplicate matching (since the ASC query can return previously seen results)
group = keyToSequences.groupIfPresent(stage, key);
if (group != null) {
for (Ordinal previous : group) {
if (previous.equals(ordinal)) {
return;
}
}
}
}
stageToKeys.add(stage, key);
keyToSequences.add(stage, sequence);
}
@ -235,8 +220,21 @@ public class SequenceMatcher {
* However sequences on higher stages can, hence this check to know whether
* it's possible to advance the window early.
*/
boolean hasCandidates(int stage) {
for (int i = stage; i < completionStage; i++) {
boolean hasFollowingCandidates(int stage) {
return hasCandidates(stage, completionStage);
}
/**
* Checks whether the previous stages still have in-flight data.
* Used to see whether, after rebasing a window it makes sense to continue finding matches.
* If there are no in-progress windows, any future results are unnecessary.
*/
boolean hasCandidates() {
return hasCandidates(0, completionStage);
}
private boolean hasCandidates(int start, int stop) {
for (int i = start; i < stop; i++) {
if (stageToKeys.isEmpty(i) == false) {
return true;
}
@ -244,9 +242,9 @@ public class SequenceMatcher {
return false;
}
List<Sequence> completed() {
return limit != null ? limit.view(completed) : completed;
List<Sequence> asList = new ArrayList<>(completed);
return limit != null ? limit.view(asList) : asList;
}
void dropUntil() {
@ -257,16 +255,24 @@ public class SequenceMatcher {
keyToSequences.until(markers);
}
void resetInsertPosition() {
// when dealing with descending calls
// update the insert point of all sequences
// for the next batch of hits which will be sorted ascending
// yet will occur _before_ the current batch
if (descending) {
keyToSequences.resetGroupInsertPosition();
keyToSequences.resetUntilInsertPosition();
/**
* Called when moving to a new page.
* This allows the matcher to keep only the last match per stage
* and adjust insertion positions.
*/
void trim(boolean everything) {
// for descending sequences, remove all in-flight sequences
// since the windows moves head and thus there is no chance
// of new results coming in
completedInsertPosition = 0;
// however this needs to be indicated from outside since
// the same window can be only ASC trimmed during a loop
// and fully once the DESC query moves
if (everything) {
keyToSequences.clear();
} else {
// keep only the tail
keyToSequences.trimToTail();
}
}
@ -283,9 +289,9 @@ public class SequenceMatcher {
@Override
public String toString() {
return LoggerMessageFormat.format(null, "Tracking [{}] keys with [{}] completed and in-flight {}",
return LoggerMessageFormat.format(null, "Tracking [{}] keys with [{}] completed and {} in-flight",
keyToSequences,
completed.size(),
stageToKeys);
}
}
}

View File

@ -35,10 +35,15 @@ import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.searchHi
* Time-based window encapsulating query creation and advancement.
* Since queries can return different number of results, to avoid creating incorrect sequences,
* all searches are 'boxed' to a base query.
* The base query is initially the first query - when no results are found, the next query gets promoted.
*
* This allows the window to find any follow-up results even if they are found outside the initial window
* of a base query.
* The window always moves ASC (sorted on timestamp/tiebreaker ordinal) since events in a sequence occur
* one after the other. The window starts at the base (the first query) - when no results are found,
* the next query gets promoted. This allows the window to find any follow-up results even if they are
* found outside the initial window of a base query.
*
* TAIL/DESC sequences are handled somewhat differently. The first/base query moves DESC and the tumbling
* window keeps moving ASC but using the second query as its base. When the tumbling window finishes instead
* of bailing out, the DESC query keeps advancing.
*/
public class TumblingWindow implements Executable {
@ -52,6 +57,13 @@ public class TumblingWindow implements Executable {
private final int maxStages;
private final int windowSize;
private final boolean hasKeys;
// flag used for DESC sequences to indicate whether
// the window needs to restart (since the DESC query still has results)
private boolean restartWindowFromTailQuery;
private final boolean earlyUntil;
private long startTime;
private static class WindowInfo {
@ -75,31 +87,91 @@ public class TumblingWindow implements Executable {
this.until = until;
this.criteria = criteria;
this.maxStages = criteria.size();
this.windowSize = criteria.get(0).queryRequest().searchSource().size();
this.matcher = matcher;
Criterion<BoxedQueryRequest> baseRequest = criteria.get(0);
this.windowSize = baseRequest.queryRequest().searchSource().size();
this.hasKeys = baseRequest.keySize() > 0;
this.restartWindowFromTailQuery = baseRequest.descending();
this.earlyUntil = baseRequest.descending();
}
@Override
public void execute(ActionListener<Payload> listener) {
log.trace("Starting sequence window w/ fetch size [{}]", windowSize);
startTime = System.currentTimeMillis();
advance(0, listener);
tumbleWindow(0, listener);
}
private void advance(int baseStage, ActionListener<Payload> listener) {
/**
* Move the window while preserving the same base.
*/
private void tumbleWindow(int currentStage, ActionListener<Payload> listener) {
if (currentStage > 0 && matcher.hasCandidates() == false) {
if (restartWindowFromTailQuery) {
currentStage = 0;
} else {
// if there are no in-flight sequences (from previous stages)
// no need to look for more results
payload(listener);
return;
}
}
log.trace("Tumbling window...");
// finished all queries in this window, run a trim
// for descending queries clean everything
if (restartWindowFromTailQuery) {
if (currentStage == 0) {
matcher.trim(true);
}
}
// trim to last
else {
// check case when a rebase occurred and the current query
// has a lot more results than the first once and hasn't
// covered the whole window. Running a trim early data before
// the whole window is matched
boolean trimToLast = false;
if (currentStage == 0) {
trimToLast = true;
}
else {
Ordinal current = criteria.get(currentStage).queryRequest().after();
Ordinal previous = criteria.get(currentStage - 1).queryRequest().after();
trimToLast = current.after(previous);
}
if (trimToLast) {
matcher.trim(false);
}
}
advance(currentStage, listener);
}
/**
* Move the window while advancing the query base.
*/
private void rebaseWindow(int nextStage, ActionListener<Payload> listener) {
log.trace("Rebasing window...");
advance(nextStage, listener);
}
private void advance(int stage, ActionListener<Payload> listener) {
// initialize
Criterion<BoxedQueryRequest> base = criteria.get(baseStage);
Criterion<BoxedQueryRequest> base = criteria.get(stage);
// remove any potential upper limit (if a criteria has been promoted)
base.queryRequest().to(null);
matcher.resetInsertPosition();
log.trace("{}", matcher);
log.trace("Querying base stage [{}] {}", base.stage(), base.queryRequest());
log.trace("Querying base stage [{}] {}", stage, base.queryRequest());
client.query(base.queryRequest(), wrap(p -> baseCriterion(baseStage, p, listener), listener::onFailure));
client.query(base.queryRequest(), wrap(p -> baseCriterion(stage, p, listener), listener::onFailure));
}
/**
* Start the base query but, to account for until, do not match the results right away.
*/
private void baseCriterion(int baseStage, SearchResponse r, ActionListener<Payload> listener) {
Criterion<BoxedQueryRequest> base = criteria.get(baseStage);
List<SearchHit> hits = searchHits(r);
@ -107,59 +179,140 @@ public class TumblingWindow implements Executable {
log.trace("Found [{}] hits", hits.size());
Ordinal begin = null, end = null;
final WindowInfo info;
// if there is at least one result, process it
if (hits.isEmpty() == false) {
if (matcher.match(baseStage, wrapValues(base, hits)) == false) {
payload(listener);
// get borders for the rest of the queries - but only when at least one result is found
begin = headOrdinal(hits, base);
end = tailOrdinal(hits, base);
boolean desc = base.descending();
// always create an ASC window
info = new WindowInfo(baseStage, begin, end);
log.trace("Found {}base [{}] window {}->{}", base.descending() ? "tail ": "", base.stage(), begin, end);
// update current query for the next request
base.queryRequest().nextAfter(end);
// early until check if dealing with a TAIL sequence
// execute UNTIL *before* matching the results
// this is needed for TAIL sequences since the base of the window
// is called once with the DESC query, then with the ASC one
// thus UNTIL needs to be executed before matching the second query
// that is the ASC base of the window
if (earlyUntil && until != null && baseStage == 1) {
// find "until" ordinals - early on to discard data in-flight to avoid matching
// hits that can occur in other documents
untilCriterion(info, listener, () -> completeBaseCriterion(baseStage, hits, info, listener));
return;
}
// get borders for the rest of the queries - but only when at least one result is found
begin = base.ordinal(hits.get(0));
end = base.ordinal(hits.get(hits.size() - 1));
log.trace("Found base [{}] window {}->{}", base.stage(), begin, end);
} else {
info = null;
}
// match the results
completeBaseCriterion(baseStage, hits, info, listener);
}
// only one result means there aren't going to be any matches
// so move the window boxing to the next stage
if (hits.size() < 2) {
// if there are still candidates, advance the window base
if (matcher.hasCandidates(baseStage) && baseStage + 1 < maxStages) {
Runnable next = () -> advance(baseStage + 1, listener);
private void completeBaseCriterion(int baseStage, List<SearchHit> hits, WindowInfo info, ActionListener<Payload> listener) {
Criterion<BoxedQueryRequest> base = criteria.get(baseStage);
if (until != null && hits.size() == 1) {
// find "until" ordinals - early on to discard data in-flight to avoid matching
// hits that can occur in other documents
untilCriterion(new WindowInfo(baseStage, begin, end), listener, next);
} else {
next.run();
}
}
// there aren't going to be any matches so cancel search
else {
payload(listener);
}
// check for matches - if the limit has been reached, abort
if (matcher.match(baseStage, wrapValues(base, hits)) == false) {
payload(listener);
return;
}
// update current query for the next request
base.queryRequest().nextAfter(end);
int nextStage = baseStage + 1;
boolean windowCompleted = hits.size() < windowSize;
WindowInfo info = new WindowInfo(baseStage, begin, end);
// there are still queries
if (nextStage < maxStages) {
boolean descendingQuery = base.descending();
Runnable next = null;
// no more queries to run
if (baseStage + 1 < maxStages) {
Runnable next = () -> secondaryCriterion(info, baseStage + 1, listener);
if (until != null) {
// find "until" ordinals - early on to discard data in-flight to avoid matching
// hits that can occur in other documents
// if there are results, setup the next stage
if (info != null) {
if (descendingQuery) {
// TAIL query
setupWindowFromTail(info.end);
} else {
boxQuery(info, criteria.get(nextStage));
}
}
// this is the last round of matches
if (windowCompleted) {
boolean shouldTerminate = false;
// in case of DESC queries indicate there's no more window restarting
if (descendingQuery) {
if (info != null) {
// DESC means starting the window
restartWindowFromTailQuery = false;
next = () -> advance(1, listener);
}
// if there are no new results, no need to check the window
else {
shouldTerminate = true;
}
}
// for ASC queries continue if there are still matches available
else {
if (matcher.hasFollowingCandidates(baseStage)) {
next = () -> rebaseWindow(nextStage, listener);
}
// otherwise bail-out, unless it's a DESC sequence that hasn't completed yet
// in which case restart
else {
if (restartWindowFromTailQuery == false) {
shouldTerminate = true;
} else {
tumbleWindow(0, listener);
return;
}
}
}
// otherwise bail-out
if (shouldTerminate) {
payload(listener);
return;
}
}
// go to the next stage
else {
// DESC means starting the window
if (descendingQuery) {
next = () -> advance(1, listener);
}
// ASC to continue
else {
next = () -> secondaryCriterion(info, nextStage, listener);
}
}
// until check for HEAD queries
if (earlyUntil == false && until != null && info != null) {
untilCriterion(info, listener, next);
} else {
next.run();
}
} else {
advance(baseStage, listener);
}
// no more queries to run
else {
// no more results either
if (windowCompleted) {
if (restartWindowFromTailQuery) {
tumbleWindow(0, listener);
} else {
payload(listener);
}
}
// there are still results, keep going
else {
tumbleWindow(baseStage, listener);
}
}
}
@ -170,7 +323,7 @@ public class TumblingWindow implements Executable {
// including dropping any in-flight sequences that were not dropped (because they did not match)
matcher.dropUntil();
final boolean reversed = boxQuery(window, until);
boxQuery(window, until);
log.trace("Querying until stage {}", request);
@ -179,23 +332,14 @@ public class TumblingWindow implements Executable {
log.trace("Found [{}] hits", hits.size());
// no more results for until - let the other queries run
if (hits.isEmpty()) {
// put the markers in place before the next call
if (reversed) {
request.to(window.end);
} else {
request.from(window.end);
}
} else {
if (hits.isEmpty() == false) {
// prepare the query for the next search
request.nextAfter(until.ordinal(hits.get(hits.size() - 1)));
// if the limit has been reached, return what's available
request.nextAfter(tailOrdinal(hits, until));
matcher.until(wrapUntilValues(wrapValues(until, hits)));
}
// keep running the query runs out of the results (essentially returns less than what we want)
if (hits.size() == windowSize) {
if (hits.size() == windowSize && request.after().before(window.end)) {
untilCriterion(window, listener, next);
}
// looks like this stage is done, move on
@ -211,72 +355,72 @@ public class TumblingWindow implements Executable {
final Criterion<BoxedQueryRequest> criterion = criteria.get(currentStage);
final BoxedQueryRequest request = criterion.queryRequest();
final boolean reversed = boxQuery(window, criterion);
//boxQuery(window, criterion);
log.trace("Querying (secondary) stage [{}] {}", criterion.stage(), request);
client.query(request, wrap(r -> {
Ordinal boundary = reversed ? window.begin : window.end;
List<SearchHit> hits = searchHits(r);
// filter hits that are escaping the window (same timestamp but different tiebreaker)
hits = trim(hits, criterion, boundary, reversed);
// apply it only to ASC queries; DESC queries need it to find matches going the opposite direction
hits = trim(hits, criterion, window.end);
log.trace("Found [{}] hits", hits.size());
// no more results for this query
if (hits.isEmpty()) {
// put the markers in place before the next call
if (reversed) {
request.from(window.end);
} else {
request.to(window.end);
}
int nextStage = currentStage + 1;
// if there are no candidates, advance the window
if (matcher.hasCandidates(criterion.stage()) == false) {
log.trace("Advancing window...");
advance(window.baseStage, listener);
return;
}
// otherwise let the other queries run to allow potential matches with the existing candidates
}
else {
// if there is at least one result, process it
if (hits.isEmpty() == false) {
// prepare the query for the next search
// however when dealing with tiebreakers the same timestamp can contain different values that might
// be within or outside the window
// to make sure one is not lost, check the minimum ordinal between the one found (which might just outside
// the window - same timestamp but a higher tiebreaker) and the actual window end
Ordinal next = criterion.ordinal(hits.get(hits.size() - 1));
Ordinal tailOrdinal = tailOrdinal(hits, criterion);
Ordinal headOrdinal = headOrdinal(hits, criterion);
log.trace("Found range [{}] -> [{}]", criterion.ordinal(hits.get(0)), next);
log.trace("Found range [{}] -> [{}]", headOrdinal, tailOrdinal);
// if the searchAfter is outside the window, trim it down
if (next.after(boundary)) {
next = boundary;
// set search after
// for ASC queries limit results to the search window
// for DESC queries, do not otherwise the follow-up events won't match the headOrdinal result in DESC
if (tailOrdinal.after(window.end)) {
tailOrdinal = window.end;
}
request.nextAfter(next);
request.nextAfter(tailOrdinal);
// if the limit has been reached, return what's available
if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) {
payload(listener);
return;
}
// any subsequence query will be ASC - initialize its starting point if not set
// this is the case during the headOrdinal run for HEAD queries or for each window for TAIL ones
if (nextStage < maxStages) {
BoxedQueryRequest nextRequest = criteria.get(nextStage).queryRequest();
if (nextRequest.from() == null || nextRequest.after() == null) {
nextRequest.from(headOrdinal);
nextRequest.nextAfter(headOrdinal);
}
}
}
// keep running the query runs out of the results (essentially returns less than what we want)
// however check if the window has been fully consumed
if (hits.size() == windowSize && request.after().before(boundary)) {
if (hits.size() == windowSize && request.after().before(window.end)) {
secondaryCriterion(window, currentStage, listener);
}
// looks like this stage is done, move on
else {
// to the next query
if (currentStage + 1 < maxStages) {
// but first check is there are still candidates within the current window
if (currentStage + 1 < maxStages && matcher.hasFollowingCandidates(criterion.stage())) {
secondaryCriterion(window, currentStage + 1, listener);
}
// or to the next window
else {
advance(window.baseStage, listener);
} else {
// otherwise, advance it
tumbleWindow(window.baseStage, listener);
}
}
}, listener::onFailure));
@ -285,13 +429,12 @@ public class TumblingWindow implements Executable {
/**
* Trim hits outside the (upper) limit.
*/
private List<SearchHit> trim(List<SearchHit> searchHits, Criterion<BoxedQueryRequest> criterion, Ordinal boundary, boolean reversed) {
private List<SearchHit> trim(List<SearchHit> searchHits, Criterion<BoxedQueryRequest> criterion, Ordinal boundary) {
int offset = 0;
for (int i = searchHits.size() - 1; i >=0 ; i--) {
for (int i = searchHits.size() - 1; i >= 0 ; i--) {
Ordinal ordinal = criterion.ordinal(searchHits.get(i));
boolean withinBoundaries = reversed ? ordinal.afterOrAt(boundary) : ordinal.beforeOrAt(boundary);
if (withinBoundaries == false) {
if (ordinal.after(boundary)) {
offset++;
} else {
break;
@ -301,34 +444,54 @@ public class TumblingWindow implements Executable {
}
/**
* Box the query for the given criterion based on the window information.
* Returns a boolean indicating whether reversal has been applied or not.
* Box the query for the given (ASC) criterion based on the window information.
*/
private boolean boxQuery(WindowInfo window, Criterion<BoxedQueryRequest> criterion) {
private void boxQuery(WindowInfo window, Criterion<BoxedQueryRequest> criterion) {
final BoxedQueryRequest request = criterion.queryRequest();
Criterion<BoxedQueryRequest> base = criteria.get(window.baseStage);
boolean reverse = criterion.reverse() != base.reverse();
// first box the query
// only the first base can be descending
// all subsequence queries are ascending
if (reverse) {
if (window.end.equals(request.from()) == false) {
// if that's the case, set the starting point
request.from(window.end);
// reposition the pointer
request.nextAfter(window.end);
}
} else {
// otherwise just the upper limit
// for HEAD, it's the window upper limit that keeps changing
// so check TO.
if (window.end.equals(request.to()) == false) {
request.to(window.end);
// and the lower limit if it hasn't been set
if (request.after() == null) {
request.nextAfter(window.begin);
}
}
return reverse;
// initialize the start of the next query if needed (such as until)
// in DESC queries, this is set before the window starts
// in ASC queries, this is initialized based on the first result from the base query
if (request.from() == null) {
request.from(window.begin);
request.nextAfter(window.begin);
}
}
/**
* Used by TAIL sequences. Sets the starting point of the (ASC) window.
* It does that by initializing the from of the stage 1 (the window base)
* and resets "from" from the other sub-queries so they can initialized accordingly
* (based on the results of their predecessors).
*/
private void setupWindowFromTail(Ordinal from) {
// TAIL can only be at stage 0
// the ASC window starts at stage 1
BoxedQueryRequest request = criteria.get(1).queryRequest();
// check if it hasn't been set before
if (from.equals(request.from()) == false) {
// initialize the next request
request.from(from)
.nextAfter(from);
// initialize until (if available)
if (until != null) {
until.queryRequest()
.from(from)
.nextAfter(from);
}
// reset all sub queries
for (int i = 2; i < maxStages; i++) {
BoxedQueryRequest subRequest = criteria.get(i).queryRequest();
subRequest.from(null);
}
}
}
private void payload(ActionListener<Payload> listener) {
@ -359,9 +522,17 @@ public class TumblingWindow implements Executable {
return new TimeValue(System.currentTimeMillis() - startTime);
}
private static Ordinal headOrdinal(List<SearchHit> hits, Criterion<BoxedQueryRequest> criterion) {
return criterion.ordinal(hits.get(0));
}
private static Ordinal tailOrdinal(List<SearchHit> hits, Criterion<BoxedQueryRequest> criterion) {
return criterion.ordinal(hits.get(hits.size() - 1));
}
Iterable<List<HitReference>> hits(List<Sequence> sequences) {
return () -> {
final Iterator<Sequence> delegate = criteria.get(0).reverse() != criteria.get(1).reverse() ?
final Iterator<Sequence> delegate = criteria.get(0).descending() != criteria.get(1).descending() ?
new ReversedIterator<>(sequences) :
sequences.iterator();
@ -382,7 +553,7 @@ public class TumblingWindow implements Executable {
Iterable<Tuple<KeyAndOrdinal, HitReference>> wrapValues(Criterion<?> criterion, List<SearchHit> hits) {
return () -> {
final Iterator<SearchHit> delegate = criterion.reverse() ? new ReversedIterator<>(hits) : hits.iterator();
final Iterator<SearchHit> delegate = criterion.descending() ? new ReversedIterator<>(hits) : hits.iterator();
return new Iterator<Tuple<KeyAndOrdinal, HitReference>>() {

View File

@ -45,7 +45,6 @@ import org.elasticsearch.xpack.ql.plan.logical.Filter;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.ql.rule.RuleExecutor;
import org.elasticsearch.xpack.ql.type.DataTypes;
@ -402,7 +401,8 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
Join join = (Join) child;
List<KeyedFilter> queries = join.queries();
// the main reason ASC is used is the lack of search_before (which is emulated through search_after + ASC)
// the main reason DESC is used is the lack of search_before (which is emulated through search_after + ASC)
// see https://github.com/elastic/elasticsearch/issues/62118
List<Order> ascendingOrders = changeOrderDirection(orderBy.order(), OrderDirection.ASC);
// preserve the order direction as is (can be DESC) for the base query
List<KeyedFilter> orderedQueries = new ArrayList<>(queries.size());
@ -430,7 +430,7 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
LogicalPlan child = orderBy.child();
// the default order by is the first pipe
// so it has to be on top of a event query or join/sequence
return child instanceof Project || child instanceof Join;
return child instanceof Filter || child instanceof Join;
}
private static List<Order> changeOrderDirection(List<Order> orders, Order.OrderDirection direction) {

View File

@ -80,8 +80,8 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
new UnresolvedAttribute(synthetic("<tiebreaker>"), params.fieldTiebreaker()) : UNSPECIFIED_FIELD;
}
private OrderDirection defaultDirection() {
return OrderDirection.ASC;
private OrderDirection resultPosition() {
return params.resultPosition();
}
@Override
@ -94,16 +94,16 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
// the first pipe will be the implicit order
// declared here for resolving any possible tie-breakers
boolean asc = defaultDirection() == OrderDirection.ASC;
boolean asc = resultPosition() == OrderDirection.ASC;
NullsPosition position = asc ? NullsPosition.FIRST : NullsPosition.LAST;
List<Order> orders = new ArrayList<>(2);
Source defaultOrderSource = synthetic("<default-order>");
orders.add(new Order(defaultOrderSource, fieldTimestamp(), defaultDirection(), position));
orders.add(new Order(defaultOrderSource, fieldTimestamp(), resultPosition(), position));
// make sure to add the tiebreaker as well
Attribute tiebreaker = fieldTiebreaker();
if (Expressions.isPresent(tiebreaker)) {
orders.add(new Order(defaultOrderSource, tiebreaker, defaultDirection(), position));
orders.add(new Order(defaultOrderSource, tiebreaker, resultPosition(), position));
}
plan = new OrderBy(defaultOrderSource, plan, orders);
@ -202,7 +202,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
until = defaultUntil(source);
}
return new Join(source, queries, until, fieldTimestamp(), fieldTiebreaker(), defaultDirection());
return new Join(source, queries, until, fieldTimestamp(), fieldTiebreaker(), resultPosition());
}
private KeyedFilter defaultUntil(Source source) {
@ -260,7 +260,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
until = defaultUntil(source);
}
return new Sequence(source, queries, until, maxSpan, fieldTimestamp(), fieldTiebreaker(), defaultDirection());
return new Sequence(source, queries, until, maxSpan, fieldTimestamp(), fieldTiebreaker(), resultPosition());
}
public KeyedFilter visitSequenceTerm(SequenceTermContext ctx, List<Attribute> joinKeys) {
@ -277,18 +277,18 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
if (numberCtx instanceof IntegerLiteralContext) {
Number number = (Number) visitIntegerLiteral((IntegerLiteralContext) numberCtx).fold();
long value = number.longValue();
if (value <= 0) {
throw new ParsingException(source(numberCtx), "A positive maxspan value is required; found [{}]", value);
}
String timeString = text(ctx.timeUnit().IDENTIFIER());
if (timeString == null) {
throw new ParsingException(source(ctx.timeUnit()), "No time unit specified, did you mean [s] as in [{}s]?", text(ctx
.timeUnit()));
}
TimeUnit timeUnit = null;
switch (timeString) {
case "ms":
@ -325,7 +325,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
if (SUPPORTED_PIPES.contains(name) == false) {
List<String> potentialMatches = StringUtils.findSimilar(name, SUPPORTED_PIPES);
String msg = "Unrecognized pipe [{}]";
if (potentialMatches.isEmpty() == false) {
String matchString = potentialMatches.toString();

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.eql.parser;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import java.time.ZoneId;
import java.util.List;
@ -21,6 +23,7 @@ public class ParserParams {
private String fieldEventCategory = FIELD_EVENT_CATEGORY;
private String fieldTimestamp = FIELD_TIMESTAMP;
private String fieldTiebreaker = null;
private OrderDirection resultPosition = OrderDirection.ASC;
private int size = SIZE;
private int fetchSize = FETCH_SIZE;
private List<Object> queryParams = emptyList();
@ -28,7 +31,7 @@ public class ParserParams {
public ParserParams(ZoneId zoneId) {
this.zoneId = zoneId;
}
public String fieldEventCategory() {
return fieldEventCategory;
}
@ -56,6 +59,15 @@ public class ParserParams {
return this;
}
public OrderDirection resultPosition() {
return resultPosition;
}
public ParserParams resultPosition(OrderDirection resultPosition) {
this.resultPosition = resultPosition;
return this;
}
public int size() {
return size;
}
@ -86,4 +98,4 @@ public class ParserParams {
public ZoneId zoneId() {
return zoneId;
}
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.xpack.eql.execution.PlanExecutor;
import org.elasticsearch.xpack.eql.parser.ParserParams;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.expression.Order;
import java.io.IOException;
import java.time.ZoneId;
@ -114,6 +115,7 @@ public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRe
.fieldEventCategory(request.eventCategoryField())
.fieldTimestamp(request.timestampField())
.fieldTiebreaker(request.tiebreakerField())
.resultPosition("tail".equals(request.resultPosition()) ? Order.OrderDirection.DESC : Order.OrderDirection.ASC)
.size(request.size())
.fetchSize(request.fetchSize());

View File

@ -94,7 +94,12 @@ public class SequenceSpecTests extends ESTestCase {
TestCriterion(final int ordinal) {
super(ordinal,
new BoxedQueryRequest(() -> SearchSourceBuilder.searchSource().query(matchAllQuery()).size(ordinal), "timestamp"),
new BoxedQueryRequest(() -> SearchSourceBuilder.searchSource()
// set a non-negative size
.size(10)
.query(matchAllQuery())
// pass the ordinal through terminate after
.terminateAfter(ordinal), "timestamp"),
keyExtractors,
tsExtractor, tbExtractor, false);
this.ordinal = ordinal;
@ -165,9 +170,9 @@ public class SequenceSpecTests extends ESTestCase {
@Override
public void query(QueryRequest r, ActionListener<SearchResponse> l) {
int ordinal = r.searchSource().size();
int ordinal = r.searchSource().terminateAfter();
if (ordinal != Integer.MAX_VALUE) {
r.searchSource().size(Integer.MAX_VALUE);
r.searchSource().terminateAfter(Integer.MAX_VALUE);
}
Map<Integer, Tuple<String, String>> evs = ordinal != Integer.MAX_VALUE ? events.get(ordinal) : emptyMap();

View File

@ -190,8 +190,8 @@ public class OptimizerTests extends ESTestCase {
}
public void testSortByLimit() {
Project p = new Project(EMPTY, rel(), emptyList());
OrderBy o = new OrderBy(EMPTY, p, singletonList(new Order(EMPTY, tiebreaker(), OrderDirection.ASC, NullsPosition.FIRST)));
Filter f = new Filter(EMPTY, rel(), TRUE);
OrderBy o = new OrderBy(EMPTY, f, singletonList(new Order(EMPTY, tiebreaker(), OrderDirection.ASC, NullsPosition.FIRST)));
Tail t = new Tail(EMPTY, new Literal(EMPTY, 1, INTEGER), o);
LogicalPlan optimized = new Optimizer.SortByLimit().rule(t);