EQL: Add Head/Tail pipe support (#58536)

Introduce pipe support, in particular head and tail
(which can also be chained).
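
As a rough illustration (the first two queries appear verbatim in the integration tests added by this commit; the chained one is modeled on them), the new pipes look like:

    process where true | head 6
    file where true | tail 3
    process where true | head 4 | tail 2

head keeps the first N results of the (implicitly ascending) output, tail keeps the last N, and chained pipes apply left to right.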

(cherry picked from commit 4521ca3367147d4d6531cf0ab975d8d705f400ea)
(cherry picked from commit d6731d659d012c96b19879d13cfc9e1eaf4745a4)
Costin Leau 2020-06-27 09:08:03 +03:00 committed by Costin Leau
parent 08e75abd4e
commit 3c81b91474
64 changed files with 1760 additions and 811 deletions

View File

@ -554,7 +554,12 @@ the events in ascending, lexicographic order.
},
"sort": [
1607252647000
]
],
"fields": {
"@timestamp": [
"1607252647000"
]
}
},
{
"_index": "my_index",
@ -585,7 +590,12 @@ the events in ascending, lexicographic order.
},
"sort": [
1607339228000
]
],
"fields": {
"@timestamp": [
"1607339228000"
]
}
}
]
}

View File

@ -100,6 +100,11 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"fields": {
"@timestamp": [
"1607252645000"
]
},
"sort": [
1607252645000
]
@ -124,6 +129,11 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"fields": {
"@timestamp": [
"1607339167000"
]
},
"sort": [
1607339167000
]
@ -171,6 +181,7 @@ GET /sec_logs/_eql/search
"""
}
----
// TEST[s/search/search\?filter_path\=\-\*\.sequences\.events\.\*fields/]
The API returns the following response. Matching events in
the `hits.sequences.events` property are sorted by
@ -219,11 +230,6 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"fields": {
"@timestamp": [
"1607339228000"
]
},
"sort": [
1607339228000
]
@ -248,11 +254,6 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
"path": "C:\\Windows\\System32\\regsvr32.exe"
}
},
"fields": {
"@timestamp": [
"1607339229000"
]
},
"sort": [
1607339229000
]
@ -297,6 +298,7 @@ GET /sec_logs/_eql/search
"""
}
----
// TEST[s/search/search\?filter_path\=\-\*\.sequences\.events\.\*fields/]
The API returns the following response. The `hits.sequences.join_keys` property
contains the shared `agent.id` value for each matching event.
@ -346,11 +348,6 @@ contains the shared `agent.id` value for each matching event.
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"fields": {
"@timestamp": [
"1607339228000"
]
},
"sort": [
1607339228000
]
@ -375,11 +372,6 @@ contains the shared `agent.id` value for each matching event.
"path": "C:\\Windows\\System32\\regsvr32.exe"
}
},
"fields": {
"@timestamp": [
"1607339229000"
]
},
"sort": [
1607339229000
]
@ -515,11 +507,16 @@ tiebreaker for events with the same timestamp.
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"fields": {
"@timestamp": [
"1607252645000"
]
},
"sort": [
1607252645000, <1>
"edwCRnyD" <2>
]
},
]
},
{
"_index": "sec_logs",
"_type": "_doc",
@ -540,6 +537,11 @@ tiebreaker for events with the same timestamp.
"path": "C:\\Windows\\System32\\cmd.exe"
}
},
"fields": {
"@timestamp": [
"1607339167000"
]
},
"sort": [
1607339167000, <1>
"cMyt5SZ2" <2>

View File

@ -1,7 +1,6 @@
# This file is populated with additional EQL queries that were not present in the original EQL python implementation
# test_queries.toml file in order to keep the original unchanged and easier to sync with the EQL reference implementation tests.
[[queries]]
expected_event_ids = [95]
query = '''

View File

@ -12,6 +12,9 @@
# query = 'process where serial_event_id = 1'
# expected_event_ids = [1]
[[queries]]
expected_event_ids = []
query = 'process where missing_field != null'
# fails because of string check - msbuild does not match MSBuild
[[queries]]
@ -20,19 +23,10 @@ sequence by unique_pid [process where opcode=1 and process_name == 'msbuild.exe'
expected_event_ids = [75273, 75304]
description = "test that process sequences are working correctly"
[[queries]]
query = 'process where true | head 6'
expected_event_ids = [1, 2, 3, 4, 5, 6]
[[queries]]
expected_event_ids = []
query = 'process where missing_field != null'
[[queries]]
expected_event_ids = [1, 2, 3, 4, 5]
query = 'process where bad_field == null | head 5'
[[queries]]
tags = ["comparisons", "pipes"]
query = '''
@ -65,25 +59,6 @@ process where true
'''
expected_event_ids = [9, 10]
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = []
query = '''
process where not (exit_code > -1)
and serial_event_id in (58, 64, 69, 74, 80, 85, 90, 93, 94)
| head 10
'''
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = [1, 2, 3, 4, 5, 6, 7]
query = 'process where not (exit_code > -1) | head 7'
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = [1, 2, 3, 4, 5, 6, 7]
query = 'process where not (-1 < exit_code) | head 7'
[[queries]]
query = 'process where (serial_event_id<9 and serial_event_id >= 7) or (opcode == pid)'
expected_event_ids = [7, 8]
@ -182,12 +157,6 @@ process where opcode=1 and process_name == "smss.exe"
'''
expected_event_ids = [78]
[[queries]]
query = '''
file where true
| tail 3'''
expected_event_ids = [92, 95, 96]
[[queries]]
query = '''
process where opcode in (1,3) and process_name in (parent_process_name, "SYSTEM")
@ -256,17 +225,6 @@ sequence
expected_event_ids = [1, 2, 2, 3]
[[queries]]
query = '''
sequence
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2'''
expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75]
[[queries]]
query = '''
sequence with maxspan=1d
@ -322,14 +280,6 @@ sequence with maxspan=0.5s
| tail 2'''
expected_event_ids = []
[[queries]]
query = '''
sequence
[file where opcode=0] by unique_pid
[file where opcode=0] by unique_pid
| head 1'''
expected_event_ids = [55, 61]
[[queries]]
query = '''
sequence
@ -466,6 +416,59 @@ query = '''
registry where length(bad_field) > 0
'''
[[queries]]
expected_event_ids = [1, 2, 3, 4, 5]
query = 'process where bad_field == null | head 5'
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = [58, 64, 69, 74, 80, 85, 90, 93, 94, 75303]
query = 'process where exit_code >= 0'
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = [58, 64, 69, 74, 80, 85, 90, 93, 94, 75303]
query = 'process where 0 <= exit_code'
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = [58, 64, 69, 74, 80, 85, 90, 93, 94, 75303]
query = 'process where exit_code <= 0'
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = [58, 64, 69, 74, 80, 85, 90, 93, 94, 75303]
query = 'process where exit_code < 1'
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = [58, 64, 69, 74, 80, 85, 90, 93, 94, 75303]
query = 'process where exit_code > -1'
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = [58, 64, 69, 74, 80, 85, 90, 93, 94, 75303]
query = 'process where -1 < exit_code'
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = []
query = '''
process where not (exit_code > -1)
and serial_event_id in (58, 64, 69, 74, 80, 85, 90, 93, 94)
| head 10
'''
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = [1, 2, 3, 4, 5, 6, 7]
query = 'process where not (exit_code > -1) | head 7'
[[queries]]
note = "check that comparisons against null values return false"
expected_event_ids = [1, 2, 3, 4, 5, 6, 7]
query = 'process where not (-1 < exit_code) | head 7'
[[queries]]
expected_event_ids = [1, 55, 57, 63, 75304]
query = '''
@ -539,16 +542,6 @@ expected_event_ids = [55, 95]
query = 'process where event of [process where process_name = "python.exe" ]'
expected_event_ids = [48, 50, 51, 54, 93]
[[queries]]
query = '''
sequence by user_name
[file where opcode=0] by file_path
[process where opcode=1] by process_path
[process where opcode=2] by process_path
[file where opcode=2] by file_path
| tail 1'''
expected_event_ids = [88, 89, 90, 91]
[[queries]]
query = '''
sequence by user_name
@ -567,16 +560,6 @@ until [process where opcode=5] by ppid,process_path
| head 2'''
expected_event_ids = [55, 59, 61, 65]
[[queries]]
query = '''
sequence by pid
[file where opcode=0] by file_path
[process where opcode=1] by process_path
[process where opcode=2] by process_path
[file where opcode=2] by file_path
| tail 1'''
expected_event_ids = []
[[queries]]
query = '''
join by user_name

View File

@ -9,25 +9,35 @@ package org.elasticsearch.xpack.eql.execution.assembler;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import java.util.List;
public class Criterion {
public class Criterion implements QueryRequest {
private final SearchSourceBuilder searchSource;
private final List<HitExtractor> keyExtractors;
private final HitExtractor timestampExtractor;
private final HitExtractor tiebreakerExtractor;
// search after markers
private Object[] startMarker;
private Object[] stopMarker;
//TODO: should accept QueryRequest instead of another SearchSourceBuilder
public Criterion(SearchSourceBuilder searchSource, List<HitExtractor> searchAfterExtractors, HitExtractor timestampExtractor,
HitExtractor tiebreakerExtractor) {
this.searchSource = searchSource;
this.keyExtractors = searchAfterExtractors;
this.timestampExtractor = timestampExtractor;
this.tiebreakerExtractor = tiebreakerExtractor;
this.startMarker = null;
this.stopMarker = null;
}
@Override
public SearchSourceBuilder searchSource() {
return searchSource;
}
@ -64,8 +74,34 @@ public class Criterion {
throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb);
}
public void fromMarkers(Object[] markers) {
// TODO: this is likely to be rewritten afterwards
searchSource.searchAfter(markers);
public Object[] startMarker() {
return startMarker;
}
public Object[] stopMarker() {
return stopMarker;
}
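// builds a search_after-style marker from a hit: [timestamp] or [timestamp, tiebreaker] when a tiebreaker extractor is set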
private Object[] marker(SearchHit hit) {
long timestamp = timestamp(hit);
Object tiebreaker = null;
if (tiebreakerExtractor() != null) {
tiebreaker = tiebreaker(hit);
}
return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp };
}
public void startMarker(SearchHit hit) {
startMarker = marker(hit);
}
public void stopMarker(SearchHit hit) {
stopMarker = marker(hit);
}
public Criterion useMarker(Object[] marker) {
searchSource.searchAfter(marker);
return this;
}
}

View File

@ -7,9 +7,9 @@
package org.elasticsearch.xpack.eql.execution.assembler;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Payload;
public interface Executable {
void execute(ActionListener<Results> resultsListener);
void execute(ActionListener<Payload> resultsListener);
}

View File

@ -6,59 +6,44 @@
package org.elasticsearch.xpack.eql.execution.assembler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.listener.RuntimeUtils;
import org.elasticsearch.xpack.eql.execution.payload.Payload;
import org.elasticsearch.xpack.eql.execution.payload.SearchResponsePayload;
import org.elasticsearch.xpack.eql.execution.search.SourceGenerator;
import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;
import org.elasticsearch.xpack.eql.execution.search.extractor.FieldHitExtractor;
import org.elasticsearch.xpack.eql.execution.search.extractor.TimestampFieldHitExtractor;
import org.elasticsearch.xpack.eql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.eql.querydsl.container.FieldExtractorRegistry;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.Expressions;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.util.Check;
import org.elasticsearch.xpack.ql.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.xpack.eql.execution.listener.RuntimeUtils.prepareRequest;
public class ExecutionManager implements QueryClient {
private static final Logger log = LogManager.getLogger(ExecutionManager.class);
public class ExecutionManager {
private final EqlSession session;
private final EqlConfiguration cfg;
private final Client client;
private final TimeValue keepAlive;
private final String indices;
public ExecutionManager(EqlSession eqlSession) {
this.session = eqlSession;
this.cfg = eqlSession.configuration();
this.client = eqlSession.client();
this.keepAlive = cfg.requestTimeout();
this.indices = cfg.indexAsWildcard();
}
public Executable assemble(List<List<Attribute>> listOfKeys, List<PhysicalPlan> plans, Attribute timestamp, Attribute tiebreaker) {
public Executable assemble(List<List<Attribute>> listOfKeys,
List<PhysicalPlan> plans,
Attribute timestamp,
Attribute tiebreaker,
OrderDirection direction,
Limit limit) {
FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry();
List<Criterion> criteria = new ArrayList<>(plans.size() - 1);
@ -75,12 +60,10 @@ public class ExecutionManager implements QueryClient {
// search query
// TODO: this could be generalized into an exec only query
Check.isTrue(query instanceof EsQueryExec, "Expected a query but got [{}]", query.getClass());
QueryContainer container = ((EsQueryExec) query).queryContainer();
SearchSourceBuilder searchSource = SourceGenerator.sourceBuilder(container, cfg.filter(), cfg.size());
criteria.add(new Criterion(searchSource, keyExtractors, tsExtractor, tbExtractor));
QueryRequest request = ((EsQueryExec) query).queryRequest(session);
criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor));
}
return new SequenceRuntime(criteria, this);
return new SequenceRuntime(criteria, new BasicQueryClient(session), direction == OrderDirection.DESC, limit);
}
private HitExtractor timestampExtractor(HitExtractor hitExtractor) {
@ -102,20 +85,4 @@ public class ExecutionManager implements QueryClient {
}
return extractors;
}
@Override
public void query(SearchSourceBuilder searchSource, ActionListener<Payload<SearchHit>> listener) {
// set query timeout
searchSource.timeout(cfg.requestTimeout());
if (log.isTraceEnabled()) {
log.trace("About to execute query {} on {}", StringUtils.toString(searchSource), indices);
}
if (cfg.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
SearchRequest search = prepareRequest(client, searchSource, false, indices);
client.search(search, wrap(sr -> listener.onResponse(new SearchResponsePayload(sr)), listener::onFailure));
}
}

View File

@ -8,6 +8,8 @@ package org.elasticsearch.xpack.eql.execution.assembler;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
import java.util.Objects;
class KeyAndOrdinal {
final SequenceKey key;
final long timestamp;
@ -18,4 +20,30 @@ class KeyAndOrdinal {
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
}
@Override
public int hashCode() {
return Objects.hash(key, timestamp, tiebreaker);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
KeyAndOrdinal other = (KeyAndOrdinal) obj;
return Objects.equals(key, other.key)
&& Objects.equals(timestamp, other.timestamp)
&& Objects.equals(tiebreaker, other.tiebreaker);
}
@Override
public String toString() {
return key + "[" + timestamp + "][" + (tiebreaker != null ? Objects.toString(tiebreaker) : "") + "]";
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.assembler;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload;
import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
import org.elasticsearch.xpack.eql.session.Results.Type;
import java.util.ArrayList;
import java.util.List;
class SequencePayload extends AbstractPayload {
private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> sequences;
SequencePayload(List<Sequence> seq, boolean timedOut, TimeValue timeTook, Object[] nextKeys) {
super(timedOut, timeTook, nextKeys);
sequences = new ArrayList<>(seq.size());
for (Sequence s : seq) {
sequences.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), s.hits()));
}
}
@Override
public Type resultType() {
return Type.SEQUENCE;
}
@SuppressWarnings("unchecked")
@Override
public <V> List<V> values() {
return (List<V>) sequences;
}
}

View File

@ -6,17 +6,22 @@
package org.elasticsearch.xpack.eql.execution.assembler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.payload.Payload;
import org.elasticsearch.xpack.eql.execution.payload.ReversePayload;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.execution.search.QueryClient;
import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.elasticsearch.action.ActionListener.wrap;
@ -26,105 +31,101 @@ import static org.elasticsearch.action.ActionListener.wrap;
*/
class SequenceRuntime implements Executable {
private final Logger log = LogManager.getLogger(SequenceRuntime.class);
private final List<Criterion> criteria;
// NB: just like a list's size, this represents the total number of stages, though stage counting starts at 0
private final int numberOfStages;
private final SequenceStateMachine stateMachine;
private final QueryClient queryClient;
private final boolean descending;
private long startTime;
SequenceRuntime(List<Criterion> criteria, QueryClient queryClient) {
SequenceRuntime(List<Criterion> criteria, QueryClient queryClient, boolean descending, Limit limit) {
this.criteria = criteria;
this.numberOfStages = criteria.size();
this.queryClient = queryClient;
boolean hasTiebreaker = criteria.get(0).tiebreakerExtractor() != null;
this.stateMachine = new SequenceStateMachine(numberOfStages, hasTiebreaker);
this.stateMachine = new SequenceStateMachine(numberOfStages, hasTiebreaker, limit);
this.descending = descending;
}
@Override
public void execute(ActionListener<Results> resultsListener) {
public void execute(ActionListener<Payload> listener) {
startTime = System.currentTimeMillis();
startSequencing(resultsListener);
log.info("Starting sequencing");
queryStage(0, listener);
}
private void startSequencing(ActionListener<Results> resultsListener) {
Criterion firstStage = criteria.get(0);
queryClient.query(firstStage.searchSource(), wrap(payload -> {
// 1. execute last stage (find keys)
startTracking(payload, resultsListener);
// 2. go descending through the rest of the stages, while adjusting the query
inspectStage(1, resultsListener);
}, resultsListener::onFailure));
}
private void startTracking(Payload<SearchHit> payload, ActionListener<Results> resultsListener) {
Criterion lastCriterion = criteria.get(0);
List<SearchHit> hits = payload.values();
// nothing matches the first query, bail out early
if (hits.isEmpty()) {
resultsListener.onResponse(assembleResults());
return;
}
long tMin = Long.MAX_VALUE;
long tMax = Long.MIN_VALUE;
Comparable<Object> bMin = null;
// we could have extracted that in the hit loop but that if would have been evaluated
// for every document
if (hits.isEmpty() == false) {
tMin = lastCriterion.timestamp(hits.get(0));
tMax = lastCriterion.timestamp(hits.get(hits.size() - 1));
if (lastCriterion.tiebreakerExtractor() != null) {
bMin = lastCriterion.tiebreaker(hits.get(0));
}
}
for (SearchHit hit : hits) {
KeyAndOrdinal ko = findKey(hit, lastCriterion);
Sequence seq = new Sequence(ko.key, numberOfStages, ko.timestamp, ko.tiebreaker, hit);
stateMachine.trackSequence(seq, tMin, tMax);
}
stateMachine.setTimestampMarker(0, tMin);
if (bMin != null) {
stateMachine.setTiebreakerMarker(0, bMin);
}
}
private void inspectStage(int stage, ActionListener<Results> resultsListener) {
private void queryStage(int stage, ActionListener<Payload> listener) {
// sequencing is done, return results
if (stage == numberOfStages) {
resultsListener.onResponse(assembleResults());
if (hasFinished(stage)) {
listener.onResponse(sequencePayload());
return;
}
// else continue finding matches
Criterion currentCriterion = criteria.get(stage);
// narrow by the previous stage timestamp marker
currentCriterion.fromMarkers(stateMachine.getMarkers(stage - 1));
if (stage > 0) {
// FIXME: revisit this during pagination since the second criterion needs to be limited to the range of the first one
// narrow by the previous stage timestamp marker
Criterion previous = criteria.get(stage - 1);
// if DESC, flip the markers (the stop becomes the start due to the reverse order); otherwise keep them as they are
Object[] marker = descending && stage == 1 ? previous.stopMarker() : previous.startMarker();
currentCriterion.useMarker(marker);
}
queryClient.query(currentCriterion.searchSource(), wrap(payload -> {
findMatches(stage, payload);
inspectStage(stage + 1, resultsListener);
}, resultsListener::onFailure));
log.info("Querying stage {}", stage);
queryClient.query(currentCriterion, wrap(payload -> {
List<SearchHit> hits = payload.values();
// nothing matches the query -> bail out
// FIXME: needs to be changed when doing pagination
if (hits.isEmpty()) {
listener.onResponse(sequencePayload());
return;
}
findMatches(stage, hits);
queryStage(stage + 1, listener);
}, listener::onFailure));
}
private void findMatches(int currentStage, Payload<SearchHit> payload) {
Criterion currentCriterion = criteria.get(currentStage);
List<SearchHit> hits = payload.values();
// hits are guaranteed to be non-empty
private void findMatches(int currentStage, List<SearchHit> hits) {
// update criterion
Criterion criterion = criteria.get(currentStage);
criterion.startMarker(hits.get(0));
criterion.stopMarker(hits.get(hits.size() - 1));
// break the results per key
for (SearchHit hit : hits) {
KeyAndOrdinal ko = findKey(hit, currentCriterion);
stateMachine.match(currentStage, ko.key, ko.timestamp, ko.tiebreaker, hit);
// when dealing with descending order, queries outside the base are ASC (search_before)
// so look at the data in reverse (that is DESC)
for (Iterator<SearchHit> it = descending ? new ReversedIterator<>(hits) : hits.iterator(); it.hasNext();) {
SearchHit hit = it.next();
KeyAndOrdinal ko = key(hit, criterion);
if (currentStage == 0) {
Sequence seq = new Sequence(ko.key, numberOfStages, ko.timestamp, ko.tiebreaker, hit);
long tStart = (long) criterion.startMarker()[0];
long tStop = (long) criterion.stopMarker()[0];
stateMachine.trackSequence(seq, tStart, tStop);
} else {
stateMachine.match(currentStage, ko.key, ko.timestamp, ko.tiebreaker, hit);
// early skip in case of reaching the limit
// check the last stage to avoid calling the state machine in other stages
if (stateMachine.reachedLimit()) {
return;
}
}
}
}
private KeyAndOrdinal findKey(SearchHit hit, Criterion criterion) {
private KeyAndOrdinal key(SearchHit hit, Criterion criterion) {
List<HitExtractor> keyExtractors = criterion.keyExtractors();
SequenceKey key;
@ -141,14 +142,14 @@ class SequenceRuntime implements Executable {
return new KeyAndOrdinal(key, criterion.timestamp(hit), criterion.tiebreaker(hit));
}
private Results assembleResults() {
List<Sequence> done = stateMachine.completeSequences();
List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> response = new ArrayList<>(done.size());
for (Sequence s : done) {
response.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), s.hits()));
}
private Payload sequencePayload() {
List<Sequence> completed = stateMachine.completeSequences();
TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime);
return Results.fromSequences(tookTime, response);
SequencePayload payload = new SequencePayload(completed, false, tookTime, null);
return descending ? new ReversePayload(payload) : payload;
}
private boolean hasFinished(int stage) {
return stage == numberOfStages;
}
}

View File

@ -7,18 +7,15 @@
package org.elasticsearch.xpack.eql.execution.payload;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.session.Payload;
import java.util.List;
public abstract class AbstractPayload implements Payload {
public class SequencePayload implements Payload<Sequence> {
private final boolean timedOut;
private final TimeValue timeTook;
private final Object[] nextKeys;
private final List<Sequence> seq;
private boolean timedOut;
private TimeValue timeTook;
private Object[] nextKeys;
public SequencePayload(List<Sequence> seq, boolean timedOut, TimeValue timeTook, Object[] nextKeys) {
this.seq = seq;
protected AbstractPayload(boolean timedOut, TimeValue timeTook, Object[] nextKeys) {
this.timedOut = timedOut;
this.timeTook = timeTook;
this.nextKeys = nextKeys;
@ -26,7 +23,7 @@ public class SequencePayload implements Payload<Sequence> {
@Override
public boolean timedOut() {
return false;
return timedOut;
}
@Override
@ -38,9 +35,4 @@ public class SequencePayload implements Payload<Sequence> {
public Object[] nextKeys() {
return nextKeys;
}
@Override
public List<Sequence> values() {
return seq;
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.payload;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.session.Results.Type;
import java.util.Collections;
import java.util.List;
public class ReversePayload implements Payload {
private final Payload delegate;
public ReversePayload(Payload delegate) {
this.delegate = delegate;
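// results are produced in ascending order internally; reverse them so descending (tail) requests see them in the requested order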
Collections.reverse(delegate.values());
}
@Override
public Type resultType() {
return delegate.resultType();
}
@Override
public boolean timedOut() {
return delegate.timedOut();
}
@Override
public TimeValue timeTook() {
return delegate.timeTook();
}
@Override
public Object[] nextKeys() {
return delegate.nextKeys();
}
@Override
public <V> List<V> values() {
return delegate.values();
}
}

View File

@ -7,37 +7,29 @@
package org.elasticsearch.xpack.eql.execution.payload;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.session.Results.Type;
import java.util.Arrays;
import java.util.List;
public class SearchResponsePayload implements Payload<SearchHit> {
public class SearchResponsePayload extends AbstractPayload {
private final SearchResponse response;
private final List<SearchHit> hits;
public SearchResponsePayload(SearchResponse response) {
this.response = response;
super(response.isTimedOut(), response.getTook(), null);
hits = Arrays.asList(response.getHits().getHits());
}
@Override
public boolean timedOut() {
return response.isTimedOut();
public Type resultType() {
return Type.SEARCH_HIT;
}
@SuppressWarnings("unchecked")
@Override
public TimeValue timeTook() {
return response.getTook();
}
@Override
public Object[] nextKeys() {
throw new UnsupportedOperationException();
}
@Override
public List<SearchHit> values() {
return Arrays.asList(response.getHits().getHits());
public <V> List<V> values() {
return (List<V>) hits;
}
}

View File

@ -4,36 +4,27 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.listener;
package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.execution.payload.SearchResponsePayload;
import org.elasticsearch.xpack.eql.session.Payload;
import java.util.Arrays;
import java.util.List;
import static org.elasticsearch.xpack.eql.execution.listener.RuntimeUtils.logSearchResponse;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.logSearchResponse;
public class BasicListener implements ActionListener<SearchResponse> {
private static final Logger log = LogManager.getLogger(BasicListener.class);
private static final Logger log = RuntimeUtils.QUERY_LOG;
private final ActionListener<Results> listener;
private final SearchRequest request;
public BasicListener(ActionListener<Results> listener,
SearchRequest request) {
private final ActionListener<Payload> listener;
public BasicListener(ActionListener<Payload> listener) {
this.listener = listener;
this.request = request;
}
@Override
@ -46,17 +37,15 @@ public class BasicListener implements ActionListener<SearchResponse> {
handleResponse(response, listener);
}
} catch (Exception ex) {
listener.onFailure(ex);
onFailure(ex);
}
}
private void handleResponse(SearchResponse response, ActionListener<Results> listener) {
private void handleResponse(SearchResponse response, ActionListener<Payload> listener) {
if (log.isTraceEnabled()) {
logSearchResponse(response, log);
}
List<SearchHit> results = Arrays.asList(response.getHits().getHits());
listener.onResponse(Results.fromHits(response.getTook(), results));
listener.onResponse(new SearchResponsePayload(response));
}

View File

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.ql.util.StringUtils;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest;
public class BasicQueryClient implements QueryClient {
private static final Logger log = RuntimeUtils.QUERY_LOG;
private final EqlConfiguration cfg;
private final Client client;
private final String indices;
public BasicQueryClient(EqlSession eqlSession) {
this.cfg = eqlSession.configuration();
this.client = eqlSession.client();
this.indices = cfg.indexAsWildcard();
}
@Override
public void query(QueryRequest request, ActionListener<Payload> listener) {
SearchSourceBuilder searchSource = request.searchSource();
// set query timeout
searchSource.timeout(cfg.requestTimeout());
if (log.isTraceEnabled()) {
log.trace("About to execute query {} on {}", StringUtils.toString(searchSource), indices);
}
if (cfg.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
SearchRequest search = prepareRequest(client, searchSource, false, indices);
client.search(search, new BasicListener(listener));
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.xpack.eql.util.MathUtils;
import java.util.Objects;
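/**
 * Pagination settings derived from the head/tail pipes: a positive limit means head (ASC),
 * a negative one means tail (DESC); total = abs(limit) + offset is the number of hits a query
 * needs to fetch so that the offset can be applied on top.
 */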
public class Limit {
public final int limit;
public final int offset;
public final int total;
public Limit(int limit, int offset) {
this.limit = limit;
this.offset = offset;
this.total = MathUtils.abs(limit) + offset;
}
public int absLimit() {
return MathUtils.abs(limit);
}
@Override
public int hashCode() {
return Objects.hash(limit, offset);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Limit other = (Limit) obj;
return Objects.equals(limit, other.limit) && Objects.equals(offset, other.offset);
}
}

View File

@ -1,79 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.eql.execution.listener.BasicListener;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.index.IndexResolver;
import org.elasticsearch.xpack.ql.util.StringUtils;
public class Querier {
private static final Logger log = LogManager.getLogger(Querier.class);
private final EqlConfiguration cfg;
private final Client client;
private final TimeValue keepAlive;
private final QueryBuilder filter;
public Querier(EqlSession eqlSession) {
this.cfg = eqlSession.configuration();
this.client = eqlSession.client();
this.keepAlive = cfg.requestTimeout();
this.filter = cfg.filter();
}
public void query(QueryContainer container, String index, ActionListener<Results> listener) {
// prepare the request
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(container, filter, cfg.size());
// set query timeout
sourceBuilder.timeout(cfg.requestTimeout());
if (log.isTraceEnabled()) {
log.trace("About to execute query {} on {}", StringUtils.toString(sourceBuilder), index);
}
if (cfg.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
SearchRequest search = prepareRequest(client, sourceBuilder, cfg.requestTimeout(), false,
Strings.commaDelimitedListToStringArray(index));
ActionListener<SearchResponse> l = new BasicListener(listener, search);
client.search(search, l);
}
public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
String... indices) {
return client.prepareSearch(indices)
// always track total hits accurately
.setTrackTotalHits(true)
.setAllowPartialSearchResults(false)
.setSource(source)
.setTimeout(timeout)
.setIndicesOptions(
includeFrozen ? IndexResolver.FIELD_CAPS_FROZEN_INDICES_OPTIONS : IndexResolver.FIELD_CAPS_INDICES_OPTIONS)
.request();
}
}

View File

@ -4,17 +4,15 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.assembler;
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.execution.payload.Payload;
import org.elasticsearch.xpack.eql.session.Payload;
/**
* Infrastructure interface used to decouple listener consumers from the stateful classes holding client-references and co.
*/
interface QueryClient {
public interface QueryClient {
void query(SearchSourceBuilder searchSource, ActionListener<Payload<SearchHit>> listener);
void query(QueryRequest request, ActionListener<Payload> listener);
}

View File

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.search.builder.SearchSourceBuilder;
public interface QueryRequest {
SearchSourceBuilder searchSource();
}

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.execution.payload.ReversePayload;
import org.elasticsearch.xpack.eql.session.Payload;
public class ReverseListener implements ActionListener<Payload> {
private final ActionListener<Payload> delegate;
public ReverseListener(ActionListener<Payload> delegate) {
this.delegate = delegate;
}
@Override
public void onResponse(Payload response) {
delegate.onResponse(new ReversePayload(response));
}
@Override
public void onFailure(Exception e) {
delegate.onFailure(e);
}
}

View File

@ -4,8 +4,9 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.listener;
package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@ -33,6 +34,8 @@ import java.util.Set;
public final class RuntimeUtils {
static final Logger QUERY_LOG = LogManager.getLogger(QueryClient.class);
private RuntimeUtils() {}
static void logSearchResponse(SearchResponse response, Logger logger) {
@ -102,4 +105,4 @@ public final class RuntimeUtils {
includeFrozen ? IndexResolver.FIELD_CAPS_FROZEN_INDICES_OPTIONS : IndexResolver.FIELD_CAPS_INDICES_OPTIONS)
.request();
}
}
}

View File

@ -62,6 +62,12 @@ public abstract class SourceGenerator {
// set fetch size
if (size != null) {
int sz = size;
if (container.limit() != null) {
Limit limit = container.limit();
// negative limit means DESC order but since the results are ordered ASC
// pagination becomes moot (since all the data needs to be returned)
sz = limit.limit > 0 ? Math.min(limit.total, size) : limit.total;
}
if (source.size() == -1) {
source.size(sz);

View File

@ -109,6 +109,7 @@ public class SequenceFrame {
public void trim(int position) {
sequences.subList(0, position).clear();
// update min time
if (sequences.isEmpty() == false) {
min = sequences.get(0).currentTimestamp();

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.eql.execution.sequence;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import java.util.LinkedList;
import java.util.List;
@ -27,16 +28,20 @@ public class SequenceStateMachine {
/** this ignores the key */
private final long[] timestampMarkers;
private final Comparable<Object>[] tiebreakerMarkers;
private final Comparable<?>[] tiebreakerMarkers;
private final boolean hasTieBreaker;
private final int completionStage;
/** list of completed sequences - separate to avoid polluting the other stages */
private final List<Sequence> completed;
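// head/tail support: drop the first `offset` completed sequences, then keep at most `limit` of them (-1 means unbounded)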
private int offset = 0;
private int limit = -1;
private boolean limitReached = false;
@SuppressWarnings({ "rawtypes", "unchecked" })
public SequenceStateMachine(int stages, boolean hasTiebreaker) {
@SuppressWarnings("rawtypes")
public SequenceStateMachine(int stages, boolean hasTiebreaker, Limit limit) {
this.completionStage = stages - 1;
this.stageToKeys = new StageToKeys(completionStage);
@ -46,6 +51,12 @@ public class SequenceStateMachine {
this.completed = new LinkedList<>();
this.hasTieBreaker = hasTiebreaker;
// limit && offset
if (limit != null) {
this.offset = limit.offset;
this.limit = limit.absLimit();
}
}
public List<Sequence> completeSequences() {
@ -70,16 +81,16 @@ public class SequenceStateMachine {
public Object[] getMarkers(int stage) {
long ts = timestampMarkers[stage];
Comparable<Object> tb = tiebreakerMarkers[stage];
Comparable<?> tb = tiebreakerMarkers[stage];
return hasTieBreaker ? new Object[] { ts, tb } : new Object[] { ts };
}
public void trackSequence(Sequence sequence, long tMin, long tMax) {
public void trackSequence(Sequence sequence, long tStart, long tStop) {
SequenceKey key = sequence.key();
stageToKeys.keys(0).add(key);
SequenceFrame frame = keyToSequences.frame(0, key);
frame.setTimeFrame(tMin, tMax);
frame.setTimeFrame(tStart, tStop);
frame.add(sequence);
}
@ -94,14 +105,14 @@ public class SequenceStateMachine {
if (frame == null || frame.isEmpty()) {
return false;
}
// pick the sequence with the highest timestamp lower than current match timestamp
Tuple<Sequence, Integer> before = frame.before(timestamp, tiebreaker);
if (before == null) {
// pick the sequence with the highest (for ASC) / lowest (for DESC) timestamp lower than current match timestamp
Tuple<Sequence, Integer> neighbour = frame.before(timestamp, tiebreaker);
if (neighbour == null) {
return false;
}
Sequence sequence = before.v1();
Sequence sequence = neighbour.v1();
// eliminate the match and all previous values from the frame
frame.trim(before.v2() + 1);
frame.trim(neighbour.v2() + 1);
// update sequence
sequence.putMatch(stage, hit, timestamp, tiebreaker);
@ -112,11 +123,24 @@ public class SequenceStateMachine {
// bump the stages
if (stage == completionStage) {
completed.add(sequence);
// add the sequence only if needed
if (offset > 0) {
offset--;
} else {
if (limit < 0 || (limit > 0 && completed.size() < limit)) {
completed.add(sequence);
// update the bool lazily
limitReached = limit > 0 && completed.size() == limit;
}
}
} else {
stageToKeys.keys(stage).add(key);
keyToSequences.frame(stage, key).add(sequence);
}
return true;
}
public boolean reachedLimit() {
return limitReached;
}
}

View File

@ -6,12 +6,19 @@
package org.elasticsearch.xpack.eql.optimizer;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.plan.logical.Join;
import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset;
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.util.MathUtils;
import org.elasticsearch.xpack.eql.util.StringUtils;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.Literal;
import org.elasticsearch.xpack.ql.expression.Order;
import org.elasticsearch.xpack.ql.expression.Order.NullsPosition;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.expression.predicate.logical.Not;
import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNotNull;
import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNull;
@ -31,11 +38,19 @@ import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.ReplaceSurrogateFunct
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SetAsOptimized;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.TransformDirection;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.ql.rule.RuleExecutor;
import org.elasticsearch.xpack.ql.type.DataTypes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static java.util.Collections.singletonList;
public class Optimizer extends RuleExecutor<LogicalPlan> {
@ -45,9 +60,8 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
@Override
protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() {
Batch substitutions = new Batch("Operator Replacement", Limiter.ONCE,
new ReplaceSurrogateFunction());
Batch substitutions = new Batch("Operator Replacement", Limiter.ONCE, new ReplaceSurrogateFunction());
Batch operators = new Batch("Operator Optimization",
new ConstantFolding(),
// boolean
@ -61,19 +75,25 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
new CombineBinaryComparisons(),
// prune/elimination
new PruneFilters(),
new PruneLiteralsInOrderBy()
);
new PruneLiteralsInOrderBy(),
new CombineLimits());
Batch ordering = new Batch("Implicit Order",
new SortByLimit(),
new PushDownOrderBy());
Batch local = new Batch("Skip Elasticsearch",
new SkipEmptyFilter(),
new SkipEmptyJoin());
new SkipEmptyJoin(),
new SkipQueryOnLimitZero());
Batch label = new Batch("Set as Optimized", Limiter.ONCE,
new SetAsOptimized());
return Arrays.asList(substitutions, operators, local, label);
return Arrays.asList(substitutions, operators, ordering, local, label);
}
private static class ReplaceWildcards extends OptimizerRule<Filter> {
private static boolean isWildcard(Expression expr) {
@ -135,8 +155,8 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
static class PruneFilters extends org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PruneFilters {
@Override
protected LogicalPlan nonMatchingFilter(Filter filter) {
return new LocalRelation(filter.source(), filter.output());
protected LogicalPlan skipPlan(Filter filter) {
return Optimizer.skipPlan(filter);
}
}
@ -154,14 +174,167 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
return plan;
}
}
static class SkipQueryOnLimitZero extends org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SkipQueryOnLimitZero {
@Override
protected LogicalPlan skipPlan(Limit limit) {
return Optimizer.skipPlan(limit);
}
}
private static LogicalPlan skipPlan(UnaryPlan plan) {
return new LocalRelation(plan.source(), plan.output());
}
/**
* Combine tail and head into one limit.
* The rule moves up since the first limit is the one that defines whether it's the head (positive) or
* the tail (negative) limit of the data and the rest simply work in this space.
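* For example, {@code | head 10 | tail 3} reaches this rule as an outer limit of -3 on top of a primary
* limit of +10; the signs differ and abs(-3) < abs(10), so the result keeps the head direction with
* limit 3 and offset 7, i.e. rows 8 to 10 of the ascending result.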
*/
static final class CombineLimits extends OptimizerRule<LimitWithOffset> {
CombineLimits() {
super(TransformDirection.UP);
}
@Override
protected LogicalPlan rule(LimitWithOffset limit) {
// bail out early
if (limit.child() instanceof LimitWithOffset == false) {
return limit;
}
LimitWithOffset primary = (LimitWithOffset) limit.child();
int primaryLimit = (Integer) primary.limit().fold();
int primaryOffset = primary.offset();
// +1 means ASC, -1 means DESC, and 0 means there are no results
int sign = Integer.signum(primaryLimit);
int secondaryLimit = (Integer) limit.limit().fold();
if (limit.offset() != 0) {
throw new EqlIllegalArgumentException("Limits with different offset not implemented yet");
}
// for the same direction
if (primaryLimit > 0 && secondaryLimit > 0) {
// consider the minimum
primaryLimit = Math.min(primaryLimit, secondaryLimit);
} else if (primaryLimit < 0 && secondaryLimit < 0) {
primaryLimit = Math.max(primaryLimit, secondaryLimit);
} else {
// the secondary limit cannot go beyond the primary - if it does it gets ignored
if (MathUtils.abs(secondaryLimit) < MathUtils.abs(primaryLimit)) {
primaryOffset += MathUtils.abs(primaryLimit + secondaryLimit);
// preserve order
primaryLimit = MathUtils.abs(secondaryLimit) * sign;
}
}
Literal literal = new Literal(primary.limit().source(), primaryLimit, DataTypes.INTEGER);
return new LimitWithOffset(primary.source(), literal, primaryOffset, primary.child());
}
}
/**
* Align the implicit order with the limit (head means ASC or tail means DESC).
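* For example, {@code | tail 5} folds to a limit of -5, so the implicit ASC order on timestamp (and tiebreaker) is flipped to DESC.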
*/
static final class SortByLimit extends OptimizerRule<LimitWithOffset> {
@Override
protected LogicalPlan rule(LimitWithOffset limit) {
if (limit.limit().foldable()) {
LogicalPlan child = limit.child();
if (child instanceof OrderBy) {
OrderBy ob = (OrderBy) child;
if (PushDownOrderBy.isDefaultOrderBy(ob)) {
int l = (Integer) limit.limit().fold();
OrderDirection direction = Integer.signum(l) > 0 ? OrderDirection.ASC : OrderDirection.DESC;
ob = new OrderBy(ob.source(), ob.child(), PushDownOrderBy.changeOrderDirection(ob.order(), direction));
limit = new LimitWithOffset(limit.source(), limit.limit(), limit.offset(), ob);
}
}
}
return limit;
}
}
/**
* Push down the OrderBy into the actual queries before translating them.
* There is always an implicit order (timestamp + tiebreaker ascending).
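* For a descending request (tail), only the base (first) query of a join/sequence keeps the DESC order,
* since it defines the window; the remaining queries stay ASC because sequence matching runs ascending.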
*/
static final class PushDownOrderBy extends OptimizerRule<OrderBy> {
@Override
protected LogicalPlan rule(OrderBy orderBy) {
LogicalPlan plan = orderBy;
if (isDefaultOrderBy(orderBy)) {
LogicalPlan child = orderBy.child();
//
// When dealing with sequences, the matching needs to happen ascending
// hence why the queries will always be ascending
// but if the order is descending, apply that only to the first query
// which is used to discover the window for which matching is being applied.
//
if (child instanceof Join) {
Join join = (Join) child;
List<KeyedFilter> queries = join.queries();
// the main reason ASC is used is the lack of search_before (which is emulated through search_after + ASC)
List<Order> ascendingOrders = changeOrderDirection(orderBy.order(), OrderDirection.ASC);
// preserve the order direction as is (can be DESC) for the base query
List<KeyedFilter> orderedQueries = new ArrayList<>(queries.size());
boolean baseFilter = true;
for (KeyedFilter filter : queries) {
// preserve the order for the base query, everything else needs to be ascending
List<Order> pushedOrder = baseFilter ? orderBy.order() : ascendingOrders;
OrderBy order = new OrderBy(filter.source(), filter.child(), pushedOrder);
orderedQueries.add((KeyedFilter) filter.replaceChildren(singletonList(order)));
baseFilter = false;
}
KeyedFilter until = join.until();
OrderBy order = new OrderBy(until.source(), until.child(), ascendingOrders);
until = (KeyedFilter) until.replaceChildren(singletonList(order));
OrderDirection direction = orderBy.order().get(0).direction();
plan = join.with(orderedQueries, until, direction);
}
}
return plan;
}
private static boolean isDefaultOrderBy(OrderBy orderBy) {
LogicalPlan child = orderBy.child();
// the default order by is the first pipe
// so it has to be on top of an event query or join/sequence
return child instanceof Project || child instanceof Join;
}
private static List<Order> changeOrderDirection(List<Order> orders, Order.OrderDirection direction) {
List<Order> changed = new ArrayList<>(orders.size());
boolean hasChanged = false;
for (Order order : orders) {
if (order.direction() != direction) {
order = new Order(order.source(), order.child(), direction,
direction == OrderDirection.ASC ? NullsPosition.FIRST : NullsPosition.LAST);
hasChanged = true;
}
changed.add(order);
}
return hasChanged ? changed : orders;
}
}
static class SkipEmptyJoin extends OptimizerRule<Join> {
@Override
protected LogicalPlan rule(Join plan) {
// check for empty filters
for (KeyedFilter filter : plan.queries()) {
if (filter.child() instanceof LocalRelation) {
if (filter.anyMatch(LocalRelation.class::isInstance)) {
return new LocalRelation(plan.source(), plan.output(), Results.Type.SEQUENCE);
}
}

View File

@ -134,16 +134,6 @@ public class EqlParser {
this.ruleNames = ruleNames;
}
@Override
public void exitPipe(EqlBaseParser.PipeContext context) {
Token token = context.PIPE().getSymbol();
throw new ParsingException(
"Pipes are not supported",
null,
token.getLine(),
token.getCharPositionInLine());
}
@Override
public void exitProcessCheck(EqlBaseParser.ProcessCheckContext context) {
Token token = context.relationship;

View File

@ -75,7 +75,12 @@ public class ExpressionBuilder extends IdentifierBuilder {
@Override
public List<Attribute> visitJoinKeys(JoinKeysContext ctx) {
return ctx != null ? visitList(ctx.expression(), Attribute.class) : emptyList();
try {
return ctx != null ? visitList(ctx.expression(), Attribute.class) : emptyList();
} catch (ClassCastException ex) {
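// visitList casts every join key to Attribute; any non-field expression fails that cast and is reported as unsupported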
Source source = source(ctx);
throw new ParsingException(source, "Unsupported join key ", source.text());
}
}
@Override

View File

@ -7,19 +7,25 @@ package org.elasticsearch.xpack.eql.parser;
import org.antlr.v4.runtime.tree.ParseTree;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.BooleanExpressionContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.EventFilterContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.IntegerLiteralContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.JoinContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.JoinKeysContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.JoinTermContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.NumberContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.PipeContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.SequenceContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.SequenceParamsContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.SequenceTermContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.StatementContext;
import org.elasticsearch.xpack.eql.parser.EqlBaseParser.SubqueryContext;
import org.elasticsearch.xpack.eql.plan.logical.Head;
import org.elasticsearch.xpack.eql.plan.logical.Join;
import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
import org.elasticsearch.xpack.eql.plan.logical.Sequence;
import org.elasticsearch.xpack.eql.plan.logical.Tail;
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.EmptyAttribute;
@ -27,6 +33,7 @@ import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.Expressions;
import org.elasticsearch.xpack.ql.expression.Literal;
import org.elasticsearch.xpack.ql.expression.Order;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.ql.expression.predicate.logical.And;
import org.elasticsearch.xpack.ql.expression.predicate.operator.comparison.Equals;
@ -38,15 +45,21 @@ import org.elasticsearch.xpack.ql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.CollectionUtils;
import org.elasticsearch.xpack.ql.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
public abstract class LogicalPlanBuilder extends ExpressionBuilder {
private static final Set<String> SUPPORTED_PIPES = Sets.newHashSet("count", "filter", "head", "sort", "tail", "unique", "unique_count");
private final UnresolvedRelation RELATION = new UnresolvedRelation(Source.EMPTY, null, "", false, "");
private final EmptyAttribute UNSPECIFIED_FIELD = new EmptyAttribute(Source.EMPTY);
@ -62,9 +75,34 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
return params.fieldTiebreaker() != null ? new UnresolvedAttribute(Source.EMPTY, params.fieldTiebreaker()) : UNSPECIFIED_FIELD;
}
private OrderDirection defaultDirection() {
return OrderDirection.ASC;
}
@Override
public Object visitStatement(StatementContext ctx) {
LogicalPlan plan = plan(ctx.query());
// the first pipe will be the implicit order
List<Order> orders = new ArrayList<>(2);
Source source = plan.source();
orders.add(new Order(source, fieldTimestamp(), defaultDirection(), Order.NullsPosition.FIRST));
// make sure to add the tiebreaker as well
Attribute tiebreaker = fieldTiebreaker();
if (Expressions.isPresent(tiebreaker)) {
orders.add(new Order(source, tiebreaker, defaultDirection(), Order.NullsPosition.FIRST));
}
plan = new OrderBy(source, plan, orders);
// add the actual declared pipes
for (PipeContext pipeCtx : ctx.pipe()) {
plan = pipe(pipeCtx, plan);
}
return plan;
}
@Override
public LogicalPlan visitEventQuery(EqlBaseParser.EventQueryContext ctx) {
return new Project(source(ctx), visitEventFilter(ctx.eventFilter()), emptyList());
return new Project(source(ctx), visitEventFilter(ctx.eventFilter()), defaultProjection());
}
@Override
@ -83,19 +121,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
condition = new And(source, eventMatch, condition);
}
Filter filter = new Filter(source, RELATION, condition);
List<Order> orders = new ArrayList<>(2);
// TODO: add implicit sorting - when pipes are added, this would better sit there (as a default pipe)
orders.add(new Order(source, fieldTimestamp(), Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
// make sure to add the tiebreaker as well
Attribute tiebreaker = fieldTiebreaker();
if (Expressions.isPresent(tiebreaker)) {
orders.add(new Order(source, tiebreaker, Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
}
OrderBy orderBy = new OrderBy(source, filter, orders);
return orderBy;
return new Filter(source, RELATION, condition);
}
@Override
@ -133,7 +159,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
until = defaultUntil(source);
}
return new Join(source, queries, until, fieldTimestamp(), fieldTiebreaker());
return new Join(source, queries, until, fieldTimestamp(), fieldTiebreaker(), defaultDirection());
}
private KeyedFilter defaultUntil(Source source) {
@ -149,14 +175,16 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
List<Attribute> keys = CollectionUtils.combine(joinKeys, visitJoinKeys(joinCtx));
LogicalPlan eventQuery = visitEventFilter(subqueryCtx.eventFilter());
List<Attribute> output = CollectionUtils.combine(keys, fieldTimestamp());
LogicalPlan child = new Project(source(ctx), eventQuery, CollectionUtils.combine(keys, defaultProjection()));
return new KeyedFilter(source(ctx), child, keys, fieldTimestamp(), fieldTiebreaker());
}
private List<Attribute> defaultProjection() {
Attribute fieldTieBreaker = fieldTiebreaker();
if (Expressions.isPresent(fieldTieBreaker)) {
output = CollectionUtils.combine(output, fieldTieBreaker);
return asList(fieldTimestamp(), fieldTiebreaker());
}
LogicalPlan child = new Project(source(ctx), eventQuery, output);
return new KeyedFilter(source(ctx), child, keys, fieldTimestamp(), fieldTiebreaker());
return singletonList(fieldTimestamp());
}
@Override
@ -199,7 +227,7 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
until = defaultUntil(source);
}
return new Sequence(source, queries, until, maxSpan, fieldTimestamp(), fieldTiebreaker());
return new Sequence(source, queries, until, maxSpan, fieldTimestamp(), fieldTiebreaker(), defaultDirection());
}
public KeyedFilter visitSequenceTerm(SequenceTermContext ctx, List<Attribute> joinKeys) {
@ -262,4 +290,51 @@ public abstract class LogicalPlanBuilder extends ExpressionBuilder {
text(numberCtx));
}
}
}
private LogicalPlan pipe(PipeContext ctx, LogicalPlan plan) {
String name = text(ctx.IDENTIFIER());
if (SUPPORTED_PIPES.contains(name) == false) {
List<String> potentialMatches = StringUtils.findSimilar(name, SUPPORTED_PIPES);
String msg = "Unrecognized pipe [{}]";
if (potentialMatches.isEmpty() == false) {
String matchString = potentialMatches.toString();
msg += ", did you mean " + (potentialMatches.size() == 1
? matchString
: "any of " + matchString) + "?";
}
throw new ParsingException(source(ctx.IDENTIFIER()), msg, name);
}
switch (name) {
case "head":
Expression headLimit = pipeIntArgument(source(ctx), name, ctx.booleanExpression());
return new Head(source(ctx), headLimit, plan);
case "tail":
Expression tailLimit = pipeIntArgument(source(ctx), name, ctx.booleanExpression());
// negate the limit
return new Tail(source(ctx), tailLimit, plan);
default:
throw new ParsingException(source(ctx), "Pipe [{}] is not supported yet", name);
}
}
private Expression pipeIntArgument(Source source, String pipeName, List<BooleanExpressionContext> exps) {
int size = CollectionUtils.isEmpty(exps) ? 0 : exps.size();
if (size != 1) {
throw new ParsingException(source, "Pipe [{}] expects exactly one argument but found [{}]", pipeName, size);
}
BooleanExpressionContext limitCtx = exps.get(0);
Expression expression = expression(limitCtx);
if (expression.dataType().isInteger() == false || expression.foldable() == false || (int) expression.fold() < 0) {
throw new ParsingException(source(limitCtx), "Pipe [{}] expects a positive integer but found [{}]", pipeName, expression
.sourceText());
}
return expression;
}
}
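
For readers skimming the builder above: each declared pipe wraps the previous plan, so `process where true | head 10 | tail 3` ends up as a Tail node over a Head node over the implicitly ordered query, and each pipe's single argument is validated before the node is built. Below is a standalone sketch of that argument check using plain JDK types instead of the ANTLR contexts; the class, method, and exception choices are illustrative only and not part of this commit.

import java.util.List;

class PipeArgumentSketch {
    // head/tail accept exactly one argument, and it must fold to a non-negative integer.
    static int pipeIntArgument(String pipeName, List<Integer> args) {
        int size = args == null ? 0 : args.size();
        if (size != 1) {
            throw new IllegalArgumentException(
                "Pipe [" + pipeName + "] expects exactly one argument but found [" + size + "]");
        }
        Integer value = args.get(0);
        if (value == null || value < 0) {
            throw new IllegalArgumentException(
                "Pipe [" + pipeName + "] expects a positive integer but found [" + value + "]");
        }
        return value;
    }
}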

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.plan.logical;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
public class Head extends LimitWithOffset {
public Head(Source source, Expression limit, LogicalPlan child) {
super(source, limit, child);
}
@Override
protected NodeInfo<Limit> info() {
return NodeInfo.create(this, Head::new, limit(), child());
}
@Override
protected Head replaceChild(LogicalPlan newChild) {
return new Head(source(), limit(), newChild);
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.ql.capabilities.Resolvables;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expressions;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
@ -20,7 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
public class Join extends LogicalPlan {
@ -28,17 +29,29 @@ public class Join extends LogicalPlan {
private final KeyedFilter until;
private final Attribute timestamp;
private final Attribute tiebreaker;
private final OrderDirection direction;
public Join(Source source, List<KeyedFilter> queries, KeyedFilter until, Attribute timestamp, Attribute tiebreaker) {
public Join(Source source,
List<KeyedFilter> queries,
KeyedFilter until,
Attribute timestamp,
Attribute tiebreaker,
OrderDirection direction) {
super(source, CollectionUtils.combine(queries, until));
this.queries = queries;
this.until = until;
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
this.direction = direction;
}
private Join(Source source, List<LogicalPlan> queries, LogicalPlan until, Attribute timestamp, Attribute tiebreaker) {
this(source, asKeyed(queries), asKeyed(until), timestamp, tiebreaker);
private Join(Source source,
List<LogicalPlan> queries,
LogicalPlan until,
Attribute timestamp,
Attribute tiebreaker,
OrderDirection direction) {
this(source, asKeyed(queries), asKeyed(until), timestamp, tiebreaker, direction);
}
static List<KeyedFilter> asKeyed(List<LogicalPlan> list) {
@ -59,7 +72,7 @@ public class Join extends LogicalPlan {
@Override
protected NodeInfo<? extends Join> info() {
return NodeInfo.create(this, Join::new, queries, until, timestamp, tiebreaker);
return NodeInfo.create(this, Join::new, queries, until, timestamp, tiebreaker, direction);
}
@Override
@ -68,7 +81,7 @@ public class Join extends LogicalPlan {
throw new EqlIllegalArgumentException("expected at least [2] children but received [{}]", newChildren.size());
}
int lastIndex = newChildren.size() - 1;
return new Join(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), timestamp, tiebreaker);
return new Join(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), timestamp, tiebreaker, direction);
}
@Override
@ -107,9 +120,17 @@ public class Join extends LogicalPlan {
return tiebreaker;
}
public OrderDirection direction() {
return direction;
}
public Join with(List<KeyedFilter> queries, KeyedFilter until, OrderDirection direction) {
return new Join(source(), queries, until, timestamp, tiebreaker, direction);
}
@Override
public int hashCode() {
return Objects.hash(timestamp, tiebreaker, queries, until);
return Objects.hash(direction, timestamp, tiebreaker, queries, until);
}
@Override
@ -123,7 +144,7 @@ public class Join extends LogicalPlan {
Join other = (Join) obj;
return Objects.equals(queries, other.queries)
return Objects.equals(direction, other.direction) && Objects.equals(queries, other.queries)
&& Objects.equals(until, other.until)
&& Objects.equals(timestamp, other.timestamp)
&& Objects.equals(tiebreaker, other.tiebreaker);
@ -131,6 +152,6 @@ public class Join extends LogicalPlan {
@Override
public List<Object> nodeProperties() {
return emptyList();
return singletonList(direction);
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.plan.logical;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
import java.util.Objects;
public class LimitWithOffset extends org.elasticsearch.xpack.ql.plan.logical.Limit {
private final int offset;
public LimitWithOffset(Source source, Expression limit, LogicalPlan child) {
this(source, limit, 0, child);
}
public LimitWithOffset(Source source, Expression limit, int offset, LogicalPlan child) {
super(source, limit, child);
this.offset = offset;
}
@Override
protected NodeInfo<Limit> info() {
return NodeInfo.create(this, LimitWithOffset::new, limit(), offset, child());
}
@Override
protected LimitWithOffset replaceChild(LogicalPlan newChild) {
return new LimitWithOffset(source(), limit(), offset, newChild);
}
public int offset() {
return offset;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), offset);
}
@Override
public boolean equals(Object obj) {
if (super.equals(obj)) {
LimitWithOffset other = (LimitWithOffset) obj;
return Objects.equals(offset, other.offset);
}
return false;
}
}

View File

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.eql.plan.logical;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
@ -16,27 +17,37 @@ import org.elasticsearch.xpack.ql.tree.Source;
import java.util.List;
import java.util.Objects;
import static java.util.Collections.singletonList;
import static java.util.Arrays.asList;
public class Sequence extends Join {
private final TimeValue maxSpan;
public Sequence(Source source, List<KeyedFilter> queries, KeyedFilter until, TimeValue maxSpan, Attribute timestamp,
Attribute tiebreaker) {
super(source, queries, until, timestamp, tiebreaker);
public Sequence(Source source,
List<KeyedFilter> queries,
KeyedFilter until,
TimeValue maxSpan,
Attribute timestamp,
Attribute tiebreaker,
OrderDirection direction) {
super(source, queries, until, timestamp, tiebreaker, direction);
this.maxSpan = maxSpan;
}
private Sequence(Source source, List<LogicalPlan> queries, LogicalPlan until, TimeValue maxSpan, Attribute timestamp,
Attribute tiebreaker) {
super(source, asKeyed(queries), asKeyed(until), timestamp, tiebreaker);
private Sequence(Source source,
List<LogicalPlan> queries,
LogicalPlan until,
TimeValue maxSpan,
Attribute timestamp,
Attribute tiebreaker,
OrderDirection direction) {
super(source, asKeyed(queries), asKeyed(until), timestamp, tiebreaker, direction);
this.maxSpan = maxSpan;
}
@Override
protected NodeInfo<Sequence> info() {
return NodeInfo.create(this, Sequence::new, queries(), until(), maxSpan, timestamp(), tiebreaker());
return NodeInfo.create(this, Sequence::new, queries(), until(), maxSpan, timestamp(), tiebreaker(), direction());
}
@Override
@ -45,13 +56,19 @@ public class Sequence extends Join {
throw new EqlIllegalArgumentException("expected at least [2] children but received [{}]", newChildren.size());
}
int lastIndex = newChildren.size() - 1;
return new Sequence(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), maxSpan, timestamp(), tiebreaker());
return new Sequence(source(), newChildren.subList(0, lastIndex), newChildren.get(lastIndex), maxSpan, timestamp(), tiebreaker(),
direction());
}
public TimeValue maxSpan() {
return maxSpan;
}
@Override
public Join with(List<KeyedFilter> queries, KeyedFilter until, OrderDirection direction) {
return new Sequence(source(), queries, until, maxSpan, timestamp(), tiebreaker(), direction);
}
@Override
public int hashCode() {
return Objects.hash(maxSpan, super.hashCode());
@ -68,6 +85,6 @@ public class Sequence extends Join {
@Override
public List<Object> nodeProperties() {
return singletonList(maxSpan);
return asList(maxSpan, direction());
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.plan.logical;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.predicate.operator.arithmetic.Neg;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
public class Tail extends LimitWithOffset {
public Tail(Source source, Expression limit, LogicalPlan child) {
this(source, child, new Neg(limit.source(), limit));
}
/**
* Constructor that does not negate the limit expression.
*/
private Tail(Source source, LogicalPlan child, Expression limit) {
super(source, limit, child);
}
@Override
protected NodeInfo<Limit> info() {
return NodeInfo.create(this, Tail::new, child(), limit());
}
@Override
protected Tail replaceChild(LogicalPlan newChild) {
return new Tail(source(), newChild, limit());
}
}

View File

@ -6,10 +6,17 @@
package org.elasticsearch.xpack.eql.plan.physical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.execution.search.Querier;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.search.ReverseListener;
import org.elasticsearch.xpack.eql.execution.search.SourceGenerator;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
@ -19,28 +26,22 @@ import java.util.Objects;
public class EsQueryExec extends LeafExec {
private final String index;
private final List<Attribute> output;
private final QueryContainer queryContainer;
public EsQueryExec(Source source, String index, List<Attribute> output, QueryContainer queryContainer) {
public EsQueryExec(Source source, List<Attribute> output, QueryContainer queryContainer) {
super(source);
this.index = index;
this.output = output;
this.queryContainer = queryContainer;
}
@Override
protected NodeInfo<EsQueryExec> info() {
return NodeInfo.create(this, EsQueryExec::new, index, output, queryContainer);
return NodeInfo.create(this, EsQueryExec::new, output, queryContainer);
}
public EsQueryExec with(QueryContainer queryContainer) {
return new EsQueryExec(source(), index, output, queryContainer);
}
public String index() {
return index;
return new EsQueryExec(source(), output, queryContainer);
}
@Override
@ -48,14 +49,33 @@ public class EsQueryExec extends LeafExec {
return output;
}
public QueryRequest queryRequest(EqlSession session) {
EqlConfiguration cfg = session.configuration();
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(queryContainer, cfg.filter(), cfg.size());
return () -> sourceBuilder;
}
@Override
public void execute(EqlSession session, ActionListener<Results> listener) {
new Querier(session).query(queryContainer, index, listener);
public void execute(EqlSession session, ActionListener<Payload> listener) {
QueryRequest request = queryRequest(session);
listener = shouldReverse(request) ? new ReverseListener(listener) : listener;
new BasicQueryClient(session).query(request, listener);
}
private boolean shouldReverse(QueryRequest query) {
SearchSourceBuilder searchSource = query.searchSource();
// since all results need to be ASC, use this hack to figure out whether the results need to be flipped
for (SortBuilder<?> sort : searchSource.sorts()) {
if (sort.order() == SortOrder.DESC) {
return true;
}
}
return false;
}
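
shouldReverse() above only detects a descending sort; when it does, the listener is wrapped in a ReverseListener, which presumably flips the page back so callers always see ascending results. The net effect for a tail-style query is sketched below with plain collections; the class and variable names are illustrative only.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class TailPageSketch {
    // Last N results in ascending order == search descending, keep the first N, then reverse.
    static List<Long> lastNAscending(List<Long> timestamps, int n) {
        List<Long> page = new ArrayList<>(timestamps);
        page.sort(Comparator.reverseOrder());                                // DESC search
        page = new ArrayList<>(page.subList(0, Math.min(n, page.size())));   // size = n
        Collections.reverse(page);                                           // flip back to ASC
        return page;
    }
}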
@Override
public int hashCode() {
return Objects.hash(index, queryContainer, output);
return Objects.hash(queryContainer, output);
}
@Override
@ -69,14 +89,13 @@ public class EsQueryExec extends LeafExec {
}
EsQueryExec other = (EsQueryExec) obj;
return Objects.equals(index, other.index)
&& Objects.equals(queryContainer, other.queryContainer)
return Objects.equals(queryContainer, other.queryContainer)
&& Objects.equals(output, other.output);
}
@Override
public String nodeString() {
return nodeName() + "[" + index + "," + queryContainer + "]";
return nodeName() + "[" + queryContainer + "]";
}
public QueryContainer queryContainer() {

View File

@ -5,32 +5,32 @@
*/
package org.elasticsearch.xpack.eql.plan.physical;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
import java.util.Objects;
public class LimitExec extends UnaryExec implements Unexecutable {
public class LimitWithOffsetExec extends UnaryExec implements Unexecutable {
private final Expression limit;
private final Limit limit;
public LimitExec(Source source, PhysicalPlan child, Expression limit) {
public LimitWithOffsetExec(Source source, PhysicalPlan child, Limit limit) {
super(source, child);
this.limit = limit;
}
@Override
protected NodeInfo<LimitExec> info() {
return NodeInfo.create(this, LimitExec::new, child(), limit);
protected NodeInfo<LimitWithOffsetExec> info() {
return NodeInfo.create(this, LimitWithOffsetExec::new, child(), limit);
}
@Override
protected LimitExec replaceChild(PhysicalPlan newChild) {
return new LimitExec(source(), newChild, limit);
protected LimitWithOffsetExec replaceChild(PhysicalPlan newChild) {
return new LimitWithOffsetExec(source(), newChild, limit);
}
public Expression limit() {
public Limit limit() {
return limit;
}
@ -49,7 +49,7 @@ public class LimitExec extends UnaryExec implements Unexecutable {
return false;
}
LimitExec other = (LimitExec) obj;
LimitWithOffsetExec other = (LimitWithOffsetExec) obj;
return Objects.equals(limit, other.limit)
&& Objects.equals(child(), other.child());
}

View File

@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.session.EmptyExecutable;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Executable;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
@ -45,7 +45,7 @@ public class LocalExec extends LeafExec {
}
@Override
public void execute(EqlSession session, ActionListener<Results> listener) {
public void execute(EqlSession session, ActionListener<Payload> listener) {
executable.execute(session, listener);
}
@ -67,4 +67,4 @@ public class LocalExec extends LeafExec {
LocalExec other = (LocalExec) obj;
return Objects.equals(executable, other.executable);
}
}
}

View File

@ -9,10 +9,12 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.session.EmptyExecutable;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Executable;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.NodeUtils;
import org.elasticsearch.xpack.ql.tree.Source;
import java.util.List;
@ -62,7 +64,7 @@ public class LocalRelation extends LogicalPlan implements Executable {
}
@Override
public void execute(EqlSession session, ActionListener<Results> listener) {
public void execute(EqlSession session, ActionListener<Payload> listener) {
executable.execute(session, listener);
}
@ -84,4 +86,10 @@ public class LocalRelation extends LogicalPlan implements Executable {
LocalRelation other = (LocalRelation) obj;
return Objects.equals(executable, other.executable);
}
@Override
public String nodeString() {
return nodeName() + NodeUtils.limitedToString(output());
}
}

View File

@ -9,11 +9,13 @@ package org.elasticsearch.xpack.eql.plan.physical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.assembler.ExecutionManager;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expressions;
import org.elasticsearch.xpack.ql.expression.NamedExpression;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
@ -29,6 +31,8 @@ public class SequenceExec extends PhysicalPlan {
private final List<List<Attribute>> keys;
private final Attribute timestamp;
private final Attribute tiebreaker;
private final Limit limit;
private final OrderDirection direction;
public SequenceExec(Source source,
List<List<Attribute>> keys,
@ -36,20 +40,29 @@ public class SequenceExec extends PhysicalPlan {
List<Attribute> untilKeys,
PhysicalPlan until,
Attribute timestamp,
Attribute tiebreaker) {
this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tiebreaker);
Attribute tiebreaker,
OrderDirection direction) {
this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tiebreaker, null, direction);
}
private SequenceExec(Source source, List<PhysicalPlan> children, List<List<Attribute>> keys, Attribute ts, Attribute tb) {
private SequenceExec(Source source,
List<PhysicalPlan> children,
List<List<Attribute>> keys,
Attribute ts,
Attribute tb,
Limit limit,
OrderDirection direction) {
super(source, children);
this.keys = keys;
this.timestamp = ts;
this.tiebreaker = tb;
this.limit = limit;
this.direction = direction;
}
@Override
protected NodeInfo<SequenceExec> info() {
return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tiebreaker);
return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tiebreaker, limit, direction);
}
@Override
@ -59,7 +72,7 @@ public class SequenceExec extends PhysicalPlan {
children().size(),
newChildren.size());
}
return new SequenceExec(source(), newChildren, keys, timestamp, tiebreaker);
return new SequenceExec(source(), newChildren, keys, timestamp, tiebreaker, limit, direction);
}
@Override
@ -87,14 +100,26 @@ public class SequenceExec extends PhysicalPlan {
return tiebreaker;
}
public Limit limit() {
return limit;
}
public OrderDirection direction() {
return direction;
}
public SequenceExec with(Limit limit) {
return new SequenceExec(source(), children(), keys(), timestamp(), tiebreaker(), limit, direction);
}
@Override
public void execute(EqlSession session, ActionListener<Results> listener) {
new ExecutionManager(session).assemble(keys(), children(), timestamp(), tiebreaker()).execute(listener);
public void execute(EqlSession session, ActionListener<Payload> listener) {
new ExecutionManager(session).assemble(keys(), children(), timestamp(), tiebreaker(), direction, limit()).execute(listener);
}
@Override
public int hashCode() {
return Objects.hash(timestamp, tiebreaker, keys, children());
return Objects.hash(timestamp, tiebreaker, keys, limit, direction, children());
}
@Override
@ -110,6 +135,8 @@ public class SequenceExec extends PhysicalPlan {
SequenceExec other = (SequenceExec) obj;
return Objects.equals(timestamp, other.timestamp)
&& Objects.equals(tiebreaker, other.tiebreaker)
&& Objects.equals(limit, other.limit)
&& Objects.equals(direction, other.direction)
&& Objects.equals(children(), other.children())
&& Objects.equals(keys, other.keys);
}

View File

@ -9,14 +9,14 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.planner.PlanningException;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Executable;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Payload;
// this is mainly a marker interface to validate a plan before being executed
public interface Unexecutable extends Executable {
@Override
default void execute(EqlSession session, ActionListener<Results> listener) {
default void execute(EqlSession session, ActionListener<Payload> listener) {
throw new PlanningException("Current plan {} is not executable", this);
}
}

View File

@ -8,7 +8,7 @@ package org.elasticsearch.xpack.eql.plan.physical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.planner.PlanningException;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
@ -41,7 +41,7 @@ public class UnplannedExec extends LeafExec implements Unexecutable {
}
@Override
public void execute(EqlSession session, ActionListener<Results> listener) {
public void execute(EqlSession session, ActionListener<Payload> listener) {
throw new PlanningException("Current plan {} is not executable", this);
}

View File

@ -6,11 +6,13 @@
package org.elasticsearch.xpack.eql.planner;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset;
import org.elasticsearch.xpack.eql.plan.logical.Sequence;
import org.elasticsearch.xpack.eql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.eql.plan.physical.FilterExec;
import org.elasticsearch.xpack.eql.plan.physical.LimitExec;
import org.elasticsearch.xpack.eql.plan.physical.LimitWithOffsetExec;
import org.elasticsearch.xpack.eql.plan.physical.LocalExec;
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
import org.elasticsearch.xpack.eql.plan.physical.OrderExec;
@ -21,14 +23,16 @@ import org.elasticsearch.xpack.eql.plan.physical.UnplannedExec;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expressions;
import org.elasticsearch.xpack.ql.expression.Foldables;
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.rule.Rule;
import org.elasticsearch.xpack.ql.rule.RuleExecutor;
import org.elasticsearch.xpack.ql.type.DataTypeConverter;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.ReflectionUtils;
import java.util.ArrayList;
@ -73,7 +77,8 @@ class Mapper extends RuleExecutor<PhysicalPlan> {
Expressions.asAttributes(s.until().keys()),
map(s.until().child()),
s.timestamp(),
s.tiebreaker());
s.tiebreaker(),
s.direction());
}
if (p instanceof LocalRelation) {
@ -95,9 +100,10 @@ class Mapper extends RuleExecutor<PhysicalPlan> {
return new OrderExec(p.source(), map(o.child()), o.order());
}
if (p instanceof Limit) {
Limit l = (Limit) p;
return new LimitExec(p.source(), map(l.child()), l.limit());
if (p instanceof LimitWithOffset) {
LimitWithOffset l = (LimitWithOffset) p;
int limit = (Integer) DataTypeConverter.convert(Foldables.valueOf(l.limit()), DataTypes.INTEGER);
return new LimitWithOffsetExec(p.source(), map(l.child()), new Limit(limit, l.offset()));
}
if (p instanceof EsRelation) {
@ -107,7 +113,7 @@ class Mapper extends RuleExecutor<PhysicalPlan> {
if (c.frozen()) {
container = container.withFrozen();
}
return new EsQueryExec(p.source(), c.index().name(), output, container);
return new EsQueryExec(p.source(), output, container);
}
return planLater(p);

View File

@ -9,9 +9,12 @@ package org.elasticsearch.xpack.eql.planner;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.eql.plan.physical.FilterExec;
import org.elasticsearch.xpack.eql.plan.physical.LimitWithOffsetExec;
import org.elasticsearch.xpack.eql.plan.physical.OrderExec;
import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.eql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.eql.plan.physical.SequenceExec;
import org.elasticsearch.xpack.eql.plan.physical.UnaryExec;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expression;
@ -39,7 +42,8 @@ class QueryFolder extends RuleExecutor<PhysicalPlan> {
Batch fold = new Batch("Fold queries",
new FoldProject(),
new FoldFilter(),
new FoldOrderBy()
new FoldOrderBy(),
new FoldLimit()
);
Batch finish = new Batch("Finish query", Limiter.ONCE,
new PlanOutputToQueryRef()
@ -49,67 +53,73 @@ class QueryFolder extends RuleExecutor<PhysicalPlan> {
}
private static class FoldProject extends FoldingRule<ProjectExec> {
private static class FoldProject extends QueryFoldingRule<ProjectExec> {
@Override
protected PhysicalPlan rule(ProjectExec project) {
if (project.child() instanceof EsQueryExec) {
EsQueryExec exec = (EsQueryExec) project.child();
return new EsQueryExec(exec.source(), exec.index(), project.output(), exec.queryContainer());
}
return project;
protected PhysicalPlan rule(ProjectExec project, EsQueryExec exec) {
return new EsQueryExec(exec.source(), project.output(), exec.queryContainer());
}
}
private static class FoldFilter extends FoldingRule<FilterExec> {
private static class FoldFilter extends QueryFoldingRule<FilterExec> {
@Override
protected PhysicalPlan rule(FilterExec plan) {
if (plan.child() instanceof EsQueryExec) {
EsQueryExec exec = (EsQueryExec) plan.child();
QueryContainer qContainer = exec.queryContainer();
protected PhysicalPlan rule(FilterExec plan, EsQueryExec exec) {
QueryContainer qContainer = exec.queryContainer();
Query query = QueryTranslator.toQuery(plan.condition());
Query query = QueryTranslator.toQuery(plan.condition());
if (qContainer.query() != null || query != null) {
query = ExpressionTranslators.and(plan.source(), qContainer.query(), query);
}
qContainer = qContainer.with(query);
return exec.with(qContainer);
if (qContainer.query() != null || query != null) {
query = ExpressionTranslators.and(plan.source(), qContainer.query(), query);
}
return plan;
qContainer = qContainer.with(query);
return exec.with(qContainer);
}
}
private static class FoldOrderBy extends FoldingRule<OrderExec> {
private static class FoldOrderBy extends QueryFoldingRule<OrderExec> {
@Override
protected PhysicalPlan rule(OrderExec plan) {
if (plan.child() instanceof EsQueryExec) {
EsQueryExec exec = (EsQueryExec) plan.child();
QueryContainer qContainer = exec.queryContainer();
protected PhysicalPlan rule(OrderExec plan, EsQueryExec query) {
QueryContainer qContainer = query.queryContainer();
for (Order order : plan.order()) {
Direction direction = Direction.from(order.direction());
Missing missing = Missing.from(order.nullsPosition());
for (Order order : plan.order()) {
Direction direction = Direction.from(order.direction());
Missing missing = Missing.from(order.nullsPosition());
// check whether sorting is on a group (and thus nested agg) or field
Expression orderExpression = order.child();
// check whether sorting is on a group (and thus nested agg) or field
Expression orderExpression = order.child();
String lookup = Expressions.id(orderExpression);
String lookup = Expressions.id(orderExpression);
// field
if (orderExpression instanceof FieldAttribute) {
FieldAttribute fa = (FieldAttribute) orderExpression;
qContainer = qContainer.addSort(lookup, new AttributeSort(fa, direction, missing));
}
// unknown
else {
throw new EqlIllegalArgumentException("unsupported sorting expression {}", orderExpression);
}
// field
if (orderExpression instanceof FieldAttribute) {
FieldAttribute fa = (FieldAttribute) orderExpression;
qContainer = qContainer.addSort(lookup, new AttributeSort(fa, direction, missing));
}
// unknown
else {
throw new EqlIllegalArgumentException("unsupported sorting expression {}", orderExpression);
}
}
return exec.with(qContainer);
return query.with(qContainer);
}
}
private static class FoldLimit extends FoldingRule<LimitWithOffsetExec> {
@Override
protected PhysicalPlan rule(LimitWithOffsetExec limit) {
PhysicalPlan plan = limit;
PhysicalPlan child = limit.child();
if (child instanceof EsQueryExec) {
EsQueryExec query = (EsQueryExec) child;
plan = query.with(query.queryContainer().with(limit.limit()));
}
if (child instanceof SequenceExec) {
SequenceExec exec = (SequenceExec) child;
plan = exec.with(limit.limit());
}
return plan;
}
@ -139,4 +149,18 @@ class QueryFolder extends RuleExecutor<PhysicalPlan> {
@Override
protected abstract PhysicalPlan rule(SubPlan plan);
}
}
abstract static class QueryFoldingRule<SubPlan extends UnaryExec> extends FoldingRule<SubPlan> {
@Override
protected final PhysicalPlan rule(SubPlan plan) {
PhysicalPlan p = plan;
if (plan.child() instanceof EsQueryExec) {
p = rule(plan, (EsQueryExec) plan.child());
}
return p;
}
protected abstract PhysicalPlan rule(SubPlan plan, EsQueryExec query);
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.Limit;
import org.elasticsearch.xpack.eql.execution.search.SourceGenerator;
import org.elasticsearch.xpack.ql.execution.search.FieldExtraction;
import org.elasticsearch.xpack.ql.expression.Attribute;
@ -45,18 +46,27 @@ public class QueryContainer {
private final boolean trackHits;
private final boolean includeFrozen;
private final Limit limit;
public QueryContainer() {
this(null, emptyList(), AttributeMap.emptyAttributeMap(), emptyMap(), false, false);
this(null, emptyList(), AttributeMap.emptyAttributeMap(), emptyMap(), false, false, null);
}
private QueryContainer(Query query, List<Tuple<FieldExtraction, String>> fields, AttributeMap<Expression> attributes,
Map<String, Sort> sort, boolean trackHits, boolean includeFrozen) {
private QueryContainer(Query query,
List<Tuple<FieldExtraction, String>> fields,
AttributeMap<Expression> attributes,
Map<String, Sort> sort,
boolean trackHits,
boolean includeFrozen,
Limit limit) {
this.query = query;
this.fields = fields;
this.sort = sort;
this.attributes = attributes;
this.trackHits = trackHits;
this.includeFrozen = includeFrozen;
this.limit = limit;
}
public QueryContainer withFrozen() {
@ -79,8 +89,16 @@ public class QueryContainer {
return trackHits;
}
public Limit limit() {
return limit;
}
public QueryContainer with(Query q) {
return new QueryContainer(q, fields, attributes, sort, trackHits, includeFrozen);
return new QueryContainer(q, fields, attributes, sort, trackHits, includeFrozen, limit);
}
public QueryContainer with(Limit limit) {
return new QueryContainer(query, fields, attributes, sort, trackHits, includeFrozen, limit);
}
public QueryContainer addColumn(Attribute attr) {
@ -111,7 +129,7 @@ public class QueryContainer {
public QueryContainer addSort(String expressionId, Sort sortable) {
Map<String, Sort> newSort = new LinkedHashMap<>(this.sort);
newSort.put(expressionId, sortable);
return new QueryContainer(query, fields, attributes, newSort, trackHits, includeFrozen);
return new QueryContainer(query, fields, attributes, newSort, trackHits, includeFrozen, limit);
}
//
@ -119,12 +137,12 @@ public class QueryContainer {
//
public QueryContainer addColumn(FieldExtraction ref, String id) {
return new QueryContainer(query, combine(fields, new Tuple<>(ref, id)), attributes, sort, trackHits, includeFrozen);
return new QueryContainer(query, combine(fields, new Tuple<>(ref, id)), attributes, sort, trackHits, includeFrozen, limit);
}
@Override
public int hashCode() {
return Objects.hash(query, attributes, fields, trackHits, includeFrozen);
return Objects.hash(query, attributes, fields, trackHits, includeFrozen, limit);
}
@Override
@ -141,8 +159,9 @@ public class QueryContainer {
return Objects.equals(query, other.query)
&& Objects.equals(attributes, other.attributes)
&& Objects.equals(fields, other.fields)
&& Objects.equals(trackHits, other.trackHits)
&& Objects.equals(includeFrozen, other.includeFrozen);
&& trackHits == other.trackHits
&& includeFrozen == other.includeFrozen
&& Objects.equals(limit, other.limit);
}
@Override

View File

@ -5,23 +5,19 @@
*/
package org.elasticsearch.xpack.eql.session;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.session.Results.Type;
import org.elasticsearch.xpack.ql.expression.Attribute;
import java.util.List;
import java.util.Objects;
import static java.util.Collections.emptyList;
public class EmptyExecutable implements Executable {
private final List<Attribute> output;
private final Results.Type resultType;
private final Type resultType;
public EmptyExecutable(List<Attribute> output, Results.Type resultType) {
public EmptyExecutable(List<Attribute> output, Type resultType) {
this.output = output;
this.resultType = resultType;
}
@ -32,8 +28,8 @@ public class EmptyExecutable implements Executable {
}
@Override
public void execute(EqlSession session, ActionListener<Results> listener) {
listener.onResponse(new Results(new TotalHits(0, Relation.EQUAL_TO), TimeValue.ZERO, false, emptyList(), resultType));
public void execute(EqlSession session, ActionListener<Payload> listener) {
listener.onResponse(new EmptyPayload(resultType));
}
@Override

View File

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.session;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.session.Results.Type;
import java.util.List;
import static java.util.Collections.emptyList;
public class EmptyPayload implements Payload {
private final Type type;
public EmptyPayload(Type type) {
this.type = type;
}
@Override
public Type resultType() {
return type;
}
@Override
public boolean timedOut() {
return false;
}
@Override
public TimeValue timeTook() {
return TimeValue.ZERO;
}
@Override
public Object[] nextKeys() {
return null;
}
@Override
public <V> List<V> values() {
return emptyList();
}
}

View File

@ -61,7 +61,8 @@ public class EqlSession {
}
public void eql(String eql, ParserParams params, ActionListener<Results> listener) {
eqlExecutable(eql, params, wrap(e -> e.execute(this, listener), listener::onFailure));
eqlExecutable(eql, params, wrap(e -> e.execute(this, wrap(p -> listener.onResponse(Results.fromPayload(p)), listener::onFailure)),
listener::onFailure));
}
public void eqlExecutable(String eql, ParserParams params, ActionListener<PhysicalPlan> listener) {

View File

@ -15,5 +15,5 @@ public interface Executable {
List<Attribute> output();
void execute(EqlSession session, ActionListener<Results> listener);
void execute(EqlSession session, ActionListener<Payload> listener);
}

View File

@ -4,13 +4,19 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.payload;
package org.elasticsearch.xpack.eql.session;
import org.elasticsearch.common.unit.TimeValue;
import java.util.List;
public interface Payload<V> {
/**
* Container for internal results. Can be low-level such as SearchHits or Sequences.
* Generalized to allow reuse and internal pluggability.
*/
public interface Payload {
Results.Type resultType();
boolean timedOut();
@ -18,5 +24,5 @@ public interface Payload<V> {
Object[] nextKeys();
List<V> values();
<V> List<V> values();
}

View File

@ -29,18 +29,11 @@ public class Results {
private final TimeValue tookTime;
private final Type type;
public static Results fromHits(TimeValue tookTime, List<SearchHit> hits) {
return new Results(new TotalHits(hits.size(), Relation.EQUAL_TO), tookTime, false, hits, Type.SEARCH_HIT);
public static Results fromPayload(Payload payload) {
List<?> values = payload.values();
return new Results(new TotalHits(values.size(), Relation.EQUAL_TO), payload.timeTook(), false, values, payload.resultType());
}
public static Results fromSequences(TimeValue tookTime, List<Sequence> sequences) {
return new Results(new TotalHits(sequences.size(), Relation.EQUAL_TO), tookTime, false, sequences, Type.SEQUENCE);
}
public static Results fromCounts(TimeValue tookTime, List<Count> counts) {
return new Results(new TotalHits(counts.size(), Relation.EQUAL_TO), tookTime, false, counts, Type.COUNT);
}
Results(TotalHits totalHits, TimeValue tookTime, boolean timedOut, List<?> results, Type type) {
this.totalHits = totalHits;
this.tookTime = tookTime;

View File

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.util;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
public class MathUtils {
public static int abs(int number) {
if (number == Integer.MIN_VALUE) {
throw new EqlIllegalArgumentException("[" + number + "] cannot be negated since the result is outside the range");
}
return number < 0 ? -number : number;
}
}
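
The guard above is needed because two's-complement int has no positive counterpart for Integer.MIN_VALUE, and the JDK's Math.abs silently returns the negative input in that case:

public class AbsOverflowDemo {
    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE)); // prints -2147483648
        // MathUtils.abs above throws an EqlIllegalArgumentException instead of
        // returning a negative "absolute" value.
    }
}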

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.util;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
public class ReversedIterator<T> implements Iterator<T> {
private final ListIterator<T> delegate;
public ReversedIterator(List<T> delegate) {
this.delegate = delegate.listIterator(delegate.size());
}
@Override
public boolean hasNext() {
return delegate.hasPrevious();
}
@Override
public T next() {
return delegate.previous();
}
@Override
public void remove() {
delegate.remove();
}
}
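
A minimal usage sketch of the iterator above (assuming the class is available on the classpath; the list contents are arbitrary):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.elasticsearch.xpack.eql.util.ReversedIterator;

class ReversedIteratorDemo {
    public static void main(String[] args) {
        List<String> hits = Arrays.asList("first", "second", "third");
        Iterator<String> it = new ReversedIterator<>(hits);
        while (it.hasNext()) {
            System.out.print(it.next() + " "); // prints: third second first
        }
    }
}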

View File

@ -87,10 +87,6 @@ public class VerifierTests extends ESTestCase {
assertEquals("1:11: Unknown column [pib], did you mean any of [pid, ppid]?", error("foo where pib == 1"));
}
public void testPipesUnsupported() {
assertEquals("1:20: Pipes are not supported", errorParsing("process where true | head 6"));
}
public void testProcessRelationshipsUnsupported() {
assertEquals("2:7: Process relationships are not supported",
errorParsing("process where opcode=1 and process_name == \"csrss.exe\"\n" +

View File

@ -17,8 +17,9 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence;
import org.elasticsearch.xpack.eql.execution.assembler.SeriesUtils.SeriesSpec;
import org.elasticsearch.xpack.eql.execution.payload.Payload;
import org.elasticsearch.xpack.eql.session.Payload;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.session.Results.Type;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import java.io.IOException;
@ -112,7 +113,7 @@ public class SequenceRuntimeTests extends ESTestCase {
}
}
static class TestPayload implements Payload<SearchHit> {
static class TestPayload implements Payload {
private final List<SearchHit> hits;
private final Map<Integer, Tuple<String, String>> events;
@ -128,6 +129,11 @@ public class SequenceRuntimeTests extends ESTestCase {
}
}
@Override
public Type resultType() {
return Type.SEARCH_HIT;
}
@Override
public boolean timedOut() {
return false;
@ -143,9 +149,10 @@ public class SequenceRuntimeTests extends ESTestCase {
return new Object[0];
}
@SuppressWarnings("unchecked")
@Override
public List<SearchHit> values() {
return hits;
public <V> List<V> values() {
return (List<V>) hits;
}
@Override
@ -181,10 +188,10 @@ public class SequenceRuntimeTests extends ESTestCase {
}
// convert the results through a test specific payload
SequenceRuntime runtime = new SequenceRuntime(criteria, (c, l) -> {
Map<Integer, Tuple<String, String>> evs = events.get(c.size());
SequenceRuntime runtime = new SequenceRuntime(criteria, (r, l) -> {
Map<Integer, Tuple<String, String>> evs = events.get(r.searchSource().size());
l.onResponse(new TestPayload(evs));
});
}, false, null);
// finally make the assertion at the end of the listener
runtime.execute(wrap(this::checkResults, ex -> {
@ -192,8 +199,8 @@ public class SequenceRuntimeTests extends ESTestCase {
}));
}
private void checkResults(Results results) {
List<Sequence> seq = results.sequences();
private void checkResults(Payload payload) {
List<Sequence> seq = Results.fromPayload(payload).sequences();
String prefix = "Line " + lineNumber + ":";
assertNotNull(prefix + "no matches found", seq);
assertEquals(prefix + "different sequences matched ", matches.size(), seq.size());

View File

@ -6,13 +6,25 @@
package org.elasticsearch.xpack.eql.optimizer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.eql.analysis.Analyzer;
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer;
import org.elasticsearch.xpack.eql.analysis.Verifier;
import org.elasticsearch.xpack.eql.expression.function.EqlFunctionRegistry;
import org.elasticsearch.xpack.eql.parser.EqlParser;
import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset;
import org.elasticsearch.xpack.eql.plan.logical.Sequence;
import org.elasticsearch.xpack.eql.plan.logical.Tail;
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.EmptyAttribute;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
import org.elasticsearch.xpack.ql.expression.Literal;
import org.elasticsearch.xpack.ql.expression.Order;
import org.elasticsearch.xpack.ql.expression.Order.NullsPosition;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.expression.predicate.logical.And;
import org.elasticsearch.xpack.ql.expression.predicate.logical.Not;
import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNotNull;
@ -20,10 +32,14 @@ import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNull;
import org.elasticsearch.xpack.ql.expression.predicate.regex.Like;
import org.elasticsearch.xpack.ql.index.EsIndex;
import org.elasticsearch.xpack.ql.index.IndexResolution;
import org.elasticsearch.xpack.ql.plan.TableIdentifier;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.ql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.type.EsField;
import org.elasticsearch.xpack.ql.type.TypesTests;
@ -31,7 +47,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.eql.EqlTestUtils.TEST_CFG;
import static org.elasticsearch.xpack.ql.tree.Source.EMPTY;
public class OptimizerTests extends ESTestCase {
@ -39,6 +60,7 @@ public class OptimizerTests extends ESTestCase {
private static final String INDEX_NAME = "test";
private EqlParser parser = new EqlParser();
private IndexResolution index = loadIndexResolution("mapping-default.json");
private Optimizer optimizer = new Optimizer();
private static Map<String, EsField> loadEqlMapping(String name) {
return TypesTests.loadMapping(name);
@ -51,7 +73,6 @@ public class OptimizerTests extends ESTestCase {
private LogicalPlan accept(IndexResolution resolution, String eql) {
PreAnalyzer preAnalyzer = new PreAnalyzer();
Analyzer analyzer = new Analyzer(TEST_CFG, new EqlFunctionRegistry(), new Verifier());
Optimizer optimizer = new Optimizer();
return optimizer.optimize(analyzer.analyze(preAnalyzer.preAnalyze(parser.createStatement(eql), resolution)));
}
@ -67,10 +88,11 @@ public class OptimizerTests extends ESTestCase {
for (String q : tests) {
LogicalPlan plan = accept(q);
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof OrderBy);
plan = ((OrderBy) plan).child();
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof Filter);
Filter filter = (Filter) plan;
@ -89,10 +111,10 @@ public class OptimizerTests extends ESTestCase {
for (String q : tests) {
LogicalPlan plan = accept(q);
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof OrderBy);
plan = ((OrderBy) plan).child();
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof Filter);
Filter filter = (Filter) plan;
@ -112,10 +134,10 @@ public class OptimizerTests extends ESTestCase {
for (String q : tests) {
LogicalPlan plan = accept(q);
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof OrderBy);
plan = ((OrderBy) plan).child();
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof Filter);
Filter filter = (Filter) plan;
@ -138,10 +160,11 @@ public class OptimizerTests extends ESTestCase {
for (String q : tests) {
LogicalPlan plan = accept(q);
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof OrderBy);
plan = ((OrderBy) plan).child();
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof Filter);
Filter filter = (Filter) plan;
@ -159,10 +182,10 @@ public class OptimizerTests extends ESTestCase {
public void testWildcardEscapes() {
LogicalPlan plan = accept("foo where command_line == '* %bar_ * \\\\ \\n \\r \\t'");
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof OrderBy);
plan = ((OrderBy) plan).child();
assertTrue(plan instanceof Project);
plan = ((Project) plan).child();
assertTrue(plan instanceof Filter);
Filter filter = (Filter) plan;
@ -175,4 +198,112 @@ public class OptimizerTests extends ESTestCase {
assertEquals(like.pattern().asLuceneWildcard(), "* %bar_ * \\\\ \n \r \t");
assertEquals(like.pattern().asIndexNameWildcard(), "* %bar_ * \\ \n \r \t");
}
}
public void testCombineHeadBigHeadSmall() {
checkOffsetAndLimit(accept("process where true | head 10 | head 1"), 0, 1);
}
public void testCombineHeadSmallHeadBig() {
checkOffsetAndLimit(accept("process where true | head 1 | head 12"), 0, 1);
}
public void testCombineTailBigTailSmall() {
checkOffsetAndLimit(accept("process where true | tail 10 | tail 1"), 0, -1);
}
public void testCombineTailSmallTailBig() {
checkOffsetAndLimit(accept("process where true | tail 1 | tail 12"), 0, -1);
}
public void testCombineHeadBigTailSmall() {
checkOffsetAndLimit(accept("process where true | head 10 | tail 7"), 3, 7);
}
public void testCombineTailBigHeadSmall() {
checkOffsetAndLimit(accept("process where true | tail 10 | head 7"), 3, -7);
}
public void testCombineTailSmallHeadBig() {
checkOffsetAndLimit(accept("process where true | tail 7 | head 10"), 0, -7);
}
public void testCombineHeadBigTailBig() {
checkOffsetAndLimit(accept("process where true | head 1 | tail 7"), 0, 1);
}
public void testCombineHeadTailWithHeadAndTail() {
checkOffsetAndLimit(accept("process where true | head 10 | tail 7 | head 5 | tail 3"), 5, 3);
}
public void testCombineTailHeadWithTailAndHead() {
checkOffsetAndLimit(accept("process where true | tail 10 | head 7 | tail 5 | head 3"), 5, -3);
}
private void checkOffsetAndLimit(LogicalPlan plan, int offset, int limit) {
assertTrue(plan instanceof LimitWithOffset);
LimitWithOffset lo = (LimitWithOffset) plan;
assertEquals("Incorrect offset", offset, lo.offset());
assertEquals("Incorrect limit", limit, lo.limit().fold());
}
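// Not part of the production rule (which lives in the EQL Optimizer) -- just a
// hedged reading of the arithmetic the assertions above pin down: chained head
// and tail pipes fold into one LimitWithOffset, where a positive limit keeps
// the natural (head) order, a negative limit marks tail (reverse-order)
// semantics, and the offset counts rows dropped by the narrower outer pipe.
// For example, per the tests:
//   head 10 | tail 7                     -> offset 3, limit  7
//   tail 10 | head 7                     -> offset 3, limit -7
//   head 10 | tail 7 | head 5 | tail 3   -> offset 5, limit  3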
private static Attribute timestamp() {
return new FieldAttribute(EMPTY, "test", new EsField("field", DataTypes.INTEGER, emptyMap(), true));
}
private static Attribute tiebreaker() {
return new EmptyAttribute(EMPTY);
}
private static LogicalPlan rel() {
return new UnresolvedRelation(EMPTY, new TableIdentifier(EMPTY, "catalog", "index"), "", false);
}
private static KeyedFilter keyedFilter(LogicalPlan child) {
return new KeyedFilter(EMPTY, child, emptyList(), timestamp(), tiebreaker());
}
public void testSkipQueryOnLimitZero() {
KeyedFilter rule1 = keyedFilter(new LocalRelation(EMPTY, emptyList()));
KeyedFilter rule2 = keyedFilter(new Filter(EMPTY, rel(), new IsNull(EMPTY, Literal.TRUE)));
KeyedFilter until = keyedFilter(new Filter(EMPTY, rel(), Literal.FALSE));
Sequence s = new Sequence(EMPTY, asList(rule1, rule2), until, TimeValue.MINUS_ONE, timestamp(), tiebreaker(), OrderDirection.ASC);
LogicalPlan optimized = optimizer.optimize(s);
assertEquals(LocalRelation.class, optimized.getClass());
}
public void testSortByLimit() {
Project p = new Project(EMPTY, rel(), emptyList());
OrderBy o = new OrderBy(EMPTY, p, singletonList(new Order(EMPTY, tiebreaker(), OrderDirection.ASC, NullsPosition.FIRST)));
Tail t = new Tail(EMPTY, new Literal(EMPTY, 1, DataTypes.INTEGER), o);
LogicalPlan optimized = new Optimizer.SortByLimit().rule(t);
assertEquals(LimitWithOffset.class, optimized.getClass());
LimitWithOffset l = (LimitWithOffset) optimized;
assertOrder(l, OrderDirection.DESC);
}
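// As asserted above, folding a Tail through SortByLimit produces a
// LimitWithOffset over an OrderBy whose direction has been flipped to DESC,
// matching the reverse-order semantics of the negative limits checked earlier.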
public void testPushdownOrderBy() {
Filter filter = new Filter(EMPTY, rel(), new IsNull(EMPTY, Literal.TRUE));
KeyedFilter rule1 = keyedFilter(filter);
KeyedFilter rule2 = keyedFilter(filter);
KeyedFilter until = keyedFilter(filter);
Sequence s = new Sequence(EMPTY, asList(rule1, rule2), until, TimeValue.MINUS_ONE, timestamp(), tiebreaker(), OrderDirection.ASC);
OrderBy o = new OrderBy(EMPTY, s, singletonList(new Order(EMPTY, tiebreaker(), OrderDirection.DESC, NullsPosition.FIRST)));
LogicalPlan optimized = new Optimizer.PushDownOrderBy().rule(o);
assertEquals(Sequence.class, optimized.getClass());
Sequence seq = (Sequence) optimized;
assertOrder(seq.until(), OrderDirection.ASC);
assertOrder(seq.queries().get(0), OrderDirection.DESC);
assertOrder(seq.queries().get(1), OrderDirection.ASC);
}
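// Per the assertions above, pushing the top-level DESC OrderBy into a Sequence
// only flips the direction of the first keyed query; the remaining queries and
// the until filter keep their original ASC ordering.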
private void assertOrder(UnaryPlan plan, OrderDirection direction) {
assertEquals(OrderBy.class, plan.child().getClass());
OrderBy orderBy = (OrderBy) plan.child();
Order order = orderBy.order().get(0);
assertEquals(direction, order.direction());
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.xpack.eql.plan.logical.Join;
import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
import org.elasticsearch.xpack.eql.plan.logical.Sequence;
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.NamedExpression;
import org.elasticsearch.xpack.ql.expression.Order;
@ -28,7 +29,6 @@ import org.elasticsearch.xpack.ql.tree.Source;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.ql.type.DateUtils.UTC;
@ -40,26 +40,34 @@ public class LogicalPlanTests extends ESTestCase {
return parser.createExpression(source);
}
private static Attribute timestamp() {
return new UnresolvedAttribute(Source.EMPTY, "@timestamp");
}
private static LogicalPlan relation() {
return new UnresolvedRelation(Source.EMPTY, null, "", false, "");
}
public void testAnyQuery() {
LogicalPlan fullQuery = parser.createStatement("any where process_name == 'net.exe'");
Expression fullExpression = expr("process_name == 'net.exe'");
LogicalPlan filter = new Filter(Source.EMPTY, new UnresolvedRelation(Source.EMPTY, null, "", false, ""), fullExpression);
Order order = new Order(Source.EMPTY, new UnresolvedAttribute(Source.EMPTY, "@timestamp"), OrderDirection.ASC, NullsPosition.FIRST);
LogicalPlan sorted = new OrderBy(Source.EMPTY, filter, singletonList(order));
LogicalPlan expected = new Project(Source.EMPTY, sorted, emptyList());
assertEquals(expected, fullQuery);
LogicalPlan filter = new Filter(Source.EMPTY, relation(), fullExpression);
Order order = new Order(Source.EMPTY, timestamp(), OrderDirection.ASC, NullsPosition.FIRST);
LogicalPlan project = new Project(Source.EMPTY, filter, singletonList(timestamp()));
LogicalPlan sorted = new OrderBy(Source.EMPTY, project, singletonList(order));
assertEquals(sorted, fullQuery);
}
public void testEventQuery() {
LogicalPlan fullQuery = parser.createStatement("process where process_name == 'net.exe'");
Expression fullExpression = expr("event.category == 'process' and process_name == 'net.exe'");
LogicalPlan filter = new Filter(Source.EMPTY, new UnresolvedRelation(Source.EMPTY, null, "", false, ""), fullExpression);
Order order = new Order(Source.EMPTY, new UnresolvedAttribute(Source.EMPTY, "@timestamp"), OrderDirection.ASC, NullsPosition.FIRST);
LogicalPlan sorted = new OrderBy(Source.EMPTY, filter, singletonList(order));
LogicalPlan expected = new Project(Source.EMPTY, sorted, emptyList());
assertEquals(expected, fullQuery);
LogicalPlan filter = new Filter(Source.EMPTY, relation(), fullExpression);
Order order = new Order(Source.EMPTY, timestamp(), OrderDirection.ASC, NullsPosition.FIRST);
LogicalPlan project = new Project(Source.EMPTY, filter, singletonList(timestamp()));
LogicalPlan sorted = new OrderBy(Source.EMPTY, project, singletonList(order));
assertEquals(sorted, fullQuery);
}
public void testParameterizedEventQuery() {
@ -67,11 +75,11 @@ public class LogicalPlanTests extends ESTestCase {
LogicalPlan fullQuery = parser.createStatement("process where process_name == 'net.exe'", params);
Expression fullExpression = expr("myCustomEvent == 'process' and process_name == 'net.exe'");
LogicalPlan filter = new Filter(Source.EMPTY, new UnresolvedRelation(Source.EMPTY, null, "", false, ""), fullExpression);
Order order = new Order(Source.EMPTY, new UnresolvedAttribute(Source.EMPTY, "@timestamp"), OrderDirection.ASC, NullsPosition.FIRST);
LogicalPlan sorted = new OrderBy(Source.EMPTY, filter, singletonList(order));
LogicalPlan expected = new Project(Source.EMPTY, sorted, emptyList());
assertEquals(expected, fullQuery);
LogicalPlan filter = new Filter(Source.EMPTY, relation(), fullExpression);
Order order = new Order(Source.EMPTY, timestamp(), OrderDirection.ASC, NullsPosition.FIRST);
LogicalPlan project = new Project(Source.EMPTY, filter, singletonList(timestamp()));
LogicalPlan sorted = new OrderBy(Source.EMPTY, project, singletonList(order));
assertEquals(sorted, fullQuery);
}
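// The tests above encode the new default plan shape introduced with pipe
// support: an OrderBy on @timestamp (ASC) on top of a Project of the timestamp
// over the Filter, replacing the previous Project-over-OrderBy arrangement.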
@ -85,6 +93,9 @@ public class LogicalPlanTests extends ESTestCase {
" " +
"until [process where event_subtype_full == \"termination_event\"]");
assertEquals(OrderBy.class, plan.getClass());
OrderBy ob = (OrderBy) plan;
plan = ob.child();
assertEquals(Join.class, plan.getClass());
Join join = (Join) plan;
assertEquals(KeyedFilter.class, join.until().getClass());
@ -113,6 +124,9 @@ public class LogicalPlanTests extends ESTestCase {
" [process where process_name == \"*\" ] " +
" [file where file_path == \"*\"]");
assertEquals(OrderBy.class, plan.getClass());
OrderBy ob = (OrderBy) plan;
plan = ob.child();
assertEquals(Sequence.class, plan.getClass());
Sequence seq = (Sequence) plan;
assertEquals(KeyedFilter.class, seq.until().getClass());

View File

@ -120,7 +120,7 @@ public class QueryFolderOkTests extends AbstractQueryFolderTestCase {
PhysicalPlan p = plan(query);
assertEquals(EsQueryExec.class, p.getClass());
EsQueryExec eqe = (EsQueryExec) p;
assertEquals(0, eqe.output().size());
assertEquals(1, eqe.output().size());
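// The folded query now exposes one output attribute where it previously
// exposed none -- presumably the implicit timestamp ordering key carried by
// the new default projection.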
final String query = eqe.queryContainer().toString().replaceAll("\\s+", "");

View File

@ -564,4 +564,126 @@ sequence by pid with maxspan=500ms
sequence by pid with maxspan=2h
[process where process_name == "*" ]
[file where file_path == "*"]
;
;
//
// Pipes
//
security where event_id == 4624
| tail 10
;
process where true | head 6;
process where bad_field == null | head 5;
process where not (exit_code > -1)
and serial_event_id in (58, 64, 69, 74, 80, 85, 90, 93, 94)
| head 10
;
process where not (exit_code > -1) | head 7;
process where not (-1 < exit_code) | head 7;
file where true
| tail 3;
sequence
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2;
sequence
[file where opcode=0] by unique_pid
[file where opcode=0] by unique_pid
| head 1;
sequence with maxspan=1d
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2;
sequence with maxspan=1h
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2;
sequence with maxspan=1m
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2;
sequence with maxspan=10s
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2;
sequence
[file where opcode=0 and file_name="*.exe"] by unique_pid
[file where opcode=0 and file_name="*.exe"] by unique_pid
until [process where opcode=5000] by unique_ppid
| head 1;
sequence
[file where opcode=0 and file_name="*.exe"] by unique_pid
[file where opcode=0 and file_name="*.exe"] by unique_pid
until [process where opcode=1] by unique_ppid
| head 1;
join
[file where opcode=0 and file_name="*.exe"] by unique_pid
[file where opcode=2 and file_name="*.exe"] by unique_pid
until [process where opcode=1] by unique_ppid
| head 1;
sequence by user_name
[file where opcode=0] by file_path
[process where opcode=1] by process_path
[process where opcode=2] by process_path
[file where opcode=2] by file_path
| tail 1;
sequence by user_name
[file where opcode=0] by pid,file_path
[file where opcode=2] by pid,file_path
until [process where opcode=5] by ppid,process_path
| head 2;
sequence by pid
[file where opcode=0] by file_path
[process where opcode=1] by process_path
[process where opcode=2] by process_path
[file where opcode=2] by file_path
| tail 1;
join by user_name
[file where true] by pid,file_path
[process where true] by ppid,process_path
| head 2;
process where fake_field != "*"
| head 4;
process where not (fake_field == "*")
| head 4;

View File

@ -22,10 +22,6 @@ process where process_name == "powershell.exe"
| head 50
;
security where event_id == 4624
| tail 10
;
file where true | sort file_name
;
@ -82,6 +78,12 @@ sequence by unique_pid [process where true] [file where true] fork=true;
// no longer supported
//sequence by unique_pid [process where true] [file where true] fork=1;
sequence
[process where true] by unique_pid
[file where true] fork=true by unique_pid
[process where true] by unique_ppid
| head 4;
sequence by unique_pid [process where true] [file where true] fork=false;
// no longer supported
@ -120,9 +122,6 @@ sequence by pid with maxspan=1.0075d
* https://raw.githubusercontent.com/endgameinc/eql/master/eql/etc/test_queries.toml
*/
process where true | head 6;
process where bad_field == null | head 5;
process where serial_event_id <= 8 and serial_event_id > 7
| filter serial_event_id == 8;
@ -143,15 +142,6 @@ process where true
;
process where not (exit_code > -1)
and serial_event_id in (58, 64, 69, 74, 80, 85, 90, 93, 94)
| head 10
;
process where not (exit_code > -1) | head 7;
process where not (-1 < exit_code) | head 7;
process where process_name == "VMACTHLP.exe" and unique_pid == 12 | filter true;
@ -212,10 +202,6 @@ process where opcode=1 and process_name == "smss.exe"
file where true
| tail 3;
file where true
| tail 4
@ -243,46 +229,6 @@ process where true
| sort md5, event_subtype_full, null_field, process_name
| sort serial_event_id;
sequence
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2;
sequence with maxspan=1d
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2;
sequence with maxspan=1h
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2;
sequence with maxspan=1m
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2;
sequence with maxspan=10s
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2;
sequence with maxspan=0.5s
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
@ -291,55 +237,14 @@ sequence with maxspan=0.5s
| head 4
| tail 2;
sequence
[file where opcode=0] by unique_pid
[file where opcode=0] by unique_pid
| head 1;
sequence
[file where opcode=0] by unique_pid
[file where opcode=0] by unique_pid
| filter events[1].serial_event_id == 92;
sequence
[file where opcode=0 and file_name="*.exe"] by unique_pid
[file where opcode=0 and file_name="*.exe"] by unique_pid
until [process where opcode=5000] by unique_ppid
| head 1;
sequence
[file where opcode=0 and file_name="*.exe"] by unique_pid
[file where opcode=0 and file_name="*.exe"] by unique_pid
until [process where opcode=1] by unique_ppid
| head 1;
join
[file where opcode=0 and file_name="*.exe"] by unique_pid
[file where opcode=2 and file_name="*.exe"] by unique_pid
until [process where opcode=1] by unique_ppid
| head 1
;
join by string(unique_pid)
[process where opcode=1]
[file where opcode=0 and file_name="svchost.exe"]
[file where opcode == 0 and file_name == "lsass.exe"]
| head 1
;
join by string(unique_pid), unique_pid, unique_pid * 2
[process where opcode=1]
[file where opcode=0 and file_name="svchost.exe"]
[file where opcode == 0 and file_name == "lsass.exe"]
until [file where opcode == 2]
| tail 1
;
any where true
| unique event_type_full;
process where opcode=1 and process_name in ("services.exe", "smss.exe", "lsass.exe")
and descendant of [process where process_name == "cmd.exe" ];
@ -392,39 +297,6 @@ sequence
| tail 1
;
sequence by user_name
[file where opcode=0] by file_path
[process where opcode=1] by process_path
[process where opcode=2] by process_path
[file where opcode=2] by file_path
| tail 1;
sequence by user_name
[file where opcode=0] by pid,file_path
[file where opcode=2] by pid,file_path
until [process where opcode=5] by ppid,process_path
| head 2;
sequence by pid
[file where opcode=0] by file_path
[process where opcode=1] by process_path
[process where opcode=2] by process_path
[file where opcode=2] by file_path
| tail 1;
join by user_name
[file where true] by pid,file_path
[process where true] by ppid,process_path
| head 2;
sequence
[process where true] by unique_pid
[file where true] fork=true by unique_pid
[process where true] by unique_ppid
| head 4;
process where 'net.EXE' == original_file_name
| filter process_name="net*.exe"
@ -443,15 +315,6 @@ process where original_file_name == process_name
process where process_name != original_file_name
| filter length(original_file_name) > 0;
process where fake_field != "*"
| head 4;
process where not (fake_field == "*")
| head 4;
any where process_name == "svchost.exe"
| unique_count event_type_full, process_name;

View File

@ -27,6 +27,7 @@ import org.elasticsearch.xpack.ql.expression.predicate.operator.comparison.LessT
import org.elasticsearch.xpack.ql.expression.predicate.operator.comparison.NotEquals;
import org.elasticsearch.xpack.ql.expression.predicate.operator.comparison.NullEquals;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
import org.elasticsearch.xpack.ql.rule.Rule;
@ -1057,7 +1058,7 @@ public final class OptimizerRules {
return filter.child();
}
if (FALSE.equals(condition) || Expressions.isNull(condition)) {
return nonMatchingFilter(filter);
return skipPlan(filter);
}
}
@ -1067,7 +1068,7 @@ public final class OptimizerRules {
return filter;
}
protected abstract LogicalPlan nonMatchingFilter(Filter filter);
protected abstract LogicalPlan skipPlan(Filter filter);
private static Expression foldBinaryLogic(Expression expression) {
if (expression instanceof Or) {
@ -1120,6 +1121,21 @@ public final class OptimizerRules {
}
}
public abstract static class SkipQueryOnLimitZero extends OptimizerRule<Limit> {
@Override
protected LogicalPlan rule(Limit limit) {
if (limit.limit().foldable()) {
if (Integer.valueOf(0).equals((limit.limit().fold()))) {
return skipPlan(limit);
}
}
return limit;
}
protected abstract LogicalPlan skipPlan(Limit limit);
}
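// Illustrative only, not part of this hunk: a concrete optimizer plugs into the
// shared rule by deciding what an "empty" plan looks like for its engine. The
// EQL Optimizer further below does exactly this through Optimizer.skipPlan,
// returning a LocalRelation backed by an EmptyExecutable:
//
//   static class SkipQueryOnLimitZero extends OptimizerRules.SkipQueryOnLimitZero {
//       @Override
//       protected LogicalPlan skipPlan(Limit limit) {
//           return new LocalRelation(limit.source(), new EmptyExecutable(limit.output()));
//       }
//   }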
public static final class SetAsOptimized extends Rule<LogicalPlan, LogicalPlan> {
@Override

View File

@ -9,12 +9,11 @@ import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
import org.elasticsearch.xpack.ql.index.EsIndex;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.NodeUtils;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.EsField;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -99,36 +98,8 @@ public class EsRelation extends LeafPlan {
&& frozen == other.frozen;
}
private static final int TO_STRING_LIMIT = 52;
private static <E> String limitedToString(Collection<E> c) {
Iterator<E> it = c.iterator();
if (!it.hasNext()) {
return "[]";
}
// ..]
StringBuilder sb = new StringBuilder(TO_STRING_LIMIT + 4);
sb.append('[');
for (;;) {
E e = it.next();
String next = e == c ? "(this Collection)" : String.valueOf(e);
if (next.length() + sb.length() > TO_STRING_LIMIT) {
sb.append(next.substring(0, Math.max(0, TO_STRING_LIMIT - sb.length())));
sb.append('.').append('.').append(']');
return sb.toString();
} else {
sb.append(next);
}
if (!it.hasNext()) {
return sb.append(']').toString();
}
sb.append(',').append(' ');
}
}
@Override
public String nodeString() {
return nodeName() + "[" + index + "]" + limitedToString(attrs);
return nodeName() + "[" + index + "]" + NodeUtils.limitedToString(attrs);
}
}

View File

@ -5,6 +5,9 @@
*/
package org.elasticsearch.xpack.ql.tree;
import java.util.Collection;
import java.util.Iterator;
public abstract class NodeUtils {
public static <A extends Node<A>, B extends Node<B>> String diffString(A left, B right) {
return diffString(left.toString(), right.toString());
@ -53,4 +56,33 @@ public abstract class NodeUtils {
}
return sb.toString();
}
private static final int TO_STRING_LIMIT = 52;
public static <E> String limitedToString(Collection<E> c) {
Iterator<E> it = c.iterator();
if (!it.hasNext()) {
return "[]";
}
// ..]
StringBuilder sb = new StringBuilder(TO_STRING_LIMIT + 4);
sb.append('[');
for (;;) {
E e = it.next();
String next = e == c ? "(this Collection)" : String.valueOf(e);
if (next.length() + sb.length() > TO_STRING_LIMIT) {
sb.append(next.substring(0, Math.max(0, TO_STRING_LIMIT - sb.length())));
sb.append('.').append('.').append(']');
return sb.toString();
} else {
sb.append(next);
}
if (!it.hasNext()) {
return sb.append(']').toString();
}
sb.append(',').append(' ');
}
}
}
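// Usage sketch with hypothetical values: the relocated helper renders a
// collection but caps the result at TO_STRING_LIMIT (52) characters, closing a
// truncated rendering with "..]" so node strings stay short:
//
//   NodeUtils.limitedToString(List.of("a", "b"));   // -> "[a, b]"
//   NodeUtils.limitedToString(manyAttributes);      // -> cut at ~52 chars, ending in "..]"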

View File

@ -337,4 +337,4 @@ public final class StringUtils {
}
}
}
}

View File

@ -1113,24 +1113,23 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
static class PruneFilters extends org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PruneFilters {
@Override
protected LogicalPlan nonMatchingFilter(Filter filter) {
return new LocalRelation(filter.source(), new EmptyExecutable(filter.output()));
protected LogicalPlan skipPlan(Filter filter) {
return Optimizer.skipPlan(filter);
}
}
static class SkipQueryOnLimitZero extends org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SkipQueryOnLimitZero {
static class SkipQueryOnLimitZero extends OptimizerRule<Limit> {
@Override
protected LogicalPlan rule(Limit limit) {
if (limit.limit() instanceof Literal) {
if (Integer.valueOf(0).equals((limit.limit().fold()))) {
return new LocalRelation(limit.source(), new EmptyExecutable(limit.output()));
}
}
return limit;
protected LogicalPlan skipPlan(Limit limit) {
return Optimizer.skipPlan(limit);
}
}
private static LogicalPlan skipPlan(UnaryPlan plan) {
return new LocalRelation(plan.source(), new EmptyExecutable(plan.output()));
}
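// Both rules above now funnel through this helper, so a plan pruned by an
// always-false filter or by a limit that folds to zero is replaced with an
// empty local result.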
static class SkipQueryIfFoldingProjection extends OptimizerRule<LogicalPlan> {
@Override
protected LogicalPlan rule(LogicalPlan plan) {

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.NodeUtils;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Executable;
@ -75,4 +76,9 @@ public class LocalRelation extends LogicalPlan implements Executable {
LocalRelation other = (LocalRelation) obj;
return Objects.equals(executable, other.executable);
}
}
@Override
public String nodeString() {
return nodeName() + NodeUtils.limitedToString(output());
}
}