EQL: Introduce sequencing fetch size (#59063)

The current internal sequencing algorithm relies on fetching multiple results and then paginating through the dataset. Depending on the dataset and available memory, setting a larger page size can yield better performance at the expense of memory.
This PR makes the behavior explicit by decoupling the fetch size from size, the maximum number of results desired (illustrated below).
Testing now uses a minimal fetch size, which exposed a number of bugs:

- Jumping across data between queries, causing valid data to be seen as a gap.
- Incorrectly resuming searches across pages (again causing data to be discarded).

Both issues have been addressed.

(cherry picked from commit 2f389a7724790d7b0bda67264d6eafcfa8b2116e)
Costin Leau 2020-07-06 18:50:40 +03:00 committed by Costin Leau
parent b2e9c6f640
commit f9c15d0fec
23 changed files with 277 additions and 121 deletions
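
As a rough illustration of the decoupled settings, here is a minimal client-side sketch using the high-level REST client request type changed below (not part of this commit). The index name, the EQL query, the concrete values, and the org.elasticsearch.client.eql package path are assumptions for the example; on the wire the two settings serialize as "size" and "fetch_size".

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.eql.EqlSearchRequest;
import org.elasticsearch.client.eql.EqlSearchResponse;

import java.io.IOException;

public class EqlFetchSizeSketch {

    // Ask for at most 50 sequences (size) while the engine paginates internally
    // in pages of 200 hits (fetch_size); the fetch size must be greater than 1.
    static EqlSearchResponse search(RestHighLevelClient client) throws IOException {
        EqlSearchRequest request = new EqlSearchRequest("my-index",
                "sequence by agent.id [process where true] [file where true]")
                .size(50)
                .fetchSize(200);
        return client.eql().search(request, RequestOptions.DEFAULT);
    }
}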

View File

@ -43,7 +43,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
private String implicitJoinKeyField = "agent.id";
private boolean isCaseSensitive = true;
private int fetchSize = 50;
private int size = 10;
private int fetchSize = 1000;
private SearchAfterBuilder searchAfterBuilder;
private String query;
private String tiebreakerField;
@ -60,6 +61,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
static final String KEY_CASE_SENSITIVE = "case_sensitive";
static final String KEY_SIZE = "size";
static final String KEY_FETCH_SIZE = "fetch_size";
static final String KEY_SEARCH_AFTER = "search_after";
static final String KEY_QUERY = "query";
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
@ -85,7 +87,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
if (implicitJoinKeyField != null) {
builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField());
}
builder.field(KEY_SIZE, fetchSize());
builder.field(KEY_SIZE, size());
builder.field(KEY_FETCH_SIZE, fetchSize());
if (searchAfterBuilder != null) {
builder.array(KEY_SEARCH_AFTER, searchAfterBuilder.getSortValues());
@ -172,14 +175,26 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
return this;
}
public int size() {
return this.size;
}
public EqlSearchRequest size(int size) {
this.size = size;
if (size <= 0) {
throw new IllegalArgumentException("size must be greater than 0");
}
return this;
}
public int fetchSize() {
return this.fetchSize;
}
public EqlSearchRequest fetchSize(int size) {
this.fetchSize = size;
if (fetchSize <= 0) {
throw new IllegalArgumentException("size must be greater than 0");
if (fetchSize < 2) {
throw new IllegalArgumentException("fetch size must be greater than 1");
}
return this;
}
@ -246,7 +261,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
return false;
}
EqlSearchRequest that = (EqlSearchRequest) o;
return fetchSize == that.fetchSize &&
return size == that.size &&
fetchSize == that.fetchSize &&
Arrays.equals(indices, that.indices) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(filter, that.filter) &&
@ -268,6 +284,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
Arrays.hashCode(indices),
indicesOptions,
filter,
size,
fetchSize,
timestampField,
tiebreakerField,

View File

@ -103,7 +103,7 @@ public class EqlIT extends ESRestHighLevelClientTestCase {
public void testBasicSearch() throws Exception {
EqlClient eql = highLevelClient().eql();
EqlSearchRequest request = new EqlSearchRequest("index", "process where true");
EqlSearchRequest request = new EqlSearchRequest("index", "process where true").size(RECORD_COUNT);
assertResponse(execute(request, eql::search, eql::searchAsync), RECORD_COUNT);
}
@ -115,7 +115,7 @@ public class EqlIT extends ESRestHighLevelClientTestCase {
EqlSearchRequest request = new EqlSearchRequest("index", "foo where pid > 0");
// test with non-default event.category mapping
request.eventCategoryField("event_type");
request.eventCategoryField("event_type").size(RECORD_COUNT);
EqlSearchResponse response = execute(request, eql::search, eql::searchAsync);
assertResponse(response, RECORD_COUNT / DIVIDER);

View File

@ -144,6 +144,9 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
EqlSearchRequest request = new EqlSearchRequest(testIndexName, query);
request.isCaseSensitive(isCaseSensitive);
request.tiebreakerField("event.sequence");
// some queries return more than 10 results
request.size(50);
request.fetchSize(2);
return eqlClient().search(request, RequestOptions.DEFAULT);
}

View File

@ -32,7 +32,6 @@ import java.util.Objects;
import java.util.function.Supplier;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP;
@ -51,7 +50,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
private String tiebreakerField = null;
private String eventCategoryField = FIELD_EVENT_CATEGORY;
private String implicitJoinKeyField = FIELD_IMPLICIT_JOIN_KEY;
private int fetchSize = FETCH_SIZE;
private int size = RequestDefaults.SIZE;
private int fetchSize = RequestDefaults.FETCH_SIZE;
private SearchAfterBuilder searchAfterBuilder;
private String query;
private boolean isCaseSensitive = false;
@ -67,6 +67,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
static final String KEY_SIZE = "size";
static final String KEY_FETCH_SIZE = "fetch_size";
static final String KEY_SEARCH_AFTER = "search_after";
static final String KEY_QUERY = "query";
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
@ -80,6 +81,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
static final ParseField EVENT_CATEGORY_FIELD = new ParseField(KEY_EVENT_CATEGORY_FIELD);
static final ParseField IMPLICIT_JOIN_KEY_FIELD = new ParseField(KEY_IMPLICIT_JOIN_KEY_FIELD);
static final ParseField SIZE = new ParseField(KEY_SIZE);
static final ParseField FETCH_SIZE = new ParseField(KEY_FETCH_SIZE);
static final ParseField SEARCH_AFTER = new ParseField(KEY_SEARCH_AFTER);
static final ParseField QUERY = new ParseField(KEY_QUERY);
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField(KEY_WAIT_FOR_COMPLETION_TIMEOUT);
@ -102,6 +104,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
tiebreakerField = in.readOptionalString();
eventCategoryField = in.readString();
implicitJoinKeyField = in.readString();
size = in.readVInt();
fetchSize = in.readVInt();
searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new);
query = in.readString();
@ -148,10 +151,14 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
validationException = addValidationError("implicit join key field is null or empty", validationException);
}
if (fetchSize <= 0) {
if (size <= 0) {
validationException = addValidationError("size must be greater than 0", validationException);
}
if (fetchSize < 2) {
validationException = addValidationError("fetch size must be greater than 1", validationException);
}
if (keepAlive != null && keepAlive.getMillis() < MIN_KEEP_ALIVE) {
validationException =
addValidationError("[keep_alive] must be greater than 1 minute, got:" + keepAlive.toString(), validationException);
@ -173,7 +180,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
if (implicitJoinKeyField != null) {
builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField());
}
builder.field(KEY_SIZE, fetchSize());
builder.field(KEY_SIZE, size());
builder.field(KEY_FETCH_SIZE, fetchSize());
if (searchAfterBuilder != null) {
builder.array(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues());
@ -204,7 +212,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
parser.declareString(EqlSearchRequest::tiebreakerField, TIEBREAKER_FIELD);
parser.declareString(EqlSearchRequest::eventCategoryField, EVENT_CATEGORY_FIELD);
parser.declareString(EqlSearchRequest::implicitJoinKeyField, IMPLICIT_JOIN_KEY_FIELD);
parser.declareInt(EqlSearchRequest::fetchSize, SIZE);
parser.declareInt(EqlSearchRequest::size, SIZE);
parser.declareInt(EqlSearchRequest::fetchSize, FETCH_SIZE);
parser.declareField(EqlSearchRequest::setSearchAfter, SearchAfterBuilder::fromXContent, SEARCH_AFTER,
ObjectParser.ValueType.OBJECT_ARRAY);
parser.declareString(EqlSearchRequest::query, QUERY);
@ -259,10 +268,21 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
return this;
}
public int fetchSize() { return this.fetchSize; }
public int size() {
return this.size;
}
public EqlSearchRequest fetchSize(int size) {
this.fetchSize = size;
public EqlSearchRequest size(int size) {
this.size = size;
return this;
}
public int fetchSize() {
return this.fetchSize;
}
public EqlSearchRequest fetchSize(int fetchSize) {
this.fetchSize = fetchSize;
return this;
}
@ -334,6 +354,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
out.writeOptionalString(tiebreakerField);
out.writeString(eventCategoryField);
out.writeString(implicitJoinKeyField);
out.writeVInt(size);
out.writeVInt(fetchSize);
out.writeOptionalWriteable(searchAfterBuilder);
out.writeString(query);
@ -354,7 +375,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
return false;
}
EqlSearchRequest that = (EqlSearchRequest) o;
return fetchSize == that.fetchSize &&
return size == that.size &&
fetchSize == that.fetchSize &&
Arrays.equals(indices, that.indices) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(filter, that.filter) &&
@ -375,6 +397,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
Arrays.hashCode(indices),
indicesOptions,
filter,
size,
fetchSize,
timestampField,
tiebreakerField,

View File

@ -45,8 +45,13 @@ public class EqlSearchRequestBuilder extends ActionRequestBuilder<EqlSearchReque
return this;
}
public EqlSearchRequestBuilder fetchSize(int size) {
request.fetchSize(size);
public EqlSearchRequestBuilder size(int size) {
request.size(size);
return this;
}
public EqlSearchRequestBuilder fetchSize(int fetchSize) {
request.fetchSize(fetchSize);
return this;
}

View File

@ -14,5 +14,6 @@ public final class RequestDefaults {
public static final String FIELD_EVENT_CATEGORY = "event.category";
public static final String FIELD_IMPLICIT_JOIN_KEY = "agent.id";
public static int FETCH_SIZE = 10;
public static int SIZE = 10;
public static int FETCH_SIZE = 1000;
}

View File

@ -15,6 +15,15 @@ import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
/**
* Ranged or boxed query. Provides a beginning or end to the current query.
* The query moves between them through search_after.
*
* Note that the range is intentionally not set all at once, since each query tends to
* return its own number of results, separate from the others.
* As such, each query resumes from where it left off to reach the current in-progress window,
* as opposed to always operating on the exact same window.
*/
public class BoxedQueryRequest implements QueryRequest {
private final RangeQueryBuilder timestampRange;
@ -22,6 +31,9 @@ public class BoxedQueryRequest implements QueryRequest {
private final SearchSourceBuilder searchSource;
private Ordinal from, to;
private Ordinal after;
public BoxedQueryRequest(QueryRequest original, String timestamp, String tiebreaker) {
searchSource = original.searchSource();
@ -44,28 +56,50 @@ public class BoxedQueryRequest implements QueryRequest {
}
@Override
public void next(Ordinal ordinal) {
// reset existing constraints
timestampRange.gte(null).lte(null);
if (tiebreakerRange != null) {
tiebreakerRange.gte(null).lte(null);
}
public void nextAfter(Ordinal ordinal) {
after = ordinal;
// and leave only search_after
searchSource.searchAfter(ordinal.toArray());
}
public BoxedQueryRequest between(Ordinal begin, Ordinal end) {
timestampRange.gte(begin.timestamp()).lte(end.timestamp());
/**
* Sets the lower boundary for the query (non-inclusive).
* Can be removed (when the query is unbounded) by passing null.
*/
public BoxedQueryRequest from(Ordinal begin) {
from = begin;
if (tiebreakerRange != null) {
tiebreakerRange.gte(begin.tiebreaker()).lte(end.tiebreaker());
timestampRange.gte(begin != null ? begin.timestamp() : null);
tiebreakerRange.gt(begin != null ? begin.tiebreaker() : null);
} else {
timestampRange.gt(begin != null ? begin.timestamp() : null);
}
return this;
}
public Ordinal from() {
return from;
}
/**
* Sets the upper boundary for the query (inclusive).
* Can be removed (when the query is unbounded) by passing null.
*/
public BoxedQueryRequest to(Ordinal end) {
to = end;
timestampRange.lte(end != null ? end.timestamp() : null);
if (tiebreakerRange != null) {
tiebreakerRange.lte(end != null ? end.tiebreaker() : null);
}
return this;
}
@Override
public String toString() {
return searchSource.toString();
return "( " + string(from) + " >-" + string(after) + "-> " + string(to) + "]";
}
private static String string(Ordinal o) {
return o != null ? o.toString() : "<none>";
}
}

View File

@ -86,4 +86,9 @@ public class Criterion<Q extends QueryRequest> {
}
return new Ordinal(timestamp, tbreaker);
}
@Override
public String toString() {
return "[" + stage + "][" + reverse + "]";
}
}

View File

@ -69,9 +69,11 @@ public class ExecutionManager {
if (query instanceof EsQueryExec) {
QueryRequest original = ((EsQueryExec) query).queryRequest(session);
// increase the request size based on the fetch size (since size is applied already through limit)
BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName);
Criterion<BoxedQueryRequest> criterion =
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i> 0 && descending);
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i > 0 && descending);
criteria.add(criterion);
} else {
// until

View File

@ -41,11 +41,20 @@ public class TumblingWindow implements Executable {
private final Matcher matcher;
// shortcut
private final int maxStages;
private final int windowSize;
private long startTime;
private int baseStage = 0;
private Ordinal begin, end;
private static class WindowInfo {
private final int baseStage;
private final Ordinal begin, end;
WindowInfo(int baseStage, Ordinal begin, Ordinal end) {
this.baseStage = baseStage;
this.begin = begin;
this.end = end;
}
}
public TumblingWindow(QueryClient client,
List<Criterion<BoxedQueryRequest>> criteria,
@ -56,6 +65,7 @@ public class TumblingWindow implements Executable {
this.until = until;
this.criteria = criteria;
this.maxStages = criteria.size();
this.windowSize = criteria.get(0).queryRequest().searchSource().size();
this.matcher = matcher;
}
@ -64,23 +74,21 @@ public class TumblingWindow implements Executable {
public void execute(ActionListener<Payload> listener) {
log.info("Starting sequence window...");
startTime = System.currentTimeMillis();
advance(listener);
advance(0, listener);
}
private void advance(ActionListener<Payload> listener) {
private void advance(int baseStage, ActionListener<Payload> listener) {
// initialize
log.info("Querying base stage");
Criterion<BoxedQueryRequest> base = criteria.get(baseStage);
// remove any potential upper limit (if a criterion has been promoted)
base.queryRequest().to(null);
if (end != null) {
// pick up where we left off
base.queryRequest().next(end);
}
client.query(base.queryRequest(), wrap(p -> baseCriterion(p, listener), listener::onFailure));
log.info("Querying base stage [{}] {}", base.stage(), base.queryRequest());
client.query(base.queryRequest(), wrap(p -> baseCriterion(baseStage, p, listener), listener::onFailure));
}
private void baseCriterion(Payload p, ActionListener<Payload> listener) {
private void baseCriterion(int baseStage, Payload p, ActionListener<Payload> listener) {
Criterion<BoxedQueryRequest> base = criteria.get(baseStage);
List<SearchHit> hits = p.values();
@ -95,14 +103,7 @@ public class TumblingWindow implements Executable {
if (hits.size() < 2) {
// if there are still candidates, advance the window base
if (matcher.hasCandidates(baseStage) && baseStage + 1 < maxStages) {
// swap window begin/end when changing directions
if (base.reverse() != criteria.get(baseStage + 1).reverse()) {
Ordinal temp = begin;
begin = end;
end = temp;
}
baseStage++;
advance(listener);
advance(baseStage + 1, listener);
}
// there aren't going to be any matches so cancel search
else {
@ -112,53 +113,93 @@ public class TumblingWindow implements Executable {
}
// get borders for the rest of the queries
begin = base.ordinal(hits.get(0));
end = base.ordinal(hits.get(hits.size() - 1));
Ordinal begin = base.ordinal(hits.get(0));
Ordinal end = base.ordinal(hits.get(hits.size() - 1));
// update current query for the next request
base.queryRequest().nextAfter(end);
log.info("Found base [{}] window {} {}", base.stage(), begin, end);
// find until ordinals
//NB: not currently implemented
// no more queries to run
if (baseStage + 1 < maxStages) {
secondaryCriterion(baseStage + 1, listener);
secondaryCriterion(new WindowInfo(baseStage, begin, end), baseStage + 1, listener);
} else {
advance(listener);
advance(baseStage, listener);
}
}
private void secondaryCriterion(int index, ActionListener<Payload> listener) {
Criterion<BoxedQueryRequest> criterion = criteria.get(index);
log.info("Querying (secondary) stage {}", criterion.stage());
private void secondaryCriterion(WindowInfo window, int currentStage, ActionListener<Payload> listener) {
final Criterion<BoxedQueryRequest> criterion = criteria.get(currentStage);
final BoxedQueryRequest request = criterion.queryRequest();
Criterion<BoxedQueryRequest> base = criteria.get(window.baseStage);
// first box the query
BoxedQueryRequest request = criterion.queryRequest();
Criterion<BoxedQueryRequest> base = criteria.get(baseStage);
// if the base has a different direction, swap begin/end
// only the first base can be descending
// all subsequent queries are ascending
if (criterion.reverse() != base.reverse()) {
request.between(end, begin);
if (window.end.equals(request.from()) == false) {
// if that's the case, set the starting point
request.from(window.end);
// reposition the pointer
request.nextAfter(window.end);
}
} else {
request.between(begin, end);
// otherwise just the upper limit
request.to(window.end);
}
log.info("Querying (secondary) stage [{}] {}", criterion.stage(), request);
client.query(request, wrap(p -> {
List<SearchHit> hits = p.values();
// no more results in this window so continue in another window
// no more results for this query
if (hits.isEmpty()) {
log.info("Advancing window...");
advance(listener);
return;
// put the markers in place before the next call
if (criterion.reverse() != base.reverse()) {
request.to(window.end);
} else {
request.from(window.end);
}
// if there are no candidates, advance the window
if (matcher.hasCandidates(criterion.stage()) == false) {
log.info("Advancing window...");
advance(window.baseStage, listener);
return;
}
// otherwise let the other queries run to allow potential matches with the existing candidates
}
// if the limit has been reached, return what's available
if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) {
listener.onResponse(payload());
return;
else {
// prepare the query for the next search
request.nextAfter(criterion.ordinal(hits.get(hits.size() - 1)));
// if the limit has been reached, return what's available
if (matcher.match(criterion.stage(), wrapValues(criterion, hits)) == false) {
listener.onResponse(payload());
return;
}
}
if (index + 1 < maxStages) {
secondaryCriterion(index + 1, listener);
} else {
advance(listener);
// keep running the query until it runs out of results (that is, it returns less than a full page)
if (hits.size() == windowSize) {
secondaryCriterion(window, currentStage, listener);
}
// looks like this stage is done, move on
else {
// to the next query
if (currentStage + 1 < maxStages) {
secondaryCriterion(window, currentStage + 1, listener);
}
// or to the next window
else {
advance(window.baseStage, listener);
}
}
}, listener::onFailure));
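
To make the new window/pagination flow above easier to follow, here is a heavily simplified, self-contained sketch of the idea (plain Java over in-memory lists; none of the Elasticsearch types appear and all names are illustrative): a full page of the base stage defines the current window's upper bound, each later stage is boxed to that bound and resumes from wherever it left off, and a stage keeps paging only while it gets back a full fetch-size page.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TumblingWindowSketch {

    // Each stage is an ascending list of event timestamps (stand-ins for ordinals).
    static void run(List<List<Long>> stages, int fetchSize) {
        List<Long> base = stages.get(0);
        // Secondary stages resume from where they left off in the previous window.
        long[] resumeAfter = new long[stages.size()];
        Arrays.fill(resumeAfter, Long.MIN_VALUE);

        int baseFrom = 0;
        while (baseFrom < base.size()) {
            // One page of the base stage defines the current window's upper bound.
            List<Long> page = base.subList(baseFrom, Math.min(baseFrom + fetchSize, base.size()));
            long end = page.get(page.size() - 1);
            baseFrom += page.size();

            // Box every later stage to the window and paginate it in fetch-size
            // pages; keep paging only while a page comes back full.
            for (int stage = 1; stage < stages.size(); stage++) {
                List<Long> hits;
                do {
                    hits = pageOf(stages.get(stage), resumeAfter[stage], end, fetchSize);
                    if (hits.isEmpty() == false) {
                        // hand the hits to the sequence matcher here, then resume
                        // after the last ordinal seen
                        resumeAfter[stage] = hits.get(hits.size() - 1);
                    }
                } while (hits.size() == fetchSize);
            }
            // then advance to the next base page / window
        }
    }

    // Up to fetchSize events strictly after 'after' and no later than 'end'.
    static List<Long> pageOf(List<Long> events, long after, long end, int fetchSize) {
        List<Long> result = new ArrayList<>();
        for (long ts : events) {
            if (ts > after && ts <= end) {
                result.add(ts);
                if (result.size() == fetchSize) {
                    break;
                }
            }
        }
        return result;
    }
}

This mirrors the resume-where-you-left-off behavior described in the BoxedQueryRequest comment above, which is exactly what running the tests with a minimal fetch size exercised.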

View File

@ -34,21 +34,16 @@ public class BasicListener implements ActionListener<SearchResponse> {
if (CollectionUtils.isEmpty(failures) == false) {
listener.onFailure(new EqlIllegalArgumentException(failures[0].reason(), failures[0].getCause()));
} else {
handleResponse(response, listener);
if (log.isTraceEnabled()) {
logSearchResponse(response, log);
}
listener.onResponse(new SearchResponsePayload(response));
}
} catch (Exception ex) {
onFailure(ex);
}
}
private void handleResponse(SearchResponse response, ActionListener<Payload> listener) {
if (log.isTraceEnabled()) {
logSearchResponse(response, log);
}
listener.onResponse(new SearchResponsePayload(response));
}
@Override
public void onFailure(Exception ex) {
listener.onFailure(ex);

View File

@ -12,7 +12,7 @@ public interface QueryRequest {
SearchSourceBuilder searchSource();
default void next(Ordinal ordinal) {
default void nextAfter(Ordinal ordinal) {
searchSource().searchAfter(ordinal.toArray());
}
}

View File

@ -28,7 +28,7 @@ public abstract class SourceGenerator {
private SourceGenerator() {}
public static SearchSourceBuilder sourceBuilder(QueryContainer container, QueryBuilder filter, Integer size) {
public static SearchSourceBuilder sourceBuilder(QueryContainer container, QueryBuilder filter) {
QueryBuilder finalQuery = null;
// add the source
if (container.query() != null) {
@ -59,18 +59,12 @@ public abstract class SourceGenerator {
sorting(container, source);
source.fetchSource(FetchSourceContext.FETCH_SOURCE);
// set fetch size
if (size != null) {
int sz = size;
if (container.limit() != null) {
Limit limit = container.limit();
// negative limit means DESC order but since the results are ordered ASC
// pagination becomes moot (since all the data needs to be returned)
sz = limit.limit > 0 ? Math.min(limit.total, size) : limit.total;
}
if (source.size() == -1) {
source.size(sz);
if (container.limit() != null) {
// add size and from
source.size(container.limit().absLimit());
// this should be added only for event queries
if (container.limit().offset > 0) {
source.from(container.limit().offset);
}
}

View File

@ -102,7 +102,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
plan = new OrderBy(defaultOrderSource, plan, orders);
// add the default limit only if specified
Literal defaultSize = new Literal(synthetic("<default-size>"), params.fetchSize(), DataTypes.INTEGER);
Literal defaultSize = new Literal(synthetic("<default-size>"), params.size(), DataTypes.INTEGER);
Source defaultLimitSource = synthetic("<default-limit>");
LogicalPlan previous = plan;
@ -209,7 +209,12 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
List<Attribute> keys = CollectionUtils.combine(joinKeys, visitJoinKeys(joinCtx));
LogicalPlan eventQuery = visitEventFilter(subqueryCtx.eventFilter());
LogicalPlan child = new Project(source(ctx), eventQuery, CollectionUtils.combine(keys, defaultProjection()));
// add fetch size as a limit so it gets propagated into the resulting query
LogicalPlan fetchSize = new LimitWithOffset(synthetic("<fetch-size>"),
new Literal(synthetic("<fetch-value>"), params.fetchSize(), DataTypes.INTEGER),
eventQuery);
// filter fields
LogicalPlan child = new Project(source(ctx), fetchSize, CollectionUtils.combine(keys, defaultProjection()));
return new KeyedFilter(source(ctx), child, keys, fieldTimestamp(), fieldTiebreaker());
}

View File

@ -14,6 +14,7 @@ import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.SIZE;
public class ParserParams {
@ -22,6 +23,7 @@ public class ParserParams {
private String fieldTimestamp = FIELD_TIMESTAMP;
private String fieldTiebreaker = null;
private String implicitJoinKey = FIELD_IMPLICIT_JOIN_KEY;
private int size = SIZE;
private int fetchSize = FETCH_SIZE;
private List<Object> queryParams = emptyList();
@ -65,6 +67,15 @@ public class ParserParams {
return this;
}
public int size() {
return size;
}
public ParserParams size(int size) {
this.size = size;
return this;
}
public int fetchSize() {
return fetchSize;
}

View File

@ -51,7 +51,9 @@ public class EsQueryExec extends LeafExec {
public QueryRequest queryRequest(EqlSession session) {
EqlConfiguration cfg = session.configuration();
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(queryContainer, cfg.filter(), cfg.size());
// by default use the configuration size
// join/sequence queries will want to override this
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(queryContainer, cfg.filter());
return () -> sourceBuilder;
}

View File

@ -24,12 +24,12 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.eql.async.AsyncTaskManagementService;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.eql.action.EqlSearchAction;
import org.elasticsearch.xpack.eql.action.EqlSearchRequest;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;
import org.elasticsearch.xpack.eql.action.EqlSearchTask;
import org.elasticsearch.xpack.eql.async.AsyncTaskManagementService;
import org.elasticsearch.xpack.eql.execution.PlanExecutor;
import org.elasticsearch.xpack.eql.parser.ParserParams;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
@ -116,10 +116,11 @@ public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRe
.fieldTimestamp(request.timestampField())
.fieldTiebreaker(request.tiebreakerField())
.implicitJoinKey(request.implicitJoinKeyField())
.size(request.size())
.fetchSize(request.fetchSize());
EqlConfiguration cfg = new EqlConfiguration(request.indices(), zoneId, username, clusterName, filter, timeout, request.fetchSize(),
includeFrozen, request.isCaseSensitive(), clientId, new TaskId(nodeId, task.getId()), task);
EqlConfiguration cfg = new EqlConfiguration(request.indices(), zoneId, username, clusterName, filter, timeout, includeFrozen,
request.isCaseSensitive(), clientId, new TaskId(nodeId, task.getId()), task);
planExecutor.eql(cfg, request.query(), params, wrap(r -> listener.onResponse(createResponse(r, task.getExecutionId())),
listener::onFailure));
}

View File

@ -168,7 +168,7 @@ public class QueryContainer {
public String toString() {
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.humanReadable(true).prettyPrint();
SourceGenerator.sourceBuilder(this, null, null).toXContent(builder, ToXContent.EMPTY_PARAMS);
SourceGenerator.sourceBuilder(this, null).toXContent(builder, ToXContent.EMPTY_PARAMS);
return Strings.toString(builder);
} catch (IOException e) {
throw new EqlIllegalArgumentException("error rendering", e);

View File

@ -19,7 +19,6 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu
private final String[] indices;
private final TimeValue requestTimeout;
private final int size;
private final String clientId;
private final boolean includeFrozenIndices;
private final TaskId taskId;
@ -30,13 +29,12 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu
private final QueryBuilder filter;
public EqlConfiguration(String[] indices, ZoneId zi, String username, String clusterName, QueryBuilder filter, TimeValue requestTimeout,
int size, boolean includeFrozen, boolean isCaseSensitive, String clientId, TaskId taskId, EqlSearchTask task) {
boolean includeFrozen, boolean isCaseSensitive, String clientId, TaskId taskId, EqlSearchTask task) {
super(zi, username, clusterName);
this.indices = indices;
this.filter = filter;
this.requestTimeout = requestTimeout;
this.size = size;
this.clientId = clientId;
this.includeFrozenIndices = includeFrozen;
this.isCaseSensitive = isCaseSensitive;
@ -60,10 +58,6 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu
return filter;
}
public int size() {
return size;
}
public String clientId() {
return clientId;
}

View File

@ -17,7 +17,6 @@ import java.util.Collections;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.elasticsearch.test.ESTestCase.randomLong;
import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong;
import static org.elasticsearch.test.ESTestCase.randomZone;
@ -28,12 +27,12 @@ public final class EqlTestUtils {
}
public static final EqlConfiguration TEST_CFG_CASE_INSENSITIVE = new EqlConfiguration(new String[] {"none"},
org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, false, "",
new TaskId("test", 123), null);
org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), false, false,
"", new TaskId("test", 123), null);
public static final EqlConfiguration TEST_CFG_CASE_SENSITIVE = new EqlConfiguration(new String[] {"none"},
org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, true, "",
new TaskId("test", 123), null);
org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), false, true,
"", new TaskId("test", 123), null);
public static EqlConfiguration randomConfiguration() {
return internalRandomConfiguration(randomBoolean());
@ -50,7 +49,6 @@ public final class EqlTestUtils {
randomAlphaOfLength(16),
null,
new TimeValue(randomNonNegativeLong()),
randomIntBetween(5, 100),
randomBoolean(),
isCaseSensitive,
randomAlphaOfLength(16),

View File

@ -71,7 +71,8 @@ public class EqlRequestParserTests extends ESTestCase {
assertEquals("etf", request.eventCategoryField());
assertEquals("imjf", request.implicitJoinKeyField());
assertArrayEquals(new Object[]{12345678, "device-20184", "/user/local/foo.exe", "2019-11-26T00:45:43.542"}, request.searchAfter());
assertEquals(101, request.fetchSize());
assertEquals(101, request.size());
assertEquals(1000, request.fetchSize());
assertEquals("file where user != 'SYSTEM' by file_path", request.query());
assertEquals(setIsCaseSensitive && isCaseSensitive, request.isCaseSensitive());
}

View File

@ -81,7 +81,7 @@ public class LogicalPlanTests extends ESTestCase {
Order order = new Order(Source.EMPTY, timestamp(), OrderDirection.ASC, NullsPosition.FIRST);
LogicalPlan project = new Project(Source.EMPTY, filter, singletonList(timestamp()));
LogicalPlan sorted = new OrderBy(Source.EMPTY, project, singletonList(order));
LogicalPlan head = new Head(Source.EMPTY, new Literal(Source.EMPTY, RequestDefaults.FETCH_SIZE, DataTypes.INTEGER), sorted);
LogicalPlan head = new Head(Source.EMPTY, new Literal(Source.EMPTY, RequestDefaults.SIZE, DataTypes.INTEGER), sorted);
return head;
}

View File

@ -534,3 +534,27 @@ process where subtract(43, serial_event_id) == 41
InternalQlScriptUtils.sub(params.v0,InternalQlScriptUtils.docValue(doc,params.v1)),params.v2))",
"params":{"v0":43,"v1":"serial_event_id","v2":41}
;
eventQueryDefaultLimit
process where true
;
"size":10,
;
eventQueryWithHead
process where true | head 5
;
"size":5,
;
eventQueryWithTail
process where true | tail 5
;
"size":5,
;
eventQueryWithHeadAndTail
process where true | tail 10 | head 7
;
"size":7,
;