From a4b0f669195ed8ef9a8e400cf1e6173aef92063b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 11 Sep 2019 12:58:46 +0200 Subject: [PATCH] Add enrich stats api (#46462) The enrich api returns enrich coordinator stats and information about currently executing enrich policies. The coordinator stats include per ingest node: * The current number of search requests in the queue. * The total number of outstanding remote requests that have been executed since node startup. Each remote request is likely to include multiple search requests. This depends on how much search requests are in the queue at the time when the remote request is performed. * The number of current outstanding remote requests. * The total number of search requests that `enrich` processors have executed since node startup. The current execution policies stats include: * The name of policy that is executing * A full blow task info object that is executing the policy. Relates to #32789 --- .../ingest/apis/enrich/enrich-stats.asciidoc | 52 ++++ .../ingest/apis/enrich/index.asciidoc | 3 + .../core/enrich/action/EnrichStatsAction.java | 235 ++++++++++++++++++ .../action/ExecuteEnrichPolicyAction.java | 6 + .../rest-api-spec/test/enrich/10_basic.yml | 9 + .../xpack/enrich/EnrichPlugin.java | 11 +- .../action/EnrichCoordinatorProxyAction.java | 19 +- .../action/EnrichCoordinatorStatsAction.java | 151 +++++++++++ .../action/TransportEnrichStatsAction.java | 94 +++++++ .../enrich/rest/RestEnrichStatsAction.java | 34 +++ .../xpack/enrich/BasicEnrichTests.java | 15 ++ .../xpack/enrich/EnrichMultiNodeIT.java | 10 + .../xpack/enrich/action/CoordinatorTests.java | 16 +- .../action/EnrichStatsResponseTests.java | 61 +++++ .../rest-api-spec/api/enrich.stats.json | 14 ++ 15 files changed, 716 insertions(+), 14 deletions(-) create mode 100644 docs/reference/ingest/apis/enrich/enrich-stats.asciidoc create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java create mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java create mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java create mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java create mode 100644 x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.stats.json diff --git a/docs/reference/ingest/apis/enrich/enrich-stats.asciidoc b/docs/reference/ingest/apis/enrich/enrich-stats.asciidoc new file mode 100644 index 00000000000..d1b82907848 --- /dev/null +++ b/docs/reference/ingest/apis/enrich/enrich-stats.asciidoc @@ -0,0 +1,52 @@ +[role="xpack"] +[testenv="basic"] +[[enrich-stats-api]] +=== Enrich stats API +++++ +Enrich stats API +++++ + +Returns enrich coordinator stats and information about currently executing enrich policies. + +[source,js] +-------------------------------------------------- +GET /_enrich/_stats +-------------------------------------------------- +// CONSOLE +// TEST + +The API returns the following response: + +[source,js] +-------------------------------------------------- +{ + "executing_policies": [], + "coordinator_stats": [ + { + "node_id": "1sFM8cmSROZYhPxVsiWew", + "queue_size": 0, + "remote_requests_current": 0, + "remote_requests_total": 0, + "executed_searches_total": 0 + } + ] +} +-------------------------------------------------- +// TESTRESPONSE[s/"node_id": "1sFM8cmSROZYhPxVsiWew"/"node_id" : $body.coordinator_stats.0.node_id/] +// TESTRESPONSE[s/"remote_requests_total": 0/"remote_requests_total" : $body.coordinator_stats.0.remote_requests_total/] +// TESTRESPONSE[s/"executed_searches_total": 0/"executed_searches_total" : $body.coordinator_stats.0.executed_searches_total/] + +The top level `executing_policies` field includes an object for each policy that is currently executing. +Each object contains the following fields: +* `name` - The name of policy that is executing +* `task` - A full blow task info object that is executing the policy. + +The top level `coordinator_stats` field includes an object for each ingest node with information about the coordinator. +Each object contains the following fields: +* `node_id` - The id of the ingest node that is coordinating search requests for configured `enrich` processors. +* `queue_size` - The current number of search requests in the queue. +* `remote_requests_current` - The number of current outstanding remote requests. +* `remote_requests_total` - The total number of outstanding remote requests that have been executed since node startup. + Each remote request is likely to include multiple search requests. This depends on how much + search requests are in the queue at the time when the remote request is performed. +* `executed_searches_total` - The total number of search requests that `enrich` processors have executed since node startup. \ No newline at end of file diff --git a/docs/reference/ingest/apis/enrich/index.asciidoc b/docs/reference/ingest/apis/enrich/index.asciidoc index e5ceaa43676..a11a61934f8 100644 --- a/docs/reference/ingest/apis/enrich/index.asciidoc +++ b/docs/reference/ingest/apis/enrich/index.asciidoc @@ -7,6 +7,7 @@ The following enrich APIs are available for managing enrich policies: * <> to delete an enrich policy * <> to return information about an enrich policy * <> to execute an enrich policy +* <> to get enrich related stats include::put-enrich-policy.asciidoc[] @@ -16,3 +17,5 @@ include::delete-enrich-policy.asciidoc[] include::get-enrich-policy.asciidoc[] include::execute-enrich-policy.asciidoc[] + +include::enrich-stats.asciidoc[] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java new file mode 100644 index 00000000000..a13ac215ca6 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java @@ -0,0 +1,235 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.enrich.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.TaskInfo; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class EnrichStatsAction extends ActionType { + + public static final EnrichStatsAction INSTANCE = new EnrichStatsAction(); + public static final String NAME = "cluster:admin/xpack/enrich/stats"; + + private EnrichStatsAction() { + super(NAME, Response::new); + } + + public static class Request extends MasterNodeRequest { + + public Request() { + } + + public Request(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final List executingPolicies; + private final Map coordinatorStats; + + public Response(List executingPolicies, Map coordinatorStats) { + this.executingPolicies = executingPolicies; + this.coordinatorStats = coordinatorStats; + } + + public Response(StreamInput in) throws IOException { + super(in); + executingPolicies = in.readList(ExecutingPolicy::new); + coordinatorStats = in.readMap(StreamInput::readString, CoordinatorStats::new); + } + + public List getExecutingPolicies() { + return executingPolicies; + } + + public Map getCoordinatorStats() { + return coordinatorStats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(executingPolicies); + out.writeMap(coordinatorStats, StreamOutput::writeString, (innerOut, stat) -> stat.writeTo(innerOut)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("executing_policies"); + executingPolicies.stream().sorted().forEachOrdered(policy -> { + try { + builder.startObject(); + builder.field("name", policy.getName()); + { + builder.startObject("task"); + builder.value(policy.getTaskInfo()); + builder.endObject(); + } + builder.endObject(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + builder.endArray(); + builder.startArray("coordinator_stats"); + coordinatorStats.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).forEachOrdered(entry -> { + try { + builder.startObject(); + builder.field("node_id", entry.getKey()); + builder.field("queue_size", entry.getValue().getQueueSize()); + builder.field("remote_requests_current", entry.getValue().getRemoteRequestsCurrent()); + builder.field("remote_requests_total", entry.getValue().getRemoteRequestsTotal()); + builder.field("executed_searches_total", entry.getValue().getExecutedSearchesTotal()); + builder.endObject(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return executingPolicies.equals(response.executingPolicies) && + coordinatorStats.equals(response.coordinatorStats); + } + + @Override + public int hashCode() { + return Objects.hash(executingPolicies, coordinatorStats); + } + + public static class CoordinatorStats implements Writeable { + + private final int queueSize; + private final int remoteRequestsCurrent; + private final long remoteRequestsTotal; + private final long executedSearchesTotal; + + public CoordinatorStats(int queueSize, int remoteRequestsCurrent, long remoteRequestsTotal, long executedSearchesTotal) { + this.queueSize = queueSize; + this.remoteRequestsCurrent = remoteRequestsCurrent; + this.remoteRequestsTotal = remoteRequestsTotal; + this.executedSearchesTotal = executedSearchesTotal; + } + + public CoordinatorStats(StreamInput in) throws IOException { + this(in.readVInt(), in.readVInt(), in.readVLong(), in.readVLong()); + } + + public int getQueueSize() { + return queueSize; + } + + public int getRemoteRequestsCurrent() { + return remoteRequestsCurrent; + } + + public long getRemoteRequestsTotal() { + return remoteRequestsTotal; + } + + public long getExecutedSearchesTotal() { + return executedSearchesTotal; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(queueSize); + out.writeVInt(remoteRequestsCurrent); + out.writeVLong(remoteRequestsTotal); + out.writeVLong(executedSearchesTotal); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CoordinatorStats stats = (CoordinatorStats) o; + return queueSize == stats.queueSize && + remoteRequestsCurrent == stats.remoteRequestsCurrent && + remoteRequestsTotal == stats.remoteRequestsTotal && + executedSearchesTotal == stats.executedSearchesTotal; + } + + @Override + public int hashCode() { + return Objects.hash(queueSize, remoteRequestsCurrent, remoteRequestsTotal, executedSearchesTotal); + } + } + + public static class ExecutingPolicy implements Writeable { + + private final String name; + private final TaskInfo taskInfo; + + public ExecutingPolicy(String name, TaskInfo taskInfo) { + this.name = name; + this.taskInfo = taskInfo; + } + + ExecutingPolicy(StreamInput in) throws IOException { + this(in.readString(), new TaskInfo(in)); + } + + public String getName() { + return name; + } + + public TaskInfo getTaskInfo() { + return taskInfo; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + taskInfo.writeTo(out); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ExecutingPolicy that = (ExecutingPolicy) o; + return name.equals(that.name) && + taskInfo.equals(that.taskInfo); + } + + @Override + public int hashCode() { + return Objects.hash(name, taskInfo); + } + } + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java index e4169b9944b..6f5804e54ae 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java @@ -52,6 +52,12 @@ public class ExecuteEnrichPolicyAction extends ActionType return null; } + // This will be displayed in tasks api and allows stats api to figure out which policies are being executed. + @Override + public String getDescription() { + return name; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml index f012de75e99..eb8be92aa3e 100644 --- a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml +++ b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml @@ -33,6 +33,15 @@ - match: { policies.0.match.match_field: baz } - match: { policies.0.match.enrich_fields: ["a", "b"] } + - do: + enrich.stats: {} + - length: { executing_policies: 0} + - length: { coordinator_stats: 1} + - match: { coordinator_stats.0.queue_size: 0} + - match: { coordinator_stats.0.remote_requests_current: 0} + - gte: { coordinator_stats.0.remote_requests_total: 0} + - gte: { coordinator_stats.0.executed_searches_total: 0} + - do: enrich.delete_policy: name: policy-crud diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index f8efece29b6..1af2f833d4b 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -36,16 +36,20 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction; +import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorStatsAction; import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction; import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction; +import org.elasticsearch.xpack.enrich.action.TransportEnrichStatsAction; import org.elasticsearch.xpack.enrich.action.TransportExecuteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.action.TransportGetEnrichPolicyAction; import org.elasticsearch.xpack.enrich.action.TransportPutEnrichPolicyAction; import org.elasticsearch.xpack.enrich.rest.RestDeleteEnrichPolicyAction; +import org.elasticsearch.xpack.enrich.rest.RestEnrichStatsAction; import org.elasticsearch.xpack.enrich.rest.RestExecuteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.rest.RestGetEnrichPolicyAction; import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction; @@ -117,8 +121,10 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { new ActionHandler<>(DeleteEnrichPolicyAction.INSTANCE, TransportDeleteEnrichPolicyAction.class), new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class), new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class), + new ActionHandler<>(EnrichStatsAction.INSTANCE, TransportEnrichStatsAction.class), new ActionHandler<>(EnrichCoordinatorProxyAction.INSTANCE, EnrichCoordinatorProxyAction.TransportAction.class), - new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class) + new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class), + new ActionHandler<>(EnrichCoordinatorStatsAction.INSTANCE, EnrichCoordinatorStatsAction.TransportAction.class) ); } @@ -134,7 +140,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { new RestGetEnrichPolicyAction(restController), new RestDeleteEnrichPolicyAction(restController), new RestPutEnrichPolicyAction(restController), - new RestExecuteEnrichPolicyAction(restController) + new RestExecuteEnrichPolicyAction(restController), + new RestEnrichStatsAction(restController) ); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java index ed0a8714bbe..56eba0c25b4 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats; import org.elasticsearch.xpack.enrich.EnrichPlugin; import java.util.ArrayList; @@ -34,6 +35,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * An internal action to locally manage the load of the search requests that originate from the enrich processor. @@ -72,7 +74,9 @@ public class EnrichCoordinatorProxyAction extends ActionType { final int maxLookupsPerRequest; final int maxNumberOfConcurrentRequests; final BlockingQueue queue; - final AtomicInteger numberOfOutstandingRequests = new AtomicInteger(0); + final AtomicInteger remoteRequestsCurrent = new AtomicInteger(0); + volatile long remoteRequestsTotal = 0; + final AtomicLong executedSearchesTotal = new AtomicLong(0); public Coordinator(Client client, Settings settings) { this( @@ -105,16 +109,22 @@ public class EnrichCoordinatorProxyAction extends ActionType { coordinateLookups(); } + CoordinatorStats getStats() { + return new CoordinatorStats(queue.size(), remoteRequestsCurrent.get(), remoteRequestsTotal, + executedSearchesTotal.get()); + } + synchronized void coordinateLookups() { while (queue.isEmpty() == false && - numberOfOutstandingRequests.get() < maxNumberOfConcurrentRequests) { + remoteRequestsCurrent.get() < maxNumberOfConcurrentRequests) { final List slots = new ArrayList<>(); queue.drainTo(slots, maxLookupsPerRequest); final MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); slots.forEach(slot -> multiSearchRequest.add(slot.searchRequest)); - numberOfOutstandingRequests.incrementAndGet(); + remoteRequestsCurrent.incrementAndGet(); + remoteRequestsTotal++; lookupFunction.accept(multiSearchRequest, (response, e) -> { handleResponse(slots, response, e); }); @@ -122,7 +132,8 @@ public class EnrichCoordinatorProxyAction extends ActionType { } void handleResponse(List slots, MultiSearchResponse response, Exception e) { - numberOfOutstandingRequests.decrementAndGet(); + remoteRequestsCurrent.decrementAndGet(); + executedSearchesTotal.addAndGet(slots.size()); if (response != null) { assert slots.size() == response.getResponses().length; diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java new file mode 100644 index 00000000000..09a47b7d11c --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java @@ -0,0 +1,151 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.BaseNodeRequest; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats; + +import java.io.IOException; +import java.util.List; + +/** + * This is an internal action that gather coordinator stats from each node with an ingest role in the cluster. + * This action is only used via the {@link EnrichStatsAction}. + */ +public class EnrichCoordinatorStatsAction extends ActionType { + + public static final EnrichCoordinatorStatsAction INSTANCE = new EnrichCoordinatorStatsAction(); + public static final String NAME = "cluster:admin/xpack/enrich/coordinator_stats"; + + private EnrichCoordinatorStatsAction() { + super(NAME, Response::new); + } + + // This always executes on all ingest nodes, hence no node ids need to be provided. + public static class Request extends BaseNodesRequest { + + public Request() { + super(new String[0]); + } + + Request(StreamInput in) throws IOException { + super(in); + } + } + + public static class NodeRequest extends BaseNodeRequest { + + NodeRequest() {} + + NodeRequest(StreamInput in) throws IOException { + super(in); + } + + } + + public static class Response extends BaseNodesResponse { + + Response(StreamInput in) throws IOException { + super(in); + } + + Response(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + } + + public static class NodeResponse extends BaseNodeResponse { + + private final CoordinatorStats coordinatorStats; + + NodeResponse(DiscoveryNode node, CoordinatorStats coordinatorStats) { + super(node); + this.coordinatorStats = coordinatorStats; + } + + NodeResponse(StreamInput in) throws IOException { + super(in); + this.coordinatorStats = new CoordinatorStats(in); + } + + public CoordinatorStats getCoordinatorStats() { + return coordinatorStats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + coordinatorStats.writeTo(out); + } + } + + public static class TransportAction extends TransportNodesAction { + + private final EnrichCoordinatorProxyAction.Coordinator coordinator; + + @Inject + public TransportAction(ThreadPool threadPool, ClusterService clusterService, TransportService transportService, + ActionFilters actionFilters, EnrichCoordinatorProxyAction.Coordinator coordinator) { + super(NAME, threadPool, clusterService, transportService, actionFilters, Request::new, NodeRequest::new, + ThreadPool.Names.SAME, NodeResponse.class); + this.coordinator = coordinator; + } + + @Override + protected void resolveRequest(Request request, ClusterState clusterState) { + DiscoveryNode[] ingestNodes = clusterState.getNodes().getIngestNodes().values().toArray(DiscoveryNode.class); + request.setConcreteNodes(ingestNodes); + } + + @Override + protected Response newResponse(Request request, List nodeResponses, List failures) { + return new Response(clusterService.getClusterName(), nodeResponses, failures); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(); + } + + @Override + protected NodeResponse newNodeResponse(StreamInput in) throws IOException { + return new NodeResponse(in); + } + + @Override + protected NodeResponse nodeOperation(NodeRequest request) { + return new NodeResponse(clusterService.localNode(), coordinator.getStats()); + } + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java new file mode 100644 index 00000000000..611eb1c5071 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorStatsAction.NodeResponse; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class TransportEnrichStatsAction extends TransportMasterNodeAction { + + private final Client client; + + @Inject + public TransportEnrichStatsAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + Client client) { + super(EnrichStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, + EnrichStatsAction.Request::new, indexNameExpressionResolver); + this.client = client; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected EnrichStatsAction.Response read(StreamInput in) throws IOException { + return new EnrichStatsAction.Response(in); + } + + @Override + protected void masterOperation(EnrichStatsAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + EnrichCoordinatorStatsAction.Request statsRequest = new EnrichCoordinatorStatsAction.Request(); + ActionListener statsListener = ActionListener.wrap( + response -> { + if (response.hasFailures()) { + // Report failures even if some node level requests succeed: + Exception failure = null; + for (FailedNodeException nodeFailure : response.failures()) { + if (failure == null) { + failure = nodeFailure; + } else { + failure.addSuppressed(nodeFailure); + } + } + listener.onFailure(failure); + return; + } + + Map coordinatorStats = response.getNodes().stream() + .collect(Collectors.toMap(nodeResponse -> nodeResponse.getNode().getId(), NodeResponse::getCoordinatorStats)); + List policyExecutionTasks = taskManager.getTasks().values().stream() + .filter(t -> t.getAction().equals(ExecuteEnrichPolicyAction.NAME)) + .map(t -> t.taskInfo(clusterService.localNode().getId(), true)) + .map(t -> new ExecutingPolicy(t.getDescription(), t)) + .collect(Collectors.toList()); + listener.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats)); + }, + listener::onFailure + ); + client.execute(EnrichCoordinatorStatsAction.INSTANCE, statsRequest, statsListener); + } + + @Override + protected ClusterBlockException checkBlock(EnrichStatsAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java new file mode 100644 index 00000000000..586e1c255dd --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; + +import java.io.IOException; + +public class RestEnrichStatsAction extends BaseRestHandler { + + public RestEnrichStatsAction(final RestController controller) { + controller.registerHandler(RestRequest.Method.GET, "/_enrich/_stats", this); + } + + @Override + public String getName() { + return "enrich_stats"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { + final EnrichStatsAction.Request request = new EnrichStatsAction.Request(); + return channel -> client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } + +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index 654df371735..ddc432ed45d 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -13,12 +13,14 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; @@ -36,6 +38,7 @@ import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.MATCH_FIELD; import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.SOURCE_INDEX_NAME; import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -46,6 +49,11 @@ public class BasicEnrichTests extends ESSingleNodeTestCase { return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class); } + @Override + protected boolean resetNodeAfterTest() { + return true; + } + public void testIngestDataWithEnrichProcessor() { int numDocs = 32; int maxMatches = randomIntBetween(2, 8); @@ -95,6 +103,13 @@ public class BasicEnrichTests extends ESSingleNodeTestCase { assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true)); } } + + EnrichStatsAction.Response statsResponse = + client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet(); + assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1)); + String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId(); + assertThat(statsResponse.getCoordinatorStats().get(localNodeId).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L)); + assertThat(statsResponse.getCoordinatorStats().get(localNodeId).getExecutedSearchesTotal(), equalTo((long) numDocs)); } public void testMultiplePolicies() { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java index f9e0ebb5d0c..a89015214cd 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; @@ -26,6 +27,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; @@ -41,6 +43,7 @@ import java.util.Set; import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -157,6 +160,13 @@ public class EnrichMultiNodeIT extends ESIntegTestCase { assertThat(userEntry.get(field), notNullValue()); } } + + EnrichStatsAction.Response statsResponse = + client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet(); + assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size())); + String nodeId = internalCluster().getInstance(ClusterService.class, coordinatingNode).localNode().getId(); + assertThat(statsResponse.getCoordinatorStats().get(nodeId).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L)); + assertThat(statsResponse.getCoordinatorStats().get(nodeId).getExecutedSearchesTotal(), equalTo((long) numDocs)); } private static List createSourceIndex(int numDocs) { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java index 1e416ab8002..19d3df511b4 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java @@ -68,14 +68,14 @@ public class CoordinatorTests extends ESTestCase { // First batch of search requests have been sent off: // (However still 5 should remain in the queue) assertThat(coordinator.queue.size(), equalTo(5)); - assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1)); + assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1)); assertThat(lookupFunction.capturedRequests.size(), equalTo(1)); assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5)); // Nothing should happen now, because there is an outstanding request and max number of requests has been set to 1: coordinator.coordinateLookups(); assertThat(coordinator.queue.size(), equalTo(5)); - assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1)); + assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1)); assertThat(lookupFunction.capturedRequests.size(), equalTo(1)); SearchResponse emptyResponse = emptySearchResponse(); @@ -86,7 +86,7 @@ public class CoordinatorTests extends ESTestCase { } lookupFunction.capturedConsumers.get(0).accept(new MultiSearchResponse(responseItems, 1L), null); assertThat(coordinator.queue.size(), equalTo(0)); - assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1)); + assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1)); assertThat(lookupFunction.capturedRequests.size(), equalTo(2)); // Replying last response, resulting in an empty queue and no outstanding requests. @@ -96,7 +96,7 @@ public class CoordinatorTests extends ESTestCase { } lookupFunction.capturedConsumers.get(1).accept(new MultiSearchResponse(responseItems, 1L), null); assertThat(coordinator.queue.size(), equalTo(0)); - assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0)); + assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(0)); assertThat(lookupFunction.capturedRequests.size(), equalTo(2)); // All individual action listeners for the search requests should have been invoked: @@ -129,14 +129,14 @@ public class CoordinatorTests extends ESTestCase { // First batch of search requests have been sent off: // (However still 5 should remain in the queue) assertThat(coordinator.queue.size(), equalTo(0)); - assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1)); + assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1)); assertThat(lookupFunction.capturedRequests.size(), equalTo(1)); assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5)); RuntimeException e = new RuntimeException(); lookupFunction.capturedConsumers.get(0).accept(null, e); assertThat(coordinator.queue.size(), equalTo(0)); - assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0)); + assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(0)); assertThat(lookupFunction.capturedRequests.size(), equalTo(1)); // All individual action listeners for the search requests should have been invoked: @@ -169,7 +169,7 @@ public class CoordinatorTests extends ESTestCase { // First batch of search requests have been sent off: // (However still 5 should remain in the queue) assertThat(coordinator.queue.size(), equalTo(0)); - assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1)); + assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1)); assertThat(lookupFunction.capturedRequests.size(), equalTo(1)); assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5)); @@ -181,7 +181,7 @@ public class CoordinatorTests extends ESTestCase { } lookupFunction.capturedConsumers.get(0).accept(new MultiSearchResponse(responseItems, 1L), null); assertThat(coordinator.queue.size(), equalTo(0)); - assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0)); + assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(0)); assertThat(lookupFunction.capturedRequests.size(), equalTo(1)); // All individual action listeners for the search requests should have been invoked: diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java new file mode 100644 index 00000000000..21f0ee0c605 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected EnrichStatsAction.Response createTestInstance() { + int numExecutingPolicies = randomIntBetween(0, 16); + List executingPolicies = new ArrayList<>(numExecutingPolicies); + for (int i = 0; i < numExecutingPolicies; i++) { + TaskInfo taskInfo = randomTaskInfo(); + executingPolicies.add(new ExecutingPolicy(randomAlphaOfLength(4), taskInfo)); + } + int numCoordinatingStats = randomIntBetween(0, 16); + Map coordinatorStats = new HashMap<>(numCoordinatingStats); + for (int i = 0; i < numCoordinatingStats; i++) { + CoordinatorStats stats = new CoordinatorStats(randomIntBetween(0, 8096), randomIntBetween(0, 8096), + randomNonNegativeLong(), randomNonNegativeLong()); + coordinatorStats.put(randomAlphaOfLength(4), stats); + } + return new EnrichStatsAction.Response(executingPolicies, coordinatorStats); + } + + @Override + protected Writeable.Reader instanceReader() { + return EnrichStatsAction.Response::new; + } + + private static TaskInfo randomTaskInfo() { + TaskId taskId = new TaskId(randomAlphaOfLength(5), randomLong()); + String type = randomAlphaOfLength(5); + String action = randomAlphaOfLength(5); + String description = randomAlphaOfLength(5); + long startTime = randomLong(); + long runningTimeNanos = randomLong(); + boolean cancellable = randomBoolean(); + TaskId parentTaskId = TaskId.EMPTY_TASK_ID; + Map headers = randomBoolean() ? + Collections.emptyMap() : + Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); + return new TaskInfo(taskId, type, action, description, null, startTime, runningTimeNanos, cancellable, parentTaskId, headers); + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.stats.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.stats.json new file mode 100644 index 00000000000..0dda96d81fa --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.stats.json @@ -0,0 +1,14 @@ +{ + "enrich.stats": { + "documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-stats.html", + "stability" : "stable", + "url": { + "paths": [ + { + "path": "/_enrich/_stats", + "methods": [ "GET" ] + } + ] + } + } +}