[CCR] Added auto follow patterns feature (#33118)
Auto Following Patterns is a cross cluster replication feature that keeps track whether in the leader cluster indices are being created with names that match with a specific pattern and if so automatically let the follower cluster follow these newly created indices. This change adds an `AutoFollowCoordinator` component that is only active on the elected master node. Periodically this component checks the the cluster state of remote clusters if there new leader indices that match with configured auto follow patterns that have been defined in `AutoFollowMetadata` custom metadata. This change also adds two new APIs to manage auto follow patterns. A put auto follow pattern api: ``` PUT /_ccr/_autofollow/{{remote_cluster}} { "leader_index_pattern": ["logs-*", ...], "follow_index_pattern": "{{leader_index}}-copy", "max_concurrent_read_batches": 2 ... // other optional parameters } ``` and delete auto follow pattern api: ``` DELETE /_ccr/_autofollow/{{remote_cluster_alias}} ``` The auto follow patterns are directly tied to the remote cluster aliases configured in the follow cluster. Relates to #33007 Co-authored-by: Jason Tedor jason@tedor.me
This commit is contained in:
parent
d71ced1b00
commit
a721d09c81
|
@ -138,6 +138,15 @@ public class Regex {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar to {@link #simpleMatch(String[], String)}, but accepts a list of strings instead of an array of strings for the patterns to
|
||||
* match.
|
||||
*/
|
||||
public static boolean simpleMatch(final List<String> patterns, final String str) {
|
||||
// #simpleMatch(String[], String) is likely to be inlined into this method
|
||||
return patterns != null && simpleMatch(patterns.toArray(Strings.EMPTY_ARRAY), str);
|
||||
}
|
||||
|
||||
public static boolean simpleMatch(String[] patterns, String[] types) {
|
||||
if (patterns != null && types != null) {
|
||||
for (String type : types) {
|
||||
|
|
|
@ -78,6 +78,34 @@ public class FollowIndexIT extends ESRestTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testAutoFollowPatterns() throws Exception {
|
||||
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
|
||||
|
||||
Request request = new Request("PUT", "/_ccr/_auto_follow/leader_cluster");
|
||||
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}");
|
||||
assertOK(client().performRequest(request));
|
||||
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.build();
|
||||
request = new Request("PUT", "/logs-20190101");
|
||||
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
|
||||
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
|
||||
assertOK(leaderClient.performRequest(request));
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
String id = Integer.toString(i);
|
||||
index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true");
|
||||
}
|
||||
}
|
||||
|
||||
assertBusy(() -> {
|
||||
ensureYellow("logs-20190101");
|
||||
verifyDocuments("logs-20190101", 5);
|
||||
});
|
||||
}
|
||||
|
||||
private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
|
||||
XContentBuilder document = jsonBuilder().startObject();
|
||||
for (int i = 0; i < fields.length; i += 2) {
|
||||
|
@ -135,6 +163,15 @@ public class FollowIndexIT extends ESRestTestCase {
|
|||
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
|
||||
}
|
||||
|
||||
private static void ensureYellow(String index) throws IOException {
|
||||
Request request = new Request("GET", "/_cluster/health/" + index);
|
||||
request.addParameter("wait_for_status", "yellow");
|
||||
request.addParameter("wait_for_no_relocating_shards", "true");
|
||||
request.addParameter("timeout", "70s");
|
||||
request.addParameter("level", "shards");
|
||||
client().performRequest(request);
|
||||
}
|
||||
|
||||
private RestClient buildLeaderClient() throws IOException {
|
||||
assert runningAgainstLeaderCluster == false;
|
||||
String leaderUrl = System.getProperty("tests.leader_host");
|
||||
|
|
|
@ -39,21 +39,28 @@ import org.elasticsearch.threadpool.ExecutorBuilder;
|
|||
import org.elasticsearch.threadpool.FixedExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
|
||||
import org.elasticsearch.xpack.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
|
||||
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
|
||||
|
@ -113,7 +120,14 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
final Environment environment,
|
||||
final NodeEnvironment nodeEnvironment,
|
||||
final NamedWriteableRegistry namedWriteableRegistry) {
|
||||
return Collections.singleton(ccrLicenseChecker);
|
||||
if (enabled == false) {
|
||||
return emptyList();
|
||||
}
|
||||
|
||||
return Arrays.asList(
|
||||
ccrLicenseChecker,
|
||||
new AutoFollowCoordinator(settings, client, threadPool, clusterService)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -128,12 +142,18 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
}
|
||||
|
||||
return Arrays.asList(
|
||||
// internal actions
|
||||
new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class),
|
||||
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
|
||||
// stats action
|
||||
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
|
||||
// follow actions
|
||||
new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, CreateAndFollowIndexAction.TransportAction.class),
|
||||
new ActionHandler<>(FollowIndexAction.INSTANCE, FollowIndexAction.TransportAction.class),
|
||||
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
|
||||
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class));
|
||||
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class),
|
||||
// auto-follow actions
|
||||
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
|
||||
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class));
|
||||
}
|
||||
|
||||
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
|
||||
|
@ -141,10 +161,15 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<DiscoveryNodes> nodesInCluster) {
|
||||
return Arrays.asList(
|
||||
// stats API
|
||||
new RestCcrStatsAction(settings, restController),
|
||||
// follow APIs
|
||||
new RestCreateAndFollowIndexAction(settings, restController),
|
||||
new RestFollowIndexAction(settings, restController),
|
||||
new RestUnfollowIndexAction(settings, restController));
|
||||
new RestUnfollowIndexAction(settings, restController),
|
||||
// auto-follow APIs
|
||||
new RestDeleteAutoFollowPatternAction(settings, restController),
|
||||
new RestPutAutoFollowPatternAction(settings, restController));
|
||||
}
|
||||
|
||||
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr;
|
|||
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
@ -32,6 +33,12 @@ public final class CcrSettings {
|
|||
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
|
||||
Setting.boolSetting("index.xpack.ccr.following_index", false, Setting.Property.IndexScope);
|
||||
|
||||
/**
|
||||
* Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow
|
||||
*/
|
||||
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_POLL_INTERVAL =
|
||||
Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope);
|
||||
|
||||
/**
|
||||
* The settings defined by CCR.
|
||||
*
|
||||
|
@ -40,7 +47,8 @@ public final class CcrSettings {
|
|||
static List<Setting<?>> getSettings() {
|
||||
return Arrays.asList(
|
||||
CCR_ENABLED_SETTING,
|
||||
CCR_FOLLOWING_INDEX_SETTING);
|
||||
CCR_FOLLOWING_INDEX_SETTING,
|
||||
CCR_AUTO_FOLLOW_POLL_INTERVAL);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,306 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* A component that runs only on the elected master node and follows leader indices automatically
|
||||
* if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}.
|
||||
*/
|
||||
public class AutoFollowCoordinator implements ClusterStateApplier {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class);
|
||||
|
||||
private final Client client;
|
||||
private final TimeValue pollInterval;
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private volatile boolean localNodeMaster = false;
|
||||
|
||||
public AutoFollowCoordinator(Settings settings,
|
||||
Client client,
|
||||
ThreadPool threadPool,
|
||||
ClusterService clusterService) {
|
||||
this.client = client;
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
|
||||
this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings);
|
||||
clusterService.addStateApplier(this);
|
||||
}
|
||||
|
||||
private void doAutoFollow() {
|
||||
if (localNodeMaster == false) {
|
||||
return;
|
||||
}
|
||||
ClusterState followerClusterState = clusterService.state();
|
||||
AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
|
||||
if (autoFollowMetadata == null) {
|
||||
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
|
||||
return;
|
||||
}
|
||||
|
||||
if (autoFollowMetadata.getPatterns().isEmpty()) {
|
||||
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
|
||||
return;
|
||||
}
|
||||
|
||||
Consumer<Exception> handler = e -> {
|
||||
if (e != null) {
|
||||
LOGGER.warn("Failure occurred during auto following indices", e);
|
||||
}
|
||||
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
|
||||
};
|
||||
AutoFollower operation = new AutoFollower(client, handler, followerClusterState) {
|
||||
|
||||
@Override
|
||||
void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) {
|
||||
ClusterStateRequest request = new ClusterStateRequest();
|
||||
request.clear();
|
||||
request.metaData(true);
|
||||
leaderClient.admin().cluster().state(request,
|
||||
ActionListener.wrap(
|
||||
r -> handler.accept(r.getState(), null),
|
||||
e -> handler.accept(null, e)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
void createAndFollow(FollowIndexAction.Request followRequest,
|
||||
Runnable successHandler,
|
||||
Consumer<Exception> failureHandler) {
|
||||
client.execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest),
|
||||
ActionListener.wrap(r -> successHandler.run(), failureHandler));
|
||||
}
|
||||
|
||||
@Override
|
||||
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
|
||||
clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() {
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return updateFunction.apply(currentState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
handler.accept(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
handler.accept(null);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
};
|
||||
operation.autoFollowIndices();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyClusterState(ClusterChangedEvent event) {
|
||||
final boolean beforeLocalMasterNode = localNodeMaster;
|
||||
localNodeMaster = event.localNodeMaster();
|
||||
if (beforeLocalMasterNode == false && localNodeMaster) {
|
||||
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
|
||||
}
|
||||
}
|
||||
|
||||
abstract static class AutoFollower {
|
||||
|
||||
private final Client client;
|
||||
private final Consumer<Exception> handler;
|
||||
private final ClusterState followerClusterState;
|
||||
private final AutoFollowMetadata autoFollowMetadata;
|
||||
|
||||
private final CountDown autoFollowPatternsCountDown;
|
||||
private final AtomicReference<Exception> autoFollowPatternsErrorHolder = new AtomicReference<>();
|
||||
|
||||
AutoFollower(Client client, Consumer<Exception> handler, ClusterState followerClusterState) {
|
||||
this.client = client;
|
||||
this.handler = handler;
|
||||
this.followerClusterState = followerClusterState;
|
||||
this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
|
||||
this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size());
|
||||
}
|
||||
|
||||
void autoFollowIndices() {
|
||||
for (Map.Entry<String, AutoFollowPattern> entry : autoFollowMetadata.getPatterns().entrySet()) {
|
||||
String clusterAlias = entry.getKey();
|
||||
AutoFollowPattern autoFollowPattern = entry.getValue();
|
||||
Client leaderClient = clusterAlias.equals("_local_") ? client : client.getRemoteClusterClient(clusterAlias);
|
||||
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
|
||||
|
||||
getLeaderClusterState(leaderClient, (leaderClusterState, e) -> {
|
||||
if (leaderClusterState != null) {
|
||||
assert e == null;
|
||||
handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState);
|
||||
} else {
|
||||
finalise(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollowPattern,
|
||||
List<String> followedIndexUUIDs, ClusterState leaderClusterState) {
|
||||
final List<Index> leaderIndicesToFollow =
|
||||
getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs);
|
||||
if (leaderIndicesToFollow.isEmpty()) {
|
||||
finalise(null);
|
||||
} else {
|
||||
final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
|
||||
final AtomicReference<Exception> leaderIndicesErrorHolder = new AtomicReference<>();
|
||||
for (Index indexToFollow : leaderIndicesToFollow) {
|
||||
final String leaderIndexName = indexToFollow.getName();
|
||||
final String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndexName);
|
||||
|
||||
String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
|
||||
clusterAlias + ":" + leaderIndexName;
|
||||
FollowIndexAction.Request followRequest =
|
||||
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
|
||||
autoFollowPattern.getMaxBatchOperationCount(), autoFollowPattern.getMaxConcurrentReadBatches(),
|
||||
autoFollowPattern.getMaxOperationSizeInBytes(), autoFollowPattern.getMaxConcurrentWriteBatches(),
|
||||
autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getRetryTimeout(),
|
||||
autoFollowPattern.getIdleShardRetryDelay());
|
||||
|
||||
// Execute if the create and follow api call succeeds:
|
||||
Runnable successHandler = () -> {
|
||||
LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName);
|
||||
|
||||
// This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
|
||||
// (so that we do not try to follow it in subsequent auto follow runs)
|
||||
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow);
|
||||
// The coordinator always runs on the elected master node, so we can update cluster state here:
|
||||
updateAutoFollowMetadata(function, updateError -> {
|
||||
if (updateError != null) {
|
||||
LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError);
|
||||
if (leaderIndicesErrorHolder.compareAndSet(null, updateError) == false) {
|
||||
leaderIndicesErrorHolder.get().addSuppressed(updateError);
|
||||
}
|
||||
} else {
|
||||
LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName);
|
||||
}
|
||||
if (leaderIndicesCountDown.countDown()) {
|
||||
finalise(leaderIndicesErrorHolder.get());
|
||||
}
|
||||
});
|
||||
};
|
||||
// Execute if the create and follow apu call fails:
|
||||
Consumer<Exception> failureHandler = followError -> {
|
||||
assert followError != null;
|
||||
LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError);
|
||||
if (leaderIndicesCountDown.countDown()) {
|
||||
finalise(followError);
|
||||
}
|
||||
};
|
||||
createAndFollow(followRequest, successHandler, failureHandler);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void finalise(Exception failure) {
|
||||
if (autoFollowPatternsErrorHolder.compareAndSet(null, failure) == false) {
|
||||
autoFollowPatternsErrorHolder.get().addSuppressed(failure);
|
||||
}
|
||||
|
||||
if (autoFollowPatternsCountDown.countDown()) {
|
||||
handler.accept(autoFollowPatternsErrorHolder.get());
|
||||
}
|
||||
}
|
||||
|
||||
static List<Index> getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern,
|
||||
ClusterState leaderClusterState,
|
||||
ClusterState followerClusterState,
|
||||
List<String> followedIndexUUIDs) {
|
||||
List<Index> leaderIndicesToFollow = new ArrayList<>();
|
||||
for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) {
|
||||
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
|
||||
if (followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) {
|
||||
// TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData
|
||||
// has a leader index uuid custom metadata entry that matches with uuid of leaderIndexMetaData variable
|
||||
// If so then handle it differently: not follow it, but just add an entry to
|
||||
// AutoFollowMetadata#followedLeaderIndexUUIDs
|
||||
leaderIndicesToFollow.add(leaderIndexMetaData.getIndex());
|
||||
}
|
||||
}
|
||||
}
|
||||
return leaderIndicesToFollow;
|
||||
}
|
||||
|
||||
static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String leaderIndexName) {
|
||||
if (autoFollowPattern.getFollowIndexPattern() != null) {
|
||||
return autoFollowPattern.getFollowIndexPattern().replace("{{leader_index}}", leaderIndexName);
|
||||
} else {
|
||||
return leaderIndexName;
|
||||
}
|
||||
}
|
||||
|
||||
static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(String clusterAlias, Index indexToFollow) {
|
||||
return currentState -> {
|
||||
AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
|
||||
|
||||
Map<String, List<String>> newFollowedIndexUUIDS =
|
||||
new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
|
||||
newFollowedIndexUUIDS.get(clusterAlias).add(indexToFollow.getUUID());
|
||||
|
||||
ClusterState.Builder newState = ClusterState.builder(currentState);
|
||||
AutoFollowMetadata newAutoFollowMetadata =
|
||||
new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(), newFollowedIndexUUIDS);
|
||||
newState.metaData(MetaData.builder(currentState.getMetaData())
|
||||
.putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata)
|
||||
.build());
|
||||
return newState.build();
|
||||
};
|
||||
}
|
||||
|
||||
// abstract methods to make unit testing possible:
|
||||
|
||||
abstract void getLeaderClusterState(Client leaderClient,
|
||||
BiConsumer<ClusterState,
|
||||
Exception> handler);
|
||||
|
||||
abstract void createAndFollow(FollowIndexAction.Request followRequest,
|
||||
Runnable successHandler,
|
||||
Consumer<Exception> failureHandler);
|
||||
|
||||
abstract void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
|
||||
Consumer<Exception> handler);
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
public class DeleteAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||
|
||||
public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/delete";
|
||||
public static final DeleteAutoFollowPatternAction INSTANCE = new DeleteAutoFollowPatternAction();
|
||||
|
||||
private DeleteAutoFollowPatternAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AcknowledgedResponse newResponse() {
|
||||
return new AcknowledgedResponse();
|
||||
}
|
||||
|
||||
public static class Request extends AcknowledgedRequest<Request> {
|
||||
|
||||
private String leaderClusterAlias;
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if (leaderClusterAlias == null) {
|
||||
validationException = addValidationError("leaderClusterAlias is missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
public String getLeaderClusterAlias() {
|
||||
return leaderClusterAlias;
|
||||
}
|
||||
|
||||
public void setLeaderClusterAlias(String leaderClusterAlias) {
|
||||
this.leaderClusterAlias = leaderClusterAlias;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
leaderClusterAlias = in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(leaderClusterAlias);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Request request = (Request) o;
|
||||
return Objects.equals(leaderClusterAlias, request.leaderClusterAlias);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(leaderClusterAlias);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,284 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||
|
||||
public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/put";
|
||||
public static final PutAutoFollowPatternAction INSTANCE = new PutAutoFollowPatternAction();
|
||||
|
||||
private PutAutoFollowPatternAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AcknowledgedResponse newResponse() {
|
||||
return new AcknowledgedResponse();
|
||||
}
|
||||
|
||||
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
|
||||
|
||||
static final ParseField LEADER_CLUSTER_ALIAS_FIELD = new ParseField("leader_cluster_alias");
|
||||
static final ParseField LEADER_INDEX_PATTERNS_FIELD = new ParseField("leader_index_patterns");
|
||||
static final ParseField FOLLOW_INDEX_NAME_PATTERN_FIELD = new ParseField("follow_index_name_pattern");
|
||||
|
||||
private static final ObjectParser<Request, String> PARSER = new ObjectParser<>("put_auto_follow_pattern_request", Request::new);
|
||||
|
||||
static {
|
||||
PARSER.declareString(Request::setLeaderClusterAlias, LEADER_CLUSTER_ALIAS_FIELD);
|
||||
PARSER.declareStringArray(Request::setLeaderIndexPatterns, LEADER_INDEX_PATTERNS_FIELD);
|
||||
PARSER.declareString(Request::setFollowIndexNamePattern, FOLLOW_INDEX_NAME_PATTERN_FIELD);
|
||||
PARSER.declareInt(Request::setMaxBatchOperationCount, AutoFollowPattern.MAX_BATCH_OPERATION_COUNT);
|
||||
PARSER.declareInt(Request::setMaxConcurrentReadBatches, AutoFollowPattern.MAX_CONCURRENT_READ_BATCHES);
|
||||
PARSER.declareLong(Request::setMaxOperationSizeInBytes, AutoFollowPattern.MAX_BATCH_SIZE_IN_BYTES);
|
||||
PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES);
|
||||
PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE);
|
||||
PARSER.declareField(Request::setRetryTimeout,
|
||||
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.RETRY_TIMEOUT.getPreferredName()),
|
||||
ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
|
||||
PARSER.declareField(Request::setIdleShardRetryDelay,
|
||||
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.IDLE_SHARD_RETRY_DELAY.getPreferredName()),
|
||||
ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
|
||||
}
|
||||
|
||||
public static Request fromXContent(XContentParser parser, String remoteClusterAlias) throws IOException {
|
||||
Request request = PARSER.parse(parser, null);
|
||||
if (remoteClusterAlias != null) {
|
||||
if (request.leaderClusterAlias == null) {
|
||||
request.leaderClusterAlias = remoteClusterAlias;
|
||||
} else {
|
||||
if (request.leaderClusterAlias.equals(remoteClusterAlias) == false) {
|
||||
throw new IllegalArgumentException("provided leaderClusterAlias is not equal");
|
||||
}
|
||||
}
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
private String leaderClusterAlias;
|
||||
private List<String> leaderIndexPatterns;
|
||||
private String followIndexNamePattern;
|
||||
|
||||
private Integer maxBatchOperationCount;
|
||||
private Integer maxConcurrentReadBatches;
|
||||
private Long maxOperationSizeInBytes;
|
||||
private Integer maxConcurrentWriteBatches;
|
||||
private Integer maxWriteBufferSize;
|
||||
private TimeValue retryTimeout;
|
||||
private TimeValue idleShardRetryDelay;
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if (leaderClusterAlias == null) {
|
||||
validationException = addValidationError("leaderClusterAlias is missing", validationException);
|
||||
}
|
||||
if (leaderIndexPatterns == null || leaderIndexPatterns.isEmpty()) {
|
||||
validationException = addValidationError("leaderIndexPatterns is missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
public String getLeaderClusterAlias() {
|
||||
return leaderClusterAlias;
|
||||
}
|
||||
|
||||
public void setLeaderClusterAlias(String leaderClusterAlias) {
|
||||
this.leaderClusterAlias = leaderClusterAlias;
|
||||
}
|
||||
|
||||
public List<String> getLeaderIndexPatterns() {
|
||||
return leaderIndexPatterns;
|
||||
}
|
||||
|
||||
public void setLeaderIndexPatterns(List<String> leaderIndexPatterns) {
|
||||
this.leaderIndexPatterns = leaderIndexPatterns;
|
||||
}
|
||||
|
||||
public String getFollowIndexNamePattern() {
|
||||
return followIndexNamePattern;
|
||||
}
|
||||
|
||||
public void setFollowIndexNamePattern(String followIndexNamePattern) {
|
||||
this.followIndexNamePattern = followIndexNamePattern;
|
||||
}
|
||||
|
||||
public Integer getMaxBatchOperationCount() {
|
||||
return maxBatchOperationCount;
|
||||
}
|
||||
|
||||
public void setMaxBatchOperationCount(Integer maxBatchOperationCount) {
|
||||
this.maxBatchOperationCount = maxBatchOperationCount;
|
||||
}
|
||||
|
||||
public Integer getMaxConcurrentReadBatches() {
|
||||
return maxConcurrentReadBatches;
|
||||
}
|
||||
|
||||
public void setMaxConcurrentReadBatches(Integer maxConcurrentReadBatches) {
|
||||
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
|
||||
}
|
||||
|
||||
public Long getMaxOperationSizeInBytes() {
|
||||
return maxOperationSizeInBytes;
|
||||
}
|
||||
|
||||
public void setMaxOperationSizeInBytes(Long maxOperationSizeInBytes) {
|
||||
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
|
||||
}
|
||||
|
||||
public Integer getMaxConcurrentWriteBatches() {
|
||||
return maxConcurrentWriteBatches;
|
||||
}
|
||||
|
||||
public void setMaxConcurrentWriteBatches(Integer maxConcurrentWriteBatches) {
|
||||
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
|
||||
}
|
||||
|
||||
public Integer getMaxWriteBufferSize() {
|
||||
return maxWriteBufferSize;
|
||||
}
|
||||
|
||||
public void setMaxWriteBufferSize(Integer maxWriteBufferSize) {
|
||||
this.maxWriteBufferSize = maxWriteBufferSize;
|
||||
}
|
||||
|
||||
public TimeValue getRetryTimeout() {
|
||||
return retryTimeout;
|
||||
}
|
||||
|
||||
public void setRetryTimeout(TimeValue retryTimeout) {
|
||||
this.retryTimeout = retryTimeout;
|
||||
}
|
||||
|
||||
public TimeValue getIdleShardRetryDelay() {
|
||||
return idleShardRetryDelay;
|
||||
}
|
||||
|
||||
public void setIdleShardRetryDelay(TimeValue idleShardRetryDelay) {
|
||||
this.idleShardRetryDelay = idleShardRetryDelay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
leaderClusterAlias = in.readString();
|
||||
leaderIndexPatterns = in.readList(StreamInput::readString);
|
||||
followIndexNamePattern = in.readOptionalString();
|
||||
maxBatchOperationCount = in.readOptionalVInt();
|
||||
maxConcurrentReadBatches = in.readOptionalVInt();
|
||||
maxOperationSizeInBytes = in.readOptionalLong();
|
||||
maxConcurrentWriteBatches = in.readOptionalVInt();
|
||||
maxWriteBufferSize = in.readOptionalVInt();
|
||||
retryTimeout = in.readOptionalTimeValue();
|
||||
idleShardRetryDelay = in.readOptionalTimeValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(leaderClusterAlias);
|
||||
out.writeStringList(leaderIndexPatterns);
|
||||
out.writeOptionalString(followIndexNamePattern);
|
||||
out.writeOptionalVInt(maxBatchOperationCount);
|
||||
out.writeOptionalVInt(maxConcurrentReadBatches);
|
||||
out.writeOptionalLong(maxOperationSizeInBytes);
|
||||
out.writeOptionalVInt(maxConcurrentWriteBatches);
|
||||
out.writeOptionalVInt(maxWriteBufferSize);
|
||||
out.writeOptionalTimeValue(retryTimeout);
|
||||
out.writeOptionalTimeValue(idleShardRetryDelay);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias);
|
||||
builder.field(LEADER_INDEX_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns);
|
||||
if (followIndexNamePattern != null) {
|
||||
builder.field(FOLLOW_INDEX_NAME_PATTERN_FIELD.getPreferredName(), followIndexNamePattern);
|
||||
}
|
||||
if (maxBatchOperationCount != null) {
|
||||
builder.field(ShardFollowTask.MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount);
|
||||
}
|
||||
if (maxOperationSizeInBytes != null) {
|
||||
builder.field(ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes);
|
||||
}
|
||||
if (maxWriteBufferSize != null) {
|
||||
builder.field(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
|
||||
}
|
||||
if (maxConcurrentReadBatches != null) {
|
||||
builder.field(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
|
||||
}
|
||||
if (maxConcurrentWriteBatches != null) {
|
||||
builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
|
||||
}
|
||||
if (retryTimeout != null) {
|
||||
builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
|
||||
}
|
||||
if (idleShardRetryDelay != null) {
|
||||
builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
|
||||
}
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Request request = (Request) o;
|
||||
return Objects.equals(leaderClusterAlias, request.leaderClusterAlias) &&
|
||||
Objects.equals(leaderIndexPatterns, request.leaderIndexPatterns) &&
|
||||
Objects.equals(followIndexNamePattern, request.followIndexNamePattern) &&
|
||||
Objects.equals(maxBatchOperationCount, request.maxBatchOperationCount) &&
|
||||
Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) &&
|
||||
Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) &&
|
||||
Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) &&
|
||||
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
|
||||
Objects.equals(retryTimeout, request.retryTimeout) &&
|
||||
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(
|
||||
leaderClusterAlias,
|
||||
leaderIndexPatterns,
|
||||
followIndexNamePattern,
|
||||
maxBatchOperationCount,
|
||||
maxConcurrentReadBatches,
|
||||
maxOperationSizeInBytes,
|
||||
maxConcurrentWriteBatches,
|
||||
maxWriteBufferSize,
|
||||
retryTimeout,
|
||||
idleShardRetryDelay
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class TransportDeleteAutoFollowPatternAction extends
|
||||
TransportMasterNodeAction<DeleteAutoFollowPatternAction.Request, AcknowledgedResponse> {
|
||||
|
||||
@Inject
|
||||
public TransportDeleteAutoFollowPatternAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, DeleteAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
indexNameExpressionResolver, DeleteAutoFollowPatternAction.Request::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse() {
|
||||
return new AcknowledgedResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(DeleteAutoFollowPatternAction.Request request,
|
||||
ClusterState state,
|
||||
ActionListener<AcknowledgedResponse> listener) throws Exception {
|
||||
clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(),
|
||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse(boolean acknowledged) {
|
||||
return new AcknowledgedResponse(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return innerDelete(request, currentState);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static ClusterState innerDelete(DeleteAutoFollowPatternAction.Request request, ClusterState currentState) {
|
||||
AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
|
||||
if (currentAutoFollowMetadata == null) {
|
||||
throw new ResourceNotFoundException("no auto-follow patterns for cluster alias [{}] found",
|
||||
request.getLeaderClusterAlias());
|
||||
}
|
||||
Map<String, AutoFollowPattern> patterns = currentAutoFollowMetadata.getPatterns();
|
||||
AutoFollowPattern autoFollowPatternToRemove = patterns.get(request.getLeaderClusterAlias());
|
||||
if (autoFollowPatternToRemove == null) {
|
||||
throw new ResourceNotFoundException("no auto-follow patterns for cluster alias [{}] found",
|
||||
request.getLeaderClusterAlias());
|
||||
}
|
||||
|
||||
final Map<String, AutoFollowPattern> patternsCopy = new HashMap<>(patterns);
|
||||
final Map<String, List<String>> followedLeaderIndexUUIDSCopy =
|
||||
new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
|
||||
patternsCopy.remove(request.getLeaderClusterAlias());
|
||||
followedLeaderIndexUUIDSCopy.remove(request.getLeaderClusterAlias());
|
||||
|
||||
AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(patternsCopy, followedLeaderIndexUUIDSCopy);
|
||||
ClusterState.Builder newState = ClusterState.builder(currentState);
|
||||
newState.metaData(MetaData.builder(currentState.getMetaData())
|
||||
.putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata)
|
||||
.build());
|
||||
return newState.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(DeleteAutoFollowPatternAction.Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,173 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class TransportPutAutoFollowPatternAction extends
|
||||
TransportMasterNodeAction<PutAutoFollowPatternAction.Request, AcknowledgedResponse> {
|
||||
|
||||
private final Client client;
|
||||
|
||||
@Inject
|
||||
public TransportPutAutoFollowPatternAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters, Client client,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, PutAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
indexNameExpressionResolver, PutAutoFollowPatternAction.Request::new);
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse() {
|
||||
return new AcknowledgedResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(PutAutoFollowPatternAction.Request request,
|
||||
ClusterState state,
|
||||
ActionListener<AcknowledgedResponse> listener) throws Exception {
|
||||
final Client leaderClient;
|
||||
if (request.getLeaderClusterAlias().equals("_local_")) {
|
||||
leaderClient = client;
|
||||
} else {
|
||||
leaderClient = client.getRemoteClusterClient(request.getLeaderClusterAlias());
|
||||
}
|
||||
|
||||
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
||||
clusterStateRequest.clear();
|
||||
clusterStateRequest.metaData(true);
|
||||
|
||||
leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
|
||||
final ClusterState leaderClusterState = clusterStateResponse.getState();
|
||||
clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(),
|
||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse(boolean acknowledged) {
|
||||
return new AcknowledgedResponse(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return innerPut(request, currentState, leaderClusterState);
|
||||
}
|
||||
});
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
|
||||
ClusterState localState,
|
||||
ClusterState leaderClusterState) {
|
||||
// auto patterns are always overwritten
|
||||
// only already followed index uuids are updated
|
||||
|
||||
AutoFollowMetadata currentAutoFollowMetadata = localState.metaData().custom(AutoFollowMetadata.TYPE);
|
||||
Map<String, List<String>> followedLeaderIndices;
|
||||
Map<String, AutoFollowPattern> patterns;
|
||||
if (currentAutoFollowMetadata != null) {
|
||||
patterns = new HashMap<>(currentAutoFollowMetadata.getPatterns());
|
||||
followedLeaderIndices = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
|
||||
} else {
|
||||
patterns = new HashMap<>();
|
||||
followedLeaderIndices = new HashMap<>();
|
||||
}
|
||||
|
||||
AutoFollowPattern previousPattern = patterns.get(request.getLeaderClusterAlias());
|
||||
List<String> followedIndexUUIDs = followedLeaderIndices.get(request.getLeaderClusterAlias());
|
||||
if (followedIndexUUIDs == null) {
|
||||
followedIndexUUIDs = new ArrayList<>();
|
||||
followedLeaderIndices.put(request.getLeaderClusterAlias(), followedIndexUUIDs);
|
||||
}
|
||||
|
||||
// Mark existing leader indices as already auto followed:
|
||||
if (previousPattern != null) {
|
||||
markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), leaderClusterState.metaData(),
|
||||
previousPattern, followedIndexUUIDs);
|
||||
} else {
|
||||
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), leaderClusterState.metaData(),
|
||||
followedIndexUUIDs);
|
||||
}
|
||||
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
|
||||
request.getLeaderIndexPatterns(),
|
||||
request.getFollowIndexNamePattern(),
|
||||
request.getMaxBatchOperationCount(),
|
||||
request.getMaxConcurrentReadBatches(),
|
||||
request.getMaxOperationSizeInBytes(),
|
||||
request.getMaxConcurrentWriteBatches(),
|
||||
request.getMaxWriteBufferSize(),
|
||||
request.getRetryTimeout(),
|
||||
request.getIdleShardRetryDelay()
|
||||
);
|
||||
patterns.put(request.getLeaderClusterAlias(), autoFollowPattern);
|
||||
ClusterState.Builder newState = ClusterState.builder(localState);
|
||||
newState.metaData(MetaData.builder(localState.getMetaData())
|
||||
.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, followedLeaderIndices))
|
||||
.build());
|
||||
return newState.build();
|
||||
}
|
||||
|
||||
private static void markExistingIndicesAsAutoFollowedForNewPatterns(
|
||||
List<String> leaderIndexPatterns,
|
||||
MetaData leaderMetaData,
|
||||
AutoFollowPattern previousPattern,
|
||||
List<String> followedIndexUUIDS) {
|
||||
|
||||
final List<String> newPatterns = leaderIndexPatterns
|
||||
.stream()
|
||||
.filter(p -> previousPattern.getLeaderIndexPatterns().contains(p) == false)
|
||||
.collect(Collectors.toList());
|
||||
markExistingIndicesAsAutoFollowed(newPatterns, leaderMetaData, followedIndexUUIDS);
|
||||
}
|
||||
|
||||
private static void markExistingIndicesAsAutoFollowed(
|
||||
List<String> patterns,
|
||||
MetaData leaderMetaData,
|
||||
List<String> followedIndexUUIDS) {
|
||||
|
||||
for (final IndexMetaData indexMetaData : leaderMetaData) {
|
||||
if (AutoFollowPattern.match(patterns, indexMetaData.getIndex().getName())) {
|
||||
followedIndexUUIDS.add(indexMetaData.getIndexUUID());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(PutAutoFollowPatternAction.Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.rest;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.Request;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.INSTANCE;
|
||||
|
||||
public class RestDeleteAutoFollowPatternAction extends BaseRestHandler {
|
||||
|
||||
public RestDeleteAutoFollowPatternAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.DELETE, "/_ccr/_auto_follow/{leader_cluster_alias}", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "ccr_delete_auto_follow_pattern_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
|
||||
Request request = new Request();
|
||||
request.setLeaderClusterAlias(restRequest.param("leader_cluster_alias"));
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.rest;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction.Request;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction.INSTANCE;
|
||||
|
||||
public class RestPutAutoFollowPatternAction extends BaseRestHandler {
|
||||
|
||||
public RestPutAutoFollowPatternAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.PUT, "/_ccr/_auto_follow/{leader_cluster_alias}", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "ccr_put_auto_follow_pattern_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
|
||||
Request request = createRequest(restRequest);
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
|
||||
static Request createRequest(RestRequest restRequest) throws IOException {
|
||||
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
|
||||
return Request.fromXContent(parser, restRequest.param("leader_cluster_alias"));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,296 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||
|
||||
public void testAutoFollower() {
|
||||
Client client = mock(Client.class);
|
||||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||
|
||||
ClusterState leaderState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)))
|
||||
.build();
|
||||
|
||||
AutoFollowPattern autoFollowPattern =
|
||||
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
|
||||
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS);
|
||||
|
||||
ClusterState currentState = ClusterState.builder(new ClusterName("name"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
||||
.build();
|
||||
|
||||
boolean[] invoked = new boolean[]{false};
|
||||
Consumer<Exception> handler = e -> {
|
||||
invoked[0] = true;
|
||||
assertThat(e, nullValue());
|
||||
};
|
||||
AutoFollower autoFollower = new AutoFollower(client, handler, currentState) {
|
||||
@Override
|
||||
void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) {
|
||||
handler.accept(leaderState, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler) {
|
||||
assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101"));
|
||||
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
|
||||
successHandler.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
|
||||
ClusterState resultCs = updateFunction.apply(currentState);
|
||||
AutoFollowMetadata result = resultCs.metaData().custom(AutoFollowMetadata.TYPE);
|
||||
assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
||||
assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1));
|
||||
handler.accept(null);
|
||||
}
|
||||
};
|
||||
autoFollower.autoFollowIndices();
|
||||
assertThat(invoked[0], is(true));
|
||||
}
|
||||
|
||||
public void testAutoFollowerClusterStateApiFailure() {
|
||||
Client client = mock(Client.class);
|
||||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||
|
||||
AutoFollowPattern autoFollowPattern =
|
||||
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
|
||||
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS);
|
||||
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
||||
.build();
|
||||
|
||||
Exception failure = new RuntimeException("failure");
|
||||
boolean[] invoked = new boolean[]{false};
|
||||
Consumer<Exception> handler = e -> {
|
||||
invoked[0] = true;
|
||||
assertThat(e, sameInstance(failure));
|
||||
};
|
||||
AutoFollower autoFollower = new AutoFollower(client, handler, followerState) {
|
||||
@Override
|
||||
void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) {
|
||||
handler.accept(null, failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler) {
|
||||
fail("should not get here");
|
||||
}
|
||||
|
||||
@Override
|
||||
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
|
||||
fail("should not get here");
|
||||
}
|
||||
};
|
||||
autoFollower.autoFollowIndices();
|
||||
assertThat(invoked[0], is(true));
|
||||
}
|
||||
|
||||
public void testAutoFollowerUpdateClusterStateFailure() {
|
||||
Client client = mock(Client.class);
|
||||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||
|
||||
ClusterState leaderState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)))
|
||||
.build();
|
||||
|
||||
AutoFollowPattern autoFollowPattern =
|
||||
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
|
||||
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS);
|
||||
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
||||
.build();
|
||||
|
||||
Exception failure = new RuntimeException("failure");
|
||||
boolean[] invoked = new boolean[]{false};
|
||||
Consumer<Exception> handler = e -> {
|
||||
invoked[0] = true;
|
||||
assertThat(e, sameInstance(failure));
|
||||
};
|
||||
AutoFollower autoFollower = new AutoFollower(client, handler, followerState) {
|
||||
@Override
|
||||
void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) {
|
||||
handler.accept(leaderState, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler) {
|
||||
assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101"));
|
||||
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
|
||||
successHandler.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
|
||||
handler.accept(failure);
|
||||
}
|
||||
};
|
||||
autoFollower.autoFollowIndices();
|
||||
assertThat(invoked[0], is(true));
|
||||
}
|
||||
|
||||
public void testAutoFollowerCreateAndFollowApiCallFailure() {
|
||||
Client client = mock(Client.class);
|
||||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||
|
||||
ClusterState leaderState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)))
|
||||
.build();
|
||||
|
||||
AutoFollowPattern autoFollowPattern =
|
||||
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
|
||||
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS);
|
||||
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
||||
.build();
|
||||
|
||||
Exception failure = new RuntimeException("failure");
|
||||
boolean[] invoked = new boolean[]{false};
|
||||
Consumer<Exception> handler = e -> {
|
||||
invoked[0] = true;
|
||||
assertThat(e, sameInstance(failure));
|
||||
};
|
||||
AutoFollower autoFollower = new AutoFollower(client, handler, followerState) {
|
||||
@Override
|
||||
void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) {
|
||||
handler.accept(leaderState, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler) {
|
||||
assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101"));
|
||||
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
|
||||
failureHandler.accept(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
|
||||
fail("should not get here");
|
||||
}
|
||||
};
|
||||
autoFollower.autoFollowIndices();
|
||||
assertThat(invoked[0], is(true));
|
||||
}
|
||||
|
||||
public void testGetLeaderIndicesToFollow() {
|
||||
AutoFollowPattern autoFollowPattern =
|
||||
new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null);
|
||||
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap())))
|
||||
.build();
|
||||
|
||||
MetaData.Builder imdBuilder = MetaData.builder();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
Settings.Builder builder = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(IndexMetaData.SETTING_INDEX_UUID, "metrics-" + i);
|
||||
imdBuilder.put(IndexMetaData.builder("metrics-" + i)
|
||||
.settings(builder)
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0));
|
||||
}
|
||||
imdBuilder.put(IndexMetaData.builder("logs-0")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0));
|
||||
|
||||
ClusterState leaderState = ClusterState.builder(new ClusterName("remote"))
|
||||
.metaData(imdBuilder)
|
||||
.build();
|
||||
|
||||
List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList());
|
||||
result.sort(Comparator.comparing(Index::getName));
|
||||
assertThat(result.size(), equalTo(5));
|
||||
assertThat(result.get(0).getName(), equalTo("metrics-0"));
|
||||
assertThat(result.get(1).getName(), equalTo("metrics-1"));
|
||||
assertThat(result.get(2).getName(), equalTo("metrics-2"));
|
||||
assertThat(result.get(3).getName(), equalTo("metrics-3"));
|
||||
assertThat(result.get(4).getName(), equalTo("metrics-4"));
|
||||
|
||||
List<String> followedIndexUUIDs = Collections.singletonList(leaderState.metaData().index("metrics-2").getIndexUUID());
|
||||
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, followedIndexUUIDs);
|
||||
result.sort(Comparator.comparing(Index::getName));
|
||||
assertThat(result.size(), equalTo(4));
|
||||
assertThat(result.get(0).getName(), equalTo("metrics-0"));
|
||||
assertThat(result.get(1).getName(), equalTo("metrics-1"));
|
||||
assertThat(result.get(2).getName(), equalTo("metrics-3"));
|
||||
assertThat(result.get(3).getName(), equalTo("metrics-4"));
|
||||
}
|
||||
|
||||
public void testGetFollowerIndexName() {
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null,
|
||||
null, null, null, null, null, null);
|
||||
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0"));
|
||||
|
||||
autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-metrics-0", null, null,
|
||||
null, null, null, null, null);
|
||||
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
|
||||
|
||||
autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null,
|
||||
null, null, null, null, null, null);
|
||||
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,189 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.xpack.ccr.LocalStateCcr;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class AutoFollowTests extends ESSingleNodeTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return Collections.singleton(LocalStateCcr.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean resetNodeAfterTest() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public void testAutoFollow() throws Exception {
|
||||
Settings leaderIndexSettings = Settings.builder()
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
|
||||
.build();
|
||||
|
||||
createIndex("logs-201812", leaderIndexSettings, "_doc");
|
||||
|
||||
// Enabling auto following:
|
||||
putAutoFollowPatterns("logs-*", "transactions-*");
|
||||
|
||||
createIndex("metrics-201901", leaderIndexSettings, "_doc");
|
||||
|
||||
createIndex("logs-201901", leaderIndexSettings, "_doc");
|
||||
assertBusy(() -> {
|
||||
IndicesExistsRequest request = new IndicesExistsRequest("copy-logs-201901");
|
||||
assertTrue(client().admin().indices().exists(request).actionGet().isExists());
|
||||
});
|
||||
createIndex("transactions-201901", leaderIndexSettings, "_doc");
|
||||
assertBusy(() -> {
|
||||
IndicesExistsRequest request = new IndicesExistsRequest("copy-transactions-201901");
|
||||
assertTrue(client().admin().indices().exists(request).actionGet().isExists());
|
||||
});
|
||||
|
||||
IndicesExistsRequest request = new IndicesExistsRequest("copy-metrics-201901");
|
||||
assertFalse(client().admin().indices().exists(request).actionGet().isExists());
|
||||
request = new IndicesExistsRequest("copy-logs-201812");
|
||||
assertFalse(client().admin().indices().exists(request).actionGet().isExists());
|
||||
}
|
||||
|
||||
public void testAutoFollowManyIndices() throws Exception {
|
||||
Settings leaderIndexSettings = Settings.builder()
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
|
||||
.build();
|
||||
|
||||
putAutoFollowPatterns("logs-*");
|
||||
int numIndices = randomIntBetween(4, 32);
|
||||
for (int i = 0; i < numIndices; i++) {
|
||||
createIndex("logs-" + i, leaderIndexSettings, "_doc");
|
||||
}
|
||||
int expectedVal1 = numIndices;
|
||||
assertBusy(() -> {
|
||||
MetaData metaData = client().admin().cluster().prepareState().get().getState().metaData();
|
||||
int count = (int) Arrays.stream(metaData.getConcreteAllIndices()).filter(s -> s.startsWith("copy-")).count();
|
||||
assertThat(count, equalTo(expectedVal1));
|
||||
});
|
||||
|
||||
deleteAutoFollowPatternSetting();
|
||||
createIndex("logs-does-not-count", leaderIndexSettings, "_doc");
|
||||
|
||||
putAutoFollowPatterns("logs-*");
|
||||
int i = numIndices;
|
||||
numIndices = numIndices + randomIntBetween(4, 32);
|
||||
for (; i < numIndices; i++) {
|
||||
createIndex("logs-" + i, leaderIndexSettings, "_doc");
|
||||
}
|
||||
int expectedVal2 = numIndices;
|
||||
assertBusy(() -> {
|
||||
MetaData metaData = client().admin().cluster().prepareState().get().getState().metaData();
|
||||
int count = (int) Arrays.stream(metaData.getConcreteAllIndices()).filter(s -> s.startsWith("copy-")).count();
|
||||
assertThat(count, equalTo(expectedVal2));
|
||||
});
|
||||
}
|
||||
|
||||
public void testAutoFollowParameterAreDelegated() throws Exception {
|
||||
Settings leaderIndexSettings = Settings.builder()
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
|
||||
.build();
|
||||
|
||||
// Enabling auto following:
|
||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||
request.setLeaderClusterAlias("_local_");
|
||||
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
||||
// Need to set this, because following an index in the same cluster
|
||||
request.setFollowIndexNamePattern("copy-{{leader_index}}");
|
||||
if (randomBoolean()) {
|
||||
request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setMaxConcurrentWriteBatches(randomIntBetween(0, Integer.MAX_VALUE));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setMaxOperationSizeInBytes(randomNonNegativeLong());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setRetryTimeout(TimeValue.timeValueMillis(500));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500));
|
||||
}
|
||||
assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
|
||||
|
||||
createIndex("logs-201901", leaderIndexSettings, "_doc");
|
||||
assertBusy(() -> {
|
||||
PersistentTasksCustomMetaData persistentTasksMetaData =
|
||||
client().admin().cluster().prepareState().get().getState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
assertThat(persistentTasksMetaData, notNullValue());
|
||||
assertThat(persistentTasksMetaData.tasks().size(), equalTo(1));
|
||||
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTasksMetaData.tasks().iterator().next().getParams();
|
||||
assertThat(shardFollowTask.getLeaderShardId().getIndexName(), equalTo("logs-201901"));
|
||||
assertThat(shardFollowTask.getFollowShardId().getIndexName(), equalTo("copy-logs-201901"));
|
||||
if (request.getMaxWriteBufferSize() != null) {
|
||||
assertThat(shardFollowTask.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize()));
|
||||
}
|
||||
if (request.getMaxConcurrentReadBatches() != null) {
|
||||
assertThat(shardFollowTask.getMaxConcurrentReadBatches(), equalTo(request.getMaxConcurrentReadBatches()));
|
||||
}
|
||||
if (request.getMaxConcurrentWriteBatches() != null) {
|
||||
assertThat(shardFollowTask.getMaxConcurrentWriteBatches(), equalTo(request.getMaxConcurrentWriteBatches()));
|
||||
}
|
||||
if (request.getMaxBatchOperationCount() != null) {
|
||||
assertThat(shardFollowTask.getMaxBatchOperationCount(), equalTo(request.getMaxBatchOperationCount()));
|
||||
}
|
||||
if (request.getMaxOperationSizeInBytes() != null) {
|
||||
assertThat(shardFollowTask.getMaxBatchSizeInBytes(), equalTo(request.getMaxOperationSizeInBytes()));
|
||||
}
|
||||
if (request.getRetryTimeout() != null) {
|
||||
assertThat(shardFollowTask.getRetryTimeout(), equalTo(request.getRetryTimeout()));
|
||||
}
|
||||
if (request.getIdleShardRetryDelay() != null) {
|
||||
assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void putAutoFollowPatterns(String... patterns) {
|
||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||
request.setLeaderClusterAlias("_local_");
|
||||
request.setLeaderIndexPatterns(Arrays.asList(patterns));
|
||||
// Need to set this, because following an index in the same cluster
|
||||
request.setFollowIndexNamePattern("copy-{{leader_index}}");
|
||||
assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
|
||||
}
|
||||
|
||||
private void deleteAutoFollowPatternSetting() {
|
||||
DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request();
|
||||
request.setLeaderClusterAlias("_local_");
|
||||
assertTrue(client().execute(DeleteAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
|
||||
public class DeleteAutoFollowPatternRequestTests extends AbstractStreamableTestCase<DeleteAutoFollowPatternAction.Request> {
|
||||
|
||||
@Override
|
||||
protected DeleteAutoFollowPatternAction.Request createBlankInstance() {
|
||||
return new DeleteAutoFollowPatternAction.Request();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DeleteAutoFollowPatternAction.Request createTestInstance() {
|
||||
DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request();
|
||||
request.setLeaderClusterAlias(randomAlphaOfLength(4));
|
||||
return request;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class PutAutoFollowPatternRequestTests extends AbstractStreamableXContentTestCase<PutAutoFollowPatternAction.Request> {
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PutAutoFollowPatternAction.Request doParseInstance(XContentParser parser) throws IOException {
|
||||
return PutAutoFollowPatternAction.Request.fromXContent(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PutAutoFollowPatternAction.Request createBlankInstance() {
|
||||
return new PutAutoFollowPatternAction.Request();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PutAutoFollowPatternAction.Request createTestInstance() {
|
||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||
request.setLeaderClusterAlias(randomAlphaOfLength(4));
|
||||
request.setLeaderIndexPatterns(Arrays.asList(generateRandomStringArray(4, 4, false)));
|
||||
if (randomBoolean()) {
|
||||
request.setFollowIndexNamePattern(randomAlphaOfLength(4));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setRetryTimeout(TimeValue.timeValueMillis(500));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setMaxConcurrentWriteBatches(randomIntBetween(0, Integer.MAX_VALUE));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setMaxOperationSizeInBytes(randomNonNegativeLong());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE));
|
||||
}
|
||||
return request;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.Request;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
|
||||
|
||||
public void testInnerDelete() {
|
||||
Map<String, List<String>> existingAlreadyFollowedIndexUUIDS = new HashMap<>();
|
||||
Map<String, AutoFollowMetadata.AutoFollowPattern> existingAutoFollowPatterns = new HashMap<>();
|
||||
{
|
||||
List<String> existingPatterns = new ArrayList<>();
|
||||
existingPatterns.add("transactions-*");
|
||||
existingAutoFollowPatterns.put("eu_cluster",
|
||||
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
|
||||
|
||||
List<String> existingUUIDS = new ArrayList<>();
|
||||
existingUUIDS.add("_val");
|
||||
existingAlreadyFollowedIndexUUIDS.put("eu_cluster", existingUUIDS);
|
||||
}
|
||||
{
|
||||
List<String> existingPatterns = new ArrayList<>();
|
||||
existingPatterns.add("logs-*");
|
||||
existingAutoFollowPatterns.put("asia_cluster",
|
||||
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
|
||||
|
||||
List<String> existingUUIDS = new ArrayList<>();
|
||||
existingUUIDS.add("_val");
|
||||
existingAlreadyFollowedIndexUUIDS.put("asia_cluster", existingUUIDS);
|
||||
}
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS)))
|
||||
.build();
|
||||
|
||||
Request request = new Request();
|
||||
request.setLeaderClusterAlias("eu_cluster");
|
||||
AutoFollowMetadata result = TransportDeleteAutoFollowPatternAction.innerDelete(request, clusterState)
|
||||
.getMetaData()
|
||||
.custom(AutoFollowMetadata.TYPE);
|
||||
assertThat(result.getPatterns().size(), equalTo(1));
|
||||
assertThat(result.getPatterns().get("asia_cluster"), notNullValue());
|
||||
assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
||||
assertThat(result.getFollowedLeaderIndexUUIDs().get("asia_cluster"), notNullValue());
|
||||
}
|
||||
|
||||
public void testInnerDeleteDoesNotExist() {
|
||||
Map<String, List<String>> existingAlreadyFollowedIndexUUIDS = new HashMap<>();
|
||||
Map<String, AutoFollowMetadata.AutoFollowPattern> existingAutoFollowPatterns = new HashMap<>();
|
||||
{
|
||||
List<String> existingPatterns = new ArrayList<>();
|
||||
existingPatterns.add("transactions-*");
|
||||
existingAutoFollowPatterns.put("eu_cluster",
|
||||
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
|
||||
}
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS)))
|
||||
.build();
|
||||
|
||||
Request request = new Request();
|
||||
request.setLeaderClusterAlias("asia_cluster");
|
||||
Exception e = expectThrows(ResourceNotFoundException.class,
|
||||
() -> TransportDeleteAutoFollowPatternAction.innerDelete(request, clusterState));
|
||||
assertThat(e.getMessage(), equalTo("no auto-follow patterns for cluster alias [asia_cluster] found"));
|
||||
}
|
||||
|
||||
public void testInnerDeleteNoAutoFollowMetadata() {
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster"))
|
||||
.metaData(MetaData.builder())
|
||||
.build();
|
||||
|
||||
Request request = new Request();
|
||||
request.setLeaderClusterAlias("asia_cluster");
|
||||
Exception e = expectThrows(ResourceNotFoundException.class,
|
||||
() -> TransportDeleteAutoFollowPatternAction.innerDelete(request, clusterState));
|
||||
assertThat(e.getMessage(), equalTo("no auto-follow patterns for cluster alias [asia_cluster] found"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,133 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
|
||||
|
||||
public void testInnerPut() {
|
||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||
request.setLeaderClusterAlias("eu_cluster");
|
||||
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
||||
|
||||
ClusterState localState = ClusterState.builder(new ClusterName("us_cluster"))
|
||||
.metaData(MetaData.builder())
|
||||
.build();
|
||||
|
||||
ClusterState remoteState = ClusterState.builder(new ClusterName("eu_cluster"))
|
||||
.metaData(MetaData.builder())
|
||||
.build();
|
||||
|
||||
ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState);
|
||||
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
|
||||
assertThat(autoFollowMetadata, notNullValue());
|
||||
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
|
||||
assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().size(), equalTo(1));
|
||||
assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(0), equalTo("logs-*"));
|
||||
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
||||
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testInnerPut_existingLeaderIndices() {
|
||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||
request.setLeaderClusterAlias("eu_cluster");
|
||||
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
||||
|
||||
ClusterState localState = ClusterState.builder(new ClusterName("us_cluster"))
|
||||
.metaData(MetaData.builder())
|
||||
.build();
|
||||
|
||||
int numLeaderIndices = randomIntBetween(1, 8);
|
||||
int numMatchingLeaderIndices = randomIntBetween(1, 8);
|
||||
MetaData.Builder mdBuilder = MetaData.builder();
|
||||
for (int i = 0; i < numLeaderIndices; i++) {
|
||||
mdBuilder.put(IndexMetaData.builder("transactions-" + i)
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0));
|
||||
}
|
||||
for (int i = 0; i < numMatchingLeaderIndices; i++) {
|
||||
mdBuilder.put(IndexMetaData.builder("logs-" + i)
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0));
|
||||
}
|
||||
|
||||
ClusterState remoteState = ClusterState.builder(new ClusterName("eu_cluster"))
|
||||
.metaData(mdBuilder)
|
||||
.build();
|
||||
|
||||
ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState);
|
||||
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
|
||||
assertThat(autoFollowMetadata, notNullValue());
|
||||
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
|
||||
assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().size(), equalTo(1));
|
||||
assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(0), equalTo("logs-*"));
|
||||
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
||||
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(numMatchingLeaderIndices));
|
||||
}
|
||||
|
||||
public void testInnerPut_existingLeaderIndicesAndAutoFollowMetadata() {
|
||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||
request.setLeaderClusterAlias("eu_cluster");
|
||||
request.setLeaderIndexPatterns(Arrays.asList("logs-*", "transactions-*"));
|
||||
|
||||
Map<String, AutoFollowMetadata.AutoFollowPattern> existingAutoFollowPatterns = new HashMap<>();
|
||||
List<String> existingPatterns = new ArrayList<>();
|
||||
existingPatterns.add("transactions-*");
|
||||
existingAutoFollowPatterns.put("eu_cluster",
|
||||
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
|
||||
Map<String, List<String>> existingAlreadyFollowedIndexUUIDS = new HashMap<>();
|
||||
List<String> existingUUIDS = new ArrayList<>();
|
||||
existingUUIDS.add("_val");
|
||||
existingAlreadyFollowedIndexUUIDS.put("eu_cluster", existingUUIDS);
|
||||
ClusterState localState = ClusterState.builder(new ClusterName("us_cluster"))
|
||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||
new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS)))
|
||||
.build();
|
||||
|
||||
int numLeaderIndices = randomIntBetween(1, 8);
|
||||
MetaData.Builder mdBuilder = MetaData.builder();
|
||||
for (int i = 0; i < numLeaderIndices; i++) {
|
||||
mdBuilder.put(IndexMetaData.builder("logs-" + i)
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0));
|
||||
}
|
||||
|
||||
ClusterState remoteState = ClusterState.builder(new ClusterName("eu_cluster"))
|
||||
.metaData(mdBuilder)
|
||||
.build();
|
||||
|
||||
ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState);
|
||||
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
|
||||
assertThat(autoFollowMetadata, notNullValue());
|
||||
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
|
||||
assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().size(), equalTo(2));
|
||||
assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(0), equalTo("logs-*"));
|
||||
assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(1), equalTo("transactions-*"));
|
||||
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
||||
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(numLeaderIndices + 1));
|
||||
}
|
||||
|
||||
}
|
|
@ -38,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.xpack.core.action.XPackInfoAction;
|
||||
import org.elasticsearch.xpack.core.action.XPackUsageAction;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
|
||||
import org.elasticsearch.xpack.core.graph.GraphFeatureSetUsage;
|
||||
import org.elasticsearch.xpack.core.graph.action.GraphExploreAction;
|
||||
|
@ -366,7 +367,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ROLLUP, RollupFeatureSetUsage::new),
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new),
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new),
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new)
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new),
|
||||
// ccr
|
||||
new NamedWriteableRegistry.Entry(AutoFollowMetadata.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,357 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ccr;
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.AbstractNamedDiffable;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.security.xcontent.XContentUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Custom metadata that contains auto follow patterns and what leader indices an auto follow pattern has already followed.
|
||||
*/
|
||||
public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> implements XPackPlugin.XPackMetaDataCustom {
|
||||
|
||||
public static final String TYPE = "ccr_auto_follow";
|
||||
|
||||
private static final ParseField PATTERNS_FIELD = new ParseField("patterns");
|
||||
private static final ParseField FOLLOWED_LEADER_INDICES_FIELD = new ParseField("followed_leader_indices");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static final ConstructingObjectParser<AutoFollowMetadata, Void> PARSER = new ConstructingObjectParser<>("auto_follow",
|
||||
args -> new AutoFollowMetadata((Map<String, AutoFollowPattern>) args[0], (Map<String, List<String>>) args[1]));
|
||||
|
||||
static {
|
||||
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> {
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
String fieldName = null;
|
||||
for (XContentParser.Token token = p.nextToken(); token != XContentParser.Token.END_OBJECT; token = p.nextToken()) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
fieldName = p.currentName();
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
patterns.put(fieldName, AutoFollowPattern.PARSER.parse(p, c));
|
||||
} else {
|
||||
throw new ElasticsearchParseException("unexpected token [" + token + "]");
|
||||
}
|
||||
}
|
||||
return patterns;
|
||||
}, PATTERNS_FIELD);
|
||||
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> {
|
||||
Map<String, List<String>> alreadyFollowedIndexUUIDS = new HashMap<>();
|
||||
String fieldName = null;
|
||||
for (XContentParser.Token token = p.nextToken(); token != XContentParser.Token.END_OBJECT; token = p.nextToken()) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
fieldName = p.currentName();
|
||||
} else if (token == XContentParser.Token.START_ARRAY) {
|
||||
alreadyFollowedIndexUUIDS.put(fieldName, Arrays.asList(XContentUtils.readStringArray(p, false)));
|
||||
} else {
|
||||
throw new ElasticsearchParseException("unexpected token [" + token + "]");
|
||||
}
|
||||
}
|
||||
return alreadyFollowedIndexUUIDS;
|
||||
}, FOLLOWED_LEADER_INDICES_FIELD);
|
||||
}
|
||||
|
||||
public static AutoFollowMetadata fromXContent(XContentParser parser) throws IOException {
|
||||
return PARSER.parse(parser, null);
|
||||
}
|
||||
|
||||
private final Map<String, AutoFollowPattern> patterns;
|
||||
private final Map<String, List<String>> followedLeaderIndexUUIDs;
|
||||
|
||||
public AutoFollowMetadata(Map<String, AutoFollowPattern> patterns, Map<String, List<String>> followedLeaderIndexUUIDs) {
|
||||
this.patterns = patterns;
|
||||
this.followedLeaderIndexUUIDs = followedLeaderIndexUUIDs;
|
||||
}
|
||||
|
||||
public AutoFollowMetadata(StreamInput in) throws IOException {
|
||||
patterns = in.readMap(StreamInput::readString, AutoFollowPattern::new);
|
||||
followedLeaderIndexUUIDs = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
|
||||
}
|
||||
|
||||
public Map<String, AutoFollowPattern> getPatterns() {
|
||||
return patterns;
|
||||
}
|
||||
|
||||
public Map<String, List<String>> getFollowedLeaderIndexUUIDs() {
|
||||
return followedLeaderIndexUUIDs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EnumSet<MetaData.XContentContext> context() {
|
||||
// TODO: When a snapshot is restored do we want to restore this?
|
||||
// (Otherwise we would start following indices automatically immediately)
|
||||
return MetaData.ALL_CONTEXTS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getMinimalSupportedVersion() {
|
||||
return Version.V_6_5_0.minimumCompatibilityVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeMap(patterns, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
|
||||
out.writeMapOfLists(followedLeaderIndexUUIDs, StreamOutput::writeString, StreamOutput::writeString);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(PATTERNS_FIELD.getPreferredName());
|
||||
for (Map.Entry<String, AutoFollowPattern> entry : patterns.entrySet()) {
|
||||
builder.startObject(entry.getKey());
|
||||
builder.value(entry.getValue());
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
builder.startObject(FOLLOWED_LEADER_INDICES_FIELD.getPreferredName());
|
||||
for (Map.Entry<String, List<String>> entry : followedLeaderIndexUUIDs.entrySet()) {
|
||||
builder.field(entry.getKey(), entry.getValue());
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFragment() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
AutoFollowMetadata that = (AutoFollowMetadata) o;
|
||||
return Objects.equals(patterns, that.patterns);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(patterns);
|
||||
}
|
||||
|
||||
public static class AutoFollowPattern implements Writeable, ToXContentObject {
|
||||
|
||||
private static final ParseField LEADER_PATTERNS_FIELD = new ParseField("leader_patterns");
|
||||
private static final ParseField FOLLOW_PATTERN_FIELD = new ParseField("follow_pattern");
|
||||
public static final ParseField MAX_BATCH_OPERATION_COUNT = new ParseField("max_batch_operation_count");
|
||||
public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches");
|
||||
public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes");
|
||||
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
|
||||
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
|
||||
public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout");
|
||||
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static final ConstructingObjectParser<AutoFollowPattern, Void> PARSER =
|
||||
new ConstructingObjectParser<>("auto_follow_pattern",
|
||||
args -> new AutoFollowPattern((List<String>) args[0], (String) args[1], (Integer) args[2], (Integer) args[3],
|
||||
(Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8]));
|
||||
|
||||
static {
|
||||
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD);
|
||||
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOW_PATTERN_FIELD);
|
||||
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_OPERATION_COUNT);
|
||||
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_READ_BATCHES);
|
||||
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_SIZE_IN_BYTES);
|
||||
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
|
||||
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE);
|
||||
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
|
||||
(p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()),
|
||||
RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
|
||||
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
|
||||
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
|
||||
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
|
||||
}
|
||||
|
||||
private final List<String> leaderIndexPatterns;
|
||||
private final String followIndexPattern;
|
||||
private final Integer maxBatchOperationCount;
|
||||
private final Integer maxConcurrentReadBatches;
|
||||
private final Long maxOperationSizeInBytes;
|
||||
private final Integer maxConcurrentWriteBatches;
|
||||
private final Integer maxWriteBufferSize;
|
||||
private final TimeValue retryTimeout;
|
||||
private final TimeValue idleShardRetryDelay;
|
||||
|
||||
public AutoFollowPattern(List<String> leaderIndexPatterns, String followIndexPattern, Integer maxBatchOperationCount,
|
||||
Integer maxConcurrentReadBatches, Long maxOperationSizeInBytes, Integer maxConcurrentWriteBatches,
|
||||
Integer maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay) {
|
||||
this.leaderIndexPatterns = leaderIndexPatterns;
|
||||
this.followIndexPattern = followIndexPattern;
|
||||
this.maxBatchOperationCount = maxBatchOperationCount;
|
||||
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
|
||||
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
|
||||
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
|
||||
this.maxWriteBufferSize = maxWriteBufferSize;
|
||||
this.retryTimeout = retryTimeout;
|
||||
this.idleShardRetryDelay = idleShardRetryDelay;
|
||||
}
|
||||
|
||||
AutoFollowPattern(StreamInput in) throws IOException {
|
||||
leaderIndexPatterns = in.readList(StreamInput::readString);
|
||||
followIndexPattern = in.readOptionalString();
|
||||
maxBatchOperationCount = in.readOptionalVInt();
|
||||
maxConcurrentReadBatches = in.readOptionalVInt();
|
||||
maxOperationSizeInBytes = in.readOptionalLong();
|
||||
maxConcurrentWriteBatches = in.readOptionalVInt();
|
||||
maxWriteBufferSize = in.readOptionalVInt();
|
||||
retryTimeout = in.readOptionalTimeValue();
|
||||
idleShardRetryDelay = in.readOptionalTimeValue();
|
||||
}
|
||||
|
||||
public boolean match(String indexName) {
|
||||
return match(leaderIndexPatterns, indexName);
|
||||
}
|
||||
|
||||
public static boolean match(List<String> leaderIndexPatterns, String indexName) {
|
||||
return Regex.simpleMatch(leaderIndexPatterns, indexName);
|
||||
}
|
||||
|
||||
public List<String> getLeaderIndexPatterns() {
|
||||
return leaderIndexPatterns;
|
||||
}
|
||||
|
||||
public String getFollowIndexPattern() {
|
||||
return followIndexPattern;
|
||||
}
|
||||
|
||||
public Integer getMaxBatchOperationCount() {
|
||||
return maxBatchOperationCount;
|
||||
}
|
||||
|
||||
public Integer getMaxConcurrentReadBatches() {
|
||||
return maxConcurrentReadBatches;
|
||||
}
|
||||
|
||||
public Long getMaxOperationSizeInBytes() {
|
||||
return maxOperationSizeInBytes;
|
||||
}
|
||||
|
||||
public Integer getMaxConcurrentWriteBatches() {
|
||||
return maxConcurrentWriteBatches;
|
||||
}
|
||||
|
||||
public Integer getMaxWriteBufferSize() {
|
||||
return maxWriteBufferSize;
|
||||
}
|
||||
|
||||
public TimeValue getRetryTimeout() {
|
||||
return retryTimeout;
|
||||
}
|
||||
|
||||
public TimeValue getIdleShardRetryDelay() {
|
||||
return idleShardRetryDelay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeStringList(leaderIndexPatterns);
|
||||
out.writeOptionalString(followIndexPattern);
|
||||
out.writeOptionalVInt(maxBatchOperationCount);
|
||||
out.writeOptionalVInt(maxConcurrentReadBatches);
|
||||
out.writeOptionalLong(maxOperationSizeInBytes);
|
||||
out.writeOptionalVInt(maxConcurrentWriteBatches);
|
||||
out.writeOptionalVInt(maxWriteBufferSize);
|
||||
out.writeOptionalTimeValue(retryTimeout);
|
||||
out.writeOptionalTimeValue(idleShardRetryDelay);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.array(LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns.toArray(new String[0]));
|
||||
if (followIndexPattern != null) {
|
||||
builder.field(FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexPattern);
|
||||
}
|
||||
if (maxBatchOperationCount != null) {
|
||||
builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount);
|
||||
}
|
||||
if (maxConcurrentReadBatches != null) {
|
||||
builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
|
||||
}
|
||||
if (maxOperationSizeInBytes != null) {
|
||||
builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes);
|
||||
}
|
||||
if (maxConcurrentWriteBatches != null) {
|
||||
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
|
||||
}
|
||||
if (maxWriteBufferSize != null){
|
||||
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
|
||||
}
|
||||
if (retryTimeout != null) {
|
||||
builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout);
|
||||
}
|
||||
if (idleShardRetryDelay != null) {
|
||||
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFragment() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
AutoFollowPattern that = (AutoFollowPattern) o;
|
||||
return Objects.equals(leaderIndexPatterns, that.leaderIndexPatterns) &&
|
||||
Objects.equals(followIndexPattern, that.followIndexPattern) &&
|
||||
Objects.equals(maxBatchOperationCount, that.maxBatchOperationCount) &&
|
||||
Objects.equals(maxConcurrentReadBatches, that.maxConcurrentReadBatches) &&
|
||||
Objects.equals(maxOperationSizeInBytes, that.maxOperationSizeInBytes) &&
|
||||
Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) &&
|
||||
Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) &&
|
||||
Objects.equals(retryTimeout, that.retryTimeout) &&
|
||||
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(
|
||||
leaderIndexPatterns,
|
||||
followIndexPattern,
|
||||
maxBatchOperationCount,
|
||||
maxConcurrentReadBatches,
|
||||
maxOperationSizeInBytes,
|
||||
maxConcurrentWriteBatches,
|
||||
maxWriteBufferSize,
|
||||
retryTimeout,
|
||||
idleShardRetryDelay
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ccr;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class AutoFollowMetadataTests extends AbstractSerializingTestCase<AutoFollowMetadata> {
|
||||
|
||||
@Override
|
||||
protected Predicate<String> getRandomFieldsExcludeFilter() {
|
||||
return s -> true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AutoFollowMetadata doParseInstance(XContentParser parser) throws IOException {
|
||||
return AutoFollowMetadata.fromXContent(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AutoFollowMetadata createTestInstance() {
|
||||
int numEntries = randomIntBetween(0, 32);
|
||||
Map<String, AutoFollowMetadata.AutoFollowPattern> configs = new HashMap<>(numEntries);
|
||||
Map<String, List<String>> followedLeaderIndices = new HashMap<>(numEntries);
|
||||
for (int i = 0; i < numEntries; i++) {
|
||||
List<String> leaderPatterns = Arrays.asList(generateRandomStringArray(4, 4, false));
|
||||
AutoFollowMetadata.AutoFollowPattern autoFollowPattern =
|
||||
new AutoFollowMetadata.AutoFollowPattern(leaderPatterns, randomAlphaOfLength(4), randomIntBetween(0, Integer.MAX_VALUE),
|
||||
randomIntBetween(0, Integer.MAX_VALUE), randomNonNegativeLong(), randomIntBetween(0, Integer.MAX_VALUE),
|
||||
randomIntBetween(0, Integer.MAX_VALUE), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500));
|
||||
configs.put(Integer.toString(i), autoFollowPattern);
|
||||
followedLeaderIndices.put(Integer.toString(i), Arrays.asList(generateRandomStringArray(4, 4, false)));
|
||||
}
|
||||
return new AutoFollowMetadata(configs, followedLeaderIndices);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<AutoFollowMetadata> instanceReader() {
|
||||
return AutoFollowMetadata::new;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
{
|
||||
"ccr.delete_auto_follow_pattern": {
|
||||
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
|
||||
"methods": [ "DELETE" ],
|
||||
"url": {
|
||||
"path": "/_ccr/_auto_follow/{leader_cluster_alias}",
|
||||
"paths": [ "/_ccr/_auto_follow/{leader_cluster_alias}" ],
|
||||
"parts": {
|
||||
"leader_cluster_alias": {
|
||||
"type": "string",
|
||||
"required": true,
|
||||
"description": "The name of the leader cluster alias."
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
{
|
||||
"ccr.put_auto_follow_pattern": {
|
||||
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
|
||||
"methods": [ "PUT" ],
|
||||
"url": {
|
||||
"path": "/_ccr/_auto_follow/{leader_cluster_alias}",
|
||||
"paths": [ "/_ccr/_auto_follow/{leader_cluster_alias}" ],
|
||||
"parts": {
|
||||
"leader_cluster_alias": {
|
||||
"type": "string",
|
||||
"required": true,
|
||||
"description": "The name of the leader cluster alias."
|
||||
}
|
||||
}
|
||||
},
|
||||
"body": {
|
||||
"description" : "The specification of the auto follow pattern",
|
||||
"required" : true
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
---
|
||||
"Test put and delete auto follow pattern":
|
||||
- do:
|
||||
ccr.put_auto_follow_pattern:
|
||||
leader_cluster_alias: _local_
|
||||
body:
|
||||
leader_index_patterns: ['logs-*']
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.delete_auto_follow_pattern:
|
||||
leader_cluster_alias: _local_
|
||||
- is_true: acknowledged
|
Loading…
Reference in New Issue