diff --git a/server/src/main/java/org/elasticsearch/common/regex/Regex.java b/server/src/main/java/org/elasticsearch/common/regex/Regex.java index bcf2dfba3ef..1f4e4651412 100644 --- a/server/src/main/java/org/elasticsearch/common/regex/Regex.java +++ b/server/src/main/java/org/elasticsearch/common/regex/Regex.java @@ -138,6 +138,15 @@ public class Regex { return false; } + /** + * Similar to {@link #simpleMatch(String[], String)}, but accepts a list of strings instead of an array of strings for the patterns to + * match. + */ + public static boolean simpleMatch(final List patterns, final String str) { + // #simpleMatch(String[], String) is likely to be inlined into this method + return patterns != null && simpleMatch(patterns.toArray(Strings.EMPTY_ARRAY), str); + } + public static boolean simpleMatch(String[] patterns, String[] types) { if (patterns != null && types != null) { for (String type : types) { diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index c14e13e7bb0..17a6db286f2 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -78,6 +78,34 @@ public class FollowIndexIT extends ESRestTestCase { } } + public void testAutoFollowPatterns() throws Exception { + assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster); + + Request request = new Request("PUT", "/_ccr/_auto_follow/leader_cluster"); + request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}"); + assertOK(client().performRequest(request)); + + try (RestClient leaderClient = buildLeaderClient()) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + request = new Request("PUT", "/logs-20190101"); + request.setJsonEntity("{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(leaderClient.performRequest(request)); + + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true"); + } + } + + assertBusy(() -> { + ensureYellow("logs-20190101"); + verifyDocuments("logs-20190101", 5); + }); + } + private static void index(RestClient client, String index, String id, Object... fields) throws IOException { XContentBuilder document = jsonBuilder().startObject(); for (int i = 0; i < fields.length; i += 2) { @@ -135,6 +163,15 @@ public class FollowIndexIT extends ESRestTestCase { return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); } + private static void ensureYellow(String index) throws IOException { + Request request = new Request("GET", "/_cluster/health/" + index); + request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_no_relocating_shards", "true"); + request.addParameter("timeout", "70s"); + request.addParameter("level", "shards"); + client().performRequest(request); + } + private RestClient buildLeaderClient() throws IOException { assert runningAgainstLeaderCluster == false; String leaderUrl = System.getProperty("tests.leader_host"); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index b00883f5c2a..cd0561b1c0c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -39,21 +39,28 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; import org.elasticsearch.xpack.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction; +import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.FollowIndexAction; +import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; +import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction; import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction; +import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction; +import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction; import org.elasticsearch.xpack.core.XPackPlugin; @@ -113,7 +120,14 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E final Environment environment, final NodeEnvironment nodeEnvironment, final NamedWriteableRegistry namedWriteableRegistry) { - return Collections.singleton(ccrLicenseChecker); + if (enabled == false) { + return emptyList(); + } + + return Arrays.asList( + ccrLicenseChecker, + new AutoFollowCoordinator(settings, client, threadPool, clusterService) + ); } @Override @@ -128,12 +142,18 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E } return Arrays.asList( + // internal actions new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class), + new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), + // stats action new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), + // follow actions new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, CreateAndFollowIndexAction.TransportAction.class), new ActionHandler<>(FollowIndexAction.INSTANCE, FollowIndexAction.TransportAction.class), - new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), - new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class)); + new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class), + // auto-follow actions + new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class), + new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class)); } public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, @@ -141,10 +161,15 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { return Arrays.asList( + // stats API new RestCcrStatsAction(settings, restController), + // follow APIs new RestCreateAndFollowIndexAction(settings, restController), new RestFollowIndexAction(settings, restController), - new RestUnfollowIndexAction(settings, restController)); + new RestUnfollowIndexAction(settings, restController), + // auto-follow APIs + new RestDeleteAutoFollowPatternAction(settings, restController), + new RestPutAutoFollowPatternAction(settings, restController)); } public List getNamedWriteables() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index 6960766bad0..a942990ea5a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.unit.TimeValue; import java.util.Arrays; import java.util.List; @@ -32,6 +33,12 @@ public final class CcrSettings { public static final Setting CCR_FOLLOWING_INDEX_SETTING = Setting.boolSetting("index.xpack.ccr.following_index", false, Setting.Property.IndexScope); + /** + * Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow + */ + public static final Setting CCR_AUTO_FOLLOW_POLL_INTERVAL = + Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope); + /** * The settings defined by CCR. * @@ -40,7 +47,8 @@ public final class CcrSettings { static List> getSettings() { return Arrays.asList( CCR_ENABLED_SETTING, - CCR_FOLLOWING_INDEX_SETTING); + CCR_FOLLOWING_INDEX_SETTING, + CCR_AUTO_FOLLOW_POLL_INTERVAL); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java new file mode 100644 index 00000000000..234fe32cdd0 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -0,0 +1,306 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.index.Index; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A component that runs only on the elected master node and follows leader indices automatically + * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}. + */ +public class AutoFollowCoordinator implements ClusterStateApplier { + + private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); + + private final Client client; + private final TimeValue pollInterval; + private final ThreadPool threadPool; + private final ClusterService clusterService; + + private volatile boolean localNodeMaster = false; + + public AutoFollowCoordinator(Settings settings, + Client client, + ThreadPool threadPool, + ClusterService clusterService) { + this.client = client; + this.threadPool = threadPool; + this.clusterService = clusterService; + + this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings); + clusterService.addStateApplier(this); + } + + private void doAutoFollow() { + if (localNodeMaster == false) { + return; + } + ClusterState followerClusterState = clusterService.state(); + AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); + if (autoFollowMetadata == null) { + threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + return; + } + + if (autoFollowMetadata.getPatterns().isEmpty()) { + threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + return; + } + + Consumer handler = e -> { + if (e != null) { + LOGGER.warn("Failure occurred during auto following indices", e); + } + threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + }; + AutoFollower operation = new AutoFollower(client, handler, followerClusterState) { + + @Override + void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.metaData(true); + leaderClient.admin().cluster().state(request, + ActionListener.wrap( + r -> handler.accept(r.getState(), null), + e -> handler.accept(null, e) + ) + ); + } + + @Override + void createAndFollow(FollowIndexAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + client.execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest), + ActionListener.wrap(r -> successHandler.run(), failureHandler)); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return updateFunction.apply(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + handler.accept(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + handler.accept(null); + } + }); + } + + }; + operation.autoFollowIndices(); + } + + @Override + public void applyClusterState(ClusterChangedEvent event) { + final boolean beforeLocalMasterNode = localNodeMaster; + localNodeMaster = event.localNodeMaster(); + if (beforeLocalMasterNode == false && localNodeMaster) { + threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + } + } + + abstract static class AutoFollower { + + private final Client client; + private final Consumer handler; + private final ClusterState followerClusterState; + private final AutoFollowMetadata autoFollowMetadata; + + private final CountDown autoFollowPatternsCountDown; + private final AtomicReference autoFollowPatternsErrorHolder = new AtomicReference<>(); + + AutoFollower(Client client, Consumer handler, ClusterState followerClusterState) { + this.client = client; + this.handler = handler; + this.followerClusterState = followerClusterState; + this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); + this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size()); + } + + void autoFollowIndices() { + for (Map.Entry entry : autoFollowMetadata.getPatterns().entrySet()) { + String clusterAlias = entry.getKey(); + AutoFollowPattern autoFollowPattern = entry.getValue(); + Client leaderClient = clusterAlias.equals("_local_") ? client : client.getRemoteClusterClient(clusterAlias); + List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); + + getLeaderClusterState(leaderClient, (leaderClusterState, e) -> { + if (leaderClusterState != null) { + assert e == null; + handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState); + } else { + finalise(e); + } + }); + } + } + + private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollowPattern, + List followedIndexUUIDs, ClusterState leaderClusterState) { + final List leaderIndicesToFollow = + getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs); + if (leaderIndicesToFollow.isEmpty()) { + finalise(null); + } else { + final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); + final AtomicReference leaderIndicesErrorHolder = new AtomicReference<>(); + for (Index indexToFollow : leaderIndicesToFollow) { + final String leaderIndexName = indexToFollow.getName(); + final String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndexName); + + String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName : + clusterAlias + ":" + leaderIndexName; + FollowIndexAction.Request followRequest = + new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName, + autoFollowPattern.getMaxBatchOperationCount(), autoFollowPattern.getMaxConcurrentReadBatches(), + autoFollowPattern.getMaxOperationSizeInBytes(), autoFollowPattern.getMaxConcurrentWriteBatches(), + autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getRetryTimeout(), + autoFollowPattern.getIdleShardRetryDelay()); + + // Execute if the create and follow api call succeeds: + Runnable successHandler = () -> { + LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName); + + // This function updates the auto follow metadata in the cluster to record that the leader index has been followed: + // (so that we do not try to follow it in subsequent auto follow runs) + Function function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow); + // The coordinator always runs on the elected master node, so we can update cluster state here: + updateAutoFollowMetadata(function, updateError -> { + if (updateError != null) { + LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError); + if (leaderIndicesErrorHolder.compareAndSet(null, updateError) == false) { + leaderIndicesErrorHolder.get().addSuppressed(updateError); + } + } else { + LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName); + } + if (leaderIndicesCountDown.countDown()) { + finalise(leaderIndicesErrorHolder.get()); + } + }); + }; + // Execute if the create and follow apu call fails: + Consumer failureHandler = followError -> { + assert followError != null; + LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError); + if (leaderIndicesCountDown.countDown()) { + finalise(followError); + } + }; + createAndFollow(followRequest, successHandler, failureHandler); + } + } + } + + private void finalise(Exception failure) { + if (autoFollowPatternsErrorHolder.compareAndSet(null, failure) == false) { + autoFollowPatternsErrorHolder.get().addSuppressed(failure); + } + + if (autoFollowPatternsCountDown.countDown()) { + handler.accept(autoFollowPatternsErrorHolder.get()); + } + } + + static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, + ClusterState leaderClusterState, + ClusterState followerClusterState, + List followedIndexUUIDs) { + List leaderIndicesToFollow = new ArrayList<>(); + for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) { + if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { + if (followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) { + // TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData + // has a leader index uuid custom metadata entry that matches with uuid of leaderIndexMetaData variable + // If so then handle it differently: not follow it, but just add an entry to + // AutoFollowMetadata#followedLeaderIndexUUIDs + leaderIndicesToFollow.add(leaderIndexMetaData.getIndex()); + } + } + } + return leaderIndicesToFollow; + } + + static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String leaderIndexName) { + if (autoFollowPattern.getFollowIndexPattern() != null) { + return autoFollowPattern.getFollowIndexPattern().replace("{{leader_index}}", leaderIndexName); + } else { + return leaderIndexName; + } + } + + static Function recordLeaderIndexAsFollowFunction(String clusterAlias, Index indexToFollow) { + return currentState -> { + AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); + + Map> newFollowedIndexUUIDS = + new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); + newFollowedIndexUUIDS.get(clusterAlias).add(indexToFollow.getUUID()); + + ClusterState.Builder newState = ClusterState.builder(currentState); + AutoFollowMetadata newAutoFollowMetadata = + new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(), newFollowedIndexUUIDS); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata) + .build()); + return newState.build(); + }; + } + + // abstract methods to make unit testing possible: + + abstract void getLeaderClusterState(Client leaderClient, + BiConsumer handler); + + abstract void createAndFollow(FollowIndexAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler); + + abstract void updateAutoFollowMetadata(Function updateFunction, + Consumer handler); + + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternAction.java new file mode 100644 index 00000000000..82e142202d2 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternAction.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class DeleteAutoFollowPatternAction extends Action { + + public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/delete"; + public static final DeleteAutoFollowPatternAction INSTANCE = new DeleteAutoFollowPatternAction(); + + private DeleteAutoFollowPatternAction() { + super(NAME); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends AcknowledgedRequest { + + private String leaderClusterAlias; + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (leaderClusterAlias == null) { + validationException = addValidationError("leaderClusterAlias is missing", validationException); + } + return validationException; + } + + public String getLeaderClusterAlias() { + return leaderClusterAlias; + } + + public void setLeaderClusterAlias(String leaderClusterAlias) { + this.leaderClusterAlias = leaderClusterAlias; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + leaderClusterAlias = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(leaderClusterAlias); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(leaderClusterAlias, request.leaderClusterAlias); + } + + @Override + public int hashCode() { + return Objects.hash(leaderClusterAlias); + } + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java new file mode 100644 index 00000000000..a01fd8e3bc2 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java @@ -0,0 +1,284 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class PutAutoFollowPatternAction extends Action { + + public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/put"; + public static final PutAutoFollowPatternAction INSTANCE = new PutAutoFollowPatternAction(); + + private PutAutoFollowPatternAction() { + super(NAME); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends AcknowledgedRequest implements ToXContentObject { + + static final ParseField LEADER_CLUSTER_ALIAS_FIELD = new ParseField("leader_cluster_alias"); + static final ParseField LEADER_INDEX_PATTERNS_FIELD = new ParseField("leader_index_patterns"); + static final ParseField FOLLOW_INDEX_NAME_PATTERN_FIELD = new ParseField("follow_index_name_pattern"); + + private static final ObjectParser PARSER = new ObjectParser<>("put_auto_follow_pattern_request", Request::new); + + static { + PARSER.declareString(Request::setLeaderClusterAlias, LEADER_CLUSTER_ALIAS_FIELD); + PARSER.declareStringArray(Request::setLeaderIndexPatterns, LEADER_INDEX_PATTERNS_FIELD); + PARSER.declareString(Request::setFollowIndexNamePattern, FOLLOW_INDEX_NAME_PATTERN_FIELD); + PARSER.declareInt(Request::setMaxBatchOperationCount, AutoFollowPattern.MAX_BATCH_OPERATION_COUNT); + PARSER.declareInt(Request::setMaxConcurrentReadBatches, AutoFollowPattern.MAX_CONCURRENT_READ_BATCHES); + PARSER.declareLong(Request::setMaxOperationSizeInBytes, AutoFollowPattern.MAX_BATCH_SIZE_IN_BYTES); + PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES); + PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE); + PARSER.declareField(Request::setRetryTimeout, + (p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.RETRY_TIMEOUT.getPreferredName()), + ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING); + PARSER.declareField(Request::setIdleShardRetryDelay, + (p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.IDLE_SHARD_RETRY_DELAY.getPreferredName()), + ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); + } + + public static Request fromXContent(XContentParser parser, String remoteClusterAlias) throws IOException { + Request request = PARSER.parse(parser, null); + if (remoteClusterAlias != null) { + if (request.leaderClusterAlias == null) { + request.leaderClusterAlias = remoteClusterAlias; + } else { + if (request.leaderClusterAlias.equals(remoteClusterAlias) == false) { + throw new IllegalArgumentException("provided leaderClusterAlias is not equal"); + } + } + } + return request; + } + + private String leaderClusterAlias; + private List leaderIndexPatterns; + private String followIndexNamePattern; + + private Integer maxBatchOperationCount; + private Integer maxConcurrentReadBatches; + private Long maxOperationSizeInBytes; + private Integer maxConcurrentWriteBatches; + private Integer maxWriteBufferSize; + private TimeValue retryTimeout; + private TimeValue idleShardRetryDelay; + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (leaderClusterAlias == null) { + validationException = addValidationError("leaderClusterAlias is missing", validationException); + } + if (leaderIndexPatterns == null || leaderIndexPatterns.isEmpty()) { + validationException = addValidationError("leaderIndexPatterns is missing", validationException); + } + return validationException; + } + + public String getLeaderClusterAlias() { + return leaderClusterAlias; + } + + public void setLeaderClusterAlias(String leaderClusterAlias) { + this.leaderClusterAlias = leaderClusterAlias; + } + + public List getLeaderIndexPatterns() { + return leaderIndexPatterns; + } + + public void setLeaderIndexPatterns(List leaderIndexPatterns) { + this.leaderIndexPatterns = leaderIndexPatterns; + } + + public String getFollowIndexNamePattern() { + return followIndexNamePattern; + } + + public void setFollowIndexNamePattern(String followIndexNamePattern) { + this.followIndexNamePattern = followIndexNamePattern; + } + + public Integer getMaxBatchOperationCount() { + return maxBatchOperationCount; + } + + public void setMaxBatchOperationCount(Integer maxBatchOperationCount) { + this.maxBatchOperationCount = maxBatchOperationCount; + } + + public Integer getMaxConcurrentReadBatches() { + return maxConcurrentReadBatches; + } + + public void setMaxConcurrentReadBatches(Integer maxConcurrentReadBatches) { + this.maxConcurrentReadBatches = maxConcurrentReadBatches; + } + + public Long getMaxOperationSizeInBytes() { + return maxOperationSizeInBytes; + } + + public void setMaxOperationSizeInBytes(Long maxOperationSizeInBytes) { + this.maxOperationSizeInBytes = maxOperationSizeInBytes; + } + + public Integer getMaxConcurrentWriteBatches() { + return maxConcurrentWriteBatches; + } + + public void setMaxConcurrentWriteBatches(Integer maxConcurrentWriteBatches) { + this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + } + + public Integer getMaxWriteBufferSize() { + return maxWriteBufferSize; + } + + public void setMaxWriteBufferSize(Integer maxWriteBufferSize) { + this.maxWriteBufferSize = maxWriteBufferSize; + } + + public TimeValue getRetryTimeout() { + return retryTimeout; + } + + public void setRetryTimeout(TimeValue retryTimeout) { + this.retryTimeout = retryTimeout; + } + + public TimeValue getIdleShardRetryDelay() { + return idleShardRetryDelay; + } + + public void setIdleShardRetryDelay(TimeValue idleShardRetryDelay) { + this.idleShardRetryDelay = idleShardRetryDelay; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + leaderClusterAlias = in.readString(); + leaderIndexPatterns = in.readList(StreamInput::readString); + followIndexNamePattern = in.readOptionalString(); + maxBatchOperationCount = in.readOptionalVInt(); + maxConcurrentReadBatches = in.readOptionalVInt(); + maxOperationSizeInBytes = in.readOptionalLong(); + maxConcurrentWriteBatches = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalVInt(); + retryTimeout = in.readOptionalTimeValue(); + idleShardRetryDelay = in.readOptionalTimeValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(leaderClusterAlias); + out.writeStringList(leaderIndexPatterns); + out.writeOptionalString(followIndexNamePattern); + out.writeOptionalVInt(maxBatchOperationCount); + out.writeOptionalVInt(maxConcurrentReadBatches); + out.writeOptionalLong(maxOperationSizeInBytes); + out.writeOptionalVInt(maxConcurrentWriteBatches); + out.writeOptionalVInt(maxWriteBufferSize); + out.writeOptionalTimeValue(retryTimeout); + out.writeOptionalTimeValue(idleShardRetryDelay); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias); + builder.field(LEADER_INDEX_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns); + if (followIndexNamePattern != null) { + builder.field(FOLLOW_INDEX_NAME_PATTERN_FIELD.getPreferredName(), followIndexNamePattern); + } + if (maxBatchOperationCount != null) { + builder.field(ShardFollowTask.MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); + } + if (maxOperationSizeInBytes != null) { + builder.field(ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes); + } + if (maxWriteBufferSize != null) { + builder.field(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + } + if (maxConcurrentReadBatches != null) { + builder.field(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); + } + if (maxConcurrentWriteBatches != null) { + builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); + } + if (retryTimeout != null) { + builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep()); + } + if (idleShardRetryDelay != null) { + builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); + } + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(leaderClusterAlias, request.leaderClusterAlias) && + Objects.equals(leaderIndexPatterns, request.leaderIndexPatterns) && + Objects.equals(followIndexNamePattern, request.followIndexNamePattern) && + Objects.equals(maxBatchOperationCount, request.maxBatchOperationCount) && + Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) && + Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) && + Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && + Objects.equals(retryTimeout, request.retryTimeout) && + Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay); + } + + @Override + public int hashCode() { + return Objects.hash( + leaderClusterAlias, + leaderIndexPatterns, + followIndexNamePattern, + maxBatchOperationCount, + maxConcurrentReadBatches, + maxOperationSizeInBytes, + maxConcurrentWriteBatches, + maxWriteBufferSize, + retryTimeout, + idleShardRetryDelay + ); + } + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java new file mode 100644 index 00000000000..6c1ca81e7c4 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TransportDeleteAutoFollowPatternAction extends + TransportMasterNodeAction { + + @Inject + public TransportDeleteAutoFollowPatternAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, DeleteAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, DeleteAutoFollowPatternAction.Request::new); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + @Override + protected void masterOperation(DeleteAutoFollowPatternAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return innerDelete(request, currentState); + } + }); + } + + static ClusterState innerDelete(DeleteAutoFollowPatternAction.Request request, ClusterState currentState) { + AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); + if (currentAutoFollowMetadata == null) { + throw new ResourceNotFoundException("no auto-follow patterns for cluster alias [{}] found", + request.getLeaderClusterAlias()); + } + Map patterns = currentAutoFollowMetadata.getPatterns(); + AutoFollowPattern autoFollowPatternToRemove = patterns.get(request.getLeaderClusterAlias()); + if (autoFollowPatternToRemove == null) { + throw new ResourceNotFoundException("no auto-follow patterns for cluster alias [{}] found", + request.getLeaderClusterAlias()); + } + + final Map patternsCopy = new HashMap<>(patterns); + final Map> followedLeaderIndexUUIDSCopy = + new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); + patternsCopy.remove(request.getLeaderClusterAlias()); + followedLeaderIndexUUIDSCopy.remove(request.getLeaderClusterAlias()); + + AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(patternsCopy, followedLeaderIndexUUIDSCopy); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata) + .build()); + return newState.build(); + } + + @Override + protected ClusterBlockException checkBlock(DeleteAutoFollowPatternAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java new file mode 100644 index 00000000000..3d3e342c0cd --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -0,0 +1,173 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class TransportPutAutoFollowPatternAction extends + TransportMasterNodeAction { + + private final Client client; + + @Inject + public TransportPutAutoFollowPatternAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, Client client, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, PutAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, PutAutoFollowPatternAction.Request::new); + this.client = client; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + @Override + protected void masterOperation(PutAutoFollowPatternAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + final Client leaderClient; + if (request.getLeaderClusterAlias().equals("_local_")) { + leaderClient = client; + } else { + leaderClient = client.getRemoteClusterClient(request.getLeaderClusterAlias()); + } + + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + + leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + final ClusterState leaderClusterState = clusterStateResponse.getState(); + clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return innerPut(request, currentState, leaderClusterState); + } + }); + }, listener::onFailure)); + } + + static ClusterState innerPut(PutAutoFollowPatternAction.Request request, + ClusterState localState, + ClusterState leaderClusterState) { + // auto patterns are always overwritten + // only already followed index uuids are updated + + AutoFollowMetadata currentAutoFollowMetadata = localState.metaData().custom(AutoFollowMetadata.TYPE); + Map> followedLeaderIndices; + Map patterns; + if (currentAutoFollowMetadata != null) { + patterns = new HashMap<>(currentAutoFollowMetadata.getPatterns()); + followedLeaderIndices = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); + } else { + patterns = new HashMap<>(); + followedLeaderIndices = new HashMap<>(); + } + + AutoFollowPattern previousPattern = patterns.get(request.getLeaderClusterAlias()); + List followedIndexUUIDs = followedLeaderIndices.get(request.getLeaderClusterAlias()); + if (followedIndexUUIDs == null) { + followedIndexUUIDs = new ArrayList<>(); + followedLeaderIndices.put(request.getLeaderClusterAlias(), followedIndexUUIDs); + } + + // Mark existing leader indices as already auto followed: + if (previousPattern != null) { + markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), leaderClusterState.metaData(), + previousPattern, followedIndexUUIDs); + } else { + markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), leaderClusterState.metaData(), + followedIndexUUIDs); + } + + AutoFollowPattern autoFollowPattern = new AutoFollowPattern( + request.getLeaderIndexPatterns(), + request.getFollowIndexNamePattern(), + request.getMaxBatchOperationCount(), + request.getMaxConcurrentReadBatches(), + request.getMaxOperationSizeInBytes(), + request.getMaxConcurrentWriteBatches(), + request.getMaxWriteBufferSize(), + request.getRetryTimeout(), + request.getIdleShardRetryDelay() + ); + patterns.put(request.getLeaderClusterAlias(), autoFollowPattern); + ClusterState.Builder newState = ClusterState.builder(localState); + newState.metaData(MetaData.builder(localState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, followedLeaderIndices)) + .build()); + return newState.build(); + } + + private static void markExistingIndicesAsAutoFollowedForNewPatterns( + List leaderIndexPatterns, + MetaData leaderMetaData, + AutoFollowPattern previousPattern, + List followedIndexUUIDS) { + + final List newPatterns = leaderIndexPatterns + .stream() + .filter(p -> previousPattern.getLeaderIndexPatterns().contains(p) == false) + .collect(Collectors.toList()); + markExistingIndicesAsAutoFollowed(newPatterns, leaderMetaData, followedIndexUUIDS); + } + + private static void markExistingIndicesAsAutoFollowed( + List patterns, + MetaData leaderMetaData, + List followedIndexUUIDS) { + + for (final IndexMetaData indexMetaData : leaderMetaData) { + if (AutoFollowPattern.match(patterns, indexMetaData.getIndex().getName())) { + followedIndexUUIDS.add(indexMetaData.getIndexUUID()); + } + } + } + + @Override + protected ClusterBlockException checkBlock(PutAutoFollowPatternAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestDeleteAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestDeleteAutoFollowPatternAction.java new file mode 100644 index 00000000000..bd3585c7982 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestDeleteAutoFollowPatternAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.Request; + +import java.io.IOException; + +import static org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.INSTANCE; + +public class RestDeleteAutoFollowPatternAction extends BaseRestHandler { + + public RestDeleteAutoFollowPatternAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.DELETE, "/_ccr/_auto_follow/{leader_cluster_alias}", this); + } + + @Override + public String getName() { + return "ccr_delete_auto_follow_pattern_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request = new Request(); + request.setLeaderClusterAlias(restRequest.param("leader_cluster_alias")); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutAutoFollowPatternAction.java new file mode 100644 index 00000000000..d92ebb7b0bb --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutAutoFollowPatternAction.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction.Request; + +import java.io.IOException; + +import static org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction.INSTANCE; + +public class RestPutAutoFollowPatternAction extends BaseRestHandler { + + public RestPutAutoFollowPatternAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.PUT, "/_ccr/_auto_follow/{leader_cluster_alias}", this); + } + + @Override + public String getName() { + return "ccr_put_auto_follow_pattern_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request = createRequest(restRequest); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } + + static Request createRequest(RestRequest restRequest) throws IOException { + try (XContentParser parser = restRequest.contentOrSourceParamParser()) { + return Request.fromXContent(parser, restRequest.param("leader_cluster_alias")); + } + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java new file mode 100644 index 00000000000..dd1376a4d7a --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -0,0 +1,296 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class AutoFollowCoordinatorTests extends ESTestCase { + + public void testAutoFollower() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + ClusterState leaderState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0))) + .build(); + + AutoFollowPattern autoFollowPattern = + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + + ClusterState currentState = ClusterState.builder(new ClusterName("name")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + boolean[] invoked = new boolean[]{false}; + Consumer handler = e -> { + invoked[0] = true; + assertThat(e, nullValue()); + }; + AutoFollower autoFollower = new AutoFollower(client, handler, currentState) { + @Override + void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + handler.accept(leaderState, null); + } + + @Override + void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); + assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + ClusterState resultCs = updateFunction.apply(currentState); + AutoFollowMetadata result = resultCs.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1)); + handler.accept(null); + } + }; + autoFollower.autoFollowIndices(); + assertThat(invoked[0], is(true)); + } + + public void testAutoFollowerClusterStateApiFailure() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + AutoFollowPattern autoFollowPattern = + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + Exception failure = new RuntimeException("failure"); + boolean[] invoked = new boolean[]{false}; + Consumer handler = e -> { + invoked[0] = true; + assertThat(e, sameInstance(failure)); + }; + AutoFollower autoFollower = new AutoFollower(client, handler, followerState) { + @Override + void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + handler.accept(null, failure); + } + + @Override + void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + fail("should not get here"); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + fail("should not get here"); + } + }; + autoFollower.autoFollowIndices(); + assertThat(invoked[0], is(true)); + } + + public void testAutoFollowerUpdateClusterStateFailure() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + ClusterState leaderState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0))) + .build(); + + AutoFollowPattern autoFollowPattern = + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + Exception failure = new RuntimeException("failure"); + boolean[] invoked = new boolean[]{false}; + Consumer handler = e -> { + invoked[0] = true; + assertThat(e, sameInstance(failure)); + }; + AutoFollower autoFollower = new AutoFollower(client, handler, followerState) { + @Override + void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + handler.accept(leaderState, null); + } + + @Override + void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); + assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + handler.accept(failure); + } + }; + autoFollower.autoFollowIndices(); + assertThat(invoked[0], is(true)); + } + + public void testAutoFollowerCreateAndFollowApiCallFailure() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + ClusterState leaderState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0))) + .build(); + + AutoFollowPattern autoFollowPattern = + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + Exception failure = new RuntimeException("failure"); + boolean[] invoked = new boolean[]{false}; + Consumer handler = e -> { + invoked[0] = true; + assertThat(e, sameInstance(failure)); + }; + AutoFollower autoFollower = new AutoFollower(client, handler, followerState) { + @Override + void getLeaderClusterState(Client leaderClient, BiConsumer handler) { + handler.accept(leaderState, null); + } + + @Override + void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); + assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); + failureHandler.accept(failure); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + fail("should not get here"); + } + }; + autoFollower.autoFollowIndices(); + assertThat(invoked[0], is(true)); + } + + public void testGetLeaderIndicesToFollow() { + AutoFollowPattern autoFollowPattern = + new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null); + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap()))) + .build(); + + MetaData.Builder imdBuilder = MetaData.builder(); + for (int i = 0; i < 5; i++) { + Settings.Builder builder = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, "metrics-" + i); + imdBuilder.put(IndexMetaData.builder("metrics-" + i) + .settings(builder) + .numberOfShards(1) + .numberOfReplicas(0)); + } + imdBuilder.put(IndexMetaData.builder("logs-0") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0)); + + ClusterState leaderState = ClusterState.builder(new ClusterName("remote")) + .metaData(imdBuilder) + .build(); + + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); + result.sort(Comparator.comparing(Index::getName)); + assertThat(result.size(), equalTo(5)); + assertThat(result.get(0).getName(), equalTo("metrics-0")); + assertThat(result.get(1).getName(), equalTo("metrics-1")); + assertThat(result.get(2).getName(), equalTo("metrics-2")); + assertThat(result.get(3).getName(), equalTo("metrics-3")); + assertThat(result.get(4).getName(), equalTo("metrics-4")); + + List followedIndexUUIDs = Collections.singletonList(leaderState.metaData().index("metrics-2").getIndexUUID()); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, followedIndexUUIDs); + result.sort(Comparator.comparing(Index::getName)); + assertThat(result.size(), equalTo(4)); + assertThat(result.get(0).getName(), equalTo("metrics-0")); + assertThat(result.get(1).getName(), equalTo("metrics-1")); + assertThat(result.get(2).getName(), equalTo("metrics-3")); + assertThat(result.get(3).getName(), equalTo("metrics-4")); + } + + public void testGetFollowerIndexName() { + AutoFollowPattern autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, + null, null, null, null, null, null); + assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0")); + + autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-metrics-0", null, null, + null, null, null, null, null); + assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); + + autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null, + null, null, null, null, null, null); + assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java new file mode 100644 index 00000000000..a4808e428fe --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java @@ -0,0 +1,189 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.ccr.LocalStateCcr; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class AutoFollowTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return Collections.singleton(LocalStateCcr.class); + } + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + public void testAutoFollow() throws Exception { + Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + createIndex("logs-201812", leaderIndexSettings, "_doc"); + + // Enabling auto following: + putAutoFollowPatterns("logs-*", "transactions-*"); + + createIndex("metrics-201901", leaderIndexSettings, "_doc"); + + createIndex("logs-201901", leaderIndexSettings, "_doc"); + assertBusy(() -> { + IndicesExistsRequest request = new IndicesExistsRequest("copy-logs-201901"); + assertTrue(client().admin().indices().exists(request).actionGet().isExists()); + }); + createIndex("transactions-201901", leaderIndexSettings, "_doc"); + assertBusy(() -> { + IndicesExistsRequest request = new IndicesExistsRequest("copy-transactions-201901"); + assertTrue(client().admin().indices().exists(request).actionGet().isExists()); + }); + + IndicesExistsRequest request = new IndicesExistsRequest("copy-metrics-201901"); + assertFalse(client().admin().indices().exists(request).actionGet().isExists()); + request = new IndicesExistsRequest("copy-logs-201812"); + assertFalse(client().admin().indices().exists(request).actionGet().isExists()); + } + + public void testAutoFollowManyIndices() throws Exception { + Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + putAutoFollowPatterns("logs-*"); + int numIndices = randomIntBetween(4, 32); + for (int i = 0; i < numIndices; i++) { + createIndex("logs-" + i, leaderIndexSettings, "_doc"); + } + int expectedVal1 = numIndices; + assertBusy(() -> { + MetaData metaData = client().admin().cluster().prepareState().get().getState().metaData(); + int count = (int) Arrays.stream(metaData.getConcreteAllIndices()).filter(s -> s.startsWith("copy-")).count(); + assertThat(count, equalTo(expectedVal1)); + }); + + deleteAutoFollowPatternSetting(); + createIndex("logs-does-not-count", leaderIndexSettings, "_doc"); + + putAutoFollowPatterns("logs-*"); + int i = numIndices; + numIndices = numIndices + randomIntBetween(4, 32); + for (; i < numIndices; i++) { + createIndex("logs-" + i, leaderIndexSettings, "_doc"); + } + int expectedVal2 = numIndices; + assertBusy(() -> { + MetaData metaData = client().admin().cluster().prepareState().get().getState().metaData(); + int count = (int) Arrays.stream(metaData.getConcreteAllIndices()).filter(s -> s.startsWith("copy-")).count(); + assertThat(count, equalTo(expectedVal2)); + }); + } + + public void testAutoFollowParameterAreDelegated() throws Exception { + Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + // Enabling auto following: + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("_local_"); + request.setLeaderIndexPatterns(Collections.singletonList("logs-*")); + // Need to set this, because following an index in the same cluster + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + if (randomBoolean()) { + request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentWriteBatches(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxOperationSizeInBytes(randomNonNegativeLong()); + } + if (randomBoolean()) { + request.setRetryTimeout(TimeValue.timeValueMillis(500)); + } + if (randomBoolean()) { + request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500)); + } + assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); + + createIndex("logs-201901", leaderIndexSettings, "_doc"); + assertBusy(() -> { + PersistentTasksCustomMetaData persistentTasksMetaData = + client().admin().cluster().prepareState().get().getState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(persistentTasksMetaData, notNullValue()); + assertThat(persistentTasksMetaData.tasks().size(), equalTo(1)); + ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTasksMetaData.tasks().iterator().next().getParams(); + assertThat(shardFollowTask.getLeaderShardId().getIndexName(), equalTo("logs-201901")); + assertThat(shardFollowTask.getFollowShardId().getIndexName(), equalTo("copy-logs-201901")); + if (request.getMaxWriteBufferSize() != null) { + assertThat(shardFollowTask.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize())); + } + if (request.getMaxConcurrentReadBatches() != null) { + assertThat(shardFollowTask.getMaxConcurrentReadBatches(), equalTo(request.getMaxConcurrentReadBatches())); + } + if (request.getMaxConcurrentWriteBatches() != null) { + assertThat(shardFollowTask.getMaxConcurrentWriteBatches(), equalTo(request.getMaxConcurrentWriteBatches())); + } + if (request.getMaxBatchOperationCount() != null) { + assertThat(shardFollowTask.getMaxBatchOperationCount(), equalTo(request.getMaxBatchOperationCount())); + } + if (request.getMaxOperationSizeInBytes() != null) { + assertThat(shardFollowTask.getMaxBatchSizeInBytes(), equalTo(request.getMaxOperationSizeInBytes())); + } + if (request.getRetryTimeout() != null) { + assertThat(shardFollowTask.getRetryTimeout(), equalTo(request.getRetryTimeout())); + } + if (request.getIdleShardRetryDelay() != null) { + assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay())); + } + }); + } + + private void putAutoFollowPatterns(String... patterns) { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("_local_"); + request.setLeaderIndexPatterns(Arrays.asList(patterns)); + // Need to set this, because following an index in the same cluster + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); + } + + private void deleteAutoFollowPatternSetting() { + DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("_local_"); + assertTrue(client().execute(DeleteAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternRequestTests.java new file mode 100644 index 00000000000..0ca1b3d1278 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/DeleteAutoFollowPatternRequestTests.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.test.AbstractStreamableTestCase; + +public class DeleteAutoFollowPatternRequestTests extends AbstractStreamableTestCase { + + @Override + protected DeleteAutoFollowPatternAction.Request createBlankInstance() { + return new DeleteAutoFollowPatternAction.Request(); + } + + @Override + protected DeleteAutoFollowPatternAction.Request createTestInstance() { + DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias(randomAlphaOfLength(4)); + return request; + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java new file mode 100644 index 00000000000..27760578db9 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractStreamableXContentTestCase; + +import java.io.IOException; +import java.util.Arrays; + +public class PutAutoFollowPatternRequestTests extends AbstractStreamableXContentTestCase { + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected PutAutoFollowPatternAction.Request doParseInstance(XContentParser parser) throws IOException { + return PutAutoFollowPatternAction.Request.fromXContent(parser, null); + } + + @Override + protected PutAutoFollowPatternAction.Request createBlankInstance() { + return new PutAutoFollowPatternAction.Request(); + } + + @Override + protected PutAutoFollowPatternAction.Request createTestInstance() { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias(randomAlphaOfLength(4)); + request.setLeaderIndexPatterns(Arrays.asList(generateRandomStringArray(4, 4, false))); + if (randomBoolean()) { + request.setFollowIndexNamePattern(randomAlphaOfLength(4)); + } + if (randomBoolean()) { + request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500)); + } + if (randomBoolean()) { + request.setRetryTimeout(TimeValue.timeValueMillis(500)); + } + if (randomBoolean()) { + request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentWriteBatches(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxOperationSizeInBytes(randomNonNegativeLong()); + } + if (randomBoolean()) { + request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE)); + } + return request; + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java new file mode 100644 index 00000000000..03065ea8d38 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.Request; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { + + public void testInnerDelete() { + Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); + Map existingAutoFollowPatterns = new HashMap<>(); + { + List existingPatterns = new ArrayList<>(); + existingPatterns.add("transactions-*"); + existingAutoFollowPatterns.put("eu_cluster", + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + + List existingUUIDS = new ArrayList<>(); + existingUUIDS.add("_val"); + existingAlreadyFollowedIndexUUIDS.put("eu_cluster", existingUUIDS); + } + { + List existingPatterns = new ArrayList<>(); + existingPatterns.add("logs-*"); + existingAutoFollowPatterns.put("asia_cluster", + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + + List existingUUIDS = new ArrayList<>(); + existingUUIDS.add("_val"); + existingAlreadyFollowedIndexUUIDS.put("asia_cluster", existingUUIDS); + } + ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS))) + .build(); + + Request request = new Request(); + request.setLeaderClusterAlias("eu_cluster"); + AutoFollowMetadata result = TransportDeleteAutoFollowPatternAction.innerDelete(request, clusterState) + .getMetaData() + .custom(AutoFollowMetadata.TYPE); + assertThat(result.getPatterns().size(), equalTo(1)); + assertThat(result.getPatterns().get("asia_cluster"), notNullValue()); + assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(result.getFollowedLeaderIndexUUIDs().get("asia_cluster"), notNullValue()); + } + + public void testInnerDeleteDoesNotExist() { + Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); + Map existingAutoFollowPatterns = new HashMap<>(); + { + List existingPatterns = new ArrayList<>(); + existingPatterns.add("transactions-*"); + existingAutoFollowPatterns.put("eu_cluster", + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + } + ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS))) + .build(); + + Request request = new Request(); + request.setLeaderClusterAlias("asia_cluster"); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> TransportDeleteAutoFollowPatternAction.innerDelete(request, clusterState)); + assertThat(e.getMessage(), equalTo("no auto-follow patterns for cluster alias [asia_cluster] found")); + } + + public void testInnerDeleteNoAutoFollowMetadata() { + ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder()) + .build(); + + Request request = new Request(); + request.setLeaderClusterAlias("asia_cluster"); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> TransportDeleteAutoFollowPatternAction.innerDelete(request, clusterState)); + assertThat(e.getMessage(), equalTo("no auto-follow patterns for cluster alias [asia_cluster] found")); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java new file mode 100644 index 00000000000..d894eda0b11 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class TransportPutAutoFollowPatternActionTests extends ESTestCase { + + public void testInnerPut() { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("eu_cluster"); + request.setLeaderIndexPatterns(Collections.singletonList("logs-*")); + + ClusterState localState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder()) + .build(); + + ClusterState remoteState = ClusterState.builder(new ClusterName("eu_cluster")) + .metaData(MetaData.builder()) + .build(); + + ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState); + AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata, notNullValue()); + assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().size(), equalTo(1)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(0), equalTo("logs-*")); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(0)); + } + + public void testInnerPut_existingLeaderIndices() { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("eu_cluster"); + request.setLeaderIndexPatterns(Collections.singletonList("logs-*")); + + ClusterState localState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder()) + .build(); + + int numLeaderIndices = randomIntBetween(1, 8); + int numMatchingLeaderIndices = randomIntBetween(1, 8); + MetaData.Builder mdBuilder = MetaData.builder(); + for (int i = 0; i < numLeaderIndices; i++) { + mdBuilder.put(IndexMetaData.builder("transactions-" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0)); + } + for (int i = 0; i < numMatchingLeaderIndices; i++) { + mdBuilder.put(IndexMetaData.builder("logs-" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0)); + } + + ClusterState remoteState = ClusterState.builder(new ClusterName("eu_cluster")) + .metaData(mdBuilder) + .build(); + + ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState); + AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata, notNullValue()); + assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().size(), equalTo(1)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(0), equalTo("logs-*")); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(numMatchingLeaderIndices)); + } + + public void testInnerPut_existingLeaderIndicesAndAutoFollowMetadata() { + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setLeaderClusterAlias("eu_cluster"); + request.setLeaderIndexPatterns(Arrays.asList("logs-*", "transactions-*")); + + Map existingAutoFollowPatterns = new HashMap<>(); + List existingPatterns = new ArrayList<>(); + existingPatterns.add("transactions-*"); + existingAutoFollowPatterns.put("eu_cluster", + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); + List existingUUIDS = new ArrayList<>(); + existingUUIDS.add("_val"); + existingAlreadyFollowedIndexUUIDS.put("eu_cluster", existingUUIDS); + ClusterState localState = ClusterState.builder(new ClusterName("us_cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS))) + .build(); + + int numLeaderIndices = randomIntBetween(1, 8); + MetaData.Builder mdBuilder = MetaData.builder(); + for (int i = 0; i < numLeaderIndices; i++) { + mdBuilder.put(IndexMetaData.builder("logs-" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0)); + } + + ClusterState remoteState = ClusterState.builder(new ClusterName("eu_cluster")) + .metaData(mdBuilder) + .build(); + + ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState); + AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata, notNullValue()); + assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().size(), equalTo(2)); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(0), equalTo("logs-*")); + assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(1), equalTo("transactions-*")); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(numLeaderIndices + 1)); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 2d3707e98cf..03820b1f40b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -38,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackUsageAction; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction; import org.elasticsearch.xpack.core.graph.GraphFeatureSetUsage; import org.elasticsearch.xpack.core.graph.action.GraphExploreAction; @@ -366,7 +367,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ROLLUP, RollupFeatureSetUsage::new), new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new), new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new), - new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new) + new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new), + // ccr + new NamedWriteableRegistry.Entry(AutoFollowMetadata.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new) ); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java new file mode 100644 index 00000000000..244a5d441d9 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -0,0 +1,357 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ccr; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.security.xcontent.XContentUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Custom metadata that contains auto follow patterns and what leader indices an auto follow pattern has already followed. + */ +public class AutoFollowMetadata extends AbstractNamedDiffable implements XPackPlugin.XPackMetaDataCustom { + + public static final String TYPE = "ccr_auto_follow"; + + private static final ParseField PATTERNS_FIELD = new ParseField("patterns"); + private static final ParseField FOLLOWED_LEADER_INDICES_FIELD = new ParseField("followed_leader_indices"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("auto_follow", + args -> new AutoFollowMetadata((Map) args[0], (Map>) args[1])); + + static { + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { + Map patterns = new HashMap<>(); + String fieldName = null; + for (XContentParser.Token token = p.nextToken(); token != XContentParser.Token.END_OBJECT; token = p.nextToken()) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = p.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + patterns.put(fieldName, AutoFollowPattern.PARSER.parse(p, c)); + } else { + throw new ElasticsearchParseException("unexpected token [" + token + "]"); + } + } + return patterns; + }, PATTERNS_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { + Map> alreadyFollowedIndexUUIDS = new HashMap<>(); + String fieldName = null; + for (XContentParser.Token token = p.nextToken(); token != XContentParser.Token.END_OBJECT; token = p.nextToken()) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = p.currentName(); + } else if (token == XContentParser.Token.START_ARRAY) { + alreadyFollowedIndexUUIDS.put(fieldName, Arrays.asList(XContentUtils.readStringArray(p, false))); + } else { + throw new ElasticsearchParseException("unexpected token [" + token + "]"); + } + } + return alreadyFollowedIndexUUIDS; + }, FOLLOWED_LEADER_INDICES_FIELD); + } + + public static AutoFollowMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final Map patterns; + private final Map> followedLeaderIndexUUIDs; + + public AutoFollowMetadata(Map patterns, Map> followedLeaderIndexUUIDs) { + this.patterns = patterns; + this.followedLeaderIndexUUIDs = followedLeaderIndexUUIDs; + } + + public AutoFollowMetadata(StreamInput in) throws IOException { + patterns = in.readMap(StreamInput::readString, AutoFollowPattern::new); + followedLeaderIndexUUIDs = in.readMapOfLists(StreamInput::readString, StreamInput::readString); + } + + public Map getPatterns() { + return patterns; + } + + public Map> getFollowedLeaderIndexUUIDs() { + return followedLeaderIndexUUIDs; + } + + @Override + public EnumSet context() { + // TODO: When a snapshot is restored do we want to restore this? + // (Otherwise we would start following indices automatically immediately) + return MetaData.ALL_CONTEXTS; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_6_5_0.minimumCompatibilityVersion(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(patterns, StreamOutput::writeString, (out1, value) -> value.writeTo(out1)); + out.writeMapOfLists(followedLeaderIndexUUIDs, StreamOutput::writeString, StreamOutput::writeString); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(PATTERNS_FIELD.getPreferredName()); + for (Map.Entry entry : patterns.entrySet()) { + builder.startObject(entry.getKey()); + builder.value(entry.getValue()); + builder.endObject(); + } + builder.endObject(); + + builder.startObject(FOLLOWED_LEADER_INDICES_FIELD.getPreferredName()); + for (Map.Entry> entry : followedLeaderIndexUUIDs.entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + return builder; + } + + @Override + public boolean isFragment() { + return true; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AutoFollowMetadata that = (AutoFollowMetadata) o; + return Objects.equals(patterns, that.patterns); + } + + @Override + public int hashCode() { + return Objects.hash(patterns); + } + + public static class AutoFollowPattern implements Writeable, ToXContentObject { + + private static final ParseField LEADER_PATTERNS_FIELD = new ParseField("leader_patterns"); + private static final ParseField FOLLOW_PATTERN_FIELD = new ParseField("follow_pattern"); + public static final ParseField MAX_BATCH_OPERATION_COUNT = new ParseField("max_batch_operation_count"); + public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); + public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes"); + public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); + public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); + public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout"); + public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("auto_follow_pattern", + args -> new AutoFollowPattern((List) args[0], (String) args[1], (Integer) args[2], (Integer) args[3], + (Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8])); + + static { + PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOW_PATTERN_FIELD); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_OPERATION_COUNT); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_READ_BATCHES); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_SIZE_IN_BYTES); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_WRITE_BATCHES); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()), + RETRY_TIMEOUT, ObjectParser.ValueType.STRING); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), + IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); + } + + private final List leaderIndexPatterns; + private final String followIndexPattern; + private final Integer maxBatchOperationCount; + private final Integer maxConcurrentReadBatches; + private final Long maxOperationSizeInBytes; + private final Integer maxConcurrentWriteBatches; + private final Integer maxWriteBufferSize; + private final TimeValue retryTimeout; + private final TimeValue idleShardRetryDelay; + + public AutoFollowPattern(List leaderIndexPatterns, String followIndexPattern, Integer maxBatchOperationCount, + Integer maxConcurrentReadBatches, Long maxOperationSizeInBytes, Integer maxConcurrentWriteBatches, + Integer maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay) { + this.leaderIndexPatterns = leaderIndexPatterns; + this.followIndexPattern = followIndexPattern; + this.maxBatchOperationCount = maxBatchOperationCount; + this.maxConcurrentReadBatches = maxConcurrentReadBatches; + this.maxOperationSizeInBytes = maxOperationSizeInBytes; + this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + this.maxWriteBufferSize = maxWriteBufferSize; + this.retryTimeout = retryTimeout; + this.idleShardRetryDelay = idleShardRetryDelay; + } + + AutoFollowPattern(StreamInput in) throws IOException { + leaderIndexPatterns = in.readList(StreamInput::readString); + followIndexPattern = in.readOptionalString(); + maxBatchOperationCount = in.readOptionalVInt(); + maxConcurrentReadBatches = in.readOptionalVInt(); + maxOperationSizeInBytes = in.readOptionalLong(); + maxConcurrentWriteBatches = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalVInt(); + retryTimeout = in.readOptionalTimeValue(); + idleShardRetryDelay = in.readOptionalTimeValue(); + } + + public boolean match(String indexName) { + return match(leaderIndexPatterns, indexName); + } + + public static boolean match(List leaderIndexPatterns, String indexName) { + return Regex.simpleMatch(leaderIndexPatterns, indexName); + } + + public List getLeaderIndexPatterns() { + return leaderIndexPatterns; + } + + public String getFollowIndexPattern() { + return followIndexPattern; + } + + public Integer getMaxBatchOperationCount() { + return maxBatchOperationCount; + } + + public Integer getMaxConcurrentReadBatches() { + return maxConcurrentReadBatches; + } + + public Long getMaxOperationSizeInBytes() { + return maxOperationSizeInBytes; + } + + public Integer getMaxConcurrentWriteBatches() { + return maxConcurrentWriteBatches; + } + + public Integer getMaxWriteBufferSize() { + return maxWriteBufferSize; + } + + public TimeValue getRetryTimeout() { + return retryTimeout; + } + + public TimeValue getIdleShardRetryDelay() { + return idleShardRetryDelay; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringList(leaderIndexPatterns); + out.writeOptionalString(followIndexPattern); + out.writeOptionalVInt(maxBatchOperationCount); + out.writeOptionalVInt(maxConcurrentReadBatches); + out.writeOptionalLong(maxOperationSizeInBytes); + out.writeOptionalVInt(maxConcurrentWriteBatches); + out.writeOptionalVInt(maxWriteBufferSize); + out.writeOptionalTimeValue(retryTimeout); + out.writeOptionalTimeValue(idleShardRetryDelay); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.array(LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns.toArray(new String[0])); + if (followIndexPattern != null) { + builder.field(FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexPattern); + } + if (maxBatchOperationCount != null) { + builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); + } + if (maxConcurrentReadBatches != null) { + builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); + } + if (maxOperationSizeInBytes != null) { + builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes); + } + if (maxConcurrentWriteBatches != null) { + builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); + } + if (maxWriteBufferSize != null){ + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + } + if (retryTimeout != null) { + builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout); + } + if (idleShardRetryDelay != null) { + builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay); + } + return builder; + } + + @Override + public boolean isFragment() { + return true; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AutoFollowPattern that = (AutoFollowPattern) o; + return Objects.equals(leaderIndexPatterns, that.leaderIndexPatterns) && + Objects.equals(followIndexPattern, that.followIndexPattern) && + Objects.equals(maxBatchOperationCount, that.maxBatchOperationCount) && + Objects.equals(maxConcurrentReadBatches, that.maxConcurrentReadBatches) && + Objects.equals(maxOperationSizeInBytes, that.maxOperationSizeInBytes) && + Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && + Objects.equals(retryTimeout, that.retryTimeout) && + Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay); + } + + @Override + public int hashCode() { + return Objects.hash( + leaderIndexPatterns, + followIndexPattern, + maxBatchOperationCount, + maxConcurrentReadBatches, + maxOperationSizeInBytes, + maxConcurrentWriteBatches, + maxWriteBufferSize, + retryTimeout, + idleShardRetryDelay + ); + } + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadataTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadataTests.java new file mode 100644 index 00000000000..5227c04962a --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadataTests.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ccr; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +public class AutoFollowMetadataTests extends AbstractSerializingTestCase { + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + return s -> true; + } + + @Override + protected AutoFollowMetadata doParseInstance(XContentParser parser) throws IOException { + return AutoFollowMetadata.fromXContent(parser); + } + + @Override + protected AutoFollowMetadata createTestInstance() { + int numEntries = randomIntBetween(0, 32); + Map configs = new HashMap<>(numEntries); + Map> followedLeaderIndices = new HashMap<>(numEntries); + for (int i = 0; i < numEntries; i++) { + List leaderPatterns = Arrays.asList(generateRandomStringArray(4, 4, false)); + AutoFollowMetadata.AutoFollowPattern autoFollowPattern = + new AutoFollowMetadata.AutoFollowPattern(leaderPatterns, randomAlphaOfLength(4), randomIntBetween(0, Integer.MAX_VALUE), + randomIntBetween(0, Integer.MAX_VALUE), randomNonNegativeLong(), randomIntBetween(0, Integer.MAX_VALUE), + randomIntBetween(0, Integer.MAX_VALUE), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500)); + configs.put(Integer.toString(i), autoFollowPattern); + followedLeaderIndices.put(Integer.toString(i), Arrays.asList(generateRandomStringArray(4, 4, false))); + } + return new AutoFollowMetadata(configs, followedLeaderIndices); + } + + @Override + protected Writeable.Reader instanceReader() { + return AutoFollowMetadata::new; + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.delete_auto_follow_pattern.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.delete_auto_follow_pattern.json new file mode 100644 index 00000000000..b14effd5f3f --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.delete_auto_follow_pattern.json @@ -0,0 +1,17 @@ +{ + "ccr.delete_auto_follow_pattern": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", + "methods": [ "DELETE" ], + "url": { + "path": "/_ccr/_auto_follow/{leader_cluster_alias}", + "paths": [ "/_ccr/_auto_follow/{leader_cluster_alias}" ], + "parts": { + "leader_cluster_alias": { + "type": "string", + "required": true, + "description": "The name of the leader cluster alias." + } + } + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.put_auto_follow_pattern.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.put_auto_follow_pattern.json new file mode 100644 index 00000000000..28e7299713d --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.put_auto_follow_pattern.json @@ -0,0 +1,21 @@ +{ + "ccr.put_auto_follow_pattern": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", + "methods": [ "PUT" ], + "url": { + "path": "/_ccr/_auto_follow/{leader_cluster_alias}", + "paths": [ "/_ccr/_auto_follow/{leader_cluster_alias}" ], + "parts": { + "leader_cluster_alias": { + "type": "string", + "required": true, + "description": "The name of the leader cluster alias." + } + } + }, + "body": { + "description" : "The specification of the auto follow pattern", + "required" : true + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml new file mode 100644 index 00000000000..f4cf79fb558 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml @@ -0,0 +1,13 @@ +--- +"Test put and delete auto follow pattern": + - do: + ccr.put_auto_follow_pattern: + leader_cluster_alias: _local_ + body: + leader_index_patterns: ['logs-*'] + - is_true: acknowledged + + - do: + ccr.delete_auto_follow_pattern: + leader_cluster_alias: _local_ + - is_true: acknowledged