Store: Delete index folder if all shards were allocated away from a data only node

If a folder for an index was created that folder is never deleted from that node unless the index is deleted.
Data only nodes therefore can have empty folders for indices that they do not even have shards for.
This commit makes sure empty folders are cleaned up after all shards have moved away from a data only
node. The behavior is unchanged for master eligible nodes.

closes #9985
This commit is contained in:
Britta Weber 2015-03-04 14:43:53 +01:00
parent cba6dff3ac
commit cea8999406
3 changed files with 108 additions and 12 deletions

View File

@ -471,12 +471,14 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
if (nodeEnv.hasNodeFile()) {
synchronized (this) {
String indexName = metaData.index();
if (indices.containsKey(metaData.index())) {
String localUUid = indices.get(metaData.index()).v1().indexUUID();
throw new ElasticsearchIllegalStateException("Can't delete index store for [" + metaData.getIndex() + "] - it's still part of the indices service [" + localUUid+ "] [" + metaData.getUUID() + "]");
if (indices.containsKey(indexName)) {
String localUUid = indices.get(indexName).v1().indexUUID();
throw new ElasticsearchIllegalStateException("Can't delete index store for [" + indexName + "] - it's still part of the indices service [" + localUUid+ "] [" + metaData.getUUID() + "]");
}
ClusterState clusterState = clusterService.state();
if (clusterState.metaData().hasIndex(indexName)) {
if (clusterState.metaData().hasIndex(indexName) && (clusterState.nodes().localNode().masterNode() == true)) {
// we do not delete the store if it is a master eligible node and the index is still in the cluster state
// because we want to keep the meta data for indices around even if no shards are left here
final IndexMetaData index = clusterState.metaData().index(indexName);
throw new ElasticsearchIllegalStateException("Can't delete closed index store for [" + indexName + "] - it's still part of the cluster state [" + index.getUUID() + "] [" + metaData.getUUID() + "]");
}

View File

@ -296,9 +296,18 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex());
try {
indicesService.deleteShardStore("no longer used", shardId, indexMeta);
} catch (Exception ex) {
} catch (Throwable ex) {
logger.debug("{} failed to delete unallocated shard, ignoring", ex, shardId);
}
// if the index doesn't exists anymore, delete its store as well, but only if its a non master node, since master
// nodes keep the index metadata around
if (indicesService.hasIndex(shardId.getIndex()) == false && currentState.nodes().localNode().masterNode() == false) {
try {
indicesService.deleteIndexStore("no longer used", indexMeta);
} catch (Throwable ex) {
logger.debug("{} failed to delete unallocated index, ignoring", ex, shardId.getIndex());
}
}
return currentState;
}

View File

@ -27,11 +27,13 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
@ -41,6 +43,8 @@ import org.junit.Test;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
@ -53,6 +57,57 @@ import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope= Scope.TEST, numDataNodes = 0)
public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
@Test
public void indexCleanup() throws Exception {
final String masterNode = internalCluster().startNode(ImmutableSettings.builder().put("node.data", false));
final String node_1 = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false));
final String node_2 = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false));
logger.info("--> creating index [test] with one shard and on replica");
assertAcked(prepareCreate("test").setSettings(
ImmutableSettings.builder().put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))
);
ensureGreen("test");
logger.info("--> making sure that shard and its replica are allocated on node_1 and node_2");
assertThat(Files.exists(shardDirectory(node_1, "test", 0)), equalTo(true));
assertThat(Files.exists(indexDirectory(node_1, "test")), equalTo(true));
assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(true));
assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true));
logger.info("--> starting node server3");
final String node_3 = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false));
logger.info("--> running cluster_health");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setWaitForNodes("4")
.setWaitForRelocatingShards(0)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(Files.exists(shardDirectory(node_1, "test", 0)), equalTo(true));
assertThat(Files.exists(indexDirectory(node_1, "test")), equalTo(true));
assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(true));
assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true));
assertThat(Files.exists(shardDirectory(node_3, "test", 0)), equalTo(false));
assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false));
logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish");
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get();
clusterHealth = client().admin().cluster().prepareHealth()
.setWaitForNodes("4")
.setWaitForRelocatingShards(0)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(waitForShardDeletion(node_1, "test", 0), equalTo(false));
assertThat(waitForIndexDeletion(node_1, "test"), equalTo(false));
assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(true));
assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true));
assertThat(Files.exists(shardDirectory(node_3, "test", 0)), equalTo(true));
assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(true));
}
@Test
public void shardsCleanup() throws Exception {
final String node_1 = internalCluster().startNode();
@ -115,26 +170,43 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
@Test
public void testShardActiveElseWhere() throws Exception {
String node_1 = internalCluster().startNode();
String node_2 = internalCluster().startNode();
boolean node1IsMasterEligible = randomBoolean();
boolean node2IsMasterEligible = !node1IsMasterEligible || randomBoolean();
Future<String> node_1_future = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.master", node1IsMasterEligible).build());
Future<String> node_2_future = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.master", node2IsMasterEligible).build());
final String node_1 = node_1_future.get();
final String node_2 = node_2_future.get();
final String node_1_id = internalCluster().getInstance(DiscoveryService.class, node_1).localNode().getId();
final String node_2_id = internalCluster().getInstance(DiscoveryService.class, node_2).localNode().getId();
logger.debug("node {} (node_1) is {}master eligible", node_1, node1IsMasterEligible ? "" : "not ");
logger.debug("node {} (node_2) is {}master eligible", node_2, node2IsMasterEligible ? "" : "not ");
logger.debug("node {} became master", internalCluster().getMasterName());
final int numShards = scaledRandomIntBetween(2, 20);
assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards))
);
ensureGreen("test");
waitNoPendingTasksOnAll();
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
RoutingNode routingNode = stateResponse.getState().routingNodes().node(node_2_id);
int[] node2Shards = new int[routingNode.numberOfOwningShards()];
final int[] node2Shards = new int[routingNode.numberOfOwningShards()];
int i = 0;
for (MutableShardRouting mutableShardRouting : routingNode) {
node2Shards[i++] = mutableShardRouting.shardId().id();
node2Shards[i] = mutableShardRouting.shardId().id();
i++;
}
logger.info("Node 2 has shards: {}", Arrays.toString(node2Shards));
waitNoPendingTasksOnAll();
final long shardVersions[] = new long[numShards];
final int shardIds[] = new int[numShards];
i=0;
for (ShardRouting shardRouting : stateResponse.getState().getRoutingTable().allShards("test")) {
shardVersions[i] = shardRouting.version();
shardIds[i] = shardRouting.getId();
i++;
}
internalCluster().getInstance(ClusterService.class, node_2).submitStateUpdateTask("test", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
@ -142,7 +214,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
for (int i = 0; i < numShards; i++) {
indexRoutingTableBuilder.addIndexShard(
new IndexShardRoutingTable.Builder(new ShardId("test", i), false)
.addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, 1))
.addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, shardVersions[shardIds[i]]))
.build()
);
}
@ -166,6 +238,11 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
}
}
private Path indexDirectory(String server, String index) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
return env.indexPaths(new Index(index))[0];
}
private Path shardDirectory(String server, String index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
return env.shardPaths(new ShardId(index, shard))[0];
@ -181,5 +258,13 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
return Files.exists(shardDirectory(server, index, shard));
}
private boolean waitForIndexDeletion(final String server, final String index) throws InterruptedException {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
return !Files.exists(indexDirectory(server, index));
}
});
return Files.exists(indexDirectory(server, index));
}
}