EQL: Add HLRC for EQL stats (#53043) (#53148)

This commit is contained in:
Aleksandr Maus 2020-03-05 09:20:38 -05:00 committed by GitHub
parent 360ac1997f
commit 2dc872f052
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 395 additions and 6 deletions

View File

@ -22,6 +22,8 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.eql.EqlSearchRequest;
import org.elasticsearch.client.eql.EqlSearchResponse;
import org.elasticsearch.client.eql.EqlStatsRequest;
import org.elasticsearch.client.eql.EqlStatsResponse;
import java.io.IOException;
import java.util.Collections;
@ -85,4 +87,42 @@ public final class EqlClient {
Collections.emptySet()
);
}
/**
* Get the eql stats
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/eql-stats.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public EqlStatsResponse stats(EqlStatsRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
EqlRequestConverters::stats,
options,
EqlStatsResponse::fromXContent,
Collections.emptySet()
);
}
/**
* Asynchronously get the eql stats
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/eql-stats.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable statsAsync(EqlStatsRequest request, RequestOptions options, ActionListener<EqlStatsResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request,
EqlRequestConverters::stats,
options,
EqlStatsResponse::fromXContent,
listener,
Collections.emptySet()
);
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client;
import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.client.eql.EqlSearchRequest;
import org.elasticsearch.client.eql.EqlStatsRequest;
import java.io.IOException;
@ -41,4 +42,11 @@ final class EqlRequestConverters {
request.addParameters(parameters.asMap());
return request;
}
static Request stats(EqlStatsRequest eqlStatsRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_eql", "stats")
.build();
return new Request(HttpGet.METHOD_NAME, endpoint);
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.eql;
import org.elasticsearch.client.Validatable;
public final class EqlStatsRequest implements Validatable {
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.eql;
import org.elasticsearch.client.NodesResponseHeader;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class EqlStatsResponse {
private final NodesResponseHeader header;
private final String clusterName;
private final List<Node> nodes;
public EqlStatsResponse(NodesResponseHeader header, String clusterName, List<Node> nodes) {
this.header = header;
this.clusterName = clusterName;
this.nodes = nodes;
}
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<EqlStatsResponse, Void>
PARSER = new ConstructingObjectParser<>("eql/stats_response", true, args -> {
int i = 0;
NodesResponseHeader header = (NodesResponseHeader) args[i++];
String clusterName = (String) args[i++];
List<Node> nodes = (List<Node>) args[i];
return new EqlStatsResponse(header, clusterName, nodes);
});
static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), NodesResponseHeader::fromXContent, new ParseField("_nodes"));
PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField("cluster_name"));
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(),
(p, c) -> EqlStatsResponse.Node.PARSER.apply(p, null),
new ParseField("stats"));
}
public static EqlStatsResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
public NodesResponseHeader getHeader() {
return header;
}
public List<Node> getNodes() {
return nodes;
}
public String getClusterName() {
return clusterName;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EqlStatsResponse that = (EqlStatsResponse) o;
return Objects.equals(nodes, that.nodes) && Objects.equals(header, that.header) && Objects.equals(clusterName, that.clusterName);
}
@Override
public int hashCode() {
return Objects.hash(nodes, header, clusterName);
}
public static class Node {
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<Node, Void>
PARSER = new ConstructingObjectParser<>("eql/stats_response_node", true, (args, c) -> new Node((Map<String, Object>) args[0]));
static {
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), new ParseField("stats"));
}
private Map<String, Object> stats;
public Node(Map<String, Object> stats) {
this.stats = stats;
}
public Map<String, Object> getStats() {
return stats;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Node node = (Node) o;
return Objects.equals(stats, node.stats);
}
@Override
public int hashCode() {
return Objects.hash(stats);
}
}
}

View File

@ -23,6 +23,8 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.eql.EqlSearchRequest;
import org.elasticsearch.client.eql.EqlSearchResponse;
import org.elasticsearch.client.eql.EqlStatsRequest;
import org.elasticsearch.client.eql.EqlStatsResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.index.IndexSettings;
@ -31,6 +33,7 @@ import org.junit.Before;
import java.time.format.DateTimeFormatter;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
public class EqlIT extends ESRestHighLevelClientTestCase {
@ -97,4 +100,16 @@ public class EqlIT extends ESRestHighLevelClientTestCase {
assertNotNull(response.hits());
assertThat(response.hits().events().size(), equalTo(1));
}
// Basic test for stats
// TODO: add more tests once the stats are hooked up
public void testStats() throws Exception {
EqlClient eql = highLevelClient().eql();
EqlStatsRequest request = new EqlStatsRequest();
EqlStatsResponse response = execute(request, eql::stats, eql::statsAsync);
assertNotNull(response);
assertNotNull(response.getHeader());
assertThat(response.getHeader().getTotal(), greaterThan(0));
assertThat(response.getNodes().size(), greaterThan(0));
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.eql;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.client.NodesResponseHeader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.is;
public class EqlStatsResponseTests extends AbstractResponseTestCase<EqlStatsResponseToXContent, EqlStatsResponse> {
private static Map<String, Object> buildRandomCountersMap(int count) {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < count; i++) {
map.put(randomAlphaOfLength(10), randomIntBetween(0, Integer.MAX_VALUE));
}
return map;
}
private static Map<String, Object> buildRandomNodeStats(int featuresNumber) {
Map<String, Object> stats = new HashMap<>();
int countersNumber = randomIntBetween(0, 10);
Map<String, Object> features = new HashMap<>();
for (int i = 0; i < featuresNumber; i++) {
features.put(randomAlphaOfLength(10), buildRandomCountersMap(countersNumber));
}
stats.put("features", features);
Map<String, Object> res = new HashMap<>();
res.put("stats", stats);
return res;
}
@Override
protected EqlStatsResponseToXContent createServerTestInstance(XContentType xContentType) {
NodesResponseHeader header = new NodesResponseHeader(randomInt(10), randomInt(10),
randomInt(10), Collections.emptyList());
String clusterName = randomAlphaOfLength(10);
int nodeCount = randomInt(10);
int featuresNumber = randomIntBetween(0, 10);
List<EqlStatsResponse.Node> nodes = new ArrayList<>(nodeCount);
for (int i = 0; i < nodeCount; i++) {
Map<String, Object> stat = buildRandomNodeStats(featuresNumber);
nodes.add(new EqlStatsResponse.Node(stat));
}
EqlStatsResponse response = new EqlStatsResponse(header, clusterName, nodes);
return new EqlStatsResponseToXContent(new EqlStatsResponse(header, clusterName, nodes));
}
@Override
protected EqlStatsResponse doParseToClientInstance(XContentParser parser) throws IOException {
return EqlStatsResponse.fromXContent(parser);
}
@Override
protected void assertInstances(EqlStatsResponseToXContent serverTestInstanceWrap, EqlStatsResponse clientInstance) {
EqlStatsResponse serverTestInstance = serverTestInstanceWrap.unwrap();
assertThat(serverTestInstance, is(clientInstance));
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.eql;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.NodesResponseHeader;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
public class EqlStatsResponseToXContent implements ToXContent {
private final EqlStatsResponse response;
public EqlStatsResponseToXContent(EqlStatsResponse response) {
this.response = response;
}
public EqlStatsResponse unwrap() {
return this.response;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
NodesResponseHeader header = response.getHeader();
if (header != null) {
builder.startObject("_nodes");
builder.field("total", header.getTotal());
builder.field("successful", header.getSuccessful());
builder.field("failed", header.getFailed());
if (header.getFailures().isEmpty() == false) {
builder.startArray("failures");
for (ElasticsearchException failure : header.getFailures()) {
builder.startObject();
failure.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
}
builder.endArray();
}
builder.endObject();
}
builder.field("cluster_name", response.getClusterName());
List<EqlStatsResponse.Node> nodes = response.getNodes();
if (nodes != null) {
builder.startArray("stats");
for (EqlStatsResponse.Node node : nodes) {
builder.startObject();
if (node.getStats() != null) {
builder.field("stats", node.getStats());
}
builder.endObject();
}
builder.endArray();
}
return builder;
}
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.eql.planner.Planner;
import org.elasticsearch.xpack.eql.session.Configuration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.eql.stats.Metrics;
import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.ql.index.IndexResolver;
@ -35,6 +36,9 @@ public class PlanExecutor {
private final Optimizer optimizer;
private final Planner planner;
private final Metrics metrics;
public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRegistry writeableRegistry) {
this.client = client;
this.writableRegistry = writeableRegistry;
@ -42,6 +46,8 @@ public class PlanExecutor {
this.indexResolver = indexResolver;
this.functionRegistry = null;
this.metrics = new Metrics();
this.preAnalyzer = new PreAnalyzer();
this.analyzer = new Analyzer(functionRegistry, new Verifier());
this.optimizer = new Optimizer();
@ -55,4 +61,8 @@ public class PlanExecutor {
public void eql(Configuration cfg, String eql, ParserParams parserParams, ActionListener<Results> listener) {
newSession(cfg).eql(eql, parserParams, wrap(listener::onResponse, listener::onFailure));
}
public Metrics metrics() {
return this.metrics;
}
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.eql.execution.PlanExecutor;
import java.io.IOException;
import java.util.List;
@ -22,17 +23,17 @@ import java.util.List;
*/
public class TransportEqlStatsAction extends TransportNodesAction<EqlStatsRequest, EqlStatsResponse,
EqlStatsRequest.NodeStatsRequest, EqlStatsResponse.NodeStatsResponse> {
// the plan executor holds the metrics
//private final PlanExecutor planExecutor;
private final PlanExecutor planExecutor;
@Inject
public TransportEqlStatsAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters/* , PlanExecutor planExecutor */) {
ThreadPool threadPool, ActionFilters actionFilters, PlanExecutor planExecutor) {
super(EqlStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
EqlStatsRequest::new, EqlStatsRequest.NodeStatsRequest::new, ThreadPool.Names.MANAGEMENT,
EqlStatsResponse.NodeStatsResponse.class);
//this.planExecutor = planExecutor;
this.planExecutor = planExecutor;
}
@Override
@ -54,8 +55,7 @@ public class TransportEqlStatsAction extends TransportNodesAction<EqlStatsReques
@Override
protected EqlStatsResponse.NodeStatsResponse nodeOperation(EqlStatsRequest.NodeStatsRequest request) {
EqlStatsResponse.NodeStatsResponse statsResponse = new EqlStatsResponse.NodeStatsResponse(clusterService.localNode());
//statsResponse.setStats(planExecutor.metrics().stats());
statsResponse.setStats(planExecutor.metrics().stats());
return statsResponse;
}
}