From 44b461aff2bf5de6517cb8ef3c43ca61e2286cef Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 19 Oct 2018 07:41:46 +0200 Subject: [PATCH] [CCR] Make leader cluster a required argument. (#34580) This change makes it no longer possible to follow / auto follow without specifying a leader cluster. If a local index needs to be followed then `cluster.remote.*.seeds` should point to nodes in the local cluster. Closes #34258 --- .../rest-api-spec/test/ccr/auto_follow.yml | 35 +++++-- .../test/ccr/follow_and_unfollow.yml | 21 +++++ .../rest-api-spec/test/ccr/follow_stats.yml | 20 ++++ .../ccr/action/AutoFollowCoordinator.java | 37 ++------ .../TransportPutAutoFollowPatternAction.java | 8 +- .../ccr/action/TransportPutFollowAction.java | 43 ++------- .../action/TransportResumeFollowAction.java | 36 +------ .../xpack/CCRSingleNodeTestCase.java | 71 ++++++++++++++ .../elasticsearch/xpack/ccr/CcrLicenseIT.java | 26 +++--- .../xpack/ccr/LocalIndexFollowingIT.java | 93 +++++++++++++++++++ .../action/AutoFollowCoordinatorTests.java | 29 ------ .../ResumeFollowActionRequestTests.java | 1 + .../core/ccr/action/ResumeFollowAction.java | 10 +- 13 files changed, 271 insertions(+), 159 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CCRSingleNodeTestCase.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml index 2bf7820c10f..c19598d39c8 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml @@ -1,8 +1,27 @@ --- "Test put and delete auto follow pattern": + - do: + cluster.state: {} + + - set: {master_node: master} + + - do: + nodes.info: {} + + - set: {nodes.$master.transport_address: local_ip} + + - do: + cluster.put_settings: + body: + transient: + cluster.remote.local.seeds: $local_ip + flat_settings: true + + - match: {transient: {cluster.remote.local.seeds: $local_ip}} + - do: ccr.put_auto_follow_pattern: - leader_cluster_alias: _local_ + leader_cluster_alias: local body: leader_index_patterns: ['logs-*'] max_concurrent_read_batches: 2 @@ -10,21 +29,21 @@ - do: ccr.get_auto_follow_pattern: - leader_cluster_alias: _local_ - - match: { _local_.leader_index_patterns: ['logs-*'] } - - match: { _local_.max_concurrent_read_batches: 2 } + leader_cluster_alias: local + - match: { local.leader_index_patterns: ['logs-*'] } + - match: { local.max_concurrent_read_batches: 2 } - do: ccr.get_auto_follow_pattern: {} - - match: { _local_.leader_index_patterns: ['logs-*'] } - - match: { _local_.max_concurrent_read_batches: 2 } + - match: { local.leader_index_patterns: ['logs-*'] } + - match: { local.max_concurrent_read_batches: 2 } - do: ccr.delete_auto_follow_pattern: - leader_cluster_alias: _local_ + leader_cluster_alias: local - is_true: acknowledged - do: catch: missing ccr.get_auto_follow_pattern: - leader_cluster_alias: _local_ + leader_cluster_alias: local diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml index ab60b2e4948..9289be50b21 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml @@ -1,5 +1,24 @@ --- "Test follow and unfollow an existing index": + - do: + cluster.state: {} + + - set: {master_node: master} + + - do: + nodes.info: {} + + - set: {nodes.$master.transport_address: local_ip} + + - do: + cluster.put_settings: + body: + transient: + cluster.remote.local.seeds: $local_ip + flat_settings: true + + - match: {transient: {cluster.remote.local.seeds: $local_ip}} + - do: indices.create: index: foo @@ -19,6 +38,7 @@ ccr.follow: index: bar body: + leader_cluster: local leader_index: foo - is_true: follow_index_created - is_true: follow_index_shards_acked @@ -33,6 +53,7 @@ ccr.resume_follow: index: bar body: + leader_cluster: local leader_index: foo - is_true: acknowledged diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml index 3e5fff7b02d..ba784689dc5 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml @@ -1,5 +1,24 @@ --- "Test stats": + - do: + cluster.state: {} + + - set: {master_node: master} + + - do: + nodes.info: {} + + - set: {nodes.$master.transport_address: local_ip} + + - do: + cluster.put_settings: + body: + transient: + cluster.remote.local.seeds: $local_ip + flat_settings: true + + - match: {transient: {cluster.remote.local.seeds: $local_ip}} + - do: indices.create: index: foo @@ -18,6 +37,7 @@ ccr.follow: index: bar body: + leader_cluster: local leader_index: foo - is_true: follow_index_created - is_true: follow_index_shards_acked diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 0f4aa94caf5..7ac40e682db 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; @@ -164,22 +163,14 @@ public class AutoFollowCoordinator implements ClusterStateApplier { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.metaData(true); - - if ("_local_".equals(leaderClusterAlias)) { - Client client = CcrLicenseChecker.wrapClient(AutoFollowCoordinator.this.client, headers); - client.admin().cluster().state( - request, ActionListener.wrap(r -> handler.accept(r.getState(), null), e -> handler.accept(null, e))); - } else { - // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( - client, - headers, - leaderClusterAlias, - request, - e -> handler.accept(null, e), - leaderClusterState -> handler.accept(leaderClusterState, null)); - } - + // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( + client, + headers, + leaderClusterAlias, + request, + e -> handler.accept(null, e), + leaderClusterState -> handler.accept(leaderClusterState, null)); } @Override @@ -305,9 +296,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { final String followIndexName = getFollowerIndexName(pattern, leaderIndexName); ResumeFollowAction.Request request = new ResumeFollowAction.Request(); - if ("_local_".equals(clusterAlias) == false) { - request.setLeaderCluster(clusterAlias); - } + request.setLeaderCluster(clusterAlias); request.setLeaderIndex(indexToFollow.getName()); request.setFollowerIndex(followIndexName); request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount()); @@ -346,14 +335,6 @@ public class AutoFollowCoordinator implements ClusterStateApplier { List followedIndexUUIDs) { List leaderIndicesToFollow = new ArrayList<>(); for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) { - // If an auto follow pattern has been set up for the local cluster then - // we should not automatically follow a leader index that is also a follow index because - // this can result into an index creation explosion. - if (leaderIndexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) != null && - clusterAlias.equals("_local_")) { - continue; - } - if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { if (followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) { // TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 94dc5467145..19f05f575d9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -76,13 +76,7 @@ public class TransportPutAutoFollowPatternAction extends listener.onFailure(LicenseUtils.newComplianceException("ccr")); return; } - final Client leaderClient; - if (request.getLeaderCluster().equals("_local_")) { - leaderClient = client; - } else { - leaderClient = client.getRemoteClusterClient(request.getLeaderCluster()); - } - + final Client leaderClient = client.getRemoteClusterClient(request.getLeaderCluster()); final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.clear(); clusterStateRequest.metaData(true); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 7e44a0132af..e2a562a5186 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -41,7 +40,6 @@ import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.function.Consumer; public final class TransportPutFollowAction extends TransportMasterNodeAction { @@ -96,49 +94,22 @@ public final class TransportPutFollowAction listener.onFailure(LicenseUtils.newComplianceException("ccr")); return; } - String clusterAlias = request.getFollowRequest().getLeaderCluster(); - if (clusterAlias == null) { - createFollowerIndexAndFollowLocalIndex(request, state, listener); - } else { - // In the case of following a local index there is no cluster alias: - client.getRemoteClusterClient(clusterAlias); - String leaderIndex = request.getFollowRequest().getLeaderIndex(); - createFollowerIndexAndFollowRemoteIndex(request, clusterAlias, leaderIndex, listener); - } - } + String leaderCluster = request.getFollowRequest().getLeaderCluster(); + // Validates whether the leader cluster has been configured properly: + client.getRemoteClusterClient(leaderCluster); - private void createFollowerIndexAndFollowLocalIndex( - final PutFollowAction.Request request, - final ClusterState state, - final ActionListener listener) { - // following an index in local cluster, so use local cluster state to fetch leader index metadata - final String leaderIndex = request.getFollowRequest().getLeaderIndex(); - final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex); - if (leaderIndexMetadata == null) { - listener.onFailure(new IndexNotFoundException(leaderIndex)); - return; - } - - Consumer historyUUIDhandler = historyUUIDs -> { - createFollowerIndex(leaderIndexMetadata, historyUUIDs, request, listener); - }; - ccrLicenseChecker.hasPrivilegesToFollowIndices(client, new String[] {leaderIndex}, e -> { - if (e == null) { - ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, historyUUIDhandler); - } else { - listener.onFailure(e); - } - }); + String leaderIndex = request.getFollowRequest().getLeaderIndex(); + createFollowerIndexAndFollowRemoteIndex(request, leaderCluster, leaderIndex, listener); } private void createFollowerIndexAndFollowRemoteIndex( final PutFollowAction.Request request, - final String clusterAlias, + final String leaderCluster, final String leaderIndex, final ActionListener listener) { ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( client, - clusterAlias, + leaderCluster, leaderIndex, listener::onFailure, (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 32e0d0ad8f2..bbace5b70df 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -22,7 +22,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingSlowLog; import org.elasticsearch.index.SearchSlowLog; @@ -99,38 +98,11 @@ public class TransportResumeFollowAction extends HandledTransportAction listener) { - final ClusterState state = clusterService.state(); - final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); - // following an index in local cluster, so use local cluster state to fetch leader index metadata - final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getLeaderIndex()); - if (leaderIndexMetadata == null) { - throw new IndexNotFoundException(request.getFollowerIndex()); - } - ccrLicenseChecker.hasPrivilegesToFollowIndices(client, new String[] {request.getLeaderIndex()}, e -> { - if (e == null) { - ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, historyUUIDs -> { - try { - start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener); - } catch (final IOException ioe) { - listener.onFailure(ioe); - } - }); - } else { - listener.onFailure(e); - } - }); + final String leaderIndex = request.getLeaderIndex(); + followRemoteIndex(request, clusterAlias, leaderIndex, listener); } private void followRemoteIndex( diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CCRSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CCRSingleNodeTestCase.java new file mode 100644 index 00000000000..4bf3f2e40e6 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CCRSingleNodeTestCase.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack; + +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.license.LicenseService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.LocalStateCcr; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; +import org.junit.After; +import org.junit.Before; + +import java.util.Collection; +import java.util.Collections; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +public abstract class CCRSingleNodeTestCase extends ESSingleNodeTestCase { + + @Override + protected Settings nodeSettings() { + Settings.Builder builder = Settings.builder(); + builder.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + builder.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + builder.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); + builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); + return builder.build(); + } + + @Override + protected Collection> getPlugins() { + return Collections.singletonList(LocalStateCcr.class); + } + + @Before + public void setupLocalRemote() { + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + String address = getInstanceFromNode(TransportService.class).boundAddress().publishAddress().toString(); + updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", address)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + + @After + public void remoteLocalRemote() { + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", (String) null)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + + protected ResumeFollowAction.Request getFollowRequest() { + ResumeFollowAction.Request request = new ResumeFollowAction.Request(); + request.setLeaderCluster("local"); + request.setLeaderIndex("leader"); + request.setFollowerIndex("follower"); + request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); + request.setPollTimeout(TimeValue.timeValueMillis(10)); + return request; + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index 9b3b6a0f7ee..cf667be0d1c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -17,17 +17,17 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.xpack.CCRSingleNodeTestCase; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; -import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; -import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; -import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.util.Collection; import java.util.Collections; @@ -36,13 +36,18 @@ import java.util.concurrent.CountDownLatch; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -public class CcrLicenseIT extends ESSingleNodeTestCase { +public class CcrLicenseIT extends CCRSingleNodeTestCase { @Override protected Collection> getPlugins() { return Collections.singletonList(NonCompliantLicenseLocalStateCcr.class); } + @Override + protected Settings nodeSettings() { + return Settings.EMPTY; + } + public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException { final ResumeFollowAction.Request followRequest = getFollowRequest(); final CountDownLatch latch = new CountDownLatch(1); @@ -194,13 +199,4 @@ public class CcrLicenseIT extends ESSingleNodeTestCase { assertThat(e.getMessage(), equalTo("current license is non-compliant for [ccr]")); } - private ResumeFollowAction.Request getFollowRequest() { - ResumeFollowAction.Request request = new ResumeFollowAction.Request(); - request.setLeaderIndex("leader"); - request.setFollowerIndex("follower"); - request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); - request.setPollTimeout(TimeValue.timeValueMillis(10)); - return request; - } - } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java new file mode 100644 index 00000000000..d3612c614d4 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.xpack.CCRSingleNodeTestCase; +import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; + +import java.io.IOException; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +public class LocalIndexFollowingIT extends CCRSingleNodeTestCase { + + public void testFollowIndex() throws Exception { + final String leaderIndexSettings = getIndexSettings(2, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader"); + + final PutFollowAction.Request followRequest = new PutFollowAction.Request(getFollowRequest()); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + final long firstBatchNumDocs = randomIntBetween(2, 64); + for (int i = 0; i < firstBatchNumDocs; i++) { + client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + + assertBusy(() -> { + assertThat(client().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs)); + }); + + final long secondBatchNumDocs = randomIntBetween(2, 64); + for (int i = 0; i < secondBatchNumDocs; i++) { + client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + + assertBusy(() -> { + assertThat(client().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs + secondBatchNumDocs)); + }); + + PauseFollowAction.Request pauseRequest = new PauseFollowAction.Request(); + pauseRequest.setFollowIndex("follower"); + client().execute(PauseFollowAction.INSTANCE, pauseRequest); + + final long thirdBatchNumDocs = randomIntBetween(2, 64); + for (int i = 0; i < thirdBatchNumDocs; i++) { + client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); + } + + client().execute(ResumeFollowAction.INSTANCE, getFollowRequest()).get(); + assertBusy(() -> { + assertThat(client().prepareSearch("follower").get().getHits().totalHits, + equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)); + }); + } + + private String getIndexSettings(final int numberOfShards, final int numberOfReplicas, + final Map additionalIndexSettings) throws IOException { + final String settings; + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("settings"); + { + builder.field("index.number_of_shards", numberOfShards); + builder.field("index.number_of_replicas", numberOfReplicas); + for (final Map.Entry additionalSetting : additionalIndexSettings.entrySet()) { + builder.field(additionalSetting.getKey(), additionalSetting.getValue()); + } + } + builder.endObject(); + } + builder.endObject(); + settings = BytesReference.bytes(builder).utf8ToString(); + } + return settings; + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 08fdac2e462..69fe1fcdd85 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; @@ -342,34 +341,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(result.get(3).getName(), equalTo("metrics-4")); } - public void testGetLeaderIndicesToFollowDoNotSelectFollowIndicesInTheSameCluster() { - MetaData.Builder imdBuilder = MetaData.builder(); - imdBuilder.put(IndexMetaData.builder("metrics-0") - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0)); - imdBuilder.put(IndexMetaData.builder("metrics-1") - .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>()) - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0)); - - AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null); - imdBuilder.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), - Collections.emptyMap(), Collections.emptyMap())); - - ClusterState clusterState = ClusterState.builder(new ClusterName("name")) - .metaData(imdBuilder) - .build(); - - List result = AutoFollower.getLeaderIndicesToFollow("_local_", autoFollowPattern, clusterState, - clusterState, Collections.emptyList()); - result.sort(Comparator.comparing(Index::getName)); - assertThat(result.size(), equalTo(1)); - assertThat(result.get(0).getName(), equalTo("metrics-0")); - } - public void testGetFollowerIndexName() { AutoFollowPattern autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java index 3f875da0178..8101a6db2b7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java @@ -72,6 +72,7 @@ public class ResumeFollowActionRequestTests extends AbstractStreamableXContentTe public void testValidate() { ResumeFollowAction.Request request = new ResumeFollowAction.Request(); + request.setLeaderCluster("leader_cluster"); request.setLeaderIndex("index1"); request.setFollowerIndex("index2"); request.setMaxRetryDelay(TimeValue.ZERO); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java index 897a7d06f36..02ed7a1a5fb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java @@ -94,7 +94,6 @@ public final class ResumeFollowAction extends Action { return request; } - // will be a required field when following local indices is no longer allowed private String leaderCluster; public String getLeaderCluster() { @@ -202,6 +201,9 @@ public final class ResumeFollowAction extends Action { public ActionRequestValidationException validate() { ActionRequestValidationException e = null; + if (leaderCluster == null) { + e = addValidationError(LEADER_CLUSTER_FIELD.getPreferredName() + " is missing", e); + } if (leaderIndex == null) { e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e); } @@ -240,7 +242,7 @@ public final class ResumeFollowAction extends Action { @Override public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); - leaderCluster = in.readOptionalString(); + leaderCluster = in.readString(); leaderIndex = in.readString(); followerIndex = in.readString(); maxBatchOperationCount = in.readOptionalVInt(); @@ -255,7 +257,7 @@ public final class ResumeFollowAction extends Action { @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); - out.writeOptionalString(leaderCluster); + out.writeString(leaderCluster); out.writeString(leaderIndex); out.writeString(followerIndex); out.writeOptionalVInt(maxBatchOperationCount); @@ -320,7 +322,7 @@ public final class ResumeFollowAction extends Action { @Override public int hashCode() { return Objects.hash( - leaderCluster, + leaderCluster, leaderIndex, followerIndex, maxBatchOperationCount,