Add telemetry metrics (#59526)

Andrei Stefan 2020-07-14 16:25:24 +03:00 committed by GitHub
parent 59f639a279
commit cf752992d6
15 changed files with 881 additions and 32 deletions
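
For orientation: the usage data added here is served by the new EQL stats endpoint and can be fetched with the low-level REST client. A minimal sketch (illustrative only, mirroring the getStats() helper in the REST test further down; "client" stands for any configured RestClient):

    // illustrative sketch, not part of the commit
    Request request = new Request("GET", "/_eql/stats");
    Response response = client.performRequest(request);
    // the body holds per-node "queries" (total/failed) and "features" counters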

View File

@@ -52,7 +52,7 @@ public class EqlFeatureSetUsage extends XPackFeatureSet.Usage {
@Override
public Version getMinimalSupportedVersion() {
-    return Version.V_7_7_0;
+    return Version.V_7_9_0;
}
}

View File

@@ -39,7 +39,7 @@ public class DataLoader {
private static final String TEST_DATA = "/test_data.json";
private static final String MAPPING = "/mapping-default.json";
static final String indexPrefix = "endgame";
-    static final String testIndexName = indexPrefix + "-1.4.0";
+    public static final String testIndexName = indexPrefix + "-1.4.0";
public static void main(String[] args) throws IOException {
try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build()) {
@@ -52,15 +52,23 @@ public class DataLoader {
}
}
@SuppressWarnings("unchecked")
-    protected static void loadDatasetIntoEs(RestHighLevelClient client,
+    public static void loadDatasetIntoEs(RestHighLevelClient client,
CheckedBiFunction<XContent, InputStream, XContentParser, IOException> p) throws IOException {
createTestIndex(client);
loadData(client, p);
}
private static void createTestIndex(RestHighLevelClient client) throws IOException {
CreateIndexRequest request = new CreateIndexRequest(testIndexName)
.mapping(Streams.readFully(DataLoader.class.getResourceAsStream(MAPPING)), XContentType.JSON);
client.indices().create(request, RequestOptions.DEFAULT);
}
@SuppressWarnings("unchecked")
private static void loadData(RestHighLevelClient client, CheckedBiFunction<XContent, InputStream, XContentParser, IOException> p)
throws IOException {
BulkRequest bulk = new BulkRequest();
bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

View File

@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.test.eql.stats;
import java.util.Locale;
public enum FeatureMetric {
SEQUENCE,
JOIN,
EVENT,
SEQUENCE_MAXSPAN,
SEQUENCE_UNTIL,
SEQUENCE_QUERIES_TWO,
SEQUENCE_QUERIES_THREE,
SEQUENCE_QUERIES_FOUR,
SEQUENCE_QUERIES_FIVE_OR_MORE,
JOIN_QUERIES_TWO,
JOIN_QUERIES_THREE,
JOIN_QUERIES_FOUR,
JOIN_QUERIES_FIVE_OR_MORE,
JOIN_UNTIL,
JOIN_KEYS_ONE,
JOIN_KEYS_TWO,
JOIN_KEYS_THREE,
JOIN_KEYS_FOUR,
JOIN_KEYS_FIVE_OR_MORE,
PIPE_HEAD,
PIPE_TAIL;
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
}
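
As a quick illustration of how these constants are used: toString() lowercases the enum name, and the tests below use exactly those strings as the JSON keys of the stats response. For example (illustrative only):

    FeatureMetric.SEQUENCE_MAXSPAN.toString(); // "sequence_maxspan"
    FeatureMetric.PIPE_HEAD.toString();        // "pipe_head"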

View File

@@ -0,0 +1,378 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.test.eql.stats;
import org.elasticsearch.Build;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.eql.DataLoader;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.unmodifiableSet;
/**
 * Tests a random number of queries that increase various metrics (most of the time, a single query will "touch" multiple metric values).
*/
public abstract class RestEqlUsageTestCase extends ESRestTestCase {
private RestHighLevelClient highLevelClient;
private Map<String, Integer> baseMetrics = new HashMap<String, Integer>();
private Integer baseAllTotalQueries = 0;
private Integer baseAllFailedQueries = 0;
@BeforeClass
public static void checkForSnapshot() {
assumeTrue("Only works on snapshot builds for now", Build.CURRENT.isSnapshot());
}
/**
 * This method gets the metrics' values before the test runs, in case these values
 * were changed by other tests running in the same REST test cluster. The test itself
 * counts the new metrics' values starting from the base values initialized here.
 * These base values are updated during the test by {@link #assertFeatureMetric(int, Map, String)}.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Before
private void getBaseMetrics() throws UnsupportedOperationException, IOException {
Map<String, Object> baseStats = getStats();
List<Map<String, Map<String, Map>>> nodesListStats = (List) baseStats.get("stats");
for (Map perNodeStats : nodesListStats) {
Map queriesMetrics = (Map) ((Map) perNodeStats.get("stats")).get("queries");
Map featuresMetrics = getFeaturesMetrics(perNodeStats);
for (FeatureMetric metric : FeatureMetric.values()) {
String metricName = metric.toString();
if (baseMetrics.containsKey(metricName)) {
baseMetrics.put(metricName, baseMetrics.get(metricName) + ((Integer) featuresMetrics.get(metricName)));
} else {
baseMetrics.put(metricName, (Integer) featuresMetrics.get(metricName));
}
}
// initialize the "base" metric values with whatever values are already recorded on ES
baseAllTotalQueries += ((Map<String, Integer>) queriesMetrics.get("_all")).get("total");
baseAllFailedQueries += ((Map<String, Integer>) queriesMetrics.get("_all")).get("failed");
}
}
/**
* "Flatten" the response from ES putting all the features metrics in the same Map.
* "features": {
* "joins": {
* "join_queries_three": 0,
* "join_queries_two": 0,
* "join_until": 0,
* "join_queries_five_or_more": 0,
* "join_queries_four": 0
* },
* "sequence": 0,
* "keys": {
* "join_keys_two": 0,
* "join_keys_one": 0,
* "join_keys_three": 0,
* "join_keys_five_or_more": 0,
* "join_keys_four": 0
* },
* "join": 0,
* "sequences": {
* "sequence_queries_three": 0,
* "sequence_queries_four": 0,
* "sequence_queries_two": 0,
* "sequence_until": 0,
* "sequence_queries_five_or_more": 0,
* "sequence_maxspan": 0
* },
* "event": 0,
* "pipes": {
* "pipe_tail": 0,
* "pipe_head": 0
* }
* }
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
private Map getFeaturesMetrics(Map perNodeStats) {
Map featuresMetrics = (Map) ((Map) perNodeStats.get("stats")).get("features");
featuresMetrics.putAll((Map) featuresMetrics.get("keys"));
featuresMetrics.putAll((Map) featuresMetrics.get("sequences"));
featuresMetrics.putAll((Map) featuresMetrics.get("joins"));
featuresMetrics.putAll((Map) featuresMetrics.get("pipes"));
return featuresMetrics;
}
public void testEqlRestUsage() throws IOException {
DataLoader.loadDatasetIntoEs(highLevelClient(), (t, u) -> createParser(t, u));
//
// random event queries
//
int randomEventExecutions = randomIntBetween(1, 15);
int allTotalQueries = baseAllTotalQueries + randomEventExecutions;
for (int i = 0; i < randomEventExecutions; i++) {
runEql("process where serial_event_id < 4 | head 3");
}
Map<String, Object> responseAsMap = getStats();
Set<String> metricsToCheck = unmodifiableSet(new HashSet<>(Arrays.asList("pipe_head", "event")));
assertFeaturesMetrics(randomEventExecutions, responseAsMap, metricsToCheck);
assertFeaturesMetricsExcept(responseAsMap, metricsToCheck);
assertAllQueryMetrics(allTotalQueries, responseAsMap);
//
// random two sequences queries
//
int randomSequenceExecutions = randomIntBetween(1, 15);
allTotalQueries += randomSequenceExecutions;
for (int i = 0; i < randomSequenceExecutions; i++) {
runEql("sequence [process where serial_event_id = 1] [process where serial_event_id = 2]");
}
responseAsMap = getStats();
metricsToCheck = unmodifiableSet(new HashSet<>(Arrays.asList("sequence", "sequence_queries_two", "pipe_head")));
assertFeaturesMetrics(randomSequenceExecutions, responseAsMap, metricsToCheck);
assertFeaturesMetricsExcept(responseAsMap, metricsToCheck);
assertAllQueryMetrics(allTotalQueries, responseAsMap);
//
// random tail queries
//
int randomTailExecutions = randomIntBetween(1, 15);
allTotalQueries += randomTailExecutions;
for (int i = 0; i < randomTailExecutions; i++) {
runEql("process where serial_event_id < 4 | tail 2");
}
responseAsMap = getStats();
metricsToCheck = unmodifiableSet(new HashSet<>(Arrays.asList("pipe_tail", "event")));
assertFeaturesMetrics(randomTailExecutions, responseAsMap, metricsToCheck);
assertFeaturesMetricsExcept(responseAsMap, metricsToCheck);
assertAllQueryMetrics(allTotalQueries, responseAsMap);
//
// random sequence with maxspan and four queries
//
int randomMaxspanExecutions = randomIntBetween(1, 15);
allTotalQueries += randomMaxspanExecutions;
for (int i = 0; i < randomMaxspanExecutions; i++) {
runEql("sequence with maxspan=1d" +
" [process where serial_event_id < 4] by exit_code" +
" [process where opcode == 1] by user" +
" [process where opcode == 2] by user" +
" [file where parent_process_name == 'file_delete_event'] by exit_code" +
" until [process where opcode=1] by ppid" +
" | head 4" +
" | tail 2");
}
responseAsMap = getStats();
metricsToCheck = unmodifiableSet(new HashSet<>(Arrays.asList("sequence", "sequence_maxspan", "sequence_queries_four",
"pipe_head", "pipe_tail", "join_keys_one", "sequence_until")));
assertFeaturesMetrics(randomMaxspanExecutions, responseAsMap, metricsToCheck);
assertFeaturesMetricsExcept(responseAsMap, metricsToCheck);
assertAllQueryMetrics(allTotalQueries, responseAsMap);
//
// random sequence with three queries
//
int randomThreeQueriesSequences = randomIntBetween(1, 15);
allTotalQueries += randomThreeQueriesSequences;
for (int i = 0; i < randomThreeQueriesSequences; i++) {
runEql("sequence with maxspan=1d" +
" [process where serial_event_id < 4] by exit_code" +
" [process where opcode == 1] by user" +
" [process where opcode == 2] by user");
}
responseAsMap = getStats();
metricsToCheck = unmodifiableSet(new HashSet<>(Arrays.asList("sequence", "sequence_queries_three", "pipe_head", "join_keys_one",
"sequence_maxspan")));
assertFeaturesMetrics(randomThreeQueriesSequences, responseAsMap, metricsToCheck);
assertFeaturesMetricsExcept(responseAsMap, metricsToCheck);
assertAllQueryMetrics(allTotalQueries, responseAsMap);
//
// random sequence with five queries and three join keys
//
int randomFiveQueriesSequences = randomIntBetween(1, 15);
allTotalQueries += randomFiveQueriesSequences;
for (int i = 0; i < randomFiveQueriesSequences; i++) {
runEql("sequence by user, ppid, exit_code with maxspan=1m" +
" [process where serial_event_id < 4]" +
" [process where opcode == 1]" +
" [file where parent_process_name == 'file_delete_event']" +
" [process where serial_event_id < 4]" +
" [process where opcode == 1]" +
"| tail 4");
}
responseAsMap = getStats();
metricsToCheck = unmodifiableSet(new HashSet<>(Arrays.asList("sequence", "sequence_queries_five_or_more", "pipe_tail",
"join_keys_three", "sequence_maxspan")));
assertFeaturesMetrics(randomFiveQueriesSequences, responseAsMap, metricsToCheck);
assertFeaturesMetricsExcept(responseAsMap, metricsToCheck);
assertAllQueryMetrics(allTotalQueries, responseAsMap);
//
// random sequence with four join keys
//
int randomFourJoinKeysExecutions = randomIntBetween(1, 15);
allTotalQueries += randomFourJoinKeysExecutions;
for (int i = 0; i < randomFourJoinKeysExecutions; i++) {
runEql("sequence by exit_code, user, serial_event_id, pid" +
" [process where serial_event_id < 4]" +
" [process where opcode == 1]");
}
responseAsMap = getStats();
metricsToCheck = unmodifiableSet(new HashSet<>(Arrays.asList("sequence", "sequence_queries_two", "pipe_head", "join_keys_four")));
assertFeaturesMetrics(randomFourJoinKeysExecutions, responseAsMap, metricsToCheck);
assertFeaturesMetricsExcept(responseAsMap, metricsToCheck);
assertAllQueryMetrics(allTotalQueries, responseAsMap);
//
// random sequence with five join keys
//
int randomFiveJoinKeysExecutions = randomIntBetween(1, 15);
allTotalQueries += randomFiveJoinKeysExecutions;
for (int i = 0; i < randomFiveJoinKeysExecutions; i++) {
runEql("sequence by exit_code, user, serial_event_id, pid, ppid" +
" [process where serial_event_id < 4]" +
" [process where opcode == 1]");
}
responseAsMap = getStats();
metricsToCheck = unmodifiableSet(new HashSet<>(Arrays.asList("sequence", "sequence_queries_two", "pipe_head",
"join_keys_five_or_more")));
assertFeaturesMetrics(randomFiveJoinKeysExecutions, responseAsMap, metricsToCheck);
assertFeaturesMetricsExcept(responseAsMap, metricsToCheck);
assertAllQueryMetrics(allTotalQueries, responseAsMap);
//
// random failed queries
//
int randomFailedExecutions = randomIntBetween(1, 15);
int allFailedQueries = baseAllFailedQueries + randomFailedExecutions;
allTotalQueries += randomFailedExecutions;
for (int i = 0; i < randomFailedExecutions; i++) {
// not interested in the exception type, but in the fact that the metrics are incremented when an exception is thrown
expectThrows(Exception.class, () -> {
runEql(
randomFrom(
"process where missing_field < 4 | tail 2",
"sequence abc [process where serial_event_id = 1]",
"sequence with maxspan=1x [process where serial_event_id = 1]",
"sequence by exit_code, user [process where serial_event_id < 4] by ppid",
"sequence by"
)
);
});
}
responseAsMap = getStats();
assertAllFailedQueryMetrics(allFailedQueries, responseAsMap);
assertAllQueryMetrics(allTotalQueries, responseAsMap);
}
private void assertAllQueryMetrics(int allTotalQueries, Map<String, Object> responseAsMap) throws IOException {
assertAllQueryMetric(allTotalQueries, responseAsMap, "total");
}
private void assertAllFailedQueryMetrics(int allFailedQueries, Map<String, Object> responseAsMap) throws IOException {
assertAllQueryMetric(allFailedQueries, responseAsMap, "failed");
}
private Map<String, Object> getStats() throws UnsupportedOperationException, IOException {
Request request = new Request("GET", "/_eql/stats");
Map<String, Object> responseAsMap;
try (InputStream content = client().performRequest(request).getEntity().getContent()) {
responseAsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
return responseAsMap;
}
private void runEql(String eql) throws IOException {
Request request = new Request("POST", DataLoader.testIndexName + "/_eql/search");
request.setJsonEntity("{\"query\":\"" + eql + "\"}");
client().performRequest(request);
}
private void assertFeaturesMetrics(int expected, Map<String, Object> responseAsMap, Set<String> metricsToCheck) throws IOException {
for (String metricName : metricsToCheck) {
assertFeatureMetric(expected, responseAsMap, metricName);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void assertFeatureMetric(int expected, Map<String, Object> responseAsMap, String feature) throws IOException {
List<Map<String, ?>> nodesListStats = (List<Map<String, ?>>) responseAsMap.get("stats");
int actualMetricValue = 0;
for (Map perNodeStats : nodesListStats) {
Map featuresMetrics = getFeaturesMetrics(perNodeStats);
actualMetricValue += (int) featuresMetrics.get(feature);
}
assertEquals(expected + baseMetrics.get(feature), actualMetricValue);
/*
* update the base value for future checks in {@link #assertFeaturesMetricsExcept(Set, Map)}
*/
baseMetrics.put(feature, expected + baseMetrics.get(feature));
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void assertQueryMetric(int expected, Map<String, Object> responseAsMap, String queryType, String metric) throws IOException {
List<Map<String, Map<String, Map>>> nodesListStats = (List) responseAsMap.get("stats");
int actualMetricValue = 0;
for (Map perNodeStats : nodesListStats) {
Map queriesMetrics = (Map) ((Map) perNodeStats.get("stats")).get("queries");
Map perTypeQueriesMetrics = (Map) queriesMetrics.get(queryType);
actualMetricValue += (int) perTypeQueriesMetrics.get(metric);
}
assertEquals(expected, actualMetricValue);
}
private void assertAllQueryMetric(int expected, Map<String, Object> responseAsMap, String metric) throws IOException {
assertQueryMetric(expected, responseAsMap, "_all", metric);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void assertFeaturesMetricsExcept(Map<String, Object> responseAsMap, Set<String> exceptions) {
List<Map<String, ?>> nodesListStats = (List<Map<String, ?>>) responseAsMap.get("stats");
for (FeatureMetric metric : FeatureMetric.values()) {
String metricName = metric.toString();
if (exceptions.contains(metricName) == false) {
Integer actualValue = 0;
for (Map perNodeStats : nodesListStats) {
Map featuresMetrics = getFeaturesMetrics(perNodeStats);
Integer featureMetricValue = (Integer) featuresMetrics.get(metricName);
actualValue += featureMetricValue;
}
assertEquals(baseMetrics.get(metricName), actualValue);
}
}
}
private RestHighLevelClient highLevelClient() {
if (highLevelClient == null) {
highLevelClient = new RestHighLevelClient(
client(),
ignore -> {
},
Collections.emptyList()) {
};
}
return highLevelClient;
}
}

View File

@@ -0,0 +1,7 @@
package org.elasticsearch.xpack.eql;
import org.elasticsearch.test.eql.stats.RestEqlUsageTestCase;
public class EqlStatsIT extends RestEqlUsageTestCase {
}

View File

@@ -6,16 +6,25 @@
package org.elasticsearch.xpack.eql.analysis;
import org.elasticsearch.xpack.eql.plan.logical.Head;
import org.elasticsearch.xpack.eql.plan.logical.Join;
import org.elasticsearch.xpack.eql.plan.logical.Sequence;
import org.elasticsearch.xpack.eql.plan.logical.Tail;
import org.elasticsearch.xpack.eql.stats.FeatureMetric;
import org.elasticsearch.xpack.eql.stats.Metrics;
import org.elasticsearch.xpack.ql.capabilities.Unresolvable;
import org.elasticsearch.xpack.ql.common.Failure;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.tree.Node;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.Holder;
import org.elasticsearch.xpack.ql.util.StringUtils;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
@@ -23,6 +32,27 @@ import java.util.Map;
import java.util.Set;
import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.EVENT;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_UNTIL;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_FIVE_OR_MORE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_FOUR;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_ONE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_THREE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_TWO;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_QUERIES_FIVE_OR_MORE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_QUERIES_FOUR;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_QUERIES_THREE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_QUERIES_TWO;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.PIPE_HEAD;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.PIPE_TAIL;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_UNTIL;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_MAXSPAN;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_FIVE_OR_MORE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_FOUR;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_THREE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_TWO;
import static org.elasticsearch.xpack.ql.common.Failure.fail;
/**
@@ -31,6 +61,12 @@ import static org.elasticsearch.xpack.ql.common.Failure.fail;
*/
public class Verifier {
private final Metrics metrics;
public Verifier(Metrics metrics) {
this.metrics = metrics;
}
public Map<Node<?>, String> verifyFailures(LogicalPlan plan) {
Collection<Failure> failures = verify(plan);
return failures.stream().collect(toMap(Failure::node, Failure::message));
@@ -104,6 +140,87 @@ public class Verifier {
failures.addAll(localFailures);
});
// gather metrics
if (failures.isEmpty()) {
BitSet b = new BitSet(FeatureMetric.values().length);
Holder<Boolean> isLikelyAnEventQuery = new Holder<>(false);
plan.forEachDown(p -> {
if (p instanceof Project) {
isLikelyAnEventQuery.set(true);
} else if (p instanceof Head) {
b.set(PIPE_HEAD.ordinal());
} else if (p instanceof Tail) {
b.set(PIPE_TAIL.ordinal());
} else if (p instanceof Join) {
Join j = (Join) p;
if (p instanceof Sequence) {
b.set(SEQUENCE.ordinal());
Sequence s = (Sequence) p;
if (s.maxSpan().duration() > 0) {
b.set(SEQUENCE_MAXSPAN.ordinal());
}
int queriesCount = s.queries().size();
switch (queriesCount) {
case 2: b.set(SEQUENCE_QUERIES_TWO.ordinal());
break;
case 3: b.set(SEQUENCE_QUERIES_THREE.ordinal());
break;
case 4: b.set(SEQUENCE_QUERIES_FOUR.ordinal());
break;
default: b.set(SEQUENCE_QUERIES_FIVE_OR_MORE.ordinal());
break;
}
if (j.until().keys().isEmpty() == false) {
b.set(SEQUENCE_UNTIL.ordinal());
}
} else {
b.set(FeatureMetric.JOIN.ordinal());
int queriesCount = j.queries().size();
switch (queriesCount) {
case 2: b.set(JOIN_QUERIES_TWO.ordinal());
break;
case 3: b.set(JOIN_QUERIES_THREE.ordinal());
break;
case 4: b.set(JOIN_QUERIES_FOUR.ordinal());
break;
default: b.set(JOIN_QUERIES_FIVE_OR_MORE.ordinal());
break;
}
if (j.until().keys().isEmpty() == false) {
b.set(JOIN_UNTIL.ordinal());
}
}
int joinKeysCount = j.queries().get(0).keys().size();
switch (joinKeysCount) {
case 1: b.set(JOIN_KEYS_ONE.ordinal());
break;
case 2: b.set(JOIN_KEYS_TWO.ordinal());
break;
case 3: b.set(JOIN_KEYS_THREE.ordinal());
break;
case 4: b.set(JOIN_KEYS_FOUR.ordinal());
break;
default: if (joinKeysCount >= 5) {
b.set(JOIN_KEYS_FIVE_OR_MORE.ordinal());
}
break;
}
}
});
if (isLikelyAnEventQuery.get() && b.get(SEQUENCE.ordinal()) == false && b.get(JOIN.ordinal()) == false) {
b.set(EVENT.ordinal());
}
for (int i = b.nextSetBit(0); i >= 0; i = b.nextSetBit(i + 1)) {
metrics.inc(FeatureMetric.values()[i]);
}
}
return failures;
}
}
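
As a worked example of the bit-gathering above, consider one of the queries exercised by VerifierMetricsTests further down; the mapping follows directly from the code above, spelled out here for clarity:

    // sequence with maxspan=1d
    //     [process where serial_event_id < 4] by exit_code
    //     [process where opcode == 1] by user
    // | head 4
    //
    // sets: SEQUENCE, SEQUENCE_MAXSPAN, SEQUENCE_QUERIES_TWO (two queries),
    //       JOIN_KEYS_ONE (one "by" key per query), PIPE_HEAD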

View File

@@ -19,6 +19,7 @@ import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.stats.Metrics;
import org.elasticsearch.xpack.eql.stats.QueryMetric;
import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.ql.index.IndexResolver;
@@ -49,7 +50,7 @@ public class PlanExecutor {
this.metrics = new Metrics();
this.preAnalyzer = new PreAnalyzer();
-    this.verifier = new Verifier();
+    this.verifier = new Verifier(metrics);
this.optimizer = new Optimizer();
this.planner = new Planner();
}
@@ -59,7 +60,11 @@
}
public void eql(EqlConfiguration cfg, String eql, ParserParams parserParams, ActionListener<Results> listener) {
-    newSession(cfg).eql(eql, parserParams, wrap(listener::onResponse, listener::onFailure));
+    metrics.total(QueryMetric.ALL);
+    newSession(cfg).eql(eql, parserParams, wrap(listener::onResponse, ex -> {
+        metrics.failed(QueryMetric.ALL);
+        listener.onFailure(ex);
+    }));
}
public Metrics metrics() {

View File

@@ -11,10 +11,50 @@ import java.util.Locale;
public enum FeatureMetric {
SEQUENCE,
JOIN,
-    PIPE;
EVENT,
SEQUENCE_MAXSPAN,
SEQUENCE_UNTIL,
SEQUENCE_QUERIES_TWO,
SEQUENCE_QUERIES_THREE,
SEQUENCE_QUERIES_FOUR,
SEQUENCE_QUERIES_FIVE_OR_MORE,
JOIN_QUERIES_TWO,
JOIN_QUERIES_THREE,
JOIN_QUERIES_FOUR,
JOIN_QUERIES_FIVE_OR_MORE,
JOIN_UNTIL,
JOIN_KEYS_ONE,
JOIN_KEYS_TWO,
JOIN_KEYS_THREE,
JOIN_KEYS_FOUR,
JOIN_KEYS_FIVE_OR_MORE,
PIPE_HEAD,
PIPE_TAIL;
private final String prefix;
FeatureMetric() {
String featureName = this.toString();
String prefix = "features.";
if (featureName.startsWith("sequence_")) {
prefix += "sequences.";
} else if (featureName.startsWith("join_k")) {
prefix += "keys.";
} else if (featureName.startsWith("join_")) {
prefix += "joins.";
} else if (featureName.startsWith("pipe_")) {
prefix += "pipes.";
}
this.prefix = prefix;
}
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
public String prefixedName() {
return this.prefix + this.toString();
}
}
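
A short sketch of the names this produces; these are the flattened counter keys under which Metrics.stats() (next file) reports each feature:

    FeatureMetric.SEQUENCE.prefixedName();         // "features.sequence"
    FeatureMetric.SEQUENCE_MAXSPAN.prefixedName(); // "features.sequences.sequence_maxspan"
    FeatureMetric.JOIN_KEYS_TWO.prefixedName();    // "features.keys.join_keys_two"
    FeatureMetric.PIPE_TAIL.prefixedName();        // "features.pipes.pipe_tail"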

View File

@@ -19,6 +19,7 @@ import java.util.Map.Entry;
* Class encapsulating the metrics collected for EQL
*/
public class Metrics {
private enum OperationType {
FAILED, TOTAL;
@@ -27,20 +28,28 @@ public class Metrics {
return this.name().toLowerCase(Locale.ROOT);
}
}
-    // map that holds total/failed counters for each eql "feature" (join, pipe, sequence...)
-    private final Map<FeatureMetric, Map<OperationType, CounterMetric>> featuresMetrics;
-    protected static String FPREFIX = "features.";
+    // map that holds total/failed counters for all queries, atm
+    private final Map<QueryMetric, Map<OperationType, CounterMetric>> opsByTypeMetrics;
+    // map that holds counters for each eql "feature" (join, pipe, sequence...)
+    private final Map<FeatureMetric, CounterMetric> featuresMetrics;
+    protected static String QPREFIX = "queries.";
public Metrics() {
-    Map<FeatureMetric, Map<OperationType, CounterMetric>> fMap = new LinkedHashMap<>();
-    for (FeatureMetric metric : FeatureMetric.values()) {
+    Map<QueryMetric, Map<OperationType, CounterMetric>> qMap = new LinkedHashMap<>();
+    for (QueryMetric metric : QueryMetric.values()) {
Map<OperationType, CounterMetric> metricsMap = new LinkedHashMap<>(OperationType.values().length);
for (OperationType type : OperationType.values()) {
metricsMap.put(type, new CounterMetric());
}
-    fMap.put(metric, Collections.unmodifiableMap(metricsMap));
+    qMap.put(metric, Collections.unmodifiableMap(metricsMap));
}
opsByTypeMetrics = Collections.unmodifiableMap(qMap);
Map<FeatureMetric, CounterMetric> fMap = new LinkedHashMap<>(FeatureMetric.values().length);
for (FeatureMetric featureMetric : FeatureMetric.values()) {
fMap.put(featureMetric, new CounterMetric());
}
featuresMetrics = Collections.unmodifiableMap(fMap);
}
@@ -49,37 +58,49 @@
* Increments the "total" counter for a metric
* This method should be called only once per query.
*/
-    public void total(FeatureMetric metric) {
+    public void total(QueryMetric metric) {
inc(metric, OperationType.TOTAL);
}
/**
* Increments the "failed" counter for a metric
*/
-    public void failed(FeatureMetric metric) {
+    public void failed(QueryMetric metric) {
inc(metric, OperationType.FAILED);
}
-    private void inc(FeatureMetric metric, OperationType op) {
-        this.featuresMetrics.get(metric).get(op).inc();
+    private void inc(QueryMetric metric, OperationType op) {
+        this.opsByTypeMetrics.get(metric).get(op).inc();
}
/**
* Increments the counter for a "features" metric
*/
public void inc(FeatureMetric metric) {
this.featuresMetrics.get(metric).inc();
}
public Counters stats() {
Counters counters = new Counters();
// queries metrics
-    for (Entry<FeatureMetric, Map<OperationType, CounterMetric>> entry : featuresMetrics.entrySet()) {
+    for (Entry<QueryMetric, Map<OperationType, CounterMetric>> entry : opsByTypeMetrics.entrySet()) {
String metricName = entry.getKey().toString();
for (OperationType type : OperationType.values()) {
long metricCounter = entry.getValue().get(type).count();
String operationTypeName = type.toString();
-    counters.inc(FPREFIX + metricName + "." + operationTypeName, metricCounter);
-    counters.inc(FPREFIX + "_all." + operationTypeName, metricCounter);
+    counters.inc(QPREFIX + metricName + "." + operationTypeName, metricCounter);
+    counters.inc(QPREFIX + "_all." + operationTypeName, metricCounter);
}
}
// features metrics
for (Entry<FeatureMetric, CounterMetric> entry : featuresMetrics.entrySet()) {
counters.inc(entry.getKey().prefixedName(), entry.getValue().count());
}
return counters;
}
}
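
Tying the two counter families together, a minimal usage sketch of the class above (the key names follow from QPREFIX and prefixedName(); Counters.get is the accessor the tests below rely on):

    Metrics metrics = new Metrics();
    metrics.total(QueryMetric.ALL);       // "queries._all.total" and "queries.all.total"
    metrics.inc(FeatureMetric.PIPE_HEAD); // "features.pipes.pipe_head"
    Counters counters = metrics.stats();
    long total = counters.get("queries._all.total"); // 1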

View File

@@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.stats;
import java.util.Locale;
public enum QueryMetric {
ALL;
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
}

View File

@@ -10,6 +10,7 @@ import org.elasticsearch.xpack.eql.EqlTestUtils;
import org.elasticsearch.xpack.eql.expression.function.EqlFunctionRegistry;
import org.elasticsearch.xpack.eql.parser.EqlParser;
import org.elasticsearch.xpack.eql.parser.ParsingException;
import org.elasticsearch.xpack.eql.stats.Metrics;
import org.elasticsearch.xpack.ql.index.EsIndex;
import org.elasticsearch.xpack.ql.index.IndexResolution;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
@@ -36,7 +37,7 @@ public class VerifierTests extends ESTestCase {
private LogicalPlan accept(IndexResolution resolution, String eql) {
PreAnalyzer preAnalyzer = new PreAnalyzer();
-    Analyzer analyzer = new Analyzer(EqlTestUtils.TEST_CFG_CASE_INSENSITIVE, new EqlFunctionRegistry(), new Verifier());
+    Analyzer analyzer = new Analyzer(EqlTestUtils.TEST_CFG_CASE_INSENSITIVE, new EqlFunctionRegistry(), new Verifier(new Metrics()));
return analyzer.analyze(preAnalyzer.preAnalyze(parser.createStatement(eql), resolution));
}

View File

@@ -18,6 +18,7 @@ import org.elasticsearch.xpack.eql.plan.logical.LimitWithOffset;
import org.elasticsearch.xpack.eql.plan.logical.Sequence;
import org.elasticsearch.xpack.eql.plan.logical.Tail;
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
import org.elasticsearch.xpack.eql.stats.Metrics;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.EmptyAttribute;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
@@ -66,13 +67,13 @@ public class OptimizerTests extends ESTestCase {
return TypesTests.loadMapping(name);
}
-    private IndexResolution loadIndexResolution(String name) {
+    public static IndexResolution loadIndexResolution(String name) {
return IndexResolution.valid(new EsIndex(INDEX_NAME, loadEqlMapping(name)));
}
private LogicalPlan accept(IndexResolution resolution, String eql) {
PreAnalyzer preAnalyzer = new PreAnalyzer();
-    Analyzer analyzer = new Analyzer(TEST_CFG_CASE_INSENSITIVE, new EqlFunctionRegistry(), new Verifier());
+    Analyzer analyzer = new Analyzer(TEST_CFG_CASE_INSENSITIVE, new EqlFunctionRegistry(), new Verifier(new Metrics()));
return optimizer.optimize(analyzer.analyze(preAnalyzer.preAnalyze(parser.createStatement(eql), resolution)));
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.xpack.eql.analysis.Verifier;
import org.elasticsearch.xpack.eql.expression.function.EqlFunctionRegistry;
import org.elasticsearch.xpack.eql.parser.EqlParser;
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
import org.elasticsearch.xpack.eql.stats.Metrics;
import org.elasticsearch.xpack.ql.expression.Alias;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
@@ -38,7 +39,7 @@ public class TomlFoldTests extends ESTestCase {
private static EqlParser parser = new EqlParser();
private static final EqlFunctionRegistry functionRegistry = new EqlFunctionRegistry();
-    private static Verifier verifier = new Verifier();
+    private static Verifier verifier = new Verifier(new Metrics());
private static Analyzer caseSensitiveAnalyzer = new Analyzer(TEST_CFG_CASE_SENSITIVE, functionRegistry, verifier);
private static Analyzer caseInsensitiveAnalyzer = new Analyzer(TEST_CFG_CASE_INSENSITIVE, functionRegistry, verifier);

View File

@@ -16,6 +16,7 @@ import org.elasticsearch.xpack.eql.optimizer.Optimizer;
import org.elasticsearch.xpack.eql.parser.EqlParser;
import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.eql.stats.Metrics;
import org.elasticsearch.xpack.ql.index.EsIndex;
import org.elasticsearch.xpack.ql.index.IndexResolution;
@@ -25,7 +26,7 @@ public abstract class AbstractQueryFolderTestCase extends ESTestCase {
protected EqlParser parser = new EqlParser();
protected PreAnalyzer preAnalyzer = new PreAnalyzer();
protected EqlConfiguration configuration = EqlTestUtils.randomConfiguration();
-    protected Analyzer analyzer = new Analyzer(configuration, new EqlFunctionRegistry(), new Verifier());
+    protected Analyzer analyzer = new Analyzer(configuration, new EqlFunctionRegistry(), new Verifier(new Metrics()));
protected Optimizer optimizer = new Optimizer();
protected Planner planner = new Planner();

View File

@@ -0,0 +1,213 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.stats;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
import org.elasticsearch.xpack.eql.EqlTestUtils;
import org.elasticsearch.xpack.eql.analysis.Analyzer;
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer;
import org.elasticsearch.xpack.eql.analysis.Verifier;
import org.elasticsearch.xpack.eql.expression.function.EqlFunctionRegistry;
import org.elasticsearch.xpack.eql.optimizer.OptimizerTests;
import org.elasticsearch.xpack.eql.parser.EqlParser;
import org.elasticsearch.xpack.ql.index.IndexResolution;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.EVENT;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_FIVE_OR_MORE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_FOUR;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_ONE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_THREE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_TWO;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_QUERIES_TWO;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_UNTIL;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.PIPE_HEAD;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.PIPE_TAIL;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_MAXSPAN;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_FIVE_OR_MORE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_FOUR;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_THREE;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_QUERIES_TWO;
import static org.elasticsearch.xpack.eql.stats.FeatureMetric.SEQUENCE_UNTIL;
public class VerifierMetricsTests extends ESTestCase {
private EqlParser parser = new EqlParser();
private PreAnalyzer preAnalyzer = new PreAnalyzer();
private EqlFunctionRegistry eqlFunctionRegistry = new EqlFunctionRegistry();
private IndexResolution index = OptimizerTests.loadIndexResolution("mapping-default.json");
public void testEventQuery() {
Counters c = eql("process where serial_event_id < 4");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(EVENT, PIPE_HEAD))));
}
public void testSequenceQuery() {
Counters c = eql("sequence\r\n" +
" [process where serial_event_id = 1]\r\n" +
" [process where serial_event_id = 2]");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(SEQUENCE, PIPE_HEAD, SEQUENCE_QUERIES_TWO))));
}
@AwaitsFix(bugUrl = "waiting for the join implementation")
public void testJoinQuery() {
Counters c = eql("join\r\n" +
" [file where file_name=\"*.exe\"] by ppid\r\n" +
" [file where file_name=\"*.com\"] by pid\r\n" +
"until [process where opcode=1] by ppid\r\n" +
"| head 1");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(JOIN, PIPE_HEAD, JOIN_UNTIL, JOIN_QUERIES_TWO, JOIN_KEYS_ONE))));
}
public void testHeadQuery() {
Counters c = eql("process where serial_event_id < 4 | head 2");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(EVENT, PIPE_HEAD))));
}
public void testTailQuery() {
Counters c = eql("process where serial_event_id < 4 | tail 2");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(EVENT, PIPE_TAIL))));
}
public void testSequenceMaxSpanQuery() {
Counters c = eql("sequence with maxspan=1d\r\n" +
" [process where serial_event_id < 4] by exit_code\r\n" +
" [process where opcode == 1] by user\r\n" +
" [process where opcode == 2] by user\r\n" +
" [file where parent_process_name == \"file_delete_event\"] by exit_code\r\n" +
"until [process where opcode=1] by ppid\r\n" +
"| head 4\r\n" +
"| tail 2");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(SEQUENCE, PIPE_HEAD, PIPE_TAIL, SEQUENCE_MAXSPAN, SEQUENCE_UNTIL,
SEQUENCE_QUERIES_FOUR, JOIN_KEYS_ONE))));
}
public void testSequenceWithTwoQueries() {
Counters c = eql("sequence with maxspan=1d\r\n" +
" [process where serial_event_id < 4] by exit_code\r\n" +
" [process where opcode == 1] by user\r\n" +
"until [process where opcode=1] by ppid\r\n" +
"| head 4\r\n" +
"| tail 2");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(SEQUENCE, PIPE_HEAD, PIPE_TAIL, SEQUENCE_MAXSPAN, SEQUENCE_UNTIL,
SEQUENCE_QUERIES_TWO, JOIN_KEYS_ONE))));
}
public void testSequenceWithThreeQueries() {
Counters c = eql("sequence with maxspan=1d\r\n" +
" [process where serial_event_id < 4] by exit_code\r\n" +
" [process where opcode == 1] by user\r\n" +
" [file where parent_process_name == \"file_delete_event\"] by exit_code\r\n" +
"| head 4");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(SEQUENCE, PIPE_HEAD, SEQUENCE_MAXSPAN, SEQUENCE_QUERIES_THREE,
JOIN_KEYS_ONE))));
}
public void testSequenceWithFiveQueries() {
Counters c = eql("sequence with maxspan=1d\r\n" +
" [process where serial_event_id < 4] by exit_code\r\n" +
" [process where opcode == 1] by user\r\n" +
" [file where parent_process_name == \"file_delete_event\"] by exit_code\r\n" +
" [process where serial_event_id < 4] by exit_code\r\n" +
" [process where opcode == 1] by user\r\n" +
"| head 4");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(SEQUENCE, PIPE_HEAD, SEQUENCE_MAXSPAN, SEQUENCE_QUERIES_FIVE_OR_MORE,
JOIN_KEYS_ONE))));
}
public void testSequenceWithSevenQueries() {
Counters c = eql("sequence by exit_code, user\r\n" +
" [process where serial_event_id < 4]\r\n" +
" [process where opcode == 1]\r\n" +
" [file where parent_process_name == \"file_delete_event\"]\r\n" +
" [process where serial_event_id < 4]\r\n" +
" [process where opcode == 1]\r\n" +
" [process where true]\r\n" +
" [process where true]\r\n" +
"| tail 1");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(SEQUENCE, PIPE_TAIL, SEQUENCE_QUERIES_FIVE_OR_MORE, JOIN_KEYS_TWO))));
}
public void testSequenceWithThreeKeys() {
Counters c = eql("sequence by exit_code, user, serial_event_id\r\n" +
" [process where serial_event_id < 4]\r\n" +
" [process where opcode == 1]\r\n");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(SEQUENCE, PIPE_HEAD, SEQUENCE_QUERIES_TWO, JOIN_KEYS_THREE))));
}
public void testSequenceWithFourKeys() {
Counters c = eql("sequence by exit_code, user, serial_event_id, pid\r\n" +
" [process where serial_event_id < 4]\r\n" +
" [process where opcode == 1]\r\n");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(SEQUENCE, PIPE_HEAD, SEQUENCE_QUERIES_TWO, JOIN_KEYS_FOUR))));
}
public void testSequenceWithFiveKeys() {
Counters c = eql("sequence by exit_code, user, serial_event_id, pid, ppid\r\n" +
" [process where serial_event_id < 4]\r\n" +
" [process where opcode == 1]\r\n");
assertCounters(c, unmodifiableSet(new HashSet<>(Arrays.asList(SEQUENCE, PIPE_HEAD, SEQUENCE_QUERIES_TWO, JOIN_KEYS_FIVE_OR_MORE))));
}
private void assertCounters(Counters actual, Set<FeatureMetric> metrics) {
MetricsHolder expected = new MetricsHolder();
expected.set(metrics);
for (FeatureMetric metric : FeatureMetric.values()) {
assertEquals(expected.get(metric), actual.get(metric.prefixedName()));
}
}
private Counters eql(String query) {
return eql(query, null);
}
private Counters eql(String query, Verifier v) {
Verifier verifier = v;
Metrics metrics = null;
if (v == null) {
metrics = new Metrics();
verifier = new Verifier(metrics);
}
Analyzer analyzer = new Analyzer(EqlTestUtils.randomConfiguration(), eqlFunctionRegistry, verifier);
analyzer.analyze(preAnalyzer.preAnalyze(parser.createStatement(query), index));
return metrics == null ? null : metrics.stats();
}
private class MetricsHolder {
long[] metrics;
MetricsHolder() {
this.metrics = new long[FeatureMetric.values().length];
for (int i = 0; i < this.metrics.length; i++) {
this.metrics[i] = 0;
}
}
void set(Set<FeatureMetric> metrics) {
for (FeatureMetric metric : metrics) {
set(metric);
}
}
void set(FeatureMetric metric) {
this.metrics[metric.ordinal()] = 1L;
}
long get(FeatureMetric metric) {
return this.metrics[metric.ordinal()];
}
}
}