From f9c15d0fec3423abea608ccb67a6228226ac1f51 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Mon, 6 Jul 2020 18:50:40 +0300 Subject: [PATCH] EQL: Introduce sequencing fetch size (#59063) The current internal sequence algorithm relies on fetching multiple results and then paginating through the dataset. Depending on the dataset and memory, setting a larger page size can yield better performance at the expense of memory. This PR makes this behavior explicit by decoupling the fetch size from size, the maximum number of results desired. As such, use in testing a minimum fetch size which exposed a number of bugs: Jumping across data across queries causing valid data to be seen as a gap. Incorrectly resuming searching across pages (again causing data to be discarded). which have been addressed. (cherry picked from commit 2f389a7724790d7b0bda67264d6eafcfa8b2116e) --- .../client/eql/EqlSearchRequest.java | 27 +++- .../java/org/elasticsearch/client/EqlIT.java | 4 +- .../test/eql/CommonEqlActionTestCase.java | 3 + .../xpack/eql/action/EqlSearchRequest.java | 41 ++++-- .../eql/action/EqlSearchRequestBuilder.java | 9 +- .../xpack/eql/action/RequestDefaults.java | 3 +- .../assembler/BoxedQueryRequest.java | 56 ++++++-- .../eql/execution/assembler/Criterion.java | 5 + .../execution/assembler/ExecutionManager.java | 4 +- .../execution/assembler/TumblingWindow.java | 131 ++++++++++++------ .../eql/execution/search/BasicListener.java | 13 +- .../eql/execution/search/QueryRequest.java | 2 +- .../eql/execution/search/SourceGenerator.java | 20 +-- .../xpack/eql/parser/LogicalPlanBuilder.java | 9 +- .../xpack/eql/parser/ParserParams.java | 11 ++ .../xpack/eql/plan/physical/EsQueryExec.java | 4 +- .../eql/plugin/TransportEqlSearchAction.java | 7 +- .../querydsl/container/QueryContainer.java | 2 +- .../xpack/eql/session/EqlConfiguration.java | 8 +- .../elasticsearch/xpack/eql/EqlTestUtils.java | 10 +- .../eql/action/EqlRequestParserTests.java | 3 +- .../xpack/eql/parser/LogicalPlanTests.java | 2 +- .../src/test/resources/queryfolder_tests.txt | 24 ++++ 23 files changed, 277 insertions(+), 121 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java index 2bdf7d3a78a..21d5a045f5a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java @@ -43,7 +43,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { private String implicitJoinKeyField = "agent.id"; private boolean isCaseSensitive = true; - private int fetchSize = 50; + private int size = 10; + private int fetchSize = 1000; private SearchAfterBuilder searchAfterBuilder; private String query; private String tiebreakerField; @@ -60,6 +61,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field"; static final String KEY_CASE_SENSITIVE = "case_sensitive"; static final String KEY_SIZE = "size"; + static final String KEY_FETCH_SIZE = "fetch_size"; static final String KEY_SEARCH_AFTER = "search_after"; static final String KEY_QUERY = "query"; static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout"; @@ -85,7 +87,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { if (implicitJoinKeyField != null) { builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField()); } - builder.field(KEY_SIZE, fetchSize()); + builder.field(KEY_SIZE, size()); + builder.field(KEY_FETCH_SIZE, fetchSize()); if (searchAfterBuilder != null) { builder.array(KEY_SEARCH_AFTER, searchAfterBuilder.getSortValues()); @@ -172,14 +175,26 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { return this; } + public int size() { + return this.size; + } + + public EqlSearchRequest size(int size) { + this.size = size; + if (fetchSize <= 0) { + throw new IllegalArgumentException("size must be greater than 0"); + } + return this; + } + public int fetchSize() { return this.fetchSize; } public EqlSearchRequest fetchSize(int size) { this.fetchSize = size; - if (fetchSize <= 0) { - throw new IllegalArgumentException("size must be greater than 0"); + if (fetchSize < 2) { + throw new IllegalArgumentException("fetch size must be greater than 1"); } return this; } @@ -246,7 +261,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { return false; } EqlSearchRequest that = (EqlSearchRequest) o; - return fetchSize == that.fetchSize && + return size == that.size && + fetchSize == that.fetchSize && Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions) && Objects.equals(filter, that.filter) && @@ -268,6 +284,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { Arrays.hashCode(indices), indicesOptions, filter, + size, fetchSize, timestampField, tiebreakerField, diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java index 92ed0ea8307..bacf3acfa90 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java @@ -103,7 +103,7 @@ public class EqlIT extends ESRestHighLevelClientTestCase { public void testBasicSearch() throws Exception { EqlClient eql = highLevelClient().eql(); - EqlSearchRequest request = new EqlSearchRequest("index", "process where true"); + EqlSearchRequest request = new EqlSearchRequest("index", "process where true").size(RECORD_COUNT); assertResponse(execute(request, eql::search, eql::searchAsync), RECORD_COUNT); } @@ -115,7 +115,7 @@ public class EqlIT extends ESRestHighLevelClientTestCase { EqlSearchRequest request = new EqlSearchRequest("index", "foo where pid > 0"); // test with non-default event.category mapping - request.eventCategoryField("event_type"); + request.eventCategoryField("event_type").size(RECORD_COUNT); EqlSearchResponse response = execute(request, eql::search, eql::searchAsync); assertResponse(response, RECORD_COUNT / DIVIDER); diff --git a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java index 60e72d1e2d3..087afe21ef3 100644 --- a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java +++ b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java @@ -144,6 +144,9 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase { EqlSearchRequest request = new EqlSearchRequest(testIndexName, query); request.isCaseSensitive(isCaseSensitive); request.tiebreakerField("event.sequence"); + // some queries return more than 10 results + request.size(50); + request.fetchSize(2); return eqlClient().search(request, RequestOptions.DEFAULT); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java index a4f49ff24f9..0fc6ab43b67 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java @@ -32,7 +32,6 @@ import java.util.Objects; import java.util.function.Supplier; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP; @@ -51,7 +50,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re private String tiebreakerField = null; private String eventCategoryField = FIELD_EVENT_CATEGORY; private String implicitJoinKeyField = FIELD_IMPLICIT_JOIN_KEY; - private int fetchSize = FETCH_SIZE; + private int size = RequestDefaults.SIZE; + private int fetchSize = RequestDefaults.FETCH_SIZE; private SearchAfterBuilder searchAfterBuilder; private String query; private boolean isCaseSensitive = false; @@ -67,6 +67,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field"; static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field"; static final String KEY_SIZE = "size"; + static final String KEY_FETCH_SIZE = "fetch_size"; static final String KEY_SEARCH_AFTER = "search_after"; static final String KEY_QUERY = "query"; static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout"; @@ -80,6 +81,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re static final ParseField EVENT_CATEGORY_FIELD = new ParseField(KEY_EVENT_CATEGORY_FIELD); static final ParseField IMPLICIT_JOIN_KEY_FIELD = new ParseField(KEY_IMPLICIT_JOIN_KEY_FIELD); static final ParseField SIZE = new ParseField(KEY_SIZE); + static final ParseField FETCH_SIZE = new ParseField(KEY_FETCH_SIZE); static final ParseField SEARCH_AFTER = new ParseField(KEY_SEARCH_AFTER); static final ParseField QUERY = new ParseField(KEY_QUERY); static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField(KEY_WAIT_FOR_COMPLETION_TIMEOUT); @@ -102,6 +104,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re tiebreakerField = in.readOptionalString(); eventCategoryField = in.readString(); implicitJoinKeyField = in.readString(); + size = in.readVInt(); fetchSize = in.readVInt(); searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new); query = in.readString(); @@ -148,10 +151,14 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re validationException = addValidationError("implicit join key field is null or empty", validationException); } - if (fetchSize <= 0) { + if (size <= 0) { validationException = addValidationError("size must be greater than 0", validationException); } + if (fetchSize < 2) { + validationException = addValidationError("fetch size must be greater than 1", validationException); + } + if (keepAlive != null && keepAlive.getMillis() < MIN_KEEP_ALIVE) { validationException = addValidationError("[keep_alive] must be greater than 1 minute, got:" + keepAlive.toString(), validationException); @@ -173,7 +180,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re if (implicitJoinKeyField != null) { builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField()); } - builder.field(KEY_SIZE, fetchSize()); + builder.field(KEY_SIZE, size()); + builder.field(KEY_FETCH_SIZE, fetchSize()); if (searchAfterBuilder != null) { builder.array(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues()); @@ -204,7 +212,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re parser.declareString(EqlSearchRequest::tiebreakerField, TIEBREAKER_FIELD); parser.declareString(EqlSearchRequest::eventCategoryField, EVENT_CATEGORY_FIELD); parser.declareString(EqlSearchRequest::implicitJoinKeyField, IMPLICIT_JOIN_KEY_FIELD); - parser.declareInt(EqlSearchRequest::fetchSize, SIZE); + parser.declareInt(EqlSearchRequest::size, SIZE); + parser.declareInt(EqlSearchRequest::fetchSize, FETCH_SIZE); parser.declareField(EqlSearchRequest::setSearchAfter, SearchAfterBuilder::fromXContent, SEARCH_AFTER, ObjectParser.ValueType.OBJECT_ARRAY); parser.declareString(EqlSearchRequest::query, QUERY); @@ -259,10 +268,21 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re return this; } - public int fetchSize() { return this.fetchSize; } + public int size() { + return this.size; + } - public EqlSearchRequest fetchSize(int size) { - this.fetchSize = size; + public EqlSearchRequest size(int size) { + this.size = size; + return this; + } + + public int fetchSize() { + return this.fetchSize; + } + + public EqlSearchRequest fetchSize(int fetchSize) { + this.fetchSize = fetchSize; return this; } @@ -334,6 +354,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re out.writeOptionalString(tiebreakerField); out.writeString(eventCategoryField); out.writeString(implicitJoinKeyField); + out.writeVInt(size); out.writeVInt(fetchSize); out.writeOptionalWriteable(searchAfterBuilder); out.writeString(query); @@ -354,7 +375,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re return false; } EqlSearchRequest that = (EqlSearchRequest) o; - return fetchSize == that.fetchSize && + return size == that.size && + fetchSize == that.fetchSize && Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions) && Objects.equals(filter, that.filter) && @@ -375,6 +397,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re Arrays.hashCode(indices), indicesOptions, filter, + size, fetchSize, timestampField, tiebreakerField, diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java index 714f656691f..340271bdc51 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java @@ -45,8 +45,13 @@ public class EqlSearchRequestBuilder extends ActionRequestBuilder-" + string(after) + "-> " + string(to) + "]"; + } + + private static String string(Ordinal o) { + return o != null ? o.toString() : ""; } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java index e5b0ac5f66b..f3954332b82 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java @@ -86,4 +86,9 @@ public class Criterion { } return new Ordinal(timestamp, tbreaker); } + + @Override + public String toString() { + return "[" + stage + "][" + reverse + "]"; + } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java index 1ed781537fd..7f2a2a7d820 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java @@ -69,9 +69,11 @@ public class ExecutionManager { if (query instanceof EsQueryExec) { QueryRequest original = ((EsQueryExec) query).queryRequest(session); + // increase the request size based on the fetch size (since size is applied already through limit) + BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName); Criterion criterion = - new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i> 0 && descending); + new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i > 0 && descending); criteria.add(criterion); } else { // until diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java index 5d5ee40d027..5afaa345d55 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/TumblingWindow.java @@ -41,11 +41,20 @@ public class TumblingWindow implements Executable { private final Matcher matcher; // shortcut private final int maxStages; + private final int windowSize; private long startTime; - private int baseStage = 0; - private Ordinal begin, end; + private static class WindowInfo { + private final int baseStage; + private final Ordinal begin, end; + + WindowInfo(int baseStage, Ordinal begin, Ordinal end) { + this.baseStage = baseStage; + this.begin = begin; + this.end = end; + } + } public TumblingWindow(QueryClient client, List> criteria, @@ -56,6 +65,7 @@ public class TumblingWindow implements Executable { this.until = until; this.criteria = criteria; this.maxStages = criteria.size(); + this.windowSize = criteria.get(0).queryRequest().searchSource().size(); this.matcher = matcher; } @@ -64,23 +74,21 @@ public class TumblingWindow implements Executable { public void execute(ActionListener listener) { log.info("Starting sequence window..."); startTime = System.currentTimeMillis(); - advance(listener); + advance(0, listener); } - - private void advance(ActionListener listener) { + private void advance(int baseStage, ActionListener listener) { // initialize - log.info("Querying base stage"); Criterion base = criteria.get(baseStage); + // remove any potential upper limit (if a criteria has been promoted) + base.queryRequest().to(null); - if (end != null) { - // pick up where we left of - base.queryRequest().next(end); - } - client.query(base.queryRequest(), wrap(p -> baseCriterion(p, listener), listener::onFailure)); + log.info("Querying base stage [{}] {}", base.stage(), base.queryRequest()); + + client.query(base.queryRequest(), wrap(p -> baseCriterion(baseStage, p, listener), listener::onFailure)); } - private void baseCriterion(Payload p, ActionListener listener) { + private void baseCriterion(int baseStage, Payload p, ActionListener listener) { Criterion base = criteria.get(baseStage); List hits = p.values(); @@ -95,14 +103,7 @@ public class TumblingWindow implements Executable { if (hits.size() < 2) { // if there are still candidates, advance the window base if (matcher.hasCandidates(baseStage) && baseStage + 1 < maxStages) { - // swap window begin/end when changing directions - if (base.reverse() != criteria.get(baseStage + 1).reverse()) { - Ordinal temp = begin; - begin = end; - end = temp; - } - baseStage++; - advance(listener); + advance(baseStage + 1, listener); } // there aren't going to be any matches so cancel search else { @@ -112,53 +113,93 @@ public class TumblingWindow implements Executable { } // get borders for the rest of the queries - begin = base.ordinal(hits.get(0)); - end = base.ordinal(hits.get(hits.size() - 1)); + Ordinal begin = base.ordinal(hits.get(0)); + Ordinal end = base.ordinal(hits.get(hits.size() - 1)); + + // update current query for the next request + base.queryRequest().nextAfter(end); + + log.info("Found base [{}] window {} {}", base.stage(), begin, end); // find until ordinals //NB: not currently implemented // no more queries to run if (baseStage + 1 < maxStages) { - secondaryCriterion(baseStage + 1, listener); + secondaryCriterion(new WindowInfo(baseStage, begin, end), baseStage + 1, listener); } else { - advance(listener); + advance(baseStage, listener); } } - private void secondaryCriterion(int index, ActionListener listener) { - Criterion criterion = criteria.get(index); - log.info("Querying (secondary) stage {}", criterion.stage()); + private void secondaryCriterion(WindowInfo window, int currentStage, ActionListener listener) { + final Criterion criterion = criteria.get(currentStage); + + final BoxedQueryRequest request = criterion.queryRequest(); + Criterion base = criteria.get(window.baseStage); // first box the query - BoxedQueryRequest request = criterion.queryRequest(); - Criterion base = criteria.get(baseStage); - - // if the base has a different direction, swap begin/end + // only the first base can be descending + // all subsequence queries are ascending if (criterion.reverse() != base.reverse()) { - request.between(end, begin); + if (window.end.equals(request.from()) == false) { + // if that's the case, set the starting point + request.from(window.end); + // reposition the pointer + request.nextAfter(window.end); + } } else { - request.between(begin, end); + // otherwise just the upper limit + request.to(window.end); } + log.info("Querying (secondary) stage [{}] {}", criterion.stage(), request); + client.query(request, wrap(p -> { List hits = p.values(); - // no more results in this window so continue in another window + + // no more results for this query if (hits.isEmpty()) { - log.info("Advancing window..."); - advance(listener); - return; + // put the markers in place before the next call + if (criterion.reverse() != base.reverse()) { + request.to(window.end); + } else { + request.from(window.end); + } + + // if there are no candidates, advance the window + if (matcher.hasCandidates(criterion.stage()) == false) { + log.info("Advancing window..."); + advance(window.baseStage, listener); + return; + } + // otherwise let the other queries run to allow potential matches with the existing candidates } - // if the limit has been reached, return what's available - if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) { - listener.onResponse(payload()); - return; + else { + // prepare the query for the next search + request.nextAfter(criterion.ordinal(hits.get(hits.size() - 1))); + + // if the limit has been reached, return what's available + if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) { + listener.onResponse(payload()); + return; + } } - if (index + 1 < maxStages) { - secondaryCriterion(index + 1, listener); - } else { - advance(listener); + // keep running the query runs out of the results (essentially returns less than what we want) + if (hits.size() == windowSize) { + secondaryCriterion(window, currentStage, listener); + } + // looks like this stage is done, move on + else { + // to the next query + if (currentStage + 1 < maxStages) { + secondaryCriterion(window, currentStage + 1, listener); + } + // or to the next window + else { + advance(window.baseStage, listener); + } } }, listener::onFailure)); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java index 323328842dc..ab68cb56af0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/BasicListener.java @@ -34,21 +34,16 @@ public class BasicListener implements ActionListener { if (CollectionUtils.isEmpty(failures) == false) { listener.onFailure(new EqlIllegalArgumentException(failures[0].reason(), failures[0].getCause())); } else { - handleResponse(response, listener); + if (log.isTraceEnabled()) { + logSearchResponse(response, log); + } + listener.onResponse(new SearchResponsePayload(response)); } } catch (Exception ex) { onFailure(ex); } } - private void handleResponse(SearchResponse response, ActionListener listener) { - if (log.isTraceEnabled()) { - logSearchResponse(response, log); - } - listener.onResponse(new SearchResponsePayload(response)); - } - - @Override public void onFailure(Exception ex) { listener.onFailure(ex); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java index d4c16c57ca9..518e50c212c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/QueryRequest.java @@ -12,7 +12,7 @@ public interface QueryRequest { SearchSourceBuilder searchSource(); - default void next(Ordinal ordinal) { + default void nextAfter(Ordinal ordinal) { searchSource().searchAfter(ordinal.toArray()); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java index 750d0243563..22d61f2e2e1 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/SourceGenerator.java @@ -28,7 +28,7 @@ public abstract class SourceGenerator { private SourceGenerator() {} - public static SearchSourceBuilder sourceBuilder(QueryContainer container, QueryBuilder filter, Integer size) { + public static SearchSourceBuilder sourceBuilder(QueryContainer container, QueryBuilder filter) { QueryBuilder finalQuery = null; // add the source if (container.query() != null) { @@ -59,18 +59,12 @@ public abstract class SourceGenerator { sorting(container, source); source.fetchSource(FetchSourceContext.FETCH_SOURCE); - // set fetch size - if (size != null) { - int sz = size; - if (container.limit() != null) { - Limit limit = container.limit(); - // negative limit means DESC order but since the results are ordered ASC - // pagination becomes mute (since all the data needs to be returned) - sz = limit.limit > 0 ? Math.min(limit.total, size) : limit.total; - } - - if (source.size() == -1) { - source.size(sz); + if (container.limit() != null) { + // add size and from + source.size(container.limit().absLimit()); + // this should be added only for event queries + if (container.limit().offset > 0) { + source.from(container.limit().offset); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java index c43b36bee55..9c916dff017 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/LogicalPlanBuilder.java @@ -102,7 +102,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder { plan = new OrderBy(defaultOrderSource, plan, orders); // add the default limit only if specified - Literal defaultSize = new Literal(synthetic(""), params.fetchSize(), DataTypes.INTEGER); + Literal defaultSize = new Literal(synthetic(""), params.size(), DataTypes.INTEGER); Source defaultLimitSource = synthetic(""); LogicalPlan previous = plan; @@ -209,7 +209,12 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder { List keys = CollectionUtils.combine(joinKeys, visitJoinKeys(joinCtx)); LogicalPlan eventQuery = visitEventFilter(subqueryCtx.eventFilter()); - LogicalPlan child = new Project(source(ctx), eventQuery, CollectionUtils.combine(keys, defaultProjection())); + // add fetch size as a limit so it gets propagated into the resulting query + LogicalPlan fetchSize = new LimitWithOffset(synthetic(""), + new Literal(synthetic(""), params.fetchSize(), DataTypes.INTEGER), + eventQuery); + // filter fields + LogicalPlan child = new Project(source(ctx), fetchSize, CollectionUtils.combine(keys, defaultProjection())); return new KeyedFilter(source(ctx), child, keys, fieldTimestamp(), fieldTiebreaker()); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/ParserParams.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/ParserParams.java index 56a9aa367bc..674aac50649 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/ParserParams.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/parser/ParserParams.java @@ -14,6 +14,7 @@ import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP; +import static org.elasticsearch.xpack.eql.action.RequestDefaults.SIZE; public class ParserParams { @@ -22,6 +23,7 @@ public class ParserParams { private String fieldTimestamp = FIELD_TIMESTAMP; private String fieldTiebreaker = null; private String implicitJoinKey = FIELD_IMPLICIT_JOIN_KEY; + private int size = SIZE; private int fetchSize = FETCH_SIZE; private List queryParams = emptyList(); @@ -65,6 +67,15 @@ public class ParserParams { return this; } + public int size() { + return size; + } + + public ParserParams size(int size) { + this.size = size; + return this; + } + public int fetchSize() { return fetchSize; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java index 72b6fc2c68d..9686a1e5e1a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java @@ -51,7 +51,9 @@ public class EsQueryExec extends LeafExec { public QueryRequest queryRequest(EqlSession session) { EqlConfiguration cfg = session.configuration(); - SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(queryContainer, cfg.filter(), cfg.size()); + // by default use the configuration size + // join/sequence queries will want to override this + SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(queryContainer, cfg.filter()); return () -> sourceBuilder; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index e9544c76abc..d428cbec5bd 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -24,12 +24,12 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.async.AsyncExecutionId; -import org.elasticsearch.xpack.eql.async.AsyncTaskManagementService; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.eql.action.EqlSearchAction; import org.elasticsearch.xpack.eql.action.EqlSearchRequest; import org.elasticsearch.xpack.eql.action.EqlSearchResponse; import org.elasticsearch.xpack.eql.action.EqlSearchTask; +import org.elasticsearch.xpack.eql.async.AsyncTaskManagementService; import org.elasticsearch.xpack.eql.execution.PlanExecutor; import org.elasticsearch.xpack.eql.parser.ParserParams; import org.elasticsearch.xpack.eql.session.EqlConfiguration; @@ -116,10 +116,11 @@ public class TransportEqlSearchAction extends HandledTransportAction listener.onResponse(createResponse(r, task.getExecutionId())), listener::onFailure)); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java index 3228967def4..303fcf63206 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java @@ -168,7 +168,7 @@ public class QueryContainer { public String toString() { try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.humanReadable(true).prettyPrint(); - SourceGenerator.sourceBuilder(this, null, null).toXContent(builder, ToXContent.EMPTY_PARAMS); + SourceGenerator.sourceBuilder(this, null).toXContent(builder, ToXContent.EMPTY_PARAMS); return Strings.toString(builder); } catch (IOException e) { throw new EqlIllegalArgumentException("error rendering", e); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java index 53fde26794b..792a7b3d415 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java @@ -19,7 +19,6 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu private final String[] indices; private final TimeValue requestTimeout; - private final int size; private final String clientId; private final boolean includeFrozenIndices; private final TaskId taskId; @@ -30,13 +29,12 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu private final QueryBuilder filter; public EqlConfiguration(String[] indices, ZoneId zi, String username, String clusterName, QueryBuilder filter, TimeValue requestTimeout, - int size, boolean includeFrozen, boolean isCaseSensitive, String clientId, TaskId taskId, EqlSearchTask task) { + boolean includeFrozen, boolean isCaseSensitive, String clientId, TaskId taskId, EqlSearchTask task) { super(zi, username, clusterName); this.indices = indices; this.filter = filter; this.requestTimeout = requestTimeout; - this.size = size; this.clientId = clientId; this.includeFrozenIndices = includeFrozen; this.isCaseSensitive = isCaseSensitive; @@ -60,10 +58,6 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu return filter; } - public int size() { - return size; - } - public String clientId() { return clientId; } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java index cbefc678781..0a472aeeba8 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java @@ -17,7 +17,6 @@ import java.util.Collections; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; import static org.elasticsearch.test.ESTestCase.randomBoolean; -import static org.elasticsearch.test.ESTestCase.randomIntBetween; import static org.elasticsearch.test.ESTestCase.randomLong; import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong; import static org.elasticsearch.test.ESTestCase.randomZone; @@ -28,12 +27,12 @@ public final class EqlTestUtils { } public static final EqlConfiguration TEST_CFG_CASE_INSENSITIVE = new EqlConfiguration(new String[] {"none"}, - org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, false, "", - new TaskId("test", 123), null); + org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), false, false, + "", new TaskId("test", 123), null); public static final EqlConfiguration TEST_CFG_CASE_SENSITIVE = new EqlConfiguration(new String[] {"none"}, - org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, true, "", - new TaskId("test", 123), null); + org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), false, true, + "", new TaskId("test", 123), null); public static EqlConfiguration randomConfiguration() { return internalRandomConfiguration(randomBoolean()); @@ -50,7 +49,6 @@ public final class EqlTestUtils { randomAlphaOfLength(16), null, new TimeValue(randomNonNegativeLong()), - randomIntBetween(5, 100), randomBoolean(), isCaseSensitive, randomAlphaOfLength(16), diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlRequestParserTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlRequestParserTests.java index b9aca1e171f..fcfe32f0d4f 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlRequestParserTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlRequestParserTests.java @@ -71,7 +71,8 @@ public class EqlRequestParserTests extends ESTestCase { assertEquals("etf", request.eventCategoryField()); assertEquals("imjf", request.implicitJoinKeyField()); assertArrayEquals(new Object[]{12345678, "device-20184", "/user/local/foo.exe", "2019-11-26T00:45:43.542"}, request.searchAfter()); - assertEquals(101, request.fetchSize()); + assertEquals(101, request.size()); + assertEquals(1000, request.fetchSize()); assertEquals("file where user != 'SYSTEM' by file_path", request.query()); assertEquals(setIsCaseSensitive && isCaseSensitive, request.isCaseSensitive()); } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java index 7621ab62e90..ad75df2e967 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java @@ -81,7 +81,7 @@ public class LogicalPlanTests extends ESTestCase { Order order = new Order(Source.EMPTY, timestamp(), OrderDirection.ASC, NullsPosition.FIRST); LogicalPlan project = new Project(Source.EMPTY, filter, singletonList(timestamp())); LogicalPlan sorted = new OrderBy(Source.EMPTY, project, singletonList(order)); - LogicalPlan head = new Head(Source.EMPTY, new Literal(Source.EMPTY, RequestDefaults.FETCH_SIZE, DataTypes.INTEGER), sorted); + LogicalPlan head = new Head(Source.EMPTY, new Literal(Source.EMPTY, RequestDefaults.SIZE, DataTypes.INTEGER), sorted); return head; } diff --git a/x-pack/plugin/eql/src/test/resources/queryfolder_tests.txt b/x-pack/plugin/eql/src/test/resources/queryfolder_tests.txt index aa5f5a6ef69..0202769caba 100644 --- a/x-pack/plugin/eql/src/test/resources/queryfolder_tests.txt +++ b/x-pack/plugin/eql/src/test/resources/queryfolder_tests.txt @@ -534,3 +534,27 @@ process where subtract(43, serial_event_id) == 41 InternalQlScriptUtils.sub(params.v0,InternalQlScriptUtils.docValue(doc,params.v1)),params.v2))", "params":{"v0":43,"v1":"serial_event_id","v2":41} ; + +eventQueryDefaultLimit +process where true +; +"size":10, +; + +eventQueryWithHead +process where true | head 5 +; +"size":5, +; + +eventQueryWithTail +process where true | tail 5 +; +"size":5, +; + +eventQueryWithHeadAndTail +process where true | tail 10 | head 7 +; +"size":7, +;