diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java index bc3d846343a..988f6b97bd2 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java @@ -31,14 +31,6 @@ public class CcrMultiClusterLicenseIT extends ESRestTestCase { return true; } - public void testResumeFollow() { - if (runningAgainstLeaderCluster == false) { - final Request request = new Request("POST", "/follower/_ccr/resume_follow"); - request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"leader\"}"); - assertNonCompliantLicense(request); - } - } - public void testFollow() { if (runningAgainstLeaderCluster == false) { final Request request = new Request("PUT", "/follower/_ccr/follow"); diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 6d5ca4559fe..d5e7cbcce49 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -80,7 +80,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase { refresh(allowedIndex); verifyDocuments(adminClient(), allowedIndex, numDocs); } else { - follow(allowedIndex, allowedIndex); + follow(client(), allowedIndex, allowedIndex); assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs)); assertThat(countCcrNodeTasks(), equalTo(1)); assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex)); @@ -93,7 +93,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase { assertThat(countCcrNodeTasks(), equalTo(0)); }); - resumeFollow(allowedIndex, allowedIndex); + resumeFollow(allowedIndex); assertThat(countCcrNodeTasks(), equalTo(1)); assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow"))); // Make sure that there are no other ccr relates operations running: @@ -106,11 +106,11 @@ public class FollowIndexSecurityIT extends ESRestTestCase { assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_close"))); assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow"))); - Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex, allowedIndex)); + Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex)); assertThat(e.getMessage(), containsString("follow index [" + allowedIndex + "] does not have ccr metadata")); // User does not have manage_follow_index index privilege for 'unallowedIndex': - e = expectThrows(ResponseException.class, () -> follow(unallowedIndex, unallowedIndex)); + e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, unallowedIndex)); assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]")); // Verify that the follow index has not been created and no node tasks are running @@ -119,7 +119,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase { // User does have manage_follow_index index privilege on 'allowed' index, // but not read / monitor roles on 'disallowed' index: - e = expectThrows(ResponseException.class, () -> follow(unallowedIndex, allowedIndex)); + e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, allowedIndex)); assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " + "privilege for action [indices:monitor/stats] is missing, " + "privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing")); @@ -127,16 +127,20 @@ public class FollowIndexSecurityIT extends ESRestTestCase { assertThat(indexExists(adminClient(), unallowedIndex), is(false)); assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); - e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex, unallowedIndex)); + follow(adminClient(), unallowedIndex, unallowedIndex); + pauseFollow(adminClient(), unallowedIndex); + + e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex)); assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " + "privilege for action [indices:monitor/stats] is missing, " + "privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing")); - assertThat(indexExists(adminClient(), unallowedIndex), is(false)); - assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); e = expectThrows(ResponseException.class, () -> client().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow"))); assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/unfollow] is unauthorized for user [test_ccr]")); + assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_close"))); + assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow"))); + assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); } } @@ -187,7 +191,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase { // Cleanup by deleting auto follow pattern and pause following: request = new Request("DELETE", "/_ccr/auto_follow/test_pattern"); assertOK(client().performRequest(request)); - pauseFollow(allowedIndex); + pauseFollow(client(), allowedIndex); } private int countCcrNodeTasks() throws IOException { @@ -228,18 +232,17 @@ public class FollowIndexSecurityIT extends ESRestTestCase { assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh"))); } - private static void resumeFollow(String leaderIndex, String followIndex) throws IOException { + private static void resumeFollow(String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow"); - request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex + - "\", \"poll_timeout\": \"10ms\"}"); + request.setJsonEntity("{\"poll_timeout\": \"10ms\"}"); assertOK(client().performRequest(request)); } - private static void follow(String leaderIndex, String followIndex) throws IOException { + private static void follow(RestClient client, String leaderIndex, String followIndex) throws IOException { final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow"); request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}"); - assertOK(client().performRequest(request)); + assertOK(client.performRequest(request)); } void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException { @@ -302,7 +305,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase { return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode(); } - private static void pauseFollow(String followIndex) throws IOException { + private static void pauseFollow(RestClient client, String followIndex) throws IOException { assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow"))); } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 8e50b3697f6..ff7dc9e72b5 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -71,7 +71,7 @@ public class FollowIndexIT extends ESRestTestCase { assertBusy(() -> verifyDocuments(followIndexName, numDocs)); // unfollow and then follow and then index a few docs in leader index: pauseFollow(followIndexName); - resumeFollow(leaderIndexName, followIndexName); + resumeFollow(followIndexName); try (RestClient leaderClient = buildLeaderClient()) { int id = numDocs; index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true"); @@ -84,14 +84,14 @@ public class FollowIndexIT extends ESRestTestCase { pauseFollow(followIndexName); assertOK(client().performRequest(new Request("POST", "/" + followIndexName + "/_close"))); assertOK(client().performRequest(new Request("POST", "/" + followIndexName + "/_ccr/unfollow"))); - Exception e = expectThrows(ResponseException.class, () -> resumeFollow(leaderIndexName, followIndexName)); + Exception e = expectThrows(ResponseException.class, () -> resumeFollow(followIndexName)); assertThat(e.getMessage(), containsString("follow index [" + followIndexName + "] does not have ccr metadata")); } } public void testFollowNonExistingLeaderIndex() throws Exception { assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster); - ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index", "non-existing-index")); + ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index")); assertThat(e.getMessage(), containsString("no such index")); assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); @@ -151,10 +151,9 @@ public class FollowIndexIT extends ESRestTestCase { assertOK(client().performRequest(new Request("POST", "/" + index + "/_refresh"))); } - private static void resumeFollow(String leaderIndex, String followIndex) throws IOException { + private static void resumeFollow(String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow"); - request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex + - "\", \"poll_timeout\": \"10ms\"}"); + request.setJsonEntity("{\"poll_timeout\": \"10ms\"}"); assertOK(client().performRequest(request)); } diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml index 9289be50b21..f66825d0b92 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml @@ -52,9 +52,7 @@ - do: ccr.resume_follow: index: bar - body: - leader_cluster: local - leader_index: foo + body: {} - is_true: acknowledged - do: diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 1c1cade2484..442f2309da4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -97,6 +97,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids"; public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid"; + public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name"; + public static final String CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY = "leader_cluster_name"; private final boolean enabled; private final Settings settings; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index a18ec3bf6c4..031769d0abb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -176,11 +176,10 @@ public class AutoFollowCoordinator implements ClusterStateApplier { @Override void createAndFollow(Map headers, - ResumeFollowAction.Request followRequest, + PutFollowAction.Request request, Runnable successHandler, Consumer failureHandler) { Client followerClient = CcrLicenseChecker.wrapClient(client, headers); - PutFollowAction.Request request = new PutFollowAction.Request(followRequest); followerClient.execute( PutFollowAction.INSTANCE, request, @@ -278,7 +277,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { } private void checkAutoFollowPattern(String autoFollowPattenName, - String clusterAlias, + String leaderCluster, AutoFollowPattern autoFollowPattern, List leaderIndicesToFollow, Map headers, @@ -302,7 +301,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); } } else { - followLeaderIndex(autoFollowPattenName, clusterAlias, indexToFollow, autoFollowPattern, headers, error -> { + followLeaderIndex(autoFollowPattenName, leaderCluster, indexToFollow, autoFollowPattern, headers, error -> { results.set(slot, new Tuple<>(indexToFollow, error)); if (leaderIndicesCountDown.countDown()) { resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); @@ -314,7 +313,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { } private void followLeaderIndex(String autoFollowPattenName, - String clusterAlias, + String leaderCluster, Index indexToFollow, AutoFollowPattern pattern, Map headers, @@ -322,17 +321,20 @@ public class AutoFollowCoordinator implements ClusterStateApplier { final String leaderIndexName = indexToFollow.getName(); final String followIndexName = getFollowerIndexName(pattern, leaderIndexName); - ResumeFollowAction.Request request = new ResumeFollowAction.Request(); - request.setLeaderCluster(clusterAlias); + ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request(); + followRequest.setFollowerIndex(followIndexName); + followRequest.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount()); + followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches()); + followRequest.setMaxBatchSize(pattern.getMaxBatchSize()); + followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches()); + followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize()); + followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay()); + followRequest.setPollTimeout(pattern.getPollTimeout()); + + PutFollowAction.Request request = new PutFollowAction.Request(); + request.setLeaderCluster(leaderCluster); request.setLeaderIndex(indexToFollow.getName()); - request.setFollowerIndex(followIndexName); - request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount()); - request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches()); - request.setMaxBatchSize(pattern.getMaxBatchSize()); - request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches()); - request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize()); - request.setMaxRetryDelay(pattern.getMaxRetryDelay()); - request.setPollTimeout(pattern.getPollTimeout()); + request.setFollowRequest(followRequest); // Execute if the create and follow api call succeeds: Runnable successHandler = () -> { @@ -418,7 +420,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { abstract void createAndFollow( Map headers, - ResumeFollowAction.Request followRequest, + PutFollowAction.Request followRequest, Runnable successHandler, Consumer failureHandler ); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 13d173ed815..66bed231f72 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -95,11 +95,11 @@ public final class TransportPutFollowAction listener.onFailure(LicenseUtils.newComplianceException("ccr")); return; } - String leaderCluster = request.getFollowRequest().getLeaderCluster(); + String leaderCluster = request.getLeaderCluster(); // Validates whether the leader cluster has been configured properly: client.getRemoteClusterClient(leaderCluster); - String leaderIndex = request.getFollowRequest().getLeaderIndex(); + String leaderIndex = request.getLeaderIndex(); createFollowerIndexAndFollowRemoteIndex(request, leaderCluster, leaderIndex, listener); } @@ -122,8 +122,7 @@ public final class TransportPutFollowAction final PutFollowAction.Request request, final ActionListener listener) { if (leaderIndexMetaData == null) { - listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() + - "] does not exist")); + listener.onFailure(new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not exist")); return; } @@ -160,6 +159,8 @@ public final class TransportPutFollowAction Map metadata = new HashMap<>(); metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs)); metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID()); + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName()); + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY, request.getLeaderCluster()); imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); // Copy all settings, but overwrite a few settings. diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 569e2d2cacf..d65189434fa 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingSlowLog; import org.elasticsearch.index.SearchSlowLog; @@ -97,33 +98,34 @@ public class TransportResumeFollowAction extends HandledTransportAction listener) { final ClusterState state = clusterService.state(); final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); + if (followerIndexMetadata == null) { + listener.onFailure(new IndexNotFoundException(request.getFollowerIndex())); + return; + } + + final Map ccrMetadata = followerIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + if (ccrMetadata == null) { + throw new IllegalArgumentException("follow index ["+ request.getFollowerIndex() + "] does not have ccr metadata"); + } + final String leaderCluster = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY); + // Validates whether the leader cluster has been configured properly: + client.getRemoteClusterClient(leaderCluster); + final String leaderIndex = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( - client, - clusterAlias, - leaderIndex, - listener::onFailure, - (leaderHistoryUUID, leaderIndexMetadata) -> { - try { - start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener); - } catch (final IOException e) { - listener.onFailure(e); - } - }); + client, + leaderCluster, + leaderIndex, + listener::onFailure, + (leaderHistoryUUID, leaderIndexMetadata) -> { + try { + start(request, leaderCluster, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener); + } catch (final IOException e) { + listener.onFailure(e); + } + }); } /** @@ -207,13 +209,6 @@ public class TransportResumeFollowAction extends HandledTransportAction ccrIndexMetadata = followIndex.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); if (ccrIndexMetadata == null) { throw new IllegalArgumentException("follow index ["+ followIndex.getIndex().getName() + "] does not have ccr metadata"); @@ -238,7 +233,8 @@ public class TransportResumeFollowAction extends HandledTransportAction client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); } + + static Request createRequest(RestRequest restRequest) throws IOException { + try (XContentParser parser = restRequest.contentOrSourceParamParser()) { + return Request.fromXContent(parser, restRequest.param("index")); + } + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 3f5c340deed..001134515cc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -16,6 +16,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.LocalStateCcr; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.junit.After; import org.junit.Before; @@ -63,14 +64,20 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase { assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } - protected ResumeFollowAction.Request getFollowRequest() { + protected ResumeFollowAction.Request getResumeFollowRequest() { ResumeFollowAction.Request request = new ResumeFollowAction.Request(); - request.setLeaderCluster("local"); - request.setLeaderIndex("leader"); request.setFollowerIndex("follower"); request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); request.setPollTimeout(TimeValue.timeValueMillis(10)); return request; } + protected PutFollowAction.Request getPutFollowRequest() { + PutFollowAction.Request request = new PutFollowAction.Request(); + request.setLeaderCluster("local"); + request.setLeaderIndex("leader"); + request.setFollowRequest(getResumeFollowRequest()); + return request; + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index ab14f2dfb8e..a5803f10a75 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -49,7 +49,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase { } public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException { - final ResumeFollowAction.Request followRequest = getFollowRequest(); + final ResumeFollowAction.Request followRequest = getResumeFollowRequest(); final CountDownLatch latch = new CountDownLatch(1); client().execute( ResumeFollowAction.INSTANCE, @@ -71,8 +71,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase { } public void testThatCreateAndFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException { - final ResumeFollowAction.Request followRequest = getFollowRequest(); - final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); + final PutFollowAction.Request createAndFollowRequest = getPutFollowRequest(); final CountDownLatch latch = new CountDownLatch(1); client().execute( PutFollowAction.INSTANCE, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 17bb6c8d70d..d0cc41f22c2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -82,7 +82,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -100,7 +99,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderYellow("index1"); - final PutFollowAction.Request followRequest = follow("index1", "index2"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); final int firstBatchNumDocs = randomIntBetween(2, 64); @@ -162,7 +161,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderYellow("index1"); - final PutFollowAction.Request followRequest = follow("index1", "index2"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); final long firstBatchNumDocs = randomIntBetween(2, 64); @@ -202,7 +201,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { .build())); ensureLeaderGreen("index1"); - final PutFollowAction.Request followRequest = follow("index1", "index2"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); leaderClient().prepareIndex("index1", "doc", "1").setSource("{\"f\":1}", XContentType.JSON).get(); @@ -252,7 +251,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10)); atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed / 3); - PutFollowAction.Request followRequest = follow("index1", "index2"); + PutFollowAction.Request followRequest = putFollow("index1", "index2"); followRequest.getFollowRequest().setMaxBatchOperationCount(maxReadSize); followRequest.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(2, 10)); followRequest.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(2, 10)); @@ -295,7 +294,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { }); thread.start(); - PutFollowAction.Request followRequest = follow("index1", "index2"); + PutFollowAction.Request followRequest = putFollow("index1", "index2"); followRequest.getFollowRequest().setMaxBatchOperationCount(randomIntBetween(32, 2048)); followRequest.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(2, 10)); followRequest.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(2, 10)); @@ -323,7 +322,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderGreen("index1"); - final PutFollowAction.Request followRequest = follow("index1", "index2"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); final int numDocs = randomIntBetween(2, 64); @@ -372,22 +371,17 @@ public class IndexFollowingIT extends CcrIntegTestCase { ensureLeaderGreen("test-leader"); ensureFollowerGreen("test-follower"); // Leader index does not exist. - ResumeFollowAction.Request followRequest1 = resumeFollow("non-existent-leader", "test-follower"); - expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest1).actionGet()); expectThrows(IndexNotFoundException.class, - () -> followerClient().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest1)) + () -> followerClient().execute(PutFollowAction.INSTANCE, putFollow("non-existent-leader", "test-follower")) .actionGet()); // Follower index does not exist. - ResumeFollowAction.Request followRequest2 = resumeFollow("non-test-leader", "non-existent-follower"); + ResumeFollowAction.Request followRequest1 = resumeFollow("non-existent-follower"); + expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest1).actionGet()); + // Both indices do not exist. + ResumeFollowAction.Request followRequest2 = resumeFollow("non-existent-follower"); expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest2).actionGet()); expectThrows(IndexNotFoundException.class, - () -> followerClient().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest2)) - .actionGet()); - // Both indices do not exist. - ResumeFollowAction.Request followRequest3 = resumeFollow("non-existent-leader", "non-existent-follower"); - expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest3).actionGet()); - expectThrows(IndexNotFoundException.class, - () -> followerClient().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest3)) + () -> followerClient().execute(PutFollowAction.INSTANCE, putFollow("non-existing-leader", "non-existing-follower")) .actionGet()); } @@ -404,7 +398,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } - PutFollowAction.Request followRequest = follow("index1", "index2"); + PutFollowAction.Request followRequest = putFollow("index1", "index2"); followRequest.getFollowRequest().setMaxBatchSize(new ByteSizeValue(1, ByteSizeUnit.BYTES)); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -427,37 +421,11 @@ public class IndexFollowingIT extends CcrIntegTestCase { assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), 1, numDocs); } - public void testDontFollowTheWrongIndex() throws Exception { - String leaderIndexSettings = getIndexSettings(1, 0, - Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); - assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - ensureLeaderGreen("index1"); - assertAcked(leaderClient().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON)); - ensureLeaderGreen("index3"); - - PutFollowAction.Request followRequest = follow("index1", "index2"); - followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); - - followRequest = follow("index3", "index4"); - followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); - pauseFollow("index2", "index4"); - - ResumeFollowAction.Request wrongRequest1 = resumeFollow("index1", "index4"); - Exception e = expectThrows(IllegalArgumentException.class, - () -> followerClient().execute(ResumeFollowAction.INSTANCE, wrongRequest1).actionGet()); - assertThat(e.getMessage(), containsString("follow index [index4] should reference")); - - ResumeFollowAction.Request wrongRequest2 = resumeFollow("index3", "index2"); - e = expectThrows(IllegalArgumentException.class, - () -> followerClient().execute(ResumeFollowAction.INSTANCE, wrongRequest2).actionGet()); - assertThat(e.getMessage(), containsString("follow index [index2] should reference")); - } - public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception { String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get()); ensureLeaderYellow("index1"); - PutFollowAction.Request followRequest = follow("index1", "index2"); + PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); pauseFollow("index2"); followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); @@ -478,7 +446,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - final PutFollowAction.Request followRequest = follow("index1", "index2"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); @@ -512,7 +480,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - final PutFollowAction.Request followRequest = follow("index1", "index2"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); @@ -541,7 +509,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - final PutFollowAction.Request followRequest = follow("index1", "index2"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); @@ -570,7 +538,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - final PutFollowAction.Request followRequest = follow("index1", "index2"); + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); @@ -595,7 +563,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { public void testUnfollowIndex() throws Exception { String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get()); - PutFollowAction.Request followRequest = follow("index1", "index2"); + PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); assertBusy(() -> { @@ -647,7 +615,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { }); threads[i].start(); } - PutFollowAction.Request follow = follow("leader-index", "follower-index"); + PutFollowAction.Request follow = putFollow("leader-index", "follower-index"); followerClient().execute(PutFollowAction.INSTANCE, follow).get(); ensureFollowerGreen("follower-index"); atLeastDocsIndexed(followerClient(), "follower-index", between(20, 60)); @@ -674,14 +642,11 @@ public class IndexFollowingIT extends CcrIntegTestCase { Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderGreen("index1"); - PutFollowAction.Request followRequest = follow("index1", "index2"); - followRequest.getFollowRequest().setLeaderCluster("another_cluster"); + PutFollowAction.Request followRequest = putFollow("index1", "index2"); + followRequest.setLeaderCluster("another_cluster"); Exception e = expectThrows(IllegalArgumentException.class, () -> followerClient().execute(PutFollowAction.INSTANCE, followRequest).actionGet()); assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]")); - e = expectThrows(IllegalArgumentException.class, - () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).actionGet()); - assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]")); PutAutoFollowPatternAction.Request putAutoFollowRequest = new PutAutoFollowPatternAction.Request(); putAutoFollowRequest.setName("name"); putAutoFollowRequest.setLeaderCluster("another_cluster"); @@ -696,7 +661,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { String leaderIndexSettings = getIndexSettings(1, numberOfReplicas, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON)); - PutFollowAction.Request follow = follow("leader-index", "follower-index"); + PutFollowAction.Request follow = putFollow("leader-index", "follower-index"); followerClient().execute(PutFollowAction.INSTANCE, follow).get(); getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(2, 3)); ensureFollowerGreen("follower-index"); @@ -998,14 +963,16 @@ public class IndexFollowingIT extends CcrIntegTestCase { }); } - public static PutFollowAction.Request follow(String leaderIndex, String followerIndex) { - return new PutFollowAction.Request(resumeFollow(leaderIndex, followerIndex)); - } - - public static ResumeFollowAction.Request resumeFollow(String leaderIndex, String followerIndex) { - ResumeFollowAction.Request request = new ResumeFollowAction.Request(); + public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) { + PutFollowAction.Request request = new PutFollowAction.Request(); request.setLeaderCluster("leader_cluster"); request.setLeaderIndex(leaderIndex); + request.setFollowRequest(resumeFollow(followerIndex)); + return request; + } + + public static ResumeFollowAction.Request resumeFollow(String followerIndex) { + ResumeFollowAction.Request request = new ResumeFollowAction.Request(); request.setFollowerIndex(followerIndex); request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); request.setPollTimeout(TimeValue.timeValueMillis(10)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 5ff1c67f323..3267be6f420 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -31,7 +31,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); ensureGreen("leader"); - final PutFollowAction.Request followRequest = new PutFollowAction.Request(getFollowRequest()); + final PutFollowAction.Request followRequest = getPutFollowRequest(); client().execute(PutFollowAction.INSTANCE, followRequest).get(); final long firstBatchNumDocs = randomIntBetween(2, 64); @@ -61,7 +61,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); } - client().execute(ResumeFollowAction.INSTANCE, getFollowRequest()).get(); + client().execute(ResumeFollowAction.INSTANCE, getResumeFollowRequest()).get(); assertBusy(() -> { assertThat(client().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 3f4c70f0165..9db8d5f55f0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -21,7 +21,7 @@ import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; -import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import java.util.ArrayList; import java.util.Arrays; @@ -91,13 +91,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase { @Override void createAndFollow(Map headers, - ResumeFollowAction.Request followRequest, + PutFollowAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { assertThat(headers, equalTo(autoFollowHeaders.get("remote"))); assertThat(followRequest.getLeaderCluster(), equalTo("remote")); assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101")); - assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); + assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101")); successHandler.run(); } @@ -150,7 +150,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { @Override void createAndFollow(Map headers, - ResumeFollowAction.Request followRequest, + PutFollowAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { fail("should not get here"); @@ -211,12 +211,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase { @Override void createAndFollow(Map headers, - ResumeFollowAction.Request followRequest, + PutFollowAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { assertThat(followRequest.getLeaderCluster(), equalTo("remote")); assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101")); - assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); + assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101")); successHandler.run(); } @@ -274,12 +274,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase { @Override void createAndFollow(Map headers, - ResumeFollowAction.Request followRequest, + PutFollowAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { assertThat(followRequest.getLeaderCluster(), equalTo("remote")); assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101")); - assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); + assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101")); failureHandler.accept(failure); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java index f86594b3b69..b8c1d5511df 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java @@ -5,10 +5,13 @@ */ package org.elasticsearch.xpack.ccr.action; -import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; -public class PutFollowActionRequestTests extends AbstractStreamableTestCase { +import java.io.IOException; + +public class PutFollowActionRequestTests extends AbstractStreamableXContentTestCase { @Override protected PutFollowAction.Request createBlankInstance() { @@ -17,6 +20,20 @@ public class PutFollowActionRequestTests extends AbstractStreamableTestCase validate(request, null, null, null, null)); - assertThat(e.getMessage(), equalTo("leader index [leader_cluster:index1] does not exist")); - } - { - // should fail, because follow index does not exist - IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); - Exception e = expectThrows(IllegalArgumentException.class, - () -> validate(request, leaderIMD, null, null, null)); - assertThat(e.getMessage(), equalTo("follow index [index2] does not exist")); - } { IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, null); IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, null); @@ -83,7 +70,7 @@ public class TransportResumeFollowActionTests extends ESTestCase { IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, null); IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData); Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null)); - assertThat(e.getMessage(), equalTo("leader index [leader_cluster:index1] does not have soft deletes enabled")); + assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled")); } { // should fail because the follower index does not have soft deletes enabled diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java index 5fdb13871b5..291fc853335 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java @@ -12,14 +12,29 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.FOLLOWER_INDEX_FIELD; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_BATCH_OPERATION_COUNT; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_BATCH_SIZE; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_CONCURRENT_READ_BATCHES; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_CONCURRENT_WRITE_BATCHES; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.POLL_TIMEOUT; + public final class PutFollowAction extends Action { public static final PutFollowAction INSTANCE = new PutFollowAction(); @@ -34,25 +49,97 @@ public final class PutFollowAction extends Action { return new Response(); } - public static class Request extends AcknowledgedRequest implements IndicesRequest { + public static class Request extends AcknowledgedRequest implements IndicesRequest, ToXContentObject { - private ResumeFollowAction.Request followRequest; + private static final ParseField LEADER_CLUSTER_FIELD = new ParseField("leader_cluster"); + private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index"); - public Request(ResumeFollowAction.Request followRequest) { - this.followRequest = Objects.requireNonNull(followRequest); + private static final ObjectParser PARSER = new ObjectParser<>(NAME, () -> { + Request request = new Request(); + request.setFollowRequest(new ResumeFollowAction.Request()); + return request; + }); + + static { + PARSER.declareString(Request::setLeaderCluster, LEADER_CLUSTER_FIELD); + PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD); + PARSER.declareString((request, value) -> request.followRequest.setFollowerIndex(value), FOLLOWER_INDEX_FIELD); + PARSER.declareInt((request, value) -> request.followRequest.setMaxBatchOperationCount(value), MAX_BATCH_OPERATION_COUNT); + PARSER.declareInt((request, value) -> request.followRequest.setMaxConcurrentReadBatches(value), MAX_CONCURRENT_READ_BATCHES); + PARSER.declareField( + (request, value) -> request.followRequest.setMaxBatchSize(value), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_BATCH_SIZE.getPreferredName()), + MAX_BATCH_SIZE, + ObjectParser.ValueType.STRING); + PARSER.declareInt((request, value) -> request.followRequest.setMaxConcurrentWriteBatches(value), MAX_CONCURRENT_WRITE_BATCHES); + PARSER.declareInt((request, value) -> request.followRequest.setMaxWriteBufferSize(value), MAX_WRITE_BUFFER_SIZE); + PARSER.declareField( + (request, value) -> request.followRequest.setMaxRetryDelay(value), + (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()), + MAX_RETRY_DELAY_FIELD, + ObjectParser.ValueType.STRING); + PARSER.declareField( + (request, value) -> request.followRequest.setPollTimeout(value), + (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()), + POLL_TIMEOUT, + ObjectParser.ValueType.STRING); } - public Request() { + public static Request fromXContent(final XContentParser parser, final String followerIndex) throws IOException { + Request request = PARSER.parse(parser, followerIndex); + if (followerIndex != null) { + if (request.getFollowRequest().getFollowerIndex() == null) { + request.getFollowRequest().setFollowerIndex(followerIndex); + } else { + if (request.getFollowRequest().getFollowerIndex().equals(followerIndex) == false) { + throw new IllegalArgumentException("provided follower_index is not equal"); + } + } + } + return request; + } + private String leaderCluster; + private String leaderIndex; + private ResumeFollowAction.Request followRequest; + + public Request() { + } + + public String getLeaderCluster() { + return leaderCluster; + } + + public void setLeaderCluster(String leaderCluster) { + this.leaderCluster = leaderCluster; + } + + public String getLeaderIndex() { + return leaderIndex; + } + + public void setLeaderIndex(String leaderIndex) { + this.leaderIndex = leaderIndex; } public ResumeFollowAction.Request getFollowRequest() { return followRequest; } + public void setFollowRequest(ResumeFollowAction.Request followRequest) { + this.followRequest = followRequest; + } + @Override public ActionRequestValidationException validate() { - return followRequest.validate(); + ActionRequestValidationException e = followRequest.validate(); + if (leaderCluster == null) { + e = addValidationError(LEADER_CLUSTER_FIELD.getPreferredName() + " is missing", e); + } + if (leaderIndex == null) { + e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e); + } + return e; } @Override @@ -68,6 +155,8 @@ public final class PutFollowAction extends Action { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + leaderCluster = in.readString(); + leaderIndex = in.readString(); followRequest = new ResumeFollowAction.Request(); followRequest.readFrom(in); } @@ -75,20 +164,36 @@ public final class PutFollowAction extends Action { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeString(leaderCluster); + out.writeString(leaderIndex); followRequest.writeTo(out); } + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(LEADER_CLUSTER_FIELD.getPreferredName(), leaderCluster); + builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex); + followRequest.toXContentFragment(builder, params); + } + builder.endObject(); + return builder; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return Objects.equals(followRequest, request.followRequest); + return Objects.equals(leaderCluster, request.leaderCluster) && + Objects.equals(leaderIndex, request.leaderIndex) && + Objects.equals(followRequest, request.followRequest); } @Override public int hashCode() { - return Objects.hash(followRequest); + return Objects.hash(leaderCluster, leaderIndex, followRequest); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java index 02ed7a1a5fb..127ccf7610f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java @@ -43,21 +43,17 @@ public final class ResumeFollowAction extends Action { public static class Request extends ActionRequest implements ToXContentObject { - private static final ParseField LEADER_CLUSTER_FIELD = new ParseField("leader_cluster"); - private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index"); - private static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index"); - private static final ParseField MAX_BATCH_OPERATION_COUNT = new ParseField("max_batch_operation_count"); - private static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); - private static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size"); - private static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); - private static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); - private static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay"); - private static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); - private static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); + static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index"); + static final ParseField MAX_BATCH_OPERATION_COUNT = new ParseField("max_batch_operation_count"); + static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); + static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size"); + static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); + static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); + static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay"); + static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); + static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); static { - PARSER.declareString(Request::setLeaderCluster, LEADER_CLUSTER_FIELD); - PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD); PARSER.declareString(Request::setFollowerIndex, FOLLOWER_INDEX_FIELD); PARSER.declareInt(Request::setMaxBatchOperationCount, MAX_BATCH_OPERATION_COUNT); PARSER.declareInt(Request::setMaxConcurrentReadBatches, MAX_CONCURRENT_READ_BATCHES); @@ -94,26 +90,6 @@ public final class ResumeFollowAction extends Action { return request; } - private String leaderCluster; - - public String getLeaderCluster() { - return leaderCluster; - } - - public void setLeaderCluster(String leaderCluster) { - this.leaderCluster = leaderCluster; - } - - private String leaderIndex; - - public String getLeaderIndex() { - return leaderIndex; - } - - public void setLeaderIndex(String leaderIndex) { - this.leaderIndex = leaderIndex; - } - private String followerIndex; public String getFollowerIndex() { @@ -201,12 +177,6 @@ public final class ResumeFollowAction extends Action { public ActionRequestValidationException validate() { ActionRequestValidationException e = null; - if (leaderCluster == null) { - e = addValidationError(LEADER_CLUSTER_FIELD.getPreferredName() + " is missing", e); - } - if (leaderIndex == null) { - e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e); - } if (followerIndex == null) { e = addValidationError(FOLLOWER_INDEX_FIELD.getPreferredName() + " is missing", e); } @@ -242,8 +212,6 @@ public final class ResumeFollowAction extends Action { @Override public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); - leaderCluster = in.readString(); - leaderIndex = in.readString(); followerIndex = in.readString(); maxBatchOperationCount = in.readOptionalVInt(); maxConcurrentReadBatches = in.readOptionalVInt(); @@ -257,8 +225,6 @@ public final class ResumeFollowAction extends Action { @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(leaderCluster); - out.writeString(leaderIndex); out.writeString(followerIndex); out.writeOptionalVInt(maxBatchOperationCount); out.writeOptionalVInt(maxConcurrentReadBatches); @@ -273,35 +239,37 @@ public final class ResumeFollowAction extends Action { public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); { - builder.field(LEADER_CLUSTER_FIELD.getPreferredName(), leaderCluster); - builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex); - builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); - if (maxBatchOperationCount != null) { - builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); - } - if (maxBatchSize != null) { - builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep()); - } - if (maxWriteBufferSize != null) { - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); - } - if (maxConcurrentReadBatches != null) { - builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); - } - if (maxConcurrentWriteBatches != null) { - builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); - } - if (maxRetryDelay != null) { - builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep()); - } - if (pollTimeout != null) { - builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); - } + toXContentFragment(builder, params); } builder.endObject(); return builder; } + void toXContentFragment(final XContentBuilder builder, final Params params) throws IOException { + builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); + if (maxBatchOperationCount != null) { + builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); + } + if (maxBatchSize != null) { + builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep()); + } + if (maxWriteBufferSize != null) { + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + } + if (maxConcurrentReadBatches != null) { + builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); + } + if (maxConcurrentWriteBatches != null) { + builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); + } + if (maxRetryDelay != null) { + builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep()); + } + if (pollTimeout != null) { + builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); + } + } + @Override public boolean equals(final Object o) { if (this == o) return true; @@ -314,16 +282,12 @@ public final class ResumeFollowAction extends Action { Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && Objects.equals(maxRetryDelay, request.maxRetryDelay) && Objects.equals(pollTimeout, request.pollTimeout) && - Objects.equals(leaderCluster, request.leaderCluster) && - Objects.equals(leaderIndex, request.leaderIndex) && Objects.equals(followerIndex, request.followerIndex); } @Override public int hashCode() { return Objects.hash( - leaderCluster, - leaderIndex, followerIndex, maxBatchOperationCount, maxConcurrentReadBatches,