Expose enrich stats api to monitoring. (#46708)

This change also slightly modifies the stats response,
so that is can easier consumer by monitoring and other
users. (coordinators stats are now in a list instead of
a map and has an additional field for the node id)

Relates to #32789
This commit is contained in:
Martijn van Groningen 2019-09-26 11:00:33 +02:00
parent 9967aff714
commit 8a4eefdd83
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
20 changed files with 861 additions and 42 deletions

View File

@ -680,6 +680,22 @@ public class XPackLicenseState {
return localStatus.active;
}
/**
* Determine if the enrich processor and related APIs are allowed to be used.
* <p>
* This is available in for all license types except
* {@link OperationMode#MISSING}
*
* @return {@code true} as long as the license is valid. Otherwise
* {@code false}.
*/
public boolean isEnrichAllowed() {
// status is volatile
Status localStatus = status;
// Should work on all active licenses
return localStatus.active;
}
/**
* Determine if SQL support should be enabled.
* <p>

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.TaskInfo;
@ -20,7 +21,6 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
@ -50,9 +50,9 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
public static class Response extends ActionResponse implements ToXContentObject {
private final List<ExecutingPolicy> executingPolicies;
private final Map<String, CoordinatorStats> coordinatorStats;
private final List<CoordinatorStats> coordinatorStats;
public Response(List<ExecutingPolicy> executingPolicies, Map<String, CoordinatorStats> coordinatorStats) {
public Response(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats> coordinatorStats) {
this.executingPolicies = executingPolicies;
this.coordinatorStats = coordinatorStats;
}
@ -60,36 +60,31 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
public Response(StreamInput in) throws IOException {
super(in);
executingPolicies = in.readList(ExecutingPolicy::new);
coordinatorStats = in.readMap(StreamInput::readString, CoordinatorStats::new);
coordinatorStats = in.readList(CoordinatorStats::new);
}
public List<ExecutingPolicy> getExecutingPolicies() {
return executingPolicies;
}
public Map<String, CoordinatorStats> getCoordinatorStats() {
public List<CoordinatorStats> getCoordinatorStats() {
return coordinatorStats;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(executingPolicies);
out.writeMap(coordinatorStats, StreamOutput::writeString, (innerOut, stat) -> stat.writeTo(innerOut));
out.writeList(coordinatorStats);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("executing_policies");
executingPolicies.stream().sorted().forEachOrdered(policy -> {
executingPolicies.stream().sorted(Comparator.comparing(ExecutingPolicy::getName)).forEachOrdered(policy -> {
try {
builder.startObject();
builder.field("name", policy.getName());
{
builder.startObject("task");
builder.value(policy.getTaskInfo());
builder.endObject();
}
policy.toXContent(builder, params);
builder.endObject();
} catch (IOException e) {
throw new UncheckedIOException(e);
@ -97,14 +92,10 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
});
builder.endArray();
builder.startArray("coordinator_stats");
coordinatorStats.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).forEachOrdered(entry -> {
coordinatorStats.stream().sorted(Comparator.comparing(CoordinatorStats::getNodeId)).forEachOrdered(entry -> {
try {
builder.startObject();
builder.field("node_id", entry.getKey());
builder.field("queue_size", entry.getValue().getQueueSize());
builder.field("remote_requests_current", entry.getValue().getRemoteRequestsCurrent());
builder.field("remote_requests_total", entry.getValue().getRemoteRequestsTotal());
builder.field("executed_searches_total", entry.getValue().getExecutedSearchesTotal());
entry.toXContent(builder, params);
builder.endObject();
} catch (IOException e) {
throw new UncheckedIOException(e);
@ -129,14 +120,20 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
return Objects.hash(executingPolicies, coordinatorStats);
}
public static class CoordinatorStats implements Writeable {
public static class CoordinatorStats implements Writeable, ToXContentFragment {
private final String nodeId;
private final int queueSize;
private final int remoteRequestsCurrent;
private final long remoteRequestsTotal;
private final long executedSearchesTotal;
public CoordinatorStats(int queueSize, int remoteRequestsCurrent, long remoteRequestsTotal, long executedSearchesTotal) {
public CoordinatorStats(String nodeId,
int queueSize,
int remoteRequestsCurrent,
long remoteRequestsTotal,
long executedSearchesTotal) {
this.nodeId = nodeId;
this.queueSize = queueSize;
this.remoteRequestsCurrent = remoteRequestsCurrent;
this.remoteRequestsTotal = remoteRequestsTotal;
@ -144,7 +141,11 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
}
public CoordinatorStats(StreamInput in) throws IOException {
this(in.readVInt(), in.readVInt(), in.readVLong(), in.readVLong());
this(in.readString(), in.readVInt(), in.readVInt(), in.readVLong(), in.readVLong());
}
public String getNodeId() {
return nodeId;
}
public int getQueueSize() {
@ -165,18 +166,30 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
out.writeVInt(queueSize);
out.writeVInt(remoteRequestsCurrent);
out.writeVLong(remoteRequestsTotal);
out.writeVLong(executedSearchesTotal);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node_id", nodeId);
builder.field("queue_size", queueSize);
builder.field("remote_requests_current", remoteRequestsCurrent);
builder.field("remote_requests_total", remoteRequestsTotal);
builder.field("executed_searches_total", executedSearchesTotal);
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CoordinatorStats stats = (CoordinatorStats) o;
return queueSize == stats.queueSize &&
return Objects.equals(nodeId, stats.nodeId) &&
queueSize == stats.queueSize &&
remoteRequestsCurrent == stats.remoteRequestsCurrent &&
remoteRequestsTotal == stats.remoteRequestsTotal &&
executedSearchesTotal == stats.executedSearchesTotal;
@ -184,11 +197,11 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
@Override
public int hashCode() {
return Objects.hash(queueSize, remoteRequestsCurrent, remoteRequestsTotal, executedSearchesTotal);
return Objects.hash(nodeId, queueSize, remoteRequestsCurrent, remoteRequestsTotal, executedSearchesTotal);
}
}
public static class ExecutingPolicy implements Writeable {
public static class ExecutingPolicy implements Writeable, ToXContentFragment {
private final String name;
private final TaskInfo taskInfo;
@ -216,6 +229,17 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
taskInfo.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("name", name);
builder.startObject("task");
{
builder.value(taskInfo);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -1093,6 +1093,62 @@
}
}
}
},
"enrich_coordinator_stats" : {
"properties": {
"node_id": {
"type": "keyword"
},
"queue_size": {
"type": "integer"
},
"remote_requests_current" : {
"type": "long"
},
"remote_requests_total" : {
"type": "long"
},
"executed_searches_total" : {
"type": "long"
}
}
},
"enrich_executing_policy_stats": {
"properties": {
"name": {
"type": "keyword"
},
"task": {
"type": "object",
"properties": {
"node": {
"type": "keyword"
},
"id": {
"type": "long"
},
"type": {
"type": "keyword"
},
"action": {
"type": "keyword"
},
"description": {
"type": "keyword"
},
"start_time_in_millis": {
"type": "date",
"format": "epoch_millis"
},
"running_time_in_nanos": {
"type": "long"
},
"cancellable": {
"type": "boolean"
}
}
}
}
}
}
}

View File

@ -12,6 +12,8 @@ archivesBaseName = 'x-pack-enrich'
dependencies {
compileOnly project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: ':modules:ingest-common')
testCompile project(path: xpackModule('monitoring'), configuration: 'testArtifacts')
if (isEclipse) {
testCompile project(path: xpackModule('core-tests'), configuration: 'testArtifacts')
}

View File

@ -21,9 +21,12 @@ import org.junit.After;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
@ -87,6 +90,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
public void testBasicFlow() throws Exception {
setupGenericLifecycleTest(true);
assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 30, TimeUnit.SECONDS);
}
public void testImmutablePolicy() throws IOException {
@ -153,4 +157,35 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
private static Map<String, Object> toMap(String response) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
}
private static void verifyEnrichMonitoring() throws IOException {
Request request = new Request("GET", "/.monitoring-*/_search");
request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"enrich_coordinator_stats\"}}}");
Map<String, ?> response;
try {
response = toMap(adminClient().performRequest(request));
} catch (ResponseException e) {
throw new AssertionError("error while searching", e);
}
int maxRemoteRequestsTotal = 0;
int maxExecutedSearchesTotal = 0;
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
assertThat(hits.size(), greaterThanOrEqualTo(1));
for (int i = 0; i < hits.size(); i++) {
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
int foundRemoteRequestsTotal =
(int) XContentMapValues.extractValue("_source.enrich_coordinator_stats.remote_requests_total", hit);
maxRemoteRequestsTotal = Math.max(maxRemoteRequestsTotal, foundRemoteRequestsTotal);
int foundExecutedSearchesTotal =
(int) XContentMapValues.extractValue("_source.enrich_coordinator_stats.executed_searches_total", hit);
maxExecutedSearchesTotal = Math.max(maxExecutedSearchesTotal, foundExecutedSearchesTotal);
}
assertThat(maxRemoteRequestsTotal, greaterThan(0));
assertThat(maxExecutedSearchesTotal, greaterThan(0));
}
}

