Deleting a follower index does not delete its ShardFollowTasks, potentially leaving many persistent tasks in the cluster that cannot be allocated on nodes and unnecessary fill the logs. This commit adds a cluster state listener (ShardFollowTaskCleaner) that completes (with a failure) any persistent task that refers to a non existent follower index. I think that this bug has been introduced by #34404: before this change the task would have been completed as failed and removed from the cluster state. Backport of #44702 and #44801 on 7.x
This commit is contained in:
parent
fd74b63602
commit
9944e193f9
|
@ -53,6 +53,7 @@ import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
|
|||
import org.elasticsearch.xpack.ccr.action.CcrRequests;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTaskCleaner;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
|
||||
|
@ -184,6 +185,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
ccrLicenseChecker,
|
||||
restoreSourceService,
|
||||
new CcrRepositoryManager(settings, clusterService, client),
|
||||
new ShardFollowTaskCleaner(clusterService, threadPool, client),
|
||||
new AutoFollowCoordinator(
|
||||
settings,
|
||||
client,
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
|
||||
import org.elasticsearch.persistent.PersistentTaskResponse;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
/**
|
||||
* A {@link ClusterStateListener} that completes any {@link ShardFollowTask} which concerns a deleted index.
|
||||
*/
|
||||
public class ShardFollowTaskCleaner implements ClusterStateListener {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(ShardFollowTaskCleaner.class);
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final Client client;
|
||||
|
||||
public ShardFollowTaskCleaner(final ClusterService clusterService, final ThreadPool threadPool, final Client client) {
|
||||
this.threadPool = threadPool;
|
||||
this.client = client;
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(final ClusterChangedEvent event) {
|
||||
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
return;
|
||||
}
|
||||
if (event.localNodeMaster() == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
MetaData metaData = event.state().metaData();
|
||||
PersistentTasksCustomMetaData persistentTasksMetaData = metaData.custom(PersistentTasksCustomMetaData.TYPE);
|
||||
if (persistentTasksMetaData == null) {
|
||||
return;
|
||||
}
|
||||
for (PersistentTasksCustomMetaData.PersistentTask<?> persistentTask : persistentTasksMetaData.tasks()) {
|
||||
if (ShardFollowTask.NAME.equals(persistentTask.getTaskName()) == false) {
|
||||
// this task is not a shard follow task
|
||||
continue;
|
||||
}
|
||||
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
|
||||
Index followerIndex = shardFollowTask.getFollowShardId().getIndex();
|
||||
if (metaData.index(followerIndex) != null) {
|
||||
// the index exists, do not clean this persistent task
|
||||
continue;
|
||||
}
|
||||
IndexNotFoundException e = new IndexNotFoundException(followerIndex);
|
||||
CompletionPersistentTaskAction.Request request =
|
||||
new CompletionPersistentTaskAction.Request(persistentTask.getId(), persistentTask.getAllocationId(), e);
|
||||
threadPool.generic().submit(() -> {
|
||||
client.execute(CompletionPersistentTaskAction.INSTANCE, request, new ActionListener<PersistentTaskResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(PersistentTaskResponse persistentTaskResponse) {
|
||||
logger.debug("task [{}] cleaned up", persistentTask.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn("failed to clean up task [{}]", persistentTask.getId());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,6 +30,7 @@ import static java.util.Collections.singletonMap;
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.xpack.ccr.LocalIndexFollowingIT.getIndexSettings;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.collection.IsEmptyCollection.empty;
|
||||
|
||||
/*
|
||||
|
@ -149,9 +150,8 @@ public class FollowStatsIT extends CcrSingleNodeTestCase {
|
|||
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
|
||||
}
|
||||
|
||||
public void testFollowStatsApiIncludeShardFollowStatsWithRemovedFollowerIndex() throws Exception {
|
||||
final String leaderIndexSettings = getIndexSettings(1, 0,
|
||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
public void testFollowStatsApiWithDeletedFollowerIndex() throws Exception {
|
||||
final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureGreen("leader1");
|
||||
|
||||
|
@ -171,18 +171,11 @@ public class FollowStatsIT extends CcrSingleNodeTestCase {
|
|||
|
||||
assertAcked(client().admin().indices().delete(new DeleteIndexRequest("follower1")).actionGet());
|
||||
|
||||
statsRequest = new FollowStatsAction.StatsRequest();
|
||||
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
statsRequest = new FollowStatsAction.StatsRequest();
|
||||
statsRequest.setIndices(new String[] {"follower1"});
|
||||
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
|
||||
assertThat(response.getStatsResponses().size(), equalTo(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
|
||||
|
||||
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
|
||||
assertBusy(() -> {
|
||||
FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
|
||||
FollowStatsAction.StatsResponses statsResponse = client().execute(FollowStatsAction.INSTANCE, request).actionGet();
|
||||
assertThat(statsResponse.getStatsResponses(), hasSize(0));
|
||||
});
|
||||
}
|
||||
|
||||
public void testFollowStatsApiIncludeShardFollowStatsWithClosedFollowerIndex() throws Exception {
|
||||
|
|
|
@ -762,13 +762,14 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|||
StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
|
||||
assertThat(response.getNodeFailures(), empty());
|
||||
assertThat(response.getTaskFailures(), empty());
|
||||
assertThat(response.getStatsResponses(), hasSize(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L));
|
||||
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
|
||||
assertThat(fatalException, notNullValue());
|
||||
assertThat(fatalException.getMessage(), equalTo("no such index [index2]"));
|
||||
if (response.getStatsResponses().isEmpty() == false) {
|
||||
assertThat(response.getStatsResponses(), hasSize(1));
|
||||
assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L));
|
||||
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
|
||||
assertThat(fatalException, notNullValue());
|
||||
assertThat(fatalException.getMessage(), equalTo("no such index [index2]"));
|
||||
}
|
||||
});
|
||||
pauseFollow("index2");
|
||||
ensureNoCcrTasks();
|
||||
}
|
||||
|
||||
|
@ -1321,6 +1322,37 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testCleanUpShardFollowTasksForDeletedIndices() throws Exception {
|
||||
final int numberOfShards = randomIntBetween(1, 10);
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate("index1")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1))
|
||||
.build()));
|
||||
|
||||
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
|
||||
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
|
||||
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)));
|
||||
|
||||
assertBusy(() -> {
|
||||
String action = ShardFollowTask.NAME + "[c]";
|
||||
ListTasksResponse listTasksResponse = followerClient().admin().cluster().prepareListTasks().setActions(action).get();
|
||||
assertThat(listTasksResponse.getTasks(), hasSize(numberOfShards));
|
||||
});
|
||||
|
||||
assertAcked(followerClient().admin().indices().prepareDelete("index2"));
|
||||
|
||||
assertBusy(() -> {
|
||||
String action = ShardFollowTask.NAME + "[c]";
|
||||
ListTasksResponse listTasksResponse = followerClient().admin().cluster().prepareListTasks().setActions(action).get();
|
||||
assertThat(listTasksResponse.getTasks(), hasSize(0));
|
||||
});
|
||||
ensureNoCcrTasks();
|
||||
}
|
||||
|
||||
private long getFollowTaskSettingsVersion(String followerIndex) {
|
||||
long settingsVersion = -1L;
|
||||
for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {
|
||||
|
|
Loading…
Reference in New Issue