[CCR] Fix follow stats API's follower index filtering feature (#36647)
Currently always all follow stats for all follower indices are being returned even if follow stats for only specific indices are requested.
This commit is contained in:
parent
fbf88b2ab7
commit
68a674ef1f
|
@ -22,6 +22,9 @@ import org.elasticsearch.xpack.ccr.Ccr;
|
||||||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
||||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -82,12 +85,15 @@ public class TransportFollowStatsAction extends TransportTasksAction<
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final Set<String> requestedFollowerIndices = request.indices() != null ?
|
||||||
|
new HashSet<>(Arrays.asList(request.indices())) : Collections.emptySet();
|
||||||
final Set<String> followerIndices = persistentTasksMetaData.tasks().stream()
|
final Set<String> followerIndices = persistentTasksMetaData.tasks().stream()
|
||||||
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
|
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
|
||||||
.map(persistentTask -> {
|
.map(persistentTask -> {
|
||||||
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
|
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
|
||||||
return shardFollowTask.getFollowShardId().getIndexName();
|
return shardFollowTask.getFollowShardId().getIndexName();
|
||||||
})
|
})
|
||||||
|
.filter(followerIndex -> requestedFollowerIndices.isEmpty() || requestedFollowerIndices.contains(followerIndex))
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
for (final Task task : taskManager.getTasks().values()) {
|
for (final Task task : taskManager.getTasks().values()) {
|
||||||
|
|
|
@ -67,19 +67,19 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
|
||||||
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
|
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ResumeFollowAction.Request getResumeFollowRequest() {
|
protected ResumeFollowAction.Request getResumeFollowRequest(String followerIndex) {
|
||||||
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
|
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
|
||||||
request.setFollowerIndex("follower");
|
request.setFollowerIndex(followerIndex);
|
||||||
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
|
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
|
||||||
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
|
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected PutFollowAction.Request getPutFollowRequest() {
|
protected PutFollowAction.Request getPutFollowRequest(String leaderIndex, String followerIndex) {
|
||||||
PutFollowAction.Request request = new PutFollowAction.Request();
|
PutFollowAction.Request request = new PutFollowAction.Request();
|
||||||
request.setRemoteCluster("local");
|
request.setRemoteCluster("local");
|
||||||
request.setLeaderIndex("leader");
|
request.setLeaderIndex(leaderIndex);
|
||||||
request.setFollowRequest(getResumeFollowRequest());
|
request.setFollowRequest(getResumeFollowRequest(followerIndex));
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
|
public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
|
||||||
final ResumeFollowAction.Request followRequest = getResumeFollowRequest();
|
final ResumeFollowAction.Request followRequest = getResumeFollowRequest("follower");
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
client().execute(
|
client().execute(
|
||||||
ResumeFollowAction.INSTANCE,
|
ResumeFollowAction.INSTANCE,
|
||||||
|
@ -71,7 +71,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testThatCreateAndFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
|
public void testThatCreateAndFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
|
||||||
final PutFollowAction.Request createAndFollowRequest = getPutFollowRequest();
|
final PutFollowAction.Request createAndFollowRequest = getPutFollowRequest("leader", "follower");
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
client().execute(
|
client().execute(
|
||||||
PutFollowAction.INSTANCE,
|
PutFollowAction.INSTANCE,
|
||||||
|
|
|
@ -11,11 +11,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.xpack.CcrSingleNodeTestCase;
|
import org.elasticsearch.xpack.CcrSingleNodeTestCase;
|
||||||
|
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static java.util.Collections.singletonMap;
|
import static java.util.Collections.singletonMap;
|
||||||
|
@ -31,7 +33,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
|
||||||
assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
|
assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
ensureGreen("leader");
|
ensureGreen("leader");
|
||||||
|
|
||||||
final PutFollowAction.Request followRequest = getPutFollowRequest();
|
final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower");
|
||||||
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||||
|
|
||||||
final long firstBatchNumDocs = randomIntBetween(2, 64);
|
final long firstBatchNumDocs = randomIntBetween(2, 64);
|
||||||
|
@ -61,7 +63,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
|
||||||
client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
|
client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
|
||||||
}
|
}
|
||||||
|
|
||||||
client().execute(ResumeFollowAction.INSTANCE, getResumeFollowRequest()).get();
|
client().execute(ResumeFollowAction.INSTANCE, getResumeFollowRequest("follower")).get();
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
assertThat(client().prepareSearch("follower").get().getHits().getTotalHits().value,
|
assertThat(client().prepareSearch("follower").get().getHits().getTotalHits().value,
|
||||||
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs));
|
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs));
|
||||||
|
@ -69,13 +71,46 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
|
||||||
ensureEmptyWriteBuffers();
|
ensureEmptyWriteBuffers();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testFollowStatsApiFollowerIndexFiltering() throws Exception {
|
||||||
|
final String leaderIndexSettings = getIndexSettings(1, 0,
|
||||||
|
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||||
|
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
|
ensureGreen("leader1");
|
||||||
|
assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
|
ensureGreen("leader2");
|
||||||
|
|
||||||
|
PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
|
||||||
|
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||||
|
|
||||||
|
followRequest = getPutFollowRequest("leader2", "follower2");
|
||||||
|
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||||
|
|
||||||
|
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
|
||||||
|
statsRequest.setIndices(new String[] {"follower1"});
|
||||||
|
FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||||
|
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||||
|
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||||
|
|
||||||
|
statsRequest = new FollowStatsAction.StatsRequest();
|
||||||
|
statsRequest.setIndices(new String[] {"follower2"});
|
||||||
|
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||||
|
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||||
|
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower2"));
|
||||||
|
|
||||||
|
response = client().execute(FollowStatsAction.INSTANCE, new FollowStatsAction.StatsRequest()).actionGet();
|
||||||
|
assertThat(response.getStatsResponses().size(), equalTo(2));
|
||||||
|
response.getStatsResponses().sort(Comparator.comparing(o -> o.status().followerIndex()));
|
||||||
|
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||||
|
assertThat(response.getStatsResponses().get(1).status().followerIndex(), equalTo("follower2"));
|
||||||
|
}
|
||||||
|
|
||||||
public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Exception {
|
public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Exception {
|
||||||
final String leaderIndexSettings = getIndexSettings(2, 0,
|
final String leaderIndexSettings = getIndexSettings(2, 0,
|
||||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "false"));
|
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "false"));
|
||||||
assertAcked(client().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
|
assertAcked(client().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
ResumeFollowAction.Request followRequest = getResumeFollowRequest();
|
ResumeFollowAction.Request followRequest = getResumeFollowRequest("follower");
|
||||||
followRequest.setFollowerIndex("follower-index");
|
followRequest.setFollowerIndex("follower-index");
|
||||||
PutFollowAction.Request putFollowRequest = getPutFollowRequest();
|
PutFollowAction.Request putFollowRequest = getPutFollowRequest("leader", "follower");
|
||||||
putFollowRequest.setLeaderIndex("leader-index");
|
putFollowRequest.setLeaderIndex("leader-index");
|
||||||
putFollowRequest.setFollowRequest(followRequest);
|
putFollowRequest.setFollowRequest(followRequest);
|
||||||
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
|
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
|
||||||
|
|
Loading…
Reference in New Issue