View File

@ -15,4 +15,5 @@ testClusters.integTest {
user username: "test_enrich_no_privs", password: "x-pack-test-password", role: "enrich_no_privs"
setting 'xpack.license.self_generated.type', 'basic'
setting 'xpack.security.enabled', 'true'
setting 'xpack.monitoring.collection.enabled', 'true'
}

View File

@ -10,4 +10,5 @@ dependencies {
testClusters.integTest {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'basic'
setting 'xpack.monitoring.collection.enabled', 'true'
}

View File

@ -109,8 +109,8 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
coordinateLookups();
}
CoordinatorStats getStats() {
return new CoordinatorStats(queue.size(), remoteRequestsCurrent.get(), remoteRequestsTotal,
CoordinatorStats getStats(String nodeId) {
return new CoordinatorStats(nodeId, queue.size(), remoteRequestsCurrent.get(), remoteRequestsTotal,
executedSearchesTotal.get());
}

View File

@ -144,7 +144,8 @@ public class EnrichCoordinatorStatsAction extends ActionType<EnrichCoordinatorSt
@Override
protected NodeResponse nodeOperation(NodeRequest request) {
return new NodeResponse(clusterService.localNode(), coordinator.getStats());
DiscoveryNode node = clusterService.localNode();
return new NodeResponse(node, coordinator.getStats(node.getId()));
}
}

View File

@ -20,13 +20,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorStatsAction.NodeResponse;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class TransportEnrichStatsAction extends TransportMasterNodeAction<EnrichStatsAction.Request, EnrichStatsAction.Response> {
@ -73,8 +72,9 @@ public class TransportEnrichStatsAction extends TransportMasterNodeAction<Enrich
return;
}
Map<String, EnrichStatsAction.Response.CoordinatorStats> coordinatorStats = response.getNodes().stream()
.collect(Collectors.toMap(nodeResponse -> nodeResponse.getNode().getId(), NodeResponse::getCoordinatorStats));
List<CoordinatorStats> coordinatorStats = response.getNodes().stream()
.map(EnrichCoordinatorStatsAction.NodeResponse::getCoordinatorStats)
.collect(Collectors.toList());
List<ExecutingPolicy> policyExecutionTasks = taskManager.getTasks().values().stream()
.filter(t -> t.getAction().equals(ExecuteEnrichPolicyAction.NAME))
.map(t -> t.taskInfo(clusterService.localNode().getId(), true))

View File

@ -108,8 +108,9 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
assertThat(statsResponse.getCoordinatorStats().get(localNodeId).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(statsResponse.getCoordinatorStats().get(localNodeId).getExecutedSearchesTotal(), equalTo((long) numDocs));
assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), equalTo(localNodeId));
assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo((long) numDocs));
}
public void testMultiplePolicies() {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
@ -165,8 +166,13 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size()));
String nodeId = internalCluster().getInstance(ClusterService.class, coordinatingNode).localNode().getId();
assertThat(statsResponse.getCoordinatorStats().get(nodeId).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(statsResponse.getCoordinatorStats().get(nodeId).getExecutedSearchesTotal(), equalTo((long) numDocs));
CoordinatorStats stats = statsResponse.getCoordinatorStats().stream()
.filter(s -> s.getNodeId().equals(nodeId))
.findAny()
.get();
assertThat(stats.getNodeId(), equalTo(nodeId));
assertThat(stats.getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
assertThat(stats.getExecutedSearchesTotal(), equalTo((long) numDocs));
}
private static List<String> createSourceIndex(int numDocs) {

View File

@ -15,7 +15,6 @@ import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.Exe
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -30,11 +29,11 @@ public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase<En
executingPolicies.add(new ExecutingPolicy(randomAlphaOfLength(4), taskInfo));
}
int numCoordinatingStats = randomIntBetween(0, 16);
Map<String, CoordinatorStats> coordinatorStats = new HashMap<>(numCoordinatingStats);
List<CoordinatorStats> coordinatorStats = new ArrayList<>(numCoordinatingStats);
for (int i = 0; i < numCoordinatingStats; i++) {
CoordinatorStats stats = new CoordinatorStats(randomIntBetween(0, 8096), randomIntBetween(0, 8096),
randomNonNegativeLong(), randomNonNegativeLong());
coordinatorStats.put(randomAlphaOfLength(4), stats);
CoordinatorStats stats = new CoordinatorStats(randomAlphaOfLength(4), randomIntBetween(0, 8096),
randomIntBetween(0, 8096), randomNonNegativeLong(), randomNonNegativeLong());
coordinatorStats.add(stats);
}
return new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
}
@ -44,7 +43,7 @@ public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase<En
return EnrichStatsAction.Response::new;
}
private static TaskInfo randomTaskInfo() {
public static TaskInfo randomTaskInfo() {
TaskId taskId = new TaskId(randomAlphaOfLength(5), randomLong());
String type = randomAlphaOfLength(5);
String action = randomAlphaOfLength(5);

View File

@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.enrich;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase;
import java.io.IOException;
import java.time.ZoneOffset;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class EnrichCoordinatorDocTests extends BaseMonitoringDocTestCase<EnrichCoordinatorDoc> {
static final DateFormatter DATE_TIME_FORMATTER = DateFormatter.forPattern("strict_date_time").withZone(ZoneOffset.UTC);
private CoordinatorStats stats;
@Override
public void setUp() throws Exception {
super.setUp();
stats = new CoordinatorStats(
randomAlphaOfLength(4),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomNonNegativeLong(),
randomNonNegativeLong()
);
}
@Override
protected EnrichCoordinatorDoc createMonitoringDoc(String cluster,
long timestamp,
long interval,
MonitoringDoc.Node node,
MonitoredSystem system,
String type,
String id) {
return new EnrichCoordinatorDoc(cluster, timestamp, interval, node, stats);
}
@Override
protected void assertMonitoringDoc(EnrichCoordinatorDoc document) {
assertThat(document.getSystem(), is(MonitoredSystem.ES));
assertThat(document.getType(), is(EnrichCoordinatorDoc.TYPE));
assertThat(document.getId(), nullValue());
assertThat(document.getCoordinatorStats(), equalTo(stats));
}
@Override
public void testToXContent() throws IOException {
final long timestamp = System.currentTimeMillis();
final long intervalMillis = System.currentTimeMillis();
final long nodeTimestamp = System.currentTimeMillis();
final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", nodeTimestamp);
final EnrichCoordinatorDoc document = new EnrichCoordinatorDoc("_cluster", timestamp, intervalMillis, node, stats);
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
assertThat(xContent.utf8ToString(), equalTo(
"{"
+ "\"cluster_uuid\":\"_cluster\","
+ "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(timestamp) + "\","
+ "\"interval_ms\":" + intervalMillis + ","
+ "\"type\":\"enrich_coordinator_stats\","
+ "\"source_node\":{"
+ "\"uuid\":\"_uuid\","
+ "\"host\":\"_host\","
+ "\"transport_address\":\"_addr\","
+ "\"ip\":\"_ip\","
+ "\"name\":\"_name\","
+ "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(nodeTimestamp) + "\""
+ "},"
+ "\"enrich_coordinator_stats\":{"
+ "\"node_id\":\"" + stats.getNodeId() + "\","
+ "\"queue_size\":" + stats.getQueueSize() + ","
+ "\"remote_requests_current\":" + stats.getRemoteRequestsCurrent() + ","
+ "\"remote_requests_total\":" + stats.getRemoteRequestsTotal() + ","
+ "\"executed_searches_total\":" + stats.getExecutedSearchesTotal()
+ "}"
+ "}"
));
}
public void testEnrichCoordinatorStatsFieldsMapped() throws IOException {
XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.value(stats);
builder.endObject();
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
Map<String, Object> template =
XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues
.extractValue("mappings._doc.properties.enrich_coordinator_stats.properties", template);
assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
String fieldName = entry.getKey();
Map<?, ?> fieldMapping = (Map<?, ?>) followStatsMapping.get(fieldName);
assertThat("no field mapping for field [" + fieldName + "]", fieldMapping, notNullValue());
Object fieldValue = entry.getValue();
String fieldType = (String) fieldMapping.get("type");
if (fieldValue instanceof Long || fieldValue instanceof Integer) {
assertThat("expected long field type for field [" + fieldName + "]", fieldType,
anyOf(equalTo("long"), equalTo("integer")));
} else if (fieldValue instanceof String) {
assertThat("expected keyword field type for field [" + fieldName + "]", fieldType,
anyOf(equalTo("keyword"), equalTo("text")));
} else {
// Manual test specific object fields and if not just fail:
fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]");
}
}
}
}

