Rename CCR stats implementation (#34300)
In the CCR docs we want to refer to the endpoint that returns following stats as the follow stats API. This commit renames the internal implementation of this endpoint to reflect this usage.
This commit is contained in:
parent
daf88335d7
commit
7478167d60
|
@ -53,7 +53,7 @@ import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
|
|||
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction;
|
||||
|
@ -62,7 +62,7 @@ import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
|
|||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
|
||||
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
|
||||
|
@ -70,7 +70,7 @@ import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
|
|||
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
|
@ -161,7 +161,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class),
|
||||
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
|
||||
// stats action
|
||||
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
|
||||
new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
|
||||
new ActionHandler<>(AutoFollowStatsAction.INSTANCE, TransportAutoFollowStatsAction.class),
|
||||
// follow actions
|
||||
new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
|
||||
|
@ -184,7 +184,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
|
||||
return Arrays.asList(
|
||||
// stats API
|
||||
new RestCcrStatsAction(settings, restController),
|
||||
new RestFollowStatsAction(settings, restController),
|
||||
new RestAutoFollowStatsAction(settings, restController),
|
||||
// follow APIs
|
||||
new RestPutFollowAction(settings, restController),
|
||||
|
|
|
@ -22,7 +22,7 @@ import org.elasticsearch.tasks.Task;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -32,16 +32,16 @@ import java.util.Objects;
|
|||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class TransportCcrStatsAction extends TransportTasksAction<
|
||||
public class TransportFollowStatsAction extends TransportTasksAction<
|
||||
ShardFollowNodeTask,
|
||||
CcrStatsAction.StatsRequest,
|
||||
CcrStatsAction.StatsResponses, CcrStatsAction.StatsResponse> {
|
||||
FollowStatsAction.StatsRequest,
|
||||
FollowStatsAction.StatsResponses, FollowStatsAction.StatsResponse> {
|
||||
|
||||
private final IndexNameExpressionResolver resolver;
|
||||
private final CcrLicenseChecker ccrLicenseChecker;
|
||||
|
||||
@Inject
|
||||
public TransportCcrStatsAction(
|
||||
public TransportFollowStatsAction(
|
||||
final Settings settings,
|
||||
final ClusterService clusterService,
|
||||
final TransportService transportService,
|
||||
|
@ -50,12 +50,12 @@ public class TransportCcrStatsAction extends TransportTasksAction<
|
|||
final CcrLicenseChecker ccrLicenseChecker) {
|
||||
super(
|
||||
settings,
|
||||
CcrStatsAction.NAME,
|
||||
FollowStatsAction.NAME,
|
||||
clusterService,
|
||||
transportService,
|
||||
actionFilters,
|
||||
CcrStatsAction.StatsRequest::new,
|
||||
CcrStatsAction.StatsResponses::new,
|
||||
FollowStatsAction.StatsRequest::new,
|
||||
FollowStatsAction.StatsResponses::new,
|
||||
Ccr.CCR_THREAD_POOL_NAME);
|
||||
this.resolver = Objects.requireNonNull(resolver);
|
||||
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
|
||||
|
@ -64,8 +64,8 @@ public class TransportCcrStatsAction extends TransportTasksAction<
|
|||
@Override
|
||||
protected void doExecute(
|
||||
final Task task,
|
||||
final CcrStatsAction.StatsRequest request,
|
||||
final ActionListener<CcrStatsAction.StatsResponses> listener) {
|
||||
final FollowStatsAction.StatsRequest request,
|
||||
final ActionListener<FollowStatsAction.StatsResponses> listener) {
|
||||
if (ccrLicenseChecker.isCcrAllowed() == false) {
|
||||
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
|
||||
return;
|
||||
|
@ -74,21 +74,21 @@ public class TransportCcrStatsAction extends TransportTasksAction<
|
|||
}
|
||||
|
||||
@Override
|
||||
protected CcrStatsAction.StatsResponses newResponse(
|
||||
final CcrStatsAction.StatsRequest request,
|
||||
final List<CcrStatsAction.StatsResponse> statsRespons,
|
||||
protected FollowStatsAction.StatsResponses newResponse(
|
||||
final FollowStatsAction.StatsRequest request,
|
||||
final List<FollowStatsAction.StatsResponse> statsRespons,
|
||||
final List<TaskOperationFailure> taskOperationFailures,
|
||||
final List<FailedNodeException> failedNodeExceptions) {
|
||||
return new CcrStatsAction.StatsResponses(taskOperationFailures, failedNodeExceptions, statsRespons);
|
||||
return new FollowStatsAction.StatsResponses(taskOperationFailures, failedNodeExceptions, statsRespons);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CcrStatsAction.StatsResponse readTaskResponse(final StreamInput in) throws IOException {
|
||||
return new CcrStatsAction.StatsResponse(in);
|
||||
protected FollowStatsAction.StatsResponse readTaskResponse(final StreamInput in) throws IOException {
|
||||
return new FollowStatsAction.StatsResponse(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processTasks(final CcrStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
|
||||
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
|
||||
final ClusterState state = clusterService.state();
|
||||
final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
|
||||
for (final Task task : taskManager.getTasks().values()) {
|
||||
|
@ -103,10 +103,10 @@ public class TransportCcrStatsAction extends TransportTasksAction<
|
|||
|
||||
@Override
|
||||
protected void taskOperation(
|
||||
final CcrStatsAction.StatsRequest request,
|
||||
final FollowStatsAction.StatsRequest request,
|
||||
final ShardFollowNodeTask task,
|
||||
final ActionListener<CcrStatsAction.StatsResponse> listener) {
|
||||
listener.onResponse(new CcrStatsAction.StatsResponse(task.getStatus()));
|
||||
final ActionListener<FollowStatsAction.StatsResponse> listener) {
|
||||
listener.onResponse(new FollowStatsAction.StatsResponse(task.getStatus()));
|
||||
}
|
||||
|
||||
}
|
|
@ -13,13 +13,13 @@ import org.elasticsearch.rest.BaseRestHandler;
|
|||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RestCcrStatsAction extends BaseRestHandler {
|
||||
public class RestFollowStatsAction extends BaseRestHandler {
|
||||
|
||||
public RestCcrStatsAction(final Settings settings, final RestController controller) {
|
||||
public RestFollowStatsAction(final Settings settings, final RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.GET, "/_ccr/stats", this);
|
||||
controller.registerHandler(RestRequest.Method.GET, "/{index}/_ccr/stats", this);
|
||||
|
@ -32,9 +32,9 @@ public class RestCcrStatsAction extends BaseRestHandler {
|
|||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
|
||||
final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest();
|
||||
final FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
|
||||
request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
|
||||
return channel -> client.execute(CcrStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
return channel -> client.execute(FollowStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
|
||||
}
|
|
@ -22,7 +22,7 @@ import org.elasticsearch.plugins.Plugin;
|
|||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.test.MockLogAppender;
|
||||
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
|
@ -88,21 +88,24 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
|
|||
latch.await();
|
||||
}
|
||||
|
||||
public void testThatCcrStatsAreUnavailableWithNonCompliantLicense() throws InterruptedException {
|
||||
public void testThatFollowStatsAreUnavailableWithNonCompliantLicense() throws InterruptedException {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.StatsRequest(), new ActionListener<CcrStatsAction.StatsResponses>() {
|
||||
@Override
|
||||
public void onResponse(final CcrStatsAction.StatsResponses statsResponses) {
|
||||
latch.countDown();
|
||||
fail();
|
||||
}
|
||||
client().execute(
|
||||
FollowStatsAction.INSTANCE,
|
||||
new FollowStatsAction.StatsRequest(),
|
||||
new ActionListener<FollowStatsAction.StatsResponses>() {
|
||||
@Override
|
||||
public void onResponse(final FollowStatsAction.StatsResponses statsResponses) {
|
||||
latch.countDown();
|
||||
fail();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
assertNonCompliantLicense(e);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
assertNonCompliantLicense(e);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
latch.await();
|
||||
}
|
||||
|
|
|
@ -59,9 +59,9 @@ import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
|||
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsRequest;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsResponses;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction.StatsRequest;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction.StatsResponses;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
|
@ -570,7 +570,7 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
|
||||
client().admin().indices().close(new CloseIndexRequest("index1")).actionGet();
|
||||
assertBusy(() -> {
|
||||
StatsResponses response = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet();
|
||||
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
|
||||
assertThat(response.getNodeFailures(), empty());
|
||||
assertThat(response.getTaskFailures(), empty());
|
||||
assertThat(response.getStatsResponses(), hasSize(1));
|
||||
|
@ -605,7 +605,7 @@ public class ShardChangesIT extends ESIntegTestCase {
|
|||
client().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
|
||||
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
|
||||
assertBusy(() -> {
|
||||
StatsResponses response = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet();
|
||||
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
|
||||
assertThat(response.getNodeFailures(), empty());
|
||||
assertThat(response.getTaskFailures(), empty());
|
||||
assertThat(response.getStatsResponses(), hasSize(1));
|
||||
|
|
|
@ -6,18 +6,18 @@
|
|||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
|
||||
public class StatsRequestTests extends AbstractStreamableTestCase<CcrStatsAction.StatsRequest> {
|
||||
public class StatsRequestTests extends AbstractStreamableTestCase<FollowStatsAction.StatsRequest> {
|
||||
|
||||
@Override
|
||||
protected CcrStatsAction.StatsRequest createBlankInstance() {
|
||||
return new CcrStatsAction.StatsRequest();
|
||||
protected FollowStatsAction.StatsRequest createBlankInstance() {
|
||||
return new FollowStatsAction.StatsRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CcrStatsAction.StatsRequest createTestInstance() {
|
||||
CcrStatsAction.StatsRequest statsRequest = new CcrStatsAction.StatsRequest();
|
||||
protected FollowStatsAction.StatsRequest createTestInstance() {
|
||||
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
|
||||
if (randomBoolean()) {
|
||||
statsRequest.setIndices(generateRandomStringArray(8, 4, false));
|
||||
}
|
||||
|
|
|
@ -7,23 +7,23 @@ package org.elasticsearch.xpack.ccr.action;
|
|||
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class StatsResponsesTests extends AbstractStreamableTestCase<CcrStatsAction.StatsResponses> {
|
||||
public class StatsResponsesTests extends AbstractStreamableTestCase<FollowStatsAction.StatsResponses> {
|
||||
|
||||
@Override
|
||||
protected CcrStatsAction.StatsResponses createBlankInstance() {
|
||||
return new CcrStatsAction.StatsResponses();
|
||||
protected FollowStatsAction.StatsResponses createBlankInstance() {
|
||||
return new FollowStatsAction.StatsResponses();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CcrStatsAction.StatsResponses createTestInstance() {
|
||||
protected FollowStatsAction.StatsResponses createTestInstance() {
|
||||
int numResponses = randomIntBetween(0, 8);
|
||||
List<CcrStatsAction.StatsResponse> responses = new ArrayList<>(numResponses);
|
||||
List<FollowStatsAction.StatsResponse> responses = new ArrayList<>(numResponses);
|
||||
for (int i = 0; i < numResponses; i++) {
|
||||
ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus(
|
||||
randomAlphaOfLength(4),
|
||||
|
@ -49,8 +49,8 @@ public class StatsResponsesTests extends AbstractStreamableTestCase<CcrStatsActi
|
|||
randomNonNegativeLong(),
|
||||
Collections.emptyNavigableMap(),
|
||||
randomLong());
|
||||
responses.add(new CcrStatsAction.StatsResponse(status));
|
||||
responses.add(new FollowStatsAction.StatsResponse(status));
|
||||
}
|
||||
return new CcrStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses);
|
||||
return new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
|
||||
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
|
||||
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
|
||||
|
@ -39,11 +39,11 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class CcrStatsCollectorTests extends AbstractCcrCollectorTestCase {
|
||||
public class FollowStatsCollectorTests extends AbstractCcrCollectorTestCase {
|
||||
|
||||
@Override
|
||||
AbstractCcrCollector createCollector(Settings settings, ClusterService clusterService, XPackLicenseState licenseState, Client client) {
|
||||
return new CcrStatsCollector(settings, clusterService, licenseState, client);
|
||||
return new FollowStatsCollector(settings, clusterService, licenseState, client);
|
||||
}
|
||||
|
||||
public void testDoCollect() throws Exception {
|
||||
|
@ -55,18 +55,20 @@ public class CcrStatsCollectorTests extends AbstractCcrCollectorTestCase {
|
|||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
|
||||
final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
|
||||
withCollectionTimeout(CcrStatsCollector.CCR_STATS_TIMEOUT, timeout);
|
||||
withCollectionTimeout(FollowStatsCollector.CCR_STATS_TIMEOUT, timeout);
|
||||
|
||||
final CcrStatsCollector collector = new CcrStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext);
|
||||
final FollowStatsCollector collector =
|
||||
new FollowStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext);
|
||||
assertEquals(timeout, collector.getCollectionTimeout());
|
||||
|
||||
final List<CcrStatsAction.StatsResponse> statuses = mockStatuses();
|
||||
final List<FollowStatsAction.StatsResponse> statuses = mockStatuses();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final ActionFuture<CcrStatsAction.StatsResponses> future = (ActionFuture<CcrStatsAction.StatsResponses>)mock(ActionFuture.class);
|
||||
final CcrStatsAction.StatsResponses responses = new CcrStatsAction.StatsResponses(emptyList(), emptyList(), statuses);
|
||||
final ActionFuture<FollowStatsAction.StatsResponses> future =
|
||||
(ActionFuture<FollowStatsAction.StatsResponses>)mock(ActionFuture.class);
|
||||
final FollowStatsAction.StatsResponses responses = new FollowStatsAction.StatsResponses(emptyList(), emptyList(), statuses);
|
||||
|
||||
final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest();
|
||||
final FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
|
||||
request.setIndices(Strings.EMPTY_ARRAY);
|
||||
when(client.stats(statsRequestEq(request))).thenReturn(future);
|
||||
when(future.actionGet(timeout)).thenReturn(responses);
|
||||
|
@ -81,26 +83,26 @@ public class CcrStatsCollectorTests extends AbstractCcrCollectorTestCase {
|
|||
|
||||
int index = 0;
|
||||
for (final Iterator<MonitoringDoc> it = documents.iterator(); it.hasNext(); index++) {
|
||||
final CcrStatsMonitoringDoc document = (CcrStatsMonitoringDoc)it.next();
|
||||
final CcrStatsAction.StatsResponse status = statuses.get(index);
|
||||
final FollowStatsMonitoringDoc document = (FollowStatsMonitoringDoc)it.next();
|
||||
final FollowStatsAction.StatsResponse status = statuses.get(index);
|
||||
|
||||
assertThat(document.getCluster(), is(clusterUuid));
|
||||
assertThat(document.getTimestamp(), greaterThan(0L));
|
||||
assertThat(document.getIntervalMillis(), equalTo(interval));
|
||||
assertThat(document.getNode(), equalTo(node));
|
||||
assertThat(document.getSystem(), is(MonitoredSystem.ES));
|
||||
assertThat(document.getType(), is(CcrStatsMonitoringDoc.TYPE));
|
||||
assertThat(document.getType(), is(FollowStatsMonitoringDoc.TYPE));
|
||||
assertThat(document.getId(), nullValue());
|
||||
assertThat(document.status(), is(status.status()));
|
||||
}
|
||||
}
|
||||
|
||||
private List<CcrStatsAction.StatsResponse> mockStatuses() {
|
||||
private List<FollowStatsAction.StatsResponse> mockStatuses() {
|
||||
final int count = randomIntBetween(1, 8);
|
||||
final List<CcrStatsAction.StatsResponse> statuses = new ArrayList<>(count);
|
||||
final List<FollowStatsAction.StatsResponse> statuses = new ArrayList<>(count);
|
||||
|
||||
for (int i = 0; i < count; ++i) {
|
||||
CcrStatsAction.StatsResponse statsResponse = mock(CcrStatsAction.StatsResponse.class);
|
||||
FollowStatsAction.StatsResponse statsResponse = mock(FollowStatsAction.StatsResponse.class);
|
||||
ShardFollowNodeTaskStatus status = mock(ShardFollowNodeTaskStatus.class);
|
||||
when(statsResponse.status()).thenReturn(status);
|
||||
statuses.add(statsResponse);
|
||||
|
@ -109,21 +111,21 @@ public class CcrStatsCollectorTests extends AbstractCcrCollectorTestCase {
|
|||
return statuses;
|
||||
}
|
||||
|
||||
private static CcrStatsAction.StatsRequest statsRequestEq(CcrStatsAction.StatsRequest expected) {
|
||||
return argThat(new StatsRequestMatches(expected));
|
||||
private static FollowStatsAction.StatsRequest statsRequestEq(FollowStatsAction.StatsRequest expected) {
|
||||
return argThat(new FollowStatsRequest(expected));
|
||||
}
|
||||
|
||||
private static class StatsRequestMatches extends ArgumentMatcher<CcrStatsAction.StatsRequest> {
|
||||
private static class FollowStatsRequest extends ArgumentMatcher<FollowStatsAction.StatsRequest> {
|
||||
|
||||
private final CcrStatsAction.StatsRequest expected;
|
||||
private final FollowStatsAction.StatsRequest expected;
|
||||
|
||||
private StatsRequestMatches(CcrStatsAction.StatsRequest expected) {
|
||||
private FollowStatsRequest(FollowStatsAction.StatsRequest expected) {
|
||||
this.expected = expected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(Object o) {
|
||||
CcrStatsAction.StatsRequest actual = (CcrStatsAction.StatsRequest) o;
|
||||
FollowStatsAction.StatsRequest actual = (FollowStatsAction.StatsRequest) o;
|
||||
return Arrays.equals(expected.indices(), actual.indices());
|
||||
}
|
||||
}
|
|
@ -39,7 +39,7 @@ import static org.hamcrest.Matchers.notNullValue;
|
|||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase<CcrStatsMonitoringDoc> {
|
||||
public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<FollowStatsMonitoringDoc> {
|
||||
|
||||
private ShardFollowNodeTaskStatus status;
|
||||
|
||||
|
@ -52,12 +52,12 @@ public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase<CcrSta
|
|||
|
||||
public void testConstructorStatusMustNotBeNull() {
|
||||
final NullPointerException e =
|
||||
expectThrows(NullPointerException.class, () -> new CcrStatsMonitoringDoc(cluster, timestamp, interval, node, null));
|
||||
expectThrows(NullPointerException.class, () -> new FollowStatsMonitoringDoc(cluster, timestamp, interval, node, null));
|
||||
assertThat(e, hasToString(containsString("status")));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CcrStatsMonitoringDoc createMonitoringDoc(
|
||||
protected FollowStatsMonitoringDoc createMonitoringDoc(
|
||||
final String cluster,
|
||||
final long timestamp,
|
||||
final long interval,
|
||||
|
@ -65,13 +65,13 @@ public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase<CcrSta
|
|||
final MonitoredSystem system,
|
||||
final String type,
|
||||
final String id) {
|
||||
return new CcrStatsMonitoringDoc(cluster, timestamp, interval, node, status);
|
||||
return new FollowStatsMonitoringDoc(cluster, timestamp, interval, node, status);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertMonitoringDoc(CcrStatsMonitoringDoc document) {
|
||||
protected void assertMonitoringDoc(FollowStatsMonitoringDoc document) {
|
||||
assertThat(document.getSystem(), is(MonitoredSystem.ES));
|
||||
assertThat(document.getType(), is(CcrStatsMonitoringDoc.TYPE));
|
||||
assertThat(document.getType(), is(FollowStatsMonitoringDoc.TYPE));
|
||||
assertThat(document.getId(), nullValue());
|
||||
assertThat(document.status(), is(status));
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase<CcrSta
|
|||
numberOfOperationsIndexed,
|
||||
fetchExceptions,
|
||||
timeSinceLastFetchMillis);
|
||||
final CcrStatsMonitoringDoc document = new CcrStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, status);
|
||||
final FollowStatsMonitoringDoc document = new FollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, status);
|
||||
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
|
||||
assertThat(
|
||||
xContent.utf8ToString(),
|
||||
|
@ -219,12 +219,12 @@ public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase<CcrSta
|
|||
|
||||
Map<String, Object> template =
|
||||
XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
|
||||
Map<?, ?> ccrStatsMapping = (Map<?, ?>) XContentMapValues.extractValue("mappings.doc.properties.ccr_stats.properties", template);
|
||||
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues.extractValue("mappings.doc.properties.ccr_stats.properties", template);
|
||||
|
||||
assertThat(serializedStatus.size(), equalTo(ccrStatsMapping.size()));
|
||||
assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
|
||||
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
|
||||
String fieldName = entry.getKey();
|
||||
Map<?, ?> fieldMapping = (Map<?, ?>) ccrStatsMapping.get(fieldName);
|
||||
Map<?, ?> fieldMapping = (Map<?, ?>) followStatsMapping.get(fieldName);
|
||||
assertThat(fieldMapping, notNullValue());
|
||||
|
||||
Object fieldValue = entry.getValue();
|
|
@ -30,13 +30,13 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class CcrStatsAction extends Action<CcrStatsAction.StatsResponses> {
|
||||
public class FollowStatsAction extends Action<FollowStatsAction.StatsResponses> {
|
||||
|
||||
public static final String NAME = "cluster:monitor/ccr/stats";
|
||||
|
||||
public static final CcrStatsAction INSTANCE = new CcrStatsAction();
|
||||
public static final FollowStatsAction INSTANCE = new FollowStatsAction();
|
||||
|
||||
private CcrStatsAction() {
|
||||
private FollowStatsAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
|
@ -140,8 +140,8 @@ public class CcrStatsAction extends Action<CcrStatsAction.StatsResponses> {
|
|||
* This is a limitation of the current tasks API. When the transport action is executed, the tasks API invokes this match method
|
||||
* to find the tasks on which to execute the task-level operation (see TransportTasksAction#nodeOperation and
|
||||
* TransportTasksAction#processTasks). If we do the matching here, then we can not match index patterns. Therefore, we override
|
||||
* TransportTasksAction#processTasks (see TransportCcrStatsAction#processTasks) and do the matching there. We should never see
|
||||
* this method invoked and since we can not support matching a task on the basis of the request here, we throw that this
|
||||
* TransportTasksAction#processTasks (see TransportFollowStatsAction#processTasks) and do the matching there. We should never
|
||||
* see this method invoked and since we can not support matching a task on the basis of the request here, we throw that this
|
||||
* operation is unsupported.
|
||||
*/
|
||||
throw new UnsupportedOperationException();
|
|
@ -12,7 +12,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
|
|||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
|
@ -54,14 +54,14 @@ public class CcrClient {
|
|||
}
|
||||
|
||||
public void stats(
|
||||
final CcrStatsAction.StatsRequest request,
|
||||
final ActionListener<CcrStatsAction.StatsResponses> listener) {
|
||||
client.execute(CcrStatsAction.INSTANCE, request, listener);
|
||||
final FollowStatsAction.StatsRequest request,
|
||||
final ActionListener<FollowStatsAction.StatsResponses> listener) {
|
||||
client.execute(FollowStatsAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
public ActionFuture<CcrStatsAction.StatsResponses> stats(final CcrStatsAction.StatsRequest request) {
|
||||
final PlainActionFuture<CcrStatsAction.StatsResponses> listener = PlainActionFuture.newFuture();
|
||||
client.execute(CcrStatsAction.INSTANCE, request, listener);
|
||||
public ActionFuture<FollowStatsAction.StatsResponses> stats(final FollowStatsAction.StatsRequest request) {
|
||||
final PlainActionFuture<FollowStatsAction.StatsResponses> listener = PlainActionFuture.newFuture();
|
||||
client.execute(FollowStatsAction.INSTANCE, request, listener);
|
||||
return listener;
|
||||
}
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction;
|
|||
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
|
||||
import org.elasticsearch.xpack.monitoring.collector.Collector;
|
||||
import org.elasticsearch.xpack.monitoring.collector.ccr.CcrAutoFollowStatsCollector;
|
||||
import org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsCollector;
|
||||
import org.elasticsearch.xpack.monitoring.collector.ccr.FollowStatsCollector;
|
||||
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
|
||||
import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector;
|
||||
import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsCollector;
|
||||
|
@ -144,7 +144,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
|
|||
collectors.add(new NodeStatsCollector(settings, clusterService, getLicenseState(), client));
|
||||
collectors.add(new IndexRecoveryCollector(settings, clusterService, getLicenseState(), client));
|
||||
collectors.add(new JobStatsCollector(settings, clusterService, getLicenseState(), client));
|
||||
collectors.add(new CcrStatsCollector(settings, clusterService, getLicenseState(), client));
|
||||
collectors.add(new FollowStatsCollector(settings, clusterService, getLicenseState(), client));
|
||||
collectors.add(new CcrAutoFollowStatsCollector(settings, clusterService, getLicenseState(), client));
|
||||
|
||||
final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters);
|
||||
|
@ -184,7 +184,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
|
|||
settings.add(IndexRecoveryCollector.INDEX_RECOVERY_ACTIVE_ONLY);
|
||||
settings.add(IndexStatsCollector.INDEX_STATS_TIMEOUT);
|
||||
settings.add(JobStatsCollector.JOB_STATS_TIMEOUT);
|
||||
settings.add(CcrStatsCollector.CCR_STATS_TIMEOUT);
|
||||
settings.add(FollowStatsCollector.CCR_STATS_TIMEOUT);
|
||||
settings.add(CcrAutoFollowStatsCollector.CCR_AUTO_FOLLOW_STATS_TIMEOUT);
|
||||
settings.add(NodeStatsCollector.NODE_STATS_TIMEOUT);
|
||||
settings.addAll(Exporters.getSettings());
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.util.Collection;
|
|||
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
||||
import static org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsMonitoringDoc.TYPE;
|
||||
import static org.elasticsearch.xpack.monitoring.collector.ccr.FollowStatsMonitoringDoc.TYPE;
|
||||
|
||||
public abstract class AbstractCcrCollector extends Collector {
|
||||
|
||||
|
|
|
@ -14,18 +14,18 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.xpack.core.XPackClient;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
|
||||
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public final class CcrStatsCollector extends AbstractCcrCollector {
|
||||
public final class FollowStatsCollector extends AbstractCcrCollector {
|
||||
|
||||
public static final Setting<TimeValue> CCR_STATS_TIMEOUT = collectionTimeoutSetting("ccr.stats.timeout");
|
||||
|
||||
public CcrStatsCollector(
|
||||
public FollowStatsCollector(
|
||||
final Settings settings,
|
||||
final ClusterService clusterService,
|
||||
final XPackLicenseState licenseState,
|
||||
|
@ -34,7 +34,7 @@ public final class CcrStatsCollector extends AbstractCcrCollector {
|
|||
client.threadPool().getThreadContext());
|
||||
}
|
||||
|
||||
CcrStatsCollector(
|
||||
FollowStatsCollector(
|
||||
final Settings settings,
|
||||
final ClusterService clusterService,
|
||||
final XPackLicenseState licenseState,
|
||||
|
@ -51,14 +51,14 @@ public final class CcrStatsCollector extends AbstractCcrCollector {
|
|||
MonitoringDoc.Node node) throws Exception {
|
||||
|
||||
|
||||
final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest();
|
||||
final FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
|
||||
request.setIndices(getCollectionIndices());
|
||||
final CcrStatsAction.StatsResponses responses = ccrClient.stats(request).actionGet(getCollectionTimeout());
|
||||
final FollowStatsAction.StatsResponses responses = ccrClient.stats(request).actionGet(getCollectionTimeout());
|
||||
|
||||
return responses
|
||||
.getStatsResponses()
|
||||
.stream()
|
||||
.map(stats -> new CcrStatsMonitoringDoc(clusterUuid, timestamp, interval, node, stats.status()))
|
||||
.map(stats -> new FollowStatsMonitoringDoc(clusterUuid, timestamp, interval, node, stats.status()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
|
@ -14,7 +14,7 @@ import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
|
|||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class CcrStatsMonitoringDoc extends MonitoringDoc {
|
||||
public class FollowStatsMonitoringDoc extends MonitoringDoc {
|
||||
|
||||
public static final String TYPE = "ccr_stats";
|
||||
|
||||
|
@ -24,7 +24,7 @@ public class CcrStatsMonitoringDoc extends MonitoringDoc {
|
|||
return status;
|
||||
}
|
||||
|
||||
public CcrStatsMonitoringDoc(
|
||||
public FollowStatsMonitoringDoc(
|
||||
final String cluster,
|
||||
final long timestamp,
|
||||
final long intervalMillis,
|
Loading…
Reference in New Issue