diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java
index d493413dc1c..59f4951143d 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java
@@ -680,6 +680,22 @@ public class XPackLicenseState {
return localStatus.active;
}
+ /**
+ * Determine if the enrich processor and related APIs are allowed to be used.
+ *
+ * This is available in for all license types except
+ * {@link OperationMode#MISSING}
+ *
+ * @return {@code true} as long as the license is valid. Otherwise
+ * {@code false}.
+ */
+ public boolean isEnrichAllowed() {
+ // status is volatile
+ Status localStatus = status;
+ // Should work on all active licenses
+ return localStatus.active;
+ }
+
/**
* Determine if SQL support should be enabled.
*
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java
index a13ac215ca6..e323cb821bb 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java
@@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.TaskInfo;
@@ -20,7 +21,6 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
public class EnrichStatsAction extends ActionType {
@@ -50,9 +50,9 @@ public class EnrichStatsAction extends ActionType {
public static class Response extends ActionResponse implements ToXContentObject {
private final List executingPolicies;
- private final Map coordinatorStats;
+ private final List coordinatorStats;
- public Response(List executingPolicies, Map coordinatorStats) {
+ public Response(List executingPolicies, List coordinatorStats) {
this.executingPolicies = executingPolicies;
this.coordinatorStats = coordinatorStats;
}
@@ -60,36 +60,31 @@ public class EnrichStatsAction extends ActionType {
public Response(StreamInput in) throws IOException {
super(in);
executingPolicies = in.readList(ExecutingPolicy::new);
- coordinatorStats = in.readMap(StreamInput::readString, CoordinatorStats::new);
+ coordinatorStats = in.readList(CoordinatorStats::new);
}
public List getExecutingPolicies() {
return executingPolicies;
}
- public Map getCoordinatorStats() {
+ public List getCoordinatorStats() {
return coordinatorStats;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(executingPolicies);
- out.writeMap(coordinatorStats, StreamOutput::writeString, (innerOut, stat) -> stat.writeTo(innerOut));
+ out.writeList(coordinatorStats);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("executing_policies");
- executingPolicies.stream().sorted().forEachOrdered(policy -> {
+ executingPolicies.stream().sorted(Comparator.comparing(ExecutingPolicy::getName)).forEachOrdered(policy -> {
try {
builder.startObject();
- builder.field("name", policy.getName());
- {
- builder.startObject("task");
- builder.value(policy.getTaskInfo());
- builder.endObject();
- }
+ policy.toXContent(builder, params);
builder.endObject();
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -97,14 +92,10 @@ public class EnrichStatsAction extends ActionType {
});
builder.endArray();
builder.startArray("coordinator_stats");
- coordinatorStats.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).forEachOrdered(entry -> {
+ coordinatorStats.stream().sorted(Comparator.comparing(CoordinatorStats::getNodeId)).forEachOrdered(entry -> {
try {
builder.startObject();
- builder.field("node_id", entry.getKey());
- builder.field("queue_size", entry.getValue().getQueueSize());
- builder.field("remote_requests_current", entry.getValue().getRemoteRequestsCurrent());
- builder.field("remote_requests_total", entry.getValue().getRemoteRequestsTotal());
- builder.field("executed_searches_total", entry.getValue().getExecutedSearchesTotal());
+ entry.toXContent(builder, params);
builder.endObject();
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -129,14 +120,20 @@ public class EnrichStatsAction extends ActionType {
return Objects.hash(executingPolicies, coordinatorStats);
}
- public static class CoordinatorStats implements Writeable {
+ public static class CoordinatorStats implements Writeable, ToXContentFragment {
+ private final String nodeId;
private final int queueSize;
private final int remoteRequestsCurrent;
private final long remoteRequestsTotal;
private final long executedSearchesTotal;
- public CoordinatorStats(int queueSize, int remoteRequestsCurrent, long remoteRequestsTotal, long executedSearchesTotal) {
+ public CoordinatorStats(String nodeId,
+ int queueSize,
+ int remoteRequestsCurrent,
+ long remoteRequestsTotal,
+ long executedSearchesTotal) {
+ this.nodeId = nodeId;
this.queueSize = queueSize;
this.remoteRequestsCurrent = remoteRequestsCurrent;
this.remoteRequestsTotal = remoteRequestsTotal;
@@ -144,7 +141,11 @@ public class EnrichStatsAction extends ActionType {
}
public CoordinatorStats(StreamInput in) throws IOException {
- this(in.readVInt(), in.readVInt(), in.readVLong(), in.readVLong());
+ this(in.readString(), in.readVInt(), in.readVInt(), in.readVLong(), in.readVLong());
+ }
+
+ public String getNodeId() {
+ return nodeId;
}
public int getQueueSize() {
@@ -165,18 +166,30 @@ public class EnrichStatsAction extends ActionType {
@Override
public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(nodeId);
out.writeVInt(queueSize);
out.writeVInt(remoteRequestsCurrent);
out.writeVLong(remoteRequestsTotal);
out.writeVLong(executedSearchesTotal);
}
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.field("node_id", nodeId);
+ builder.field("queue_size", queueSize);
+ builder.field("remote_requests_current", remoteRequestsCurrent);
+ builder.field("remote_requests_total", remoteRequestsTotal);
+ builder.field("executed_searches_total", executedSearchesTotal);
+ return builder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CoordinatorStats stats = (CoordinatorStats) o;
- return queueSize == stats.queueSize &&
+ return Objects.equals(nodeId, stats.nodeId) &&
+ queueSize == stats.queueSize &&
remoteRequestsCurrent == stats.remoteRequestsCurrent &&
remoteRequestsTotal == stats.remoteRequestsTotal &&
executedSearchesTotal == stats.executedSearchesTotal;
@@ -184,11 +197,11 @@ public class EnrichStatsAction extends ActionType {
@Override
public int hashCode() {
- return Objects.hash(queueSize, remoteRequestsCurrent, remoteRequestsTotal, executedSearchesTotal);
+ return Objects.hash(nodeId, queueSize, remoteRequestsCurrent, remoteRequestsTotal, executedSearchesTotal);
}
}
- public static class ExecutingPolicy implements Writeable {
+ public static class ExecutingPolicy implements Writeable, ToXContentFragment {
private final String name;
private final TaskInfo taskInfo;
@@ -216,6 +229,17 @@ public class EnrichStatsAction extends ActionType {
taskInfo.writeTo(out);
}
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.field("name", name);
+ builder.startObject("task");
+ {
+ builder.value(taskInfo);
+ }
+ builder.endObject();
+ return builder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json
index 89b97abac61..fb7d2d7764a 100644
--- a/x-pack/plugin/core/src/main/resources/monitoring-es.json
+++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json
@@ -1093,6 +1093,62 @@
}
}
}
+ },
+ "enrich_coordinator_stats" : {
+ "properties": {
+ "node_id": {
+ "type": "keyword"
+ },
+ "queue_size": {
+ "type": "integer"
+ },
+ "remote_requests_current" : {
+ "type": "long"
+ },
+ "remote_requests_total" : {
+ "type": "long"
+ },
+ "executed_searches_total" : {
+ "type": "long"
+ }
+ }
+ },
+ "enrich_executing_policy_stats": {
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "task": {
+ "type": "object",
+ "properties": {
+ "node": {
+ "type": "keyword"
+ },
+ "id": {
+ "type": "long"
+ },
+ "type": {
+ "type": "keyword"
+ },
+ "action": {
+ "type": "keyword"
+ },
+ "description": {
+ "type": "keyword"
+ },
+ "start_time_in_millis": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "running_time_in_nanos": {
+ "type": "long"
+ },
+ "cancellable": {
+ "type": "boolean"
+ }
+ }
+ }
+ }
}
}
}
diff --git a/x-pack/plugin/enrich/build.gradle b/x-pack/plugin/enrich/build.gradle
index 36b5fa18d2c..5dd859ddc77 100644
--- a/x-pack/plugin/enrich/build.gradle
+++ b/x-pack/plugin/enrich/build.gradle
@@ -12,6 +12,8 @@ archivesBaseName = 'x-pack-enrich'
dependencies {
compileOnly project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
+ testCompile project(path: ':modules:ingest-common')
+ testCompile project(path: xpackModule('monitoring'), configuration: 'testArtifacts')
if (isEclipse) {
testCompile project(path: xpackModule('core-tests'), configuration: 'testArtifacts')
}
diff --git a/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java b/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java
index bb98d4580d7..b41af078159 100644
--- a/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java
+++ b/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java
@@ -21,9 +21,12 @@ import org.junit.After;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
@@ -87,6 +90,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
public void testBasicFlow() throws Exception {
setupGenericLifecycleTest(true);
+ assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 30, TimeUnit.SECONDS);
}
public void testImmutablePolicy() throws IOException {
@@ -153,4 +157,35 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
private static Map toMap(String response) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
}
+
+ private static void verifyEnrichMonitoring() throws IOException {
+ Request request = new Request("GET", "/.monitoring-*/_search");
+ request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"enrich_coordinator_stats\"}}}");
+ Map response;
+ try {
+ response = toMap(adminClient().performRequest(request));
+ } catch (ResponseException e) {
+ throw new AssertionError("error while searching", e);
+ }
+
+ int maxRemoteRequestsTotal = 0;
+ int maxExecutedSearchesTotal = 0;
+
+ List> hits = (List>) XContentMapValues.extractValue("hits.hits", response);
+ assertThat(hits.size(), greaterThanOrEqualTo(1));
+
+ for (int i = 0; i < hits.size(); i++) {
+ Map, ?> hit = (Map, ?>) hits.get(i);
+
+ int foundRemoteRequestsTotal =
+ (int) XContentMapValues.extractValue("_source.enrich_coordinator_stats.remote_requests_total", hit);
+ maxRemoteRequestsTotal = Math.max(maxRemoteRequestsTotal, foundRemoteRequestsTotal);
+ int foundExecutedSearchesTotal =
+ (int) XContentMapValues.extractValue("_source.enrich_coordinator_stats.executed_searches_total", hit);
+ maxExecutedSearchesTotal = Math.max(maxExecutedSearchesTotal, foundExecutedSearchesTotal);
+ }
+
+ assertThat(maxRemoteRequestsTotal, greaterThan(0));
+ assertThat(maxExecutedSearchesTotal, greaterThan(0));
+ }
}
diff --git a/x-pack/plugin/enrich/qa/rest-with-security/build.gradle b/x-pack/plugin/enrich/qa/rest-with-security/build.gradle
index 89bff0483df..91427991e3d 100644
--- a/x-pack/plugin/enrich/qa/rest-with-security/build.gradle
+++ b/x-pack/plugin/enrich/qa/rest-with-security/build.gradle
@@ -15,4 +15,5 @@ testClusters.integTest {
user username: "test_enrich_no_privs", password: "x-pack-test-password", role: "enrich_no_privs"
setting 'xpack.license.self_generated.type', 'basic'
setting 'xpack.security.enabled', 'true'
+ setting 'xpack.monitoring.collection.enabled', 'true'
}
diff --git a/x-pack/plugin/enrich/qa/rest/build.gradle b/x-pack/plugin/enrich/qa/rest/build.gradle
index 59ffa85ca97..c96782f074f 100644
--- a/x-pack/plugin/enrich/qa/rest/build.gradle
+++ b/x-pack/plugin/enrich/qa/rest/build.gradle
@@ -10,4 +10,5 @@ dependencies {
testClusters.integTest {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'basic'
+ setting 'xpack.monitoring.collection.enabled', 'true'
}
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
index 56eba0c25b4..c6d25195682 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
@@ -109,8 +109,8 @@ public class EnrichCoordinatorProxyAction extends ActionType {
coordinateLookups();
}
- CoordinatorStats getStats() {
- return new CoordinatorStats(queue.size(), remoteRequestsCurrent.get(), remoteRequestsTotal,
+ CoordinatorStats getStats(String nodeId) {
+ return new CoordinatorStats(nodeId, queue.size(), remoteRequestsCurrent.get(), remoteRequestsTotal,
executedSearchesTotal.get());
}
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java
index 09a47b7d11c..4d59d8dc531 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java
@@ -144,7 +144,8 @@ public class EnrichCoordinatorStatsAction extends ActionType {
@@ -73,8 +72,9 @@ public class TransportEnrichStatsAction extends TransportMasterNodeAction coordinatorStats = response.getNodes().stream()
- .collect(Collectors.toMap(nodeResponse -> nodeResponse.getNode().getId(), NodeResponse::getCoordinatorStats));
+ List coordinatorStats = response.getNodes().stream()
+ .map(EnrichCoordinatorStatsAction.NodeResponse::getCoordinatorStats)
+ .collect(Collectors.toList());
List policyExecutionTasks = taskManager.getTasks().values().stream()
.filter(t -> t.getAction().equals(ExecuteEnrichPolicyAction.NAME))
.map(t -> t.taskInfo(clusterService.localNode().getId(), true))
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
index ddc432ed45d..373cfbfbe8a 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
@@ -108,8 +108,9 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
- assertThat(statsResponse.getCoordinatorStats().get(localNodeId).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
- assertThat(statsResponse.getCoordinatorStats().get(localNodeId).getExecutedSearchesTotal(), equalTo((long) numDocs));
+ assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), equalTo(localNodeId));
+ assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
+ assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo((long) numDocs));
}
public void testMultiplePolicies() {
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
index 9b8d80d7075..6b278abd1ca 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
@@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
@@ -165,8 +166,13 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size()));
String nodeId = internalCluster().getInstance(ClusterService.class, coordinatingNode).localNode().getId();
- assertThat(statsResponse.getCoordinatorStats().get(nodeId).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
- assertThat(statsResponse.getCoordinatorStats().get(nodeId).getExecutedSearchesTotal(), equalTo((long) numDocs));
+ CoordinatorStats stats = statsResponse.getCoordinatorStats().stream()
+ .filter(s -> s.getNodeId().equals(nodeId))
+ .findAny()
+ .get();
+ assertThat(stats.getNodeId(), equalTo(nodeId));
+ assertThat(stats.getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
+ assertThat(stats.getExecutedSearchesTotal(), equalTo((long) numDocs));
}
private static List createSourceIndex(int numDocs) {
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java
index 21f0ee0c605..47b2a3396ae 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java
@@ -15,7 +15,6 @@ import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.Exe
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,11 +29,11 @@ public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase coordinatorStats = new HashMap<>(numCoordinatingStats);
+ List coordinatorStats = new ArrayList<>(numCoordinatingStats);
for (int i = 0; i < numCoordinatingStats; i++) {
- CoordinatorStats stats = new CoordinatorStats(randomIntBetween(0, 8096), randomIntBetween(0, 8096),
- randomNonNegativeLong(), randomNonNegativeLong());
- coordinatorStats.put(randomAlphaOfLength(4), stats);
+ CoordinatorStats stats = new CoordinatorStats(randomAlphaOfLength(4), randomIntBetween(0, 8096),
+ randomIntBetween(0, 8096), randomNonNegativeLong(), randomNonNegativeLong());
+ coordinatorStats.add(stats);
}
return new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
}
@@ -44,7 +43,7 @@ public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase {
+
+ static final DateFormatter DATE_TIME_FORMATTER = DateFormatter.forPattern("strict_date_time").withZone(ZoneOffset.UTC);
+
+ private CoordinatorStats stats;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ stats = new CoordinatorStats(
+ randomAlphaOfLength(4),
+ randomIntBetween(0, Integer.MAX_VALUE),
+ randomIntBetween(0, Integer.MAX_VALUE),
+ randomNonNegativeLong(),
+ randomNonNegativeLong()
+ );
+ }
+
+ @Override
+ protected EnrichCoordinatorDoc createMonitoringDoc(String cluster,
+ long timestamp,
+ long interval,
+ MonitoringDoc.Node node,
+ MonitoredSystem system,
+ String type,
+ String id) {
+
+ return new EnrichCoordinatorDoc(cluster, timestamp, interval, node, stats);
+ }
+
+ @Override
+ protected void assertMonitoringDoc(EnrichCoordinatorDoc document) {
+ assertThat(document.getSystem(), is(MonitoredSystem.ES));
+ assertThat(document.getType(), is(EnrichCoordinatorDoc.TYPE));
+ assertThat(document.getId(), nullValue());
+ assertThat(document.getCoordinatorStats(), equalTo(stats));
+ }
+
+ @Override
+ public void testToXContent() throws IOException {
+ final long timestamp = System.currentTimeMillis();
+ final long intervalMillis = System.currentTimeMillis();
+ final long nodeTimestamp = System.currentTimeMillis();
+ final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", nodeTimestamp);
+
+ final EnrichCoordinatorDoc document = new EnrichCoordinatorDoc("_cluster", timestamp, intervalMillis, node, stats);
+ final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
+ assertThat(xContent.utf8ToString(), equalTo(
+ "{"
+ + "\"cluster_uuid\":\"_cluster\","
+ + "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(timestamp) + "\","
+ + "\"interval_ms\":" + intervalMillis + ","
+ + "\"type\":\"enrich_coordinator_stats\","
+ + "\"source_node\":{"
+ + "\"uuid\":\"_uuid\","
+ + "\"host\":\"_host\","
+ + "\"transport_address\":\"_addr\","
+ + "\"ip\":\"_ip\","
+ + "\"name\":\"_name\","
+ + "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(nodeTimestamp) + "\""
+ + "},"
+ + "\"enrich_coordinator_stats\":{"
+ + "\"node_id\":\"" + stats.getNodeId() + "\","
+ + "\"queue_size\":" + stats.getQueueSize() + ","
+ + "\"remote_requests_current\":" + stats.getRemoteRequestsCurrent() + ","
+ + "\"remote_requests_total\":" + stats.getRemoteRequestsTotal() + ","
+ + "\"executed_searches_total\":" + stats.getExecutedSearchesTotal()
+ + "}"
+ + "}"
+ ));
+ }
+
+ public void testEnrichCoordinatorStatsFieldsMapped() throws IOException {
+ XContentBuilder builder = jsonBuilder();
+ builder.startObject();
+ builder.value(stats);
+ builder.endObject();
+ Map serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
+
+ Map template =
+ XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
+ Map, ?> followStatsMapping = (Map, ?>) XContentMapValues
+ .extractValue("mappings._doc.properties.enrich_coordinator_stats.properties", template);
+ assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
+ for (Map.Entry entry : serializedStatus.entrySet()) {
+ String fieldName = entry.getKey();
+ Map, ?> fieldMapping = (Map, ?>) followStatsMapping.get(fieldName);
+ assertThat("no field mapping for field [" + fieldName + "]", fieldMapping, notNullValue());
+
+ Object fieldValue = entry.getValue();
+ String fieldType = (String) fieldMapping.get("type");
+ if (fieldValue instanceof Long || fieldValue instanceof Integer) {
+ assertThat("expected long field type for field [" + fieldName + "]", fieldType,
+ anyOf(equalTo("long"), equalTo("integer")));
+ } else if (fieldValue instanceof String) {
+ assertThat("expected keyword field type for field [" + fieldName + "]", fieldType,
+ anyOf(equalTo("keyword"), equalTo("text")));
+ } else {
+ // Manual test specific object fields and if not just fail:
+ fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]");
+ }
+ }
+ }
+}
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollectorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollectorTests.java
new file mode 100644
index 00000000000..c18372f3367
--- /dev/null
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollectorTests.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.monitoring.collector.enrich;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
+import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.elasticsearch.xpack.enrich.action.EnrichStatsResponseTests.randomTaskInfo;
+import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class EnrichStatsCollectorTests extends BaseCollectorTestCase {
+
+ public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
+ final Settings settings = randomFrom(enrichEnabledSettings(), enrichDisabledSettings());
+ final boolean enrichAllowed = randomBoolean();
+ final boolean isElectedMaster = randomBoolean();
+ whenLocalNodeElectedMaster(isElectedMaster);
+
+ // this controls the blockage
+ when(licenseState.isMonitoringAllowed()).thenReturn(false);
+ when(licenseState.isEnrichAllowed()).thenReturn(enrichAllowed);
+
+ final EnrichStatsCollector collector = createCollector(settings, clusterService, licenseState, client);
+
+ assertThat(collector.shouldCollect(isElectedMaster), is(false));
+ if (isElectedMaster) {
+ verify(licenseState).isMonitoringAllowed();
+ }
+ }
+
+ public void testShouldCollectReturnsFalseIfNotMaster() {
+ // regardless of enrich being enabled
+ final Settings settings = randomFrom(enrichEnabledSettings(), enrichDisabledSettings());
+
+ when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
+ when(licenseState.isEnrichAllowed()).thenReturn(randomBoolean());
+ // this controls the blockage
+ final boolean isElectedMaster = false;
+
+ final EnrichStatsCollector collector = createCollector(settings, clusterService, licenseState, client);
+
+ assertThat(collector.shouldCollect(isElectedMaster), is(false));
+ }
+
+ public void testShouldCollectReturnsFalseIfEnrichIsDisabled() {
+ // this is controls the blockage
+ final Settings settings = enrichDisabledSettings();
+
+ when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
+ when(licenseState.isEnrichAllowed()).thenReturn(randomBoolean());
+
+ final boolean isElectedMaster = randomBoolean();
+ whenLocalNodeElectedMaster(isElectedMaster);
+
+ final EnrichStatsCollector collector = createCollector(settings, clusterService, licenseState, client);
+
+ assertThat(collector.shouldCollect(isElectedMaster), is(false));
+
+ if (isElectedMaster) {
+ verify(licenseState).isMonitoringAllowed();
+ }
+ }
+
+ public void testShouldCollectReturnsFalseIfEnrichIsNotAllowed() {
+ final Settings settings = randomFrom(enrichEnabledSettings(), enrichDisabledSettings());
+
+ when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
+ // this is controls the blockage
+ when(licenseState.isEnrichAllowed()).thenReturn(false);
+ final boolean isElectedMaster = randomBoolean();
+ whenLocalNodeElectedMaster(isElectedMaster);
+
+ final EnrichStatsCollector collector = createCollector(settings, clusterService, licenseState, client);
+
+ assertThat(collector.shouldCollect(isElectedMaster), is(false));
+
+ if (isElectedMaster) {
+ verify(licenseState).isMonitoringAllowed();
+ }
+ }
+
+ public void testShouldCollectReturnsTrue() {
+ final Settings settings = enrichEnabledSettings();
+
+ when(licenseState.isMonitoringAllowed()).thenReturn(true);
+ when(licenseState.isEnrichAllowed()).thenReturn(true);
+ final boolean isElectedMaster = true;
+
+ final EnrichStatsCollector collector = createCollector(settings, clusterService, licenseState, client);
+
+ assertThat(collector.shouldCollect(isElectedMaster), is(true));
+
+ verify(licenseState).isMonitoringAllowed();
+ }
+
+ public void testDoCollect() throws Exception {
+ final String clusterUuid = randomAlphaOfLength(5);
+ whenClusterStateWithUUID(clusterUuid);
+
+ final MonitoringDoc.Node node = randomMonitoringNode(random());
+ final Client client = mock(Client.class);
+ final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+
+ final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
+ withCollectionTimeout(EnrichStatsCollector.STATS_TIMEOUT, timeout);
+
+ int numExecutingPolicies = randomIntBetween(0, 8);
+ List executingPolicies = new ArrayList<>(numExecutingPolicies);
+ for (int i = 0; i < numExecutingPolicies; i++) {
+ executingPolicies.add(new ExecutingPolicy(randomAlphaOfLength(4), randomTaskInfo()));
+ }
+ int numCoordinatorStats = randomIntBetween(0, 8);
+ List coordinatorStats = new ArrayList<>(numCoordinatorStats);
+ for (int i = 0; i < numCoordinatorStats; i++) {
+ coordinatorStats.add(new CoordinatorStats(
+ randomAlphaOfLength(4),
+ randomIntBetween(0, Integer.MAX_VALUE),
+ randomIntBetween(0, Integer.MAX_VALUE),
+ randomNonNegativeLong(),
+ randomNonNegativeLong()
+ ));
+ }
+
+
+ @SuppressWarnings("unchecked")
+ final ActionFuture future = (ActionFuture) mock(ActionFuture.class);
+ final EnrichStatsAction.Response response = new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
+
+ when(client.execute(eq(EnrichStatsAction.INSTANCE), any(EnrichStatsAction.Request.class))).thenReturn(future);
+ when(future.actionGet(timeout)).thenReturn(response);
+
+ final EnrichStatsCollector collector =
+ new EnrichStatsCollector(clusterService, licenseState, client, threadContext, settings);
+ assertEquals(timeout, collector.getCollectionTimeout());
+
+ final long interval = randomNonNegativeLong();
+ final List documents = new ArrayList<>(collector.doCollect(node, interval, clusterState));
+ verify(clusterState).metaData();
+ verify(metaData).clusterUUID();
+
+ assertThat(documents, hasSize(executingPolicies.size() + coordinatorStats.size()));
+
+ for (int i = 0; i < coordinatorStats.size(); i++) {
+ final EnrichCoordinatorDoc actual = (EnrichCoordinatorDoc) documents.get(i);
+ final CoordinatorStats expected = coordinatorStats.get(i);
+
+ assertThat(actual.getCluster(), is(clusterUuid));
+ assertThat(actual.getTimestamp(), greaterThan(0L));
+ assertThat(actual.getIntervalMillis(), equalTo(interval));
+ assertThat(actual.getNode(), equalTo(node));
+ assertThat(actual.getSystem(), is(MonitoredSystem.ES));
+ assertThat(actual.getType(), is(EnrichCoordinatorDoc.TYPE));
+ assertThat(actual.getId(), nullValue());
+ assertThat(actual.getCoordinatorStats(), equalTo(expected));
+ }
+
+ for (int i = coordinatorStats.size(); i < documents.size(); i++) {
+ final ExecutingPolicyDoc actual = (ExecutingPolicyDoc) documents.get(i);
+ final ExecutingPolicy expected = executingPolicies.get(i - coordinatorStats.size());
+
+ assertThat(actual.getCluster(), is(clusterUuid));
+ assertThat(actual.getTimestamp(), greaterThan(0L));
+ assertThat(actual.getIntervalMillis(), equalTo(interval));
+ assertThat(actual.getNode(), equalTo(node));
+ assertThat(actual.getSystem(), is(MonitoredSystem.ES));
+ assertThat(actual.getType(), is(ExecutingPolicyDoc.TYPE));
+ assertThat(actual.getId(), nullValue());
+ assertThat(actual.getExecutingPolicy(), equalTo(expected));
+ }
+ }
+
+ private EnrichStatsCollector createCollector(Settings settings,
+ ClusterService clusterService,
+ XPackLicenseState licenseState,
+ Client client) {
+ return new EnrichStatsCollector(clusterService, licenseState, client, settings);
+ }
+
+ private Settings enrichEnabledSettings() {
+ // since it's the default, we want to ensure we test both with/without it
+ return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.ENRICH_ENABLED_SETTING.getKey(), true).build();
+ }
+
+ private Settings enrichDisabledSettings() {
+ return Settings.builder().put(XPackSettings.ENRICH_ENABLED_SETTING.getKey(), false).build();
+ }
+
+}
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java
new file mode 100644
index 00000000000..a36c2d8af92
--- /dev/null
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.monitoring.collector.enrich;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
+import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
+import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.xpack.enrich.action.EnrichStatsResponseTests.randomTaskInfo;
+import static org.elasticsearch.xpack.monitoring.collector.enrich.EnrichCoordinatorDocTests.DATE_TIME_FORMATTER;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class ExecutingPolicyDocTests extends BaseMonitoringDocTestCase {
+
+ private ExecutingPolicy executingPolicy;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ executingPolicy = new ExecutingPolicy(
+ randomAlphaOfLength(4),
+ randomTaskInfo()
+ );
+ }
+
+ @Override
+ protected ExecutingPolicyDoc createMonitoringDoc(String cluster,
+ long timestamp,
+ long interval,
+ MonitoringDoc.Node node,
+ MonitoredSystem system,
+ String type,
+ String id) {
+
+ return new ExecutingPolicyDoc(cluster, timestamp, interval, node, executingPolicy);
+ }
+
+ @Override
+ protected void assertMonitoringDoc(ExecutingPolicyDoc document) {
+ assertThat(document.getSystem(), is(MonitoredSystem.ES));
+ assertThat(document.getType(), is(ExecutingPolicyDoc.TYPE));
+ assertThat(document.getId(), nullValue());
+ assertThat(document.getExecutingPolicy(), equalTo(executingPolicy));
+ }
+
+ @Override
+ public void testToXContent() throws IOException {
+ final long timestamp = System.currentTimeMillis();
+ final long intervalMillis = System.currentTimeMillis();
+ final long nodeTimestamp = System.currentTimeMillis();
+ final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", nodeTimestamp);
+
+ final ExecutingPolicyDoc document = new ExecutingPolicyDoc("_cluster", timestamp, intervalMillis, node, executingPolicy);
+ final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
+ Optional> header =
+ executingPolicy.getTaskInfo().getHeaders().entrySet().stream().findAny();
+ assertThat(xContent.utf8ToString(), equalTo(
+ "{"
+ + "\"cluster_uuid\":\"_cluster\","
+ + "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(timestamp) + "\","
+ + "\"interval_ms\":" + intervalMillis + ","
+ + "\"type\":\"enrich_executing_policy_stats\","
+ + "\"source_node\":{"
+ + "\"uuid\":\"_uuid\","
+ + "\"host\":\"_host\","
+ + "\"transport_address\":\"_addr\","
+ + "\"ip\":\"_ip\","
+ + "\"name\":\"_name\","
+ + "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(nodeTimestamp) + "\""
+ + "},"
+ + "\"enrich_executing_policy_stats\":{"
+ + "\"name\":\"" + executingPolicy.getName() + "\","
+ + "\"task\":{"
+ + "\"node\":\"" + executingPolicy.getTaskInfo().getTaskId().getNodeId() + "\","
+ + "\"id\":" + executingPolicy.getTaskInfo().getTaskId().getId() + ","
+ + "\"type\":\"" + executingPolicy.getTaskInfo().getType() + "\","
+ + "\"action\":\"" + executingPolicy.getTaskInfo().getAction() + "\","
+ + "\"description\":\"" + executingPolicy.getTaskInfo().getDescription() + "\","
+ + "\"start_time_in_millis\":" + executingPolicy.getTaskInfo().getStartTime() + ","
+ + "\"running_time_in_nanos\":" + executingPolicy.getTaskInfo().getRunningTimeNanos() + ","
+ + "\"cancellable\":" + executingPolicy.getTaskInfo().isCancellable() + ","
+ + header
+ .map(entry -> String.format(Locale.ROOT, "\"headers\":{\"%s\":\"%s\"}", entry.getKey(), entry.getValue()))
+ .orElse("\"headers\":{}")
+ + "}"
+ + "}"
+ + "}"
+ ));
+ }
+
+ public void testEnrichCoordinatorStatsFieldsMapped() throws IOException {
+ XContentBuilder builder = jsonBuilder();
+ builder.startObject();
+ builder.value(executingPolicy);
+ builder.endObject();
+ Map serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
+
+ Map template =
+ XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
+ Map, ?> followStatsMapping = (Map, ?>) XContentMapValues
+ .extractValue("mappings._doc.properties.enrich_executing_policy_stats.properties", template);
+ assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
+ for (Map.Entry entry : serializedStatus.entrySet()) {
+ String fieldName = entry.getKey();
+ Map, ?> fieldMapping = (Map, ?>) followStatsMapping.get(fieldName);
+ assertThat("no field mapping for field [" + fieldName + "]", fieldMapping, notNullValue());
+
+ Object fieldValue = entry.getValue();
+ String fieldType = (String) fieldMapping.get("type");
+ if (fieldValue instanceof Long || fieldValue instanceof Integer) {
+ assertThat("expected long field type for field [" + fieldName + "]", fieldType,
+ anyOf(equalTo("long"), equalTo("integer")));
+ } else if (fieldValue instanceof String) {
+ assertThat("expected keyword field type for field [" + fieldName + "]", fieldType,
+ anyOf(equalTo("keyword"), equalTo("text")));
+ } else {
+ if (fieldName.equals("task")) {
+ assertThat(fieldType, equalTo("object"));
+ assertThat(((Map, ?>) fieldMapping.get("properties")).size(), equalTo(8));
+ assertThat(XContentMapValues.extractValue("properties.node.type", fieldMapping), equalTo("keyword"));
+ assertThat(XContentMapValues.extractValue("properties.id.type", fieldMapping), equalTo("long"));
+ assertThat(XContentMapValues.extractValue("properties.type.type", fieldMapping), equalTo("keyword"));
+ assertThat(XContentMapValues.extractValue("properties.action.type", fieldMapping), equalTo("keyword"));
+ assertThat(XContentMapValues.extractValue("properties.description.type", fieldMapping), equalTo("keyword"));
+ assertThat(XContentMapValues.extractValue("properties.start_time_in_millis.type", fieldMapping), equalTo("date"));
+ assertThat(XContentMapValues.extractValue("properties.cancellable.type", fieldMapping), equalTo("boolean"));
+ } else {
+ // Manual test specific object fields and if not just fail:
+ fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]");
+ }
+ }
+ }
+ }
+}
diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
index 5d03a066d75..29fd28c9561 100644
--- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
+++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
@@ -41,6 +41,7 @@ import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.collector.ccr.StatsCollector;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
+import org.elasticsearch.xpack.monitoring.collector.enrich.EnrichStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.ml.JobStatsCollector;
@@ -144,6 +145,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
collectors.add(new IndexRecoveryCollector(clusterService, getLicenseState(), client));
collectors.add(new JobStatsCollector(settings, clusterService, getLicenseState(), client));
collectors.add(new StatsCollector(settings, clusterService, getLicenseState(), client));
+ collectors.add(new EnrichStatsCollector(clusterService, getLicenseState(), client, settings));
final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters);
@@ -184,6 +186,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
settings.add(JobStatsCollector.JOB_STATS_TIMEOUT);
settings.add(StatsCollector.CCR_STATS_TIMEOUT);
settings.add(NodeStatsCollector.NODE_STATS_TIMEOUT);
+ settings.add(EnrichStatsCollector.STATS_TIMEOUT);
settings.addAll(Exporters.getSettings());
return Collections.unmodifiableList(settings);
}
diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichCoordinatorDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichCoordinatorDoc.java
new file mode 100644
index 00000000000..f8d48512656
--- /dev/null
+++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichCoordinatorDoc.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.monitoring.collector.enrich;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
+import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public final class EnrichCoordinatorDoc extends MonitoringDoc {
+
+ public static final String TYPE = "enrich_coordinator_stats";
+
+ private final CoordinatorStats coordinatorStats;
+
+ public EnrichCoordinatorDoc(String cluster,
+ long timestamp,
+ long intervalMillis,
+ MonitoringDoc.Node node,
+ CoordinatorStats coordinatorStats) {
+ super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null);
+ this.coordinatorStats = Objects.requireNonNull(coordinatorStats, "stats");
+ }
+
+ public CoordinatorStats getCoordinatorStats() {
+ return coordinatorStats;
+ }
+
+ @Override
+ protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject(TYPE);
+ {
+ coordinatorStats.toXContent(builder, params);
+ }
+ builder.endObject();
+ }
+}
diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollector.java
new file mode 100644
index 00000000000..c2162c289df
--- /dev/null
+++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollector.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.monitoring.collector.enrich;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+import org.elasticsearch.xpack.monitoring.collector.Collector;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
+
+public final class EnrichStatsCollector extends Collector {
+
+ public static final Setting STATS_TIMEOUT = collectionTimeoutSetting("enrich.stats.timeout");
+
+ private final Client client;
+ private final Settings settings;
+ private final ThreadContext threadContext;
+
+ public EnrichStatsCollector(ClusterService clusterService,
+ XPackLicenseState licenseState,
+ Client client,
+ Settings settings) {
+ this(clusterService, licenseState, client, client.threadPool().getThreadContext(), settings);
+ }
+
+ EnrichStatsCollector(ClusterService clusterService,
+ XPackLicenseState licenseState,
+ Client client,
+ ThreadContext threadContext,
+ Settings settings) {
+ super(EnrichCoordinatorDoc.TYPE, clusterService, STATS_TIMEOUT, licenseState);
+ this.client = client;
+ this.settings = settings;
+ this.threadContext = threadContext;
+ }
+
+ @Override
+ protected boolean shouldCollect(final boolean isElectedMaster) {
+ return isElectedMaster
+ && super.shouldCollect(isElectedMaster)
+ && XPackSettings.ENRICH_ENABLED_SETTING.get(settings)
+ && licenseState.isEnrichAllowed();
+ }
+
+ @Override
+ protected Collection doCollect(MonitoringDoc.Node node, long interval, ClusterState clusterState) throws Exception {
+ try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(MONITORING_ORIGIN)) {
+ final long timestamp = timestamp();
+ final String clusterUuid = clusterUuid(clusterState);
+
+ final EnrichStatsAction.Request request = new EnrichStatsAction.Request();
+ final EnrichStatsAction.Response response =
+ client.execute(EnrichStatsAction.INSTANCE, request).actionGet(getCollectionTimeout());
+
+ final List docs = response.getCoordinatorStats().stream()
+ .map(stats -> new EnrichCoordinatorDoc(clusterUuid, timestamp, interval, node, stats))
+ .collect(Collectors.toList());
+
+ response.getExecutingPolicies().stream()
+ .map(stats -> new ExecutingPolicyDoc(clusterUuid, timestamp, interval, node, stats))
+ .forEach(docs::add);
+
+ return docs;
+ }
+ }
+}
diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDoc.java
new file mode 100644
index 00000000000..724e9f8d9a2
--- /dev/null
+++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDoc.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.monitoring.collector.enrich;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
+import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public final class ExecutingPolicyDoc extends MonitoringDoc {
+
+ public static final String TYPE = "enrich_executing_policy_stats";
+
+ private final ExecutingPolicy executingPolicy;
+
+ public ExecutingPolicyDoc(String cluster,
+ long timestamp,
+ long intervalMillis,
+ Node node,
+ ExecutingPolicy coordinatorStats) {
+ super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null);
+ this.executingPolicy = Objects.requireNonNull(coordinatorStats, "stats");
+ }
+
+ public ExecutingPolicy getExecutingPolicy() {
+ return executingPolicy;
+ }
+
+ @Override
+ protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject(TYPE);
+ {
+ executingPolicy.toXContent(builder, params);
+ }
+ builder.endObject();
+ }
+}