[CCR] Make leader cluster a required argument. (#34580)

This change makes it no longer possible to follow / auto follow without
specifying a leader cluster. If a local index needs to be followed
then `cluster.remote.*.seeds` should point to nodes in the local cluster.

Closes #34258
This commit is contained in:
Martijn van Groningen 2018-10-19 07:41:46 +02:00 committed by GitHub
parent 39a6163316
commit 44b461aff2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 271 additions and 159 deletions

View File

@ -1,8 +1,27 @@
--- ---
"Test put and delete auto follow pattern": "Test put and delete auto follow pattern":
- do:
cluster.state: {}
- set: {master_node: master}
- do:
nodes.info: {}
- set: {nodes.$master.transport_address: local_ip}
- do:
cluster.put_settings:
body:
transient:
cluster.remote.local.seeds: $local_ip
flat_settings: true
- match: {transient: {cluster.remote.local.seeds: $local_ip}}
- do: - do:
ccr.put_auto_follow_pattern: ccr.put_auto_follow_pattern:
leader_cluster_alias: _local_ leader_cluster_alias: local
body: body:
leader_index_patterns: ['logs-*'] leader_index_patterns: ['logs-*']
max_concurrent_read_batches: 2 max_concurrent_read_batches: 2
@ -10,21 +29,21 @@
- do: - do:
ccr.get_auto_follow_pattern: ccr.get_auto_follow_pattern:
leader_cluster_alias: _local_ leader_cluster_alias: local
- match: { _local_.leader_index_patterns: ['logs-*'] } - match: { local.leader_index_patterns: ['logs-*'] }
- match: { _local_.max_concurrent_read_batches: 2 } - match: { local.max_concurrent_read_batches: 2 }
- do: - do:
ccr.get_auto_follow_pattern: {} ccr.get_auto_follow_pattern: {}
- match: { _local_.leader_index_patterns: ['logs-*'] } - match: { local.leader_index_patterns: ['logs-*'] }
- match: { _local_.max_concurrent_read_batches: 2 } - match: { local.max_concurrent_read_batches: 2 }
- do: - do:
ccr.delete_auto_follow_pattern: ccr.delete_auto_follow_pattern:
leader_cluster_alias: _local_ leader_cluster_alias: local
- is_true: acknowledged - is_true: acknowledged
- do: - do:
catch: missing catch: missing
ccr.get_auto_follow_pattern: ccr.get_auto_follow_pattern:
leader_cluster_alias: _local_ leader_cluster_alias: local

View File

@ -1,5 +1,24 @@
--- ---
"Test follow and unfollow an existing index": "Test follow and unfollow an existing index":
- do:
cluster.state: {}
- set: {master_node: master}
- do:
nodes.info: {}
- set: {nodes.$master.transport_address: local_ip}
- do:
cluster.put_settings:
body:
transient:
cluster.remote.local.seeds: $local_ip
flat_settings: true
- match: {transient: {cluster.remote.local.seeds: $local_ip}}
- do: - do:
indices.create: indices.create:
index: foo index: foo
@ -19,6 +38,7 @@
ccr.follow: ccr.follow:
index: bar index: bar
body: body:
leader_cluster: local
leader_index: foo leader_index: foo
- is_true: follow_index_created - is_true: follow_index_created
- is_true: follow_index_shards_acked - is_true: follow_index_shards_acked
@ -33,6 +53,7 @@
ccr.resume_follow: ccr.resume_follow:
index: bar index: bar
body: body:
leader_cluster: local
leader_index: foo leader_index: foo
- is_true: acknowledged - is_true: acknowledged

View File

@ -1,5 +1,24 @@
--- ---
"Test stats": "Test stats":
- do:
cluster.state: {}
- set: {master_node: master}
- do:
nodes.info: {}
- set: {nodes.$master.transport_address: local_ip}
- do:
cluster.put_settings:
body:
transient:
cluster.remote.local.seeds: $local_ip
flat_settings: true
- match: {transient: {cluster.remote.local.seeds: $local_ip}}
- do: - do:
indices.create: indices.create:
index: foo index: foo
@ -18,6 +37,7 @@
ccr.follow: ccr.follow:
index: bar index: bar
body: body:
leader_cluster: local
leader_index: foo leader_index: foo
- is_true: follow_index_created - is_true: follow_index_created
- is_true: follow_index_shards_acked - is_true: follow_index_shards_acked

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
@ -164,22 +163,14 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
final ClusterStateRequest request = new ClusterStateRequest(); final ClusterStateRequest request = new ClusterStateRequest();
request.clear(); request.clear();
request.metaData(true); request.metaData(true);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
if ("_local_".equals(leaderClusterAlias)) { ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
Client client = CcrLicenseChecker.wrapClient(AutoFollowCoordinator.this.client, headers); client,
client.admin().cluster().state( headers,
request, ActionListener.wrap(r -> handler.accept(r.getState(), null), e -> handler.accept(null, e))); leaderClusterAlias,
} else { request,
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API e -> handler.accept(null, e),
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( leaderClusterState -> handler.accept(leaderClusterState, null));
client,
headers,
leaderClusterAlias,
request,
e -> handler.accept(null, e),
leaderClusterState -> handler.accept(leaderClusterState, null));
}
} }
@Override @Override
@ -305,9 +296,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName); final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);
ResumeFollowAction.Request request = new ResumeFollowAction.Request(); ResumeFollowAction.Request request = new ResumeFollowAction.Request();
if ("_local_".equals(clusterAlias) == false) { request.setLeaderCluster(clusterAlias);
request.setLeaderCluster(clusterAlias);
}
request.setLeaderIndex(indexToFollow.getName()); request.setLeaderIndex(indexToFollow.getName());
request.setFollowerIndex(followIndexName); request.setFollowerIndex(followIndexName);
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount()); request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
@ -346,14 +335,6 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
List<String> followedIndexUUIDs) { List<String> followedIndexUUIDs) {
List<Index> leaderIndicesToFollow = new ArrayList<>(); List<Index> leaderIndicesToFollow = new ArrayList<>();
for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) { for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) {
// If an auto follow pattern has been set up for the local cluster then
// we should not automatically follow a leader index that is also a follow index because
// this can result into an index creation explosion.
if (leaderIndexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) != null &&
clusterAlias.equals("_local_")) {
continue;
}
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
if (followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) { if (followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) {
// TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData // TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData

View File

@ -76,13 +76,7 @@ public class TransportPutAutoFollowPatternAction extends
listener.onFailure(LicenseUtils.newComplianceException("ccr")); listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return; return;
} }
final Client leaderClient; final Client leaderClient = client.getRemoteClusterClient(request.getLeaderCluster());
if (request.getLeaderCluster().equals("_local_")) {
leaderClient = client;
} else {
leaderClient = client.getRemoteClusterClient(request.getLeaderCluster());
}
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear(); clusterStateRequest.clear();
clusterStateRequest.metaData(true); clusterStateRequest.metaData(true);

