From 742fa818b85bb04b802c1c4d11f97b9c964b4cf1 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Sun, 13 Oct 2019 09:22:51 +0200 Subject: [PATCH] Add Pause/Resume Auto Follower APIs (#47510) (#47904) This commit adds two APIs that allow to pause and resume CCR auto-follower patterns: // pause auto-follower POST /_ccr/auto_follow/my_pattern/pause // resume auto-follower POST /_ccr/auto_follow/my_pattern/resume The ability to pause and resume auto-follow patterns can be useful in some situations, including the rolling upgrades of cluster using a bi-directional cross-cluster replication scheme (see #46665). This commit adds a new active flag to the AutoFollowPattern and adapts the AutoCoordinator and AutoFollower classes so that it stops to fetch remote's cluster state when all auto-follow patterns associate to the remote cluster are paused. When an auto-follower is paused, remote indices that match the pattern are just ignored: they are not added to the pattern's followed indices uids list that is maintained in the local cluster state. This way, when the auto-follow pattern is resumed the indices created in the remote cluster in the meantime will be picked up again and added as new following indices. Indices created and then deleted in the remote cluster will be ignored as they won't be seen at all by the auto-follower pattern at resume time. Backport of #47510 for 7.x --- .../GetAutoFollowPatternResponseTests.java | 3 +- .../get-auto-follow-pattern.asciidoc | 1 + .../rest-api-spec/test/ccr/auto_follow.yml | 86 ++++ .../java/org/elasticsearch/xpack/ccr/Ccr.java | 7 + .../ccr/action/AutoFollowCoordinator.java | 25 +- ...nsportActivateAutoFollowPatternAction.java | 116 +++++ .../TransportPutAutoFollowPatternAction.java | 1 + .../RestPauseAutoFollowPatternAction.java | 33 ++ .../RestResumeAutoFollowPatternAction.java | 33 ++ .../elasticsearch/xpack/ccr/AutoFollowIT.java | 186 +++++++- .../xpack/ccr/AutoFollowMetadataTests.java | 2 +- .../xpack/ccr/CCRFeatureSetTests.java | 2 +- .../elasticsearch/xpack/ccr/CcrLicenseIT.java | 2 +- ...teAutoFollowPatternActionRequestTests.java | 39 ++ .../action/AutoFollowCoordinatorTests.java | 429 +++++++++++++++--- .../GetAutoFollowPatternResponseTests.java | 2 +- ...tActivateAutoFollowPatternActionTests.java | 96 ++++ ...ortDeleteAutoFollowPatternActionTests.java | 12 +- ...nsportGetAutoFollowPatternActionTests.java | 4 +- ...nsportPutAutoFollowPatternActionTests.java | 2 +- .../xpack/core/ccr/AutoFollowMetadata.java | 39 +- .../ActivateAutoFollowPatternAction.java | 84 ++++ .../api/ccr.pause_auto_follow_pattern.json | 24 + .../api/ccr.resume_auto_follow_pattern.json | 24 + 24 files changed, 1166 insertions(+), 86 deletions(-) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java index 82064063578..c469647f7eb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java @@ -49,6 +49,7 @@ public class GetAutoFollowPatternResponseTests extends AbstractResponseTestCase< String remoteCluster = randomAlphaOfLength(4); List leaderIndexPatters = Collections.singletonList(randomAlphaOfLength(4)); String followIndexNamePattern = randomAlphaOfLength(4); + boolean active = randomBoolean(); Integer maxOutstandingReadRequests = null; if (randomBoolean()) { @@ -91,7 +92,7 @@ public class GetAutoFollowPatternResponseTests extends AbstractResponseTestCase< readPollTimeout = new TimeValue(randomNonNegativeLong()); } patterns.put(randomAlphaOfLength(4), new AutoFollowMetadata.AutoFollowPattern(remoteCluster, leaderIndexPatters, - followIndexNamePattern, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests, + followIndexNamePattern, active, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests, maxOutstandingWriteRequests, maxReadRequestSize, maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, readPollTimeout)); } diff --git a/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc b/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc index ac8f9e49941..5ea23782e19 100644 --- a/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc +++ b/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc @@ -90,6 +90,7 @@ The API returns the following result: { "name": "my_auto_follow_pattern", "pattern": { + "active": true, "remote_cluster" : "remote_cluster", "leader_index_patterns" : [ diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml index ebf9176c30a..db54e2c38c7 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml @@ -52,3 +52,89 @@ catch: missing ccr.get_auto_follow_pattern: name: my_pattern + +--- +"Test pause and resume auto follow pattern": + - skip: + version: " - 7.4.99" + reason: "pause/resume auto-follow patterns is supported since 7.5.0" + + - do: + cluster.state: {} + + - set: {master_node: master} + + - do: + nodes.info: {} + + - set: {nodes.$master.transport_address: local_ip} + + - do: + cluster.put_settings: + body: + transient: + cluster.remote.local.seeds: $local_ip + flat_settings: true + + - match: {transient: {cluster.remote.local.seeds: $local_ip}} + + - do: + ccr.put_auto_follow_pattern: + name: pattern_test + body: + remote_cluster: local + leader_index_patterns: ['logs-*'] + max_outstanding_read_requests: 2 + - is_true: acknowledged + + - do: + ccr.get_auto_follow_pattern: + name: pattern_test + - match: { patterns.0.name: 'pattern_test' } + - match: { patterns.0.pattern.remote_cluster: 'local' } + - match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] } + - match: { patterns.0.pattern.max_outstanding_read_requests: 2 } + - match: { patterns.0.pattern.active: true } + + - do: + catch: missing + ccr.pause_auto_follow_pattern: + name: unknown_pattern + + - do: + ccr.pause_auto_follow_pattern: + name: pattern_test + - is_true: acknowledged + + - do: + ccr.get_auto_follow_pattern: + name: pattern_test + - match: { patterns.0.name: 'pattern_test' } + - match: { patterns.0.pattern.remote_cluster: 'local' } + - match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] } + - match: { patterns.0.pattern.max_outstanding_read_requests: 2 } + - match: { patterns.0.pattern.active: false } + + - do: + catch: missing + ccr.resume_auto_follow_pattern: + name: unknown_pattern + + - do: + ccr.resume_auto_follow_pattern: + name: pattern_test + - is_true: acknowledged + + - do: + ccr.get_auto_follow_pattern: + name: pattern_test + - match: { patterns.0.name: 'pattern_test' } + - match: { patterns.0.pattern.remote_cluster: 'local' } + - match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] } + - match: { patterns.0.pattern.max_outstanding_read_requests: 2 } + - match: { patterns.0.pattern.active: true } + + - do: + ccr.delete_auto_follow_pattern: + name: pattern_test + - is_true: acknowledged diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index cff4338386e..de6f03bddc5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -61,6 +61,7 @@ import org.elasticsearch.xpack.ccr.action.TransportFollowInfoAction; import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction; import org.elasticsearch.xpack.ccr.action.TransportForgetFollowerAction; import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportActivateAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction; import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction; @@ -82,9 +83,11 @@ import org.elasticsearch.xpack.ccr.rest.RestFollowInfoAction; import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction; import org.elasticsearch.xpack.ccr.rest.RestForgetFollowerAction; import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.rest.RestPauseAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction; import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction; +import org.elasticsearch.xpack.ccr.rest.RestResumeAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction; import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; import org.elasticsearch.xpack.core.XPackPlugin; @@ -97,6 +100,7 @@ import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; @@ -236,6 +240,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class), new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class), new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class), + new ActionHandler<>(ActivateAutoFollowPatternAction.INSTANCE, TransportActivateAutoFollowPatternAction.class), // forget follower action new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class)); } @@ -262,6 +267,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E new RestDeleteAutoFollowPatternAction(restController), new RestPutAutoFollowPatternAction(restController), new RestGetAutoFollowPatternAction(restController), + new RestPauseAutoFollowPatternAction(restController), + new RestResumeAutoFollowPatternAction(restController), // forget follower API new RestForgetFollowerAction(restController)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index cca60579ae6..4129b4c018a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -194,7 +194,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements } void updateAutoFollowers(ClusterState followerClusterState) { - AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); + final AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); if (autoFollowMetadata == null) { return; } @@ -206,8 +206,9 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements } final CopyOnWriteHashMap autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers); - Set newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream() - .map(entry -> entry.getValue().getRemoteCluster()) + Set newRemoteClusters = autoFollowMetadata.getPatterns().values().stream() + .filter(AutoFollowPattern::isActive) + .map(AutoFollowPattern::getRemoteCluster) .filter(remoteCluster -> autoFollowers.containsKey(remoteCluster) == false) .collect(Collectors.toSet()); @@ -283,6 +284,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements String remoteCluster = entry.getKey(); AutoFollower autoFollower = entry.getValue(); boolean exist = autoFollowMetadata.getPatterns().values().stream() + .filter(AutoFollowPattern::isActive) .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster)); if (exist == false) { LOGGER.info("removing auto-follower for remote cluster [{}]", remoteCluster); @@ -345,6 +347,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements private volatile CountDown autoFollowPatternsCountDown; private volatile AtomicArray autoFollowResults; private volatile boolean stop; + private volatile List lastActivePatterns = Collections.emptyList(); AutoFollower(final String remoteCluster, final Consumer> statsUpdater, @@ -384,7 +387,9 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements final List patterns = autoFollowMetadata.getPatterns().entrySet().stream() .filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster)) + .filter(entry -> entry.getValue().isActive()) .map(Map.Entry::getKey) + .sorted() .collect(Collectors.toList()); if (patterns.isEmpty()) { LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster); @@ -394,8 +399,15 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements this.autoFollowPatternsCountDown = new CountDown(patterns.size()); this.autoFollowResults = new AtomicArray<>(patterns.size()); + // keep the list of the last known active patterns for this auto-follower + // if the list changed, we explicitly retrieve the last cluster state in + // order to avoid timeouts when waiting for the next remote cluster state + // version that might never arrive + final long nextMetadataVersion = Objects.equals(patterns, lastActivePatterns) ? metadataVersion + 1 : metadataVersion; + this.lastActivePatterns = Collections.unmodifiableList(patterns); + final Thread thread = Thread.currentThread(); - getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> { + getRemoteClusterState(remoteCluster, Math.max(1L, nextMetadataVersion), (remoteClusterStateResponse, remoteError) -> { // Also check removed flag here, as it may take a while for this remote cluster state api call to return: if (removed) { LOGGER.info("AutoFollower instance for cluster [{}] has been removed", remoteCluster); @@ -445,8 +457,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); - final List leaderIndicesToFollow = - getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices); + final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices); if (leaderIndicesToFollow.isEmpty()) { finalise(slot, new AutoFollowResult(autoFollowPatternName), thread); } else { @@ -599,7 +610,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements if (leaderIndexMetaData.getState() != IndexMetaData.State.OPEN) { continue; } - if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { + if (autoFollowPattern.isActive() && autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex()); if (indexRoutingTable != null && // Leader indices can be in the cluster state, but not all primary shards may be ready yet. diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java new file mode 100644 index 00000000000..30e2d7854e3 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class TransportActivateAutoFollowPatternAction extends TransportMasterNodeAction { + + @Inject + public TransportActivateAutoFollowPatternAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver resolver) { + super(ActivateAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new, resolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(final StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(final Request request, final ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void masterOperation(ActivateAutoFollowPatternAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("activate-auto-follow-pattern-" + request.getName(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(final boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + return innerActivate(request, currentState); + } + }); + } + + static ClusterState innerActivate(final Request request, ClusterState currentState) { + final AutoFollowMetadata autoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); + if (autoFollowMetadata == null) { + throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName()); + } + + final Map patterns = autoFollowMetadata.getPatterns(); + final AutoFollowMetadata.AutoFollowPattern previousAutoFollowPattern = patterns.get(request.getName()); + if (previousAutoFollowPattern == null) { + throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName()); + } + + if (previousAutoFollowPattern.isActive() == request.isActive()) { + return currentState; + } + + final Map newPatterns = new HashMap<>(patterns); + newPatterns.put(request.getName(), + new AutoFollowMetadata.AutoFollowPattern( + previousAutoFollowPattern.getRemoteCluster(), + previousAutoFollowPattern.getLeaderIndexPatterns(), + previousAutoFollowPattern.getFollowIndexPattern(), + request.isActive(), + previousAutoFollowPattern.getMaxReadRequestOperationCount(), + previousAutoFollowPattern.getMaxWriteRequestOperationCount(), + previousAutoFollowPattern.getMaxOutstandingReadRequests(), + previousAutoFollowPattern.getMaxOutstandingWriteRequests(), + previousAutoFollowPattern.getMaxReadRequestSize(), + previousAutoFollowPattern.getMaxWriteRequestSize(), + previousAutoFollowPattern.getMaxWriteBufferCount(), + previousAutoFollowPattern.getMaxWriteBufferSize(), + previousAutoFollowPattern.getMaxRetryDelay(), + previousAutoFollowPattern.getReadPollTimeout())); + + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(newPatterns, autoFollowMetadata.getFollowedLeaderIndexUUIDs(), autoFollowMetadata.getHeaders())) + .build()) + .build(); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index deb39a97d1c..7784f3fe7b0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -160,6 +160,7 @@ public class TransportPutAutoFollowPatternAction extends request.getRemoteCluster(), request.getLeaderIndexPatterns(), request.getFollowIndexNamePattern(), + true, request.getParameters().getMaxReadRequestOperationCount(), request.getParameters().getMaxWriteRequestOperationCount(), request.getParameters().getMaxOutstandingReadRequests(), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java new file mode 100644 index 00000000000..abfca00da5c --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import static org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.INSTANCE; + +public class RestPauseAutoFollowPatternAction extends BaseRestHandler { + + public RestPauseAutoFollowPatternAction(final RestController controller) { + controller.registerHandler(RestRequest.Method.POST, "/_ccr/auto_follow/{name}/pause", this); + } + + @Override + public String getName() { + return "ccr_pause_auto_follow_pattern_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + Request request = new Request(restRequest.param("name"), false); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java new file mode 100644 index 00000000000..89f3f65fca7 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import static org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.INSTANCE; + +public class RestResumeAutoFollowPatternAction extends BaseRestHandler { + + public RestResumeAutoFollowPatternAction(final RestController controller) { + controller.registerHandler(RestRequest.Method.POST, "/_ccr/auto_follow/{name}/resume", this); + } + + @Override + public String getName() { + return "ccr_resume_auto_follow_pattern_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + Request request = new Request(restRequest.param("name"), true); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 0bcb3daac62..f030b99d0a0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; @@ -16,25 +17,33 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; -import org.elasticsearch.xpack.core.ccr.action.FollowParameters; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; +import org.elasticsearch.xpack.core.ccr.action.FollowParameters; +import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Locale; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; @@ -392,6 +401,164 @@ public class AutoFollowIT extends CcrIntegTestCase { }); } + public void testPauseAndResumeAutoFollowPattern() throws Exception { + final Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + // index created in the remote cluster before the auto follow pattern exists won't be auto followed + createLeaderIndex("test-existing-index-is-ignored", leaderIndexSettings); + + // create the auto follow pattern + putAutoFollowPatterns("test-pattern", new String[]{"test-*", "tests-*"}); + assertBusy(() -> { + final AutoFollowStats autoFollowStats = getAutoFollowStats(); + assertThat(autoFollowStats.getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L)); + }); + + // index created in the remote cluster are auto followed + createLeaderIndex("test-new-index-is-auto-followed", leaderIndexSettings); + assertBusy(() -> { + final AutoFollowStats autoFollowStats = getAutoFollowStats(); + assertThat(autoFollowStats.getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(1L)); + IndicesExistsRequest request = new IndicesExistsRequest("copy-test-new-index-is-auto-followed"); + assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists()); + }); + ensureFollowerGreen("copy-test-new-index-is-auto-followed"); + + // pause the auto follow pattern + pauseAutoFollowPattern("test-pattern"); + assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(0))); + + // indices created in the remote cluster are not auto followed because the pattern is paused + final int nbIndicesCreatedWhilePaused = randomIntBetween(1, 5); + for (int i = 0; i < nbIndicesCreatedWhilePaused; i++) { + createLeaderIndex("test-index-created-while-pattern-is-paused-" + i, leaderIndexSettings); + } + + // sometimes create another index in the remote cluster and close (or delete) it right away + // it should not be auto followed when the pattern is resumed + if (randomBoolean()) { + final String indexName = "test-index-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + createLeaderIndex(indexName, leaderIndexSettings); + if (randomBoolean()) { + assertAcked(leaderClient().admin().indices().prepareClose(indexName)); + } else { + assertAcked(leaderClient().admin().indices().prepareDelete(indexName)); + } + } + + if (randomBoolean()) { + createLeaderIndex("logs-20200101", leaderIndexSettings); + } + + // pattern is paused, none of the newly created indices has been followed yet + assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(1)); + ensureLeaderGreen("test-index-created-while-pattern-is-paused-*"); + + // resume the auto follow pattern, indices created while the pattern was paused are picked up for auto-following + resumeAutoFollowPattern("test-pattern"); + assertBusy(() -> { + final Client client = followerClient(); + assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1)); + assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(1 + nbIndicesCreatedWhilePaused)); + for (int i = 0; i < nbIndicesCreatedWhilePaused; i++) { + IndicesExistsRequest request = new IndicesExistsRequest("copy-test-index-created-while-pattern-is-paused-" + i); + assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists()); + } + }); + } + + public void testPauseAndResumeWithMultipleAutoFollowPatterns() throws Exception { + final Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + final String[] prefixes = {"logs-", "users-", "docs-", "monitoring-", "data-", "system-", "events-", "files-"}; + if (randomBoolean()) { + // sometimes create indices in the remote cluster that match the future auto follow patterns + Arrays.stream(prefixes).forEach(prefix -> createLeaderIndex(prefix + "ignored", leaderIndexSettings)); + } + + // create auto follow patterns + final List autoFollowPatterns = new ArrayList<>(prefixes.length); + for (String prefix : prefixes) { + String name = prefix + "pattern"; + putAutoFollowPatterns(name, new String[]{prefix + "*"}); + autoFollowPatterns.add(name); + assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1))); + assertTrue(getAutoFollowPattern(name).isActive()); + } + + // no following indices are created yet + assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(0)); + + // create random indices in the remote cluster that match the patterns + final AtomicBoolean running = new AtomicBoolean(true); + final Set leaderIndices = ConcurrentCollections.newConcurrentSet(); + final Thread createNewLeaderIndicesThread = new Thread(() -> { + while (running.get()) { + try { + String indexName = randomFrom(prefixes) + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createLeaderIndex(indexName, leaderIndexSettings); + leaderIndices.add(indexName); + Thread.sleep(randomIntBetween(100, 500)); + } catch (Exception e) { + throw new AssertionError(e); + } + } + }); + createNewLeaderIndicesThread.start(); + + // wait for some leader indices to be auto-followed + assertBusy(() -> + assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo((long) prefixes.length))); + + final int nbLeaderIndices = leaderIndices.size(); + + // pause some random patterns + final List pausedAutoFollowerPatterns = randomSubsetOf(autoFollowPatterns); + pausedAutoFollowerPatterns.forEach(this::pauseAutoFollowPattern); + assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertFalse(getAutoFollowPattern(pattern).isActive()))); + + assertBusy(() -> { + final int expectedAutoFollowedClusters = pausedAutoFollowerPatterns.size() != autoFollowPatterns.size() ? 1 : 0; + assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(expectedAutoFollowedClusters)); + if (expectedAutoFollowedClusters > 0) { + // wait for more indices to be created in the remote cluster while some patterns are paused + assertThat(leaderIndices.size(), greaterThan(nbLeaderIndices + 3)); + } + }); + ensureFollowerGreen(true, "copy-*"); + + // resume auto follow patterns + pausedAutoFollowerPatterns.forEach(this::resumeAutoFollowPattern); + assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive()))); + + // stop creating indices in the remote cluster + running.set(false); + createNewLeaderIndicesThread.join(); + + ensureLeaderGreen(leaderIndices.toArray(new String[0])); + + // check that all leader indices have been correctly auto followed + assertBusy(() -> { + final Client client = followerClient(); + assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(leaderIndices.size())); + leaderIndices.stream() + .map(leaderIndex -> "copy-" + leaderIndex) + .forEach(followerIndex -> + assertTrue("following index must exist: " + followerIndex, + client.admin().indices().exists(new IndicesExistsRequest(followerIndex)).actionGet().isExists())); + }); + } + private void putAutoFollowPatterns(String name, String[] patterns) { PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); request.setName(name); @@ -418,4 +585,21 @@ public class AutoFollowIT extends CcrIntegTestCase { leaderClient().admin().indices().create(request).actionGet(); } + private void pauseAutoFollowPattern(final String name) { + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(name, false); + assertAcked(followerClient().execute(ActivateAutoFollowPatternAction.INSTANCE, request).actionGet()); + } + + private void resumeAutoFollowPattern(final String name) { + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(name, true); + assertAcked(followerClient().execute(ActivateAutoFollowPatternAction.INSTANCE, request).actionGet()); + } + + private AutoFollowMetadata.AutoFollowPattern getAutoFollowPattern(final String name) { + GetAutoFollowPatternAction.Request request = new GetAutoFollowPatternAction.Request(); + request.setName(name); + GetAutoFollowPatternAction.Response response = followerClient().execute(GetAutoFollowPatternAction.INSTANCE, request).actionGet(); + assertTrue(response.getAutoFollowPatterns().containsKey(name)); + return response.getAutoFollowPatterns().get(name); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java index 26182781233..32b4b3ed9d1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java @@ -44,7 +44,7 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase patterns = new HashMap<>(numAutoFollowPatterns); for (int i = 0; i < numAutoFollowPatterns; i++) { AutoFollowMetadata.AutoFollowPattern pattern = new AutoFollowMetadata.AutoFollowPattern("remote_cluser", - Collections.singletonList("logs" + i + "*"), null, null, null, null, null, null, null, null, null, null, null); + Collections.singletonList("logs" + i + "*"), null, true, null, null, null, null, null, null, null, null, null, null); patterns.put("pattern" + i, pattern); } metaData.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index f8e7eab1c86..13aa3208e55 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -165,7 +165,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase { @Override public ClusterState execute(ClusterState currentState) throws Exception { AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( Collections.singletonMap("test_alias", autoFollowPattern), Collections.emptyMap(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java new file mode 100644 index 00000000000..961bf94f658 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class ActivateAutoFollowPatternActionRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected ActivateAutoFollowPatternAction.Request createTestInstance() { + return new ActivateAutoFollowPatternAction.Request(randomAlphaOfLength(5), randomBoolean()); + } + + @Override + protected Writeable.Reader instanceReader() { + return ActivateAutoFollowPatternAction.Request::new; + } + + public void testValidate() { + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(null, true); + ActionRequestValidationException validationException = request.validate(); + assertThat(validationException, notNullValue()); + assertThat(validationException.getMessage(), containsString("[name] is missing")); + + request = new ActivateAutoFollowPatternAction.Request("name", true); + validationException = request.validate(); + assertThat(validationException, nullValue()); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index e3edf0489d7..ba2afd788f6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -38,6 +38,8 @@ import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import java.util.ArrayList; @@ -52,6 +54,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -59,8 +62,12 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasItem; @@ -68,6 +75,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.startsWith; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -81,7 +89,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -150,7 +158,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -202,7 +210,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -253,13 +261,232 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(invoked[0], is(true)); } + public void testAutoFollowerWithNoActivePatternsDoesNotStart() { + final String remoteCluster = randomAlphaOfLength(5); + + final Map autoFollowPatterns = new HashMap<>(2); + autoFollowPatterns.put("pattern_1", new AutoFollowPattern(remoteCluster, Arrays.asList("logs-*", "test-*"), "copy-", false, + null, null, null, null, null, null, null, null, null, null)); + autoFollowPatterns.put("pattern_2", new AutoFollowPattern(remoteCluster, Arrays.asList("users-*"), "copy-", false, null, null, + null, null, null, null, null, null, null, null)); + + final Map> followedLeaderIndexUUIDs = new HashMap<>(2); + followedLeaderIndexUUIDs.put("pattern_1", Arrays.asList("uuid1", "uuid2")); + followedLeaderIndexUUIDs.put("pattern_2", Collections.emptyList()); + + final Map> headers = new HashMap<>(2); + headers.put("pattern_1", singletonMap("header", "value")); + headers.put("pattern_2", emptyMap()); + + final Supplier followerClusterStateSupplier = localClusterStateSupplier(ClusterState.builder(new ClusterName("test")) + .metaData(MetaData.builder() + .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(autoFollowPatterns, followedLeaderIndexUUIDs, headers)) + .build()) + .build()); + + final AtomicBoolean invoked = new AtomicBoolean(false); + final AutoFollower autoFollower = + new AutoFollower(remoteCluster, v -> invoked.set(true), followerClusterStateSupplier, () -> 1L, Runnable::run) { + @Override + void getRemoteClusterState(String remote, long metadataVersion, BiConsumer handler) { + invoked.set(true); + } + + @Override + void createAndFollow(Map headers, PutFollowAction.Request request, + Runnable successHandler, Consumer failureHandler) { + invoked.set(true); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + invoked.set(true); + } + }; + + autoFollower.start(); + assertThat(invoked.get(), is(false)); + } + + public void testAutoFollowerWithPausedActivePatterns() { + final String remoteCluster = randomAlphaOfLength(5); + + final AtomicReference remoteClusterState = new AtomicReference<>( + createRemoteClusterState("patternLogs-0", true, randomLongBetween(1L, 1_000L)) + ); + + final AtomicReference localClusterState = new AtomicReference<>( + ClusterState.builder(new ClusterName("local")) + .metaData(MetaData.builder() + .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(emptyMap(), emptyMap(), emptyMap()))) + .build() + ); + + // compute and return the local cluster state, updated with some auto-follow patterns + final Supplier localClusterStateSupplier = () -> localClusterState.updateAndGet(currentLocalState -> { + final int nextClusterStateVersion = (int) (currentLocalState.version() + 1); + + final ClusterState nextLocalClusterState; + if (nextClusterStateVersion == 1) { + // cluster state #1 : one pattern is active + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setName("patternLogs"); + request.setRemoteCluster(remoteCluster); + request.setLeaderIndexPatterns(singletonList("patternLogs-*")); + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + nextLocalClusterState = + TransportPutAutoFollowPatternAction.innerPut(request, emptyMap(), currentLocalState, remoteClusterState.get()); + + } else if (nextClusterStateVersion == 2) { + // cluster state #2 : still one pattern is active + nextLocalClusterState = currentLocalState; + + } else if (nextClusterStateVersion == 3) { + // cluster state #3 : add a new pattern, two patterns are active + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setName("patternDocs"); + request.setRemoteCluster(remoteCluster); + request.setLeaderIndexPatterns(singletonList("patternDocs-*")); + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + nextLocalClusterState = + TransportPutAutoFollowPatternAction.innerPut(request, emptyMap(), currentLocalState, remoteClusterState.get()); + + } else if (nextClusterStateVersion == 4) { + // cluster state #4 : still both patterns are active + nextLocalClusterState = currentLocalState; + + } else if (nextClusterStateVersion == 5) { + // cluster state #5 : first pattern is paused, second pattern is still active + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request("patternLogs", false); + nextLocalClusterState = TransportActivateAutoFollowPatternAction.innerActivate(request, currentLocalState); + + } else if (nextClusterStateVersion == 6) { + // cluster state #5 : second pattern is paused, both patterns are inactive + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request("patternDocs", false); + nextLocalClusterState = TransportActivateAutoFollowPatternAction.innerActivate(request, currentLocalState); + + } else { + return currentLocalState; + } + + return ClusterState.builder(nextLocalClusterState) + .version(nextClusterStateVersion) + .build(); + }); + + final Set followedIndices = ConcurrentCollections.newConcurrentSet(); + final List autoFollowResults = new ArrayList<>(); + + final AutoFollower autoFollower = + new AutoFollower(remoteCluster, autoFollowResults::addAll, localClusterStateSupplier, () -> 1L, Runnable::run) { + + int countFetches = 1; // to be aligned with local cluster state updates + ClusterState lastFetchedRemoteClusterState; + + @Override + void getRemoteClusterState(String remote, long metadataVersion, BiConsumer handler) { + assertThat(remote, equalTo(remoteCluster)); + + // in this test, every time it fetches the remote cluster state new leader indices to follow appears + final String[] newLeaderIndices = {"patternLogs-" + countFetches, "patternDocs-" + countFetches}; + + if (countFetches == 1) { + assertThat("first invocation, it should retrieve the metadata version 1", metadataVersion, equalTo(1L)); + lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices); + + } else if (countFetches == 2 || countFetches == 4) { + assertThat("no patterns changes, it should retrieve the last known metadata version + 1", + metadataVersion, equalTo(lastFetchedRemoteClusterState.metaData().version() + 1)); + lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices); + assertThat("remote cluster state metadata version is aligned with what the auto-follower is requesting", + lastFetchedRemoteClusterState.getMetaData().version(), equalTo(metadataVersion)); + + } else if (countFetches == 3 || countFetches == 5) { + assertThat("patterns have changed, it should retrieve the last known metadata version again", + metadataVersion, equalTo(lastFetchedRemoteClusterState.metaData().version())); + lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices); + assertThat("remote cluster state metadata version is incremented", + lastFetchedRemoteClusterState.getMetaData().version(), equalTo(metadataVersion + 1)); + } else { + fail("after the 5th invocation there are no more active patterns, the auto-follower should have stopped"); + } + + countFetches = countFetches + 1; + remoteClusterState.set(lastFetchedRemoteClusterState); + handler.accept(new ClusterStateResponse(lastFetchedRemoteClusterState.getClusterName(), + lastFetchedRemoteClusterState, false), null); + } + + @Override + void createAndFollow(Map headers, PutFollowAction.Request request, + Runnable successHandler, Consumer failureHandler) { + assertThat(request.getRemoteCluster(), equalTo(remoteCluster)); + assertThat(request.getFollowerIndex(), startsWith("copy-")); + followedIndices.add(request.getLeaderIndex()); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + localClusterState.updateAndGet(updateFunction::apply); + handler.accept(null); + } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } + }; + + autoFollower.start(); + + assertThat(autoFollowResults.size(), equalTo(7)); + assertThat(followedIndices, containsInAnyOrder( + "patternLogs-1", // iteration #1 : only pattern "patternLogs" is active in local cluster state + "patternLogs-2", // iteration #2 : only pattern "patternLogs" is active in local cluster state + "patternLogs-3", // iteration #3 : both patterns "patternLogs" and "patternDocs" are active in local cluster state + "patternDocs-3", // + "patternLogs-4", // iteration #4 : both patterns "patternLogs" and "patternDocs" are active in local cluster state + "patternDocs-4", // + "patternDocs-5" // iteration #5 : only pattern "patternDocs" is active in local cluster state, "patternLogs" is paused + )); + + final ClusterState finalRemoteClusterState = remoteClusterState.get(); + final ClusterState finalLocalClusterState = localClusterState.get(); + + AutoFollowMetadata autoFollowMetadata = finalLocalClusterState.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata.getPatterns().size(), equalTo(2)); + assertThat(autoFollowMetadata.getPatterns().values().stream().noneMatch(AutoFollowPattern::isActive), is(true)); + + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("patternLogs"), + containsInAnyOrder( + finalRemoteClusterState.metaData().index("patternLogs-0").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-1").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-2").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-3").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-4").getIndexUUID() + // patternLogs-5 exists in remote cluster state but patternLogs was paused + )); + + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("patternDocs"), + containsInAnyOrder( + // patternDocs-0 does not exist in remote cluster state + finalRemoteClusterState.metaData().index("patternDocs-1").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-2").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-3").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-4").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-5").getIndexUUID() + )); + } + public void testAutoFollowerCreateAndFollowApiCallFailure() { Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -317,13 +544,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase { } public void testGetLeaderIndicesToFollow() { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), - null, null, null, null, null, null, null, null, null, null, null); + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, true, + null, null, null, null, null, null, null, null, null, null); Map> headers = new HashMap<>(); - ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) - .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers))) - .build(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); MetaData.Builder imdBuilder = MetaData.builder(); @@ -368,7 +591,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(result.get(3).getName(), equalTo("metrics-3")); assertThat(result.get(4).getName(), equalTo("metrics-4")); - List followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID()); + final List followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID()); result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, followedIndexUUIDs); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(4)); @@ -376,16 +599,20 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(result.get(1).getName(), equalTo("metrics-1")); assertThat(result.get(2).getName(), equalTo("metrics-3")); assertThat(result.get(3).getName(), equalTo("metrics-4")); + + final AutoFollowPattern inactiveAutoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, + false, null, null, null, null, null, null, null, null, null, null); + + result = AutoFollower.getLeaderIndicesToFollow(inactiveAutoFollowPattern, remoteState, Collections.emptyList()); + assertThat(result.size(), equalTo(0)); + + result = AutoFollower.getLeaderIndicesToFollow(inactiveAutoFollowPattern, remoteState, followedIndexUUIDs); + assertThat(result.size(), equalTo(0)); } public void testGetLeaderIndicesToFollow_shardsNotStarted() { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), - null, null, null, null, null, null, null, null, null, null, null); - Map> headers = new HashMap<>(); - ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) - .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers))) - .build(); + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), null, true, + null, null, null, null, null, null, null, null, null, null); // 1 shard started and another not started: ClusterState remoteState = createRemoteClusterState("index1", true); @@ -425,7 +652,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testGetLeaderIndicesToFollowWithClosedIndices() { final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); // index is opened ClusterState remoteState = ClusterStateCreationUtils.stateWithActivePrimary("test-index", true, randomIntBetween(1, 3), 0); @@ -552,15 +779,15 @@ public class AutoFollowCoordinatorTests extends ESTestCase { } public void testGetFollowerIndexName() { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, true, null, null, null, null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0")); - autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", null, null, + autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", true, null, null, null, null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); - autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null, + autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", true, null, null, null, null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); } @@ -643,11 +870,11 @@ public class AutoFollowCoordinatorTests extends ESTestCase { Runnable::run); // Add 3 patterns: Map patterns = new HashMap<>(); - patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, null, null, + patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, true, null, null, null, null, null, null, null, null, null, null)); - patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, null, null, + patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, true, null, null, null, null, null, null, null, null, null, null)); - patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, null, null, + patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, true, null, null, null, null, null, null, null, null, null, null)); ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, @@ -673,7 +900,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); assertThat(removedAutoFollower1.removed, is(true)); // Add pattern 4: - patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, null, null, + patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, true, null, null, null, null, null, null, null, null, null, null)); clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, @@ -733,12 +960,100 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); } + public void testUpdateAutoFollowersNoActivePatterns() { + final ClusterService clusterService = mockClusterService(); + final AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, + null, + clusterService, + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L, + () -> 1L, + Runnable::run); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.EMPTY_STATE); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); + + // Add 3 patterns: + Map patterns = new HashMap<>(); + patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(2)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote1"), notNullValue()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); + + AutoFollowCoordinator.AutoFollower removedAutoFollower1 = autoFollowCoordinator.getAutoFollowers().get("remote1"); + assertThat(removedAutoFollower1.removed, is(false)); + AutoFollowCoordinator.AutoFollower removedAutoFollower2 = autoFollowCoordinator.getAutoFollowers().get("remote2"); + assertThat(removedAutoFollower2.removed, is(false)); + + // Make pattern 1 and pattern 3 inactive + patterns.computeIfPresent("pattern1", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(), + pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(), + pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(), + pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout())); + patterns.computeIfPresent("pattern3", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(), + pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(), + pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(), + pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout())); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); + assertThat(removedAutoFollower1.removed, is(true)); + assertThat(removedAutoFollower2.removed, is(false)); + + // Add active pattern 4 and make pattern 2 inactive + patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + patterns.computeIfPresent("pattern2", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(), + pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(), + pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(), + pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout())); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote1"), notNullValue()); + + AutoFollowCoordinator.AutoFollower removedAutoFollower4 = autoFollowCoordinator.getAutoFollowers().get("remote1"); + assertThat(removedAutoFollower4.removed, is(false)); + assertNotSame(removedAutoFollower4, removedAutoFollower1); + assertThat(removedAutoFollower2.removed, is(true)); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); + assertThat(removedAutoFollower1.removed, is(true)); + assertThat(removedAutoFollower2.removed, is(true)); + assertThat(removedAutoFollower4.removed, is(true)); + } + public void testWaitForMetadataVersion() { Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -801,7 +1116,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -860,7 +1175,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { createRemoteClusterState("logs-20190101", null); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -926,7 +1241,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -1012,7 +1327,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { "remote", Collections.singletonList("*"), "{}", - 0, + true, 0, 0, 0, 0, @@ -1087,8 +1402,8 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .metaData(MetaData.builder() .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(Collections.singletonMap(pattern, - new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, null, null, null, null, null, null, null, - null, null, null)), + new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, true, + null, null, null, null, null, null, null, null, null, null)), Collections.singletonMap(pattern, Collections.emptyList()), Collections.singletonMap(pattern, Collections.emptyMap())))) .build(); @@ -1160,6 +1475,10 @@ public class AutoFollowCoordinatorTests extends ESTestCase { } private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) { + return createRemoteClusterState(indexName, enableSoftDeletes, 0L); + } + + private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes, long metadataVersion) { Settings.Builder indexSettings; if (enableSoftDeletes != null) { indexSettings = settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), enableSoftDeletes); @@ -1173,7 +1492,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .numberOfReplicas(0) .build(); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) - .metaData(MetaData.builder().put(indexMetaData, true)); + .metaData(MetaData.builder() + .put(indexMetaData, true) + .version(metadataVersion)); ShardRouting shardRouting = TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); @@ -1183,25 +1504,29 @@ public class AutoFollowCoordinatorTests extends ESTestCase { return csBuilder.build(); } - private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) { - IndexMetaData indexMetaData = IndexMetaData.builder(indexName) - .settings(settings(Version.CURRENT) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))) - .numberOfShards(1) - .numberOfReplicas(0) + private static ClusterState createRemoteClusterState(final ClusterState previous, final String... indices) { + if (indices == null) { + return previous; + } + final MetaData.Builder metadataBuilder = MetaData.builder(previous.metaData()).version(previous.metaData().version() + 1); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(previous.routingTable()); + for (String indexName : indices) { + IndexMetaData indexMetaData = IndexMetaData.builder(indexName) + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + metadataBuilder.put(indexMetaData, true); + routingTableBuilder.add(IndexRoutingTable.builder(indexMetaData.getIndex()) + .addShard(TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted()) + .build()); + } + return ClusterState.builder(previous.getClusterName()) + .metaData(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) .build(); - ClusterState.Builder csBuilder = ClusterState.builder(previous.getClusterName()) - .metaData(MetaData.builder(previous.metaData()) - .version(previous.metaData().version() + 1) - .put(indexMetaData, true)); - - ShardRouting shardRouting = - TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); - IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build(); - csBuilder.routingTable(RoutingTable.builder(previous.routingTable()).add(indexRoutingTable).build()).build(); - - return csBuilder.build(); } private static Supplier localClusterStateSupplier(ClusterState... states) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java index 55582815ce5..5fd6381001d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java @@ -33,7 +33,7 @@ public class GetAutoFollowPatternResponseTests extends AbstractWireSerializingTe "remote", Collections.singletonList(randomAlphaOfLength(4)), randomAlphaOfLength(4), - randomIntBetween(0, Integer.MAX_VALUE), + true, randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java new file mode 100644 index 00000000000..18fa6cbd1db --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import java.util.Arrays; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class TransportActivateAutoFollowPatternActionTests extends ESTestCase { + + public void testInnerActivateNoAutoFollowMetadata() { + Exception e = expectThrows(ResourceNotFoundException.class, + () -> TransportActivateAutoFollowPatternAction.innerActivate(new Request("test", true), ClusterState.EMPTY_STATE)); + assertThat(e.getMessage(), equalTo("auto-follow pattern [test] is missing")); + } + + public void testInnerActivateDoesNotExist() { + ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata( + singletonMap("remote_cluster", randomAutoFollowPattern()), + singletonMap("remote_cluster", randomSubsetOf(randomIntBetween(1, 3), "uuid0", "uuid1", "uuid2")), + singletonMap("remote_cluster", singletonMap("header0", randomFrom("val0", "val2", "val3")))))) + .build(); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> TransportActivateAutoFollowPatternAction.innerActivate(new Request("does_not_exist", true), clusterState)); + assertThat(e.getMessage(), equalTo("auto-follow pattern [does_not_exist] is missing")); + } + + public void testInnerActivateToggle() { + final AutoFollowMetadata.AutoFollowPattern autoFollowPattern = randomAutoFollowPattern(); + final ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata( + singletonMap("remote_cluster", autoFollowPattern), + singletonMap("remote_cluster", randomSubsetOf(randomIntBetween(1, 3), "uuid0", "uuid1", "uuid2")), + singletonMap("remote_cluster", singletonMap("header0", randomFrom("val0", "val2", "val3")))))) + .build(); + { + Request pauseRequest = new Request("remote_cluster", autoFollowPattern.isActive()); + ClusterState updatedState = TransportActivateAutoFollowPatternAction.innerActivate(pauseRequest, clusterState); + assertThat(updatedState, sameInstance(clusterState)); + } + { + Request pauseRequest = new Request("remote_cluster", autoFollowPattern.isActive() == false); + ClusterState updatedState = TransportActivateAutoFollowPatternAction.innerActivate(pauseRequest, clusterState); + assertThat(updatedState, not(sameInstance(clusterState))); + + AutoFollowMetadata updatedAutoFollowMetadata = updatedState.getMetaData().custom(AutoFollowMetadata.TYPE); + assertNotEquals(updatedAutoFollowMetadata, notNullValue()); + + AutoFollowMetadata autoFollowMetadata = clusterState.getMetaData().custom(AutoFollowMetadata.TYPE); + assertNotEquals(updatedAutoFollowMetadata, autoFollowMetadata); + assertThat(updatedAutoFollowMetadata.getPatterns().size(), equalTo(autoFollowMetadata.getPatterns().size())); + assertThat(updatedAutoFollowMetadata.getPatterns().get("remote_cluster").isActive(), not(autoFollowPattern.isActive())); + + assertEquals(updatedAutoFollowMetadata.getFollowedLeaderIndexUUIDs(), autoFollowMetadata.getFollowedLeaderIndexUUIDs()); + assertEquals(updatedAutoFollowMetadata.getHeaders(), autoFollowMetadata.getHeaders()); + } + } + + private static AutoFollowMetadata.AutoFollowPattern randomAutoFollowPattern() { + return new AutoFollowMetadata.AutoFollowPattern(randomAlphaOfLength(5), + randomSubsetOf(Arrays.asList("test-*", "user-*", "logs-*", "failures-*")), + randomFrom("{{leader_index}}", "{{leader_index}}-follower", "test"), + randomBoolean(), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())), + new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())), + randomIntBetween(1, 100), + new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())), + TimeValue.timeValueSeconds(randomIntBetween(30, 600)), + TimeValue.timeValueSeconds(randomIntBetween(30, 600))); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java index 5ef43fc05c8..06bb95e333f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java @@ -32,8 +32,8 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); - existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + existingAutoFollowPatterns.put("name1", new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, + null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -43,8 +43,8 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { { List existingPatterns = new ArrayList<>(); existingPatterns.add("logs-*"); - existingAutoFollowPatterns.put("name2", - new AutoFollowPattern("asia_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + existingAutoFollowPatterns.put("name2", new AutoFollowPattern("asia_cluster", existingPatterns, null, true, null, null, null, + null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -76,8 +76,8 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); - existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + existingAutoFollowPatterns.put("name1", new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, + null, null, null, null, null, null, null)); existingHeaders.put("key", Collections.singletonMap("key", "val")); } ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java index 85e1bf916aa..7de170442f1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java @@ -24,9 +24,9 @@ public class TransportGetAutoFollowPatternActionTests extends ESTestCase { public void testGetAutoFollowPattern() { Map patterns = new HashMap<>(); patterns.put("name1", new AutoFollowPattern( - "test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null, null, null)); + "test_alias1", Collections.singletonList("index-*"), null, true, null, null, null, null, null, null, null, null, null, null)); patterns.put("name2", new AutoFollowPattern( - "test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null, null, null)); + "test_alias1", Collections.singletonList("index-*"), null, true, null, null, null, null, null, null, null, null, null, null)); MetaData metaData = MetaData.builder() .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())) .build(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java index ac556d47c85..05315f239be 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java @@ -103,7 +103,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, null, null, null, null, null, null, null)); Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index a8758ed6c2d..f0c32c57bad 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -16,7 +16,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.XPackPlugin; @@ -174,8 +174,9 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i return Objects.hash(patterns, followedLeaderIndexUUIDs, headers); } - public static class AutoFollowPattern extends ImmutableFollowParameters implements ToXContentObject { + public static class AutoFollowPattern extends ImmutableFollowParameters implements ToXContentFragment { + public static final ParseField ACTIVE = new ParseField("active"); public static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster"); public static final ParseField LEADER_PATTERNS_FIELD = new ParseField("leader_index_patterns"); public static final ParseField FOLLOW_PATTERN_FIELD = new ParseField("follow_index_pattern"); @@ -183,24 +184,28 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("auto_follow_pattern", - args -> new AutoFollowPattern((String) args[0], (List) args[1], (String) args[2], (Integer) args[3], - (Integer) args[4], (Integer) args[5], (Integer) args[6], (ByteSizeValue) args[7], (ByteSizeValue) args[8], - (Integer) args[9], (ByteSizeValue) args[10], (TimeValue) args[11], (TimeValue) args[12])); + args -> new AutoFollowPattern((String) args[0], (List) args[1], (String) args[2], + args[3] == null || (boolean) args[3], (Integer) args[4], (Integer) args[5], (Integer) args[6], (Integer) args[7], + (ByteSizeValue) args[8], (ByteSizeValue) args[9], (Integer) args[10], (ByteSizeValue) args[11], (TimeValue) args[12], + (TimeValue) args[13])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD); PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOW_PATTERN_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACTIVE); ImmutableFollowParameters.initParser(PARSER); } private final String remoteCluster; private final List leaderIndexPatterns; private final String followIndexPattern; + private final boolean active; public AutoFollowPattern(String remoteCluster, List leaderIndexPatterns, String followIndexPattern, + boolean active, Integer maxReadRequestOperationCount, Integer maxWriteRequestOperationCount, Integer maxOutstandingReadRequests, @@ -216,6 +221,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i this.remoteCluster = remoteCluster; this.leaderIndexPatterns = leaderIndexPatterns; this.followIndexPattern = followIndexPattern; + this.active = active; } public static AutoFollowPattern readFrom(StreamInput in) throws IOException { @@ -228,6 +234,11 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i this.remoteCluster = remoteCluster; this.leaderIndexPatterns = leaderIndexPatterns; this.followIndexPattern = followIndexPattern; + if (in.getVersion().onOrAfter(Version.V_7_5_0)) { + this.active = in.readBoolean(); + } else { + this.active = true; + } } public boolean match(String indexName) { @@ -250,16 +261,24 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i return followIndexPattern; } + public boolean isActive() { + return active; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(remoteCluster); out.writeStringCollection(leaderIndexPatterns); out.writeOptionalString(followIndexPattern); super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_5_0)) { + out.writeBoolean(active); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(ACTIVE.getPreferredName(), active); builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster); builder.array(LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns.toArray(new String[0])); if (followIndexPattern != null) { @@ -269,25 +288,21 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i return builder; } - @Override - public boolean isFragment() { - return true; - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; AutoFollowPattern pattern = (AutoFollowPattern) o; - return remoteCluster.equals(pattern.remoteCluster) && + return active == pattern.active && + remoteCluster.equals(pattern.remoteCluster) && leaderIndexPatterns.equals(pattern.leaderIndexPatterns) && followIndexPattern.equals(pattern.followIndexPattern); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), remoteCluster, leaderIndexPatterns, followIndexPattern); + return Objects.hash(super.hashCode(), remoteCluster, leaderIndexPatterns, followIndexPattern, active); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java new file mode 100644 index 00000000000..5b8f644ceca --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ccr.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class ActivateAutoFollowPatternAction extends ActionType { + + public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/activate"; + public static final ActivateAutoFollowPatternAction INSTANCE = new ActivateAutoFollowPatternAction(); + + private ActivateAutoFollowPatternAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends AcknowledgedRequest { + + private final String name; + private final boolean active; + + public Request(final String name, final boolean active) { + this.name = name; + this.active = active; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (name == null) { + validationException = addValidationError("[name] is missing", validationException); + } + return validationException; + } + + public String getName() { + return name; + } + + public boolean isActive() { + return active; + } + + public Request(final StreamInput in) throws IOException { + super(in); + name = in.readString(); + active = in.readBoolean(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeBoolean(active); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return active == request.active + && Objects.equals(name, request.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, active); + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json new file mode 100644 index 00000000000..9e76b83bb90 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json @@ -0,0 +1,24 @@ +{ + "ccr.pause_auto_follow_pattern":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-pause-auto-follow-pattern.html" + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_ccr/auto_follow/{name}/pause", + "methods":[ + "POST" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the auto follow pattern that should pause discovering new indices to follow." + } + } + } + ] + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json new file mode 100644 index 00000000000..96b77cb82e9 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json @@ -0,0 +1,24 @@ +{ + "ccr.resume_auto_follow_pattern":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-resume-auto-follow-pattern.html" + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_ccr/auto_follow/{name}/resume", + "methods":[ + "POST" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the auto follow pattern to resume discovering new indices to follow." + } + } + } + ] + } + } +}