EQL: Revert filter pipe (#61907)
The current implementation of the filter pipe is incomplete hence why it got reverted. Note this is not a complete revert as some of the improvements of said commit (such as the PostAnalyzer) are useful in general. Relates #61805 (cherry picked from commit 7a7eb66f7d39586c3a3bc00dce49e6c47a23b46a)
This commit is contained in:
parent
3fb6dc05d2
commit
99ee87e332
|
@ -248,6 +248,16 @@ case_insensitive = true
|
|||
query = 'process where process_name >= "SYSTE" and process_name <= "systex"'
|
||||
expected_event_ids = [1, 2]
|
||||
|
||||
|
||||
[[queries]]
|
||||
name = "processWithStringEqualityCaseInsensitive1"
|
||||
case_insensitive = true
|
||||
query = '''
|
||||
process where process_name == "VMACTHLP.exe" and unique_pid == 12
|
||||
| filter true
|
||||
'''
|
||||
expected_event_ids = [12]
|
||||
|
||||
[[queries]]
|
||||
name = "processNameIN"
|
||||
query = '''
|
||||
|
|
|
@ -46,6 +46,35 @@ name = "equalsNullHead"
|
|||
expected_event_ids = [1, 2, 3, 4, 5]
|
||||
query = 'process where bad_field == null | head 5'
|
||||
|
||||
|
||||
[[queries]]
|
||||
name = "lteAndGtWithFilter"
|
||||
tags = ["comparisons", "pipes"]
|
||||
query = '''
|
||||
process where serial_event_id <= 8 and serial_event_id > 7
|
||||
| filter serial_event_id == 8
|
||||
'''
|
||||
expected_event_ids = [8]
|
||||
|
||||
[[queries]]
|
||||
name = "filtersLteAndGt"
|
||||
query = '''
|
||||
process where true
|
||||
| filter serial_event_id <= 10
|
||||
| filter serial_event_id > 6
|
||||
'''
|
||||
expected_event_ids = [7, 8, 9, 10]
|
||||
|
||||
[[queries]]
|
||||
name = "filterLteAndGtWithHead"
|
||||
query = '''
|
||||
process where true
|
||||
| filter serial_event_id <= 10
|
||||
| filter serial_event_id > 6
|
||||
| head 2
|
||||
'''
|
||||
expected_event_ids = [7, 8]
|
||||
|
||||
[[queries]]
|
||||
name = "headWithFiltersAndTail"
|
||||
query = '''
|
||||
|
@ -100,6 +129,7 @@ case_insensitive = true
|
|||
query = 'process where process_name >= "SYSTE" and process_name <= "systex"'
|
||||
expected_event_ids = [1, 2]
|
||||
|
||||
|
||||
[[queries]]
|
||||
name = "processWithStringEqualityCaseInsensitive1"
|
||||
case_insensitive = true
|
||||
|
|
|
@ -8,7 +8,6 @@ package org.elasticsearch.xpack.eql.analysis;
|
|||
|
||||
import org.elasticsearch.xpack.eql.plan.logical.Head;
|
||||
import org.elasticsearch.xpack.eql.plan.logical.Join;
|
||||
import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset;
|
||||
import org.elasticsearch.xpack.eql.plan.logical.Sequence;
|
||||
import org.elasticsearch.xpack.eql.plan.logical.Tail;
|
||||
import org.elasticsearch.xpack.eql.stats.FeatureMetric;
|
||||
|
@ -147,9 +146,6 @@ public class Verifier {
|
|||
|
||||
plan.forEachDown(p -> {
|
||||
Set<Failure> localFailures = new LinkedHashSet<>();
|
||||
|
||||
checkNoPipesAfterLimit(p, localFailures);
|
||||
|
||||
failures.addAll(localFailures);
|
||||
|
||||
// mark the plan as analyzed
|
||||
|
@ -240,12 +236,4 @@ public class Verifier {
|
|||
|
||||
return failures;
|
||||
}
|
||||
|
||||
private void checkNoPipesAfterLimit(LogicalPlan p, Set<Failure> localFailures) {
|
||||
if ((p instanceof LimitWithOffset) == false) {
|
||||
if (p.anyMatch(LimitWithOffset.class::isInstance)) {
|
||||
localFailures.add(fail(p, "Pipe [{}] not allowed (yet) after head/tail", p.sourceText()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,7 +19,6 @@ import org.elasticsearch.xpack.ql.expression.Literal;
|
|||
import org.elasticsearch.xpack.ql.expression.Order;
|
||||
import org.elasticsearch.xpack.ql.expression.Order.NullsPosition;
|
||||
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
|
||||
import org.elasticsearch.xpack.ql.expression.predicate.logical.And;
|
||||
import org.elasticsearch.xpack.ql.expression.predicate.logical.Not;
|
||||
import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNotNull;
|
||||
import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNull;
|
||||
|
@ -39,7 +38,6 @@ import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.ReplaceSurrogateFunct
|
|||
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SetAsOptimized;
|
||||
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.TransformDirection;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.Filter;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.LeafPlan;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.Limit;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
|
||||
|
@ -53,7 +51,6 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
public class Optimizer extends RuleExecutor<LogicalPlan> {
|
||||
|
||||
|
@ -80,8 +77,7 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
|
|||
// prune/elimination
|
||||
new PruneFilters(),
|
||||
new PruneLiteralsInOrderBy(),
|
||||
new CombineLimits(),
|
||||
new PushDownFilterPipe());
|
||||
new CombineLimits());
|
||||
|
||||
Batch ordering = new Batch("Implicit Order",
|
||||
new SortByLimit(),
|
||||
|
@ -241,42 +237,6 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Push down filter pipes.
|
||||
*/
|
||||
static class PushDownFilterPipe extends OptimizerRule<Filter> {
|
||||
|
||||
@Override
|
||||
protected LogicalPlan rule(Filter filter) {
|
||||
LogicalPlan child = filter.child();
|
||||
|
||||
// can't push it down further
|
||||
if (child instanceof LeafPlan) {
|
||||
return filter;
|
||||
}
|
||||
// combine filters if possible
|
||||
if (child instanceof Filter) {
|
||||
Filter f = (Filter) child;
|
||||
return new Filter(f.source(), f.child(), new And(filter.source(), f.condition(), filter.condition()));
|
||||
}
|
||||
|
||||
// treat Join separately to avoid pushing the filter on until
|
||||
if (child instanceof Join) {
|
||||
Join j = (Join) child;
|
||||
return j.with(j.queries().stream()
|
||||
.map(q -> {
|
||||
Filter f = new Filter(filter.source(), q.child(), filter.condition());
|
||||
return new KeyedFilter(q.source(), f, q.keys(), q.timestamp(), q.tiebreaker());
|
||||
})
|
||||
.collect(toList()), j.until(), j.direction());
|
||||
}
|
||||
// otherwise keep pushing it down
|
||||
return child.replaceChildren(child.children().stream()
|
||||
.map(c -> new Filter(filter.source(), c, filter.condition()))
|
||||
.collect(toList()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Align the implicit order with the limit (head means ASC or tail means DESC).
|
||||
*/
|
||||
|
|
|
@ -338,9 +338,6 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
|
|||
}
|
||||
|
||||
switch (name) {
|
||||
case FILTER_PIPE:
|
||||
return new Filter(source(ctx), plan, onlyOnePipeArgument(source(ctx), name, ctx.booleanExpression()));
|
||||
|
||||
case HEAD_PIPE:
|
||||
Expression headLimit = pipeIntArgument(source(ctx), name, ctx.booleanExpression());
|
||||
return new Head(source(ctx), headLimit, plan);
|
||||
|
|
|
@ -332,8 +332,4 @@ public class VerifierTests extends ESTestCase {
|
|||
"define one or use MATCH/QUERY instead",
|
||||
error(idxr, "process where string(multi_field.english) == 'foo'"));
|
||||
}
|
||||
|
||||
public void testLimitWithFilter() {
|
||||
assertEquals("1:30: Pipe [| filter false] not allowed (yet) after head/tail", error("process where true | head 10 | filter false"));
|
||||
}
|
||||
}
|
|
@ -22,7 +22,6 @@ import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
|
|||
import org.elasticsearch.xpack.eql.stats.Metrics;
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.ql.expression.EmptyAttribute;
|
||||
import org.elasticsearch.xpack.ql.expression.Expression;
|
||||
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
|
||||
import org.elasticsearch.xpack.ql.expression.Literal;
|
||||
import org.elasticsearch.xpack.ql.expression.Order;
|
||||
|
@ -32,7 +31,6 @@ import org.elasticsearch.xpack.ql.expression.predicate.logical.And;
|
|||
import org.elasticsearch.xpack.ql.expression.predicate.logical.Not;
|
||||
import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNotNull;
|
||||
import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNull;
|
||||
import org.elasticsearch.xpack.ql.expression.predicate.operator.comparison.Equals;
|
||||
import org.elasticsearch.xpack.ql.expression.predicate.regex.Like;
|
||||
import org.elasticsearch.xpack.ql.index.EsIndex;
|
||||
import org.elasticsearch.xpack.ql.index.IndexResolution;
|
||||
|
@ -302,55 +300,4 @@ public class OptimizerTests extends ESTestCase {
|
|||
assertTrue(plan instanceof OrderBy);
|
||||
return ((OrderBy) plan).child();
|
||||
}
|
||||
|
||||
|
||||
public void testFilterPipePushdownEventQuery() {
|
||||
Filter filter = new Filter(EMPTY, rel(), new IsNull(EMPTY, Literal.TRUE));
|
||||
Filter pipe = new Filter(EMPTY, filter, new Equals(EMPTY, timestamp(), Literal.TRUE));
|
||||
|
||||
LogicalPlan optimized = new Optimizer.PushDownFilterPipe().rule(pipe);
|
||||
assertEquals(Filter.class, optimized.getClass());
|
||||
Filter f = (Filter) optimized;
|
||||
Expression exp = f.condition();
|
||||
assertEquals(And.class, exp.getClass());
|
||||
|
||||
And and = (And) exp;
|
||||
assertEquals(filter.condition(), and.left());
|
||||
assertEquals(pipe.condition(), and.right());
|
||||
}
|
||||
|
||||
public void testFilterPipePushdownSequence() {
|
||||
Filter filter = new Filter(EMPTY, rel(), new IsNull(EMPTY, Literal.TRUE));
|
||||
KeyedFilter rule1 = keyedFilter(filter);
|
||||
KeyedFilter rule2 = keyedFilter(filter);
|
||||
KeyedFilter until = keyedFilter(filter);
|
||||
Sequence s = new Sequence(EMPTY, asList(rule1, rule2), until, TimeValue.MINUS_ONE, timestamp(), tiebreaker(), OrderDirection.ASC);
|
||||
Filter pipe = new Filter(EMPTY, s, new Equals(EMPTY, timestamp(), Literal.TRUE));
|
||||
|
||||
// apply it once to push down the filter
|
||||
LogicalPlan optimized = new Optimizer.PushDownFilterPipe().apply(pipe);
|
||||
// second to combine the filters
|
||||
optimized = new Optimizer.PushDownFilterPipe().apply(optimized);
|
||||
assertEquals(Sequence.class, optimized.getClass());
|
||||
|
||||
Sequence seq = (Sequence) optimized;
|
||||
assertEquals(filter.condition(), condition(seq.until()));
|
||||
|
||||
Expression rule1Condition = condition(seq.children().get(0));
|
||||
And and = (And) rule1Condition;
|
||||
assertEquals(filter.condition(), and.left());
|
||||
assertEquals(pipe.condition(), and.right());
|
||||
|
||||
Expression rule2Condition = condition(seq.children().get(1));
|
||||
and = (And) rule2Condition;
|
||||
assertEquals(filter.condition(), and.left());
|
||||
assertEquals(pipe.condition(), and.right());
|
||||
}
|
||||
|
||||
private Expression condition(LogicalPlan plan) {
|
||||
assertEquals(KeyedFilter.class, plan.getClass());
|
||||
KeyedFilter kf = (KeyedFilter) plan;
|
||||
assertEquals(Filter.class, kf.child().getClass());
|
||||
return ((Filter) kf.child()).condition();
|
||||
}
|
||||
}
|
|
@ -90,24 +90,11 @@ process where serial_event_id = 1;
|
|||
|
||||
process where serial_event_id < 4;
|
||||
|
||||
|
||||
process where false;
|
||||
|
||||
process where missing_field != null;
|
||||
|
||||
process where serial_event_id <= 8 and serial_event_id > 7
|
||||
| filter serial_event_id == 8;
|
||||
|
||||
process where true
|
||||
| filter serial_event_id <= 10
|
||||
| filter serial_event_id > 6;
|
||||
|
||||
process where true
|
||||
| filter serial_event_id <= 10
|
||||
| filter serial_event_id > 6
|
||||
| head 2;
|
||||
|
||||
process where process_name == "VMACTHLP.exe" and unique_pid == 12 | filter true;
|
||||
|
||||
process where process_name == "impossible name" or (serial_event_id < 4.5 and serial_event_id >= 3.1)
|
||||
;
|
||||
|
||||
|
@ -583,6 +570,7 @@ sequence by pid with maxspan=2h
|
|||
// Pipes
|
||||
//
|
||||
|
||||
|
||||
security where event_id == 4624
|
||||
| tail 10
|
||||
;
|
||||
|
@ -595,15 +583,11 @@ process where not (exit_code > -1)
|
|||
| head 10
|
||||
;
|
||||
|
||||
|
||||
process where not (exit_code > -1) | head 7;
|
||||
|
||||
process where not (-1 < exit_code) | head 7;
|
||||
|
||||
process where true | filter true;
|
||||
|
||||
process where 1==1 | filter abc == def;
|
||||
|
||||
process where 1==1 | filter abc == def and 1 != 2;
|
||||
|
||||
file where true
|
||||
| tail 3;
|
||||
|
@ -702,19 +686,4 @@ process where fake_field != "*"
|
|||
| head 4;
|
||||
|
||||
process where not (fake_field == "*")
|
||||
| head 4;
|
||||
|
||||
process where 'net.EXE' == original_file_name
|
||||
| filter process_name="net*.exe"
|
||||
;
|
||||
|
||||
process where process_name == original_file_name
|
||||
| filter process_name='net*.exe'
|
||||
;
|
||||
|
||||
process where original_file_name == process_name
|
||||
| filter length(original_file_name) > 0
|
||||
;
|
||||
|
||||
process where process_name != original_file_name
|
||||
| filter length(original_file_name) > 0;
|
||||
| head 4;
|
|
@ -42,9 +42,21 @@ process where 1==1 | count user_name, unique_pid, concat(field2,a,bc);
|
|||
|
||||
process where 1==1 | unique user_name, concat(field2,a,bc), field2;
|
||||
|
||||
|
||||
|
||||
process where true | filter true;
|
||||
|
||||
process where 1==1 | filter abc == def;
|
||||
|
||||
process where 1==1 | filter abc == def and 1 != 2;
|
||||
|
||||
process where 1==1 | count process_name | filter percent > 0.5;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
any where true | unique a, b, c | sort a, b, c | count;
|
||||
|
||||
any where true | unique a, b, c | sort a, b, c | count;
|
||||
|
@ -110,6 +122,32 @@ sequence by pid with maxspan=1.0075d
|
|||
* https://raw.githubusercontent.com/endgameinc/eql/master/eql/etc/test_queries.toml
|
||||
*/
|
||||
|
||||
process where serial_event_id <= 8 and serial_event_id > 7
|
||||
| filter serial_event_id == 8;
|
||||
|
||||
process where true
|
||||
| filter serial_event_id <= 10
|
||||
| filter serial_event_id > 6;
|
||||
|
||||
process where true
|
||||
| filter serial_event_id <= 10
|
||||
| filter serial_event_id > 6
|
||||
| head 2;
|
||||
|
||||
process where true
|
||||
| head 1000
|
||||
| filter serial_event_id <= 10
|
||||
| filter serial_event_id > 6
|
||||
| tail 2
|
||||
;
|
||||
|
||||
|
||||
|
||||
|
||||
process where process_name == "VMACTHLP.exe" and unique_pid == 12 | filter true;
|
||||
|
||||
|
||||
|
||||
process where process_name in ("python.exe", "SMSS.exe", "explorer.exe")
|
||||
| unique process_name;
|
||||
|
||||
|
@ -259,6 +297,24 @@ sequence
|
|||
| tail 1
|
||||
;
|
||||
|
||||
|
||||
process where 'net.EXE' == original_file_name
|
||||
| filter process_name="net*.exe"
|
||||
;
|
||||
|
||||
process where process_name == original_file_name
|
||||
| filter process_name='net*.exe'
|
||||
;
|
||||
|
||||
process where original_file_name == process_name
|
||||
| filter length(original_file_name) > 0
|
||||
;
|
||||
|
||||
|
||||
|
||||
process where process_name != original_file_name
|
||||
| filter length(original_file_name) > 0;
|
||||
|
||||
any where process_name == "svchost.exe"
|
||||
| unique_count event_type_full, process_name;
|
||||
|
||||
|
|
|
@ -565,15 +565,3 @@ process where true | tail 10 | head 7
|
|||
;
|
||||
"size":7,
|
||||
;
|
||||
|
||||
eventQueryWithFilterPipe
|
||||
process where true | filter serial_event_id <= 4
|
||||
;
|
||||
"range":{"serial_event_id":{"from":null,"to":4,"include_lower":false,"include_upper":true
|
||||
;
|
||||
|
||||
eventQueryWithFilterAndFilterPipe
|
||||
process where serial_event_id > 1 | filter serial_event_id <= 4
|
||||
;
|
||||
"range":{"serial_event_id":{"from":1,"to":4,"include_lower":false,"include_upper":true
|
||||
;
|
||||
|
|
Loading…
Reference in New Issue