View File

@ -28,7 +28,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -41,7 +40,6 @@ import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.function.Consumer;
public final class TransportPutFollowAction public final class TransportPutFollowAction
extends TransportMasterNodeAction<PutFollowAction.Request, PutFollowAction.Response> { extends TransportMasterNodeAction<PutFollowAction.Request, PutFollowAction.Response> {
@ -96,49 +94,22 @@ public final class TransportPutFollowAction
listener.onFailure(LicenseUtils.newComplianceException("ccr")); listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return; return;
} }
String clusterAlias = request.getFollowRequest().getLeaderCluster(); String leaderCluster = request.getFollowRequest().getLeaderCluster();
if (clusterAlias == null) { // Validates whether the leader cluster has been configured properly:
createFollowerIndexAndFollowLocalIndex(request, state, listener); client.getRemoteClusterClient(leaderCluster);
} else {
// In the case of following a local index there is no cluster alias:
client.getRemoteClusterClient(clusterAlias);
String leaderIndex = request.getFollowRequest().getLeaderIndex();
createFollowerIndexAndFollowRemoteIndex(request, clusterAlias, leaderIndex, listener);
}
}
private void createFollowerIndexAndFollowLocalIndex( String leaderIndex = request.getFollowRequest().getLeaderIndex();
final PutFollowAction.Request request, createFollowerIndexAndFollowRemoteIndex(request, leaderCluster, leaderIndex, listener);
final ClusterState state,
final ActionListener<PutFollowAction.Response> listener) {
// following an index in local cluster, so use local cluster state to fetch leader index metadata
final String leaderIndex = request.getFollowRequest().getLeaderIndex();
final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex);
if (leaderIndexMetadata == null) {
listener.onFailure(new IndexNotFoundException(leaderIndex));
return;
}
Consumer<String[]> historyUUIDhandler = historyUUIDs -> {
createFollowerIndex(leaderIndexMetadata, historyUUIDs, request, listener);
};
ccrLicenseChecker.hasPrivilegesToFollowIndices(client, new String[] {leaderIndex}, e -> {
if (e == null) {
ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, historyUUIDhandler);
} else {
listener.onFailure(e);
}
});
} }
private void createFollowerIndexAndFollowRemoteIndex( private void createFollowerIndexAndFollowRemoteIndex(
final PutFollowAction.Request request, final PutFollowAction.Request request,
final String clusterAlias, final String leaderCluster,
final String leaderIndex, final String leaderIndex,
final ActionListener<PutFollowAction.Response> listener) { final ActionListener<PutFollowAction.Response> listener) {
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client, client,
clusterAlias, leaderCluster,
leaderIndex, leaderIndex,
listener::onFailure, listener::onFailure,
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));