View File

@ -0,0 +1,216 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.enrich;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.xpack.enrich.action.EnrichStatsResponseTests.randomTaskInfo;
import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class EnrichStatsCollectorTests extends BaseCollectorTestCase {
public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
final Settings settings = randomFrom(enrichEnabledSettings(), enrichDisabledSettings());
final boolean enrichAllowed = randomBoolean();
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
// this controls the blockage
when(licenseState.isMonitoringAllowed()).thenReturn(false);
when(licenseState.isEnrichAllowed()).thenReturn(enrichAllowed);
final EnrichStatsCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfNotMaster() {
// regardless of enrich being enabled
final Settings settings = randomFrom(enrichEnabledSettings(), enrichDisabledSettings());
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
when(licenseState.isEnrichAllowed()).thenReturn(randomBoolean());
// this controls the blockage
final boolean isElectedMaster = false;
final EnrichStatsCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
}
public void testShouldCollectReturnsFalseIfEnrichIsDisabled() {
// this is controls the blockage
final Settings settings = enrichDisabledSettings();
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
when(licenseState.isEnrichAllowed()).thenReturn(randomBoolean());
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final EnrichStatsCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfEnrichIsNotAllowed() {
final Settings settings = randomFrom(enrichEnabledSettings(), enrichDisabledSettings());
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
// this is controls the blockage
when(licenseState.isEnrichAllowed()).thenReturn(false);
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final EnrichStatsCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsTrue() {
final Settings settings = enrichEnabledSettings();
when(licenseState.isMonitoringAllowed()).thenReturn(true);
when(licenseState.isEnrichAllowed()).thenReturn(true);
final boolean isElectedMaster = true;
final EnrichStatsCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(true));
verify(licenseState).isMonitoringAllowed();
}
public void testDoCollect() throws Exception {
final String clusterUuid = randomAlphaOfLength(5);
whenClusterStateWithUUID(clusterUuid);
final MonitoringDoc.Node node = randomMonitoringNode(random());
final Client client = mock(Client.class);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
withCollectionTimeout(EnrichStatsCollector.STATS_TIMEOUT, timeout);
int numExecutingPolicies = randomIntBetween(0, 8);
List<ExecutingPolicy> executingPolicies = new ArrayList<>(numExecutingPolicies);
for (int i = 0; i < numExecutingPolicies; i++) {
executingPolicies.add(new ExecutingPolicy(randomAlphaOfLength(4), randomTaskInfo()));
}
int numCoordinatorStats = randomIntBetween(0, 8);
List<CoordinatorStats> coordinatorStats = new ArrayList<>(numCoordinatorStats);
for (int i = 0; i < numCoordinatorStats; i++) {
coordinatorStats.add(new CoordinatorStats(
randomAlphaOfLength(4),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomNonNegativeLong(),
randomNonNegativeLong()
));
}
@SuppressWarnings("unchecked")
final ActionFuture<EnrichStatsAction.Response> future = (ActionFuture<EnrichStatsAction.Response>) mock(ActionFuture.class);
final EnrichStatsAction.Response response = new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
when(client.execute(eq(EnrichStatsAction.INSTANCE), any(EnrichStatsAction.Request.class))).thenReturn(future);
when(future.actionGet(timeout)).thenReturn(response);
final EnrichStatsCollector collector =
new EnrichStatsCollector(clusterService, licenseState, client, threadContext, settings);
assertEquals(timeout, collector.getCollectionTimeout());
final long interval = randomNonNegativeLong();
final List<MonitoringDoc> documents = new ArrayList<>(collector.doCollect(node, interval, clusterState));
verify(clusterState).metaData();
verify(metaData).clusterUUID();
assertThat(documents, hasSize(executingPolicies.size() + coordinatorStats.size()));
for (int i = 0; i < coordinatorStats.size(); i++) {
final EnrichCoordinatorDoc actual = (EnrichCoordinatorDoc) documents.get(i);
final CoordinatorStats expected = coordinatorStats.get(i);
assertThat(actual.getCluster(), is(clusterUuid));
assertThat(actual.getTimestamp(), greaterThan(0L));
assertThat(actual.getIntervalMillis(), equalTo(interval));
assertThat(actual.getNode(), equalTo(node));
assertThat(actual.getSystem(), is(MonitoredSystem.ES));
assertThat(actual.getType(), is(EnrichCoordinatorDoc.TYPE));
assertThat(actual.getId(), nullValue());
assertThat(actual.getCoordinatorStats(), equalTo(expected));
}
for (int i = coordinatorStats.size(); i < documents.size(); i++) {
final ExecutingPolicyDoc actual = (ExecutingPolicyDoc) documents.get(i);
final ExecutingPolicy expected = executingPolicies.get(i - coordinatorStats.size());
assertThat(actual.getCluster(), is(clusterUuid));
assertThat(actual.getTimestamp(), greaterThan(0L));
assertThat(actual.getIntervalMillis(), equalTo(interval));
assertThat(actual.getNode(), equalTo(node));
assertThat(actual.getSystem(), is(MonitoredSystem.ES));
assertThat(actual.getType(), is(ExecutingPolicyDoc.TYPE));
assertThat(actual.getId(), nullValue());
assertThat(actual.getExecutingPolicy(), equalTo(expected));
}
}
private EnrichStatsCollector createCollector(Settings settings,
ClusterService clusterService,
XPackLicenseState licenseState,
Client client) {
return new EnrichStatsCollector(clusterService, licenseState, client, settings);
}
private Settings enrichEnabledSettings() {
// since it's the default, we want to ensure we test both with/without it
return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.ENRICH_ENABLED_SETTING.getKey(), true).build();
}
private Settings enrichDisabledSettings() {
return Settings.builder().put(XPackSettings.ENRICH_ENABLED_SETTING.getKey(), false).build();
}
}

