Fix index filtering in follow info api. (#37752)
The filtering by follower index was completely broken. Also the wrong persistent tasks were selected, causing the wrong status to be reported. Closes #37738
This commit is contained in:
parent
d9d13f3414
commit
2908ca1b35
|
@ -65,19 +65,28 @@ public class TransportFollowInfoAction extends TransportMasterNodeReadAction<Fol
|
|||
List<String> concreteFollowerIndices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(state,
|
||||
IndicesOptions.STRICT_EXPAND_OPEN_CLOSED, request.getFollowerIndices()));
|
||||
|
||||
List<FollowerInfo> followerInfos = getFollowInfos(concreteFollowerIndices, state);
|
||||
listener.onResponse(new FollowInfoAction.Response(followerInfos));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
|
||||
}
|
||||
|
||||
static List<FollowerInfo> getFollowInfos(List<String> concreteFollowerIndices, ClusterState state) {
|
||||
List<FollowerInfo> followerInfos = new ArrayList<>();
|
||||
PersistentTasksCustomMetaData persistentTasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
|
||||
for (IndexMetaData indexMetaData : state.metaData()) {
|
||||
for (String index : concreteFollowerIndices) {
|
||||
IndexMetaData indexMetaData = state.metaData().index(index);
|
||||
Map<String, String> ccrCustomData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
|
||||
if (ccrCustomData != null) {
|
||||
Optional<ShardFollowTask> result;
|
||||
if (persistentTasks != null) {
|
||||
result = persistentTasks.taskMap().values().stream()
|
||||
.map(persistentTask -> (ShardFollowTask) persistentTask.getParams())
|
||||
.filter(shardFollowTask -> concreteFollowerIndices.isEmpty() ||
|
||||
concreteFollowerIndices.contains(shardFollowTask.getFollowShardId().getIndexName()))
|
||||
result = persistentTasks.findTasks(ShardFollowTask.NAME, task -> true).stream()
|
||||
.map(task -> (ShardFollowTask) task.getParams())
|
||||
.filter(shardFollowTask -> index.equals(shardFollowTask.getFollowShardId().getIndexName()))
|
||||
.findAny();
|
||||
} else {
|
||||
result = Optional.empty();
|
||||
|
@ -107,11 +116,6 @@ public class TransportFollowInfoAction extends TransportMasterNodeReadAction<Fol
|
|||
}
|
||||
}
|
||||
|
||||
listener.onResponse(new FollowInfoAction.Response(followerInfos));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
|
||||
return followerInfos;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.xpack.CcrSingleNodeTestCase;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.xpack.ccr.LocalIndexFollowingIT.getIndexSettings;
|
||||
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class FollowInfoIT extends CcrSingleNodeTestCase {
|
||||
|
||||
public void testFollowInfoApiFollowerIndexFiltering() throws Exception {
|
||||
final String leaderIndexSettings = getIndexSettings(1, 0,
|
||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureGreen("leader1");
|
||||
assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureGreen("leader2");
|
||||
|
||||
PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
|
||||
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
followRequest = getPutFollowRequest("leader2", "follower2");
|
||||
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
FollowInfoAction.Request request = new FollowInfoAction.Request();
|
||||
request.setFollowerIndices("follower1");
|
||||
FollowInfoAction.Response response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
|
||||
assertThat(response.getFollowInfos().size(), equalTo(1));
|
||||
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
|
||||
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
|
||||
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
|
||||
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
|
||||
|
||||
request = new FollowInfoAction.Request();
|
||||
request.setFollowerIndices("follower2");
|
||||
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
|
||||
assertThat(response.getFollowInfos().size(), equalTo(1));
|
||||
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower2"));
|
||||
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader2"));
|
||||
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
|
||||
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
|
||||
|
||||
request = new FollowInfoAction.Request();
|
||||
request.setFollowerIndices("_all");
|
||||
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
|
||||
response.getFollowInfos().sort(Comparator.comparing(FollowInfoAction.Response.FollowerInfo::getFollowerIndex));
|
||||
assertThat(response.getFollowInfos().size(), equalTo(2));
|
||||
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
|
||||
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
|
||||
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
|
||||
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
|
||||
assertThat(response.getFollowInfos().get(1).getFollowerIndex(), equalTo("follower2"));
|
||||
assertThat(response.getFollowInfos().get(1).getLeaderIndex(), equalTo("leader2"));
|
||||
assertThat(response.getFollowInfos().get(1).getStatus(), equalTo(Status.ACTIVE));
|
||||
assertThat(response.getFollowInfos().get(1).getParameters(), notNullValue());
|
||||
|
||||
// Pause follower1 index and check the follower info api:
|
||||
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
|
||||
|
||||
request = new FollowInfoAction.Request();
|
||||
request.setFollowerIndices("follower1");
|
||||
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
|
||||
assertThat(response.getFollowInfos().size(), equalTo(1));
|
||||
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
|
||||
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
|
||||
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.PAUSED));
|
||||
assertThat(response.getFollowInfos().get(0).getParameters(), nullValue());
|
||||
|
||||
request = new FollowInfoAction.Request();
|
||||
request.setFollowerIndices("follower2");
|
||||
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
|
||||
assertThat(response.getFollowInfos().size(), equalTo(1));
|
||||
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower2"));
|
||||
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader2"));
|
||||
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
|
||||
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
|
||||
|
||||
request = new FollowInfoAction.Request();
|
||||
request.setFollowerIndices("_all");
|
||||
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
|
||||
response.getFollowInfos().sort(Comparator.comparing(FollowInfoAction.Response.FollowerInfo::getFollowerIndex));
|
||||
assertThat(response.getFollowInfos().size(), equalTo(2));
|
||||
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
|
||||
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
|
||||
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.PAUSED));
|
||||
assertThat(response.getFollowInfos().get(0).getParameters(), nullValue());
|
||||
assertThat(response.getFollowInfos().get(1).getFollowerIndex(), equalTo("follower2"));
|
||||
assertThat(response.getFollowInfos().get(1).getLeaderIndex(), equalTo("leader2"));
|
||||
assertThat(response.getFollowInfos().get(1).getStatus(), equalTo(Status.ACTIVE));
|
||||
assertThat(response.getFollowInfos().get(1).getParameters(), notNullValue());
|
||||
|
||||
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower2")).actionGet());
|
||||
}
|
||||
|
||||
public void testFollowInfoApiIndexMissing() throws Exception {
|
||||
final String leaderIndexSettings = getIndexSettings(1, 0,
|
||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureGreen("leader1");
|
||||
assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureGreen("leader2");
|
||||
|
||||
PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
|
||||
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
followRequest = getPutFollowRequest("leader2", "follower2");
|
||||
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
FollowInfoAction.Request request1 = new FollowInfoAction.Request();
|
||||
request1.setFollowerIndices("follower3");
|
||||
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowInfoAction.INSTANCE, request1).actionGet());
|
||||
|
||||
FollowInfoAction.Request request2 = new FollowInfoAction.Request();
|
||||
request2.setFollowerIndices("follower2", "follower3");
|
||||
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowInfoAction.INSTANCE, request2).actionGet());
|
||||
|
||||
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
|
||||
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower2")).actionGet());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.xpack.ccr.action.TransportFollowStatsActionTests.createShardFollowTask;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class TransportFollowInfoActionTests extends ESTestCase {
|
||||
|
||||
public void testGetFollowInfos() {
|
||||
ClusterState state = createCS(
|
||||
new String[] {"follower1", "follower2", "follower3", "index4"},
|
||||
new boolean[]{true, true, true, false},
|
||||
new boolean[]{true, true, false, false}
|
||||
);
|
||||
List<String> concreteIndices = Arrays.asList("follower1", "follower3");
|
||||
|
||||
List<FollowerInfo> result = TransportFollowInfoAction.getFollowInfos(concreteIndices, state);
|
||||
assertThat(result.size(), equalTo(2));
|
||||
assertThat(result.get(0).getFollowerIndex(), equalTo("follower1"));
|
||||
assertThat(result.get(0).getStatus(), equalTo(Response.Status.ACTIVE));
|
||||
assertThat(result.get(1).getFollowerIndex(), equalTo("follower3"));
|
||||
assertThat(result.get(1).getStatus(), equalTo(Response.Status.PAUSED));
|
||||
}
|
||||
|
||||
private static ClusterState createCS(String[] indices, boolean[] followerIndices, boolean[] statuses) {
|
||||
PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder();
|
||||
MetaData.Builder mdBuilder = MetaData.builder();
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
String index = indices[i];
|
||||
boolean isFollowIndex = followerIndices[i];
|
||||
boolean active = statuses[i];
|
||||
|
||||
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(index)
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0);
|
||||
|
||||
if (isFollowIndex) {
|
||||
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());
|
||||
if (active) {
|
||||
persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME, createShardFollowTask(index), null);
|
||||
}
|
||||
}
|
||||
mdBuilder.put(imdBuilder);
|
||||
}
|
||||
|
||||
mdBuilder.putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build());
|
||||
return ClusterState.builder(new ClusterName("_cluster"))
|
||||
.metaData(mdBuilder.build())
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
|
@ -44,7 +44,7 @@ public class TransportFollowStatsActionTests extends ESTestCase {
|
|||
assertThat(result.size(), equalTo(0));
|
||||
}
|
||||
|
||||
private static ShardFollowTask createShardFollowTask(String followerIndex) {
|
||||
static ShardFollowTask createShardFollowTask(String followerIndex) {
|
||||
return new ShardFollowTask(
|
||||
null,
|
||||
new ShardId(followerIndex, "", 0),
|
||||
|
|
Loading…
Reference in New Issue