View File

@ -22,7 +22,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog; import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.SearchSlowLog; import org.elasticsearch.index.SearchSlowLog;
@ -99,38 +98,11 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
return; return;
} }
final String clusterAlias = request.getLeaderCluster(); final String clusterAlias = request.getLeaderCluster();
if (clusterAlias == null) { // Validates whether the leader cluster has been configured properly:
followLocalIndex(request, listener); client.getRemoteClusterClient(clusterAlias);
} else {
// In the case of following a local index there is no cluster alias:
client.getRemoteClusterClient(clusterAlias);
final String leaderIndex = request.getLeaderIndex();
followRemoteIndex(request, clusterAlias, leaderIndex, listener);
}
}
private void followLocalIndex(final ResumeFollowAction.Request request, final String leaderIndex = request.getLeaderIndex();
final ActionListener<AcknowledgedResponse> listener) { followRemoteIndex(request, clusterAlias, leaderIndex, listener);
final ClusterState state = clusterService.state();
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex());
// following an index in local cluster, so use local cluster state to fetch leader index metadata
final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getLeaderIndex());
if (leaderIndexMetadata == null) {
throw new IndexNotFoundException(request.getFollowerIndex());
}
ccrLicenseChecker.hasPrivilegesToFollowIndices(client, new String[] {request.getLeaderIndex()}, e -> {
if (e == null) {
ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, historyUUIDs -> {
try {
start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener);
} catch (final IOException ioe) {
listener.onFailure(ioe);
}
});
} else {
listener.onFailure(e);
}
});
} }
private void followRemoteIndex( private void followRemoteIndex(

View File

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.junit.After;
import org.junit.Before;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
public abstract class CCRSingleNodeTestCase extends ESSingleNodeTestCase {
@Override
protected Settings nodeSettings() {
Settings.Builder builder = Settings.builder();
builder.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
builder.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
builder.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
return builder.build();
}
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singletonList(LocalStateCcr.class);
}
@Before
public void setupLocalRemote() {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
String address = getInstanceFromNode(TransportService.class).boundAddress().publishAddress().toString();
updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", address));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}
@After
public void remoteLocalRemote() {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", (String) null));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}
protected ResumeFollowAction.Request getFollowRequest() {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderCluster("local");
request.setLeaderIndex("leader");
request.setFollowerIndex("follower");
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setPollTimeout(TimeValue.timeValueMillis(10));
return request;
}
}

View File

@ -17,17 +17,17 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.xpack.CCRSingleNodeTestCase;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -36,13 +36,18 @@ import java.util.concurrent.CountDownLatch;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
public class CcrLicenseIT extends ESSingleNodeTestCase { public class CcrLicenseIT extends CCRSingleNodeTestCase {
@Override @Override
protected Collection<Class<? extends Plugin>> getPlugins() { protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singletonList(NonCompliantLicenseLocalStateCcr.class); return Collections.singletonList(NonCompliantLicenseLocalStateCcr.class);
} }
@Override
protected Settings nodeSettings() {
return Settings.EMPTY;
}
public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException { public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
final ResumeFollowAction.Request followRequest = getFollowRequest(); final ResumeFollowAction.Request followRequest = getFollowRequest();
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
@ -194,13 +199,4 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
assertThat(e.getMessage(), equalTo("current license is non-compliant for [ccr]")); assertThat(e.getMessage(), equalTo("current license is non-compliant for [ccr]"));
} }
private ResumeFollowAction.Request getFollowRequest() {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderIndex("leader");
request.setFollowerIndex("follower");
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setPollTimeout(TimeValue.timeValueMillis(10));
return request;
}
} }

View File

@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.xpack.CCRSingleNodeTestCase;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.io.IOException;
import java.util.Map;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
public class LocalIndexFollowingIT extends CCRSingleNodeTestCase {
public void testFollowIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(2, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader");
final PutFollowAction.Request followRequest = new PutFollowAction.Request(getFollowRequest());
client().execute(PutFollowAction.INSTANCE, followRequest).get();
final long firstBatchNumDocs = randomIntBetween(2, 64);
for (int i = 0; i < firstBatchNumDocs; i++) {
client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
}
assertBusy(() -> {
assertThat(client().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs));
});
final long secondBatchNumDocs = randomIntBetween(2, 64);
for (int i = 0; i < secondBatchNumDocs; i++) {
client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
}
assertBusy(() -> {
assertThat(client().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs + secondBatchNumDocs));
});
PauseFollowAction.Request pauseRequest = new PauseFollowAction.Request();
pauseRequest.setFollowIndex("follower");
client().execute(PauseFollowAction.INSTANCE, pauseRequest);
final long thirdBatchNumDocs = randomIntBetween(2, 64);
for (int i = 0; i < thirdBatchNumDocs; i++) {
client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
}
client().execute(ResumeFollowAction.INSTANCE, getFollowRequest()).get();
assertBusy(() -> {
assertThat(client().prepareSearch("follower").get().getHits().totalHits,
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs));
});
}
private String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
final Map<String, String> additionalIndexSettings) throws IOException {
final String settings;
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("settings");
{
builder.field("index.number_of_shards", numberOfShards);
builder.field("index.number_of_replicas", numberOfReplicas);
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
}
}
builder.endObject();
}
builder.endObject();
settings = BytesReference.bytes(builder).utf8ToString();
}
return settings;
}
}

