EQL: Hook engine to Elasticsearch (#52828)

Add query execution and return actual results returned from
Elasticsearch inside the tests

(cherry picked from commit 3e039282bf991af87604a6d4f8eada19d5e33842)
This commit is contained in:
Costin Leau 2020-02-27 11:16:26 +02:00 committed by Costin Leau
parent 69b78f7f8a
commit 40bc06f6ad
14 changed files with 396 additions and 61 deletions

View File

@ -19,6 +19,8 @@
package org.elasticsearch.client;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.eql.EqlSearchRequest;
import org.elasticsearch.client.eql.EqlSearchResponse;
import org.junit.Before;
@ -33,9 +35,23 @@ public class EqlIT extends ESRestHighLevelClientTestCase {
}
public void testBasicSearch() throws Exception {
Request doc1 = new Request(HttpPut.METHOD_NAME, "/index/_doc/1");
doc1.setJsonEntity("{\"event_subtype_full\": \"already_running\", " +
"\"event_type\": \"process\", " +
"\"event_type_full\": \"process_event\", " +
"\"opcode\": 3," +
"\"pid\": 0," +
"\"process_name\": \"System Idle Process\"," +
"\"serial_event_id\": 1," +
"\"subtype\": \"create\"," +
"\"timestamp\": 116444736000000000," +
"\"unique_pid\": 1}");
client().performRequest(doc1);
client().performRequest(new Request(HttpPost.METHOD_NAME, "/_refresh"));
EqlClient eql = highLevelClient().eql();
// TODO: Add real checks when end-to-end basic functionality is implemented
EqlSearchRequest request = new EqlSearchRequest("test", "test");
EqlSearchRequest request = new EqlSearchRequest("index", "process where true");
EqlSearchResponse response = execute(request, eql::search, eql::searchAsync);
assertNotNull(response);
assertFalse(response.isTimeout());

View File

@ -34,6 +34,7 @@ specified in the `rule` parameter. The EQL query matches events with an
----
GET sec_logs/_eql/search
{
"event_type_field": "event.category",
"rule": """
process where process.name == "cmd.exe"
"""

View File

@ -7,13 +7,11 @@ setup:
- index:
_index: eql_test
_id: 1
- str: test1
int: 1
- event_type: process
user: SYSTEM
---
# Testing round-trip and the basic shape of the response
# Currently not implemented or wired and always returns empty result.
# TODO: define more test once everything is wired up
"Execute some EQL.":
- do:
eql.search:
@ -22,6 +20,7 @@ setup:
rule: "process where user = 'SYSTEM'"
- match: {timed_out: false}
- match: {took: 0}
- match: {hits.total.value: 1}
- match: {hits.total.relation: "eq"}
- match: {hits.events.0._source.user: "SYSTEM"}

View File

@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.eql.session.Configuration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.index.IndexResolver;
import org.elasticsearch.xpack.ql.util.StringUtils;
import java.util.Collections;
import java.util.List;
public class Querier {
private static final Logger log = LogManager.getLogger(Querier.class);
private final Configuration cfg;
private final Client client;
private final TimeValue keepAlive;
private final QueryBuilder filter;
public Querier(EqlSession eqlSession) {
this.cfg = eqlSession.configuration();
this.client = eqlSession.client();
this.keepAlive = cfg.requestTimeout();
this.filter = cfg.filter();
}
public void query(List<Attribute> output, QueryContainer container, String index, ActionListener<Results> listener) {
// prepare the request
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(container, filter, cfg.size());
// set query timeout
sourceBuilder.timeout(cfg.requestTimeout());
if (log.isTraceEnabled()) {
log.trace("About to execute query {} on {}", StringUtils.toString(sourceBuilder), index);
}
SearchRequest search = prepareRequest(client, sourceBuilder, cfg.requestTimeout(), false,
Strings.commaDelimitedListToStringArray(index));
ActionListener<SearchResponse> l = new SearchAfterListener(listener, client, cfg, output, container, search);
client.search(search, l);
}
public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
String... indices) {
return client.prepareSearch(indices)
// always track total hits accurately
.setTrackTotalHits(true)
.setAllowPartialSearchResults(false)
.setSource(source)
.setTimeout(timeout)
.setIndicesOptions(
includeFrozen ? IndexResolver.FIELD_CAPS_FROZEN_INDICES_OPTIONS : IndexResolver.FIELD_CAPS_INDICES_OPTIONS)
.request();
}
protected static void logSearchResponse(SearchResponse response, Logger logger) {
List<Aggregation> aggs = Collections.emptyList();
if (response.getAggregations() != null) {
aggs = response.getAggregations().asList();
}
StringBuilder aggsNames = new StringBuilder();
for (int i = 0; i < aggs.size(); i++) {
aggsNames.append(aggs.get(i).getName() + (i + 1 == aggs.size() ? "" : ", "));
}
logger.trace("Got search response [hits {} {}, {} aggregations: [{}], {} failed shards, {} skipped shards, "
+ "{} successful shards, {} total shards, took {}, timed out [{}]]", response.getHits().getTotalHits().relation.toString(),
response.getHits().getTotalHits().value, aggs.size(), aggsNames, response.getFailedShards(), response.getSkippedShards(),
response.getSuccessfulShards(), response.getTotalShards(), response.getTook(), response.isTimedOut());
}
}

View File

@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.extractor.FieldHitExtractor;
import org.elasticsearch.xpack.eql.querydsl.container.ComputedRef;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.eql.querydsl.container.SearchHitFieldRef;
import org.elasticsearch.xpack.eql.session.Configuration;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.execution.search.FieldExtraction;
import org.elasticsearch.xpack.ql.execution.search.extractor.ComputingExtractor;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.gen.pipeline.HitExtractorInput;
import org.elasticsearch.xpack.ql.expression.gen.pipeline.Pipe;
import org.elasticsearch.xpack.ql.expression.gen.pipeline.ReferenceInput;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
class SearchAfterListener implements ActionListener<SearchResponse> {
private static final Logger log = LogManager.getLogger(SearchAfterListener.class);
private final ActionListener<Results> listener;
private final Client client;
private final Configuration cfg;
private final List<Attribute> output;
private final QueryContainer container;
private final SearchRequest request;
SearchAfterListener(ActionListener<Results> listener, Client client, Configuration cfg, List<Attribute> output,
QueryContainer container, SearchRequest request) {
this.listener = listener;
this.client = client;
this.cfg = cfg;
this.output = output;
this.container = container;
this.request = request;
}
@Override
public void onResponse(SearchResponse response) {
try {
ShardSearchFailure[] failures = response.getShardFailures();
if (CollectionUtils.isEmpty(failures) == false) {
listener.onFailure(new EqlIllegalArgumentException(failures[0].reason(), failures[0].getCause()));
} else {
handleResponse(response, listener);
}
} catch (Exception ex) {
listener.onFailure(ex);
}
}
private void handleResponse(SearchResponse response, ActionListener<Results> listener) {
// create response extractors for the first time
List<Tuple<FieldExtraction, String>> refs = container.fields();
List<HitExtractor> exts = new ArrayList<>(refs.size());
for (Tuple<FieldExtraction, String> ref : refs) {
exts.add(createExtractor(ref.v1()));
}
if (log.isTraceEnabled()) {
Querier.logSearchResponse(response, log);
}
List<?> results = Arrays.asList(response.getHits().getHits());
listener.onResponse(new Results(response.getHits().getTotalHits(), response.getTook(), response.isTimedOut(), results));
}
private HitExtractor createExtractor(FieldExtraction ref) {
if (ref instanceof SearchHitFieldRef) {
SearchHitFieldRef f = (SearchHitFieldRef) ref;
return new FieldHitExtractor(f.name(), f.fullFieldName(), f.getDataType(), cfg.zoneId(), f.useDocValue(), f.hitName(), false);
}
if (ref instanceof ComputedRef) {
Pipe proc = ((ComputedRef) ref).processor();
// collect hitNames
Set<String> hitNames = new LinkedHashSet<>();
proc = proc.transformDown(l -> {
HitExtractor he = createExtractor(l.context());
hitNames.add(he.hitName());
if (hitNames.size() > 1) {
throw new EqlIllegalArgumentException("Multi-level nested fields [{}] not supported yet", hitNames);
}
return new HitExtractorInput(l.source(), l.expression(), he);
}, ReferenceInput.class);
String hitName = null;
if (hitNames.size() == 1) {
hitName = hitNames.iterator().next();
}
return new ComputingExtractor(proc.asProcessor(), hitName);
}
throw new EqlIllegalArgumentException("Unexpected value reference {}", ref.getClass());
}
@Override
public void onFailure(Exception ex) {
listener.onFailure(ex);
}
}

View File

@ -50,6 +50,15 @@ public abstract class SourceGenerator {
sortBuilder.build(source);
optimize(sortBuilder, source);
// set fetch size
if (size != null) {
int sz = size;
if (source.size() == -1) {
source.size(sz);
}
}
return source;
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.execution.search.extractor;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.ql.execution.search.extractor.AbstractFieldHitExtractor;
import org.elasticsearch.xpack.ql.type.DataType;
import org.elasticsearch.xpack.ql.util.DateUtils;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import static org.elasticsearch.xpack.ql.type.DataTypes.DATETIME;
public class FieldHitExtractor extends AbstractFieldHitExtractor {
static final String NAME = "f";
public FieldHitExtractor(StreamInput in) throws IOException {
super(in);
}
public FieldHitExtractor(String name, String fullFieldName, DataType dataType, ZoneId zoneId, boolean useDocValue, String hitName,
boolean arrayLeniency) {
super(name, fullFieldName, dataType, zoneId, useDocValue, hitName, arrayLeniency);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
protected ZoneId readZoneId(StreamInput in) throws IOException {
return DateUtils.UTC;
}
@Override
protected Object unwrapCustomValue(Object values) {
DataType dataType = dataType();
if (dataType == DATETIME) {
if (values instanceof String) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(values.toString())), zoneId());
}
}
return null;
}
@Override
protected boolean isPrimitive(List<?> list) {
return false;
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.eql.plan.physical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.execution.search.Querier;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
@ -49,7 +50,7 @@ public class EsQueryExec extends LeafExec {
@Override
public void execute(EqlSession session, ActionListener<Results> listener) {
throw new UnsupportedOperationException();
new Querier(session).query(output, queryContainer, index, listener);
}
@Override

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.eql.plugin;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@ -15,7 +14,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -30,8 +28,8 @@ import org.elasticsearch.xpack.eql.session.Configuration;
import org.elasticsearch.xpack.eql.session.Results;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import static org.elasticsearch.action.ActionListener.wrap;
public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRequest, EqlSearchResponse> {
private final SecurityContext securityContext;
@ -68,23 +66,16 @@ public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRe
.fieldTimestamp(request.timestampField())
.implicitJoinKey(request.implicitJoinKeyField());
Configuration cfg = new Configuration(request.indices(), zoneId, username, clusterName, filter, timeout, includeFrozen, clientId);
//planExecutor.eql(cfg, request.rule(), params, wrap(r -> listener.onResponse(createResponse(r)), listener::onFailure));
listener.onResponse(createResponse(null));
Configuration cfg = new Configuration(request.indices(), zoneId, username, clusterName, filter, timeout, request.fetchSize(),
includeFrozen, clientId);
planExecutor.eql(cfg, request.rule(), params, wrap(r -> listener.onResponse(createResponse(r)), listener::onFailure));
}
static EqlSearchResponse createResponse(Results results) {
// Stubbed search response
// TODO: implement actual search response processing once the parser/executor is in place
// Updated for stubbed response to: process where serial_event_id = 1
// to validate the sample test until the engine is wired in.
List<SearchHit> events = Arrays.asList(
new SearchHit(1, "111", null, null)
);
EqlSearchResponse.Hits hits = new EqlSearchResponse.Hits(events, null,
null, new TotalHits(1, TotalHits.Relation.EQUAL_TO));
EqlSearchResponse.Hits hits = new EqlSearchResponse.Hits(results.searchHits(), results.sequences(), results.counts(), results
.totalHits());
return new EqlSearchResponse(hits, 0, false);
return new EqlSearchResponse(hits, results.tookTime().getMillis(), results.timedOut());
}
static String username(SecurityContext securityContext) {
@ -94,4 +85,4 @@ public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRe
static String clusterName(ClusterService clusterService) {
return clusterService.getClusterName().value();
}
}
}

View File

@ -16,20 +16,22 @@ public class Configuration extends org.elasticsearch.xpack.ql.session.Configurat
private final String[] indices;
private final TimeValue requestTimeout;
private final int size;
private final String clientId;
private final boolean includeFrozenIndices;
@Nullable
private QueryBuilder filter;
public Configuration(String[] indices, ZoneId zi, String username, String clusterName, QueryBuilder filter,
TimeValue requestTimeout, boolean includeFrozen, String clientId) {
public Configuration(String[] indices, ZoneId zi, String username, String clusterName, QueryBuilder filter, TimeValue requestTimeout,
int size, boolean includeFrozen, String clientId) {
super(zi, username, clusterName);
this.indices = indices;
this.filter = filter;
this.requestTimeout = requestTimeout;
this.size = size;
this.clientId = clientId;
this.includeFrozenIndices = includeFrozen;
}
@ -46,6 +48,10 @@ public class Configuration extends org.elasticsearch.xpack.ql.session.Configurat
return filter;
}
public int size() {
return size;
}
public String clientId() {
return clientId;
}

View File

@ -8,6 +8,10 @@ package org.elasticsearch.xpack.eql.session;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Count;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence;
import java.util.List;
@ -15,21 +19,65 @@ import static java.util.Collections.emptyList;
public class Results {
public static final Results EMPTY = new Results(new TotalHits(0, Relation.EQUAL_TO), emptyList());
private enum Type {
SEARCH_HIT,
SEQUENCE,
COUNT;
}
public static final Results EMPTY = new Results(new TotalHits(0, Relation.EQUAL_TO), TimeValue.MINUS_ONE, false, emptyList());
private final TotalHits totalHits;
private final List<Object> results;
private final List<?> results;
private final boolean timedOut;
private final TimeValue tookTime;
private final Type type;
public Results(TotalHits totalHits, List<Object> results) {
public Results(TotalHits totalHits, TimeValue tookTime, boolean timedOut, List<?> results) {
this.totalHits = totalHits;
this.tookTime = tookTime;
this.timedOut = timedOut;
this.results = results;
Type t = Type.SEARCH_HIT;
if (results.isEmpty() == false) {
Object o = results.get(0);
if (o instanceof Sequence) {
t = Type.SEQUENCE;
}
if (o instanceof Count) {
t = Type.COUNT;
}
}
type = t;
}
public TotalHits totalHits() {
return totalHits;
}
public List<Object> results() {
return results;
@SuppressWarnings("unchecked")
public List<SearchHit> searchHits() {
return type == Type.SEARCH_HIT ? (List<SearchHit>) results : null;
}
}
@SuppressWarnings("unchecked")
public List<Sequence> sequences() {
return type == Type.SEQUENCE ? (List<Sequence>) results : null;
}
@SuppressWarnings("unchecked")
public List<Count> counts() {
return type == Type.COUNT ? (List<Count>) results : null;
}
public TimeValue tookTime() {
return tookTime;
}
public boolean timedOut() {
return timedOut;
}
}

View File

@ -1,25 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.session;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.search.SearchHit;
import java.util.List;
public class Sequence {
private final List<Tuple<Object, List<SearchHit>>> events;
public Sequence(List<Tuple<Object, List<SearchHit>>> events) {
this.events = events;
}
public List<Tuple<Object, List<SearchHit>>> events() {
return events;
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.xpack.eql.session.Configuration;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong;
import static org.elasticsearch.test.ESTestCase.randomZone;
@ -20,7 +21,7 @@ public final class EqlTestUtils {
}
public static final Configuration TEST_CFG = new Configuration(new String[]{"none"}, org.elasticsearch.xpack.ql.util.DateUtils.UTC,
"nobody", "cluster", null, TimeValue.timeValueSeconds(30), false, "");
"nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, "");
public static Configuration randomConfiguration() {
return new Configuration(new String[]{randomAlphaOfLength(16)},
@ -29,6 +30,7 @@ public final class EqlTestUtils {
randomAlphaOfLength(16),
null,
new TimeValue(randomNonNegativeLong()),
randomIntBetween(5, 100),
randomBoolean(),
randomAlphaOfLength(16));
}

View File

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.eql.action;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.Build;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@ -105,7 +106,7 @@ public class EqlActionIT extends AbstractEqlIntegTestCase {
final int len = events.size();
final long ids[] = new long[len];
for (int i = 0; i < events.size(); i++) {
ids[i] = events.get(i).docId();
ids[i] = ((Number) events.get(i).getSourceAsMap().get("serial_event_id")).longValue();
}
final String msg = "unexpected result for spec: [" + spec.toString() + "]";
assertArrayEquals(msg, spec.expectedEventIds(), ids);