View File

@ -0,0 +1,155 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.enrich;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.enrich.action.EnrichStatsResponseTests.randomTaskInfo;
import static org.elasticsearch.xpack.monitoring.collector.enrich.EnrichCoordinatorDocTests.DATE_TIME_FORMATTER;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class ExecutingPolicyDocTests extends BaseMonitoringDocTestCase<ExecutingPolicyDoc> {
private ExecutingPolicy executingPolicy;
@Override
public void setUp() throws Exception {
super.setUp();
executingPolicy = new ExecutingPolicy(
randomAlphaOfLength(4),
randomTaskInfo()
);
}
@Override
protected ExecutingPolicyDoc createMonitoringDoc(String cluster,
long timestamp,
long interval,
MonitoringDoc.Node node,
MonitoredSystem system,
String type,
String id) {
return new ExecutingPolicyDoc(cluster, timestamp, interval, node, executingPolicy);
}
@Override
protected void assertMonitoringDoc(ExecutingPolicyDoc document) {
assertThat(document.getSystem(), is(MonitoredSystem.ES));
assertThat(document.getType(), is(ExecutingPolicyDoc.TYPE));
assertThat(document.getId(), nullValue());
assertThat(document.getExecutingPolicy(), equalTo(executingPolicy));
}
@Override
public void testToXContent() throws IOException {
final long timestamp = System.currentTimeMillis();
final long intervalMillis = System.currentTimeMillis();
final long nodeTimestamp = System.currentTimeMillis();
final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", nodeTimestamp);
final ExecutingPolicyDoc document = new ExecutingPolicyDoc("_cluster", timestamp, intervalMillis, node, executingPolicy);
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
Optional<Map.Entry<String, String>> header =
executingPolicy.getTaskInfo().getHeaders().entrySet().stream().findAny();
assertThat(xContent.utf8ToString(), equalTo(
"{"
+ "\"cluster_uuid\":\"_cluster\","
+ "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(timestamp) + "\","
+ "\"interval_ms\":" + intervalMillis + ","
+ "\"type\":\"enrich_executing_policy_stats\","
+ "\"source_node\":{"
+ "\"uuid\":\"_uuid\","
+ "\"host\":\"_host\","
+ "\"transport_address\":\"_addr\","
+ "\"ip\":\"_ip\","
+ "\"name\":\"_name\","
+ "\"timestamp\":\"" + DATE_TIME_FORMATTER.formatMillis(nodeTimestamp) + "\""
+ "},"
+ "\"enrich_executing_policy_stats\":{"
+ "\"name\":\"" + executingPolicy.getName() + "\","
+ "\"task\":{"
+ "\"node\":\"" + executingPolicy.getTaskInfo().getTaskId().getNodeId() + "\","
+ "\"id\":" + executingPolicy.getTaskInfo().getTaskId().getId() + ","
+ "\"type\":\"" + executingPolicy.getTaskInfo().getType() + "\","
+ "\"action\":\"" + executingPolicy.getTaskInfo().getAction() + "\","
+ "\"description\":\"" + executingPolicy.getTaskInfo().getDescription() + "\","
+ "\"start_time_in_millis\":" + executingPolicy.getTaskInfo().getStartTime() + ","
+ "\"running_time_in_nanos\":" + executingPolicy.getTaskInfo().getRunningTimeNanos() + ","
+ "\"cancellable\":" + executingPolicy.getTaskInfo().isCancellable() + ","
+ header
.map(entry -> String.format(Locale.ROOT, "\"headers\":{\"%s\":\"%s\"}", entry.getKey(), entry.getValue()))
.orElse("\"headers\":{}")
+ "}"
+ "}"
+ "}"
));
}
public void testEnrichCoordinatorStatsFieldsMapped() throws IOException {
XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.value(executingPolicy);
builder.endObject();
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
Map<String, Object> template =
XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues
.extractValue("mappings._doc.properties.enrich_executing_policy_stats.properties", template);
assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
String fieldName = entry.getKey();
Map<?, ?> fieldMapping = (Map<?, ?>) followStatsMapping.get(fieldName);
assertThat("no field mapping for field [" + fieldName + "]", fieldMapping, notNullValue());
Object fieldValue = entry.getValue();
String fieldType = (String) fieldMapping.get("type");
if (fieldValue instanceof Long || fieldValue instanceof Integer) {
assertThat("expected long field type for field [" + fieldName + "]", fieldType,
anyOf(equalTo("long"), equalTo("integer")));
} else if (fieldValue instanceof String) {
assertThat("expected keyword field type for field [" + fieldName + "]", fieldType,
anyOf(equalTo("keyword"), equalTo("text")));
} else {
if (fieldName.equals("task")) {
assertThat(fieldType, equalTo("object"));
assertThat(((Map<?, ?>) fieldMapping.get("properties")).size(), equalTo(8));
assertThat(XContentMapValues.extractValue("properties.node.type", fieldMapping), equalTo("keyword"));
assertThat(XContentMapValues.extractValue("properties.id.type", fieldMapping), equalTo("long"));
assertThat(XContentMapValues.extractValue("properties.type.type", fieldMapping), equalTo("keyword"));
assertThat(XContentMapValues.extractValue("properties.action.type", fieldMapping), equalTo("keyword"));
assertThat(XContentMapValues.extractValue("properties.description.type", fieldMapping), equalTo("keyword"));
assertThat(XContentMapValues.extractValue("properties.start_time_in_millis.type", fieldMapping), equalTo("date"));
assertThat(XContentMapValues.extractValue("properties.cancellable.type", fieldMapping), equalTo("boolean"));
} else {
// Manual test specific object fields and if not just fail:
fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]");
}
}
}
}
}

