From 68a674ef1fcb120aa77ffdc59d76fa403664430e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 14 Dec 2018 19:39:30 +0100 Subject: [PATCH] [CCR] Fix follow stats API's follower index filtering feature (#36647) Currently always all follow stats for all follower indices are being returned even if follow stats for only specific indices are requested. --- .../action/TransportFollowStatsAction.java | 6 +++ .../xpack/CcrSingleNodeTestCase.java | 10 ++--- .../elasticsearch/xpack/ccr/CcrLicenseIT.java | 4 +- .../xpack/ccr/LocalIndexFollowingIT.java | 43 +++++++++++++++++-- 4 files changed, 52 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java index aa7a7afeae0..8ab66aec8e8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java @@ -22,6 +22,9 @@ import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; @@ -82,12 +85,15 @@ public class TransportFollowStatsAction extends TransportTasksAction< return; } + final Set requestedFollowerIndices = request.indices() != null ? + new HashSet<>(Arrays.asList(request.indices())) : Collections.emptySet(); final Set followerIndices = persistentTasksMetaData.tasks().stream() .filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME)) .map(persistentTask -> { ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams(); return shardFollowTask.getFollowShardId().getIndexName(); }) + .filter(followerIndex -> requestedFollowerIndices.isEmpty() || requestedFollowerIndices.contains(followerIndex)) .collect(Collectors.toSet()); for (final Task task : taskManager.getTasks().values()) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 439950019a6..417de7cd985 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -67,19 +67,19 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase { assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } - protected ResumeFollowAction.Request getResumeFollowRequest() { + protected ResumeFollowAction.Request getResumeFollowRequest(String followerIndex) { ResumeFollowAction.Request request = new ResumeFollowAction.Request(); - request.setFollowerIndex("follower"); + request.setFollowerIndex(followerIndex); request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); request.setReadPollTimeout(TimeValue.timeValueMillis(10)); return request; } - protected PutFollowAction.Request getPutFollowRequest() { + protected PutFollowAction.Request getPutFollowRequest(String leaderIndex, String followerIndex) { PutFollowAction.Request request = new PutFollowAction.Request(); request.setRemoteCluster("local"); - request.setLeaderIndex("leader"); - request.setFollowRequest(getResumeFollowRequest()); + request.setLeaderIndex(leaderIndex); + request.setFollowRequest(getResumeFollowRequest(followerIndex)); return request; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index 928584316c5..f8e7eab1c86 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -49,7 +49,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase { } public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException { - final ResumeFollowAction.Request followRequest = getResumeFollowRequest(); + final ResumeFollowAction.Request followRequest = getResumeFollowRequest("follower"); final CountDownLatch latch = new CountDownLatch(1); client().execute( ResumeFollowAction.INSTANCE, @@ -71,7 +71,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase { } public void testThatCreateAndFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException { - final PutFollowAction.Request createAndFollowRequest = getPutFollowRequest(); + final PutFollowAction.Request createAndFollowRequest = getPutFollowRequest("leader", "follower"); final CountDownLatch latch = new CountDownLatch(1); client().execute( PutFollowAction.INSTANCE, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 8d096fe1f59..bbdf1a06354 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -11,11 +11,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.xpack.CcrSingleNodeTestCase; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.io.IOException; +import java.util.Comparator; import java.util.Map; import static java.util.Collections.singletonMap; @@ -31,7 +33,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); ensureGreen("leader"); - final PutFollowAction.Request followRequest = getPutFollowRequest(); + final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); client().execute(PutFollowAction.INSTANCE, followRequest).get(); final long firstBatchNumDocs = randomIntBetween(2, 64); @@ -61,7 +63,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); } - client().execute(ResumeFollowAction.INSTANCE, getResumeFollowRequest()).get(); + client().execute(ResumeFollowAction.INSTANCE, getResumeFollowRequest("follower")).get(); assertBusy(() -> { assertThat(client().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)); @@ -69,13 +71,46 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { ensureEmptyWriteBuffers(); } + public void testFollowStatsApiFollowerIndexFiltering() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader1"); + assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader2"); + + PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + followRequest = getPutFollowRequest("leader2", "follower2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest(); + statsRequest.setIndices(new String[] {"follower1"}); + FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + + statsRequest = new FollowStatsAction.StatsRequest(); + statsRequest.setIndices(new String[] {"follower2"}); + response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower2")); + + response = client().execute(FollowStatsAction.INSTANCE, new FollowStatsAction.StatsRequest()).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(2)); + response.getStatsResponses().sort(Comparator.comparing(o -> o.status().followerIndex())); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + assertThat(response.getStatsResponses().get(1).status().followerIndex(), equalTo("follower2")); + } + public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Exception { final String leaderIndexSettings = getIndexSettings(2, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "false")); assertAcked(client().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON)); - ResumeFollowAction.Request followRequest = getResumeFollowRequest(); + ResumeFollowAction.Request followRequest = getResumeFollowRequest("follower"); followRequest.setFollowerIndex("follower-index"); - PutFollowAction.Request putFollowRequest = getPutFollowRequest(); + PutFollowAction.Request putFollowRequest = getPutFollowRequest("leader", "follower"); putFollowRequest.setLeaderIndex("leader-index"); putFollowRequest.setFollowRequest(followRequest); IllegalArgumentException error = expectThrows(IllegalArgumentException.class,