View File

@ -16,7 +16,6 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
@ -342,34 +341,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(result.get(3).getName(), equalTo("metrics-4")); assertThat(result.get(3).getName(), equalTo("metrics-4"));
} }
public void testGetLeaderIndicesToFollowDoNotSelectFollowIndicesInTheSameCluster() {
MetaData.Builder imdBuilder = MetaData.builder();
imdBuilder.put(IndexMetaData.builder("metrics-0")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0));
imdBuilder.put(IndexMetaData.builder("metrics-1")
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0));
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null);
imdBuilder.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern),
Collections.emptyMap(), Collections.emptyMap()));
ClusterState clusterState = ClusterState.builder(new ClusterName("name"))
.metaData(imdBuilder)
.build();
List<Index> result = AutoFollower.getLeaderIndicesToFollow("_local_", autoFollowPattern, clusterState,
clusterState, Collections.emptyList());
result.sort(Comparator.comparing(Index::getName));
assertThat(result.size(), equalTo(1));
assertThat(result.get(0).getName(), equalTo("metrics-0"));
}
public void testGetFollowerIndexName() { public void testGetFollowerIndexName() {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, AutoFollowPattern autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null,
null, null, null, null, null, null); null, null, null, null, null, null);

View File

@ -72,6 +72,7 @@ public class ResumeFollowActionRequestTests extends AbstractStreamableXContentTe
public void testValidate() { public void testValidate() {
ResumeFollowAction.Request request = new ResumeFollowAction.Request(); ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderCluster("leader_cluster");
request.setLeaderIndex("index1"); request.setLeaderIndex("index1");
request.setFollowerIndex("index2"); request.setFollowerIndex("index2");
request.setMaxRetryDelay(TimeValue.ZERO); request.setMaxRetryDelay(TimeValue.ZERO);

View File

@ -94,7 +94,6 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
return request; return request;
} }
// will be a required field when following local indices is no longer allowed
private String leaderCluster; private String leaderCluster;
public String getLeaderCluster() { public String getLeaderCluster() {
@ -202,6 +201,9 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
ActionRequestValidationException e = null; ActionRequestValidationException e = null;
if (leaderCluster == null) {
e = addValidationError(LEADER_CLUSTER_FIELD.getPreferredName() + " is missing", e);
}
if (leaderIndex == null) { if (leaderIndex == null) {
e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e); e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e);
} }
@ -240,7 +242,7 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
@Override @Override
public void readFrom(final StreamInput in) throws IOException { public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
leaderCluster = in.readOptionalString(); leaderCluster = in.readString();
leaderIndex = in.readString(); leaderIndex = in.readString();
followerIndex = in.readString(); followerIndex = in.readString();
maxBatchOperationCount = in.readOptionalVInt(); maxBatchOperationCount = in.readOptionalVInt();
@ -255,7 +257,7 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
@Override @Override
public void writeTo(final StreamOutput out) throws IOException { public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeOptionalString(leaderCluster); out.writeString(leaderCluster);
out.writeString(leaderIndex); out.writeString(leaderIndex);
out.writeString(followerIndex); out.writeString(followerIndex);
out.writeOptionalVInt(maxBatchOperationCount); out.writeOptionalVInt(maxBatchOperationCount);
@ -320,7 +322,7 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash( return Objects.hash(
leaderCluster, leaderCluster,
leaderIndex, leaderIndex,
followerIndex, followerIndex,
maxBatchOperationCount, maxBatchOperationCount,