View File

@ -41,6 +41,7 @@ import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.collector.ccr.StatsCollector;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.enrich.EnrichStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.ml.JobStatsCollector;
@ -144,6 +145,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
collectors.add(new IndexRecoveryCollector(clusterService, getLicenseState(), client));
collectors.add(new JobStatsCollector(settings, clusterService, getLicenseState(), client));
collectors.add(new StatsCollector(settings, clusterService, getLicenseState(), client));
collectors.add(new EnrichStatsCollector(clusterService, getLicenseState(), client, settings));
final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters);
@ -184,6 +186,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
settings.add(JobStatsCollector.JOB_STATS_TIMEOUT);
settings.add(StatsCollector.CCR_STATS_TIMEOUT);
settings.add(NodeStatsCollector.NODE_STATS_TIMEOUT);
settings.add(EnrichStatsCollector.STATS_TIMEOUT);
settings.addAll(Exporters.getSettings());
return Collections.unmodifiableList(settings);
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.enrich;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import java.io.IOException;
import java.util.Objects;
public final class EnrichCoordinatorDoc extends MonitoringDoc {
public static final String TYPE = "enrich_coordinator_stats";
private final CoordinatorStats coordinatorStats;
public EnrichCoordinatorDoc(String cluster,
long timestamp,
long intervalMillis,
MonitoringDoc.Node node,
CoordinatorStats coordinatorStats) {
super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null);
this.coordinatorStats = Objects.requireNonNull(coordinatorStats, "stats");
}
public CoordinatorStats getCoordinatorStats() {
return coordinatorStats;
}
@Override
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(TYPE);
{
coordinatorStats.toXContent(builder, params);
}
builder.endObject();
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.enrich;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
public final class EnrichStatsCollector extends Collector {
public static final Setting<TimeValue> STATS_TIMEOUT = collectionTimeoutSetting("enrich.stats.timeout");
private final Client client;
private final Settings settings;
private final ThreadContext threadContext;
public EnrichStatsCollector(ClusterService clusterService,
XPackLicenseState licenseState,
Client client,
Settings settings) {
this(clusterService, licenseState, client, client.threadPool().getThreadContext(), settings);
}
EnrichStatsCollector(ClusterService clusterService,
XPackLicenseState licenseState,
Client client,
ThreadContext threadContext,
Settings settings) {
super(EnrichCoordinatorDoc.TYPE, clusterService, STATS_TIMEOUT, licenseState);
this.client = client;
this.settings = settings;
this.threadContext = threadContext;
}
@Override
protected boolean shouldCollect(final boolean isElectedMaster) {
return isElectedMaster
&& super.shouldCollect(isElectedMaster)
&& XPackSettings.ENRICH_ENABLED_SETTING.get(settings)
&& licenseState.isEnrichAllowed();
}
@Override
protected Collection<MonitoringDoc> doCollect(MonitoringDoc.Node node, long interval, ClusterState clusterState) throws Exception {
try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(MONITORING_ORIGIN)) {
final long timestamp = timestamp();
final String clusterUuid = clusterUuid(clusterState);
final EnrichStatsAction.Request request = new EnrichStatsAction.Request();
final EnrichStatsAction.Response response =
client.execute(EnrichStatsAction.INSTANCE, request).actionGet(getCollectionTimeout());
final List<MonitoringDoc> docs = response.getCoordinatorStats().stream()
.map(stats -> new EnrichCoordinatorDoc(clusterUuid, timestamp, interval, node, stats))
.collect(Collectors.toList());
response.getExecutingPolicies().stream()
.map(stats -> new ExecutingPolicyDoc(clusterUuid, timestamp, interval, node, stats))
.forEach(docs::add);
return docs;
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.enrich;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import java.io.IOException;
import java.util.Objects;
public final class ExecutingPolicyDoc extends MonitoringDoc {
public static final String TYPE = "enrich_executing_policy_stats";
private final ExecutingPolicy executingPolicy;
public ExecutingPolicyDoc(String cluster,
long timestamp,
long intervalMillis,
Node node,
ExecutingPolicy coordinatorStats) {
super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null);
this.executingPolicy = Objects.requireNonNull(coordinatorStats, "stats");
}
public ExecutingPolicy getExecutingPolicy() {
return executingPolicy;
}
@Override
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(TYPE);
{
executingPolicy.toXContent(builder, params);
}
builder.endObject();
}
}