EQL: Obey size request parameter (#59014)

While at it, change the default size to 10 (to align it with the search
API defaults).

(cherry picked from commit 45795939b277e736a9e4f2f008d1c3f406239075)
This commit is contained in:
Costin Leau 2020-07-03 16:29:53 +03:00 committed by Costin Leau
parent d66084dcaf
commit fe775a315f
9 changed files with 109 additions and 52 deletions

View File

@ -9,6 +9,7 @@ package org.elasticsearch.test.eql;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.Build;
import org.elasticsearch.client.EqlClient;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
@ -143,7 +144,11 @@ public abstract class CommonEqlActionTestCase extends ESRestTestCase {
EqlSearchRequest request = new EqlSearchRequest(testIndexName, query);
request.isCaseSensitive(isCaseSensitive);
request.tiebreakerField("event.sequence");
return highLevelClient().eql().search(request, RequestOptions.DEFAULT);
return eqlClient().search(request, RequestOptions.DEFAULT);
}
private EqlClient eqlClient() {
return highLevelClient().eql();
}
protected void assertSearchHits(List<SearchHit> events) {

View File

@ -14,5 +14,5 @@ public final class RequestDefaults {
public static final String FIELD_EVENT_CATEGORY = "event.category";
public static final String FIELD_IMPLICIT_JOIN_KEY = "agent.id";
public static int FETCH_SIZE = 50;
public static int FETCH_SIZE = 10;
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.xpack.eql.parser.EqlBaseParser.SubqueryContext;
import org.elasticsearch.xpack.eql.plan.logical.Head;
import org.elasticsearch.xpack.eql.plan.logical.Join;
import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset;
import org.elasticsearch.xpack.eql.plan.logical.Sequence;
import org.elasticsearch.xpack.eql.plan.logical.Tail;
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
@ -33,6 +34,7 @@ import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.Expressions;
import org.elasticsearch.xpack.ql.expression.Literal;
import org.elasticsearch.xpack.ql.expression.Order;
import org.elasticsearch.xpack.ql.expression.Order.NullsPosition;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.ql.expression.predicate.logical.And;
@ -55,24 +57,26 @@ import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.ql.tree.Source.synthetic;
public abstract class LogicalPlanBuilder extends ExpressionBuilder {
private static final Set<String> SUPPORTED_PIPES = Sets.newHashSet("count", "filter", "head", "sort", "tail", "unique", "unique_count");
private final UnresolvedRelation RELATION = new UnresolvedRelation(Source.EMPTY, null, "", false, "");
private final EmptyAttribute UNSPECIFIED_FIELD = new EmptyAttribute(Source.EMPTY);
private final UnresolvedRelation RELATION = new UnresolvedRelation(synthetic("<relation>"), null, "", false, "");
private final EmptyAttribute UNSPECIFIED_FIELD = new EmptyAttribute(synthetic("<unspecified>"));
public LogicalPlanBuilder(ParserParams params) {
super(params);
}
private Attribute fieldTimestamp() {
return new UnresolvedAttribute(Source.EMPTY, params.fieldTimestamp());
return new UnresolvedAttribute(synthetic("<timestamp>"), params.fieldTimestamp());
}
private Attribute fieldTiebreaker() {
return params.fieldTiebreaker() != null ? new UnresolvedAttribute(Source.EMPTY, params.fieldTiebreaker()) : UNSPECIFIED_FIELD;
return params.fieldTiebreaker() != null ?
new UnresolvedAttribute(synthetic("<tiebreaker>"), params.fieldTiebreaker()) : UNSPECIFIED_FIELD;
}
private OrderDirection defaultDirection() {
@ -84,19 +88,49 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
LogicalPlan plan = plan(ctx.query());
// the first pipe will be the implicit order
boolean asc = defaultDirection() == OrderDirection.ASC;
NullsPosition position = asc ? NullsPosition.FIRST : NullsPosition.LAST;
List<Order> orders = new ArrayList<>(2);
Source source = plan.source();
orders.add(new Order(source, fieldTimestamp(), defaultDirection(), Order.NullsPosition.FIRST));
Source defaultOrderSource = synthetic("<default-order>");
orders.add(new Order(defaultOrderSource, fieldTimestamp(), defaultDirection(), position));
// make sure to add the tiebreaker as well
Attribute tiebreaker = fieldTiebreaker();
if (Expressions.isPresent(tiebreaker)) {
orders.add(new Order(source, tiebreaker, defaultDirection(), Order.NullsPosition.FIRST));
orders.add(new Order(defaultOrderSource, tiebreaker, defaultDirection(), position));
}
plan = new OrderBy(source, plan, orders);
// add the actual declared pipes
plan = new OrderBy(defaultOrderSource, plan, orders);
// add the default limit only if specified
Literal defaultSize = new Literal(synthetic("<default-size>"), params.fetchSize(), DataTypes.INTEGER);
Source defaultLimitSource = synthetic("<default-limit>");
LogicalPlan previous = plan;
boolean missingLimit = true;
for (PipeContext pipeCtx : ctx.pipe()) {
plan = pipe(pipeCtx, plan);
plan = pipe(pipeCtx, previous);
if (missingLimit && plan instanceof LimitWithOffset) {
missingLimit = false;
if (plan instanceof Head) {
previous = new Head(defaultLimitSource, defaultSize, previous);
} else {
previous = new Tail(defaultLimitSource, defaultSize, previous);
}
plan = plan.replaceChildren(singletonList(previous));
}
previous = plan;
}
// add limit based on the default order if no tail/head was specified
if (missingLimit) {
if (asc) {
plan = new Head(defaultLimitSource, defaultSize, plan);
} else {
plan = new Tail(defaultLimitSource, defaultSize, plan);
}
}
return plan;
}

View File

@ -10,6 +10,7 @@ import java.time.ZoneId;
import java.util.List;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP;
@ -21,6 +22,7 @@ public class ParserParams {
private String fieldTimestamp = FIELD_TIMESTAMP;
private String fieldTiebreaker = null;
private String implicitJoinKey = FIELD_IMPLICIT_JOIN_KEY;
private int fetchSize = FETCH_SIZE;
private List<Object> queryParams = emptyList();
public ParserParams(ZoneId zoneId) {
@ -63,6 +65,15 @@ public class ParserParams {
return this;
}
public int fetchSize() {
return fetchSize;
}
public ParserParams fetchSize(int fetchSize) {
this.fetchSize = fetchSize;
return this;
}
public List<Object> params() {
return queryParams;
}

View File

@ -115,7 +115,8 @@ public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRe
.fieldEventCategory(request.eventCategoryField())
.fieldTimestamp(request.timestampField())
.fieldTiebreaker(request.tiebreakerField())
.implicitJoinKey(request.implicitJoinKeyField());
.implicitJoinKey(request.implicitJoinKeyField())
.fetchSize(request.fetchSize());
EqlConfiguration cfg = new EqlConfiguration(request.indices(), zoneId, username, clusterName, filter, timeout, request.fetchSize(),
includeFrozen, request.isCaseSensitive(), clientId, new TaskId(nodeId, task.getId()), task);

View File

@ -87,9 +87,7 @@ public class OptimizerTests extends ESTestCase {
);
for (String q : tests) {
LogicalPlan plan = accept(q);
assertTrue(plan instanceof OrderBy);
plan = ((OrderBy) plan).child();
LogicalPlan plan = defaultPipes(accept(q));
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
@ -110,9 +108,7 @@ public class OptimizerTests extends ESTestCase {
);
for (String q : tests) {
LogicalPlan plan = accept(q);
assertTrue(plan instanceof OrderBy);
plan = ((OrderBy) plan).child();
LogicalPlan plan = defaultPipes(accept(q));
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof Filter);
@ -133,9 +129,7 @@ public class OptimizerTests extends ESTestCase {
);
for (String q : tests) {
LogicalPlan plan = accept(q);
assertTrue(plan instanceof OrderBy);
plan = ((OrderBy) plan).child();
LogicalPlan plan = defaultPipes(accept(q));
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof Filter);
@ -159,9 +153,7 @@ public class OptimizerTests extends ESTestCase {
);
for (String q : tests) {
LogicalPlan plan = accept(q);
assertTrue(plan instanceof OrderBy);
plan = ((OrderBy) plan).child();
LogicalPlan plan = defaultPipes(accept(q));
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
@ -181,9 +173,7 @@ public class OptimizerTests extends ESTestCase {
}
public void testWildcardEscapes() {
LogicalPlan plan = accept("foo where command_line == '* %bar_ * \\\\ \\n \\r \\t'");
assertTrue(plan instanceof OrderBy);
plan = ((OrderBy) plan).child();
LogicalPlan plan = defaultPipes(accept("foo where command_line == '* %bar_ * \\\\ \\n \\r \\t'"));
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof Filter);
@ -306,4 +296,11 @@ public class OptimizerTests extends ESTestCase {
Order order = orderBy.order().get(0);
assertEquals(direction, order.direction());
}
private LogicalPlan defaultPipes(LogicalPlan plan) {
assertTrue(plan instanceof LimitWithOffset);
plan = ((LimitWithOffset) plan).child();
assertTrue(plan instanceof OrderBy);
return ((OrderBy) plan).child();
}
}

View File

@ -8,12 +8,16 @@ package org.elasticsearch.xpack.eql.parser;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.eql.action.RequestDefaults;
import org.elasticsearch.xpack.eql.plan.logical.Head;
import org.elasticsearch.xpack.eql.plan.logical.Join;
import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset;
import org.elasticsearch.xpack.eql.plan.logical.Sequence;
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.Literal;
import org.elasticsearch.xpack.ql.expression.NamedExpression;
import org.elasticsearch.xpack.ql.expression.Order;
import org.elasticsearch.xpack.ql.expression.Order.NullsPosition;
@ -25,6 +29,7 @@ import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataTypes;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -52,22 +57,14 @@ public class LogicalPlanTests extends ESTestCase {
LogicalPlan fullQuery = parser.createStatement("any where process_name == 'net.exe'");
Expression fullExpression = expr("process_name == 'net.exe'");
LogicalPlan filter = new Filter(Source.EMPTY, relation(), fullExpression);
Order order = new Order(Source.EMPTY, timestamp(), OrderDirection.ASC, NullsPosition.FIRST);
LogicalPlan project = new Project(Source.EMPTY, filter, singletonList(timestamp()));
LogicalPlan sorted = new OrderBy(Source.EMPTY, project, singletonList(order));
assertEquals(sorted, fullQuery);
assertEquals(wrapFilter(fullExpression), fullQuery);
}
public void testEventQuery() {
LogicalPlan fullQuery = parser.createStatement("process where process_name == 'net.exe'");
Expression fullExpression = expr("event.category == 'process' and process_name == 'net.exe'");
LogicalPlan filter = new Filter(Source.EMPTY, relation(), fullExpression);
Order order = new Order(Source.EMPTY, timestamp(), OrderDirection.ASC, NullsPosition.FIRST);
LogicalPlan project = new Project(Source.EMPTY, filter, singletonList(timestamp()));
LogicalPlan sorted = new OrderBy(Source.EMPTY, project, singletonList(order));
assertEquals(sorted, fullQuery);
assertEquals(wrapFilter(fullExpression), fullQuery);
}
public void testParameterizedEventQuery() {
@ -75,14 +72,19 @@ public class LogicalPlanTests extends ESTestCase {
LogicalPlan fullQuery = parser.createStatement("process where process_name == 'net.exe'", params);
Expression fullExpression = expr("myCustomEvent == 'process' and process_name == 'net.exe'");
LogicalPlan filter = new Filter(Source.EMPTY, relation(), fullExpression);
assertEquals(wrapFilter(fullExpression), fullQuery);
}
private LogicalPlan wrapFilter(Expression exp) {
LogicalPlan filter = new Filter(Source.EMPTY, relation(), exp);
Order order = new Order(Source.EMPTY, timestamp(), OrderDirection.ASC, NullsPosition.FIRST);
LogicalPlan project = new Project(Source.EMPTY, filter, singletonList(timestamp()));
LogicalPlan sorted = new OrderBy(Source.EMPTY, project, singletonList(order));
assertEquals(sorted, fullQuery);
LogicalPlan head = new Head(Source.EMPTY, new Literal(Source.EMPTY, RequestDefaults.FETCH_SIZE, DataTypes.INTEGER), sorted);
return head;
}
public void testJoinPlan() {
LogicalPlan plan = parser.createStatement(
"join by pid " +
@ -93,9 +95,7 @@ public class LogicalPlanTests extends ESTestCase {
" " +
"until [process where event_subtype_full == \"termination_event\"]");
assertEquals(OrderBy.class, plan.getClass());
OrderBy ob = (OrderBy) plan;
plan = ob.child();
plan = defaultPipes(plan);
assertEquals(Join.class, plan.getClass());
Join join = (Join) plan;
assertEquals(KeyedFilter.class, join.until().getClass());
@ -124,9 +124,7 @@ public class LogicalPlanTests extends ESTestCase {
" [process where process_name == \"*\" ] " +
" [file where file_path == \"*\"]");
assertEquals(OrderBy.class, plan.getClass());
OrderBy ob = (OrderBy) plan;
plan = ob.child();
plan = defaultPipes(plan);
assertEquals(Sequence.class, plan.getClass());
Sequence seq = (Sequence) plan;
assertEquals(KeyedFilter.class, seq.until().getClass());
@ -146,4 +144,12 @@ public class LogicalPlanTests extends ESTestCase {
TimeValue maxSpan = seq.maxSpan();
assertEquals(new TimeValue(2, TimeUnit.SECONDS), maxSpan);
}
private LogicalPlan defaultPipes(LogicalPlan plan) {
assertTrue(plan instanceof LimitWithOffset);
plan = ((LimitWithOffset) plan).child();
assertTrue(plan instanceof OrderBy);
return ((OrderBy) plan).child();
}
}

View File

@ -89,8 +89,7 @@ public class UnresolvedRelation extends LeafPlan implements Unresolvable {
}
UnresolvedRelation other = (UnresolvedRelation) obj;
return Objects.equals(source(), other.source())
&& Objects.equals(table, other.table)
return Objects.equals(table, other.table)
&& Objects.equals(alias, other.alias)
&& Objects.equals(frozen, other.frozen)
&& Objects.equals(unresolvedMsg, other.unresolvedMsg);

View File

@ -55,4 +55,8 @@ public final class Source {
public String toString() {
return text + location;
}
public static Source synthetic(String text) {
return new Source(Location.EMPTY, text);
}
}