diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 6b31f7caa94..a429e92bcbd 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -186,9 +186,8 @@ public class ShardChangesIT extends ESIntegTestCase { assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureYellow("index1"); - final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + final PutFollowAction.Request followRequest = follow("index1", "index2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); final int firstBatchNumDocs = randomIntBetween(2, 64); logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); @@ -213,7 +212,7 @@ public class ShardChangesIT extends ESIntegTestCase { } unfollowIndex("index2"); - client().execute(ResumeFollowAction.INSTANCE, followRequest).get(); + client().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get(); final int secondBatchNumDocs = randomIntBetween(2, 64); logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { @@ -245,9 +244,8 @@ public class ShardChangesIT extends ESIntegTestCase { assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureYellow("index1"); - final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + final PutFollowAction.Request followRequest = follow("index1", "index2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); final long firstBatchNumDocs = randomIntBetween(2, 64); for (long i = 0; i < firstBatchNumDocs; i++) { @@ -286,9 +284,8 @@ public class ShardChangesIT extends ESIntegTestCase { .build())); ensureGreen("index1"); - final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + final PutFollowAction.Request followRequest = follow("index1", "index2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{\"f\":1}", XContentType.JSON).get(); assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); @@ -337,13 +334,12 @@ public class ShardChangesIT extends ESIntegTestCase { long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10)); atLeastDocsIndexed("index1", numDocsIndexed / 3); - ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - followRequest.setMaxBatchOperationCount(maxReadSize); - followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10)); - followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10)); - followRequest.setMaxWriteBufferSize(randomIntBetween(1024, 10240)); - PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + PutFollowAction.Request followRequest = follow("index1", "index2"); + followRequest.getFollowRequest().setMaxBatchOperationCount(maxReadSize); + followRequest.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(2, 10)); + followRequest.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(2, 10)); + followRequest.getFollowRequest().setMaxWriteBufferSize(randomIntBetween(1024, 10240)); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); atLeastDocsIndexed("index1", numDocsIndexed); run.set(false); @@ -379,14 +375,14 @@ public class ShardChangesIT extends ESIntegTestCase { }); thread.start(); - ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - followRequest.setMaxBatchOperationCount(randomIntBetween(32, 2048)); - followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10)); - followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10)); - client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest)).get(); + PutFollowAction.Request followRequest = follow("index1", "index2"); + followRequest.getFollowRequest().setMaxBatchOperationCount(randomIntBetween(32, 2048)); + followRequest.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(2, 10)); + followRequest.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(2, 10)); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); - long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getMaxBatchOperationCount(), - followRequest.getMaxBatchOperationCount() * 10)); + long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getFollowRequest().getMaxBatchOperationCount(), + followRequest.getFollowRequest().getMaxBatchOperationCount() * 10)); long minNumDocsReplicated = maxNumDocsReplicated / 3L; logger.info("waiting for at least [{}] documents to be indexed and then stop a random data node", minNumDocsReplicated); atLeastDocsIndexed("index2", minNumDocsReplicated); @@ -408,8 +404,8 @@ public class ShardChangesIT extends ESIntegTestCase { internalCluster().ensureAtLeastNumDataNodes(2); ensureGreen("index1"); - final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest)).get(); + final PutFollowAction.Request followRequest = follow("index1", "index2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); final int numDocs = randomIntBetween(2, 64); for (int i = 0; i < numDocs; i++) { @@ -452,19 +448,19 @@ public class ShardChangesIT extends ESIntegTestCase { assertAcked(client().admin().indices().prepareCreate("test-leader").get()); assertAcked(client().admin().indices().prepareCreate("test-follower").get()); // Leader index does not exist. - ResumeFollowAction.Request followRequest1 = createFollowRequest("non-existent-leader", "test-follower"); + ResumeFollowAction.Request followRequest1 = resumeFollow("non-existent-leader", "test-follower"); expectThrows(IndexNotFoundException.class, () -> client().execute(ResumeFollowAction.INSTANCE, followRequest1).actionGet()); expectThrows(IndexNotFoundException.class, () -> client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest1)) .actionGet()); // Follower index does not exist. - ResumeFollowAction.Request followRequest2 = createFollowRequest("non-test-leader", "non-existent-follower"); + ResumeFollowAction.Request followRequest2 = resumeFollow("non-test-leader", "non-existent-follower"); expectThrows(IndexNotFoundException.class, () -> client().execute(ResumeFollowAction.INSTANCE, followRequest2).actionGet()); expectThrows(IndexNotFoundException.class, () -> client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest2)) .actionGet()); // Both indices do not exist. - ResumeFollowAction.Request followRequest3 = createFollowRequest("non-existent-leader", "non-existent-follower"); + ResumeFollowAction.Request followRequest3 = resumeFollow("non-existent-leader", "non-existent-follower"); expectThrows(IndexNotFoundException.class, () -> client().execute(ResumeFollowAction.INSTANCE, followRequest3).actionGet()); expectThrows(IndexNotFoundException.class, () -> client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest3)) @@ -484,10 +480,9 @@ public class ShardChangesIT extends ESIntegTestCase { client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } - ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - followRequest.setMaxOperationSizeInBytes(1L); - final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + PutFollowAction.Request followRequest = follow("index1", "index2"); + followRequest.getFollowRequest().setMaxOperationSizeInBytes(1L); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); final Map firstBatchNumDocsPerShard = new HashMap<>(); final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); @@ -514,21 +509,19 @@ public class ShardChangesIT extends ESIntegTestCase { assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON)); ensureGreen("index3"); - ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + PutFollowAction.Request followRequest = follow("index1", "index2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); - followRequest = createFollowRequest("index3", "index4"); - createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + followRequest = follow("index3", "index4"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); unfollowIndex("index2", "index4"); - ResumeFollowAction.Request wrongRequest1 = createFollowRequest("index1", "index4"); + ResumeFollowAction.Request wrongRequest1 = resumeFollow("index1", "index4"); Exception e = expectThrows(IllegalArgumentException.class, () -> client().execute(ResumeFollowAction.INSTANCE, wrongRequest1).actionGet()); assertThat(e.getMessage(), containsString("follow index [index4] should reference")); - ResumeFollowAction.Request wrongRequest2 = createFollowRequest("index3", "index2"); + ResumeFollowAction.Request wrongRequest2 = resumeFollow("index3", "index2"); e = expectThrows(IllegalArgumentException.class, () -> client().execute(ResumeFollowAction.INSTANCE, wrongRequest2).actionGet()); assertThat(e.getMessage(), containsString("follow index [index2] should reference")); } @@ -537,9 +530,8 @@ public class ShardChangesIT extends ESIntegTestCase { String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get()); ensureYellow("index1"); - ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + PutFollowAction.Request followRequest = follow("index1", "index2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); unfollowIndex("index2"); client().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); @@ -559,9 +551,8 @@ public class ShardChangesIT extends ESIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + final PutFollowAction.Request followRequest = follow("index1", "index2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); @@ -594,9 +585,8 @@ public class ShardChangesIT extends ESIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + final PutFollowAction.Request followRequest = follow("index1", "index2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); @@ -624,9 +614,8 @@ public class ShardChangesIT extends ESIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + final PutFollowAction.Request followRequest = follow("index1", "index2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); @@ -643,9 +632,8 @@ public class ShardChangesIT extends ESIntegTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .build())); - final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); - final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); - client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + final PutFollowAction.Request followRequest = follow("index1", "index2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); @@ -883,7 +871,11 @@ public class ShardChangesIT extends ESIntegTestCase { }); } - public static ResumeFollowAction.Request createFollowRequest(String leaderIndex, String followerIndex) { + public static PutFollowAction.Request follow(String leaderIndex, String followerIndex) { + return new PutFollowAction.Request(resumeFollow(leaderIndex, followerIndex)); + } + + public static ResumeFollowAction.Request resumeFollow(String leaderIndex, String followerIndex) { ResumeFollowAction.Request request = new ResumeFollowAction.Request(); request.setLeaderIndex(leaderIndex); request.setFollowerIndex(followerIndex); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowActionTests.java index b8e623c8d1b..442180c7089 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowActionTests.java @@ -35,7 +35,7 @@ public class TransportResumeFollowActionTests extends ESTestCase { customMetaData.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, "uuid"); customMetaData.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, "_na_"); - ResumeFollowAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2"); + ResumeFollowAction.Request request = ShardChangesIT.resumeFollow("index1", "index2"); String[] UUIDs = new String[]{"uuid"}; { // should fail, because leader index does not exist