diff --git a/docs/reference/ingest/apis/enrich/enrich-stats.asciidoc b/docs/reference/ingest/apis/enrich/enrich-stats.asciidoc
new file mode 100644
index 00000000000..d1b82907848
--- /dev/null
+++ b/docs/reference/ingest/apis/enrich/enrich-stats.asciidoc
@@ -0,0 +1,52 @@
+[role="xpack"]
+[testenv="basic"]
+[[enrich-stats-api]]
+=== Enrich stats API
+++++
+Enrich stats API
+++++
+
+Returns enrich coordinator stats and information about currently executing enrich policies.
+
+[source,js]
+--------------------------------------------------
+GET /_enrich/_stats
+--------------------------------------------------
+// CONSOLE
+// TEST
+
+The API returns the following response:
+
+[source,js]
+--------------------------------------------------
+{
+ "executing_policies": [],
+ "coordinator_stats": [
+ {
+ "node_id": "1sFM8cmSROZYhPxVsiWew",
+ "queue_size": 0,
+ "remote_requests_current": 0,
+ "remote_requests_total": 0,
+ "executed_searches_total": 0
+ }
+ ]
+}
+--------------------------------------------------
+// TESTRESPONSE[s/"node_id": "1sFM8cmSROZYhPxVsiWew"/"node_id" : $body.coordinator_stats.0.node_id/]
+// TESTRESPONSE[s/"remote_requests_total": 0/"remote_requests_total" : $body.coordinator_stats.0.remote_requests_total/]
+// TESTRESPONSE[s/"executed_searches_total": 0/"executed_searches_total" : $body.coordinator_stats.0.executed_searches_total/]
+
+The top level `executing_policies` field includes an object for each policy that is currently executing.
+Each object contains the following fields:
+* `name` - The name of policy that is executing
+* `task` - A full blow task info object that is executing the policy.
+
+The top level `coordinator_stats` field includes an object for each ingest node with information about the coordinator.
+Each object contains the following fields:
+* `node_id` - The id of the ingest node that is coordinating search requests for configured `enrich` processors.
+* `queue_size` - The current number of search requests in the queue.
+* `remote_requests_current` - The number of current outstanding remote requests.
+* `remote_requests_total` - The total number of outstanding remote requests that have been executed since node startup.
+ Each remote request is likely to include multiple search requests. This depends on how much
+ search requests are in the queue at the time when the remote request is performed.
+* `executed_searches_total` - The total number of search requests that `enrich` processors have executed since node startup.
\ No newline at end of file
diff --git a/docs/reference/ingest/apis/enrich/index.asciidoc b/docs/reference/ingest/apis/enrich/index.asciidoc
index e5ceaa43676..a11a61934f8 100644
--- a/docs/reference/ingest/apis/enrich/index.asciidoc
+++ b/docs/reference/ingest/apis/enrich/index.asciidoc
@@ -7,6 +7,7 @@ The following enrich APIs are available for managing enrich policies:
* <> to delete an enrich policy
* <> to return information about an enrich policy
* <> to execute an enrich policy
+* <> to get enrich related stats
include::put-enrich-policy.asciidoc[]
@@ -16,3 +17,5 @@ include::delete-enrich-policy.asciidoc[]
include::get-enrich-policy.asciidoc[]
include::execute-enrich-policy.asciidoc[]
+
+include::enrich-stats.asciidoc[]
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java
new file mode 100644
index 00000000000..a13ac215ca6
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.enrich.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.tasks.TaskInfo;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class EnrichStatsAction extends ActionType {
+
+ public static final EnrichStatsAction INSTANCE = new EnrichStatsAction();
+ public static final String NAME = "cluster:admin/xpack/enrich/stats";
+
+ private EnrichStatsAction() {
+ super(NAME, Response::new);
+ }
+
+ public static class Request extends MasterNodeRequest {
+
+ public Request() {
+ }
+
+ public Request(StreamInput in) throws IOException {
+ super(in);
+ }
+
+ @Override
+ public ActionRequestValidationException validate() {
+ return null;
+ }
+ }
+
+ public static class Response extends ActionResponse implements ToXContentObject {
+
+ private final List executingPolicies;
+ private final Map coordinatorStats;
+
+ public Response(List executingPolicies, Map coordinatorStats) {
+ this.executingPolicies = executingPolicies;
+ this.coordinatorStats = coordinatorStats;
+ }
+
+ public Response(StreamInput in) throws IOException {
+ super(in);
+ executingPolicies = in.readList(ExecutingPolicy::new);
+ coordinatorStats = in.readMap(StreamInput::readString, CoordinatorStats::new);
+ }
+
+ public List getExecutingPolicies() {
+ return executingPolicies;
+ }
+
+ public Map getCoordinatorStats() {
+ return coordinatorStats;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeList(executingPolicies);
+ out.writeMap(coordinatorStats, StreamOutput::writeString, (innerOut, stat) -> stat.writeTo(innerOut));
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ builder.startArray("executing_policies");
+ executingPolicies.stream().sorted().forEachOrdered(policy -> {
+ try {
+ builder.startObject();
+ builder.field("name", policy.getName());
+ {
+ builder.startObject("task");
+ builder.value(policy.getTaskInfo());
+ builder.endObject();
+ }
+ builder.endObject();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ builder.endArray();
+ builder.startArray("coordinator_stats");
+ coordinatorStats.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).forEachOrdered(entry -> {
+ try {
+ builder.startObject();
+ builder.field("node_id", entry.getKey());
+ builder.field("queue_size", entry.getValue().getQueueSize());
+ builder.field("remote_requests_current", entry.getValue().getRemoteRequestsCurrent());
+ builder.field("remote_requests_total", entry.getValue().getRemoteRequestsTotal());
+ builder.field("executed_searches_total", entry.getValue().getExecutedSearchesTotal());
+ builder.endObject();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ builder.endArray();
+ builder.endObject();
+ return builder;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Response response = (Response) o;
+ return executingPolicies.equals(response.executingPolicies) &&
+ coordinatorStats.equals(response.coordinatorStats);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(executingPolicies, coordinatorStats);
+ }
+
+ public static class CoordinatorStats implements Writeable {
+
+ private final int queueSize;
+ private final int remoteRequestsCurrent;
+ private final long remoteRequestsTotal;
+ private final long executedSearchesTotal;
+
+ public CoordinatorStats(int queueSize, int remoteRequestsCurrent, long remoteRequestsTotal, long executedSearchesTotal) {
+ this.queueSize = queueSize;
+ this.remoteRequestsCurrent = remoteRequestsCurrent;
+ this.remoteRequestsTotal = remoteRequestsTotal;
+ this.executedSearchesTotal = executedSearchesTotal;
+ }
+
+ public CoordinatorStats(StreamInput in) throws IOException {
+ this(in.readVInt(), in.readVInt(), in.readVLong(), in.readVLong());
+ }
+
+ public int getQueueSize() {
+ return queueSize;
+ }
+
+ public int getRemoteRequestsCurrent() {
+ return remoteRequestsCurrent;
+ }
+
+ public long getRemoteRequestsTotal() {
+ return remoteRequestsTotal;
+ }
+
+ public long getExecutedSearchesTotal() {
+ return executedSearchesTotal;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeVInt(queueSize);
+ out.writeVInt(remoteRequestsCurrent);
+ out.writeVLong(remoteRequestsTotal);
+ out.writeVLong(executedSearchesTotal);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CoordinatorStats stats = (CoordinatorStats) o;
+ return queueSize == stats.queueSize &&
+ remoteRequestsCurrent == stats.remoteRequestsCurrent &&
+ remoteRequestsTotal == stats.remoteRequestsTotal &&
+ executedSearchesTotal == stats.executedSearchesTotal;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(queueSize, remoteRequestsCurrent, remoteRequestsTotal, executedSearchesTotal);
+ }
+ }
+
+ public static class ExecutingPolicy implements Writeable {
+
+ private final String name;
+ private final TaskInfo taskInfo;
+
+ public ExecutingPolicy(String name, TaskInfo taskInfo) {
+ this.name = name;
+ this.taskInfo = taskInfo;
+ }
+
+ ExecutingPolicy(StreamInput in) throws IOException {
+ this(in.readString(), new TaskInfo(in));
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public TaskInfo getTaskInfo() {
+ return taskInfo;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(name);
+ taskInfo.writeTo(out);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ExecutingPolicy that = (ExecutingPolicy) o;
+ return name.equals(that.name) &&
+ taskInfo.equals(that.taskInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, taskInfo);
+ }
+ }
+ }
+
+}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java
index e4169b9944b..6f5804e54ae 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java
@@ -52,6 +52,12 @@ public class ExecuteEnrichPolicyAction extends ActionType
return null;
}
+ // This will be displayed in tasks api and allows stats api to figure out which policies are being executed.
+ @Override
+ public String getDescription() {
+ return name;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml
index f012de75e99..eb8be92aa3e 100644
--- a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml
+++ b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml
@@ -33,6 +33,15 @@
- match: { policies.0.match.match_field: baz }
- match: { policies.0.match.enrich_fields: ["a", "b"] }
+ - do:
+ enrich.stats: {}
+ - length: { executing_policies: 0}
+ - length: { coordinator_stats: 1}
+ - match: { coordinator_stats.0.queue_size: 0}
+ - match: { coordinator_stats.0.remote_requests_current: 0}
+ - gte: { coordinator_stats.0.remote_requests_total: 0}
+ - gte: { coordinator_stats.0.executed_searches_total: 0}
+
- do:
enrich.delete_policy:
name: policy-crud
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java
index f8efece29b6..1af2f833d4b 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java
@@ -36,16 +36,20 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
+import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorStatsAction;
import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction;
import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction;
+import org.elasticsearch.xpack.enrich.action.TransportEnrichStatsAction;
import org.elasticsearch.xpack.enrich.action.TransportExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportGetEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportPutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestDeleteEnrichPolicyAction;
+import org.elasticsearch.xpack.enrich.rest.RestEnrichStatsAction;
import org.elasticsearch.xpack.enrich.rest.RestExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestGetEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction;
@@ -117,8 +121,10 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
new ActionHandler<>(DeleteEnrichPolicyAction.INSTANCE, TransportDeleteEnrichPolicyAction.class),
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class),
new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class),
+ new ActionHandler<>(EnrichStatsAction.INSTANCE, TransportEnrichStatsAction.class),
new ActionHandler<>(EnrichCoordinatorProxyAction.INSTANCE, EnrichCoordinatorProxyAction.TransportAction.class),
- new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class)
+ new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class),
+ new ActionHandler<>(EnrichCoordinatorStatsAction.INSTANCE, EnrichCoordinatorStatsAction.TransportAction.class)
);
}
@@ -134,7 +140,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
new RestGetEnrichPolicyAction(restController),
new RestDeleteEnrichPolicyAction(restController),
new RestPutEnrichPolicyAction(restController),
- new RestExecuteEnrichPolicyAction(restController)
+ new RestExecuteEnrichPolicyAction(restController),
+ new RestEnrichStatsAction(restController)
);
}
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
index ed0a8714bbe..56eba0c25b4 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
@@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.enrich.EnrichPlugin;
import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
* An internal action to locally manage the load of the search requests that originate from the enrich processor.
@@ -72,7 +74,9 @@ public class EnrichCoordinatorProxyAction extends ActionType {
final int maxLookupsPerRequest;
final int maxNumberOfConcurrentRequests;
final BlockingQueue queue;
- final AtomicInteger numberOfOutstandingRequests = new AtomicInteger(0);
+ final AtomicInteger remoteRequestsCurrent = new AtomicInteger(0);
+ volatile long remoteRequestsTotal = 0;
+ final AtomicLong executedSearchesTotal = new AtomicLong(0);
public Coordinator(Client client, Settings settings) {
this(
@@ -105,16 +109,22 @@ public class EnrichCoordinatorProxyAction extends ActionType {
coordinateLookups();
}
+ CoordinatorStats getStats() {
+ return new CoordinatorStats(queue.size(), remoteRequestsCurrent.get(), remoteRequestsTotal,
+ executedSearchesTotal.get());
+ }
+
synchronized void coordinateLookups() {
while (queue.isEmpty() == false &&
- numberOfOutstandingRequests.get() < maxNumberOfConcurrentRequests) {
+ remoteRequestsCurrent.get() < maxNumberOfConcurrentRequests) {
final List slots = new ArrayList<>();
queue.drainTo(slots, maxLookupsPerRequest);
final MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
slots.forEach(slot -> multiSearchRequest.add(slot.searchRequest));
- numberOfOutstandingRequests.incrementAndGet();
+ remoteRequestsCurrent.incrementAndGet();
+ remoteRequestsTotal++;
lookupFunction.accept(multiSearchRequest, (response, e) -> {
handleResponse(slots, response, e);
});
@@ -122,7 +132,8 @@ public class EnrichCoordinatorProxyAction extends ActionType {
}
void handleResponse(List slots, MultiSearchResponse response, Exception e) {
- numberOfOutstandingRequests.decrementAndGet();
+ remoteRequestsCurrent.decrementAndGet();
+ executedSearchesTotal.addAndGet(slots.size());
if (response != null) {
assert slots.size() == response.getResponses().length;
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java
new file mode 100644
index 00000000000..09a47b7d11c
--- /dev/null
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.enrich.action;
+
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.nodes.BaseNodeRequest;
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
+import org.elasticsearch.action.support.nodes.BaseNodesRequest;
+import org.elasticsearch.action.support.nodes.BaseNodesResponse;
+import org.elasticsearch.action.support.nodes.TransportNodesAction;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This is an internal action that gather coordinator stats from each node with an ingest role in the cluster.
+ * This action is only used via the {@link EnrichStatsAction}.
+ */
+public class EnrichCoordinatorStatsAction extends ActionType {
+
+ public static final EnrichCoordinatorStatsAction INSTANCE = new EnrichCoordinatorStatsAction();
+ public static final String NAME = "cluster:admin/xpack/enrich/coordinator_stats";
+
+ private EnrichCoordinatorStatsAction() {
+ super(NAME, Response::new);
+ }
+
+ // This always executes on all ingest nodes, hence no node ids need to be provided.
+ public static class Request extends BaseNodesRequest {
+
+ public Request() {
+ super(new String[0]);
+ }
+
+ Request(StreamInput in) throws IOException {
+ super(in);
+ }
+ }
+
+ public static class NodeRequest extends BaseNodeRequest {
+
+ NodeRequest() {}
+
+ NodeRequest(StreamInput in) throws IOException {
+ super(in);
+ }
+
+ }
+
+ public static class Response extends BaseNodesResponse {
+
+ Response(StreamInput in) throws IOException {
+ super(in);
+ }
+
+ Response(ClusterName clusterName, List nodes, List failures) {
+ super(clusterName, nodes, failures);
+ }
+
+ @Override
+ protected List readNodesFrom(StreamInput in) throws IOException {
+ return in.readList(NodeResponse::new);
+ }
+
+ @Override
+ protected void writeNodesTo(StreamOutput out, List nodes) throws IOException {
+ out.writeList(nodes);
+ }
+ }
+
+ public static class NodeResponse extends BaseNodeResponse {
+
+ private final CoordinatorStats coordinatorStats;
+
+ NodeResponse(DiscoveryNode node, CoordinatorStats coordinatorStats) {
+ super(node);
+ this.coordinatorStats = coordinatorStats;
+ }
+
+ NodeResponse(StreamInput in) throws IOException {
+ super(in);
+ this.coordinatorStats = new CoordinatorStats(in);
+ }
+
+ public CoordinatorStats getCoordinatorStats() {
+ return coordinatorStats;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ super.writeTo(out);
+ coordinatorStats.writeTo(out);
+ }
+ }
+
+ public static class TransportAction extends TransportNodesAction {
+
+ private final EnrichCoordinatorProxyAction.Coordinator coordinator;
+
+ @Inject
+ public TransportAction(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
+ ActionFilters actionFilters, EnrichCoordinatorProxyAction.Coordinator coordinator) {
+ super(NAME, threadPool, clusterService, transportService, actionFilters, Request::new, NodeRequest::new,
+ ThreadPool.Names.SAME, NodeResponse.class);
+ this.coordinator = coordinator;
+ }
+
+ @Override
+ protected void resolveRequest(Request request, ClusterState clusterState) {
+ DiscoveryNode[] ingestNodes = clusterState.getNodes().getIngestNodes().values().toArray(DiscoveryNode.class);
+ request.setConcreteNodes(ingestNodes);
+ }
+
+ @Override
+ protected Response newResponse(Request request, List nodeResponses, List failures) {
+ return new Response(clusterService.getClusterName(), nodeResponses, failures);
+ }
+
+ @Override
+ protected NodeRequest newNodeRequest(Request request) {
+ return new NodeRequest();
+ }
+
+ @Override
+ protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
+ return new NodeResponse(in);
+ }
+
+ @Override
+ protected NodeResponse nodeOperation(NodeRequest request) {
+ return new NodeResponse(clusterService.localNode(), coordinator.getStats());
+ }
+ }
+
+}
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java
new file mode 100644
index 00000000000..611eb1c5071
--- /dev/null
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.enrich.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
+import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
+import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorStatsAction.NodeResponse;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TransportEnrichStatsAction extends TransportMasterNodeAction {
+
+ private final Client client;
+
+ @Inject
+ public TransportEnrichStatsAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
+ ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
+ Client client) {
+ super(EnrichStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
+ EnrichStatsAction.Request::new, indexNameExpressionResolver);
+ this.client = client;
+ }
+
+ @Override
+ protected String executor() {
+ return ThreadPool.Names.SAME;
+ }
+
+ @Override
+ protected EnrichStatsAction.Response read(StreamInput in) throws IOException {
+ return new EnrichStatsAction.Response(in);
+ }
+
+ @Override
+ protected void masterOperation(EnrichStatsAction.Request request,
+ ClusterState state,
+ ActionListener listener) throws Exception {
+ EnrichCoordinatorStatsAction.Request statsRequest = new EnrichCoordinatorStatsAction.Request();
+ ActionListener statsListener = ActionListener.wrap(
+ response -> {
+ if (response.hasFailures()) {
+ // Report failures even if some node level requests succeed:
+ Exception failure = null;
+ for (FailedNodeException nodeFailure : response.failures()) {
+ if (failure == null) {
+ failure = nodeFailure;
+ } else {
+ failure.addSuppressed(nodeFailure);
+ }
+ }
+ listener.onFailure(failure);
+ return;
+ }
+
+ Map coordinatorStats = response.getNodes().stream()
+ .collect(Collectors.toMap(nodeResponse -> nodeResponse.getNode().getId(), NodeResponse::getCoordinatorStats));
+ List policyExecutionTasks = taskManager.getTasks().values().stream()
+ .filter(t -> t.getAction().equals(ExecuteEnrichPolicyAction.NAME))
+ .map(t -> t.taskInfo(clusterService.localNode().getId(), true))
+ .map(t -> new ExecutingPolicy(t.getDescription(), t))
+ .collect(Collectors.toList());
+ listener.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats));
+ },
+ listener::onFailure
+ );
+ client.execute(EnrichCoordinatorStatsAction.INSTANCE, statsRequest, statsListener);
+ }
+
+ @Override
+ protected ClusterBlockException checkBlock(EnrichStatsAction.Request request, ClusterState state) {
+ return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+ }
+}
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java
new file mode 100644
index 00000000000..586e1c255dd
--- /dev/null
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.enrich.rest;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
+
+import java.io.IOException;
+
+public class RestEnrichStatsAction extends BaseRestHandler {
+
+ public RestEnrichStatsAction(final RestController controller) {
+ controller.registerHandler(RestRequest.Method.GET, "/_enrich/_stats", this);
+ }
+
+ @Override
+ public String getName() {
+ return "enrich_stats";
+ }
+
+ @Override
+ protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
+ final EnrichStatsAction.Request request = new EnrichStatsAction.Request();
+ return channel -> client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
+ }
+
+}
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
index 654df371735..ddc432ed45d 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
@@ -13,12 +13,14 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
@@ -36,6 +38,7 @@ import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.MATCH_FIELD;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.SOURCE_INDEX_NAME;
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -46,6 +49,11 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
}
+ @Override
+ protected boolean resetNodeAfterTest() {
+ return true;
+ }
+
public void testIngestDataWithEnrichProcessor() {
int numDocs = 32;
int maxMatches = randomIntBetween(2, 8);
@@ -95,6 +103,13 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
}
}
+
+ EnrichStatsAction.Response statsResponse =
+ client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
+ assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
+ String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
+ assertThat(statsResponse.getCoordinatorStats().get(localNodeId).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
+ assertThat(statsResponse.getCoordinatorStats().get(localNodeId).getExecutedSearchesTotal(), equalTo((long) numDocs));
}
public void testMultiplePolicies() {
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
index f9e0ebb5d0c..a89015214cd 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
@@ -16,6 +16,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
@@ -26,6 +27,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
@@ -41,6 +43,7 @@ import java.util.Set;
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -157,6 +160,13 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
assertThat(userEntry.get(field), notNullValue());
}
}
+
+ EnrichStatsAction.Response statsResponse =
+ client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
+ assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size()));
+ String nodeId = internalCluster().getInstance(ClusterService.class, coordinatingNode).localNode().getId();
+ assertThat(statsResponse.getCoordinatorStats().get(nodeId).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
+ assertThat(statsResponse.getCoordinatorStats().get(nodeId).getExecutedSearchesTotal(), equalTo((long) numDocs));
}
private static List createSourceIndex(int numDocs) {
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java
index 1e416ab8002..19d3df511b4 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java
@@ -68,14 +68,14 @@ public class CoordinatorTests extends ESTestCase {
// First batch of search requests have been sent off:
// (However still 5 should remain in the queue)
assertThat(coordinator.queue.size(), equalTo(5));
- assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
+ assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5));
// Nothing should happen now, because there is an outstanding request and max number of requests has been set to 1:
coordinator.coordinateLookups();
assertThat(coordinator.queue.size(), equalTo(5));
- assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
+ assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
SearchResponse emptyResponse = emptySearchResponse();
@@ -86,7 +86,7 @@ public class CoordinatorTests extends ESTestCase {
}
lookupFunction.capturedConsumers.get(0).accept(new MultiSearchResponse(responseItems, 1L), null);
assertThat(coordinator.queue.size(), equalTo(0));
- assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
+ assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1));
assertThat(lookupFunction.capturedRequests.size(), equalTo(2));
// Replying last response, resulting in an empty queue and no outstanding requests.
@@ -96,7 +96,7 @@ public class CoordinatorTests extends ESTestCase {
}
lookupFunction.capturedConsumers.get(1).accept(new MultiSearchResponse(responseItems, 1L), null);
assertThat(coordinator.queue.size(), equalTo(0));
- assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0));
+ assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(0));
assertThat(lookupFunction.capturedRequests.size(), equalTo(2));
// All individual action listeners for the search requests should have been invoked:
@@ -129,14 +129,14 @@ public class CoordinatorTests extends ESTestCase {
// First batch of search requests have been sent off:
// (However still 5 should remain in the queue)
assertThat(coordinator.queue.size(), equalTo(0));
- assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
+ assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5));
RuntimeException e = new RuntimeException();
lookupFunction.capturedConsumers.get(0).accept(null, e);
assertThat(coordinator.queue.size(), equalTo(0));
- assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0));
+ assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(0));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
// All individual action listeners for the search requests should have been invoked:
@@ -169,7 +169,7 @@ public class CoordinatorTests extends ESTestCase {
// First batch of search requests have been sent off:
// (However still 5 should remain in the queue)
assertThat(coordinator.queue.size(), equalTo(0));
- assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
+ assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5));
@@ -181,7 +181,7 @@ public class CoordinatorTests extends ESTestCase {
}
lookupFunction.capturedConsumers.get(0).accept(new MultiSearchResponse(responseItems, 1L), null);
assertThat(coordinator.queue.size(), equalTo(0));
- assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0));
+ assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(0));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
// All individual action listeners for the search requests should have been invoked:
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java
new file mode 100644
index 00000000000..21f0ee0c605
--- /dev/null
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.enrich.action;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.tasks.TaskInfo;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase {
+
+ @Override
+ protected EnrichStatsAction.Response createTestInstance() {
+ int numExecutingPolicies = randomIntBetween(0, 16);
+ List executingPolicies = new ArrayList<>(numExecutingPolicies);
+ for (int i = 0; i < numExecutingPolicies; i++) {
+ TaskInfo taskInfo = randomTaskInfo();
+ executingPolicies.add(new ExecutingPolicy(randomAlphaOfLength(4), taskInfo));
+ }
+ int numCoordinatingStats = randomIntBetween(0, 16);
+ Map coordinatorStats = new HashMap<>(numCoordinatingStats);
+ for (int i = 0; i < numCoordinatingStats; i++) {
+ CoordinatorStats stats = new CoordinatorStats(randomIntBetween(0, 8096), randomIntBetween(0, 8096),
+ randomNonNegativeLong(), randomNonNegativeLong());
+ coordinatorStats.put(randomAlphaOfLength(4), stats);
+ }
+ return new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
+ }
+
+ @Override
+ protected Writeable.Reader instanceReader() {
+ return EnrichStatsAction.Response::new;
+ }
+
+ private static TaskInfo randomTaskInfo() {
+ TaskId taskId = new TaskId(randomAlphaOfLength(5), randomLong());
+ String type = randomAlphaOfLength(5);
+ String action = randomAlphaOfLength(5);
+ String description = randomAlphaOfLength(5);
+ long startTime = randomLong();
+ long runningTimeNanos = randomLong();
+ boolean cancellable = randomBoolean();
+ TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
+ Map headers = randomBoolean() ?
+ Collections.emptyMap() :
+ Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
+ return new TaskInfo(taskId, type, action, description, null, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
+ }
+}
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.stats.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.stats.json
new file mode 100644
index 00000000000..0dda96d81fa
--- /dev/null
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.stats.json
@@ -0,0 +1,14 @@
+{
+ "enrich.stats": {
+ "documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-stats.html",
+ "stability" : "stable",
+ "url": {
+ "paths": [
+ {
+ "path": "/_enrich/_stats",
+ "methods": [ "GET" ]
+ }
+ ]
+ }
+ }
+}