Hide orphaned tasks from follower stats (#48901)
CCR follower stats can return information for persistent tasks that are in the process of being cleaned up. This is problematic for tests where CCR follower indices have been deleted, but their persistent follower task is only cleaned up asynchronously afterwards. If one of the following tests then accesses the follower stats, it might still get the stats for that follower task. In addition, some tests were not cleaning up their auto-follow patterns, leaving orphaned patterns behind. Other tests cleaned up their auto-follow patterns. As always the same name was used, it just depended on the test execution order whether this led to a failure or not. This commit fixes the offensive tests, and will also automatically remove auto-follow-patterns at the end of tests, like we do for many other features. Closes #48700
This commit is contained in:
parent
8835142ac9
commit
af887be3e5
|
@ -63,6 +63,13 @@ PUT /_ccr/auto_follow/my_auto_follow_pattern
|
||||||
// TEST[setup:remote_cluster]
|
// TEST[setup:remote_cluster]
|
||||||
// TESTSETUP
|
// TESTSETUP
|
||||||
|
|
||||||
|
[source,console]
|
||||||
|
--------------------------------------------------
|
||||||
|
DELETE /_ccr/auto_follow/my_auto_follow_pattern
|
||||||
|
--------------------------------------------------
|
||||||
|
// TEST
|
||||||
|
// TEARDOWN
|
||||||
|
|
||||||
//////////////////////////
|
//////////////////////////
|
||||||
|
|
||||||
[source,console]
|
[source,console]
|
||||||
|
|
|
@ -59,6 +59,13 @@ PUT /_ccr/auto_follow/my_auto_follow_pattern
|
||||||
// TEST[setup:remote_cluster]
|
// TEST[setup:remote_cluster]
|
||||||
// TESTSETUP
|
// TESTSETUP
|
||||||
|
|
||||||
|
[source,console]
|
||||||
|
--------------------------------------------------
|
||||||
|
DELETE /_ccr/auto_follow/my_auto_follow_pattern
|
||||||
|
--------------------------------------------------
|
||||||
|
// TEST
|
||||||
|
// TEARDOWN
|
||||||
|
|
||||||
[source,console]
|
[source,console]
|
||||||
--------------------------------------------------
|
--------------------------------------------------
|
||||||
POST /_ccr/auto_follow/my_auto_follow_pattern/pause
|
POST /_ccr/auto_follow/my_auto_follow_pattern/pause
|
||||||
|
|
|
@ -471,6 +471,15 @@ public abstract class ESRestTestCase extends ESTestCase {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether to preserve auto-follow patterns. Defaults to not
|
||||||
|
* preserving them. Only runs at all if xpack is installed on the cluster
|
||||||
|
* being tested.
|
||||||
|
*/
|
||||||
|
protected boolean preserveAutoFollowPatternsUponCompletion() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns whether to wait to make absolutely certain that all snapshots
|
* Returns whether to wait to make absolutely certain that all snapshots
|
||||||
* have been deleted.
|
* have been deleted.
|
||||||
|
@ -553,6 +562,10 @@ public abstract class ESRestTestCase extends ESTestCase {
|
||||||
deleteAllILMPolicies();
|
deleteAllILMPolicies();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (hasXPack && false == preserveAutoFollowPatternsUponCompletion()) {
|
||||||
|
deleteAllAutoFollowPatterns();
|
||||||
|
}
|
||||||
|
|
||||||
assertThat("Found in progress snapshots [" + inProgressSnapshots.get() + "].", inProgressSnapshots.get(), anEmptyMap());
|
assertThat("Found in progress snapshots [" + inProgressSnapshots.get() + "].", inProgressSnapshots.get(), anEmptyMap());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -718,6 +731,31 @@ public abstract class ESRestTestCase extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void deleteAllAutoFollowPatterns() throws IOException {
|
||||||
|
final List<Map<?, ?>> patterns;
|
||||||
|
|
||||||
|
try {
|
||||||
|
Response response = adminClient().performRequest(new Request("GET", "/_ccr/auto_follow"));
|
||||||
|
patterns = (List<Map<?, ?>>) entityAsMap(response).get("patterns");
|
||||||
|
} catch (ResponseException e) {
|
||||||
|
if (RestStatus.METHOD_NOT_ALLOWED.getStatus() == e.getResponse().getStatusLine().getStatusCode() ||
|
||||||
|
RestStatus.BAD_REQUEST.getStatus() == e.getResponse().getStatusLine().getStatusCode()) {
|
||||||
|
// If bad request returned, CCR is not enabled.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (patterns == null || patterns.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Map<?, ?> pattern : patterns) {
|
||||||
|
String patternName = (String) pattern.get("name");
|
||||||
|
adminClient().performRequest(new Request("DELETE", "/_ccr/auto_follow/" + patternName));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Logs a message if there are still running tasks. The reasoning is that any tasks still running are state the is trying to bleed into
|
* Logs a message if there are still running tasks. The reasoning is that any tasks still running are state the is trying to bleed into
|
||||||
* other tests.
|
* other tests.
|
||||||
|
|
|
@ -13,9 +13,11 @@ import org.elasticsearch.action.TaskOperationFailure;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.license.LicenseUtils;
|
import org.elasticsearch.license.LicenseUtils;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
|
@ -116,15 +118,17 @@ public class TransportFollowStatsAction extends TransportTasksAction<
|
||||||
if (persistentTasksMetaData == null) {
|
if (persistentTasksMetaData == null) {
|
||||||
return Collections.emptySet();
|
return Collections.emptySet();
|
||||||
}
|
}
|
||||||
|
final MetaData metaData = state.metaData();
|
||||||
final Set<String> requestedFollowerIndices = indices != null ?
|
final Set<String> requestedFollowerIndices = indices != null ?
|
||||||
new HashSet<>(Arrays.asList(indices)) : Collections.emptySet();
|
new HashSet<>(Arrays.asList(indices)) : Collections.emptySet();
|
||||||
return persistentTasksMetaData.tasks().stream()
|
return persistentTasksMetaData.tasks().stream()
|
||||||
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
|
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
|
||||||
.map(persistentTask -> {
|
.map(persistentTask -> {
|
||||||
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
|
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
|
||||||
return shardFollowTask.getFollowShardId().getIndexName();
|
return shardFollowTask.getFollowShardId().getIndex();
|
||||||
})
|
})
|
||||||
|
.filter(followerIndex -> metaData.index(followerIndex) != null) // hide tasks that are orphaned (see ShardFollowTaskCleaner)
|
||||||
|
.map(Index::getName)
|
||||||
.filter(followerIndex -> Strings.isAllOrWildcard(indices) || requestedFollowerIndices.contains(followerIndex))
|
.filter(followerIndex -> Strings.isAllOrWildcard(indices) || requestedFollowerIndices.contains(followerIndex))
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.xpack.ccr.Ccr;
|
import org.elasticsearch.xpack.ccr.Ccr;
|
||||||
|
@ -58,7 +59,8 @@ public class TransportFollowInfoActionTests extends ESTestCase {
|
||||||
if (isFollowIndex) {
|
if (isFollowIndex) {
|
||||||
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());
|
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());
|
||||||
if (active) {
|
if (active) {
|
||||||
persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME, createShardFollowTask(index), null);
|
persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME,
|
||||||
|
createShardFollowTask(new Index(index, IndexMetaData.INDEX_UUID_NA_VALUE)), null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mdBuilder.put(imdBuilder);
|
mdBuilder.put(imdBuilder);
|
||||||
|
|
|
@ -5,12 +5,16 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ccr.action;
|
package org.elasticsearch.xpack.ccr.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -24,30 +28,48 @@ import static org.hamcrest.Matchers.is;
|
||||||
public class TransportFollowStatsActionTests extends ESTestCase {
|
public class TransportFollowStatsActionTests extends ESTestCase {
|
||||||
|
|
||||||
public void testFindFollowerIndicesFromShardFollowTasks() {
|
public void testFindFollowerIndicesFromShardFollowTasks() {
|
||||||
|
Settings indexSettings = Settings.builder()
|
||||||
|
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
IndexMetaData index1 = IndexMetaData.builder("index1").settings(indexSettings).build();
|
||||||
|
IndexMetaData index2 = IndexMetaData.builder("index2").settings(indexSettings).build();
|
||||||
|
IndexMetaData index3 = IndexMetaData.builder("index3").settings(indexSettings).build();
|
||||||
|
|
||||||
PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder()
|
PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder()
|
||||||
.addTask("1", ShardFollowTask.NAME, createShardFollowTask("abc"), null)
|
.addTask("1", ShardFollowTask.NAME, createShardFollowTask(index1.getIndex()), null)
|
||||||
.addTask("2", ShardFollowTask.NAME, createShardFollowTask("def"), null);
|
.addTask("2", ShardFollowTask.NAME, createShardFollowTask(index2.getIndex()), null)
|
||||||
|
.addTask("3", ShardFollowTask.NAME, createShardFollowTask(index3.getIndex()), null);
|
||||||
|
|
||||||
ClusterState clusterState = ClusterState.builder(new ClusterName("_cluster"))
|
ClusterState clusterState = ClusterState.builder(new ClusterName("_cluster"))
|
||||||
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build()).build())
|
.metaData(MetaData.builder()
|
||||||
|
.putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build())
|
||||||
|
// only add index1 and index2
|
||||||
|
.put(index1, false)
|
||||||
|
.put(index2, false)
|
||||||
|
.build())
|
||||||
.build();
|
.build();
|
||||||
Set<String> result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, null);
|
Set<String> result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, null);
|
||||||
assertThat(result.size(), equalTo(2));
|
assertThat(result.size(), equalTo(2));
|
||||||
assertThat(result.contains("abc"), is(true));
|
assertThat(result.contains(index1.getIndex().getName()), is(true));
|
||||||
assertThat(result.contains("def"), is(true));
|
assertThat(result.contains(index2.getIndex().getName()), is(true));
|
||||||
|
|
||||||
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"def"});
|
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState,
|
||||||
|
new String[]{index2.getIndex().getName()});
|
||||||
assertThat(result.size(), equalTo(1));
|
assertThat(result.size(), equalTo(1));
|
||||||
assertThat(result.contains("def"), is(true));
|
assertThat(result.contains(index2.getIndex().getName()), is(true));
|
||||||
|
|
||||||
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"ghi"});
|
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState,
|
||||||
|
new String[]{index3.getIndex().getName()});
|
||||||
assertThat(result.size(), equalTo(0));
|
assertThat(result.size(), equalTo(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
static ShardFollowTask createShardFollowTask(String followerIndex) {
|
static ShardFollowTask createShardFollowTask(Index followerIndex) {
|
||||||
return new ShardFollowTask(
|
return new ShardFollowTask(
|
||||||
null,
|
null,
|
||||||
new ShardId(followerIndex, "", 0),
|
new ShardId(followerIndex, 0),
|
||||||
new ShardId("leader_index", "", 0),
|
new ShardId("leader_index", "", 0),
|
||||||
1024,
|
1024,
|
||||||
1024,
|
1024,
|
||||||
|
|
Loading…
Reference in New Issue