diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index 7a8d650a878..f8aba2c543d 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -27,13 +27,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; @@ -41,6 +35,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -54,8 +49,7 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.disruption.SingleNodeDisruption; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; import org.junit.Test; import java.io.IOException; @@ -65,7 +59,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static java.lang.Thread.sleep; @@ -171,6 +164,66 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { } + @Test + /* Test that shard is deleted in case ShardActiveRequest after relocation and next incoming cluster state is an index delete. */ + public void shardCleanupIfShardDeletionAfterRelocationFailedAndIndexDeleted() throws Exception { + final String node_1 = internalCluster().startNode(); + logger.info("--> creating index [test] with one shard and on replica"); + assertAcked(prepareCreate("test").setSettings( + Settings.builder().put(indexSettings()) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) + ); + ensureGreen("test"); + assertThat(Files.exists(shardDirectory(node_1, "test", 0)), equalTo(true)); + assertThat(Files.exists(indexDirectory(node_1, "test")), equalTo(true)); + + final String node_2 = internalCluster().startDataOnlyNode(Settings.builder().build()); + assertThat(Files.exists(shardDirectory(node_1, "test", 0)), equalTo(true)); + assertThat(Files.exists(indexDirectory(node_1, "test")), equalTo(true)); + assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(false)); + assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(false)); + + // add a transport delegate that will prevent the shard active request to succeed the first time after relocation has finished. + // node_1 will then wait for the next cluster state change before it tries a next attempt to delet the shard. + MockTransportService transportServiceNode_1 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_1); + String node_2_id = internalCluster().getInstance(DiscoveryService.class, node_2).localNode().id(); + DiscoveryNode node_2_disco = internalCluster().clusterService().state().getNodes().dataNodes().get(node_2_id); + final CountDownLatch shardActiveRequestSent = new CountDownLatch(1); + transportServiceNode_1.addDelegate(node_2_disco, new MockTransportService.DelegateTransport(transportServiceNode_1.original()) { + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + if (action.equals("internal:index/shard/exists") && shardActiveRequestSent.getCount() > 0) { + shardActiveRequestSent.countDown(); + logger.info("prevent shard active request from being sent"); + throw new ConnectTransportException(node, "DISCONNECT: simulated"); + } + super.sendRequest(node, requestId, action, request, options); + } + }); + + logger.info("--> move shard from {} to {}, and wait for relocation to finish", node_1, node_2); + internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2)).get(); + shardActiveRequestSent.await(); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForRelocatingShards(0) + .get(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + logClusterState(); + // delete the index. node_1 that still waits for the next cluster state update will then get the delete index next. + // it must still delete the shard, even if it cannot find it anymore in indicesservice + client().admin().indices().prepareDelete("test").get(); + + assertThat(waitForShardDeletion(node_1, "test", 0), equalTo(false)); + assertThat(waitForIndexDeletion(node_1, "test"), equalTo(false)); + assertThat(Files.exists(shardDirectory(node_1, "test", 0)), equalTo(false)); + assertThat(Files.exists(indexDirectory(node_1, "test")), equalTo(false)); + assertThat(waitForShardDeletion(node_2, "test", 0), equalTo(false)); + assertThat(waitForIndexDeletion(node_2, "test"), equalTo(false)); + assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(false)); + assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(false)); + } + @Test public void shardsCleanup() throws Exception { final String node_1 = internalCluster().startNode();