Follow stats api should return a 404 when requesting stats for a non existing index (#37220)
Currently it returns an empty response with a 200 response code. Closes #37021
This commit is contained in:
parent
0a93a0358b
commit
ef2f5e4a13
|
@ -630,6 +630,22 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
public void testGetFollowStats() throws Exception {
|
||||
RestHighLevelClient client = highLevelClient();
|
||||
|
||||
{
|
||||
// Create leader index:
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader");
|
||||
createIndexRequest.settings(Collections.singletonMap("index.soft_deletes.enabled", true));
|
||||
CreateIndexResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
|
||||
assertThat(response.isAcknowledged(), is(true));
|
||||
}
|
||||
{
|
||||
// Follow index, so that we can query for follow stats:
|
||||
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower");
|
||||
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
|
||||
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
|
||||
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
|
||||
assertThat(putFollowResponse.isIndexFollowingStarted(), is(true));
|
||||
}
|
||||
|
||||
// tag::ccr-get-follow-stats-request
|
||||
FollowStatsRequest request =
|
||||
new FollowStatsRequest("follower"); // <1>
|
||||
|
@ -671,6 +687,12 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
// end::ccr-get-follow-stats-execute-async
|
||||
|
||||
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||
|
||||
{
|
||||
PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower");
|
||||
AcknowledgedResponse pauseFollowResponse = client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT);
|
||||
assertThat(pauseFollowResponse.isAcknowledged(), is(true));
|
||||
}
|
||||
}
|
||||
|
||||
static Map<String, Object> toMap(Response response) throws IOException {
|
||||
|
|
|
@ -43,6 +43,12 @@
|
|||
- is_true: follow_index_shards_acked
|
||||
- is_true: index_following_started
|
||||
|
||||
- do:
|
||||
ccr.follow_stats:
|
||||
index: _all
|
||||
- length: { indices: 1 }
|
||||
- match: { indices.0.index: "bar" }
|
||||
|
||||
# we can not reliably wait for replication to occur so we test the endpoint without indexing any documents
|
||||
- do:
|
||||
ccr.follow_stats:
|
||||
|
@ -77,3 +83,7 @@
|
|||
index: bar
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
ccr.follow_stats:
|
||||
index: unknown
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.FailedNodeException;
|
||||
import org.elasticsearch.action.TaskOperationFailure;
|
||||
|
@ -13,6 +14,7 @@ import org.elasticsearch.action.support.ActionFilters;
|
|||
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
@ -65,6 +67,15 @@ public class TransportFollowStatsAction extends TransportTasksAction<
|
|||
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (Strings.isAllOrWildcard(request.indices()) == false) {
|
||||
final ClusterState state = clusterService.state();
|
||||
Set<String> shardFollowTaskFollowerIndices = findFollowerIndicesFromShardFollowTasks(state, request.indices());
|
||||
if (shardFollowTaskFollowerIndices.isEmpty()) {
|
||||
String resources = String.join(",", request.indices());
|
||||
throw new ResourceNotFoundException("No shard follow tasks for follower indices [{}]", resources);
|
||||
}
|
||||
}
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
|
||||
|
@ -80,21 +91,7 @@ public class TransportFollowStatsAction extends TransportTasksAction<
|
|||
@Override
|
||||
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
|
||||
final ClusterState state = clusterService.state();
|
||||
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
if (persistentTasksMetaData == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final Set<String> requestedFollowerIndices = request.indices() != null ?
|
||||
new HashSet<>(Arrays.asList(request.indices())) : Collections.emptySet();
|
||||
final Set<String> followerIndices = persistentTasksMetaData.tasks().stream()
|
||||
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
|
||||
.map(persistentTask -> {
|
||||
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
|
||||
return shardFollowTask.getFollowShardId().getIndexName();
|
||||
})
|
||||
.filter(followerIndex -> requestedFollowerIndices.isEmpty() || requestedFollowerIndices.contains(followerIndex))
|
||||
.collect(Collectors.toSet());
|
||||
final Set<String> followerIndices = findFollowerIndicesFromShardFollowTasks(state, request.indices());
|
||||
|
||||
for (final Task task : taskManager.getTasks().values()) {
|
||||
if (task instanceof ShardFollowNodeTask) {
|
||||
|
@ -114,4 +111,22 @@ public class TransportFollowStatsAction extends TransportTasksAction<
|
|||
listener.onResponse(new FollowStatsAction.StatsResponse(task.getStatus()));
|
||||
}
|
||||
|
||||
static Set<String> findFollowerIndicesFromShardFollowTasks(ClusterState state, String[] indices) {
|
||||
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
if (persistentTasksMetaData == null) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
final Set<String> requestedFollowerIndices = indices != null ?
|
||||
new HashSet<>(Arrays.asList(indices)) : Collections.emptySet();
|
||||
return persistentTasksMetaData.tasks().stream()
|
||||
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
|
||||
.map(persistentTask -> {
|
||||
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
|
||||
return shardFollowTask.getFollowShardId().getIndexName();
|
||||
})
|
||||
.filter(followerIndex -> Strings.isAllOrWildcard(indices) || requestedFollowerIndices.contains(followerIndex))
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -6,9 +6,12 @@
|
|||
|
||||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
@ -116,4 +119,106 @@ public class FollowStatsIT extends CcrSingleNodeTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
public void testFollowStatsApiResourceNotFound() throws Exception {
|
||||
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
|
||||
FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(0));
|
||||
|
||||
statsRequest.setIndices(new String[] {"follower1"});
|
||||
Exception e = expectThrows(ResourceNotFoundException.class,
|
||||
() -> client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("No shard follow tasks for follower indices [follower1]"));
|
||||
|
||||
final String leaderIndexSettings = getIndexSettings(1, 0,
|
||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureGreen("leader1");
|
||||
|
||||
PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
|
||||
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
statsRequest.setIndices(new String[] {"follower2"});
|
||||
e = expectThrows(ResourceNotFoundException.class,
|
||||
() -> client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("No shard follow tasks for follower indices [follower2]"));
|
||||
|
||||
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
|
||||
}
|
||||
|
||||
public void testFollowStatsApiIncludeShardFollowStatsWithRemovedFollowerIndex() throws Exception {
|
||||
final String leaderIndexSettings = getIndexSettings(1, 0,
|
||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureGreen("leader1");
|
||||
|
||||
PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
|
||||
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
|
||||
FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
statsRequest = new FollowStatsAction.StatsRequest();
|
||||
statsRequest.setIndices(new String[] {"follower1"});
|
||||
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
assertAcked(client().admin().indices().delete(new DeleteIndexRequest("follower1")).actionGet());
|
||||
|
||||
statsRequest = new FollowStatsAction.StatsRequest();
|
||||
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
statsRequest = new FollowStatsAction.StatsRequest();
|
||||
statsRequest.setIndices(new String[] {"follower1"});
|
||||
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
|
||||
}
|
||||
|
||||
public void testFollowStatsApiIncludeShardFollowStatsWithClosedFollowerIndex() throws Exception {
|
||||
final String leaderIndexSettings = getIndexSettings(1, 0,
|
||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureGreen("leader1");
|
||||
|
||||
PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
|
||||
client().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
|
||||
FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
statsRequest = new FollowStatsAction.StatsRequest();
|
||||
statsRequest.setIndices(new String[] {"follower1"});
|
||||
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
assertAcked(client().admin().indices().close(new CloseIndexRequest("follower1")).actionGet());
|
||||
|
||||
statsRequest = new FollowStatsAction.StatsRequest();
|
||||
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
statsRequest = new FollowStatsAction.StatsRequest();
|
||||
statsRequest.setIndices(new String[] {"follower1"});
|
||||
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class TransportFollowStatsActionTests extends ESTestCase {
|
||||
|
||||
public void testFindFollowerIndicesFromShardFollowTasks() {
|
||||
PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder()
|
||||
.addTask("1", ShardFollowTask.NAME, createShardFollowTask("abc"), null)
|
||||
.addTask("2", ShardFollowTask.NAME, createShardFollowTask("def"), null);
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("_cluster"))
|
||||
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build()).build())
|
||||
.build();
|
||||
Set<String> result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, null);
|
||||
assertThat(result.size(), equalTo(2));
|
||||
assertThat(result.contains("abc"), is(true));
|
||||
assertThat(result.contains("def"), is(true));
|
||||
|
||||
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"def"});
|
||||
assertThat(result.size(), equalTo(1));
|
||||
assertThat(result.contains("def"), is(true));
|
||||
|
||||
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"ghi"});
|
||||
assertThat(result.size(), equalTo(0));
|
||||
}
|
||||
|
||||
private static ShardFollowTask createShardFollowTask(String followerIndex) {
|
||||
return new ShardFollowTask(
|
||||
null,
|
||||
new ShardId(followerIndex, "", 0),
|
||||
new ShardId("leader_index", "", 0),
|
||||
1024,
|
||||
TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE,
|
||||
1,
|
||||
1024,
|
||||
TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE,
|
||||
1,
|
||||
10240,
|
||||
new ByteSizeValue(512, ByteSizeUnit.MB),
|
||||
TimeValue.timeValueMillis(10),
|
||||
TimeValue.timeValueMillis(10),
|
||||
Collections.emptyMap()
|
||||
);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue