Add enrich stats api (#46462)

The enrich api returns enrich coordinator stats and
information about currently executing enrich policies.

The coordinator stats include per ingest node:
* The current number of search requests in the queue.
* The total number of outstanding remote requests that
  have been executed since node startup. Each remote
  request is likely to include multiple search requests.
  This depends on how much search requests are in the
  queue at the time when the remote request is performed.
* The number of current outstanding remote requests.
* The total number of search requests that `enrich`
  processors have executed since node startup.

The current execution policies stats include:
* The name of policy that is executing
* A full blow task info object that is executing the policy.

Relates to #32789
This commit is contained in:
Martijn van Groningen 2019-09-11 12:58:46 +02:00
parent b5dbb3f2b3
commit a4b0f66919
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
15 changed files with 716 additions and 14 deletions

View File

@ -0,0 +1,52 @@
[role="xpack"]
[testenv="basic"]
[[enrich-stats-api]]
=== Enrich stats API
++++
<titleabbrev>Enrich stats API</titleabbrev>
++++
Returns enrich coordinator stats and information about currently executing enrich policies.
[source,js]
--------------------------------------------------
GET /_enrich/_stats
--------------------------------------------------
// CONSOLE
// TEST
The API returns the following response:
[source,js]
--------------------------------------------------
{
"executing_policies": [],
"coordinator_stats": [
{
"node_id": "1sFM8cmSROZYhPxVsiWew",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 0,
"executed_searches_total": 0
}
]
}
--------------------------------------------------
// TESTRESPONSE[s/"node_id": "1sFM8cmSROZYhPxVsiWew"/"node_id" : $body.coordinator_stats.0.node_id/]
// TESTRESPONSE[s/"remote_requests_total": 0/"remote_requests_total" : $body.coordinator_stats.0.remote_requests_total/]
// TESTRESPONSE[s/"executed_searches_total": 0/"executed_searches_total" : $body.coordinator_stats.0.executed_searches_total/]
The top level `executing_policies` field includes an object for each policy that is currently executing.
Each object contains the following fields:
* `name` - The name of policy that is executing
* `task` - A full blow task info object that is executing the policy.
The top level `coordinator_stats` field includes an object for each ingest node with information about the coordinator.
Each object contains the following fields:
* `node_id` - The id of the ingest node that is coordinating search requests for configured `enrich` processors.
* `queue_size` - The current number of search requests in the queue.
* `remote_requests_current` - The number of current outstanding remote requests.
* `remote_requests_total` - The total number of outstanding remote requests that have been executed since node startup.
Each remote request is likely to include multiple search requests. This depends on how much
search requests are in the queue at the time when the remote request is performed.
* `executed_searches_total` - The total number of search requests that `enrich` processors have executed since node startup.

View File

@ -7,6 +7,7 @@ The following enrich APIs are available for managing enrich policies:
* <<delete-enrich-policy-api>> to delete an enrich policy
* <<get-enrich-policy-api>> to return information about an enrich policy
* <<execute-enrich-policy-api>> to execute an enrich policy
* <<enrich-stats-api>> to get enrich related stats
include::put-enrich-policy.asciidoc[]
@ -16,3 +17,5 @@ include::delete-enrich-policy.asciidoc[]
include::get-enrich-policy.asciidoc[]
include::execute-enrich-policy.asciidoc[]
include::enrich-stats.asciidoc[]

View File

@ -0,0 +1,235 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.enrich.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.TaskInfo;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
public static final EnrichStatsAction INSTANCE = new EnrichStatsAction();
public static final String NAME = "cluster:admin/xpack/enrich/stats";
private EnrichStatsAction() {
super(NAME, Response::new);
}
public static class Request extends MasterNodeRequest<Request> {
public Request() {
}
public Request(StreamInput in) throws IOException {
super(in);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private final List<ExecutingPolicy> executingPolicies;
private final Map<String, CoordinatorStats> coordinatorStats;
public Response(List<ExecutingPolicy> executingPolicies, Map<String, CoordinatorStats> coordinatorStats) {
this.executingPolicies = executingPolicies;
this.coordinatorStats = coordinatorStats;
}
public Response(StreamInput in) throws IOException {
super(in);
executingPolicies = in.readList(ExecutingPolicy::new);
coordinatorStats = in.readMap(StreamInput::readString, CoordinatorStats::new);
}
public List<ExecutingPolicy> getExecutingPolicies() {
return executingPolicies;
}
public Map<String, CoordinatorStats> getCoordinatorStats() {
return coordinatorStats;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(executingPolicies);
out.writeMap(coordinatorStats, StreamOutput::writeString, (innerOut, stat) -> stat.writeTo(innerOut));
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("executing_policies");
executingPolicies.stream().sorted().forEachOrdered(policy -> {
try {
builder.startObject();
builder.field("name", policy.getName());
{
builder.startObject("task");
builder.value(policy.getTaskInfo());
builder.endObject();
}
builder.endObject();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
builder.endArray();
builder.startArray("coordinator_stats");
coordinatorStats.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).forEachOrdered(entry -> {
try {
builder.startObject();
builder.field("node_id", entry.getKey());
builder.field("queue_size", entry.getValue().getQueueSize());
builder.field("remote_requests_current", entry.getValue().getRemoteRequestsCurrent());
builder.field("remote_requests_total", entry.getValue().getRemoteRequestsTotal());
builder.field("executed_searches_total", entry.getValue().getExecutedSearchesTotal());
builder.endObject();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
builder.endArray();
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return executingPolicies.equals(response.executingPolicies) &&
coordinatorStats.equals(response.coordinatorStats);
}
@Override
public int hashCode() {
return Objects.hash(executingPolicies, coordinatorStats);
}
public static class CoordinatorStats implements Writeable {
private final int queueSize;
private final int remoteRequestsCurrent;
private final long remoteRequestsTotal;
private final long executedSearchesTotal;
public CoordinatorStats(int queueSize, int remoteRequestsCurrent, long remoteRequestsTotal, long executedSearchesTotal) {
this.queueSize = queueSize;
this.remoteRequestsCurrent = remoteRequestsCurrent;
this.remoteRequestsTotal = remoteRequestsTotal;
this.executedSearchesTotal = executedSearchesTotal;
}
public CoordinatorStats(StreamInput in) throws IOException {
this(in.readVInt(), in.readVInt(), in.readVLong(), in.readVLong());
}
public int getQueueSize() {
return queueSize;
}
public int getRemoteRequestsCurrent() {
return remoteRequestsCurrent;
}
public long getRemoteRequestsTotal() {
return remoteRequestsTotal;
}
public long getExecutedSearchesTotal() {
return executedSearchesTotal;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(queueSize);
out.writeVInt(remoteRequestsCurrent);
out.writeVLong(remoteRequestsTotal);
out.writeVLong(executedSearchesTotal);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CoordinatorStats stats = (CoordinatorStats) o;
return queueSize == stats.queueSize &&
remoteRequestsCurrent == stats.remoteRequestsCurrent &&
remoteRequestsTotal == stats.remoteRequestsTotal &&
executedSearchesTotal == stats.executedSearchesTotal;
}
@Override
public int hashCode() {
return Objects.hash(queueSize, remoteRequestsCurrent, remoteRequestsTotal, executedSearchesTotal);
}
}
public static class ExecutingPolicy implements Writeable {
private final String name;
private final TaskInfo taskInfo;
public ExecutingPolicy(String name, TaskInfo taskInfo) {
this.name = name;
this.taskInfo = taskInfo;
}
ExecutingPolicy(StreamInput in) throws IOException {
this(in.readString(), new TaskInfo(in));
}
public String getName() {
return name;
}
public TaskInfo getTaskInfo() {
return taskInfo;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
taskInfo.writeTo(out);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExecutingPolicy that = (ExecutingPolicy) o;
return name.equals(that.name) &&
taskInfo.equals(that.taskInfo);
}
@Override
public int hashCode() {
return Objects.hash(name, taskInfo);
}
}
}
}

View File

@ -52,6 +52,12 @@ public class ExecuteEnrichPolicyAction extends ActionType<AcknowledgedResponse>
return null;
}
// This will be displayed in tasks api and allows stats api to figure out which policies are being executed.
@Override
public String getDescription() {
return name;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -33,6 +33,15 @@
- match: { policies.0.match.match_field: baz }
- match: { policies.0.match.enrich_fields: ["a", "b"] }
- do:
enrich.stats: {}
- length: { executing_policies: 0}
- length: { coordinator_stats: 1}
- match: { coordinator_stats.0.queue_size: 0}
- match: { coordinator_stats.0.remote_requests_current: 0}
- gte: { coordinator_stats.0.remote_requests_total: 0}
- gte: { coordinator_stats.0.executed_searches_total: 0}
- do:
enrich.delete_policy:
name: policy-crud

View File

@ -36,16 +36,20 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorStatsAction;
import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction;
import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportEnrichStatsAction;
import org.elasticsearch.xpack.enrich.action.TransportExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportGetEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportPutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestDeleteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestEnrichStatsAction;
import org.elasticsearch.xpack.enrich.rest.RestExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestGetEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction;
@ -117,8 +121,10 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
new ActionHandler<>(DeleteEnrichPolicyAction.INSTANCE, TransportDeleteEnrichPolicyAction.class),
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class),
new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class),
new ActionHandler<>(EnrichStatsAction.INSTANCE, TransportEnrichStatsAction.class),
new ActionHandler<>(EnrichCoordinatorProxyAction.INSTANCE, EnrichCoordinatorProxyAction.TransportAction.class),
new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class)
new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class),
new ActionHandler<>(EnrichCoordinatorStatsAction.INSTANCE, EnrichCoordinatorStatsAction.TransportAction.class)
);
}
@ -134,7 +140,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
new RestGetEnrichPolicyAction(restController),
new RestDeleteEnrichPolicyAction(restController),
new RestPutEnrichPolicyAction(restController),
new RestExecuteEnrichPolicyAction(restController)
new RestExecuteEnrichPolicyAction(restController),
new RestEnrichStatsAction(restController)
);
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.enrich.EnrichPlugin;
import java.util.ArrayList;
@ -34,6 +35,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* An internal action to locally manage the load of the search requests that originate from the enrich processor.
@ -72,7 +74,9 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
final int maxLookupsPerRequest;
final int maxNumberOfConcurrentRequests;
final BlockingQueue<Slot> queue;
final AtomicInteger numberOfOutstandingRequests = new AtomicInteger(0);
final AtomicInteger remoteRequestsCurrent = new AtomicInteger(0);
volatile long remoteRequestsTotal = 0;
final AtomicLong executedSearchesTotal = new AtomicLong(0);
public Coordinator(Client client, Settings settings) {
this(
@ -105,16 +109,22 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
coordinateLookups();
}
CoordinatorStats getStats() {
return new CoordinatorStats(queue.size(), remoteRequestsCurrent.get(), remoteRequestsTotal,
executedSearchesTotal.get());
}
synchronized void coordinateLookups() {
while (queue.isEmpty() == false &&
numberOfOutstandingRequests.get() < maxNumberOfConcurrentRequests) {
remoteRequestsCurrent.get() < maxNumberOfConcurrentRequests) {
final List<Slot> slots = new ArrayList<>();
queue.drainTo(slots, maxLookupsPerRequest);
final MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
slots.forEach(slot -> multiSearchRequest.add(slot.searchRequest));
numberOfOutstandingRequests.incrementAndGet();
remoteRequestsCurrent.incrementAndGet();
remoteRequestsTotal++;
lookupFunction.accept(multiSearchRequest, (response, e) -> {
handleResponse(slots, response, e);
});
@ -122,7 +132,8 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
}
void handleResponse(List<Slot> slots, MultiSearchResponse response, Exception e) {
numberOfOutstandingRequests.decrementAndGet();
remoteRequestsCurrent.decrementAndGet();
executedSearchesTotal.addAndGet(slots.size());
if (response != null) {
assert slots.size() == response.getResponses().length;

View File

@ -0,0 +1,151 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import java.io.IOException;
import java.util.List;
/**
* This is an internal action that gather coordinator stats from each node with an ingest role in the cluster.
* This action is only used via the {@link EnrichStatsAction}.
*/
public class EnrichCoordinatorStatsAction extends ActionType<EnrichCoordinatorStatsAction.Response> {
public static final EnrichCoordinatorStatsAction INSTANCE = new EnrichCoordinatorStatsAction();
public static final String NAME = "cluster:admin/xpack/enrich/coordinator_stats";
private EnrichCoordinatorStatsAction() {
super(NAME, Response::new);
}
// This always executes on all ingest nodes, hence no node ids need to be provided.
public static class Request extends BaseNodesRequest<Request> {
public Request() {
super(new String[0]);
}
Request(StreamInput in) throws IOException {
super(in);
}
}
public static class NodeRequest extends BaseNodeRequest {
NodeRequest() {}
NodeRequest(StreamInput in) throws IOException {
super(in);
}
}
public static class Response extends BaseNodesResponse<NodeResponse> {
Response(StreamInput in) throws IOException {
super(in);
}
Response(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeResponse::new);
}
@Override
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
out.writeList(nodes);
}
}
public static class NodeResponse extends BaseNodeResponse {
private final CoordinatorStats coordinatorStats;
NodeResponse(DiscoveryNode node, CoordinatorStats coordinatorStats) {
super(node);
this.coordinatorStats = coordinatorStats;
}
NodeResponse(StreamInput in) throws IOException {
super(in);
this.coordinatorStats = new CoordinatorStats(in);
}
public CoordinatorStats getCoordinatorStats() {
return coordinatorStats;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
coordinatorStats.writeTo(out);
}
}
public static class TransportAction extends TransportNodesAction<Request, Response, NodeRequest, NodeResponse> {
private final EnrichCoordinatorProxyAction.Coordinator coordinator;
@Inject
public TransportAction(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, EnrichCoordinatorProxyAction.Coordinator coordinator) {
super(NAME, threadPool, clusterService, transportService, actionFilters, Request::new, NodeRequest::new,
ThreadPool.Names.SAME, NodeResponse.class);
this.coordinator = coordinator;
}
@Override
protected void resolveRequest(Request request, ClusterState clusterState) {
DiscoveryNode[] ingestNodes = clusterState.getNodes().getIngestNodes().values().toArray(DiscoveryNode.class);
request.setConcreteNodes(ingestNodes);
}
@Override
protected Response newResponse(Request request, List<NodeResponse> nodeResponses, List<FailedNodeException> failures) {
return new Response(clusterService.getClusterName(), nodeResponses, failures);
}
@Override
protected NodeRequest newNodeRequest(Request request) {
return new NodeRequest();
}
@Override
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
return new NodeResponse(in);
}
@Override
protected NodeResponse nodeOperation(NodeRequest request) {
return new NodeResponse(clusterService.localNode(), coordinator.getStats());
}
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorStatsAction.NodeResponse;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class TransportEnrichStatsAction extends TransportMasterNodeAction<EnrichStatsAction.Request, EnrichStatsAction.Response> {
private final Client client;
@Inject
public TransportEnrichStatsAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
super(EnrichStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
EnrichStatsAction.Request::new, indexNameExpressionResolver);
this.client = client;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected EnrichStatsAction.Response read(StreamInput in) throws IOException {
return new EnrichStatsAction.Response(in);
}
@Override
protected void masterOperation(EnrichStatsAction.Request request,
ClusterState state,
ActionListener<EnrichStatsAction.Response> listener) throws Exception {
EnrichCoordinatorStatsAction.Request statsRequest = new EnrichCoordinatorStatsAction.Request();
ActionListener<EnrichCoordinatorStatsAction.Response> statsListener = ActionListener.wrap(
response -> {
if (response.hasFailures()) {
// Report failures even if some node level requests succeed:
Exception failure = null;
for (FailedNodeException nodeFailure : response.failures()) {
if (failure == null) {
failure = nodeFailure;
} else {
failure.addSuppressed(nodeFailure);
}
}
listener.onFailure(failure);
return;
}
Map<String, EnrichStatsAction.Response.CoordinatorStats> coordinatorStats = response.getNodes().stream()
.collect(Collectors.toMap(nodeResponse -> nodeResponse.getNode().getId(), NodeResponse::getCoordinatorStats));
List<ExecutingPolicy> policyExecutionTasks = taskManager.getTasks().values().stream()
.filter(t -> t.getAction().equals(ExecuteEnrichPolicyAction.NAME))
.map(t -> t.taskInfo(clusterService.localNode().getId(), true))
.map(t -> new ExecutingPolicy(t.getDescription(), t))
.collect(Collectors.toList());
listener.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats));
},
listener::onFailure
);
client.execute(EnrichCoordinatorStatsAction.INSTANCE, statsRequest, statsListener);
}
@Override
protected ClusterBlockException checkBlock(EnrichStatsAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import java.io.IOException;
public class RestEnrichStatsAction extends BaseRestHandler {
public RestEnrichStatsAction(final RestController controller) {
controller.registerHandler(RestRequest.Method.GET, "/_enrich/_stats", this);
}
@Override
public String getName() {
return "enrich_stats";
}
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final EnrichStatsAction.Request request = new EnrichStatsAction.Request();
return channel -> client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -13,12 +13,14 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
@ -36,6 +38,7 @@ import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.MATCH_FIELD;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.SOURCE_INDEX_NAME;
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -46,6 +49,11 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class);
}
@Override
protected boolean resetNodeAfterTest() {
return true;
}
public void testIngestDataWithEnrichProcessor() {
int numDocs = 32;
int maxMatches = randomIntBetween(2, 8);
@ -95,6 +103,13 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
}
}
EnrichStatsAction.Response statsResponse =
client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
assertThat(statsResponse.getCoordinatorStats().get(localNodeId).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(statsResponse.getCoordinatorStats().get(localNodeId).getExecutedSearchesTotal(), equalTo((long) numDocs));
}
public void testMultiplePolicies() {

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
@ -26,6 +27,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
@ -41,6 +43,7 @@ import java.util.Set;
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -157,6 +160,13 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
assertThat(userEntry.get(field), notNullValue());
}
}
EnrichStatsAction.Response statsResponse =
client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size()));
String nodeId = internalCluster().getInstance(ClusterService.class, coordinatingNode).localNode().getId();
assertThat(statsResponse.getCoordinatorStats().get(nodeId).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(statsResponse.getCoordinatorStats().get(nodeId).getExecutedSearchesTotal(), equalTo((long) numDocs));
}
private static List<String> createSourceIndex(int numDocs) {

View File

@ -68,14 +68,14 @@ public class CoordinatorTests extends ESTestCase {
// First batch of search requests have been sent off:
// (However still 5 should remain in the queue)
assertThat(coordinator.queue.size(), equalTo(5));
assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5));
// Nothing should happen now, because there is an outstanding request and max number of requests has been set to 1:
coordinator.coordinateLookups();
assertThat(coordinator.queue.size(), equalTo(5));
assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
SearchResponse emptyResponse = emptySearchResponse();
@ -86,7 +86,7 @@ public class CoordinatorTests extends ESTestCase {
}
lookupFunction.capturedConsumers.get(0).accept(new MultiSearchResponse(responseItems, 1L), null);
assertThat(coordinator.queue.size(), equalTo(0));
assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1));
assertThat(lookupFunction.capturedRequests.size(), equalTo(2));
// Replying last response, resulting in an empty queue and no outstanding requests.
@ -96,7 +96,7 @@ public class CoordinatorTests extends ESTestCase {
}
lookupFunction.capturedConsumers.get(1).accept(new MultiSearchResponse(responseItems, 1L), null);
assertThat(coordinator.queue.size(), equalTo(0));
assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0));
assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(0));
assertThat(lookupFunction.capturedRequests.size(), equalTo(2));
// All individual action listeners for the search requests should have been invoked:
@ -129,14 +129,14 @@ public class CoordinatorTests extends ESTestCase {
// First batch of search requests have been sent off:
// (However still 5 should remain in the queue)
assertThat(coordinator.queue.size(), equalTo(0));
assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5));
RuntimeException e = new RuntimeException();
lookupFunction.capturedConsumers.get(0).accept(null, e);
assertThat(coordinator.queue.size(), equalTo(0));
assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0));
assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(0));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
// All individual action listeners for the search requests should have been invoked:
@ -169,7 +169,7 @@ public class CoordinatorTests extends ESTestCase {
// First batch of search requests have been sent off:
// (However still 5 should remain in the queue)
assertThat(coordinator.queue.size(), equalTo(0));
assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(1));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5));
@ -181,7 +181,7 @@ public class CoordinatorTests extends ESTestCase {
}
lookupFunction.capturedConsumers.get(0).accept(new MultiSearchResponse(responseItems, 1L), null);
assertThat(coordinator.queue.size(), equalTo(0));
assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0));
assertThat(coordinator.remoteRequestsCurrent.get(), equalTo(0));
assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
// All individual action listeners for the search requests should have been invoked:

View File

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase<EnrichStatsAction.Response> {
@Override
protected EnrichStatsAction.Response createTestInstance() {
int numExecutingPolicies = randomIntBetween(0, 16);
List<ExecutingPolicy> executingPolicies = new ArrayList<>(numExecutingPolicies);
for (int i = 0; i < numExecutingPolicies; i++) {
TaskInfo taskInfo = randomTaskInfo();
executingPolicies.add(new ExecutingPolicy(randomAlphaOfLength(4), taskInfo));
}
int numCoordinatingStats = randomIntBetween(0, 16);
Map<String, CoordinatorStats> coordinatorStats = new HashMap<>(numCoordinatingStats);
for (int i = 0; i < numCoordinatingStats; i++) {
CoordinatorStats stats = new CoordinatorStats(randomIntBetween(0, 8096), randomIntBetween(0, 8096),
randomNonNegativeLong(), randomNonNegativeLong());
coordinatorStats.put(randomAlphaOfLength(4), stats);
}
return new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
}
@Override
protected Writeable.Reader<EnrichStatsAction.Response> instanceReader() {
return EnrichStatsAction.Response::new;
}
private static TaskInfo randomTaskInfo() {
TaskId taskId = new TaskId(randomAlphaOfLength(5), randomLong());
String type = randomAlphaOfLength(5);
String action = randomAlphaOfLength(5);
String description = randomAlphaOfLength(5);
long startTime = randomLong();
long runningTimeNanos = randomLong();
boolean cancellable = randomBoolean();
TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
Map<String, String> headers = randomBoolean() ?
Collections.emptyMap() :
Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, null, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
}
}

View File

@ -0,0 +1,14 @@
{
"enrich.stats": {
"documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-stats.html",
"stability" : "stable",
"url": {
"paths": [
{
"path": "/_enrich/_stats",
"methods": [ "GET" ]
}
]
}
}
}