diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 8d3ce201a03..196064c693f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -32,9 +33,11 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import java.io.File; +import java.util.Map; /** * @author kimchy (shay.banon) @@ -74,6 +77,34 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe IndexService indexService = indicesService.indexService(indexRoutingTable.index()); if (indexService == null) { // not allocated on this node yet... + // checking if the index was completely relocated to another nodes but left unused shards + for (Map.Entry shardEntry : indexRoutingTable.getShards().entrySet()) { + if (indexRoutingTable != null) { + IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardEntry.getKey()); + if (indexShardRoutingTable != null) { + boolean shardCanBeDeleted = true; + for (ShardRouting routing : indexShardRoutingTable) { + // The shard is not yet started - we cannot make determination if it's used or not yet + if (!routing.started()) { + shardCanBeDeleted = false; + break; + } + String localNodeId = clusterService.localNode().id(); + // Check if shard is active on the current node or is getting relocated to the current node + if (localNodeId.equals(routing.currentNodeId()) || localNodeId.equals(routing.relocatingNodeId())) { + // Shard is used locally - keep it + shardCanBeDeleted = false; + break; + } + } + if (shardCanBeDeleted) { + ShardId shardId = shardEntry.getValue().getShardId(); + logger.debug("[{}] deleting shard that is no longer used", shardId); + FileSystemUtils.deleteRecursively(nodeEnv.shardLocation(shardId)); + } + } + } + } continue; } // if the store is not persistent, don't bother trying to check if it can be deleted diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java new file mode 100644 index 00000000000..133ca1d8313 --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java @@ -0,0 +1,125 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.indices.store; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; + +import static org.elasticsearch.client.Requests.*; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author imotov + */ +public class IndicesStoreTests extends AbstractNodesTests { + + protected Client client1; + + @BeforeClass public void startNodes() { + // The default (none) gateway cleans the shards on closing + putDefaultSettings(settingsBuilder().put("gateway.type", "local")); + startNode("server1"); + startNode("server2"); + client1 = getClient1(); + } + + @AfterClass public void closeNodes() { + client1.close(); + closeAllNodes(); + } + + protected Client getClient1() { + return client("server1"); + } + + @Test + public void shardsCleanup() { + try { + client1.admin().indices().prepareDelete("test").execute().actionGet(); + } catch (Exception ex) { + // Ignore + } + + logger.info("--> creating index [test] with one shard and on replica"); + client1.admin().indices().create(createIndexRequest("test") + .settings(settingsBuilder().put("index.numberOfReplicas", 1).put("index.numberOfShards", 1))).actionGet(); + + logger.info("--> running cluster_health"); + ClusterHealthResponse clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + logger.info("--> done cluster_health, status " + clusterHealth.status()); + + + logger.info("--> making sure that shard and it's replica are allocated on server1 and server2"); + assertThat(shardDirectory("server1", "test", 0).exists(), equalTo(true)); + assertThat(shardDirectory("server2", "test", 0).exists(), equalTo(true)); + + logger.info("--> starting node server3"); + startNode("server3"); + + logger.info("--> making sure that shard is not allocated on server3"); + assertThat(shardDirectory("server3", "test", 0).exists(), equalTo(false)); + + File server2Shard = shardDirectory("server2", "test", 0); + logger.info("--> stopping node server2"); + closeNode("server2"); + assertThat(server2Shard.exists(), equalTo(true)); + + logger.info("--> running cluster_health"); + clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + logger.info("--> done cluster_health, status " + clusterHealth.status()); + + logger.info("--> making sure that shard and it's replica exist on server1, server2 and server3"); + assertThat(shardDirectory("server1", "test", 0).exists(), equalTo(true)); + assertThat(server2Shard.exists(), equalTo(true)); + assertThat(shardDirectory("server3", "test", 0).exists(), equalTo(true)); + + logger.info("--> starting node server2"); + startNode("server2"); + + logger.info("--> running cluster_health"); + clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + logger.info("--> done cluster_health, status " + clusterHealth.status()); + + logger.info("--> making sure that shard and it's replica are allocated on server1 and server3 but not on server2"); + assertThat(shardDirectory("server1", "test", 0).exists(), equalTo(true)); + assertThat(shardDirectory("server2", "test", 0).exists(), equalTo(false)); + assertThat(shardDirectory("server3", "test", 0).exists(), equalTo(true)); + } + + private File shardDirectory(String server, String index, int shard) { + InternalNode node = ((InternalNode) node(server)); + NodeEnvironment env = node.injector().getInstance(NodeEnvironment.class); + return env.shardLocation(new ShardId(index, shard)); + } + + +}