From a721d09c81fc0881f3796550d6c35d459418b883 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 6 Sep 2018 08:01:58 +0200 Subject: [PATCH] [CCR] Added auto follow patterns feature (#33118) Auto Following Patterns is a cross cluster replication feature that keeps track whether in the leader cluster indices are being created with names that match with a specific pattern and if so automatically let the follower cluster follow these newly created indices. This change adds an `AutoFollowCoordinator` component that is only active on the elected master node. Periodically this component checks the the cluster state of remote clusters if there new leader indices that match with configured auto follow patterns that have been defined in `AutoFollowMetadata` custom metadata. This change also adds two new APIs to manage auto follow patterns. A put auto follow pattern api: ``` PUT /_ccr/_autofollow/{{remote_cluster}} { "leader_index_pattern": ["logs-*", ...], "follow_index_pattern": "{{leader_index}}-copy", "max_concurrent_read_batches": 2 ... // other optional parameters } ``` and delete auto follow pattern api: ``` DELETE /_ccr/_autofollow/{{remote_cluster_alias}} ``` The auto follow patterns are directly tied to the remote cluster aliases configured in the follow cluster. Relates to #33007 Co-authored-by: Jason Tedor jason@tedor.me --- .../org/elasticsearch/common/regex/Regex.java | 9 + .../xpack/ccr/FollowIndexIT.java | 37 ++ .../java/org/elasticsearch/xpack/ccr/Ccr.java | 33 +- .../elasticsearch/xpack/ccr/CcrSettings.java | 10 +- .../ccr/action/AutoFollowCoordinator.java | 306 +++++++++++++++ .../action/DeleteAutoFollowPatternAction.java | 81 ++++ .../action/PutAutoFollowPatternAction.java | 284 ++++++++++++++ ...ransportDeleteAutoFollowPatternAction.java | 102 +++++ .../TransportPutAutoFollowPatternAction.java | 173 +++++++++ .../RestDeleteAutoFollowPatternAction.java | 39 ++ .../rest/RestPutAutoFollowPatternAction.java | 44 +++ .../action/AutoFollowCoordinatorTests.java | 296 +++++++++++++++ .../xpack/ccr/action/AutoFollowTests.java | 189 ++++++++++ .../DeleteAutoFollowPatternRequestTests.java | 23 ++ .../PutAutoFollowPatternRequestTests.java | 63 ++++ ...ortDeleteAutoFollowPatternActionTests.java | 98 +++++ ...nsportPutAutoFollowPatternActionTests.java | 133 +++++++ .../xpack/core/XPackClientPlugin.java | 5 +- .../xpack/core/ccr/AutoFollowMetadata.java | 357 ++++++++++++++++++ .../core/ccr/AutoFollowMetadataTests.java | 53 +++ .../api/ccr.delete_auto_follow_pattern.json | 17 + .../api/ccr.put_auto_follow_pattern.json | 21 ++ .../rest-api-spec/test/ccr/auto_follow.yml | 13 + 23 files changed, 2380 insertions(+), 6 deletions(-) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestDeleteAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternRequestTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadataTests.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.delete_auto_follow_pattern.json create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.put_auto_follow_pattern.json create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml diff --git a/server/src/main/java/org/elasticsearch/common/regex/Regex.java b/server/src/main/java/org/elasticsearch/common/regex/Regex.java index bcf2dfba3ef..1f4e4651412 100644 --- a/server/src/main/java/org/elasticsearch/common/regex/Regex.java +++ b/server/src/main/java/org/elasticsearch/common/regex/Regex.java @@ -138,6 +138,15 @@ public class Regex { return false; } + /** + * Similar to {@link #simpleMatch(String[], String)}, but accepts a list of strings instead of an array of strings for the patterns to + * match. + */ + public static boolean simpleMatch(final List patterns, final String str) { + // #simpleMatch(String[], String) is likely to be inlined into this method + return patterns != null && simpleMatch(patterns.toArray(Strings.EMPTY_ARRAY), str); + } + public static boolean simpleMatch(String[] patterns, String[] types) { if (patterns != null && types != null) { for (String type : types) { diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index c14e13e7bb0..17a6db286f2 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -78,6 +78,34 @@ public class FollowIndexIT extends ESRestTestCase { } } + public void testAutoFollowPatterns() throws Exception { + assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster); + + Request request = new Request("PUT", "/_ccr/_auto_follow/leader_cluster"); + request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}"); + assertOK(client().performRequest(request)); + + try (RestClient leaderClient = buildLeaderClient()) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + request = new Request("PUT", "/logs-20190101"); + request.setJsonEntity("{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(leaderClient.performRequest(request)); + + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true"); + } + } + + assertBusy(() -> { + ensureYellow("logs-20190101"); + verifyDocuments("logs-20190101", 5); + }); + } + private static void index(RestClient client, String index, String id, Object... fields) throws IOException { XContentBuilder document = jsonBuilder().startObject(); for (int i = 0; i < fields.length; i += 2) { @@ -135,6 +163,15 @@ public class FollowIndexIT extends ESRestTestCase { return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); } + private static void ensureYellow(String index) throws IOException { + Request request = new Request("GET", "/_cluster/health/" + index); + request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_no_relocating_shards", "true"); + request.addParameter("timeout", "70s"); + request.addParameter("level", "shards"); + client().performRequest(request); + } + private RestClient buildLeaderClient() throws IOException { assert runningAgainstLeaderCluster == false; String leaderUrl = System.getProperty("tests.leader_host"); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index b00883f5c2a..cd0561b1c0c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -39,21 +39,28 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; import org.elasticsearch.xpack.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction; +import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.FollowIndexAction; +import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; +import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction; import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction; +import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction; +import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction; import org.elasticsearch.xpack.core.XPackPlugin; @@ -113,7 +120,14 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E final Environment environment, final NodeEnvironment nodeEnvironment, final NamedWriteableRegistry namedWriteableRegistry) { - return Collections.singleton(ccrLicenseChecker); + if (enabled == false) { + return emptyList(); + } + + return Arrays.asList( + ccrLicenseChecker, + new AutoFollowCoordinator(settings, client, threadPool, clusterService) + ); } @Override @@ -128,12 +142,18 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E } return Arrays.asList( + // internal actions new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class), + new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), + // stats action new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), + // follow actions new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, CreateAndFollowIndexAction.TransportAction.class), new ActionHandler<>(FollowIndexAction.INSTANCE, FollowIndexAction.TransportAction.class), - new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), - new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class)); + new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class), + // auto-follow actions + new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class), + new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class)); } public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, @@ -141,10 +161,15 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { return Arrays.asList( + // stats API new RestCcrStatsAction(settings, restController), + // follow APIs new RestCreateAndFollowIndexAction(settings, restController), new RestFollowIndexAction(settings, restController), - new RestUnfollowIndexAction(settings, restController)); + new RestUnfollowIndexAction(settings, restController), + // auto-follow APIs + new RestDeleteAutoFollowPatternAction(settings, restController), + new RestPutAutoFollowPatternAction(settings, restController)); } public List getNamedWriteables() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index 6960766bad0..a942990ea5a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.unit.TimeValue; import java.util.Arrays; import java.util.List; @@ -32,6 +33,12 @@ public final class CcrSettings { public static final Setting CCR_FOLLOWING_INDEX_SETTING = Setting.boolSetting("index.xpack.ccr.following_index", false, Setting.Property.IndexScope); + /** + * Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow + */ + public static final Setting CCR_AUTO_FOLLOW_POLL_INTERVAL = + Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope); + /** * The settings defined by CCR. * @@ -40,7 +47,8 @@ public final class CcrSettings { static List> getSettings() { return Arrays.asList( CCR_ENABLED_SETTING, - CCR_FOLLOWING_INDEX_SETTING); + CCR_FOLLOWING_INDEX_SETTING, + CCR_AUTO_FOLLOW_POLL_INTERVAL); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java new file mode 100644 index 00000000000..234fe32cdd0 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -0,0 +1,306 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.index.Index; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A component that runs only on the elected master node and follows leader indices automatically + * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}. + */ +public class AutoFollowCoordinator implements ClusterStateApplier { + + private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); + + private final Client client; + private final TimeValue pollInterval; + private final ThreadPool threadPool; + private final ClusterService clusterService; + + private volatile boolean localNodeMaster = false; + + public AutoFollowCoordinator(Settings settings, + Client client, + ThreadPool threadPool, + ClusterService clusterService) { + this.client = client; + this.threadPool = threadPool; + this.clusterService = clusterService; + + this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings); + clusterService.addStateApplier(this); + } + + private void doAutoFollow() { + if (localNodeMaster == false) { + return; + } + ClusterState followerClusterState = clusterService.state(); + AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); + if (autoFollowMetadata == null) { + threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + return; + } + + if (autoFollowMetadata.getPatterns().isEmpty()) { + threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + return; + } + + Consumer handler = e -> { + if (e != null) { + LOGGER.warn("Failure occurred during auto following indices", e); + } + threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + }; + AutoFollower operation = new AutoFollower(client, handler, followerClusterState) { + + @Override + void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.metaData(true); + leaderClient.admin().cluster().state(request, + ActionListener.wrap( + r -> handler.accept(r.getState(), null), + e -> handler.accept(null, e) + ) + ); + } + + @Override + void createAndFollow(FollowIndexAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + client.execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest), + ActionListener.wrap(r -> successHandler.run(), failureHandler)); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return updateFunction.apply(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + handler.accept(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + handler.accept(null); + } + }); + } + + }; + operation.autoFollowIndices(); + } + + @Override + public void applyClusterState(ClusterChangedEvent event) { + final boolean beforeLocalMasterNode = localNodeMaster; + localNodeMaster = event.localNodeMaster(); + if (beforeLocalMasterNode == false && localNodeMaster) { + threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + } + } + + abstract static class AutoFollower { + + private final Client client; + private final Consumer handler; + private final ClusterState followerClusterState; + private final AutoFollowMetadata autoFollowMetadata; + + private final CountDown autoFollowPatternsCountDown; + private final AtomicReference autoFollowPatternsErrorHolder = new AtomicReference<>(); + + AutoFollower(Client client, Consumer handler, ClusterState followerClusterState) { + this.client = client; + this.handler = handler; + this.followerClusterState = followerClusterState; + this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); + this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size()); + } + + void autoFollowIndices() { + for (Map.Entry entry : autoFollowMetadata.getPatterns().entrySet()) { + String clusterAlias = entry.getKey(); + AutoFollowPattern autoFollowPattern = entry.getValue(); + Client leaderClient = clusterAlias.equals("_local_") ? client : client.getRemoteClusterClient(clusterAlias); + List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); + + getLeaderClusterState(leaderClient, (leaderClusterState, e) -> { + if (leaderClusterState != null) { + assert e == null; + handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState); + } else { + finalise(e); + } + }); + } + } + + private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollowPattern, + List followedIndexUUIDs, ClusterState leaderClusterState) { + final List leaderIndicesToFollow = + getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs); + if (leaderIndicesToFollow.isEmpty()) { + finalise(null); + } else { + final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); + final AtomicReference leaderIndicesErrorHolder = new AtomicReference<>(); + for (Index indexToFollow : leaderIndicesToFollow) { + final String leaderIndexName = indexToFollow.getName(); + final String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndexName); + + String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName : + clusterAlias + ":" + leaderIndexName; + FollowIndexAction.Request followRequest = + new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName, + autoFollowPattern.getMaxBatchOperationCount(), autoFollowPattern.getMaxConcurrentReadBatches(), + autoFollowPattern.getMaxOperationSizeInBytes(), autoFollowPattern.getMaxConcurrentWriteBatches(), + autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getRetryTimeout(), + autoFollowPattern.getIdleShardRetryDelay()); + + // Execute if the create and follow api call succeeds: + Runnable successHandler = () -> { + LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName); + + // This function updates the auto follow metadata in the cluster to record that the leader index has been followed: + // (so that we do not try to follow it in subsequent auto follow runs) + Function function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow); + // The coordinator always runs on the elected master node, so we can update cluster state here: + updateAutoFollowMetadata(function, updateError -> { + if (updateError != null) { + LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError); + if (leaderIndicesErrorHolder.compareAndSet(null, updateError) == false) { + leaderIndicesErrorHolder.get().addSuppressed(updateError); + } + } else { + LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName); + } + if (leaderIndicesCountDown.countDown()) { + finalise(leaderIndicesErrorHolder.get()); + } + }); + }; + // Execute if the create and follow apu call fails: + Consumer failureHandler = followError -> { + assert followError != null; + LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError); + if (leaderIndicesCountDown.countDown()) { + finalise(followError); + } + }; + createAndFollow(followRequest, successHandler, failureHandler); + } + } + } + + private void finalise(Exception failure) { + if (autoFollowPatternsErrorHolder.compareAndSet(null, failure) == false) { + autoFollowPatternsErrorHolder.get().addSuppressed(failure); + } + + if (autoFollowPatternsCountDown.countDown()) { + handler.accept(autoFollowPatternsErrorHolder.get()); + } + } + + static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, + ClusterState leaderClusterState, + ClusterState followerClusterState, + List followedIndexUUIDs) { + List leaderIndicesToFollow = new ArrayList<>(); + for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) { + if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { + if (followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) { + // TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData + // has a leader index uuid custom metadata entry that matches with uuid of leaderIndexMetaData variable + // If so then handle it differently: not follow it, but just add an entry to + // AutoFollowMetadata#followedLeaderIndexUUIDs + leaderIndicesToFollow.add(leaderIndexMetaData.getIndex()); + } + } + } + return leaderIndicesToFollow; + } + + static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String leaderIndexName) { + if (autoFollowPattern.getFollowIndexPattern() != null) { + return autoFollowPattern.getFollowIndexPattern().replace("{{leader_index}}", leaderIndexName); + } else { + return leaderIndexName; + } + } + + static Function recordLeaderIndexAsFollowFunction(String clusterAlias, Index indexToFollow) { + return currentState -> { + AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); + + Map> newFollowedIndexUUIDS = + new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); + newFollowedIndexUUIDS.get(clusterAlias).add(indexToFollow.getUUID()); + + ClusterState.Builder newState = ClusterState.builder(currentState); + AutoFollowMetadata newAutoFollowMetadata = + new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(), newFollowedIndexUUIDS); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata) + .build()); + return newState.build(); + }; + } + + // abstract methods to make unit testing possible: + + abstract void getLeaderClusterState(Client leaderClient, + BiConsumer handler); + + abstract void createAndFollow(FollowIndexAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler); + + abstract void updateAutoFollowMetadata(Function updateFunction, + Consumer handler); + + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternAction.java new file mode 100644 index 00000000000..82e142202d2 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternAction.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class DeleteAutoFollowPatternAction extends Action { + + public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/delete"; + public static final DeleteAutoFollowPatternAction INSTANCE = new DeleteAutoFollowPatternAction(); + + private DeleteAutoFollowPatternAction() { + super(NAME); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends AcknowledgedRequest { + + private String leaderClusterAlias; + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (leaderClusterAlias == null) { + validationException = addValidationError("leaderClusterAlias is missing", validationException); + } + return validationException; + } + + public String getLeaderClusterAlias() { + return leaderClusterAlias; + } + + public void setLeaderClusterAlias(String leaderClusterAlias) { + this.leaderClusterAlias = leaderClusterAlias; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + leaderClusterAlias = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(leaderClusterAlias); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(leaderClusterAlias, request.leaderClusterAlias); + } + + @Override + public int hashCode() { + return Objects.hash(leaderClusterAlias); + } + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java new file mode 100644 index 00000000000..a01fd8e3bc2 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java @@ -0,0 +1,284 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class PutAutoFollowPatternAction extends Action { + + public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/put"; + public static final PutAutoFollowPatternAction INSTANCE = new PutAutoFollowPatternAction(); + + private PutAutoFollowPatternAction() { + super(NAME); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends AcknowledgedRequest implements ToXContentObject { + + static final ParseField LEADER_CLUSTER_ALIAS_FIELD = new ParseField("leader_cluster_alias"); + static final ParseField LEADER_INDEX_PATTERNS_FIELD = new ParseField("leader_index_patterns"); + static final ParseField FOLLOW_INDEX_NAME_PATTERN_FIELD = new ParseField("follow_index_name_pattern"); + + private static final ObjectParser PARSER = new ObjectParser<>("put_auto_follow_pattern_request", Request::new); + + static { + PARSER.declareString(Request::setLeaderClusterAlias, LEADER_CLUSTER_ALIAS_FIELD); + PARSER.declareStringArray(Request::setLeaderIndexPatterns, LEADER_INDEX_PATTERNS_FIELD); + PARSER.declareString(Request::setFollowIndexNamePattern, FOLLOW_INDEX_NAME_PATTERN_FIELD); + PARSER.declareInt(Request::setMaxBatchOperationCount, AutoFollowPattern.MAX_BATCH_OPERATION_COUNT); + PARSER.declareInt(Request::setMaxConcurrentReadBatches, AutoFollowPattern.MAX_CONCURRENT_READ_BATCHES); + PARSER.declareLong(Request::setMaxOperationSizeInBytes, AutoFollowPattern.MAX_BATCH_SIZE_IN_BYTES); + PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES); + PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE); + PARSER.declareField(Request::setRetryTimeout, + (p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.RETRY_TIMEOUT.getPreferredName()), + ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING); + PARSER.declareField(Request::setIdleShardRetryDelay, + (p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.IDLE_SHARD_RETRY_DELAY.getPreferredName()), + ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); + } + + public static Request fromXContent(XContentParser parser, String remoteClusterAlias) throws IOException { + Request request = PARSER.parse(parser, null); + if (remoteClusterAlias != null) { + if (request.leaderClusterAlias == null) { + request.leaderClusterAlias = remoteClusterAlias; + } else { + if (request.leaderClusterAlias.equals(remoteClusterAlias) == false) { + throw new IllegalArgumentException("provided leaderClusterAlias is not equal"); + } + } + } + return request; + } + + private String leaderClusterAlias; + private List leaderIndexPatterns; + private String followIndexNamePattern; + + private Integer maxBatchOperationCount; + private Integer maxConcurrentReadBatches; + private Long maxOperationSizeInBytes; + private Integer maxConcurrentWriteBatches; + private Integer maxWriteBufferSize; + private TimeValue retryTimeout; + private TimeValue idleShardRetryDelay; + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (leaderClusterAlias == null) { + validationException = addValidationError("leaderClusterAlias is missing", validationException); + } + if (leaderIndexPatterns == null || leaderIndexPatterns.isEmpty()) { + validationException = addValidationError("leaderIndexPatterns is missing", validationException); + } + return validationException; + } + + public String getLeaderClusterAlias() { + return leaderClusterAlias; + } + + public void setLeaderClusterAlias(String leaderClusterAlias) { + this.leaderClusterAlias = leaderClusterAlias; + } + + public List getLeaderIndexPatterns() { + return leaderIndexPatterns; + } + + public void setLeaderIndexPatterns(List leaderIndexPatterns) { + this.leaderIndexPatterns = leaderIndexPatterns; + } + + public String getFollowIndexNamePattern() { + return followIndexNamePattern; + } + + public void setFollowIndexNamePattern(String followIndexNamePattern) { + this.followIndexNamePattern = followIndexNamePattern; + } + + public Integer getMaxBatchOperationCount() { + return maxBatchOperationCount; + } + + public void setMaxBatchOperationCount(Integer maxBatchOperationCount) { + this.maxBatchOperationCount = maxBatchOperationCount; + } + + public Integer getMaxConcurrentReadBatches() { + return maxConcurrentReadBatches; + } + + public void setMaxConcurrentReadBatches(Integer maxConcurrentReadBatches) { + this.maxConcurrentReadBatches = maxConcurrentReadBatches; + } + + public Long getMaxOperationSizeInBytes() { + return maxOperationSizeInBytes; + } + + public void setMaxOperationSizeInBytes(Long maxOperationSizeInBytes) { + this.maxOperationSizeInBytes = maxOperationSizeInBytes; + } + + public Integer getMaxConcurrentWriteBatches() { + return maxConcurrentWriteBatches; + } + + public void setMaxConcurrentWriteBatches(Integer maxConcurrentWriteBatches) { + this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + } + + public Integer getMaxWriteBufferSize() { + return maxWriteBufferSize; + } + + public void setMaxWriteBufferSize(Integer maxWriteBufferSize) { + this.maxWriteBufferSize = maxWriteBufferSize; + } + + public TimeValue getRetryTimeout() { + return retryTimeout; + } + + public void setRetryTimeout(TimeValue retryTimeout) { + this.retryTimeout = retryTimeout; + } + + public TimeValue getIdleShardRetryDelay() { + return idleShardRetryDelay; + } + + public void setIdleShardRetryDelay(TimeValue idleShardRetryDelay) { + this.idleShardRetryDelay = idleShardRetryDelay; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + leaderClusterAlias = in.readString(); + leaderIndexPatterns = in.readList(StreamInput::readString); + followIndexNamePattern = in.readOptionalString(); + maxBatchOperationCount = in.readOptionalVInt(); + maxConcurrentReadBatches = in.readOptionalVInt(); + maxOperationSizeInBytes = in.readOptionalLong(); + maxConcurrentWriteBatches = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalVInt(); + retryTimeout = in.readOptionalTimeValue(); + idleShardRetryDelay = in.readOptionalTimeValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(leaderClusterAlias); + out.writeStringList(leaderIndexPatterns); + out.writeOptionalString(followIndexNamePattern); + out.writeOptionalVInt(maxBatchOperationCount); + out.writeOptionalVInt(maxConcurrentReadBatches); + out.writeOptionalLong(maxOperationSizeInBytes); + out.writeOptionalVInt(maxConcurrentWriteBatches); + out.writeOptionalVInt(maxWriteBufferSize); + out.writeOptionalTimeValue(retryTimeout); + out.writeOptionalTimeValue(idleShardRetryDelay); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias); + builder.field(LEADER_INDEX_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns); + if (followIndexNamePattern != null) { + builder.field(FOLLOW_INDEX_NAME_PATTERN_FIELD.getPreferredName(), followIndexNamePattern); + } + if (maxBatchOperationCount != null) { + builder.field(ShardFollowTask.MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); + } + if (maxOperationSizeInBytes != null) { + builder.field(ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes); + } + if (maxWriteBufferSize != null) { + builder.field(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + } + if (maxConcurrentReadBatches != null) { + builder.field(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); + } + if (maxConcurrentWriteBatches != null) { + builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); + } + if (retryTimeout != null) { + builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep()); + } + if (idleShardRetryDelay != null) { + builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); + } + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(leaderClusterAlias, request.leaderClusterAlias) && + Objects.equals(leaderIndexPatterns, request.leaderIndexPatterns) && + Objects.equals(followIndexNamePattern, request.followIndexNamePattern) && + Objects.equals(maxBatchOperationCount, request.maxBatchOperationCount) && + Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) && + Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) && + Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && + Objects.equals(retryTimeout, request.retryTimeout) && + Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay); + } + + @Override + public int hashCode() { + return Objects.hash( + leaderClusterAlias, + leaderIndexPatterns, + followIndexNamePattern, + maxBatchOperationCount, + maxConcurrentReadBatches, + maxOperationSizeInBytes, + maxConcurrentWriteBatches, + maxWriteBufferSize, + retryTimeout, + idleShardRetryDelay + ); + } + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java new file mode 100644 index 00000000000..6c1ca81e7c4 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TransportDeleteAutoFollowPatternAction extends + TransportMasterNodeAction { + + @Inject + public TransportDeleteAutoFollowPatternAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, DeleteAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, DeleteAutoFollowPatternAction.Request::new); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + @Override + protected void masterOperation(DeleteAutoFollowPatternAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return innerDelete(request, currentState); + } + }); + } + + static ClusterState innerDelete(DeleteAutoFollowPatternAction.Request request, ClusterState currentState) { + AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); + if (currentAutoFollowMetadata == null) { + throw new ResourceNotFoundException("no auto-follow patterns for cluster alias [{}] found", + request.getLeaderClusterAlias()); + } + Map patterns = currentAutoFollowMetadata.getPatterns(); + AutoFollowPattern autoFollowPatternToRemove = patterns.get(request.getLeaderClusterAlias()); + if (autoFollowPatternToRemove == null) { + throw new ResourceNotFoundException("no auto-follow patterns for cluster alias [{}] found", + request.getLeaderClusterAlias()); + } + + final Map patternsCopy = new HashMap<>(patterns); + final Map> followedLeaderIndexUUIDSCopy = + new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); + patternsCopy.remove(request.getLeaderClusterAlias()); + followedLeaderIndexUUIDSCopy.remove(request.getLeaderClusterAlias()); + + AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(patternsCopy, followedLeaderIndexUUIDSCopy); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata) + .build()); + return newState.build(); + } + + @Override + protected ClusterBlockException checkBlock(DeleteAutoFollowPatternAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java new file mode 100644 index 00000000000..3d3e342c0cd --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -0,0 +1,173 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class TransportPutAutoFollowPatternAction extends + TransportMasterNodeAction { + + private final Client client; + + @Inject + public TransportPutAutoFollowPatternAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, Client client, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, PutAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, PutAutoFollowPatternAction.Request::new); + this.client = client; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + @Override + protected void masterOperation(PutAutoFollowPatternAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + final Client leaderClient; + if (request.getLeaderClusterAlias().equals("_local_")) { + leaderClient = client; + } else { + leaderClient = client.getRemoteClusterClient(request.getLeaderClusterAlias()); + } + + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + + leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + final ClusterState leaderClusterState = clusterStateResponse.getState(); + clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return innerPut(request, currentState, leaderClusterState); + } + }); + }, listener::onFailure)); + } + + static ClusterState innerPut(PutAutoFollowPatternAction.Request request, + ClusterState localState, + ClusterState leaderClusterState) { + // auto patterns are always overwritten + // only already followed index uuids are updated + + AutoFollowMetadata currentAutoFollowMetadata = localState.metaData().custom(AutoFollowMetadata.TYPE); + Map> followedLeaderIndices; + Map patterns; + if (currentAutoFollowMetadata != null) { + patterns = new HashMap<>(currentAutoFollowMetadata.getPatterns()); + followedLeaderIndices = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); + } else { + patterns = new HashMap<>(); + followedLeaderIndices = new HashMap<>(); + } + + AutoFollowPattern previousPattern = patterns.get(request.getLeaderClusterAlias()); + List followedIndexUUIDs = followedLeaderIndices.get(request.getLeaderClusterAlias()); + if (followedIndexUUIDs == null) { + followedIndexUUIDs = new ArrayList<>(); + followedLeaderIndices.put(request.getLeaderClusterAlias(), followedIndexUUIDs); + } + + // Mark existing leader indices as already auto followed: + if (previousPattern != null) { + markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), leaderClusterState.metaData(), + previousPattern, followedIndexUUIDs); + } else { + markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), leaderClusterState.metaData(), + followedIndexUUIDs); + } + + AutoFollowPattern autoFollowPattern = new AutoFollowPattern( + request.getLeaderIndexPatterns(), + request.getFollowIndexNamePattern(), + request.getMaxBatchOperationCount(), + request.getMaxConcurrentReadBatches(), + request.getMaxOperationSizeInBytes(), + request.getMaxConcurrentWriteBatches(), + request.getMaxWriteBufferSize(), + request.getRetryTimeout(), + request.getIdleShardRetryDelay() + ); + patterns.put(request.getLeaderClusterAlias(), autoFollowPattern); + ClusterState.Builder newState = ClusterState.builder(localState); + newState.metaData(MetaData.builder(localState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, followedLeaderIndices)) + .build()); + return newState.build(); + } + + private static void markExistingIndicesAsAutoFollowedForNewPatterns( + List leaderIndexPatterns, + MetaData leaderMetaData, + AutoFollowPattern previousPattern, + List followedIndexUUIDS) { + + final List newPatterns = leaderIndexPatterns + .stream() + .filter(p -> previousPattern.getLeaderIndexPatterns().contains(p) == false) + .collect(Collectors.toList()); + markExistingIndicesAsAutoFollowed(newPatterns, leaderMetaData, followedIndexUUIDS); + } + + private static void markExistingIndicesAsAutoFollowed( + List patterns, + MetaData leaderMetaData, + List followedIndexUUIDS) { + + for (final IndexMetaData indexMetaData : leaderMetaData) { + if (AutoFollowPattern.match(patterns, indexMetaData.getIndex().getName())) { + followedIndexUUIDS.add(indexMetaData.getIndexUUID()); + } + } + } + + @Override + protected ClusterBlockException checkBlock(PutAutoFollowPatternAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestDeleteAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestDeleteAutoFollowPatternAction.java new file mode 100644 index 00000000000..bd3585c7982 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestDeleteAutoFollowPatternAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.Request; + +import java.io.IOException; + +import static org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.INSTANCE; + +public class RestDeleteAutoFollowPatternAction extends BaseRestHandler { + + public RestDeleteAutoFollowPatternAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.DELETE, "/_ccr/_auto_follow/{leader_cluster_alias}", this); + } + + @Override + public String getName() { + return "ccr_delete_auto_follow_pattern_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request = new Request(); + request.setLeaderClusterAlias(restRequest.param("leader_cluster_alias")); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutAutoFollowPatternAction.java new file mode 100644 index 00000000000..d92ebb7b0bb --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutAutoFollowPatternAction.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction.Request; + +import java.io.IOException; + +import static org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction.INSTANCE; + +public class RestPutAutoFollowPatternAction extends BaseRestHandler { + + public RestPutAutoFollowPatternAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.PUT, "/_ccr/_auto_follow/{leader_cluster_alias}", this); + } + + @Override + public String getName() { + return "ccr_put_auto_follow_pattern_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request = createRequest(restRequest); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } + + static Request createRequest(RestRequest restRequest) throws IOException { + try (XContentParser parser = restRequest.contentOrSourceParamParser()) { + return Request.fromXContent(parser, restRequest.param("leader_cluster_alias")); + } + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java new file mode 100644 index 00000000000..dd1376a4d7a --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -0,0 +1,296 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class AutoFollowCoordinatorTests extends ESTestCase { + + public void testAutoFollower() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + ClusterState leaderState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0))) + .build(); + + AutoFollowPattern autoFollowPattern = + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + + ClusterState currentState = ClusterState.builder(new ClusterName("name")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + boolean[] invoked = new boolean[]{false}; + Consumer handler = e -> { + invoked[0] = true; + assertThat(e, nullValue()); + }; + AutoFollower autoFollower = new AutoFollower(client, handler, currentState) { + @Override + void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + handler.accept(leaderState, null); + } + + @Override + void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); + assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + ClusterState resultCs = updateFunction.apply(currentState); + AutoFollowMetadata result = resultCs.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1)); + handler.accept(null); + } + }; + autoFollower.autoFollowIndices(); + assertThat(invoked[0], is(true)); + } + + public void testAutoFollowerClusterStateApiFailure() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + AutoFollowPattern autoFollowPattern = + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + Exception failure = new RuntimeException("failure"); + boolean[] invoked = new boolean[]{false}; + Consumer handler = e -> { + invoked[0] = true; + assertThat(e, sameInstance(failure)); + }; + AutoFollower autoFollower = new AutoFollower(client, handler, followerState) { + @Override + void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + handler.accept(null, failure); + } + + @Override + void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + fail("should not get here"); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + fail("should not get here"); + } + }; + autoFollower.autoFollowIndices(); + assertThat(invoked[0], is(true)); + } + + public void testAutoFollowerUpdateClusterStateFailure() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + ClusterState leaderState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0))) + .build(); + + AutoFollowPattern autoFollowPattern = + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + Exception failure = new RuntimeException("failure"); + boolean[] invoked = new boolean[]{false}; + Consumer handler = e -> { + invoked[0] = true; + assertThat(e, sameInstance(failure)); + }; + AutoFollower autoFollower = new AutoFollower(client, handler, followerState) { + @Override + void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + handler.accept(leaderState, null); + } + + @Override + void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); + assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + handler.accept(failure); + } + }; + autoFollower.autoFollowIndices(); + assertThat(invoked[0], is(true)); + } + + public void testAutoFollowerCreateAndFollowApiCallFailure() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + ClusterState leaderState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0))) + .build(); + + AutoFollowPattern autoFollowPattern = + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + Exception failure = new RuntimeException("failure"); + boolean[] invoked = new boolean[]{false}; + Consumer handler = e -> { + invoked[0] = true; + assertThat(e, sameInstance(failure)); + }; + AutoFollower autoFollower = new AutoFollower(client, handler, followerState) { + @Override + void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + handler.accept(leaderState, null); + } + + @Override + void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); + assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); + failureHandler.accept(failure); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + fail("should not get here"); + } + }; + autoFollower.autoFollowIndices(); + assertThat(invoked[0], is(true)); + } + + public void testGetLeaderIndicesToFollow() { + AutoFollowPattern autoFollowPattern = + new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null); + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap()))) + .build(); + + MetaData.Builder imdBuilder = MetaData.builder(); + for (int i = 0; i < 5; i++) { + Settings.Builder builder = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, "metrics-" + i); + imdBuilder.put(IndexMetaData.builder("metrics-" + i) + .settings(builder) + .numberOfShards(1) + .numberOfReplicas(0)); + } + imdBuilder.put(IndexMetaData.builder("logs-0") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0)); + + ClusterState leaderState = ClusterState.builder(new ClusterName("remote")) + .metaData(imdBuilder) + .build(); + + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); + result.sort(Comparator.comparing(Index::getName)); + assertThat(result.size(), equalTo(5)); + assertThat(result.get(0).getName(), equalTo("metrics-0")); + assertThat(result.get(1).getName(), equalTo("metrics-1")); + assertThat(result.get(2).getName(), equalTo("metrics-2")); + assertThat(result.get(3).getName(), equalTo("metrics-3")); + assertThat(result.get(4).getName(), equalTo("metrics-4")); + + List followedIndexUUIDs = Collections.singletonList(leaderState.metaData().index("metrics-2").getIndexUUID()); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, followedIndexUUIDs); + result.sort(Comparator.comparing(Index::getName)); + assertThat(result.size(), equalTo(4)); + assertThat(result.get(0).getName(), equalTo("metrics-0")); + assertThat(result.get(1).getName(), equalTo("metrics-1")); + assertThat(result.get(2).getName(), equalTo("metrics-3")); + assertThat(result.get(3).getName(), equalTo("metrics-4")); + } + + public void testGetFollowerIndexName() { + AutoFollowPattern autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, + null, null, null, null, null, null); + assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0")); + + autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-metrics-0", null, null, + null, null, null, null, null); + assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); + + autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null, + null, null, null, null, null, null); + assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java new file mode 100644 index 00000000000..a4808e428fe --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java @@ -0,0 +1,189 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.ccr.LocalStateCcr; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class AutoFollowTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return Collections.singleton(LocalStateCcr.class); + } + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + public void testAutoFollow() throws Exception { + Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + createIndex("logs-201812", leaderIndexSettings, "_doc"); + + // Enabling auto following: + putAutoFollowPatterns("logs-*", "transactions-*"); + + createIndex("metrics-201901", leaderIndexSettings, "_doc"); + + createIndex("logs-201901", leaderIndexSettings, "_doc"); + assertBusy(() -> { + IndicesExistsRequest request = new IndicesExistsRequest("copy-logs-201901"); + assertTrue(client().admin().indices().exists(request).actionGet().isExists()); + }); + createIndex("transactions-201901", leaderIndexSettings, "_doc"); + assertBusy(() -> { + IndicesExistsRequest request = new IndicesExistsRequest("copy-transactions-201901"); + assertTrue(client().admin().indices().exists(request).actionGet().isExists()); + }); + + IndicesExistsRequest request = new IndicesExistsRequest("copy-metrics-201901"); + assertFalse(client().admin().indices().exists(request).actionGet().isExists()); + request = new IndicesExistsRequest("copy-logs-201812"); + assertFalse(client().admin().indices().exists(request).actionGet().isExists()); + } + + public void testAutoFollowManyIndices() throws Exception { + Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + putAutoFollowPatterns("logs-*"); + int numIndices = randomIntBetween(4, 32); + for (int i = 0; i < numIndices; i++) { + createIndex("logs-" + i, leaderIndexSettings, "_doc"); + } + int expectedVal1 = numIndices; + assertBusy(() -> { + MetaData metaData = client().admin().cluster().prepareState().get().getState().metaData(); + int count = (int) Arrays.stream(metaData.getConcreteAllIndices()).filter(s -> s.startsWith("copy-")).count(); + assertThat(count, equalTo(expectedVal1)); + }); + + deleteAutoFollowPatternSetting(); + createIndex("logs-does-not-count", leaderIndexSettings, "_doc"); + + putAutoFollowPatterns("logs-*"); + int i = numIndices; + numIndices = numIndices + randomIntBetween(4, 32); + for (; i < numIndices; i++) { + createIndex("logs-" + i, leaderIndexSettings, "_doc"); + } + int expectedVal2 = numIndices; + assertBusy(() -> { + MetaData metaData = client().admin().cluster().prepareState().get().getState().metaData(); + int count = (int) Arrays.stream(metaData.getConcreteAllIndices()).filter(s -> s.startsWith("copy-")).count(); + assertThat(count, equalTo(expectedVal2)); + }); + } + + public void testAutoFollowParameterAreDelegated() throws Exception { + Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + // Enabling auto following: + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("_local_"); + request.setLeaderIndexPatterns(Collections.singletonList("logs-*")); + // Need to set this, because following an index in the same cluster + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + if (randomBoolean()) { + request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentWriteBatches(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxOperationSizeInBytes(randomNonNegativeLong()); + } + if (randomBoolean()) { + request.setRetryTimeout(TimeValue.timeValueMillis(500)); + } + if (randomBoolean()) { + request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500)); + } + assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); + + createIndex("logs-201901", leaderIndexSettings, "_doc"); + assertBusy(() -> { + PersistentTasksCustomMetaData persistentTasksMetaData = + client().admin().cluster().prepareState().get().getState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(persistentTasksMetaData, notNullValue()); + assertThat(persistentTasksMetaData.tasks().size(), equalTo(1)); + ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTasksMetaData.tasks().iterator().next().getParams(); + assertThat(shardFollowTask.getLeaderShardId().getIndexName(), equalTo("logs-201901")); + assertThat(shardFollowTask.getFollowShardId().getIndexName(), equalTo("copy-logs-201901")); + if (request.getMaxWriteBufferSize() != null) { + assertThat(shardFollowTask.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize())); + } + if (request.getMaxConcurrentReadBatches() != null) { + assertThat(shardFollowTask.getMaxConcurrentReadBatches(), equalTo(request.getMaxConcurrentReadBatches())); + } + if (request.getMaxConcurrentWriteBatches() != null) { + assertThat(shardFollowTask.getMaxConcurrentWriteBatches(), equalTo(request.getMaxConcurrentWriteBatches())); + } + if (request.getMaxBatchOperationCount() != null) { + assertThat(shardFollowTask.getMaxBatchOperationCount(), equalTo(request.getMaxBatchOperationCount())); + } + if (request.getMaxOperationSizeInBytes() != null) { + assertThat(shardFollowTask.getMaxBatchSizeInBytes(), equalTo(request.getMaxOperationSizeInBytes())); + } + if (request.getRetryTimeout() != null) { + assertThat(shardFollowTask.getRetryTimeout(), equalTo(request.getRetryTimeout())); + } + if (request.getIdleShardRetryDelay() != null) { + assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay())); + } + }); + } + + private void putAutoFollowPatterns(String... patterns) { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("_local_"); + request.setLeaderIndexPatterns(Arrays.asList(patterns)); + // Need to set this, because following an index in the same cluster + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); + } + + private void deleteAutoFollowPatternSetting() { + DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("_local_"); + assertTrue(client().execute(DeleteAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternRequestTests.java new file mode 100644 index 00000000000..0ca1b3d1278 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternRequestTests.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.test.AbstractStreamableTestCase; + +public class DeleteAutoFollowPatternRequestTests extends AbstractStreamableTestCase { + + @Override + protected DeleteAutoFollowPatternAction.Request createBlankInstance() { + return new DeleteAutoFollowPatternAction.Request(); + } + + @Override + protected DeleteAutoFollowPatternAction.Request createTestInstance() { + DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias(randomAlphaOfLength(4)); + return request; + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java new file mode 100644 index 00000000000..27760578db9 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractStreamableXContentTestCase; + +import java.io.IOException; +import java.util.Arrays; + +public class PutAutoFollowPatternRequestTests extends AbstractStreamableXContentTestCase { + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected PutAutoFollowPatternAction.Request doParseInstance(XContentParser parser) throws IOException { + return PutAutoFollowPatternAction.Request.fromXContent(parser, null); + } + + @Override + protected PutAutoFollowPatternAction.Request createBlankInstance() { + return new PutAutoFollowPatternAction.Request(); + } + + @Override + protected PutAutoFollowPatternAction.Request createTestInstance() { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias(randomAlphaOfLength(4)); + request.setLeaderIndexPatterns(Arrays.asList(generateRandomStringArray(4, 4, false))); + if (randomBoolean()) { + request.setFollowIndexNamePattern(randomAlphaOfLength(4)); + } + if (randomBoolean()) { + request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500)); + } + if (randomBoolean()) { + request.setRetryTimeout(TimeValue.timeValueMillis(500)); + } + if (randomBoolean()) { + request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentWriteBatches(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxOperationSizeInBytes(randomNonNegativeLong()); + } + if (randomBoolean()) { + request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE)); + } + return request; + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java new file mode 100644 index 00000000000..03065ea8d38 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.Request; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { + + public void testInnerDelete() { + Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); + Map existingAutoFollowPatterns = new HashMap<>(); + { + List existingPatterns = new ArrayList<>(); + existingPatterns.add("transactions-*"); + existingAutoFollowPatterns.put("eu_cluster", + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + + List existingUUIDS = new ArrayList<>(); + existingUUIDS.add("_val"); + existingAlreadyFollowedIndexUUIDS.put("eu_cluster", existingUUIDS); + } + { + List existingPatterns = new ArrayList<>(); + existingPatterns.add("logs-*"); + existingAutoFollowPatterns.put("asia_cluster", + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + + List existingUUIDS = new ArrayList<>(); + existingUUIDS.add("_val"); + existingAlreadyFollowedIndexUUIDS.put("asia_cluster", existingUUIDS); + } + ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS))) + .build(); + + Request request = new Request(); + request.setLeaderClusterAlias("eu_cluster"); + AutoFollowMetadata result = TransportDeleteAutoFollowPatternAction.innerDelete(request, clusterState) + .getMetaData() + .custom(AutoFollowMetadata.TYPE); + assertThat(result.getPatterns().size(), equalTo(1)); + assertThat(result.getPatterns().get("asia_cluster"), notNullValue()); + assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(result.getFollowedLeaderIndexUUIDs().get("asia_cluster"), notNullValue()); + } + + public void testInnerDeleteDoesNotExist() { + Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); + Map existingAutoFollowPatterns = new HashMap<>(); + { + List existingPatterns = new ArrayList<>(); + existingPatterns.add("transactions-*"); + existingAutoFollowPatterns.put("eu_cluster", + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + } + ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS))) + .build(); + + Request request = new Request(); + request.setLeaderClusterAlias("asia_cluster"); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> TransportDeleteAutoFollowPatternAction.innerDelete(request, clusterState)); + assertThat(e.getMessage(), equalTo("no auto-follow patterns for cluster alias [asia_cluster] found")); + } + + public void testInnerDeleteNoAutoFollowMetadata() { + ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder()) + .build(); + + Request request = new Request(); + request.setLeaderClusterAlias("asia_cluster"); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> TransportDeleteAutoFollowPatternAction.innerDelete(request, clusterState)); + assertThat(e.getMessage(), equalTo("no auto-follow patterns for cluster alias [asia_cluster] found")); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java new file mode 100644 index 00000000000..d894eda0b11 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class TransportPutAutoFollowPatternActionTests extends ESTestCase { + + public void testInnerPut() { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("eu_cluster"); + request.setLeaderIndexPatterns(Collections.singletonList("logs-*")); + + ClusterState localState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder()) + .build(); + + ClusterState remoteState = ClusterState.builder(new ClusterName("eu_cluster")) + .metaData(MetaData.builder()) + .build(); + + ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState); + AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata, notNullValue()); + assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().size(), equalTo(1)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(0), equalTo("logs-*")); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(0)); + } + + public void testInnerPut_existingLeaderIndices() { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("eu_cluster"); + request.setLeaderIndexPatterns(Collections.singletonList("logs-*")); + + ClusterState localState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder()) + .build(); + + int numLeaderIndices = randomIntBetween(1, 8); + int numMatchingLeaderIndices = randomIntBetween(1, 8); + MetaData.Builder mdBuilder = MetaData.builder(); + for (int i = 0; i < numLeaderIndices; i++) { + mdBuilder.put(IndexMetaData.builder("transactions-" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0)); + } + for (int i = 0; i < numMatchingLeaderIndices; i++) { + mdBuilder.put(IndexMetaData.builder("logs-" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0)); + } + + ClusterState remoteState = ClusterState.builder(new ClusterName("eu_cluster")) + .metaData(mdBuilder) + .build(); + + ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState); + AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata, notNullValue()); + assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().size(), equalTo(1)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(0), equalTo("logs-*")); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(numMatchingLeaderIndices)); + } + + public void testInnerPut_existingLeaderIndicesAndAutoFollowMetadata() { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("eu_cluster"); + request.setLeaderIndexPatterns(Arrays.asList("logs-*", "transactions-*")); + + Map existingAutoFollowPatterns = new HashMap<>(); + List existingPatterns = new ArrayList<>(); + existingPatterns.add("transactions-*"); + existingAutoFollowPatterns.put("eu_cluster", + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); + List existingUUIDS = new ArrayList<>(); + existingUUIDS.add("_val"); + existingAlreadyFollowedIndexUUIDS.put("eu_cluster", existingUUIDS); + ClusterState localState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS))) + .build(); + + int numLeaderIndices = randomIntBetween(1, 8); + MetaData.Builder mdBuilder = MetaData.builder(); + for (int i = 0; i < numLeaderIndices; i++) { + mdBuilder.put(IndexMetaData.builder("logs-" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0)); + } + + ClusterState remoteState = ClusterState.builder(new ClusterName("eu_cluster")) + .metaData(mdBuilder) + .build(); + + ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState); + AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata, notNullValue()); + assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().size(), equalTo(2)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(0), equalTo("logs-*")); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(1), equalTo("transactions-*")); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(numLeaderIndices + 1)); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 2d3707e98cf..03820b1f40b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -38,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackUsageAction; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction; import org.elasticsearch.xpack.core.graph.GraphFeatureSetUsage; import org.elasticsearch.xpack.core.graph.action.GraphExploreAction; @@ -366,7 +367,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ROLLUP, RollupFeatureSetUsage::new), new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new), new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new), - new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new) + new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new), + // ccr + new NamedWriteableRegistry.Entry(AutoFollowMetadata.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new) ); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java new file mode 100644 index 00000000000..244a5d441d9 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -0,0 +1,357 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ccr; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.security.xcontent.XContentUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Custom metadata that contains auto follow patterns and what leader indices an auto follow pattern has already followed. + */ +public class AutoFollowMetadata extends AbstractNamedDiffable implements XPackPlugin.XPackMetaDataCustom { + + public static final String TYPE = "ccr_auto_follow"; + + private static final ParseField PATTERNS_FIELD = new ParseField("patterns"); + private static final ParseField FOLLOWED_LEADER_INDICES_FIELD = new ParseField("followed_leader_indices"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("auto_follow", + args -> new AutoFollowMetadata((Map) args[0], (Map>) args[1])); + + static { + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { + Map patterns = new HashMap<>(); + String fieldName = null; + for (XContentParser.Token token = p.nextToken(); token != XContentParser.Token.END_OBJECT; token = p.nextToken()) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = p.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + patterns.put(fieldName, AutoFollowPattern.PARSER.parse(p, c)); + } else { + throw new ElasticsearchParseException("unexpected token [" + token + "]"); + } + } + return patterns; + }, PATTERNS_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { + Map> alreadyFollowedIndexUUIDS = new HashMap<>(); + String fieldName = null; + for (XContentParser.Token token = p.nextToken(); token != XContentParser.Token.END_OBJECT; token = p.nextToken()) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = p.currentName(); + } else if (token == XContentParser.Token.START_ARRAY) { + alreadyFollowedIndexUUIDS.put(fieldName, Arrays.asList(XContentUtils.readStringArray(p, false))); + } else { + throw new ElasticsearchParseException("unexpected token [" + token + "]"); + } + } + return alreadyFollowedIndexUUIDS; + }, FOLLOWED_LEADER_INDICES_FIELD); + } + + public static AutoFollowMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final Map patterns; + private final Map> followedLeaderIndexUUIDs; + + public AutoFollowMetadata(Map patterns, Map> followedLeaderIndexUUIDs) { + this.patterns = patterns; + this.followedLeaderIndexUUIDs = followedLeaderIndexUUIDs; + } + + public AutoFollowMetadata(StreamInput in) throws IOException { + patterns = in.readMap(StreamInput::readString, AutoFollowPattern::new); + followedLeaderIndexUUIDs = in.readMapOfLists(StreamInput::readString, StreamInput::readString); + } + + public Map getPatterns() { + return patterns; + } + + public Map> getFollowedLeaderIndexUUIDs() { + return followedLeaderIndexUUIDs; + } + + @Override + public EnumSet context() { + // TODO: When a snapshot is restored do we want to restore this? + // (Otherwise we would start following indices automatically immediately) + return MetaData.ALL_CONTEXTS; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_6_5_0.minimumCompatibilityVersion(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(patterns, StreamOutput::writeString, (out1, value) -> value.writeTo(out1)); + out.writeMapOfLists(followedLeaderIndexUUIDs, StreamOutput::writeString, StreamOutput::writeString); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(PATTERNS_FIELD.getPreferredName()); + for (Map.Entry entry : patterns.entrySet()) { + builder.startObject(entry.getKey()); + builder.value(entry.getValue()); + builder.endObject(); + } + builder.endObject(); + + builder.startObject(FOLLOWED_LEADER_INDICES_FIELD.getPreferredName()); + for (Map.Entry> entry : followedLeaderIndexUUIDs.entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + return builder; + } + + @Override + public boolean isFragment() { + return true; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AutoFollowMetadata that = (AutoFollowMetadata) o; + return Objects.equals(patterns, that.patterns); + } + + @Override + public int hashCode() { + return Objects.hash(patterns); + } + + public static class AutoFollowPattern implements Writeable, ToXContentObject { + + private static final ParseField LEADER_PATTERNS_FIELD = new ParseField("leader_patterns"); + private static final ParseField FOLLOW_PATTERN_FIELD = new ParseField("follow_pattern"); + public static final ParseField MAX_BATCH_OPERATION_COUNT = new ParseField("max_batch_operation_count"); + public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); + public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes"); + public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); + public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); + public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout"); + public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("auto_follow_pattern", + args -> new AutoFollowPattern((List) args[0], (String) args[1], (Integer) args[2], (Integer) args[3], + (Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8])); + + static { + PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOW_PATTERN_FIELD); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_OPERATION_COUNT); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_READ_BATCHES); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_SIZE_IN_BYTES); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_WRITE_BATCHES); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()), + RETRY_TIMEOUT, ObjectParser.ValueType.STRING); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), + IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); + } + + private final List leaderIndexPatterns; + private final String followIndexPattern; + private final Integer maxBatchOperationCount; + private final Integer maxConcurrentReadBatches; + private final Long maxOperationSizeInBytes; + private final Integer maxConcurrentWriteBatches; + private final Integer maxWriteBufferSize; + private final TimeValue retryTimeout; + private final TimeValue idleShardRetryDelay; + + public AutoFollowPattern(List leaderIndexPatterns, String followIndexPattern, Integer maxBatchOperationCount, + Integer maxConcurrentReadBatches, Long maxOperationSizeInBytes, Integer maxConcurrentWriteBatches, + Integer maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay) { + this.leaderIndexPatterns = leaderIndexPatterns; + this.followIndexPattern = followIndexPattern; + this.maxBatchOperationCount = maxBatchOperationCount; + this.maxConcurrentReadBatches = maxConcurrentReadBatches; + this.maxOperationSizeInBytes = maxOperationSizeInBytes; + this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + this.maxWriteBufferSize = maxWriteBufferSize; + this.retryTimeout = retryTimeout; + this.idleShardRetryDelay = idleShardRetryDelay; + } + + AutoFollowPattern(StreamInput in) throws IOException { + leaderIndexPatterns = in.readList(StreamInput::readString); + followIndexPattern = in.readOptionalString(); + maxBatchOperationCount = in.readOptionalVInt(); + maxConcurrentReadBatches = in.readOptionalVInt(); + maxOperationSizeInBytes = in.readOptionalLong(); + maxConcurrentWriteBatches = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalVInt(); + retryTimeout = in.readOptionalTimeValue(); + idleShardRetryDelay = in.readOptionalTimeValue(); + } + + public boolean match(String indexName) { + return match(leaderIndexPatterns, indexName); + } + + public static boolean match(List leaderIndexPatterns, String indexName) { + return Regex.simpleMatch(leaderIndexPatterns, indexName); + } + + public List getLeaderIndexPatterns() { + return leaderIndexPatterns; + } + + public String getFollowIndexPattern() { + return followIndexPattern; + } + + public Integer getMaxBatchOperationCount() { + return maxBatchOperationCount; + } + + public Integer getMaxConcurrentReadBatches() { + return maxConcurrentReadBatches; + } + + public Long getMaxOperationSizeInBytes() { + return maxOperationSizeInBytes; + } + + public Integer getMaxConcurrentWriteBatches() { + return maxConcurrentWriteBatches; + } + + public Integer getMaxWriteBufferSize() { + return maxWriteBufferSize; + } + + public TimeValue getRetryTimeout() { + return retryTimeout; + } + + public TimeValue getIdleShardRetryDelay() { + return idleShardRetryDelay; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringList(leaderIndexPatterns); + out.writeOptionalString(followIndexPattern); + out.writeOptionalVInt(maxBatchOperationCount); + out.writeOptionalVInt(maxConcurrentReadBatches); + out.writeOptionalLong(maxOperationSizeInBytes); + out.writeOptionalVInt(maxConcurrentWriteBatches); + out.writeOptionalVInt(maxWriteBufferSize); + out.writeOptionalTimeValue(retryTimeout); + out.writeOptionalTimeValue(idleShardRetryDelay); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.array(LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns.toArray(new String[0])); + if (followIndexPattern != null) { + builder.field(FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexPattern); + } + if (maxBatchOperationCount != null) { + builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); + } + if (maxConcurrentReadBatches != null) { + builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); + } + if (maxOperationSizeInBytes != null) { + builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes); + } + if (maxConcurrentWriteBatches != null) { + builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); + } + if (maxWriteBufferSize != null){ + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + } + if (retryTimeout != null) { + builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout); + } + if (idleShardRetryDelay != null) { + builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay); + } + return builder; + } + + @Override + public boolean isFragment() { + return true; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AutoFollowPattern that = (AutoFollowPattern) o; + return Objects.equals(leaderIndexPatterns, that.leaderIndexPatterns) && + Objects.equals(followIndexPattern, that.followIndexPattern) && + Objects.equals(maxBatchOperationCount, that.maxBatchOperationCount) && + Objects.equals(maxConcurrentReadBatches, that.maxConcurrentReadBatches) && + Objects.equals(maxOperationSizeInBytes, that.maxOperationSizeInBytes) && + Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && + Objects.equals(retryTimeout, that.retryTimeout) && + Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay); + } + + @Override + public int hashCode() { + return Objects.hash( + leaderIndexPatterns, + followIndexPattern, + maxBatchOperationCount, + maxConcurrentReadBatches, + maxOperationSizeInBytes, + maxConcurrentWriteBatches, + maxWriteBufferSize, + retryTimeout, + idleShardRetryDelay + ); + } + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadataTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadataTests.java new file mode 100644 index 00000000000..5227c04962a --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadataTests.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ccr; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +public class AutoFollowMetadataTests extends AbstractSerializingTestCase { + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + return s -> true; + } + + @Override + protected AutoFollowMetadata doParseInstance(XContentParser parser) throws IOException { + return AutoFollowMetadata.fromXContent(parser); + } + + @Override + protected AutoFollowMetadata createTestInstance() { + int numEntries = randomIntBetween(0, 32); + Map configs = new HashMap<>(numEntries); + Map> followedLeaderIndices = new HashMap<>(numEntries); + for (int i = 0; i < numEntries; i++) { + List leaderPatterns = Arrays.asList(generateRandomStringArray(4, 4, false)); + AutoFollowMetadata.AutoFollowPattern autoFollowPattern = + new AutoFollowMetadata.AutoFollowPattern(leaderPatterns, randomAlphaOfLength(4), randomIntBetween(0, Integer.MAX_VALUE), + randomIntBetween(0, Integer.MAX_VALUE), randomNonNegativeLong(), randomIntBetween(0, Integer.MAX_VALUE), + randomIntBetween(0, Integer.MAX_VALUE), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500)); + configs.put(Integer.toString(i), autoFollowPattern); + followedLeaderIndices.put(Integer.toString(i), Arrays.asList(generateRandomStringArray(4, 4, false))); + } + return new AutoFollowMetadata(configs, followedLeaderIndices); + } + + @Override + protected Writeable.Reader instanceReader() { + return AutoFollowMetadata::new; + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.delete_auto_follow_pattern.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.delete_auto_follow_pattern.json new file mode 100644 index 00000000000..b14effd5f3f --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.delete_auto_follow_pattern.json @@ -0,0 +1,17 @@ +{ + "ccr.delete_auto_follow_pattern": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", + "methods": [ "DELETE" ], + "url": { + "path": "/_ccr/_auto_follow/{leader_cluster_alias}", + "paths": [ "/_ccr/_auto_follow/{leader_cluster_alias}" ], + "parts": { + "leader_cluster_alias": { + "type": "string", + "required": true, + "description": "The name of the leader cluster alias." + } + } + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.put_auto_follow_pattern.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.put_auto_follow_pattern.json new file mode 100644 index 00000000000..28e7299713d --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.put_auto_follow_pattern.json @@ -0,0 +1,21 @@ +{ + "ccr.put_auto_follow_pattern": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", + "methods": [ "PUT" ], + "url": { + "path": "/_ccr/_auto_follow/{leader_cluster_alias}", + "paths": [ "/_ccr/_auto_follow/{leader_cluster_alias}" ], + "parts": { + "leader_cluster_alias": { + "type": "string", + "required": true, + "description": "The name of the leader cluster alias." + } + } + }, + "body": { + "description" : "The specification of the auto follow pattern", + "required" : true + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml new file mode 100644 index 00000000000..f4cf79fb558 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml @@ -0,0 +1,13 @@ +--- +"Test put and delete auto follow pattern": + - do: + ccr.put_auto_follow_pattern: + leader_cluster_alias: _local_ + body: + leader_index_patterns: ['logs-*'] + - is_true: acknowledged + + - do: + ccr.delete_auto_follow_pattern: + leader_cluster_alias: _local_ + - is_true: acknowledged