diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java index 2bdf7d3a78a..21d5a045f5a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java @@ -43,7 +43,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { private String implicitJoinKeyField = "agent.id"; private boolean isCaseSensitive = true; - private int fetchSize = 50; + private int size = 10; + private int fetchSize = 1000; private SearchAfterBuilder searchAfterBuilder; private String query; private String tiebreakerField; @@ -60,6 +61,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field"; static final String KEY_CASE_SENSITIVE = "case_sensitive"; static final String KEY_SIZE = "size"; + static final String KEY_FETCH_SIZE = "fetch_size"; static final String KEY_SEARCH_AFTER = "search_after"; static final String KEY_QUERY = "query"; static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout"; @@ -85,7 +87,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { if (implicitJoinKeyField != null) { builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField()); } - builder.field(KEY_SIZE, fetchSize()); + builder.field(KEY_SIZE, size()); + builder.field(KEY_FETCH_SIZE, fetchSize()); if (searchAfterBuilder != null) { builder.array(KEY_SEARCH_AFTER, searchAfterBuilder.getSortValues()); @@ -172,14 +175,26 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { return this; } + public int size() { + return this.size; + } + + public EqlSearchRequest size(int size) { + this.size = size; + if (fetchSize <= 0) { + throw new IllegalArgumentException("size must be greater than 0"); + } + return this; + } + public int fetchSize() { return this.fetchSize; } public EqlSearchRequest fetchSize(int size) { this.fetchSize = size; - if (fetchSize <= 0) { - throw new IllegalArgumentException("size must be greater than 0"); + if (fetchSize < 2) { + throw new IllegalArgumentException("fetch size must be greater than 1"); } return this; } @@ -246,7 +261,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { return false; } EqlSearchRequest that = (EqlSearchRequest) o; - return fetchSize == that.fetchSize && + return size == that.size && + fetchSize == that.fetchSize && Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions) && Objects.equals(filter, that.filter) && @@ -268,6 +284,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { Arrays.hashCode(indices), indicesOptions, filter, + size, fetchSize, timestampField, tiebreakerField, diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java index 92ed0ea8307..bacf3acfa90 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java @@ -103,7 +103,7 @@ public class EqlIT extends ESRestHighLevelClientTestCase { public void testBasicSearch() throws Exception { EqlClient eql = highLevelClient().eql(); - EqlSearchRequest request = new EqlSearchRequest("index", "process where true"); + EqlSearchRequest request = new EqlSearchRequest("index", "process where true").size(RECORD_COUNT); assertResponse(execute(request, eql::search, eql::searchAsync), RECORD_COUNT); } @@ -115,7 +115,7 @@ public class EqlIT extends ESRestHighLevelClientTestCase { EqlSearchRequest request = new EqlSearchRequest("index", "foo where pid > 0"); // test with non-default event.category mapping - request.eventCategoryField("event_type"); + request.eventCategoryField("event_type").size(RECORD_COUNT); EqlSearchResponse response = execute(request, eql::search, eql::searchAsync); assertResponse(response, RECORD_COUNT / DIVIDER); diff --git a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java index 60e72d1e2d3..087afe21ef3 100644 --- a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java +++ b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java @@ -144,6 +144,9 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase { EqlSearchRequest request = new EqlSearchRequest(testIndexName, query); request.isCaseSensitive(isCaseSensitive); request.tiebreakerField("event.sequence"); + // some queries return more than 10 results + request.size(50); + request.fetchSize(2); return eqlClient().search(request, RequestOptions.DEFAULT); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java index a4f49ff24f9..0fc6ab43b67 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java @@ -32,7 +32,6 @@ import java.util.Objects; import java.util.function.Supplier; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP; @@ -51,7 +50,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re private String tiebreakerField = null; private String eventCategoryField = FIELD_EVENT_CATEGORY; private String implicitJoinKeyField = FIELD_IMPLICIT_JOIN_KEY; - private int fetchSize = FETCH_SIZE; + private int size = RequestDefaults.SIZE; + private int fetchSize = RequestDefaults.FETCH_SIZE; private SearchAfterBuilder searchAfterBuilder; private String query; private boolean isCaseSensitive = false; @@ -67,6 +67,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field"; static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field"; static final String KEY_SIZE = "size"; + static final String KEY_FETCH_SIZE = "fetch_size"; static final String KEY_SEARCH_AFTER = "search_after"; static final String KEY_QUERY = "query"; static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout"; @@ -80,6 +81,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re static final ParseField EVENT_CATEGORY_FIELD = new ParseField(KEY_EVENT_CATEGORY_FIELD); static final ParseField IMPLICIT_JOIN_KEY_FIELD = new ParseField(KEY_IMPLICIT_JOIN_KEY_FIELD); static final ParseField SIZE = new ParseField(KEY_SIZE); + static final ParseField FETCH_SIZE = new ParseField(KEY_FETCH_SIZE); static final ParseField SEARCH_AFTER = new ParseField(KEY_SEARCH_AFTER); static final ParseField QUERY = new ParseField(KEY_QUERY); static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField(KEY_WAIT_FOR_COMPLETION_TIMEOUT); @@ -102,6 +104,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re tiebreakerField = in.readOptionalString(); eventCategoryField = in.readString(); implicitJoinKeyField = in.readString(); + size = in.readVInt(); fetchSize = in.readVInt(); searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new); query = in.readString(); @@ -148,10 +151,14 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re validationException = addValidationError("implicit join key field is null or empty", validationException); } - if (fetchSize <= 0) { + if (size <= 0) { validationException = addValidationError("size must be greater than 0", validationException); } + if (fetchSize < 2) { + validationException = addValidationError("fetch size must be greater than 1", validationException); + } + if (keepAlive != null && keepAlive.getMillis() < MIN_KEEP_ALIVE) { validationException = addValidationError("[keep_alive] must be greater than 1 minute, got:" + keepAlive.toString(), validationException); @@ -173,7 +180,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re if (implicitJoinKeyField != null) { builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField()); } - builder.field(KEY_SIZE, fetchSize()); + builder.field(KEY_SIZE, size()); + builder.field(KEY_FETCH_SIZE, fetchSize()); if (searchAfterBuilder != null) { builder.array(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues()); @@ -204,7 +212,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re parser.declareString(EqlSearchRequest::tiebreakerField, TIEBREAKER_FIELD); parser.declareString(EqlSearchRequest::eventCategoryField, EVENT_CATEGORY_FIELD); parser.declareString(EqlSearchRequest::implicitJoinKeyField, IMPLICIT_JOIN_KEY_FIELD); - parser.declareInt(EqlSearchRequest::fetchSize, SIZE); + parser.declareInt(EqlSearchRequest::size, SIZE); + parser.declareInt(EqlSearchRequest::fetchSize, FETCH_SIZE); parser.declareField(EqlSearchRequest::setSearchAfter, SearchAfterBuilder::fromXContent, SEARCH_AFTER, ObjectParser.ValueType.OBJECT_ARRAY); parser.declareString(EqlSearchRequest::query, QUERY); @@ -259,10 +268,21 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re return this; } - public int fetchSize() { return this.fetchSize; } + public int size() { + return this.size; + } - public EqlSearchRequest fetchSize(int size) { - this.fetchSize = size; + public EqlSearchRequest size(int size) { + this.size = size; + return this; + } + + public int fetchSize() { + return this.fetchSize; + } + + public EqlSearchRequest fetchSize(int fetchSize) { + this.fetchSize = fetchSize; return this; } @@ -334,6 +354,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re out.writeOptionalString(tiebreakerField); out.writeString(eventCategoryField); out.writeString(implicitJoinKeyField); + out.writeVInt(size); out.writeVInt(fetchSize); out.writeOptionalWriteable(searchAfterBuilder); out.writeString(query); @@ -354,7 +375,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re return false; } EqlSearchRequest that = (EqlSearchRequest) o; - return fetchSize == that.fetchSize && + return size == that.size && + fetchSize == that.fetchSize && Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions) && Objects.equals(filter, that.filter) && @@ -375,6 +397,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re Arrays.hashCode(indices), indicesOptions, filter, + size, fetchSize, timestampField, tiebreakerField, diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java index 714f656691f..340271bdc51 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java @@ -45,8 +45,13 @@ public class EqlSearchRequestBuilder extends ActionRequestBuilder-" + string(after) + "-> " + string(to) + "]"; + } + + private static String string(Ordinal o) { + return o != null ? o.toString() : ""; } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java index e5b0ac5f66b..f3954332b82 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java @@ -86,4 +86,9 @@ public class Criterion { } return new Ordinal(timestamp, tbreaker); } + + @Override + public String toString() { + return "[" + stage + "][" + reverse + "]"; + } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java index 1ed781537fd..7f2a2a7d820 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java @@ -69,9 +69,11 @@ public class ExecutionManager { if (query instanceof EsQueryExec) { QueryRequest original = ((EsQueryExec) query).queryRequest(session); + // increase the request size based on the fetch size (since size is applied already through limit) + BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName); Criterion criterion = - new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i> 0 && descending); + new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i > 0 && descending); criteria.add(criterion); } else { // until diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java index 5d5ee40d027..5afaa345d55 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java @@ -41,11 +41,20 @@ public class TumblingWindow implements Executable { private final Matcher matcher; // shortcut private final int maxStages; + private final int windowSize; private long startTime; - private int baseStage = 0; - private Ordinal begin, end; + private static class WindowInfo { + private final int baseStage; + private final Ordinal begin, end; + + WindowInfo(int baseStage, Ordinal begin, Ordinal end) { + this.baseStage = baseStage; + this.begin = begin; + this.end = end; + } + } public TumblingWindow(QueryClient client, List> criteria, @@ -56,6 +65,7 @@ public class TumblingWindow implements Executable { this.until = until; this.criteria = criteria; this.maxStages = criteria.size(); + this.windowSize = criteria.get(0).queryRequest().searchSource().size(); this.matcher = matcher; } @@ -64,23 +74,21 @@ public class TumblingWindow implements Executable { public void execute(ActionListener listener) { log.info("Starting sequence window..."); startTime = System.currentTimeMillis(); - advance(listener); + advance(0, listener); } - - private void advance(ActionListener listener) { + private void advance(int baseStage, ActionListener listener) { // initialize - log.info("Querying base stage"); Criterion base = criteria.get(baseStage); + // remove any potential upper limit (if a criteria has been promoted) + base.queryRequest().to(null); - if (end != null) { - // pick up where we left of - base.queryRequest().next(end); - } - client.query(base.queryRequest(), wrap(p -> baseCriterion(p, listener), listener::onFailure)); + log.info("Querying base stage [{}] {}", base.stage(), base.queryRequest()); + + client.query(base.queryRequest(), wrap(p -> baseCriterion(baseStage, p, listener), listener::onFailure)); } - private void baseCriterion(Payload p, ActionListener listener) { + private void baseCriterion(int baseStage, Payload p, ActionListener listener) { Criterion base = criteria.get(baseStage); List hits = p.values(); @@ -95,14 +103,7 @@ public class TumblingWindow implements Executable { if (hits.size() < 2) { // if there are still candidates, advance the window base if (matcher.hasCandidates(baseStage) && baseStage + 1 < maxStages) { - // swap window begin/end when changing directions - if (base.reverse() != criteria.get(baseStage + 1).reverse()) { - Ordinal temp = begin; - begin = end; - end = temp; - } - baseStage++; - advance(listener); + advance(baseStage + 1, listener); } // there aren't going to be any matches so cancel search else { @@ -112,53 +113,93 @@ public class TumblingWindow implements Executable { } // get borders for the rest of the queries - begin = base.ordinal(hits.get(0)); - end = base.ordinal(hits.get(hits.size() - 1)); + Ordinal begin = base.ordinal(hits.get(0)); + Ordinal end = base.ordinal(hits.get(hits.size() - 1)); + + // update current query for the next request + base.queryRequest().nextAfter(end); + + log.info("Found base [{}] window {} {}", base.stage(), begin, end); // find until ordinals //NB: not currently implemented // no more queries to run if (baseStage + 1 < maxStages) { - secondaryCriterion(baseStage + 1, listener); + secondaryCriterion(new WindowInfo(baseStage, begin, end), baseStage + 1, listener); } else { - advance(listener); + advance(baseStage, listener); } } - private void secondaryCriterion(int index, ActionListener listener) { - Criterion criterion = criteria.get(index); - log.info("Querying (secondary) stage {}", criterion.stage()); + private void secondaryCriterion(WindowInfo window, int currentStage, ActionListener listener) { + final Criterion criterion = criteria.get(currentStage); + + final BoxedQueryRequest request = criterion.queryRequest(); + Criterion base = criteria.get(window.baseStage); // first box the query - BoxedQueryRequest request = criterion.queryRequest(); - Criterion base = criteria.get(baseStage); - - // if the base has a different direction, swap begin/end + // only the first base can be descending + // all subsequence queries are ascending if (criterion.reverse() != base.reverse()) { - request.between(end, begin); + if (window.end.equals(request.from()) == false) { + // if that's the case, set the starting point + request.from(window.end); + // reposition the pointer + request.nextAfter(window.end); + } } else { - request.between(begin, end); + // otherwise just the upper limit + request.to(window.end); } + log.info("Querying (secondary) stage [{}] {}", criterion.stage(), request); + client.query(request, wrap(p -> { List hits = p.values(); - // no more results in this window so continue in another window + + // no more results for this query if (hits.isEmpty()) { - log.info("Advancing window..."); - advance(listener); - return; + // put the markers in place before the next call + if (criterion.reverse() != base.reverse()) { + request.to(window.end); + } else { + request.from(window.end); + } + + // if there are no candidates, advance the window + if (matcher.hasCandidates(criterion.stage()) == false) { + log.info("Advancing window..."); + advance(window.baseStage, listener); + return; + } + // otherwise let the other queries run to allow potential matches with the existing candidates } - // if the limit has been reached, return what's available - if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) { - listener.onResponse(payload()); - return; + else { + // prepare the query for the next search + request.nextAfter(criterion.ordinal(hits.get(hits.size() - 1))); + + // if the limit has been reached, return what's available + if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) { + listener.onResponse(payload()); + return; + } } - if (index + 1 < maxStages) { - secondaryCriterion(index + 1, listener); - } else { - advance(listener); + // keep running the query runs out of the results (essentially returns less than what we want) + if (hits.size() == windowSize) { + secondaryCriterion(window, currentStage, listener); + } + // looks like this stage is done, move on + else { + // to the next query + if (currentStage + 1 < maxStages) { + secondaryCriterion(window, currentStage + 1, listener); + } + // or to the next window + else { + advance(window.baseStage, listener); + } } }, listener::onFailure)); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java index 323328842dc..ab68cb56af0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java @@ -34,21 +34,16 @@ public class BasicListener implements ActionListener { if (CollectionUtils.isEmpty(failures) == false) { listener.onFailure(new EqlIllegalArgumentException(failures[0].reason(), failures[0].getCause())); } else { - handleResponse(response, listener); + if (log.isTraceEnabled()) { + logSearchResponse(response, log); + } + listener.onResponse(new SearchResponsePayload(response)); } } catch (Exception ex) { onFailure(ex); } } - private void handleResponse(SearchResponse response, ActionListener listener) { - if (log.isTraceEnabled()) { - logSearchResponse(response, log); - } - listener.onResponse(new SearchResponsePayload(response)); - } - - @Override public void onFailure(Exception ex) { listener.onFailure(ex); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java index d4c16c57ca9..518e50c212c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java @@ -12,7 +12,7 @@ public interface QueryRequest { SearchSourceBuilder searchSource(); - default void next(Ordinal ordinal) { + default void nextAfter(Ordinal ordinal) { searchSource().searchAfter(ordinal.toArray()); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java index 750d0243563..22d61f2e2e1 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java @@ -28,7 +28,7 @@ public abstract class SourceGenerator { private SourceGenerator() {} - public static SearchSourceBuilder sourceBuilder(QueryContainer container, QueryBuilder filter, Integer size) { + public static SearchSourceBuilder sourceBuilder(QueryContainer container, QueryBuilder filter) { QueryBuilder finalQuery = null; // add the source if (container.query() != null) { @@ -59,18 +59,12 @@ public abstract class SourceGenerator { sorting(container, source); source.fetchSource(FetchSourceContext.FETCH_SOURCE); - // set fetch size - if (size != null) { - int sz = size; - if (container.limit() != null) { - Limit limit = container.limit(); - // negative limit means DESC order but since the results are ordered ASC - // pagination becomes mute (since all the data needs to be returned) - sz = limit.limit > 0 ? Math.min(limit.total, size) : limit.total; - } - - if (source.size() == -1) { - source.size(sz); + if (container.limit() != null) { + // add size and from + source.size(container.limit().absLimit()); + // this should be added only for event queries + if (container.limit().offset > 0) { + source.from(container.limit().offset); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java index c43b36bee55..9c916dff017 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java @@ -102,7 +102,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder { plan = new OrderBy(defaultOrderSource, plan, orders); // add the default limit only if specified - Literal defaultSize = new Literal(synthetic(""), params.fetchSize(), DataTypes.INTEGER); + Literal defaultSize = new Literal(synthetic(""), params.size(), DataTypes.INTEGER); Source defaultLimitSource = synthetic(""); LogicalPlan previous = plan; @@ -209,7 +209,12 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder { List keys = CollectionUtils.combine(joinKeys, visitJoinKeys(joinCtx)); LogicalPlan eventQuery = visitEventFilter(subqueryCtx.eventFilter()); - LogicalPlan child = new Project(source(ctx), eventQuery, CollectionUtils.combine(keys, defaultProjection())); + // add fetch size as a limit so it gets propagated into the resulting query + LogicalPlan fetchSize = new LimitWithOffset(synthetic(""), + new Literal(synthetic(""), params.fetchSize(), DataTypes.INTEGER), + eventQuery); + // filter fields + LogicalPlan child = new Project(source(ctx), fetchSize, CollectionUtils.combine(keys, defaultProjection())); return new KeyedFilter(source(ctx), child, keys, fieldTimestamp(), fieldTiebreaker()); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/ParserParams.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/ParserParams.java index 56a9aa367bc..674aac50649 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/ParserParams.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/ParserParams.java @@ -14,6 +14,7 @@ import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP; +import static org.elasticsearch.xpack.eql.action.RequestDefaults.SIZE; public class ParserParams { @@ -22,6 +23,7 @@ public class ParserParams { private String fieldTimestamp = FIELD_TIMESTAMP; private String fieldTiebreaker = null; private String implicitJoinKey = FIELD_IMPLICIT_JOIN_KEY; + private int size = SIZE; private int fetchSize = FETCH_SIZE; private List queryParams = emptyList(); @@ -65,6 +67,15 @@ public class ParserParams { return this; } + public int size() { + return size; + } + + public ParserParams size(int size) { + this.size = size; + return this; + } + public int fetchSize() { return fetchSize; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java index 72b6fc2c68d..9686a1e5e1a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java @@ -51,7 +51,9 @@ public class EsQueryExec extends LeafExec { public QueryRequest queryRequest(EqlSession session) { EqlConfiguration cfg = session.configuration(); - SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(queryContainer, cfg.filter(), cfg.size()); + // by default use the configuration size + // join/sequence queries will want to override this + SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(queryContainer, cfg.filter()); return () -> sourceBuilder; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index e9544c76abc..d428cbec5bd 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -24,12 +24,12 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.async.AsyncExecutionId; -import org.elasticsearch.xpack.eql.async.AsyncTaskManagementService; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.eql.action.EqlSearchAction; import org.elasticsearch.xpack.eql.action.EqlSearchRequest; import org.elasticsearch.xpack.eql.action.EqlSearchResponse; import org.elasticsearch.xpack.eql.action.EqlSearchTask; +import org.elasticsearch.xpack.eql.async.AsyncTaskManagementService; import org.elasticsearch.xpack.eql.execution.PlanExecutor; import org.elasticsearch.xpack.eql.parser.ParserParams; import org.elasticsearch.xpack.eql.session.EqlConfiguration; @@ -116,10 +116,11 @@ public class TransportEqlSearchAction extends HandledTransportAction listener.onResponse(createResponse(r, task.getExecutionId())), listener::onFailure)); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java index 3228967def4..303fcf63206 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java @@ -168,7 +168,7 @@ public class QueryContainer { public String toString() { try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.humanReadable(true).prettyPrint(); - SourceGenerator.sourceBuilder(this, null, null).toXContent(builder, ToXContent.EMPTY_PARAMS); + SourceGenerator.sourceBuilder(this, null).toXContent(builder, ToXContent.EMPTY_PARAMS); return Strings.toString(builder); } catch (IOException e) { throw new EqlIllegalArgumentException("error rendering", e); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java index 53fde26794b..792a7b3d415 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java @@ -19,7 +19,6 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu private final String[] indices; private final TimeValue requestTimeout; - private final int size; private final String clientId; private final boolean includeFrozenIndices; private final TaskId taskId; @@ -30,13 +29,12 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu private final QueryBuilder filter; public EqlConfiguration(String[] indices, ZoneId zi, String username, String clusterName, QueryBuilder filter, TimeValue requestTimeout, - int size, boolean includeFrozen, boolean isCaseSensitive, String clientId, TaskId taskId, EqlSearchTask task) { + boolean includeFrozen, boolean isCaseSensitive, String clientId, TaskId taskId, EqlSearchTask task) { super(zi, username, clusterName); this.indices = indices; this.filter = filter; this.requestTimeout = requestTimeout; - this.size = size; this.clientId = clientId; this.includeFrozenIndices = includeFrozen; this.isCaseSensitive = isCaseSensitive; @@ -60,10 +58,6 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu return filter; } - public int size() { - return size; - } - public String clientId() { return clientId; } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java index cbefc678781..0a472aeeba8 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java @@ -17,7 +17,6 @@ import java.util.Collections; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; import static org.elasticsearch.test.ESTestCase.randomBoolean; -import static org.elasticsearch.test.ESTestCase.randomIntBetween; import static org.elasticsearch.test.ESTestCase.randomLong; import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong; import static org.elasticsearch.test.ESTestCase.randomZone; @@ -28,12 +27,12 @@ public final class EqlTestUtils { } public static final EqlConfiguration TEST_CFG_CASE_INSENSITIVE = new EqlConfiguration(new String[] {"none"}, - org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, false, "", - new TaskId("test", 123), null); + org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), false, false, + "", new TaskId("test", 123), null); public static final EqlConfiguration TEST_CFG_CASE_SENSITIVE = new EqlConfiguration(new String[] {"none"}, - org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, true, "", - new TaskId("test", 123), null); + org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), false, true, + "", new TaskId("test", 123), null); public static EqlConfiguration randomConfiguration() { return internalRandomConfiguration(randomBoolean()); @@ -50,7 +49,6 @@ public final class EqlTestUtils { randomAlphaOfLength(16), null, new TimeValue(randomNonNegativeLong()), - randomIntBetween(5, 100), randomBoolean(), isCaseSensitive, randomAlphaOfLength(16), diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlRequestParserTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlRequestParserTests.java index b9aca1e171f..fcfe32f0d4f 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlRequestParserTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlRequestParserTests.java @@ -71,7 +71,8 @@ public class EqlRequestParserTests extends ESTestCase { assertEquals("etf", request.eventCategoryField()); assertEquals("imjf", request.implicitJoinKeyField()); assertArrayEquals(new Object[]{12345678, "device-20184", "/user/local/foo.exe", "2019-11-26T00:45:43.542"}, request.searchAfter()); - assertEquals(101, request.fetchSize()); + assertEquals(101, request.size()); + assertEquals(1000, request.fetchSize()); assertEquals("file where user != 'SYSTEM' by file_path", request.query()); assertEquals(setIsCaseSensitive && isCaseSensitive, request.isCaseSensitive()); } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java index 7621ab62e90..ad75df2e967 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java @@ -81,7 +81,7 @@ public class LogicalPlanTests extends ESTestCase { Order order = new Order(Source.EMPTY, timestamp(), OrderDirection.ASC, NullsPosition.FIRST); LogicalPlan project = new Project(Source.EMPTY, filter, singletonList(timestamp())); LogicalPlan sorted = new OrderBy(Source.EMPTY, project, singletonList(order)); - LogicalPlan head = new Head(Source.EMPTY, new Literal(Source.EMPTY, RequestDefaults.FETCH_SIZE, DataTypes.INTEGER), sorted); + LogicalPlan head = new Head(Source.EMPTY, new Literal(Source.EMPTY, RequestDefaults.SIZE, DataTypes.INTEGER), sorted); return head; } diff --git a/x-pack/plugin/eql/src/test/resources/queryfolder_tests.txt b/x-pack/plugin/eql/src/test/resources/queryfolder_tests.txt index aa5f5a6ef69..0202769caba 100644 --- a/x-pack/plugin/eql/src/test/resources/queryfolder_tests.txt +++ b/x-pack/plugin/eql/src/test/resources/queryfolder_tests.txt @@ -534,3 +534,27 @@ process where subtract(43, serial_event_id) == 41 InternalQlScriptUtils.sub(params.v0,InternalQlScriptUtils.docValue(doc,params.v1)),params.v2))", "params":{"v0":43,"v1":"serial_event_id","v2":41} ; + +eventQueryDefaultLimit +process where true +; +"size":10, +; + +eventQueryWithHead +process where true | head 5 +; +"size":5, +; + +eventQueryWithTail +process where true | tail 5 +; +"size":5, +; + +eventQueryWithHeadAndTail +process where true | tail 10 | head 7 +; +"size":7, +;