This commit adds two APIs that allow to pause and resume CCR auto-follower patterns: // pause auto-follower POST /_ccr/auto_follow/my_pattern/pause // resume auto-follower POST /_ccr/auto_follow/my_pattern/resume The ability to pause and resume auto-follow patterns can be useful in some situations, including the rolling upgrades of cluster using a bi-directional cross-cluster replication scheme (see #46665). This commit adds a new active flag to the AutoFollowPattern and adapts the AutoCoordinator and AutoFollower classes so that it stops to fetch remote's cluster state when all auto-follow patterns associate to the remote cluster are paused. When an auto-follower is paused, remote indices that match the pattern are just ignored: they are not added to the pattern's followed indices uids list that is maintained in the local cluster state. This way, when the auto-follow pattern is resumed the indices created in the remote cluster in the meantime will be picked up again and added as new following indices. Indices created and then deleted in the remote cluster will be ignored as they won't be seen at all by the auto-follower pattern at resume time. Backport of #47510 for 7.x
This commit is contained in:
parent
65717f6f42
commit
742fa818b8
|
@ -49,6 +49,7 @@ public class GetAutoFollowPatternResponseTests extends AbstractResponseTestCase<
|
|||
String remoteCluster = randomAlphaOfLength(4);
|
||||
List<String> leaderIndexPatters = Collections.singletonList(randomAlphaOfLength(4));
|
||||
String followIndexNamePattern = randomAlphaOfLength(4);
|
||||
boolean active = randomBoolean();
|
||||
|
||||
Integer maxOutstandingReadRequests = null;
|
||||
if (randomBoolean()) {
|
||||
|
@ -91,7 +92,7 @@ public class GetAutoFollowPatternResponseTests extends AbstractResponseTestCase<
|
|||
readPollTimeout = new TimeValue(randomNonNegativeLong());
|
||||
}
|
||||
patterns.put(randomAlphaOfLength(4), new AutoFollowMetadata.AutoFollowPattern(remoteCluster, leaderIndexPatters,
|
||||
followIndexNamePattern, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests,
|
||||
followIndexNamePattern, active, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests,
|
||||
maxOutstandingWriteRequests, maxReadRequestSize, maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize,
|
||||
maxRetryDelay, readPollTimeout));
|
||||
}
|
||||
|
|
|
@ -90,6 +90,7 @@ The API returns the following result:
|
|||
{
|
||||
"name": "my_auto_follow_pattern",
|
||||
"pattern": {
|
||||
"active": true,
|
||||
"remote_cluster" : "remote_cluster",
|
||||
"leader_index_patterns" :
|
||||
[
|
||||
|
|
|
@ -52,3 +52,89 @@
|
|||
catch: missing
|
||||
ccr.get_auto_follow_pattern:
|
||||
name: my_pattern
|
||||
|
||||
---
|
||||
"Test pause and resume auto follow pattern":
|
||||
- skip:
|
||||
version: " - 7.4.99"
|
||||
reason: "pause/resume auto-follow patterns is supported since 7.5.0"
|
||||
|
||||
- do:
|
||||
cluster.state: {}
|
||||
|
||||
- set: {master_node: master}
|
||||
|
||||
- do:
|
||||
nodes.info: {}
|
||||
|
||||
- set: {nodes.$master.transport_address: local_ip}
|
||||
|
||||
- do:
|
||||
cluster.put_settings:
|
||||
body:
|
||||
transient:
|
||||
cluster.remote.local.seeds: $local_ip
|
||||
flat_settings: true
|
||||
|
||||
- match: {transient: {cluster.remote.local.seeds: $local_ip}}
|
||||
|
||||
- do:
|
||||
ccr.put_auto_follow_pattern:
|
||||
name: pattern_test
|
||||
body:
|
||||
remote_cluster: local
|
||||
leader_index_patterns: ['logs-*']
|
||||
max_outstanding_read_requests: 2
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.get_auto_follow_pattern:
|
||||
name: pattern_test
|
||||
- match: { patterns.0.name: 'pattern_test' }
|
||||
- match: { patterns.0.pattern.remote_cluster: 'local' }
|
||||
- match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] }
|
||||
- match: { patterns.0.pattern.max_outstanding_read_requests: 2 }
|
||||
- match: { patterns.0.pattern.active: true }
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
ccr.pause_auto_follow_pattern:
|
||||
name: unknown_pattern
|
||||
|
||||
- do:
|
||||
ccr.pause_auto_follow_pattern:
|
||||
name: pattern_test
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.get_auto_follow_pattern:
|
||||
name: pattern_test
|
||||
- match: { patterns.0.name: 'pattern_test' }
|
||||
- match: { patterns.0.pattern.remote_cluster: 'local' }
|
||||
- match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] }
|
||||
- match: { patterns.0.pattern.max_outstanding_read_requests: 2 }
|
||||
- match: { patterns.0.pattern.active: false }
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
ccr.resume_auto_follow_pattern:
|
||||
name: unknown_pattern
|
||||
|
||||
- do:
|
||||
ccr.resume_auto_follow_pattern:
|
||||
name: pattern_test
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.get_auto_follow_pattern:
|
||||
name: pattern_test
|
||||
- match: { patterns.0.name: 'pattern_test' }
|
||||
- match: { patterns.0.pattern.remote_cluster: 'local' }
|
||||
- match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] }
|
||||
- match: { patterns.0.pattern.max_outstanding_read_requests: 2 }
|
||||
- match: { patterns.0.pattern.active: true }
|
||||
|
||||
- do:
|
||||
ccr.delete_auto_follow_pattern:
|
||||
name: pattern_test
|
||||
- is_true: acknowledged
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.elasticsearch.xpack.ccr.action.TransportFollowInfoAction;
|
|||
import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportForgetFollowerAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportActivateAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction;
|
||||
|
@ -82,9 +83,11 @@ import org.elasticsearch.xpack.ccr.rest.RestFollowInfoAction;
|
|||
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestForgetFollowerAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPauseAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestResumeAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
|
@ -97,6 +100,7 @@ import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
|
|||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
|
@ -236,6 +240,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
|
||||
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
|
||||
new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class),
|
||||
new ActionHandler<>(ActivateAutoFollowPatternAction.INSTANCE, TransportActivateAutoFollowPatternAction.class),
|
||||
// forget follower action
|
||||
new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class));
|
||||
}
|
||||
|
@ -262,6 +267,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
new RestDeleteAutoFollowPatternAction(restController),
|
||||
new RestPutAutoFollowPatternAction(restController),
|
||||
new RestGetAutoFollowPatternAction(restController),
|
||||
new RestPauseAutoFollowPatternAction(restController),
|
||||
new RestResumeAutoFollowPatternAction(restController),
|
||||
// forget follower API
|
||||
new RestForgetFollowerAction(restController));
|
||||
}
|
||||
|
|
|
@ -194,7 +194,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
|
|||
}
|
||||
|
||||
void updateAutoFollowers(ClusterState followerClusterState) {
|
||||
AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
|
||||
final AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
|
||||
if (autoFollowMetadata == null) {
|
||||
return;
|
||||
}
|
||||
|
@ -206,8 +206,9 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
|
|||
}
|
||||
|
||||
final CopyOnWriteHashMap<String, AutoFollower> autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers);
|
||||
Set<String> newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream()
|
||||
.map(entry -> entry.getValue().getRemoteCluster())
|
||||
Set<String> newRemoteClusters = autoFollowMetadata.getPatterns().values().stream()
|
||||
.filter(AutoFollowPattern::isActive)
|
||||
.map(AutoFollowPattern::getRemoteCluster)
|
||||
.filter(remoteCluster -> autoFollowers.containsKey(remoteCluster) == false)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
|
@ -283,6 +284,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
|
|||
String remoteCluster = entry.getKey();
|
||||
AutoFollower autoFollower = entry.getValue();
|
||||
boolean exist = autoFollowMetadata.getPatterns().values().stream()
|
||||
.filter(AutoFollowPattern::isActive)
|
||||
.anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster));
|
||||
if (exist == false) {
|
||||
LOGGER.info("removing auto-follower for remote cluster [{}]", remoteCluster);
|
||||
|
@ -345,6 +347,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
|
|||
private volatile CountDown autoFollowPatternsCountDown;
|
||||
private volatile AtomicArray<AutoFollowResult> autoFollowResults;
|
||||
private volatile boolean stop;
|
||||
private volatile List<String> lastActivePatterns = Collections.emptyList();
|
||||
|
||||
AutoFollower(final String remoteCluster,
|
||||
final Consumer<List<AutoFollowResult>> statsUpdater,
|
||||
|
@ -384,7 +387,9 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
|
|||
|
||||
final List<String> patterns = autoFollowMetadata.getPatterns().entrySet().stream()
|
||||
.filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster))
|
||||
.filter(entry -> entry.getValue().isActive())
|
||||
.map(Map.Entry::getKey)
|
||||
.sorted()
|
||||
.collect(Collectors.toList());
|
||||
if (patterns.isEmpty()) {
|
||||
LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster);
|
||||
|
@ -394,8 +399,15 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
|
|||
this.autoFollowPatternsCountDown = new CountDown(patterns.size());
|
||||
this.autoFollowResults = new AtomicArray<>(patterns.size());
|
||||
|
||||
// keep the list of the last known active patterns for this auto-follower
|
||||
// if the list changed, we explicitly retrieve the last cluster state in
|
||||
// order to avoid timeouts when waiting for the next remote cluster state
|
||||
// version that might never arrive
|
||||
final long nextMetadataVersion = Objects.equals(patterns, lastActivePatterns) ? metadataVersion + 1 : metadataVersion;
|
||||
this.lastActivePatterns = Collections.unmodifiableList(patterns);
|
||||
|
||||
final Thread thread = Thread.currentThread();
|
||||
getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> {
|
||||
getRemoteClusterState(remoteCluster, Math.max(1L, nextMetadataVersion), (remoteClusterStateResponse, remoteError) -> {
|
||||
// Also check removed flag here, as it may take a while for this remote cluster state api call to return:
|
||||
if (removed) {
|
||||
LOGGER.info("AutoFollower instance for cluster [{}] has been removed", remoteCluster);
|
||||
|
@ -445,8 +457,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
|
|||
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName);
|
||||
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName);
|
||||
|
||||
final List<Index> leaderIndicesToFollow =
|
||||
getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices);
|
||||
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices);
|
||||
if (leaderIndicesToFollow.isEmpty()) {
|
||||
finalise(slot, new AutoFollowResult(autoFollowPatternName), thread);
|
||||
} else {
|
||||
|
@ -599,7 +610,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
|
|||
if (leaderIndexMetaData.getState() != IndexMetaData.State.OPEN) {
|
||||
continue;
|
||||
}
|
||||
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
|
||||
if (autoFollowPattern.isActive() && autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
|
||||
IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex());
|
||||
if (indexRoutingTable != null &&
|
||||
// Leader indices can be in the cluster state, but not all primary shards may be ready yet.
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class TransportActivateAutoFollowPatternAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
|
||||
|
||||
@Inject
|
||||
public TransportActivateAutoFollowPatternAction(TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver resolver) {
|
||||
super(ActivateAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new, resolver);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse read(final StreamInput in) throws IOException {
|
||||
return new AcknowledgedResponse(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(final Request request, final ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(ActivateAutoFollowPatternAction.Request request,
|
||||
ClusterState state,
|
||||
ActionListener<AcknowledgedResponse> listener) throws Exception {
|
||||
clusterService.submitStateUpdateTask("activate-auto-follow-pattern-" + request.getName(),
|
||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse(final boolean acknowledged) {
|
||||
return new AcknowledgedResponse(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(final ClusterState currentState) throws Exception {
|
||||
return innerActivate(request, currentState);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static ClusterState innerActivate(final Request request, ClusterState currentState) {
|
||||
final AutoFollowMetadata autoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
|
||||
if (autoFollowMetadata == null) {
|
||||
throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName());
|
||||
}
|
||||
|
||||
final Map<String, AutoFollowMetadata.AutoFollowPattern> patterns = autoFollowMetadata.getPatterns();
|
||||
final AutoFollowMetadata.AutoFollowPattern previousAutoFollowPattern = patterns.get(request.getName());
|
||||
if (previousAutoFollowPattern == null) {
|
||||
throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName());
|
||||
}
|
||||
|
||||
if (previousAutoFollowPattern.isActive() == request.isActive()) {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
final Map<String, AutoFollowMetadata.AutoFollowPattern> newPatterns = new HashMap<>(patterns);
|
||||
newPatterns.put(request.getName(),
|
||||
new AutoFollowMetadata.AutoFollowPattern(
|
||||
previousAutoFollowPattern.getRemoteCluster(),
|
||||
previousAutoFollowPattern.getLeaderIndexPatterns(),
|
||||
previousAutoFollowPattern.getFollowIndexPattern(),
|
||||
request.isActive(),
|
||||
previousAutoFollowPattern.getMaxReadRequestOperationCount(),
|
||||
previousAutoFollowPattern.getMaxWriteRequestOperationCount(),
|
||||
previousAutoFollowPattern.getMaxOutstandingReadRequests(),
|
||||
previousAutoFollowPattern.getMaxOutstandingWriteRequests(),
|
||||
previousAutoFollowPattern.getMaxReadRequestSize(),
|
||||
previousAutoFollowPattern.getMaxWriteRequestSize(),
|
||||
previousAutoFollowPattern.getMaxWriteBufferCount(),
|
||||
previousAutoFollowPattern.getMaxWriteBufferSize(),
|
||||
previousAutoFollowPattern.getMaxRetryDelay(),
|
||||
previousAutoFollowPattern.getReadPollTimeout()));
|
||||
|
||||
return ClusterState.builder(currentState)
|
||||
.metaData(MetaData.builder(currentState.getMetaData())
|
||||
.putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(newPatterns, autoFollowMetadata.getFollowedLeaderIndexUUIDs(), autoFollowMetadata.getHeaders()))
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
}
|
|
@ -160,6 +160,7 @@ public class TransportPutAutoFollowPatternAction extends
|
|||
request.getRemoteCluster(),
|
||||
request.getLeaderIndexPatterns(),
|
||||
request.getFollowIndexNamePattern(),
|
||||
true,
|
||||
request.getParameters().getMaxReadRequestOperationCount(),
|
||||
request.getParameters().getMaxWriteRequestOperationCount(),
|
||||
request.getParameters().getMaxOutstandingReadRequests(),
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.rest;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.INSTANCE;
|
||||
|
||||
public class RestPauseAutoFollowPatternAction extends BaseRestHandler {
|
||||
|
||||
public RestPauseAutoFollowPatternAction(final RestController controller) {
|
||||
controller.registerHandler(RestRequest.Method.POST, "/_ccr/auto_follow/{name}/pause", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "ccr_pause_auto_follow_pattern_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
|
||||
Request request = new Request(restRequest.param("name"), false);
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.rest;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.INSTANCE;
|
||||
|
||||
public class RestResumeAutoFollowPatternAction extends BaseRestHandler {
|
||||
|
||||
public RestResumeAutoFollowPatternAction(final RestController controller) {
|
||||
controller.registerHandler(RestRequest.Method.POST, "/_ccr/auto_follow/{name}/resume", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "ccr_resume_auto_follow_pattern_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
|
||||
Request request = new Request(restRequest.param("name"), true);
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -16,25 +17,33 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.xpack.CcrIntegTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
|
||||
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -392,6 +401,164 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
public void testPauseAndResumeAutoFollowPattern() throws Exception {
|
||||
final Settings leaderIndexSettings = Settings.builder()
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
|
||||
.build();
|
||||
|
||||
// index created in the remote cluster before the auto follow pattern exists won't be auto followed
|
||||
createLeaderIndex("test-existing-index-is-ignored", leaderIndexSettings);
|
||||
|
||||
// create the auto follow pattern
|
||||
putAutoFollowPatterns("test-pattern", new String[]{"test-*", "tests-*"});
|
||||
assertBusy(() -> {
|
||||
final AutoFollowStats autoFollowStats = getAutoFollowStats();
|
||||
assertThat(autoFollowStats.getAutoFollowedClusters().size(), equalTo(1));
|
||||
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L));
|
||||
});
|
||||
|
||||
// index created in the remote cluster are auto followed
|
||||
createLeaderIndex("test-new-index-is-auto-followed", leaderIndexSettings);
|
||||
assertBusy(() -> {
|
||||
final AutoFollowStats autoFollowStats = getAutoFollowStats();
|
||||
assertThat(autoFollowStats.getAutoFollowedClusters().size(), equalTo(1));
|
||||
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(1L));
|
||||
IndicesExistsRequest request = new IndicesExistsRequest("copy-test-new-index-is-auto-followed");
|
||||
assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists());
|
||||
});
|
||||
ensureFollowerGreen("copy-test-new-index-is-auto-followed");
|
||||
|
||||
// pause the auto follow pattern
|
||||
pauseAutoFollowPattern("test-pattern");
|
||||
assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(0)));
|
||||
|
||||
// indices created in the remote cluster are not auto followed because the pattern is paused
|
||||
final int nbIndicesCreatedWhilePaused = randomIntBetween(1, 5);
|
||||
for (int i = 0; i < nbIndicesCreatedWhilePaused; i++) {
|
||||
createLeaderIndex("test-index-created-while-pattern-is-paused-" + i, leaderIndexSettings);
|
||||
}
|
||||
|
||||
// sometimes create another index in the remote cluster and close (or delete) it right away
|
||||
// it should not be auto followed when the pattern is resumed
|
||||
if (randomBoolean()) {
|
||||
final String indexName = "test-index-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
||||
createLeaderIndex(indexName, leaderIndexSettings);
|
||||
if (randomBoolean()) {
|
||||
assertAcked(leaderClient().admin().indices().prepareClose(indexName));
|
||||
} else {
|
||||
assertAcked(leaderClient().admin().indices().prepareDelete(indexName));
|
||||
}
|
||||
}
|
||||
|
||||
if (randomBoolean()) {
|
||||
createLeaderIndex("logs-20200101", leaderIndexSettings);
|
||||
}
|
||||
|
||||
// pattern is paused, none of the newly created indices has been followed yet
|
||||
assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(1));
|
||||
ensureLeaderGreen("test-index-created-while-pattern-is-paused-*");
|
||||
|
||||
// resume the auto follow pattern, indices created while the pattern was paused are picked up for auto-following
|
||||
resumeAutoFollowPattern("test-pattern");
|
||||
assertBusy(() -> {
|
||||
final Client client = followerClient();
|
||||
assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1));
|
||||
assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(1 + nbIndicesCreatedWhilePaused));
|
||||
for (int i = 0; i < nbIndicesCreatedWhilePaused; i++) {
|
||||
IndicesExistsRequest request = new IndicesExistsRequest("copy-test-index-created-while-pattern-is-paused-" + i);
|
||||
assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void testPauseAndResumeWithMultipleAutoFollowPatterns() throws Exception {
|
||||
final Settings leaderIndexSettings = Settings.builder()
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
|
||||
.build();
|
||||
|
||||
final String[] prefixes = {"logs-", "users-", "docs-", "monitoring-", "data-", "system-", "events-", "files-"};
|
||||
if (randomBoolean()) {
|
||||
// sometimes create indices in the remote cluster that match the future auto follow patterns
|
||||
Arrays.stream(prefixes).forEach(prefix -> createLeaderIndex(prefix + "ignored", leaderIndexSettings));
|
||||
}
|
||||
|
||||
// create auto follow patterns
|
||||
final List<String> autoFollowPatterns = new ArrayList<>(prefixes.length);
|
||||
for (String prefix : prefixes) {
|
||||
String name = prefix + "pattern";
|
||||
putAutoFollowPatterns(name, new String[]{prefix + "*"});
|
||||
autoFollowPatterns.add(name);
|
||||
assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1)));
|
||||
assertTrue(getAutoFollowPattern(name).isActive());
|
||||
}
|
||||
|
||||
// no following indices are created yet
|
||||
assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(0));
|
||||
|
||||
// create random indices in the remote cluster that match the patterns
|
||||
final AtomicBoolean running = new AtomicBoolean(true);
|
||||
final Set<String> leaderIndices = ConcurrentCollections.newConcurrentSet();
|
||||
final Thread createNewLeaderIndicesThread = new Thread(() -> {
|
||||
while (running.get()) {
|
||||
try {
|
||||
String indexName = randomFrom(prefixes) + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
createLeaderIndex(indexName, leaderIndexSettings);
|
||||
leaderIndices.add(indexName);
|
||||
Thread.sleep(randomIntBetween(100, 500));
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
createNewLeaderIndicesThread.start();
|
||||
|
||||
// wait for some leader indices to be auto-followed
|
||||
assertBusy(() ->
|
||||
assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo((long) prefixes.length)));
|
||||
|
||||
final int nbLeaderIndices = leaderIndices.size();
|
||||
|
||||
// pause some random patterns
|
||||
final List<String> pausedAutoFollowerPatterns = randomSubsetOf(autoFollowPatterns);
|
||||
pausedAutoFollowerPatterns.forEach(this::pauseAutoFollowPattern);
|
||||
assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertFalse(getAutoFollowPattern(pattern).isActive())));
|
||||
|
||||
assertBusy(() -> {
|
||||
final int expectedAutoFollowedClusters = pausedAutoFollowerPatterns.size() != autoFollowPatterns.size() ? 1 : 0;
|
||||
assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(expectedAutoFollowedClusters));
|
||||
if (expectedAutoFollowedClusters > 0) {
|
||||
// wait for more indices to be created in the remote cluster while some patterns are paused
|
||||
assertThat(leaderIndices.size(), greaterThan(nbLeaderIndices + 3));
|
||||
}
|
||||
});
|
||||
ensureFollowerGreen(true, "copy-*");
|
||||
|
||||
// resume auto follow patterns
|
||||
pausedAutoFollowerPatterns.forEach(this::resumeAutoFollowPattern);
|
||||
assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive())));
|
||||
|
||||
// stop creating indices in the remote cluster
|
||||
running.set(false);
|
||||
createNewLeaderIndicesThread.join();
|
||||
|
||||
ensureLeaderGreen(leaderIndices.toArray(new String[0]));
|
||||
|
||||
// check that all leader indices have been correctly auto followed
|
||||
assertBusy(() -> {
|
||||
final Client client = followerClient();
|
||||
assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(leaderIndices.size()));
|
||||
leaderIndices.stream()
|
||||
.map(leaderIndex -> "copy-" + leaderIndex)
|
||||
.forEach(followerIndex ->
|
||||
assertTrue("following index must exist: " + followerIndex,
|
||||
client.admin().indices().exists(new IndicesExistsRequest(followerIndex)).actionGet().isExists()));
|
||||
});
|
||||
}
|
||||
|
||||
private void putAutoFollowPatterns(String name, String[] patterns) {
|
||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||
request.setName(name);
|
||||
|
@ -418,4 +585,21 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|||
leaderClient().admin().indices().create(request).actionGet();
|
||||
}
|
||||
|
||||
private void pauseAutoFollowPattern(final String name) {
|
||||
ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(name, false);
|
||||
assertAcked(followerClient().execute(ActivateAutoFollowPatternAction.INSTANCE, request).actionGet());
|
||||
}
|
||||
|
||||
private void resumeAutoFollowPattern(final String name) {
|
||||
ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(name, true);
|
||||
assertAcked(followerClient().execute(ActivateAutoFollowPatternAction.INSTANCE, request).actionGet());
|
||||
}
|
||||
|
||||
private AutoFollowMetadata.AutoFollowPattern getAutoFollowPattern(final String name) {
|
||||
GetAutoFollowPatternAction.Request request = new GetAutoFollowPatternAction.Request();
|
||||
request.setName(name);
|
||||
GetAutoFollowPatternAction.Response response = followerClient().execute(GetAutoFollowPatternAction.INSTANCE, request).actionGet();
|
||||
assertTrue(response.getAutoFollowPatterns().containsKey(name));
|
||||
return response.getAutoFollowPatterns().get(name);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase<AutoFol
|
|||
randomAlphaOfLength(4),
|
||||
leaderPatterns,
|
||||
randomAlphaOfLength(4),
|
||||
randomIntBetween(0, Integer.MAX_VALUE),
|
||||
true, randomIntBetween(0, Integer.MAX_VALUE),
|
||||
randomIntBetween(0, Integer.MAX_VALUE),
|
||||
randomIntBetween(0, Integer.MAX_VALUE),
|
||||
randomIntBetween(0, Integer.MAX_VALUE),
|
||||
|
|
|
@ -101,7 +101,7 @@ public class CCRFeatureSetTests extends ESTestCase {
|
|||
Map<String, AutoFollowMetadata.AutoFollowPattern> patterns = new HashMap<>(numAutoFollowPatterns);
|
||||
for (int i = 0; i < numAutoFollowPatterns; i++) {
|
||||
AutoFollowMetadata.AutoFollowPattern pattern = new AutoFollowMetadata.AutoFollowPattern("remote_cluser",
|
||||
Collections.singletonList("logs" + i + "*"), null, null, null, null, null, null, null, null, null, null, null);
|
||||
Collections.singletonList("logs" + i + "*"), null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
patterns.put("pattern" + i, pattern);
|
||||
}
|
||||
metaData.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()));
|
||||
|
|
|
@ -165,7 +165,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase {
|
|||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(
|
||||
Collections.singletonMap("test_alias", autoFollowPattern),
|
||||
Collections.emptyMap(),
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class ActivateAutoFollowPatternActionRequestTests extends AbstractWireSerializingTestCase<ActivateAutoFollowPatternAction.Request> {
|
||||
|
||||
@Override
|
||||
protected ActivateAutoFollowPatternAction.Request createTestInstance() {
|
||||
return new ActivateAutoFollowPatternAction.Request(randomAlphaOfLength(5), randomBoolean());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<ActivateAutoFollowPatternAction.Request> instanceReader() {
|
||||
return ActivateAutoFollowPatternAction.Request::new;
|
||||
}
|
||||
|
||||
public void testValidate() {
|
||||
ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(null, true);
|
||||
ActionRequestValidationException validationException = request.validate();
|
||||
assertThat(validationException, notNullValue());
|
||||
assertThat(validationException.getMessage(), containsString("[name] is missing"));
|
||||
|
||||
request = new ActivateAutoFollowPatternAction.Request("name", true);
|
||||
validationException = request.validate();
|
||||
assertThat(validationException, nullValue());
|
||||
}
|
||||
}
|
|
@ -38,6 +38,8 @@ import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
|
|||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -52,6 +54,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
|
@ -59,8 +62,12 @@ import java.util.function.Consumer;
|
|||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices;
|
||||
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
|
@ -68,6 +75,7 @@ import static org.hamcrest.Matchers.is;
|
|||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -81,7 +89,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
ClusterState remoteState = createRemoteClusterState("logs-20190101", true);
|
||||
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
|
@ -150,7 +158,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
|
@ -202,7 +210,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
ClusterState remoteState = createRemoteClusterState("logs-20190101", true);
|
||||
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
|
@ -253,13 +261,232 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
assertThat(invoked[0], is(true));
|
||||
}
|
||||
|
||||
public void testAutoFollowerWithNoActivePatternsDoesNotStart() {
|
||||
final String remoteCluster = randomAlphaOfLength(5);
|
||||
|
||||
final Map<String, AutoFollowPattern> autoFollowPatterns = new HashMap<>(2);
|
||||
autoFollowPatterns.put("pattern_1", new AutoFollowPattern(remoteCluster, Arrays.asList("logs-*", "test-*"), "copy-", false,
|
||||
null, null, null, null, null, null, null, null, null, null));
|
||||
autoFollowPatterns.put("pattern_2", new AutoFollowPattern(remoteCluster, Arrays.asList("users-*"), "copy-", false, null, null,
|
||||
null, null, null, null, null, null, null, null));
|
||||
|
||||
final Map<String, List<String>> followedLeaderIndexUUIDs = new HashMap<>(2);
|
||||
followedLeaderIndexUUIDs.put("pattern_1", Arrays.asList("uuid1", "uuid2"));
|
||||
followedLeaderIndexUUIDs.put("pattern_2", Collections.emptyList());
|
||||
|
||||
final Map<String, Map<String, String>> headers = new HashMap<>(2);
|
||||
headers.put("pattern_1", singletonMap("header", "value"));
|
||||
headers.put("pattern_2", emptyMap());
|
||||
|
||||
final Supplier<ClusterState> followerClusterStateSupplier = localClusterStateSupplier(ClusterState.builder(new ClusterName("test"))
|
||||
.metaData(MetaData.builder()
|
||||
.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(autoFollowPatterns, followedLeaderIndexUUIDs, headers))
|
||||
.build())
|
||||
.build());
|
||||
|
||||
final AtomicBoolean invoked = new AtomicBoolean(false);
|
||||
final AutoFollower autoFollower =
|
||||
new AutoFollower(remoteCluster, v -> invoked.set(true), followerClusterStateSupplier, () -> 1L, Runnable::run) {
|
||||
@Override
|
||||
void getRemoteClusterState(String remote, long metadataVersion, BiConsumer<ClusterStateResponse, Exception> handler) {
|
||||
invoked.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
void createAndFollow(Map<String, String> headers, PutFollowAction.Request request,
|
||||
Runnable successHandler, Consumer<Exception> failureHandler) {
|
||||
invoked.set(true);
|
||||
successHandler.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
|
||||
invoked.set(true);
|
||||
}
|
||||
};
|
||||
|
||||
autoFollower.start();
|
||||
assertThat(invoked.get(), is(false));
|
||||
}
|
||||
|
||||
public void testAutoFollowerWithPausedActivePatterns() {
|
||||
final String remoteCluster = randomAlphaOfLength(5);
|
||||
|
||||
final AtomicReference<ClusterState> remoteClusterState = new AtomicReference<>(
|
||||
createRemoteClusterState("patternLogs-0", true, randomLongBetween(1L, 1_000L))
|
||||
);
|
||||
|
||||
final AtomicReference<ClusterState> localClusterState = new AtomicReference<>(
|
||||
ClusterState.builder(new ClusterName("local"))
|
||||
.metaData(MetaData.builder()
|
||||
.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(emptyMap(), emptyMap(), emptyMap())))
|
||||
.build()
|
||||
);
|
||||
|
||||
// compute and return the local cluster state, updated with some auto-follow patterns
|
||||
final Supplier<ClusterState> localClusterStateSupplier = () -> localClusterState.updateAndGet(currentLocalState -> {
|
||||
final int nextClusterStateVersion = (int) (currentLocalState.version() + 1);
|
||||
|
||||
final ClusterState nextLocalClusterState;
|
||||
if (nextClusterStateVersion == 1) {
|
||||
// cluster state #1 : one pattern is active
|
||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||
request.setName("patternLogs");
|
||||
request.setRemoteCluster(remoteCluster);
|
||||
request.setLeaderIndexPatterns(singletonList("patternLogs-*"));
|
||||
request.setFollowIndexNamePattern("copy-{{leader_index}}");
|
||||
nextLocalClusterState =
|
||||
TransportPutAutoFollowPatternAction.innerPut(request, emptyMap(), currentLocalState, remoteClusterState.get());
|
||||
|
||||
} else if (nextClusterStateVersion == 2) {
|
||||
// cluster state #2 : still one pattern is active
|
||||
nextLocalClusterState = currentLocalState;
|
||||
|
||||
} else if (nextClusterStateVersion == 3) {
|
||||
// cluster state #3 : add a new pattern, two patterns are active
|
||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||
request.setName("patternDocs");
|
||||
request.setRemoteCluster(remoteCluster);
|
||||
request.setLeaderIndexPatterns(singletonList("patternDocs-*"));
|
||||
request.setFollowIndexNamePattern("copy-{{leader_index}}");
|
||||
nextLocalClusterState =
|
||||
TransportPutAutoFollowPatternAction.innerPut(request, emptyMap(), currentLocalState, remoteClusterState.get());
|
||||
|
||||
} else if (nextClusterStateVersion == 4) {
|
||||
// cluster state #4 : still both patterns are active
|
||||
nextLocalClusterState = currentLocalState;
|
||||
|
||||
} else if (nextClusterStateVersion == 5) {
|
||||
// cluster state #5 : first pattern is paused, second pattern is still active
|
||||
ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request("patternLogs", false);
|
||||
nextLocalClusterState = TransportActivateAutoFollowPatternAction.innerActivate(request, currentLocalState);
|
||||
|
||||
} else if (nextClusterStateVersion == 6) {
|
||||
// cluster state #5 : second pattern is paused, both patterns are inactive
|
||||
ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request("patternDocs", false);
|
||||
nextLocalClusterState = TransportActivateAutoFollowPatternAction.innerActivate(request, currentLocalState);
|
||||
|
||||
} else {
|
||||
return currentLocalState;
|
||||
}
|
||||
|
||||
return ClusterState.builder(nextLocalClusterState)
|
||||
.version(nextClusterStateVersion)
|
||||
.build();
|
||||
});
|
||||
|
||||
final Set<String> followedIndices = ConcurrentCollections.newConcurrentSet();
|
||||
final List<AutoFollowCoordinator.AutoFollowResult> autoFollowResults = new ArrayList<>();
|
||||
|
||||
final AutoFollower autoFollower =
|
||||
new AutoFollower(remoteCluster, autoFollowResults::addAll, localClusterStateSupplier, () -> 1L, Runnable::run) {
|
||||
|
||||
int countFetches = 1; // to be aligned with local cluster state updates
|
||||
ClusterState lastFetchedRemoteClusterState;
|
||||
|
||||
@Override
|
||||
void getRemoteClusterState(String remote, long metadataVersion, BiConsumer<ClusterStateResponse, Exception> handler) {
|
||||
assertThat(remote, equalTo(remoteCluster));
|
||||
|
||||
// in this test, every time it fetches the remote cluster state new leader indices to follow appears
|
||||
final String[] newLeaderIndices = {"patternLogs-" + countFetches, "patternDocs-" + countFetches};
|
||||
|
||||
if (countFetches == 1) {
|
||||
assertThat("first invocation, it should retrieve the metadata version 1", metadataVersion, equalTo(1L));
|
||||
lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices);
|
||||
|
||||
} else if (countFetches == 2 || countFetches == 4) {
|
||||
assertThat("no patterns changes, it should retrieve the last known metadata version + 1",
|
||||
metadataVersion, equalTo(lastFetchedRemoteClusterState.metaData().version() + 1));
|
||||
lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices);
|
||||
assertThat("remote cluster state metadata version is aligned with what the auto-follower is requesting",
|
||||
lastFetchedRemoteClusterState.getMetaData().version(), equalTo(metadataVersion));
|
||||
|
||||
} else if (countFetches == 3 || countFetches == 5) {
|
||||
assertThat("patterns have changed, it should retrieve the last known metadata version again",
|
||||
metadataVersion, equalTo(lastFetchedRemoteClusterState.metaData().version()));
|
||||
lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices);
|
||||
assertThat("remote cluster state metadata version is incremented",
|
||||
lastFetchedRemoteClusterState.getMetaData().version(), equalTo(metadataVersion + 1));
|
||||
} else {
|
||||
fail("after the 5th invocation there are no more active patterns, the auto-follower should have stopped");
|
||||
}
|
||||
|
||||
countFetches = countFetches + 1;
|
||||
remoteClusterState.set(lastFetchedRemoteClusterState);
|
||||
handler.accept(new ClusterStateResponse(lastFetchedRemoteClusterState.getClusterName(),
|
||||
lastFetchedRemoteClusterState, false), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
void createAndFollow(Map<String, String> headers, PutFollowAction.Request request,
|
||||
Runnable successHandler, Consumer<Exception> failureHandler) {
|
||||
assertThat(request.getRemoteCluster(), equalTo(remoteCluster));
|
||||
assertThat(request.getFollowerIndex(), startsWith("copy-"));
|
||||
followedIndices.add(request.getLeaderIndex());
|
||||
successHandler.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
|
||||
localClusterState.updateAndGet(updateFunction::apply);
|
||||
handler.accept(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
|
||||
// Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
|
||||
}
|
||||
};
|
||||
|
||||
autoFollower.start();
|
||||
|
||||
assertThat(autoFollowResults.size(), equalTo(7));
|
||||
assertThat(followedIndices, containsInAnyOrder(
|
||||
"patternLogs-1", // iteration #1 : only pattern "patternLogs" is active in local cluster state
|
||||
"patternLogs-2", // iteration #2 : only pattern "patternLogs" is active in local cluster state
|
||||
"patternLogs-3", // iteration #3 : both patterns "patternLogs" and "patternDocs" are active in local cluster state
|
||||
"patternDocs-3", //
|
||||
"patternLogs-4", // iteration #4 : both patterns "patternLogs" and "patternDocs" are active in local cluster state
|
||||
"patternDocs-4", //
|
||||
"patternDocs-5" // iteration #5 : only pattern "patternDocs" is active in local cluster state, "patternLogs" is paused
|
||||
));
|
||||
|
||||
final ClusterState finalRemoteClusterState = remoteClusterState.get();
|
||||
final ClusterState finalLocalClusterState = localClusterState.get();
|
||||
|
||||
AutoFollowMetadata autoFollowMetadata = finalLocalClusterState.metaData().custom(AutoFollowMetadata.TYPE);
|
||||
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(2));
|
||||
assertThat(autoFollowMetadata.getPatterns().values().stream().noneMatch(AutoFollowPattern::isActive), is(true));
|
||||
|
||||
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("patternLogs"),
|
||||
containsInAnyOrder(
|
||||
finalRemoteClusterState.metaData().index("patternLogs-0").getIndexUUID(),
|
||||
finalRemoteClusterState.metaData().index("patternLogs-1").getIndexUUID(),
|
||||
finalRemoteClusterState.metaData().index("patternLogs-2").getIndexUUID(),
|
||||
finalRemoteClusterState.metaData().index("patternLogs-3").getIndexUUID(),
|
||||
finalRemoteClusterState.metaData().index("patternLogs-4").getIndexUUID()
|
||||
// patternLogs-5 exists in remote cluster state but patternLogs was paused
|
||||
));
|
||||
|
||||
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("patternDocs"),
|
||||
containsInAnyOrder(
|
||||
// patternDocs-0 does not exist in remote cluster state
|
||||
finalRemoteClusterState.metaData().index("patternDocs-1").getIndexUUID(),
|
||||
finalRemoteClusterState.metaData().index("patternDocs-2").getIndexUUID(),
|
||||
finalRemoteClusterState.metaData().index("patternDocs-3").getIndexUUID(),
|
||||
finalRemoteClusterState.metaData().index("patternDocs-4").getIndexUUID(),
|
||||
finalRemoteClusterState.metaData().index("patternDocs-5").getIndexUUID()
|
||||
));
|
||||
}
|
||||
|
||||
public void testAutoFollowerCreateAndFollowApiCallFailure() {
|
||||
Client client = mock(Client.class);
|
||||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||
ClusterState remoteState = createRemoteClusterState("logs-20190101", true);
|
||||
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
|
@ -317,13 +544,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testGetLeaderIndicesToFollow() {
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, true,
|
||||
null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, Map<String, String>> headers = new HashMap<>();
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers)))
|
||||
.build();
|
||||
|
||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||
MetaData.Builder imdBuilder = MetaData.builder();
|
||||
|
@ -368,7 +591,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
assertThat(result.get(3).getName(), equalTo("metrics-3"));
|
||||
assertThat(result.get(4).getName(), equalTo("metrics-4"));
|
||||
|
||||
List<String> followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID());
|
||||
final List<String> followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID());
|
||||
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, followedIndexUUIDs);
|
||||
result.sort(Comparator.comparing(Index::getName));
|
||||
assertThat(result.size(), equalTo(4));
|
||||
|
@ -376,16 +599,20 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
assertThat(result.get(1).getName(), equalTo("metrics-1"));
|
||||
assertThat(result.get(2).getName(), equalTo("metrics-3"));
|
||||
assertThat(result.get(3).getName(), equalTo("metrics-4"));
|
||||
|
||||
final AutoFollowPattern inactiveAutoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null,
|
||||
false, null, null, null, null, null, null, null, null, null, null);
|
||||
|
||||
result = AutoFollower.getLeaderIndicesToFollow(inactiveAutoFollowPattern, remoteState, Collections.emptyList());
|
||||
assertThat(result.size(), equalTo(0));
|
||||
|
||||
result = AutoFollower.getLeaderIndicesToFollow(inactiveAutoFollowPattern, remoteState, followedIndexUUIDs);
|
||||
assertThat(result.size(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testGetLeaderIndicesToFollow_shardsNotStarted() {
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, Map<String, String>> headers = new HashMap<>();
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers)))
|
||||
.build();
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), null, true,
|
||||
null, null, null, null, null, null, null, null, null, null);
|
||||
|
||||
// 1 shard started and another not started:
|
||||
ClusterState remoteState = createRemoteClusterState("index1", true);
|
||||
|
@ -425,7 +652,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
|
||||
public void testGetLeaderIndicesToFollowWithClosedIndices() {
|
||||
final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
|
||||
// index is opened
|
||||
ClusterState remoteState = ClusterStateCreationUtils.stateWithActivePrimary("test-index", true, randomIntBetween(1, 3), 0);
|
||||
|
@ -552,15 +779,15 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testGetFollowerIndexName() {
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null,
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, true, null,
|
||||
null, null, null, null, null, null, null, null, null);
|
||||
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0"));
|
||||
|
||||
autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", null, null,
|
||||
autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", true, null, null,
|
||||
null, null, null, null, null, null, null, null);
|
||||
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
|
||||
|
||||
autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null,
|
||||
autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", true, null,
|
||||
null, null, null, null, null, null, null, null, null);
|
||||
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
|
||||
}
|
||||
|
@ -643,11 +870,11 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
Runnable::run);
|
||||
// Add 3 patterns:
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, null, null,
|
||||
patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, true, null, null,
|
||||
null, null, null, null, null, null, null, null));
|
||||
patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, null, null,
|
||||
patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, true, null, null,
|
||||
null, null, null, null, null, null, null, null));
|
||||
patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, null, null,
|
||||
patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, true, null, null,
|
||||
null, null, null, null, null, null, null, null));
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
|
@ -673,7 +900,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue());
|
||||
assertThat(removedAutoFollower1.removed, is(true));
|
||||
// Add pattern 4:
|
||||
patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, null, null,
|
||||
patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, true, null, null,
|
||||
null, null, null, null, null, null, null, null));
|
||||
clusterState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
|
@ -733,12 +960,100 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testUpdateAutoFollowersNoActivePatterns() {
|
||||
final ClusterService clusterService = mockClusterService();
|
||||
final AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
|
||||
Settings.EMPTY,
|
||||
null,
|
||||
clusterService,
|
||||
new CcrLicenseChecker(() -> true, () -> false),
|
||||
() -> 1L,
|
||||
() -> 1L,
|
||||
Runnable::run);
|
||||
|
||||
autoFollowCoordinator.updateAutoFollowers(ClusterState.EMPTY_STATE);
|
||||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0));
|
||||
|
||||
// Add 3 patterns:
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, true, null, null,
|
||||
null, null, null, null, null, null, null, null));
|
||||
patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, true, null, null,
|
||||
null, null, null, null, null, null, null, null));
|
||||
patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, true, null, null,
|
||||
null, null, null, null, null, null, null, null));
|
||||
|
||||
autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())))
|
||||
.build());
|
||||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(2));
|
||||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote1"), notNullValue());
|
||||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue());
|
||||
|
||||
AutoFollowCoordinator.AutoFollower removedAutoFollower1 = autoFollowCoordinator.getAutoFollowers().get("remote1");
|
||||
assertThat(removedAutoFollower1.removed, is(false));
|
||||
AutoFollowCoordinator.AutoFollower removedAutoFollower2 = autoFollowCoordinator.getAutoFollowers().get("remote2");
|
||||
assertThat(removedAutoFollower2.removed, is(false));
|
||||
|
||||
// Make pattern 1 and pattern 3 inactive
|
||||
patterns.computeIfPresent("pattern1", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(),
|
||||
pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(),
|
||||
pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(),
|
||||
pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(),
|
||||
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout()));
|
||||
patterns.computeIfPresent("pattern3", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(),
|
||||
pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(),
|
||||
pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(),
|
||||
pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(),
|
||||
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout()));
|
||||
|
||||
autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())))
|
||||
.build());
|
||||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(1));
|
||||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue());
|
||||
assertThat(removedAutoFollower1.removed, is(true));
|
||||
assertThat(removedAutoFollower2.removed, is(false));
|
||||
|
||||
// Add active pattern 4 and make pattern 2 inactive
|
||||
patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, true, null, null,
|
||||
null, null, null, null, null, null, null, null));
|
||||
patterns.computeIfPresent("pattern2", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(),
|
||||
pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(),
|
||||
pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(),
|
||||
pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(),
|
||||
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout()));
|
||||
|
||||
autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())))
|
||||
.build());
|
||||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(1));
|
||||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote1"), notNullValue());
|
||||
|
||||
AutoFollowCoordinator.AutoFollower removedAutoFollower4 = autoFollowCoordinator.getAutoFollowers().get("remote1");
|
||||
assertThat(removedAutoFollower4.removed, is(false));
|
||||
assertNotSame(removedAutoFollower4, removedAutoFollower1);
|
||||
assertThat(removedAutoFollower2.removed, is(true));
|
||||
|
||||
autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())))
|
||||
.build());
|
||||
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0));
|
||||
assertThat(removedAutoFollower1.removed, is(true));
|
||||
assertThat(removedAutoFollower2.removed, is(true));
|
||||
assertThat(removedAutoFollower4.removed, is(true));
|
||||
}
|
||||
|
||||
public void testWaitForMetadataVersion() {
|
||||
Client client = mock(Client.class);
|
||||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
|
@ -801,7 +1116,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
|
@ -860,7 +1175,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
createRemoteClusterState("logs-20190101", null);
|
||||
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
|
@ -926,7 +1241,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
ClusterState remoteState = createRemoteClusterState("logs-20190101", true);
|
||||
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, true, null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
|
@ -1012,7 +1327,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
"remote",
|
||||
Collections.singletonList("*"),
|
||||
"{}",
|
||||
0,
|
||||
true, 0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
|
@ -1087,8 +1402,8 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
.metaData(MetaData.builder()
|
||||
.putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(Collections.singletonMap(pattern,
|
||||
new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, null, null, null, null, null, null, null,
|
||||
null, null, null)),
|
||||
new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, true,
|
||||
null, null, null, null, null, null, null, null, null, null)),
|
||||
Collections.singletonMap(pattern, Collections.emptyList()),
|
||||
Collections.singletonMap(pattern, Collections.emptyMap()))))
|
||||
.build();
|
||||
|
@ -1160,6 +1475,10 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) {
|
||||
return createRemoteClusterState(indexName, enableSoftDeletes, 0L);
|
||||
}
|
||||
|
||||
private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes, long metadataVersion) {
|
||||
Settings.Builder indexSettings;
|
||||
if (enableSoftDeletes != null) {
|
||||
indexSettings = settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), enableSoftDeletes);
|
||||
|
@ -1173,7 +1492,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
.numberOfReplicas(0)
|
||||
.build();
|
||||
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().put(indexMetaData, true));
|
||||
.metaData(MetaData.builder()
|
||||
.put(indexMetaData, true)
|
||||
.version(metadataVersion));
|
||||
|
||||
ShardRouting shardRouting =
|
||||
TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted();
|
||||
|
@ -1183,25 +1504,29 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
return csBuilder.build();
|
||||
}
|
||||
|
||||
private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) {
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(indexName)
|
||||
.settings(settings(Version.CURRENT)
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
private static ClusterState createRemoteClusterState(final ClusterState previous, final String... indices) {
|
||||
if (indices == null) {
|
||||
return previous;
|
||||
}
|
||||
final MetaData.Builder metadataBuilder = MetaData.builder(previous.metaData()).version(previous.metaData().version() + 1);
|
||||
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(previous.routingTable());
|
||||
for (String indexName : indices) {
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(indexName)
|
||||
.settings(settings(Version.CURRENT)
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
metadataBuilder.put(indexMetaData, true);
|
||||
routingTableBuilder.add(IndexRoutingTable.builder(indexMetaData.getIndex())
|
||||
.addShard(TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted())
|
||||
.build());
|
||||
}
|
||||
return ClusterState.builder(previous.getClusterName())
|
||||
.metaData(metadataBuilder.build())
|
||||
.routingTable(routingTableBuilder.build())
|
||||
.build();
|
||||
ClusterState.Builder csBuilder = ClusterState.builder(previous.getClusterName())
|
||||
.metaData(MetaData.builder(previous.metaData())
|
||||
.version(previous.metaData().version() + 1)
|
||||
.put(indexMetaData, true));
|
||||
|
||||
ShardRouting shardRouting =
|
||||
TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted();
|
||||
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build();
|
||||
csBuilder.routingTable(RoutingTable.builder(previous.routingTable()).add(indexRoutingTable).build()).build();
|
||||
|
||||
return csBuilder.build();
|
||||
}
|
||||
|
||||
private static Supplier<ClusterState> localClusterStateSupplier(ClusterState... states) {
|
||||
|
|
|
@ -33,7 +33,7 @@ public class GetAutoFollowPatternResponseTests extends AbstractWireSerializingTe
|
|||
"remote",
|
||||
Collections.singletonList(randomAlphaOfLength(4)),
|
||||
randomAlphaOfLength(4),
|
||||
randomIntBetween(0, Integer.MAX_VALUE),
|
||||
true, randomIntBetween(0, Integer.MAX_VALUE),
|
||||
randomIntBetween(0, Integer.MAX_VALUE),
|
||||
randomIntBetween(0, Integer.MAX_VALUE),
|
||||
randomIntBetween(0, Integer.MAX_VALUE),
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class TransportActivateAutoFollowPatternActionTests extends ESTestCase {
|
||||
|
||||
public void testInnerActivateNoAutoFollowMetadata() {
|
||||
Exception e = expectThrows(ResourceNotFoundException.class,
|
||||
() -> TransportActivateAutoFollowPatternAction.innerActivate(new Request("test", true), ClusterState.EMPTY_STATE));
|
||||
assertThat(e.getMessage(), equalTo("auto-follow pattern [test] is missing"));
|
||||
}
|
||||
|
||||
public void testInnerActivateDoesNotExist() {
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("cluster"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(
|
||||
singletonMap("remote_cluster", randomAutoFollowPattern()),
|
||||
singletonMap("remote_cluster", randomSubsetOf(randomIntBetween(1, 3), "uuid0", "uuid1", "uuid2")),
|
||||
singletonMap("remote_cluster", singletonMap("header0", randomFrom("val0", "val2", "val3"))))))
|
||||
.build();
|
||||
Exception e = expectThrows(ResourceNotFoundException.class,
|
||||
() -> TransportActivateAutoFollowPatternAction.innerActivate(new Request("does_not_exist", true), clusterState));
|
||||
assertThat(e.getMessage(), equalTo("auto-follow pattern [does_not_exist] is missing"));
|
||||
}
|
||||
|
||||
public void testInnerActivateToggle() {
|
||||
final AutoFollowMetadata.AutoFollowPattern autoFollowPattern = randomAutoFollowPattern();
|
||||
final ClusterState clusterState = ClusterState.builder(new ClusterName("cluster"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(
|
||||
singletonMap("remote_cluster", autoFollowPattern),
|
||||
singletonMap("remote_cluster", randomSubsetOf(randomIntBetween(1, 3), "uuid0", "uuid1", "uuid2")),
|
||||
singletonMap("remote_cluster", singletonMap("header0", randomFrom("val0", "val2", "val3"))))))
|
||||
.build();
|
||||
{
|
||||
Request pauseRequest = new Request("remote_cluster", autoFollowPattern.isActive());
|
||||
ClusterState updatedState = TransportActivateAutoFollowPatternAction.innerActivate(pauseRequest, clusterState);
|
||||
assertThat(updatedState, sameInstance(clusterState));
|
||||
}
|
||||
{
|
||||
Request pauseRequest = new Request("remote_cluster", autoFollowPattern.isActive() == false);
|
||||
ClusterState updatedState = TransportActivateAutoFollowPatternAction.innerActivate(pauseRequest, clusterState);
|
||||
assertThat(updatedState, not(sameInstance(clusterState)));
|
||||
|
||||
AutoFollowMetadata updatedAutoFollowMetadata = updatedState.getMetaData().custom(AutoFollowMetadata.TYPE);
|
||||
assertNotEquals(updatedAutoFollowMetadata, notNullValue());
|
||||
|
||||
AutoFollowMetadata autoFollowMetadata = clusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
|
||||
assertNotEquals(updatedAutoFollowMetadata, autoFollowMetadata);
|
||||
assertThat(updatedAutoFollowMetadata.getPatterns().size(), equalTo(autoFollowMetadata.getPatterns().size()));
|
||||
assertThat(updatedAutoFollowMetadata.getPatterns().get("remote_cluster").isActive(), not(autoFollowPattern.isActive()));
|
||||
|
||||
assertEquals(updatedAutoFollowMetadata.getFollowedLeaderIndexUUIDs(), autoFollowMetadata.getFollowedLeaderIndexUUIDs());
|
||||
assertEquals(updatedAutoFollowMetadata.getHeaders(), autoFollowMetadata.getHeaders());
|
||||
}
|
||||
}
|
||||
|
||||
private static AutoFollowMetadata.AutoFollowPattern randomAutoFollowPattern() {
|
||||
return new AutoFollowMetadata.AutoFollowPattern(randomAlphaOfLength(5),
|
||||
randomSubsetOf(Arrays.asList("test-*", "user-*", "logs-*", "failures-*")),
|
||||
randomFrom("{{leader_index}}", "{{leader_index}}-follower", "test"),
|
||||
randomBoolean(),
|
||||
randomIntBetween(1, 100),
|
||||
randomIntBetween(1, 100),
|
||||
randomIntBetween(1, 100),
|
||||
randomIntBetween(1, 100),
|
||||
new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())),
|
||||
new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())),
|
||||
randomIntBetween(1, 100),
|
||||
new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())),
|
||||
TimeValue.timeValueSeconds(randomIntBetween(30, 600)),
|
||||
TimeValue.timeValueSeconds(randomIntBetween(30, 600)));
|
||||
}
|
||||
}
|
|
@ -32,8 +32,8 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
|
|||
{
|
||||
List<String> existingPatterns = new ArrayList<>();
|
||||
existingPatterns.add("transactions-*");
|
||||
existingAutoFollowPatterns.put("name1",
|
||||
new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null));
|
||||
existingAutoFollowPatterns.put("name1", new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null,
|
||||
null, null, null, null, null, null, null));
|
||||
|
||||
List<String> existingUUIDS = new ArrayList<>();
|
||||
existingUUIDS.add("_val");
|
||||
|
@ -43,8 +43,8 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
|
|||
{
|
||||
List<String> existingPatterns = new ArrayList<>();
|
||||
existingPatterns.add("logs-*");
|
||||
existingAutoFollowPatterns.put("name2",
|
||||
new AutoFollowPattern("asia_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null));
|
||||
existingAutoFollowPatterns.put("name2", new AutoFollowPattern("asia_cluster", existingPatterns, null, true, null, null, null,
|
||||
null, null, null, null, null, null, null));
|
||||
|
||||
List<String> existingUUIDS = new ArrayList<>();
|
||||
existingUUIDS.add("_val");
|
||||
|
@ -76,8 +76,8 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
|
|||
{
|
||||
List<String> existingPatterns = new ArrayList<>();
|
||||
existingPatterns.add("transactions-*");
|
||||
existingAutoFollowPatterns.put("name1",
|
||||
new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null));
|
||||
existingAutoFollowPatterns.put("name1", new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null,
|
||||
null, null, null, null, null, null, null));
|
||||
existingHeaders.put("key", Collections.singletonMap("key", "val"));
|
||||
}
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster"))
|
||||
|
|
|
@ -24,9 +24,9 @@ public class TransportGetAutoFollowPatternActionTests extends ESTestCase {
|
|||
public void testGetAutoFollowPattern() {
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("name1", new AutoFollowPattern(
|
||||
"test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null, null, null));
|
||||
"test_alias1", Collections.singletonList("index-*"), null, true, null, null, null, null, null, null, null, null, null, null));
|
||||
patterns.put("name2", new AutoFollowPattern(
|
||||
"test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null, null, null));
|
||||
"test_alias1", Collections.singletonList("index-*"), null, true, null, null, null, null, null, null, null, null, null, null));
|
||||
MetaData metaData = MetaData.builder()
|
||||
.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))
|
||||
.build();
|
||||
|
|
|
@ -103,7 +103,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
|
|||
List<String> existingPatterns = new ArrayList<>();
|
||||
existingPatterns.add("transactions-*");
|
||||
existingAutoFollowPatterns.put("name1",
|
||||
new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null));
|
||||
new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, null, null, null, null, null, null, null));
|
||||
Map<String, List<String>> existingAlreadyFollowedIndexUUIDS = new HashMap<>();
|
||||
List<String> existingUUIDS = new ArrayList<>();
|
||||
existingUUIDS.add("_val");
|
||||
|
|
|
@ -16,7 +16,7 @@ import org.elasticsearch.common.regex.Regex;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.ToXContentFragment;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
|
@ -174,8 +174,9 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
|||
return Objects.hash(patterns, followedLeaderIndexUUIDs, headers);
|
||||
}
|
||||
|
||||
public static class AutoFollowPattern extends ImmutableFollowParameters implements ToXContentObject {
|
||||
public static class AutoFollowPattern extends ImmutableFollowParameters implements ToXContentFragment {
|
||||
|
||||
public static final ParseField ACTIVE = new ParseField("active");
|
||||
public static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
|
||||
public static final ParseField LEADER_PATTERNS_FIELD = new ParseField("leader_index_patterns");
|
||||
public static final ParseField FOLLOW_PATTERN_FIELD = new ParseField("follow_index_pattern");
|
||||
|
@ -183,24 +184,28 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
|||
@SuppressWarnings("unchecked")
|
||||
private static final ConstructingObjectParser<AutoFollowPattern, Void> PARSER =
|
||||
new ConstructingObjectParser<>("auto_follow_pattern",
|
||||
args -> new AutoFollowPattern((String) args[0], (List<String>) args[1], (String) args[2], (Integer) args[3],
|
||||
(Integer) args[4], (Integer) args[5], (Integer) args[6], (ByteSizeValue) args[7], (ByteSizeValue) args[8],
|
||||
(Integer) args[9], (ByteSizeValue) args[10], (TimeValue) args[11], (TimeValue) args[12]));
|
||||
args -> new AutoFollowPattern((String) args[0], (List<String>) args[1], (String) args[2],
|
||||
args[3] == null || (boolean) args[3], (Integer) args[4], (Integer) args[5], (Integer) args[6], (Integer) args[7],
|
||||
(ByteSizeValue) args[8], (ByteSizeValue) args[9], (Integer) args[10], (ByteSizeValue) args[11], (TimeValue) args[12],
|
||||
(TimeValue) args[13]));
|
||||
|
||||
static {
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD);
|
||||
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD);
|
||||
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOW_PATTERN_FIELD);
|
||||
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACTIVE);
|
||||
ImmutableFollowParameters.initParser(PARSER);
|
||||
}
|
||||
|
||||
private final String remoteCluster;
|
||||
private final List<String> leaderIndexPatterns;
|
||||
private final String followIndexPattern;
|
||||
private final boolean active;
|
||||
|
||||
public AutoFollowPattern(String remoteCluster,
|
||||
List<String> leaderIndexPatterns,
|
||||
String followIndexPattern,
|
||||
boolean active,
|
||||
Integer maxReadRequestOperationCount,
|
||||
Integer maxWriteRequestOperationCount,
|
||||
Integer maxOutstandingReadRequests,
|
||||
|
@ -216,6 +221,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
|||
this.remoteCluster = remoteCluster;
|
||||
this.leaderIndexPatterns = leaderIndexPatterns;
|
||||
this.followIndexPattern = followIndexPattern;
|
||||
this.active = active;
|
||||
}
|
||||
|
||||
public static AutoFollowPattern readFrom(StreamInput in) throws IOException {
|
||||
|
@ -228,6 +234,11 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
|||
this.remoteCluster = remoteCluster;
|
||||
this.leaderIndexPatterns = leaderIndexPatterns;
|
||||
this.followIndexPattern = followIndexPattern;
|
||||
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
|
||||
this.active = in.readBoolean();
|
||||
} else {
|
||||
this.active = true;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean match(String indexName) {
|
||||
|
@ -250,16 +261,24 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
|||
return followIndexPattern;
|
||||
}
|
||||
|
||||
public boolean isActive() {
|
||||
return active;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(remoteCluster);
|
||||
out.writeStringCollection(leaderIndexPatterns);
|
||||
out.writeOptionalString(followIndexPattern);
|
||||
super.writeTo(out);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
|
||||
out.writeBoolean(active);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field(ACTIVE.getPreferredName(), active);
|
||||
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
|
||||
builder.array(LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns.toArray(new String[0]));
|
||||
if (followIndexPattern != null) {
|
||||
|
@ -269,25 +288,21 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
|||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFragment() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
if (!super.equals(o)) return false;
|
||||
AutoFollowPattern pattern = (AutoFollowPattern) o;
|
||||
return remoteCluster.equals(pattern.remoteCluster) &&
|
||||
return active == pattern.active &&
|
||||
remoteCluster.equals(pattern.remoteCluster) &&
|
||||
leaderIndexPatterns.equals(pattern.leaderIndexPatterns) &&
|
||||
followIndexPattern.equals(pattern.followIndexPattern);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(super.hashCode(), remoteCluster, leaderIndexPatterns, followIndexPattern);
|
||||
return Objects.hash(super.hashCode(), remoteCluster, leaderIndexPatterns, followIndexPattern, active);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.ccr.action;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
public class ActivateAutoFollowPatternAction extends ActionType<AcknowledgedResponse> {
|
||||
|
||||
public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/activate";
|
||||
public static final ActivateAutoFollowPatternAction INSTANCE = new ActivateAutoFollowPatternAction();
|
||||
|
||||
private ActivateAutoFollowPatternAction() {
|
||||
super(NAME, AcknowledgedResponse::new);
|
||||
}
|
||||
|
||||
public static class Request extends AcknowledgedRequest<Request> {
|
||||
|
||||
private final String name;
|
||||
private final boolean active;
|
||||
|
||||
public Request(final String name, final boolean active) {
|
||||
this.name = name;
|
||||
this.active = active;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if (name == null) {
|
||||
validationException = addValidationError("[name] is missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public boolean isActive() {
|
||||
return active;
|
||||
}
|
||||
|
||||
public Request(final StreamInput in) throws IOException {
|
||||
super(in);
|
||||
name = in.readString();
|
||||
active = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(final StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(name);
|
||||
out.writeBoolean(active);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Request request = (Request) o;
|
||||
return active == request.active
|
||||
&& Objects.equals(name, request.name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(name, active);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
{
|
||||
"ccr.pause_auto_follow_pattern":{
|
||||
"documentation":{
|
||||
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-pause-auto-follow-pattern.html"
|
||||
},
|
||||
"stability":"stable",
|
||||
"url":{
|
||||
"paths":[
|
||||
{
|
||||
"path":"/_ccr/auto_follow/{name}/pause",
|
||||
"methods":[
|
||||
"POST"
|
||||
],
|
||||
"parts":{
|
||||
"name":{
|
||||
"type":"string",
|
||||
"description":"The name of the auto follow pattern that should pause discovering new indices to follow."
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
{
|
||||
"ccr.resume_auto_follow_pattern":{
|
||||
"documentation":{
|
||||
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-resume-auto-follow-pattern.html"
|
||||
},
|
||||
"stability":"stable",
|
||||
"url":{
|
||||
"paths":[
|
||||
{
|
||||
"path":"/_ccr/auto_follow/{name}/resume",
|
||||
"methods":[
|
||||
"POST"
|
||||
],
|
||||
"parts":{
|
||||
"name":{
|
||||
"type":"string",
|
||||
"description":"The name of the auto follow pattern to resume discovering new indices to follow."
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue