[CCR] Refactor stats APIs (#34912)

* Changed the auto follow stats to also include follow stats.
* Renamed the auto follow stats api to stats api and changed its url path
  from `/_ccr/auto_follow/stats` `/_ccr/stats`.
* Removed `/_ccr/stats` url path for the follow stats api, which makes
  the index parameter a required parameter.
* Fixed docs.
This commit is contained in:
Martijn van Groningen 2018-10-29 07:45:27 +01:00 committed by GitHub
parent bad5972f62
commit 1801518527
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 380 additions and 623 deletions

View File

@ -1,46 +0,0 @@
[role="xpack"]
[testenv="platinum"]
[[ccr-get-auto-follow-stats]]
=== Get Auto-Follow Stats API
++++
<titleabbrev>Get Auto-Follow Stats</titleabbrev>
++++
Get auto-follow stats.
==== Description
This API gets stats about auto-follow patterns.
==== Request
[source,js]
--------------------------------------------------
GET /_ccr/auto_follow/stats
--------------------------------------------------
// CONSOLE
// TEST
==== Example
This example retrieves stats about auto-follow patterns:
[source,js]
--------------------------------------------------
GET /_ccr/auto_follow/stats
--------------------------------------------------
// CONSOLE
// TEST
The API returns the following result:
[source,js]
--------------------------------------------------
{
"number_of_successful_follow_indices" : 16,
"number_of_failed_follow_indices" : 0,
"number_of_failed_remote_cluster_state_requests" : 0,
"recent_auto_follow_errors" : [ ]
}
--------------------------------------------------
// TESTRESPONSE[s/"number_of_successful_follow_indices" : 16/"number_of_successful_follow_indices" : $body.number_of_successful_follow_indices/]

View File

@ -38,12 +38,6 @@ POST /follower_index/_ccr/pause_follow
//////////////////////////
[source,js]
--------------------------------------------------
GET /_ccr/stats
--------------------------------------------------
// CONSOLE
[source,js]
--------------------------------------------------
GET /<index>/_ccr/stats
@ -186,7 +180,7 @@ This example retrieves follower stats:
[source,js]
--------------------------------------------------
GET /_ccr/stats
GET /follower_index/_ccr/stats
--------------------------------------------------
// CONSOLE

View File

@ -102,8 +102,9 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
assertBusy(() -> {
Request statsRequest = new Request("GET", "/_ccr/auto_follow/stats");
Map<String, ?> response = toMap(client().performRequest(statsRequest));
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
response = (Map<?, ?>) response.get("auto_follow_stats");
assertThat(response.get("number_of_successful_follow_indices"), equalTo(1));
ensureYellow("logs-20190101");

View File

@ -1,10 +0,0 @@
---
"Test autofollow stats":
- do:
ccr.auto_follow_stats: {}
- match: { number_of_successful_follow_indices: 0 }
- match: { number_of_failed_follow_indices: 0 }
- match: { number_of_failed_remote_cluster_state_requests: 0 }
- length: { recent_auto_follow_errors: 0 }

View File

@ -45,7 +45,7 @@
# we can not reliably wait for replication to occur so we test the endpoint without indexing any documents
- do:
ccr.stats:
ccr.follow_stats:
index: bar
- match: { indices.0.index: "bar" }
- match: { indices.0.shards.0.leader_index: "foo" }

View File

@ -0,0 +1,11 @@
---
"Test stats":
- do:
ccr.stats: {}
- match: { auto_follow_stats.number_of_successful_follow_indices: 0 }
- match: { auto_follow_stats.number_of_failed_follow_indices: 0 }
- match: { auto_follow_stats.number_of_failed_remote_cluster_state_requests: 0 }
- length: { auto_follow_stats.recent_auto_follow_errors: 0 }
- length: { follow_stats.indices: 0 }

View File

@ -43,10 +43,10 @@ import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportAutoFollowStatsAction;
import org.elasticsearch.xpack.ccr.action.TransportStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestAutoFollowStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.StatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
@ -164,7 +164,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
// stats action
new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
new ActionHandler<>(AutoFollowStatsAction.INSTANCE, TransportAutoFollowStatsAction.class),
new ActionHandler<>(StatsAction.INSTANCE, TransportStatsAction.class),
// follow actions
new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),

View File

@ -9,11 +9,13 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
@ -23,18 +25,19 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.StatsAction;
import java.util.Objects;
public class TransportAutoFollowStatsAction
extends TransportMasterNodeAction<AutoFollowStatsAction.Request, AutoFollowStatsAction.Response> {
public class TransportStatsAction extends TransportMasterNodeAction<StatsAction.Request, StatsAction.Response> {
private final Client client;
private final CcrLicenseChecker ccrLicenseChecker;
private final AutoFollowCoordinator autoFollowCoordinator;
@Inject
public TransportAutoFollowStatsAction(
public TransportStatsAction(
Settings settings,
TransportService transportService,
ClusterService clusterService,
@ -42,18 +45,20 @@ public class TransportAutoFollowStatsAction
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AutoFollowCoordinator autoFollowCoordinator,
CcrLicenseChecker ccrLicenseChecker
CcrLicenseChecker ccrLicenseChecker,
Client client
) {
super(
settings,
AutoFollowStatsAction.NAME,
StatsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
AutoFollowStatsAction.Request::new,
StatsAction.Request::new,
indexNameExpressionResolver
);
this.client = client;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
this.autoFollowCoordinator = Objects.requireNonNull(autoFollowCoordinator);
}
@ -64,12 +69,12 @@ public class TransportAutoFollowStatsAction
}
@Override
protected AutoFollowStatsAction.Response newResponse() {
return new AutoFollowStatsAction.Response();
protected StatsAction.Response newResponse() {
return new StatsAction.Response();
}
@Override
protected void doExecute(Task task, AutoFollowStatsAction.Request request, ActionListener<AutoFollowStatsAction.Response> listener) {
protected void doExecute(Task task, StatsAction.Request request, ActionListener<StatsAction.Response> listener) {
if (ccrLicenseChecker.isCcrAllowed() == false) {
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
@ -79,16 +84,20 @@ public class TransportAutoFollowStatsAction
@Override
protected void masterOperation(
AutoFollowStatsAction.Request request,
StatsAction.Request request,
ClusterState state,
ActionListener<AutoFollowStatsAction.Response> listener
ActionListener<StatsAction.Response> listener
) throws Exception {
AutoFollowStats stats = autoFollowCoordinator.getStats();
listener.onResponse(new AutoFollowStatsAction.Response(stats));
CheckedConsumer<FollowStatsAction.StatsResponses, Exception> handler = statsResponse -> {
AutoFollowStats stats = autoFollowCoordinator.getStats();
listener.onResponse(new StatsAction.Response(stats, statsResponse));
};
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
client.execute(FollowStatsAction.INSTANCE, statsRequest, ActionListener.wrap(handler, listener::onFailure));
}
@Override
protected ClusterBlockException checkBlock(AutoFollowStatsAction.Request request, ClusterState state) {
protected ClusterBlockException checkBlock(StatsAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}

View File

@ -12,7 +12,7 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.StatsAction;
import java.io.IOException;
@ -20,7 +20,7 @@ public class RestAutoFollowStatsAction extends BaseRestHandler {
public RestAutoFollowStatsAction(final Settings settings, final RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, "/_ccr/auto_follow/stats", this);
controller.registerHandler(RestRequest.Method.GET, "/_ccr/stats", this);
}
@Override
@ -30,8 +30,8 @@ public class RestAutoFollowStatsAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final AutoFollowStatsAction.Request request = new AutoFollowStatsAction.Request();
return channel -> client.execute(AutoFollowStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
final StatsAction.Request request = new StatsAction.Request();
return channel -> client.execute(StatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -21,7 +21,6 @@ public class RestFollowStatsAction extends BaseRestHandler {
public RestFollowStatsAction(final Settings settings, final RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, "/_ccr/stats", this);
controller.registerHandler(RestRequest.Method.GET, "/{index}/_ccr/stats", this);
}

View File

@ -19,7 +19,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.StatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
@ -260,8 +260,8 @@ public class AutoFollowIT extends CcrIntegTestCase {
}
private AutoFollowStats getAutoFollowStats() {
AutoFollowStatsAction.Request request = new AutoFollowStatsAction.Request();
return followerClient().execute(AutoFollowStatsAction.INSTANCE, request).actionGet().getStats();
StatsAction.Request request = new StatsAction.Request();
return followerClient().execute(StatsAction.INSTANCE, request).actionGet().getAutoFollowStats();
}
private void createLeaderIndex(String index, Settings settings) {

View File

@ -7,25 +7,28 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.StatsAction;
import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomReadExceptions;
import static org.elasticsearch.xpack.ccr.action.StatsResponsesTests.createStatsResponse;
public class AutoFollowStatsResponseTests extends AbstractStreamableTestCase<AutoFollowStatsAction.Response> {
public class AutoFollowStatsResponseTests extends AbstractStreamableTestCase<StatsAction.Response> {
@Override
protected AutoFollowStatsAction.Response createBlankInstance() {
return new AutoFollowStatsAction.Response();
protected StatsAction.Response createBlankInstance() {
return new StatsAction.Response();
}
@Override
protected AutoFollowStatsAction.Response createTestInstance() {
protected StatsAction.Response createTestInstance() {
AutoFollowStats autoFollowStats = new AutoFollowStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions()
);
return new AutoFollowStatsAction.Response(autoFollowStats);
FollowStatsAction.StatsResponses statsResponse = createStatsResponse();
return new StatsAction.Response(autoFollowStats, statsResponse);
}
}

View File

@ -23,6 +23,10 @@ public class StatsResponsesTests extends AbstractStreamableTestCase<FollowStatsA
@Override
protected FollowStatsAction.StatsResponses createTestInstance() {
return createStatsResponse();
}
static FollowStatsAction.StatsResponses createStatsResponse() {
int numResponses = randomIntBetween(0, 8);
List<FollowStatsAction.StatsResponse> responses = new ArrayList<>(numResponses);
for (int i = 0; i < numResponses; i++) {

View File

@ -1,118 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.ccr;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public abstract class AbstractCcrCollectorTestCase extends BaseCollectorTestCase {
public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
final boolean ccrAllowed = randomBoolean();
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
// this controls the blockage
when(licenseState.isMonitoringAllowed()).thenReturn(false);
when(licenseState.isCcrAllowed()).thenReturn(ccrAllowed);
final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfNotMaster() {
// regardless of CCR being enabled
final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
// this controls the blockage
final boolean isElectedMaster = false;
final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
}
public void testShouldCollectReturnsFalseIfCCRIsDisabled() {
// this is controls the blockage
final Settings settings = ccrDisabledSettings();
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfCCRIsNotAllowed() {
final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
// this is controls the blockage
when(licenseState.isCcrAllowed()).thenReturn(false);
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsTrue() {
final Settings settings = ccrEnabledSettings();
when(licenseState.isMonitoringAllowed()).thenReturn(true);
when(licenseState.isCcrAllowed()).thenReturn(true);
final boolean isElectedMaster = true;
final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(true));
verify(licenseState).isMonitoringAllowed();
}
abstract AbstractCcrCollector createCollector(Settings settings,
ClusterService clusterService,
XPackLicenseState licenseState,
Client client);
private Settings ccrEnabledSettings() {
// since it's the default, we want to ensure we test both with/without it
return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), true).build();
}
private Settings ccrDisabledSettings() {
return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build();
}
}

View File

@ -1,85 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.ccr;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import java.util.Collection;
import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class CcrAutoFollowStatsCollectorTests extends AbstractCcrCollectorTestCase {
@Override
AbstractCcrCollector createCollector(Settings settings, ClusterService clusterService, XPackLicenseState licenseState, Client client) {
return new CcrAutoFollowStatsCollector(settings, clusterService, licenseState, client);
}
public void testDoCollect() throws Exception {
final String clusterUuid = randomAlphaOfLength(5);
whenClusterStateWithUUID(clusterUuid);
final MonitoringDoc.Node node = randomMonitoringNode(random());
final CcrClient client = mock(CcrClient.class);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
withCollectionTimeout(CcrAutoFollowStatsCollector.CCR_AUTO_FOLLOW_STATS_TIMEOUT, timeout);
final CcrAutoFollowStatsCollector collector =
new CcrAutoFollowStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext);
assertEquals(timeout, collector.getCollectionTimeout());
final AutoFollowStats autoFollowStats = mock(AutoFollowStats.class);
@SuppressWarnings("unchecked")
final ActionFuture<AutoFollowStatsAction.Response> future = (ActionFuture<AutoFollowStatsAction.Response>)mock(ActionFuture.class);
final AutoFollowStatsAction.Response response = new AutoFollowStatsAction.Response(autoFollowStats);
when(client.autoFollowStats(any())).thenReturn(future);
when(future.actionGet(timeout)).thenReturn(response);
final long interval = randomNonNegativeLong();
final Collection<MonitoringDoc> documents = collector.doCollect(node, interval, clusterState);
verify(clusterState).metaData();
verify(metaData).clusterUUID();
assertThat(documents, hasSize(1));
final AutoFollowStatsMonitoringDoc document = (AutoFollowStatsMonitoringDoc) documents.iterator().next();
assertThat(document.getCluster(), is(clusterUuid));
assertThat(document.getTimestamp(), greaterThan(0L));
assertThat(document.getIntervalMillis(), equalTo(interval));
assertThat(document.getNode(), equalTo(node));
assertThat(document.getSystem(), is(MonitoredSystem.ES));
assertThat(document.getType(), is(AutoFollowStatsMonitoringDoc.TYPE));
assertThat(document.getId(), nullValue());
assertThat(document.stats(), is(autoFollowStats));
}
}

View File

@ -1,133 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.ccr;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.mockito.ArgumentMatcher;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class FollowStatsCollectorTests extends AbstractCcrCollectorTestCase {
@Override
AbstractCcrCollector createCollector(Settings settings, ClusterService clusterService, XPackLicenseState licenseState, Client client) {
return new FollowStatsCollector(settings, clusterService, licenseState, client);
}
public void testDoCollect() throws Exception {
final String clusterUuid = randomAlphaOfLength(5);
whenClusterStateWithUUID(clusterUuid);
final MonitoringDoc.Node node = randomMonitoringNode(random());
final CcrClient client = mock(CcrClient.class);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
withCollectionTimeout(FollowStatsCollector.CCR_STATS_TIMEOUT, timeout);
final FollowStatsCollector collector =
new FollowStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext);
assertEquals(timeout, collector.getCollectionTimeout());
final List<FollowStatsAction.StatsResponse> statuses = mockStatuses();
@SuppressWarnings("unchecked")
final ActionFuture<FollowStatsAction.StatsResponses> future =
(ActionFuture<FollowStatsAction.StatsResponses>)mock(ActionFuture.class);
final FollowStatsAction.StatsResponses responses = new FollowStatsAction.StatsResponses(emptyList(), emptyList(), statuses);
final FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
request.setIndices(Strings.EMPTY_ARRAY);
when(client.stats(statsRequestEq(request))).thenReturn(future);
when(future.actionGet(timeout)).thenReturn(responses);
final long interval = randomNonNegativeLong();
final Collection<MonitoringDoc> documents = collector.doCollect(node, interval, clusterState);
verify(clusterState).metaData();
verify(metaData).clusterUUID();
assertThat(documents, hasSize(statuses.size()));
int index = 0;
for (final Iterator<MonitoringDoc> it = documents.iterator(); it.hasNext(); index++) {
final FollowStatsMonitoringDoc document = (FollowStatsMonitoringDoc)it.next();
final FollowStatsAction.StatsResponse status = statuses.get(index);
assertThat(document.getCluster(), is(clusterUuid));
assertThat(document.getTimestamp(), greaterThan(0L));
assertThat(document.getIntervalMillis(), equalTo(interval));
assertThat(document.getNode(), equalTo(node));
assertThat(document.getSystem(), is(MonitoredSystem.ES));
assertThat(document.getType(), is(FollowStatsMonitoringDoc.TYPE));
assertThat(document.getId(), nullValue());
assertThat(document.status(), is(status.status()));
}
}
private List<FollowStatsAction.StatsResponse> mockStatuses() {
final int count = randomIntBetween(1, 8);
final List<FollowStatsAction.StatsResponse> statuses = new ArrayList<>(count);
for (int i = 0; i < count; ++i) {
FollowStatsAction.StatsResponse statsResponse = mock(FollowStatsAction.StatsResponse.class);
ShardFollowNodeTaskStatus status = mock(ShardFollowNodeTaskStatus.class);
when(statsResponse.status()).thenReturn(status);
statuses.add(statsResponse);
}
return statuses;
}
private static FollowStatsAction.StatsRequest statsRequestEq(FollowStatsAction.StatsRequest expected) {
return argThat(new FollowStatsRequest(expected));
}
private static class FollowStatsRequest extends ArgumentMatcher<FollowStatsAction.StatsRequest> {
private final FollowStatsAction.StatsRequest expected;
private FollowStatsRequest(FollowStatsAction.StatsRequest expected) {
this.expected = expected;
}
@Override
public boolean matches(Object o) {
FollowStatsAction.StatsRequest actual = (FollowStatsAction.StatsRequest) o;
return Arrays.equals(expected.indices(), actual.indices());
}
}
}

View File

@ -0,0 +1,215 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.ccr;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.StatsAction;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class StatsCollectorTests extends BaseCollectorTestCase {
public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
final boolean ccrAllowed = randomBoolean();
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
// this controls the blockage
when(licenseState.isMonitoringAllowed()).thenReturn(false);
when(licenseState.isCcrAllowed()).thenReturn(ccrAllowed);
final StatsCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfNotMaster() {
// regardless of CCR being enabled
final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
// this controls the blockage
final boolean isElectedMaster = false;
final StatsCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
}
public void testShouldCollectReturnsFalseIfCCRIsDisabled() {
// this is controls the blockage
final Settings settings = ccrDisabledSettings();
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final StatsCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsFalseIfCCRIsNotAllowed() {
final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
// this is controls the blockage
when(licenseState.isCcrAllowed()).thenReturn(false);
final boolean isElectedMaster = randomBoolean();
whenLocalNodeElectedMaster(isElectedMaster);
final StatsCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(false));
if (isElectedMaster) {
verify(licenseState).isMonitoringAllowed();
}
}
public void testShouldCollectReturnsTrue() {
final Settings settings = ccrEnabledSettings();
when(licenseState.isMonitoringAllowed()).thenReturn(true);
when(licenseState.isCcrAllowed()).thenReturn(true);
final boolean isElectedMaster = true;
final StatsCollector collector = createCollector(settings, clusterService, licenseState, client);
assertThat(collector.shouldCollect(isElectedMaster), is(true));
verify(licenseState).isMonitoringAllowed();
}
public void testDoCollect() throws Exception {
final String clusterUuid = randomAlphaOfLength(5);
whenClusterStateWithUUID(clusterUuid);
final MonitoringDoc.Node node = randomMonitoringNode(random());
final CcrClient client = mock(CcrClient.class);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final List<FollowStatsAction.StatsResponse> statuses = mockStatuses();
final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
withCollectionTimeout(StatsCollector.CCR_STATS_TIMEOUT, timeout);
final AutoFollowStats autoFollowStats = mock(AutoFollowStats.class);
final FollowStatsAction.StatsResponses statsResponse = mock(FollowStatsAction.StatsResponses.class);
when(statsResponse.getStatsResponses()).thenReturn(statuses);
@SuppressWarnings("unchecked")
final ActionFuture<StatsAction.Response> future = (ActionFuture<StatsAction.Response>) mock(ActionFuture.class);
final StatsAction.Response response = new StatsAction.Response(autoFollowStats, statsResponse);
when(client.stats(any())).thenReturn(future);
when(future.actionGet(timeout)).thenReturn(response);
final StatsCollector collector = new StatsCollector(settings, clusterService, licenseState, client, threadContext);
assertEquals(timeout, collector.getCollectionTimeout());
final long interval = randomNonNegativeLong();
final List<MonitoringDoc> documents = new ArrayList<>(collector.doCollect(node, interval, clusterState));
verify(clusterState).metaData();
verify(metaData).clusterUUID();
assertThat(documents, hasSize(statuses.size() + 1));
for (int i = 0; i < documents.size() - 1; i++) {
final FollowStatsMonitoringDoc document = (FollowStatsMonitoringDoc) documents.get(i);
final FollowStatsAction.StatsResponse status = statuses.get(i);
assertThat(document.getCluster(), is(clusterUuid));
assertThat(document.getTimestamp(), greaterThan(0L));
assertThat(document.getIntervalMillis(), equalTo(interval));
assertThat(document.getNode(), equalTo(node));
assertThat(document.getSystem(), is(MonitoredSystem.ES));
assertThat(document.getType(), is(FollowStatsMonitoringDoc.TYPE));
assertThat(document.getId(), nullValue());
assertThat(document.status(), is(status.status()));
}
final AutoFollowStatsMonitoringDoc document = (AutoFollowStatsMonitoringDoc) documents.get(documents.size() - 1);
assertThat(document, notNullValue());
assertThat(document.getCluster(), is(clusterUuid));
assertThat(document.getTimestamp(), greaterThan(0L));
assertThat(document.getIntervalMillis(), equalTo(interval));
assertThat(document.getNode(), equalTo(node));
assertThat(document.getSystem(), is(MonitoredSystem.ES));
assertThat(document.getType(), is(AutoFollowStatsMonitoringDoc.TYPE));
assertThat(document.getId(), nullValue());
assertThat(document.stats(), is(autoFollowStats));
}
private List<FollowStatsAction.StatsResponse> mockStatuses() {
final int count = randomIntBetween(1, 8);
final List<FollowStatsAction.StatsResponse> statuses = new ArrayList<>(count);
for (int i = 0; i < count; ++i) {
FollowStatsAction.StatsResponse statsResponse = mock(FollowStatsAction.StatsResponse.class);
ShardFollowNodeTaskStatus status = mock(ShardFollowNodeTaskStatus.class);
when(status.followerIndex()).thenReturn("follow_index");
when(statsResponse.status()).thenReturn(status);
statuses.add(statsResponse);
}
return statuses;
}
private StatsCollector createCollector(Settings settings,
ClusterService clusterService,
XPackLicenseState licenseState,
Client client) {
return new StatsCollector(settings, clusterService, licenseState, client);
}
private Settings ccrEnabledSettings() {
// since it's the default, we want to ensure we test both with/without it
return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), true).build();
}
private Settings ccrDisabledSettings() {
return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build();
}
}

View File

@ -32,7 +32,7 @@ import java.util.TreeMap;
public class FollowStatsAction extends Action<FollowStatsAction.StatsResponses> {
public static final String NAME = "cluster:monitor/ccr/stats";
public static final String NAME = "cluster:monitor/ccr/follow_stats";
public static final FollowStatsAction INSTANCE = new FollowStatsAction();

View File

@ -19,12 +19,12 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import java.io.IOException;
import java.util.Objects;
public class AutoFollowStatsAction extends Action<AutoFollowStatsAction.Response> {
public class StatsAction extends Action<StatsAction.Response> {
public static final String NAME = "cluster:monitor/ccr/auto_follow_stats";
public static final AutoFollowStatsAction INSTANCE = new AutoFollowStatsAction();
public static final String NAME = "cluster:monitor/ccr/stats";
public static final StatsAction INSTANCE = new StatsAction();
private AutoFollowStatsAction() {
private StatsAction() {
super(NAME);
}
@ -55,34 +55,48 @@ public class AutoFollowStatsAction extends Action<AutoFollowStatsAction.Response
public static class Response extends ActionResponse implements ToXContentObject {
private AutoFollowStats stats;
private AutoFollowStats autoFollowStats;
private FollowStatsAction.StatsResponses followStats;
public Response(AutoFollowStats stats) {
this.stats = stats;
public Response(AutoFollowStats autoFollowStats, FollowStatsAction.StatsResponses followStats) {
this.autoFollowStats = Objects.requireNonNull(autoFollowStats);
this.followStats = Objects.requireNonNull(followStats);
}
public Response() {
}
public AutoFollowStats getStats() {
return stats;
public AutoFollowStats getAutoFollowStats() {
return autoFollowStats;
}
public FollowStatsAction.StatsResponses getFollowStats() {
return followStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
stats = new AutoFollowStats(in);
autoFollowStats = new AutoFollowStats(in);
followStats = new FollowStatsAction.StatsResponses();
followStats.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
stats.writeTo(out);
autoFollowStats.writeTo(out);
followStats.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
stats.toXContent(builder, params);
builder.startObject();
{
builder.field("auto_follow_stats", autoFollowStats, params);
builder.field("follow_stats", followStats, params);
}
builder.endObject();
return builder;
}
@ -91,12 +105,13 @@ public class AutoFollowStatsAction extends Action<AutoFollowStatsAction.Response
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return Objects.equals(stats, response.stats);
return Objects.equals(autoFollowStats, response.autoFollowStats) &&
Objects.equals(followStats, response.followStats);
}
@Override
public int hashCode() {
return Objects.hash(stats);
return Objects.hash(autoFollowStats, followStats);
}
}

View File

@ -11,7 +11,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.StatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
@ -53,26 +53,26 @@ public class CcrClient {
return listener;
}
public void stats(
public void followStats(
final FollowStatsAction.StatsRequest request,
final ActionListener<FollowStatsAction.StatsResponses> listener) {
client.execute(FollowStatsAction.INSTANCE, request, listener);
}
public ActionFuture<FollowStatsAction.StatsResponses> stats(final FollowStatsAction.StatsRequest request) {
public ActionFuture<FollowStatsAction.StatsResponses> followStats(final FollowStatsAction.StatsRequest request) {
final PlainActionFuture<FollowStatsAction.StatsResponses> listener = PlainActionFuture.newFuture();
client.execute(FollowStatsAction.INSTANCE, request, listener);
return listener;
}
public void autoFollowStats(final AutoFollowStatsAction.Request request,
final ActionListener<AutoFollowStatsAction.Response> listener) {
client.execute(AutoFollowStatsAction.INSTANCE, request, listener);
public void stats(final StatsAction.Request request,
final ActionListener<StatsAction.Response> listener) {
client.execute(StatsAction.INSTANCE, request, listener);
}
public ActionFuture<AutoFollowStatsAction.Response> autoFollowStats(final AutoFollowStatsAction.Request request) {
final PlainActionFuture<AutoFollowStatsAction.Response> listener = PlainActionFuture.newFuture();
autoFollowStats(request, listener);
public ActionFuture<StatsAction.Response> stats(final StatsAction.Request request) {
final PlainActionFuture<StatsAction.Response> listener = PlainActionFuture.newFuture();
stats(request, listener);
return listener;
}

View File

@ -39,8 +39,7 @@ import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.collector.ccr.CcrAutoFollowStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.ccr.FollowStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.ccr.StatsCollector;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsCollector;
@ -144,8 +143,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
collectors.add(new NodeStatsCollector(settings, clusterService, getLicenseState(), client));
collectors.add(new IndexRecoveryCollector(settings, clusterService, getLicenseState(), client));
collectors.add(new JobStatsCollector(settings, clusterService, getLicenseState(), client));
collectors.add(new FollowStatsCollector(settings, clusterService, getLicenseState(), client));
collectors.add(new CcrAutoFollowStatsCollector(settings, clusterService, getLicenseState(), client));
collectors.add(new StatsCollector(settings, clusterService, getLicenseState(), client));
final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters);
@ -184,8 +182,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
settings.add(IndexRecoveryCollector.INDEX_RECOVERY_ACTIVE_ONLY);
settings.add(IndexStatsCollector.INDEX_STATS_TIMEOUT);
settings.add(JobStatsCollector.JOB_STATS_TIMEOUT);
settings.add(FollowStatsCollector.CCR_STATS_TIMEOUT);
settings.add(CcrAutoFollowStatsCollector.CCR_AUTO_FOLLOW_STATS_TIMEOUT);
settings.add(StatsCollector.CCR_STATS_TIMEOUT);
settings.add(NodeStatsCollector.NODE_STATS_TIMEOUT);
settings.addAll(Exporters.getSettings());
return Collections.unmodifiableList(settings);

View File

@ -1,61 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.ccr;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import java.util.Collection;
import java.util.Collections;
public final class CcrAutoFollowStatsCollector extends AbstractCcrCollector {
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_STATS_TIMEOUT = collectionTimeoutSetting("ccr.auto_follow.stats.timeout");
public CcrAutoFollowStatsCollector(
final Settings settings,
final ClusterService clusterService,
final XPackLicenseState licenseState,
final Client client) {
super(settings, clusterService, CCR_AUTO_FOLLOW_STATS_TIMEOUT, licenseState, new XPackClient(client).ccr(),
client.threadPool().getThreadContext());
}
CcrAutoFollowStatsCollector(
final Settings settings,
final ClusterService clusterService,
final XPackLicenseState licenseState,
final CcrClient ccrClient,
final ThreadContext threadContext) {
super(settings, clusterService, CCR_AUTO_FOLLOW_STATS_TIMEOUT, licenseState, ccrClient, threadContext);
}
@Override
Collection<MonitoringDoc> innerDoCollect(
long timestamp,
String clusterUuid,
long interval,
MonitoringDoc.Node node) throws Exception {
final AutoFollowStatsAction.Request request = new AutoFollowStatsAction.Request();
final AutoFollowStatsAction.Response response = ccrClient.autoFollowStats(request).actionGet(getCollectionTimeout());
final AutoFollowStatsMonitoringDoc doc =
new AutoFollowStatsMonitoringDoc(clusterUuid, timestamp, interval, node, response.getStats());
return Collections.singletonList(doc);
}
}

View File

@ -1,65 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.ccr;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import java.util.Collection;
import java.util.stream.Collectors;
public final class FollowStatsCollector extends AbstractCcrCollector {
public static final Setting<TimeValue> CCR_STATS_TIMEOUT = collectionTimeoutSetting("ccr.stats.timeout");
public FollowStatsCollector(
final Settings settings,
final ClusterService clusterService,
final XPackLicenseState licenseState,
final Client client) {
super(settings, clusterService, CCR_STATS_TIMEOUT, licenseState, new XPackClient(client).ccr(),
client.threadPool().getThreadContext());
}
FollowStatsCollector(
final Settings settings,
final ClusterService clusterService,
final XPackLicenseState licenseState,
final CcrClient ccrClient,
final ThreadContext threadContext) {
super(settings, clusterService, CCR_STATS_TIMEOUT, licenseState, ccrClient, threadContext);
}
@Override
Collection<MonitoringDoc> innerDoCollect(
long timestamp,
String clusterUuid,
long interval,
MonitoringDoc.Node node) throws Exception {
final FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
request.setIndices(getCollectionIndices());
final FollowStatsAction.StatsResponses responses = ccrClient.stats(request).actionGet(getCollectionTimeout());
return responses
.getStatsResponses()
.stream()
.map(stats -> new FollowStatsMonitoringDoc(clusterUuid, timestamp, interval, node, stats.status()))
.collect(Collectors.toList());
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.monitoring.collector.ccr;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
@ -13,30 +14,46 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.action.StatsAction;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
import static org.elasticsearch.xpack.monitoring.collector.ccr.FollowStatsMonitoringDoc.TYPE;
public abstract class AbstractCcrCollector extends Collector {
public final class StatsCollector extends Collector {
public static final Setting<TimeValue> CCR_STATS_TIMEOUT = collectionTimeoutSetting("ccr.stats.timeout");
private final ThreadContext threadContext;
final CcrClient ccrClient;
private final CcrClient ccrClient;
AbstractCcrCollector(
public StatsCollector(
final Settings settings,
final ClusterService clusterService,
final Setting<TimeValue> timeoutSetting,
final XPackLicenseState licenseState,
final CcrClient ccrClient,
final ThreadContext threadContext) {
super(settings, TYPE, clusterService, timeoutSetting, licenseState);
final Client client) {
this(settings, clusterService, licenseState, new XPackClient(client).ccr(), client.threadPool().getThreadContext());
}
StatsCollector(
final Settings settings,
final ClusterService clusterService,
final XPackLicenseState licenseState,
final CcrClient ccrClient,
final ThreadContext threadContext) {
super(settings, TYPE, clusterService, CCR_STATS_TIMEOUT, licenseState);
this.ccrClient = ccrClient;
this.threadContext = threadContext;
}
@ -59,13 +76,23 @@ public abstract class AbstractCcrCollector extends Collector {
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, MONITORING_ORIGIN)) {
final long timestamp = timestamp();
final String clusterUuid = clusterUuid(clusterState);
return innerDoCollect(timestamp, clusterUuid, interval, node);
final StatsAction.Request request = new StatsAction.Request();
final StatsAction.Response response = ccrClient.stats(request).actionGet(getCollectionTimeout());
final AutoFollowStatsMonitoringDoc autoFollowStatsDoc =
new AutoFollowStatsMonitoringDoc(clusterUuid, timestamp, interval, node, response.getAutoFollowStats());
Set<String> collectionIndices = new HashSet<>(Arrays.asList(getCollectionIndices()));
List<MonitoringDoc> docs = response
.getFollowStats()
.getStatsResponses()
.stream()
.filter(statsResponse -> collectionIndices.isEmpty() || collectionIndices.contains(statsResponse.status().followerIndex()))
.map(stats -> new FollowStatsMonitoringDoc(clusterUuid, timestamp, interval, node, stats.status()))
.collect(Collectors.toList());
docs.add(autoFollowStatsDoc);
return docs;
}
}
abstract Collection<MonitoringDoc> innerDoCollect(
long timestamp,
String clusterUuid,
long interval,
MonitoringDoc.Node node) throws Exception;
}

View File

@ -1,12 +0,0 @@
{
"ccr.auto_follow_stats": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
"methods": [ "GET" ],
"url": {
"path": "/_ccr/auto_follow/stats",
"paths": [ "/_ccr/auto_follow/stats" ],
"parts": {},
"body": null
}
}
}

View File

@ -0,0 +1,16 @@
{
"ccr.follow_stats": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
"methods": [ "GET" ],
"url": {
"path": "/{index}/_ccr/stats",
"paths": [ "/{index}/_ccr/stats" ],
"parts": {
"index": {
"type": "list",
"description": "A comma-separated list of index names; use `_all` or empty string to perform the operation on all indices"
}
}
}
}
}

View File

@ -4,13 +4,9 @@
"methods": [ "GET" ],
"url": {
"path": "/_ccr/stats",
"paths": [ "/_ccr/stats", "/{index}/_ccr/stats" ],
"parts": {
"index": {
"type": "list",
"description": "A comma-separated list of index names; use `_all` or empty string to perform the operation on all indices"
}
}
"paths": [ "/_ccr/stats" ],
"parts": {},
"body": null
}
}
}