From 793b2a94b451e65730a4b57f322db87d5c285d35 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 25 Sep 2018 07:19:46 +0200 Subject: [PATCH] [CCR] Expose auto follow stats to monitoring (#33886) --- .../xpack/ccr/FollowIndexSecurityIT.java | 30 ++++ .../xpack/ccr/FollowIndexIT.java | 31 +++- .../ccr/AbstractCcrCollectorTestCase.java | 118 +++++++++++++ .../AutoFollowStatsMonitoringDocTests.java | 163 ++++++++++++++++++ .../ccr/CcrAutoFollowStatsCollectorTests.java | 85 +++++++++ .../collector/ccr/CcrStatsCollectorTests.java | 100 +---------- .../xpack/core/ccr/AutoFollowStats.java | 35 ++-- .../xpack/core/ccr/client/CcrClient.java | 12 ++ .../src/main/resources/monitoring-es.json | 32 ++++ .../xpack/monitoring/Monitoring.java | 3 + .../collector/ccr/AbstractCcrCollector.java | 71 ++++++++ .../ccr/AutoFollowStatsMonitoringDoc.java | 47 +++++ .../ccr/CcrAutoFollowStatsCollector.java | 61 +++++++ .../collector/ccr/CcrStatsCollector.java | 60 ++----- 14 files changed, 697 insertions(+), 151 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollectorTestCase.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollectorTests.java create mode 100644 x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollector.java create mode 100644 x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDoc.java create mode 100644 x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollector.java diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 85913c26114..723d4cddc3a 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -152,6 +152,10 @@ public class FollowIndexSecurityIT extends ESRestTestCase { verifyDocuments(adminClient(), allowedIndex, 5); }); assertThat(indexExists(adminClient(), disallowedIndex), is(false)); + assertBusy(() -> { + verifyCcrMonitoring(allowedIndex, allowedIndex); + verifyAutoFollowMonitoring(); + }); // Cleanup by deleting auto follow pattern and unfollowing: request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster"); @@ -309,4 +313,30 @@ public class FollowIndexSecurityIT extends ESRestTestCase { assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1)); } + private static void verifyAutoFollowMonitoring() throws IOException { + Request request = new Request("GET", "/.monitoring-*/_search"); + request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_auto_follow_stats\"}}}"); + Map response; + try { + response = toMap(adminClient().performRequest(request)); + } catch (ResponseException e) { + throw new AssertionError("error while searching", e); + } + + int numberOfSuccessfulFollowIndices = 0; + + List hits = (List) XContentMapValues.extractValue("hits.hits", response); + assertThat(hits.size(), greaterThanOrEqualTo(1)); + + for (int i = 0; i < hits.size(); i++) { + Map hit = (Map) hits.get(i); + + int foundNumberOfOperationsReceived = + (int) XContentMapValues.extractValue("_source.ccr_auto_follow_stats.number_of_successful_follow_indices", hit); + numberOfSuccessfulFollowIndices = Math.max(numberOfSuccessfulFollowIndices, foundNumberOfOperationsReceived); + } + + assertThat(numberOfSuccessfulFollowIndices, greaterThanOrEqualTo(1)); + } + } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 514e9f261f7..eaacd8c5ae7 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -125,7 +125,10 @@ public class FollowIndexIT extends ESRestTestCase { ensureYellow("logs-20190101"); verifyDocuments("logs-20190101", 5); }); - assertBusy(() -> verifyCcrMonitoring("logs-20190101", "logs-20190101")); + assertBusy(() -> { + verifyCcrMonitoring("logs-20190101", "logs-20190101"); + verifyAutoFollowMonitoring(); + }); } private static void index(RestClient client, String index, String id, Object... fields) throws IOException { @@ -213,6 +216,32 @@ public class FollowIndexIT extends ESRestTestCase { assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1)); } + private static void verifyAutoFollowMonitoring() throws IOException { + Request request = new Request("GET", "/.monitoring-*/_search"); + request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_auto_follow_stats\"}}}"); + Map response; + try { + response = toMap(client().performRequest(request)); + } catch (ResponseException e) { + throw new AssertionError("error while searching", e); + } + + int numberOfSuccessfulFollowIndices = 0; + + List hits = (List) XContentMapValues.extractValue("hits.hits", response); + assertThat(hits.size(), greaterThanOrEqualTo(1)); + + for (int i = 0; i < hits.size(); i++) { + Map hit = (Map) hits.get(i); + + int foundNumberOfOperationsReceived = + (int) XContentMapValues.extractValue("_source.ccr_auto_follow_stats.number_of_successful_follow_indices", hit); + numberOfSuccessfulFollowIndices = Math.max(numberOfSuccessfulFollowIndices, foundNumberOfOperationsReceived); + } + + assertThat(numberOfSuccessfulFollowIndices, greaterThanOrEqualTo(1)); + } + private static Map toMap(Response response) throws IOException { return toMap(EntityUtils.toString(response.getEntity())); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollectorTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollectorTestCase.java new file mode 100644 index 00000000000..f98e541a9d9 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollectorTestCase.java @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.monitoring.collector.ccr; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase; + +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public abstract class AbstractCcrCollectorTestCase extends BaseCollectorTestCase { + + public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() { + final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings()); + final boolean ccrAllowed = randomBoolean(); + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); + + // this controls the blockage + when(licenseState.isMonitoringAllowed()).thenReturn(false); + when(licenseState.isCcrAllowed()).thenReturn(ccrAllowed); + + final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client); + + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } + } + + public void testShouldCollectReturnsFalseIfNotMaster() { + // regardless of CCR being enabled + final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings()); + + when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); + when(licenseState.isCcrAllowed()).thenReturn(randomBoolean()); + // this controls the blockage + final boolean isElectedMaster = false; + + final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client); + + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + } + + public void testShouldCollectReturnsFalseIfCCRIsDisabled() { + // this is controls the blockage + final Settings settings = ccrDisabledSettings(); + + when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); + when(licenseState.isCcrAllowed()).thenReturn(randomBoolean()); + + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); + + final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client); + + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } + } + + public void testShouldCollectReturnsFalseIfCCRIsNotAllowed() { + final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings()); + + when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); + // this is controls the blockage + when(licenseState.isCcrAllowed()).thenReturn(false); + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); + + final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client); + + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } + } + + public void testShouldCollectReturnsTrue() { + final Settings settings = ccrEnabledSettings(); + + when(licenseState.isMonitoringAllowed()).thenReturn(true); + when(licenseState.isCcrAllowed()).thenReturn(true); + final boolean isElectedMaster = true; + + final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client); + + assertThat(collector.shouldCollect(isElectedMaster), is(true)); + + verify(licenseState).isMonitoringAllowed(); + } + + abstract AbstractCcrCollector createCollector(Settings settings, + ClusterService clusterService, + XPackLicenseState licenseState, + Client client); + + private Settings ccrEnabledSettings() { + // since it's the default, we want to ensure we test both with/without it + return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), true).build(); + } + + private Settings ccrDisabledSettings() { + return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build(); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java new file mode 100644 index 00000000000..ce1c0136677 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.monitoring.collector.ccr; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.xpack.core.ccr.AutoFollowStats; +import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils; +import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class AutoFollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase { + + private AutoFollowStats autoFollowStats; + + @Before + public void instantiateAutoFollowStats() { + autoFollowStats = new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + Collections.emptyNavigableMap()); + } + + @Override + protected AutoFollowStatsMonitoringDoc createMonitoringDoc(String cluster, + long timestamp, + long interval, + MonitoringDoc.Node node, + MonitoredSystem system, + String type, + String id) { + return new AutoFollowStatsMonitoringDoc(cluster, timestamp, interval, node, autoFollowStats); + } + + @Override + protected void assertMonitoringDoc(AutoFollowStatsMonitoringDoc document) { + assertThat(document.getSystem(), is(MonitoredSystem.ES)); + assertThat(document.getType(), is(AutoFollowStatsMonitoringDoc.TYPE)); + assertThat(document.getId(), nullValue()); + assertThat(document.stats(), is(autoFollowStats)); + } + + @Override + public void testToXContent() throws IOException { + final long timestamp = System.currentTimeMillis(); + final long intervalMillis = System.currentTimeMillis(); + final long nodeTimestamp = System.currentTimeMillis(); + final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", nodeTimestamp); + + final NavigableMap recentAutoFollowExceptions = + new TreeMap<>(Collections.singletonMap( + randomAlphaOfLength(4), + new ElasticsearchException("cannot follow index"))); + final AutoFollowStats autoFollowStats = + new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), recentAutoFollowExceptions); + + final AutoFollowStatsMonitoringDoc document = + new AutoFollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, autoFollowStats); + final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false); + assertThat( + xContent.utf8ToString(), + equalTo( + "{" + + "\"cluster_uuid\":\"_cluster\"," + + "\"timestamp\":\"" + new DateTime(timestamp, DateTimeZone.UTC).toString() + "\"," + + "\"interval_ms\":" + intervalMillis + "," + + "\"type\":\"ccr_auto_follow_stats\"," + + "\"source_node\":{" + + "\"uuid\":\"_uuid\"," + + "\"host\":\"_host\"," + + "\"transport_address\":\"_addr\"," + + "\"ip\":\"_ip\"," + + "\"name\":\"_name\"," + + "\"timestamp\":\"" + new DateTime(nodeTimestamp, DateTimeZone.UTC).toString() + "\"" + + "}," + + "\"ccr_auto_follow_stats\":{" + + "\"number_of_failed_follow_indices\":" + autoFollowStats.getNumberOfFailedFollowIndices() + "," + + "\"number_of_failed_remote_cluster_state_requests\":" + + autoFollowStats.getNumberOfFailedRemoteClusterStateRequests() + "," + + "\"number_of_successful_follow_indices\":" + autoFollowStats.getNumberOfSuccessfulFollowIndices() + "," + + "\"recent_auto_follow_errors\":[" + + "{" + + "\"leader_index\":\"" + recentAutoFollowExceptions.keySet().iterator().next() + "\"," + + "\"auto_follow_exception\":{" + + "\"type\":\"exception\"," + + "\"reason\":\"cannot follow index\"" + + "}" + + "}" + + "]" + + "}" + + "}")); + } + + public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { + final NavigableMap fetchExceptions = + new TreeMap<>(Collections.singletonMap("leader_index", new ElasticsearchException("cannot follow index"))); + final AutoFollowStats status = new AutoFollowStats(1, 0, 2, fetchExceptions); + XContentBuilder builder = jsonBuilder(); + builder.value(status); + Map serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false); + + Map template = + XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false); + Map autoFollowStatsMapping = + (Map) XContentMapValues.extractValue("mappings.doc.properties.ccr_auto_follow_stats.properties", template); + + assertThat(serializedStatus.size(), equalTo(autoFollowStatsMapping.size())); + for (Map.Entry entry : serializedStatus.entrySet()) { + String fieldName = entry.getKey(); + Map fieldMapping = (Map) autoFollowStatsMapping.get(fieldName); + assertThat(fieldMapping, notNullValue()); + + Object fieldValue = entry.getValue(); + String fieldType = (String) fieldMapping.get("type"); + if (fieldValue instanceof Long || fieldValue instanceof Integer) { + assertThat("expected long field type for field [" + fieldName + "]", fieldType, + anyOf(equalTo("long"), equalTo("integer"))); + } else if (fieldValue instanceof String) { + assertThat("expected keyword field type for field [" + fieldName + "]", fieldType, + anyOf(equalTo("keyword"), equalTo("text"))); + } else { + // Manual test specific object fields and if not just fail: + if (fieldName.equals("recent_auto_follow_errors")) { + assertThat(fieldType, equalTo("nested")); + assertThat(((Map) fieldMapping.get("properties")).size(), equalTo(2)); + assertThat(XContentMapValues.extractValue("properties.leader_index.type", fieldMapping), equalTo("keyword")); + assertThat(XContentMapValues.extractValue("properties.auto_follow_exception.type", fieldMapping), equalTo("object")); + + Map exceptionFieldMapping = + (Map) XContentMapValues.extractValue("properties.auto_follow_exception.properties", fieldMapping); + assertThat(exceptionFieldMapping.size(), equalTo(2)); + assertThat(XContentMapValues.extractValue("type.type", exceptionFieldMapping), equalTo("keyword")); + assertThat(XContentMapValues.extractValue("reason.type", exceptionFieldMapping), equalTo("text")); + } else { + fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]"); + } + } + } + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollectorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollectorTests.java new file mode 100644 index 00000000000..7a302503d2d --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollectorTests.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.collector.ccr; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.core.ccr.AutoFollowStats; +import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction; +import org.elasticsearch.xpack.core.ccr.client.CcrClient; +import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; + +import java.util.Collection; + +import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CcrAutoFollowStatsCollectorTests extends AbstractCcrCollectorTestCase { + + @Override + AbstractCcrCollector createCollector(Settings settings, ClusterService clusterService, XPackLicenseState licenseState, Client client) { + return new CcrAutoFollowStatsCollector(settings, clusterService, licenseState, client); + } + + public void testDoCollect() throws Exception { + final String clusterUuid = randomAlphaOfLength(5); + whenClusterStateWithUUID(clusterUuid); + + final MonitoringDoc.Node node = randomMonitoringNode(random()); + final CcrClient client = mock(CcrClient.class); + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + + final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120)); + withCollectionTimeout(CcrAutoFollowStatsCollector.CCR_AUTO_FOLLOW_STATS_TIMEOUT, timeout); + + final CcrAutoFollowStatsCollector collector = + new CcrAutoFollowStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext); + assertEquals(timeout, collector.getCollectionTimeout()); + + final AutoFollowStats autoFollowStats = mock(AutoFollowStats.class); + + @SuppressWarnings("unchecked") + final ActionFuture future = (ActionFuture)mock(ActionFuture.class); + final AutoFollowStatsAction.Response response = new AutoFollowStatsAction.Response(autoFollowStats); + + when(client.autoFollowStats(any())).thenReturn(future); + when(future.actionGet(timeout)).thenReturn(response); + + final long interval = randomNonNegativeLong(); + + final Collection documents = collector.doCollect(node, interval, clusterState); + verify(clusterState).metaData(); + verify(metaData).clusterUUID(); + + assertThat(documents, hasSize(1)); + final AutoFollowStatsMonitoringDoc document = (AutoFollowStatsMonitoringDoc) documents.iterator().next(); + + assertThat(document.getCluster(), is(clusterUuid)); + assertThat(document.getTimestamp(), greaterThan(0L)); + assertThat(document.getIntervalMillis(), equalTo(interval)); + assertThat(document.getNode(), equalTo(node)); + assertThat(document.getSystem(), is(MonitoredSystem.ES)); + assertThat(document.getType(), is(AutoFollowStatsMonitoringDoc.TYPE)); + assertThat(document.getId(), nullValue()); + assertThat(document.stats(), is(autoFollowStats)); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java index aaf3a61643b..b0f2a00d2dc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java @@ -7,17 +7,18 @@ package org.elasticsearch.xpack.monitoring.collector.ccr; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.client.CcrClient; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; -import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase; import org.mockito.ArgumentMatcher; import java.util.ArrayList; @@ -38,89 +39,11 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class CcrStatsCollectorTests extends BaseCollectorTestCase { +public class CcrStatsCollectorTests extends AbstractCcrCollectorTestCase { - public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() { - final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings()); - final boolean ccrAllowed = randomBoolean(); - final boolean isElectedMaster = randomBoolean(); - whenLocalNodeElectedMaster(isElectedMaster); - - // this controls the blockage - when(licenseState.isMonitoringAllowed()).thenReturn(false); - when(licenseState.isCcrAllowed()).thenReturn(ccrAllowed); - - final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client); - - assertThat(collector.shouldCollect(isElectedMaster), is(false)); - if (isElectedMaster) { - verify(licenseState).isMonitoringAllowed(); - } - } - - public void testShouldCollectReturnsFalseIfNotMaster() { - // regardless of CCR being enabled - final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings()); - - when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); - when(licenseState.isCcrAllowed()).thenReturn(randomBoolean()); - // this controls the blockage - final boolean isElectedMaster = false; - - final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client); - - assertThat(collector.shouldCollect(isElectedMaster), is(false)); - } - - public void testShouldCollectReturnsFalseIfCCRIsDisabled() { - // this is controls the blockage - final Settings settings = ccrDisabledSettings(); - - when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); - when(licenseState.isCcrAllowed()).thenReturn(randomBoolean()); - - final boolean isElectedMaster = randomBoolean(); - whenLocalNodeElectedMaster(isElectedMaster); - - final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client); - - assertThat(collector.shouldCollect(isElectedMaster), is(false)); - - if (isElectedMaster) { - verify(licenseState).isMonitoringAllowed(); - } - } - - public void testShouldCollectReturnsFalseIfCCRIsNotAllowed() { - final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings()); - - when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); - // this is controls the blockage - when(licenseState.isCcrAllowed()).thenReturn(false); - final boolean isElectedMaster = randomBoolean(); - whenLocalNodeElectedMaster(isElectedMaster); - - final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client); - - assertThat(collector.shouldCollect(isElectedMaster), is(false)); - - if (isElectedMaster) { - verify(licenseState).isMonitoringAllowed(); - } - } - - public void testShouldCollectReturnsTrue() { - final Settings settings = ccrEnabledSettings(); - - when(licenseState.isMonitoringAllowed()).thenReturn(true); - when(licenseState.isCcrAllowed()).thenReturn(true); - final boolean isElectedMaster = true; - - final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client); - - assertThat(collector.shouldCollect(isElectedMaster), is(true)); - - verify(licenseState).isMonitoringAllowed(); + @Override + AbstractCcrCollector createCollector(Settings settings, ClusterService clusterService, XPackLicenseState licenseState, Client client) { + return new CcrStatsCollector(settings, clusterService, licenseState, client); } public void testDoCollect() throws Exception { @@ -186,15 +109,6 @@ public class CcrStatsCollectorTests extends BaseCollectorTestCase { return statuses; } - private Settings ccrEnabledSettings() { - // since it's the default, we want to ensure we test both with/without it - return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), true).build(); - } - - private Settings ccrDisabledSettings() { - return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build(); - } - private static CcrStatsAction.StatsRequest statsRequestEq(CcrStatsAction.StatsRequest expected) { return argThat(new StatsRequestMatches(expected)); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java index 7133a201f4e..6f28c450f04 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java @@ -121,28 +121,33 @@ public class AutoFollowStats implements Writeable, ToXContentObject { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); { - builder.field(NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfFailedFollowIndices); - builder.field(NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS.getPreferredName(), numberOfFailedRemoteClusterStateRequests); - builder.field(NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfSuccessfulFollowIndices); - builder.startArray(RECENT_AUTO_FOLLOW_ERRORS.getPreferredName()); - { - for (final Map.Entry entry : recentAutoFollowErrors.entrySet()) { + toXContentFragment(builder, params); + } + builder.endObject(); + return builder; + } + + public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException { + builder.field(NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfFailedFollowIndices); + builder.field(NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS.getPreferredName(), numberOfFailedRemoteClusterStateRequests); + builder.field(NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfSuccessfulFollowIndices); + builder.startArray(RECENT_AUTO_FOLLOW_ERRORS.getPreferredName()); + { + for (final Map.Entry entry : recentAutoFollowErrors.entrySet()) { + builder.startObject(); + { + builder.field(LEADER_INDEX.getPreferredName(), entry.getKey()); + builder.field(AUTO_FOLLOW_EXCEPTION.getPreferredName()); builder.startObject(); { - builder.field(LEADER_INDEX.getPreferredName(), entry.getKey()); - builder.field(AUTO_FOLLOW_EXCEPTION.getPreferredName()); - builder.startObject(); - { - ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue()); - } - builder.endObject(); + ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue()); } builder.endObject(); } + builder.endObject(); } - builder.endArray(); } - builder.endObject(); + builder.endArray(); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java index 3b98f2319c1..f2f76bcb846 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; @@ -63,6 +64,17 @@ public class CcrClient { return listener; } + public void autoFollowStats(final AutoFollowStatsAction.Request request, + final ActionListener listener) { + client.execute(AutoFollowStatsAction.INSTANCE, request, listener); + } + + public ActionFuture autoFollowStats(final AutoFollowStatsAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); + autoFollowStats(request, listener); + return listener; + } + public void unfollow(final UnfollowIndexAction.Request request, final ActionListener listener) { client.execute(UnfollowIndexAction.INSTANCE, request, listener); } diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index 8464f495371..2620fee9fd1 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -1008,6 +1008,38 @@ "type": "long" } } + }, + "ccr_auto_follow_stats" : { + "properties": { + "number_of_failed_follow_indices": { + "type": "long" + }, + "number_of_failed_remote_cluster_state_requests": { + "type": "long" + }, + "number_of_successful_follow_indices": { + "type": "long" + }, + "recent_auto_follow_errors": { + "type": "nested", + "properties": { + "leader_index": { + "type": "keyword" + }, + "auto_follow_exception": { + "type": "object", + "properties": { + "type": { + "type": "keyword" + }, + "reason": { + "type": "text" + } + } + } + } + } + } } } } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java index 027cb7de937..d18286a9db5 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction; import org.elasticsearch.xpack.monitoring.cleaner.CleanerService; import org.elasticsearch.xpack.monitoring.collector.Collector; +import org.elasticsearch.xpack.monitoring.collector.ccr.CcrAutoFollowStatsCollector; import org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsCollector; import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector; import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector; @@ -144,6 +145,7 @@ public class Monitoring extends Plugin implements ActionPlugin { collectors.add(new IndexRecoveryCollector(settings, clusterService, getLicenseState(), client)); collectors.add(new JobStatsCollector(settings, clusterService, getLicenseState(), client)); collectors.add(new CcrStatsCollector(settings, clusterService, getLicenseState(), client)); + collectors.add(new CcrAutoFollowStatsCollector(settings, clusterService, getLicenseState(), client)); final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters); @@ -183,6 +185,7 @@ public class Monitoring extends Plugin implements ActionPlugin { settings.add(IndexStatsCollector.INDEX_STATS_TIMEOUT); settings.add(JobStatsCollector.JOB_STATS_TIMEOUT); settings.add(CcrStatsCollector.CCR_STATS_TIMEOUT); + settings.add(CcrAutoFollowStatsCollector.CCR_AUTO_FOLLOW_STATS_TIMEOUT); settings.add(NodeStatsCollector.NODE_STATS_TIMEOUT); settings.addAll(Exporters.getSettings()); return Collections.unmodifiableList(settings); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollector.java new file mode 100644 index 00000000000..f6b124d6df5 --- /dev/null +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollector.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.collector.ccr; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ccr.client.CcrClient; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; +import org.elasticsearch.xpack.monitoring.collector.Collector; + +import java.util.Collection; + +import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; +import static org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsMonitoringDoc.TYPE; + +public abstract class AbstractCcrCollector extends Collector { + + private final ThreadContext threadContext; + final CcrClient ccrClient; + + AbstractCcrCollector( + final Settings settings, + final ClusterService clusterService, + final Setting timeoutSetting, + final XPackLicenseState licenseState, + final CcrClient ccrClient, + final ThreadContext threadContext) { + super(settings, TYPE, clusterService, timeoutSetting, licenseState); + this.ccrClient = ccrClient; + this.threadContext = threadContext; + } + + @Override + protected boolean shouldCollect(final boolean isElectedMaster) { + // this can only run when monitoring is allowed and CCR is enabled and allowed, but also only on the elected master node + return isElectedMaster + && super.shouldCollect(isElectedMaster) + && XPackSettings.CCR_ENABLED_SETTING.get(settings) + && licenseState.isCcrAllowed(); + } + + + @Override + protected Collection doCollect( + final MonitoringDoc.Node node, + final long interval, + final ClusterState clusterState) throws Exception { + try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, MONITORING_ORIGIN)) { + final long timestamp = timestamp(); + final String clusterUuid = clusterUuid(clusterState); + return innerDoCollect(timestamp, clusterUuid, interval, node); + } + } + + abstract Collection innerDoCollect( + long timestamp, + String clusterUuid, + long interval, + MonitoringDoc.Node node) throws Exception; +} diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDoc.java new file mode 100644 index 00000000000..82312203fd8 --- /dev/null +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDoc.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.collector.ccr; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ccr.AutoFollowStats; +import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; + +import java.io.IOException; +import java.util.Objects; + +public class AutoFollowStatsMonitoringDoc extends MonitoringDoc { + + public static final String TYPE = "ccr_auto_follow_stats"; + + private final AutoFollowStats stats; + + public AutoFollowStats stats() { + return stats; + } + + public AutoFollowStatsMonitoringDoc( + final String cluster, + final long timestamp, + final long intervalMillis, + final Node node, + final AutoFollowStats stats) { + super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null); + this.stats = Objects.requireNonNull(stats, "stats"); + } + + + @Override + protected void innerToXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(TYPE); + { + stats.toXContentFragment(builder, params); + } + builder.endObject(); + } + +} diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollector.java new file mode 100644 index 00000000000..e179c204416 --- /dev/null +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollector.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.collector.ccr; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.core.XPackClient; +import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction; +import org.elasticsearch.xpack.core.ccr.client.CcrClient; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; + +import java.util.Collection; +import java.util.Collections; + +public final class CcrAutoFollowStatsCollector extends AbstractCcrCollector { + + public static final Setting CCR_AUTO_FOLLOW_STATS_TIMEOUT = collectionTimeoutSetting("ccr.auto_follow.stats.timeout"); + + public CcrAutoFollowStatsCollector( + final Settings settings, + final ClusterService clusterService, + final XPackLicenseState licenseState, + final Client client) { + super(settings, clusterService, CCR_AUTO_FOLLOW_STATS_TIMEOUT, licenseState, new XPackClient(client).ccr(), + client.threadPool().getThreadContext()); + } + + CcrAutoFollowStatsCollector( + final Settings settings, + final ClusterService clusterService, + final XPackLicenseState licenseState, + final CcrClient ccrClient, + final ThreadContext threadContext) { + super(settings, clusterService, CCR_AUTO_FOLLOW_STATS_TIMEOUT, licenseState, ccrClient, threadContext); + } + + @Override + Collection innerDoCollect( + long timestamp, + String clusterUuid, + long interval, + MonitoringDoc.Node node) throws Exception { + + final AutoFollowStatsAction.Request request = new AutoFollowStatsAction.Request(); + final AutoFollowStatsAction.Response response = ccrClient.autoFollowStats(request).actionGet(getCollectionTimeout()); + + final AutoFollowStatsMonitoringDoc doc = + new AutoFollowStatsMonitoringDoc(clusterUuid, timestamp, interval, node, response.getStats()); + return Collections.singletonList(doc); + } + +} diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java index 510f430d196..45a8ddc0f1a 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.monitoring.collector.ccr; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -16,32 +15,24 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.core.XPackClient; -import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.client.CcrClient; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; -import org.elasticsearch.xpack.monitoring.collector.Collector; import java.util.Collection; import java.util.stream.Collectors; -import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; -import static org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsMonitoringDoc.TYPE; - -public class CcrStatsCollector extends Collector { +public final class CcrStatsCollector extends AbstractCcrCollector { public static final Setting CCR_STATS_TIMEOUT = collectionTimeoutSetting("ccr.stats.timeout"); - private final ThreadContext threadContext; - private final CcrClient ccrClient; - public CcrStatsCollector( final Settings settings, final ClusterService clusterService, final XPackLicenseState licenseState, final Client client) { - this(settings, clusterService, licenseState, new XPackClient(client).ccr(), client.threadPool().getThreadContext()); + super(settings, clusterService, CCR_STATS_TIMEOUT, licenseState, new XPackClient(client).ccr(), + client.threadPool().getThreadContext()); } CcrStatsCollector( @@ -50,41 +41,26 @@ public class CcrStatsCollector extends Collector { final XPackLicenseState licenseState, final CcrClient ccrClient, final ThreadContext threadContext) { - super(settings, TYPE, clusterService, CCR_STATS_TIMEOUT, licenseState); - this.ccrClient = ccrClient; - this.threadContext = threadContext; + super(settings, clusterService, CCR_STATS_TIMEOUT, licenseState, ccrClient, threadContext); } @Override - protected boolean shouldCollect(final boolean isElectedMaster) { - // this can only run when monitoring is allowed and CCR is enabled and allowed, but also only on the elected master node - return isElectedMaster - && super.shouldCollect(isElectedMaster) - && XPackSettings.CCR_ENABLED_SETTING.get(settings) - && licenseState.isCcrAllowed(); - } + Collection innerDoCollect( + long timestamp, + String clusterUuid, + long interval, + MonitoringDoc.Node node) throws Exception { + final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest(); + request.setIndices(getCollectionIndices()); + request.setIndicesOptions(IndicesOptions.lenientExpandOpen()); + final CcrStatsAction.StatsResponses responses = ccrClient.stats(request).actionGet(getCollectionTimeout()); - @Override - protected Collection doCollect( - final MonitoringDoc.Node node, - final long interval, - final ClusterState clusterState) throws Exception { - try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, MONITORING_ORIGIN)) { - final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest(); - request.setIndices(getCollectionIndices()); - request.setIndicesOptions(IndicesOptions.lenientExpandOpen()); - final CcrStatsAction.StatsResponses responses = ccrClient.stats(request).actionGet(getCollectionTimeout()); - - final long timestamp = timestamp(); - final String clusterUuid = clusterUuid(clusterState); - - return responses - .getStatsResponses() - .stream() - .map(stats -> new CcrStatsMonitoringDoc(clusterUuid, timestamp, interval, node, stats.status())) - .collect(Collectors.toList()); - } + return responses + .getStatsResponses() + .stream() + .map(stats -> new CcrStatsMonitoringDoc(clusterUuid, timestamp, interval, node, stats.status())) + .collect(Collectors.toList()); } }