diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index d513fc83ce6..b736200a57f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -53,6 +53,7 @@ import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; +import org.elasticsearch.xpack.ccr.action.ShardFollowTaskCleaner; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; @@ -184,6 +185,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E ccrLicenseChecker, restoreSourceService, new CcrRepositoryManager(settings, clusterService, client), + new ShardFollowTaskCleaner(clusterService, threadPool, client), new AutoFollowCoordinator( settings, client, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskCleaner.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskCleaner.java new file mode 100644 index 00000000000..053ef2273cb --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskCleaner.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.persistent.CompletionPersistentTaskAction; +import org.elasticsearch.persistent.PersistentTaskResponse; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.threadpool.ThreadPool; + +/** + * A {@link ClusterStateListener} that completes any {@link ShardFollowTask} which concerns a deleted index. + */ +public class ShardFollowTaskCleaner implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(ShardFollowTaskCleaner.class); + + private final ThreadPool threadPool; + private final Client client; + + public ShardFollowTaskCleaner(final ClusterService clusterService, final ThreadPool threadPool, final Client client) { + this.threadPool = threadPool; + this.client = client; + clusterService.addListener(this); + } + + @Override + public void clusterChanged(final ClusterChangedEvent event) { + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + return; + } + if (event.localNodeMaster() == false) { + return; + } + + MetaData metaData = event.state().metaData(); + PersistentTasksCustomMetaData persistentTasksMetaData = metaData.custom(PersistentTasksCustomMetaData.TYPE); + if (persistentTasksMetaData == null) { + return; + } + for (PersistentTasksCustomMetaData.PersistentTask persistentTask : persistentTasksMetaData.tasks()) { + if (ShardFollowTask.NAME.equals(persistentTask.getTaskName()) == false) { + // this task is not a shard follow task + continue; + } + ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams(); + Index followerIndex = shardFollowTask.getFollowShardId().getIndex(); + if (metaData.index(followerIndex) != null) { + // the index exists, do not clean this persistent task + continue; + } + IndexNotFoundException e = new IndexNotFoundException(followerIndex); + CompletionPersistentTaskAction.Request request = + new CompletionPersistentTaskAction.Request(persistentTask.getId(), persistentTask.getAllocationId(), e); + threadPool.generic().submit(() -> { + client.execute(CompletionPersistentTaskAction.INSTANCE, request, new ActionListener() { + + @Override + public void onResponse(PersistentTaskResponse persistentTaskResponse) { + logger.debug("task [{}] cleaned up", persistentTask.getId()); + } + + @Override + public void onFailure(Exception e) { + logger.warn("failed to clean up task [{}]", persistentTask.getId()); + } + }); + }); + } + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowStatsIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowStatsIT.java index 1f1c6cd5c64..df24a5296a2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowStatsIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowStatsIT.java @@ -30,6 +30,7 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.ccr.LocalIndexFollowingIT.getIndexSettings; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.collection.IsEmptyCollection.empty; /* @@ -149,9 +150,8 @@ public class FollowStatsIT extends CcrSingleNodeTestCase { assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet()); } - public void testFollowStatsApiIncludeShardFollowStatsWithRemovedFollowerIndex() throws Exception { - final String leaderIndexSettings = getIndexSettings(1, 0, - singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + public void testFollowStatsApiWithDeletedFollowerIndex() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON)); ensureGreen("leader1"); @@ -171,18 +171,11 @@ public class FollowStatsIT extends CcrSingleNodeTestCase { assertAcked(client().admin().indices().delete(new DeleteIndexRequest("follower1")).actionGet()); - statsRequest = new FollowStatsAction.StatsRequest(); - response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); - assertThat(response.getStatsResponses().size(), equalTo(1)); - assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); - - statsRequest = new FollowStatsAction.StatsRequest(); - statsRequest.setIndices(new String[] {"follower1"}); - response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); - assertThat(response.getStatsResponses().size(), equalTo(1)); - assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); - - assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet()); + assertBusy(() -> { + FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest(); + FollowStatsAction.StatsResponses statsResponse = client().execute(FollowStatsAction.INSTANCE, request).actionGet(); + assertThat(statsResponse.getStatsResponses(), hasSize(0)); + }); } public void testFollowStatsApiIncludeShardFollowStatsWithClosedFollowerIndex() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index ef00a0fcb83..d0afe736f20 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -762,13 +762,14 @@ public class IndexFollowingIT extends CcrIntegTestCase { StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet(); assertThat(response.getNodeFailures(), empty()); assertThat(response.getTaskFailures(), empty()); - assertThat(response.getStatsResponses(), hasSize(1)); - assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L)); - ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException(); - assertThat(fatalException, notNullValue()); - assertThat(fatalException.getMessage(), equalTo("no such index [index2]")); + if (response.getStatsResponses().isEmpty() == false) { + assertThat(response.getStatsResponses(), hasSize(1)); + assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L)); + ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException(); + assertThat(fatalException, notNullValue()); + assertThat(fatalException.getMessage(), equalTo("no such index [index2]")); + } }); - pauseFollow("index2"); ensureNoCcrTasks(); } @@ -1321,6 +1322,37 @@ public class IndexFollowingIT extends CcrIntegTestCase { } } + public void testCleanUpShardFollowTasksForDeletedIndices() throws Exception { + final int numberOfShards = randomIntBetween(1, 10); + assertAcked(leaderClient().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1)) + .build())); + + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L))); + + assertBusy(() -> { + String action = ShardFollowTask.NAME + "[c]"; + ListTasksResponse listTasksResponse = followerClient().admin().cluster().prepareListTasks().setActions(action).get(); + assertThat(listTasksResponse.getTasks(), hasSize(numberOfShards)); + }); + + assertAcked(followerClient().admin().indices().prepareDelete("index2")); + + assertBusy(() -> { + String action = ShardFollowTask.NAME + "[c]"; + ListTasksResponse listTasksResponse = followerClient().admin().cluster().prepareListTasks().setActions(action).get(); + assertThat(listTasksResponse.getTasks(), hasSize(0)); + }); + ensureNoCcrTasks(); + } + private long getFollowTaskSettingsVersion(String followerIndex) { long settingsVersion = -1L; for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {