From cea8999406cba545acda9e1c2a51391b2912a59e Mon Sep 17 00:00:00 2001 From: Britta Weber Date: Wed, 4 Mar 2015 14:43:53 +0100 Subject: [PATCH] Store: Delete index folder if all shards were allocated away from a data-only node. If a folder for an index was created, that folder is never deleted from that node unless the index is deleted. Data-only nodes therefore can have empty folders for indices that they do not even have shards for. This commit makes sure empty folders are cleaned up after all shards have moved away from a data-only node. The behavior is unchanged for master-eligible nodes. closes #9985 --- .../elasticsearch/indices/IndicesService.java | 10 +- .../indices/store/IndicesStore.java | 11 ++- .../store/IndicesStoreIntegrationTests.java | 99 +++++++++++++++++-- 3 files changed, 108 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java index ca19c142d95..06cf97a3985 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -471,12 +471,14 @@ public class IndicesService extends AbstractLifecycleComponent i if (nodeEnv.hasNodeFile()) { synchronized (this) { String indexName = metaData.index(); - if (indices.containsKey(metaData.index())) { - String localUUid = indices.get(metaData.index()).v1().indexUUID(); - throw new ElasticsearchIllegalStateException("Can't delete index store for [" + metaData.getIndex() + "] - it's still part of the indices service [" + localUUid+ "] [" + metaData.getUUID() + "]"); + if (indices.containsKey(indexName)) { + String localUUid = indices.get(indexName).v1().indexUUID(); + throw new ElasticsearchIllegalStateException("Can't delete index store for [" + indexName + "] - it's still part of the indices service [" + localUUid+ "] [" + metaData.getUUID() + "]"); } ClusterState clusterState = clusterService.state(); - if 
(clusterState.metaData().hasIndex(indexName)) { + if (clusterState.metaData().hasIndex(indexName) && (clusterState.nodes().localNode().masterNode() == true)) { + // we do not delete the store if it is a master eligible node and the index is still in the cluster state + // because we want to keep the meta data for indices around even if no shards are left here final IndexMetaData index = clusterState.metaData().index(indexName); throw new ElasticsearchIllegalStateException("Can't delete closed index store for [" + indexName + "] - it's still part of the cluster state [" + index.getUUID() + "] [" + metaData.getUUID() + "]"); } diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 4cdebbfbb90..7d43e9e9075 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -296,9 +296,18 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex()); try { indicesService.deleteShardStore("no longer used", shardId, indexMeta); - } catch (Exception ex) { + } catch (Throwable ex) { logger.debug("{} failed to delete unallocated shard, ignoring", ex, shardId); } + // if the indices service no longer has the index, delete its store as well, but only if it's a non-master node, since master + // nodes keep the index metadata around + if (indicesService.hasIndex(shardId.getIndex()) == false && currentState.nodes().localNode().masterNode() == false) { + try { + indicesService.deleteIndexStore("no longer used", indexMeta); + } catch (Throwable ex) { + logger.debug("{} failed to delete unallocated index, ignoring", ex, shardId.getIndex()); + } + } return currentState; } diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java 
b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java index 21ce5bbb5a6..ad71c70d17f 100644 --- a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java +++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java @@ -27,11 +27,13 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; @@ -41,6 +43,8 @@ import org.junit.Test; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; @@ -53,6 +57,57 @@ import static org.hamcrest.Matchers.equalTo; @ClusterScope(scope= Scope.TEST, numDataNodes = 0) public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { + @Test + public void indexCleanup() throws Exception { + final String masterNode = internalCluster().startNode(ImmutableSettings.builder().put("node.data", false)); + final String node_1 = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false)); + final String node_2 = internalCluster().startNode(ImmutableSettings.builder().put("node.master", 
false)); + logger.info("--> creating index [test] with one shard and on replica"); + assertAcked(prepareCreate("test").setSettings( + ImmutableSettings.builder().put(indexSettings()) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + ensureGreen("test"); + + logger.info("--> making sure that shard and its replica are allocated on node_1 and node_2"); + assertThat(Files.exists(shardDirectory(node_1, "test", 0)), equalTo(true)); + assertThat(Files.exists(indexDirectory(node_1, "test")), equalTo(true)); + assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(true)); + assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true)); + + logger.info("--> starting node server3"); + final String node_3 = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false)); + logger.info("--> running cluster_health"); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForNodes("4") + .setWaitForRelocatingShards(0) + .get(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + + assertThat(Files.exists(shardDirectory(node_1, "test", 0)), equalTo(true)); + assertThat(Files.exists(indexDirectory(node_1, "test")), equalTo(true)); + assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(true)); + assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true)); + assertThat(Files.exists(shardDirectory(node_3, "test", 0)), equalTo(false)); + assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false)); + + logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish"); + internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get(); + clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForNodes("4") + .setWaitForRelocatingShards(0) + .get(); + assertThat(clusterHealth.isTimedOut(), 
equalTo(false)); + + assertThat(waitForShardDeletion(node_1, "test", 0), equalTo(false)); + assertThat(waitForIndexDeletion(node_1, "test"), equalTo(false)); + assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(true)); + assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true)); + assertThat(Files.exists(shardDirectory(node_3, "test", 0)), equalTo(true)); + assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(true)); + } + @Test public void shardsCleanup() throws Exception { final String node_1 = internalCluster().startNode(); @@ -115,26 +170,43 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { @Test public void testShardActiveElseWhere() throws Exception { - String node_1 = internalCluster().startNode(); - String node_2 = internalCluster().startNode(); + boolean node1IsMasterEligible = randomBoolean(); + boolean node2IsMasterEligible = !node1IsMasterEligible || randomBoolean(); + Future node_1_future = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.master", node1IsMasterEligible).build()); + Future node_2_future = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.master", node2IsMasterEligible).build()); + final String node_1 = node_1_future.get(); + final String node_2 = node_2_future.get(); final String node_1_id = internalCluster().getInstance(DiscoveryService.class, node_1).localNode().getId(); final String node_2_id = internalCluster().getInstance(DiscoveryService.class, node_2).localNode().getId(); + logger.debug("node {} (node_1) is {}master eligible", node_1, node1IsMasterEligible ? "" : "not "); + logger.debug("node {} (node_2) is {}master eligible", node_2, node2IsMasterEligible ? 
"" : "not "); + logger.debug("node {} became master", internalCluster().getMasterName()); final int numShards = scaledRandomIntBetween(2, 20); assertAcked(prepareCreate("test") .setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)) ); ensureGreen("test"); + waitNoPendingTasksOnAll(); ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get(); + RoutingNode routingNode = stateResponse.getState().routingNodes().node(node_2_id); - int[] node2Shards = new int[routingNode.numberOfOwningShards()]; + final int[] node2Shards = new int[routingNode.numberOfOwningShards()]; int i = 0; for (MutableShardRouting mutableShardRouting : routingNode) { - node2Shards[i++] = mutableShardRouting.shardId().id(); + node2Shards[i] = mutableShardRouting.shardId().id(); + i++; } logger.info("Node 2 has shards: {}", Arrays.toString(node2Shards)); - waitNoPendingTasksOnAll(); + final long shardVersions[] = new long[numShards]; + final int shardIds[] = new int[numShards]; + i=0; + for (ShardRouting shardRouting : stateResponse.getState().getRoutingTable().allShards("test")) { + shardVersions[i] = shardRouting.version(); + shardIds[i] = shardRouting.getId(); + i++; + } internalCluster().getInstance(ClusterService.class, node_2).submitStateUpdateTask("test", Priority.IMMEDIATE, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -142,7 +214,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { for (int i = 0; i < numShards; i++) { indexRoutingTableBuilder.addIndexShard( new IndexShardRoutingTable.Builder(new ShardId("test", i), false) - .addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, 1)) + .addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, shardVersions[shardIds[i]])) .build() ); } @@ -166,6 +238,11 @@ 
public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { } } + private Path indexDirectory(String server, String index) { + NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server); + return env.indexPaths(new Index(index))[0]; + } + private Path shardDirectory(String server, String index, int shard) { NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server); return env.shardPaths(new ShardId(index, shard))[0]; @@ -181,5 +258,13 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { return Files.exists(shardDirectory(server, index, shard)); } - + private boolean waitForIndexDeletion(final String server, final String index) throws InterruptedException { + awaitBusy(new Predicate() { + @Override + public boolean apply(Object o) { + return !Files.exists(indexDirectory(server, index)); + } + }); + return Files.exists(indexDirectory(server, index)); + } }