diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 59b3d871075..fcde3daa07e 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -20,27 +20,36 @@ package org.elasticsearch.indices.store; import org.apache.lucene.store.StoreRateLimiting; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.settings.NodeSettingsService; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; import java.io.File; +import java.io.IOException; 
+import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; /** * @@ -50,6 +59,9 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type"; public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec"; + private static final String ACTION_SHARD_EXISTS = "index/shard/exists"; + private static final EnumSet ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED); + class ApplySettings implements NodeSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { @@ -96,6 +108,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe this.indicesService = indicesService; this.clusterService = clusterService; this.transportService = transportService; + transportService.registerHandler(ACTION_SHARD_EXISTS, new ShardActiveRequestHandler()); // we limit with 20MB / sec by default with a default type set to merge sice 0.90.1 this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name()); @@ -109,6 +122,15 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe clusterService.addLast(this); } + IndicesStore() { + super(ImmutableSettings.EMPTY); + nodeEnv = null; + nodeSettingsService = null; + indicesService = null; + this.clusterService = null; + this.transportService = null; + } + public StoreRateLimiting rateLimiting() { return this.rateLimiting; } @@ -131,75 +153,270 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { // Note, closed indices will not have any routing information, so won't be deleted for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { - ShardId shardId = indexShardRoutingTable.shardId(); - // a shard can be deleted 
if all its copies are active, and its not allocated on this node - boolean shardCanBeDeleted = true; - if (indexShardRoutingTable.size() == 0) { - // should not really happen, there should always be at least 1 (primary) shard in a - // shard replication group, in any case, protected from deleting something by mistake - shardCanBeDeleted = false; - } else { - for (ShardRouting shardRouting : indexShardRoutingTable) { - // be conservative here, check on started, not even active - if (!shardRouting.started()) { - shardCanBeDeleted = false; - break; - } - - // if the allocated or relocation node id doesn't exists in the cluster state or we're not connected to it - // it may be a stale node, make sure we don't do anything with this until the routing table has properly been - // rerouted to reflect the fact that the node does not exists - DiscoveryNode node = event.state().nodes().get(shardRouting.currentNodeId()); - if (node == null || !transportService.nodeConnected(node)) { - shardCanBeDeleted = false; - break; - } - if (shardRouting.relocatingNodeId() != null) { - node = event.state().nodes().get(shardRouting.relocatingNodeId()); - if (node == null || !transportService.nodeConnected(node)) { - shardCanBeDeleted = false; - break; - } - } - - // check if shard is active on the current node or is getting relocated to the our node - String localNodeId = clusterService.localNode().id(); - if (localNodeId.equals(shardRouting.currentNodeId()) || localNodeId.equals(shardRouting.relocatingNodeId())) { - shardCanBeDeleted = false; - break; - } - } - } - if (shardCanBeDeleted) { - IndexService indexService = indicesService.indexService(indexRoutingTable.index()); - if (indexService == null) { - // not physical allocation of the index, delete it from the file system if applicable - if (nodeEnv.hasNodeFile()) { - File[] shardLocations = nodeEnv.shardLocations(shardId); - if (FileSystemUtils.exists(shardLocations)) { - logger.debug("[{}][{}] deleting shard that is no longer used", 
shardId.index().name(), shardId.id()); - FileSystemUtils.deleteRecursively(shardLocations); - } - } - } else { - if (!indexService.hasShard(shardId.id())) { - if (indexService.store().canDeleteUnallocated(shardId)) { - logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); - try { - indexService.store().deleteUnallocated(indexShardRoutingTable.shardId()); - } catch (Exception e) { - logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id()); - } - } - } else { - // this state is weird, should we log? - // basically, it means that the shard is not allocated on this node using the routing - // but its still physically exists on an IndexService - // Note, this listener should run after IndicesClusterStateService... - } + if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) { + ShardId shardId = indexShardRoutingTable.shardId(); + IndexService indexService = indicesService.indexService(shardId.getIndex()); + if (indexService == null || !indexService.hasShard(shardId.getId())) { + deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable); } } } } } + + boolean shardCanBeDeleted(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) { + // a shard can be deleted if all its copies are active, and its not allocated on this node + if (indexShardRoutingTable.size() == 0) { + // should not really happen, there should always be at least 1 (primary) shard in a + // shard replication group, in any case, protected from deleting something by mistake + return false; + } + + for (ShardRouting shardRouting : indexShardRoutingTable) { + // be conservative here, check on started, not even active + if (!shardRouting.started()) { + return false; + } + + // if the allocated or relocation node id doesn't exists in the cluster state it may be a stale node, + // make sure we don't do anything with this until the 
routing table has properly been rerouted to reflect + // the fact that the node does not exists + DiscoveryNode node = state.nodes().get(shardRouting.currentNodeId()); + if (node == null) { + return false; + } + // If all nodes have been upgraded to >= 1.3.0 at some point we get back here and have the chance to + // run this api. (when cluster state is then updated) + if (node.getVersion().before(Version.V_1_3_0)) { + logger.debug("Skip deleting deleting shard instance [{}], a node holding a shard instance is < 1.3.0", shardRouting); + return false; + } + if (shardRouting.relocatingNodeId() != null) { + node = state.nodes().get(shardRouting.relocatingNodeId()); + if (node == null) { + return false; + } + if (node.getVersion().before(Version.V_1_3_0)) { + logger.debug("Skip deleting deleting shard instance [{}], a node holding a shard instance is < 1.3.0", shardRouting); + return false; + } + } + + // check if shard is active on the current node or is getting relocated to the our node + String localNodeId = state.getNodes().localNode().id(); + if (localNodeId.equals(shardRouting.currentNodeId()) || localNodeId.equals(shardRouting.relocatingNodeId())) { + return false; + } + } + + return true; + } + + private void deleteShardIfExistElseWhere(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) { + List> requests = new ArrayList<>(indexShardRoutingTable.size()); + String indexUUID = state.getMetaData().index(indexShardRoutingTable.shardId().getIndex()).getUUID(); + ClusterName clusterName = state.getClusterName(); + for (ShardRouting shardRouting : indexShardRoutingTable) { + // Node can't be null, because otherwise shardCanBeDeleted() would have returned false + DiscoveryNode currentNode = state.nodes().get(shardRouting.currentNodeId()); + assert currentNode != null; + + requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId()))); + if (shardRouting.relocatingNodeId() != null) { + DiscoveryNode 
relocatingNode = state.nodes().get(shardRouting.relocatingNodeId()); + assert relocatingNode != null; + requests.add(new Tuple<>(relocatingNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId()))); + } + } + + ShardActiveResponseHandler responseHandler = new ShardActiveResponseHandler(indexShardRoutingTable.shardId(), state, requests.size()); + for (Tuple request : requests) { + transportService.submitRequest(request.v1(), ACTION_SHARD_EXISTS, request.v2(), responseHandler); + } + } + + private class ShardActiveResponseHandler implements TransportResponseHandler { + + private final ShardId shardId; + private final int expectedActiveCopies; + private final ClusterState clusterState; + private final AtomicInteger awaitingResponses; + private final AtomicInteger activeCopies; + + public ShardActiveResponseHandler(ShardId shardId, ClusterState clusterState, int expectedActiveCopies) { + this.shardId = shardId; + this.expectedActiveCopies = expectedActiveCopies; + this.clusterState = clusterState; + this.awaitingResponses = new AtomicInteger(expectedActiveCopies); + this.activeCopies = new AtomicInteger(); + } + + @Override + public ShardActiveResponse newInstance() { + return new ShardActiveResponse(); + } + + @Override + public void handleResponse(ShardActiveResponse response) { + if (response.shardActive) { + logger.trace("[{}] exists on node [{}]", shardId, response.node); + activeCopies.incrementAndGet(); + } + + if (awaitingResponses.decrementAndGet() == 0) { + allNodesResponded(); + } + } + + @Override + public void handleException(TransportException exp) { + logger.debug("shards active request failed for {}", exp, shardId); + if (awaitingResponses.decrementAndGet() == 0) { + allNodesResponded(); + } + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + private void allNodesResponded() { + if (activeCopies.get() != expectedActiveCopies) { + logger.trace("not deleting shard [{}], expected {} active copies, but 
only {} found active copies", shardId, expectedActiveCopies, activeCopies.get()); + return; + } + + ClusterState latestClusterState = clusterService.state(); + if (clusterState.getVersion() != latestClusterState.getVersion()) { + logger.trace("not deleting shard [{}], the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterState.getVersion()); + return; + } + + IndexService indexService = indicesService.indexService(shardId.getIndex()); + if (indexService == null) { + // not physical allocation of the index, delete it from the file system if applicable + if (nodeEnv.hasNodeFile()) { + File[] shardLocations = nodeEnv.shardLocations(shardId); + if (FileSystemUtils.exists(shardLocations)) { + logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); + FileSystemUtils.deleteRecursively(shardLocations); + } + } + } else { + if (!indexService.hasShard(shardId.id())) { + if (indexService.store().canDeleteUnallocated(shardId)) { + logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); + try { + indexService.store().deleteUnallocated(shardId); + } catch (Exception e) { + logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, shardId.index().name(), shardId.id()); + } + } + } else { + // this state is weird, should we log? + // basically, it means that the shard is not allocated on this node using the routing + // but its still physically exists on an IndexService + // Note, this listener should run after IndicesClusterStateService... 
+ } + } + } + + } + + private class ShardActiveRequestHandler extends BaseTransportRequestHandler { + + @Override + public ShardActiveRequest newInstance() { + return new ShardActiveRequest(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void messageReceived(ShardActiveRequest request, TransportChannel channel) throws Exception { + channel.sendResponse(new ShardActiveResponse(shardActive(request), clusterService.localNode())); + } + + private boolean shardActive(ShardActiveRequest request) { + ClusterName thisClusterName = clusterService.state().getClusterName(); + if (!thisClusterName.equals(request.clusterName)) { + logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", request.clusterName, thisClusterName); + return false; + } + + ShardId shardId = request.shardId; + IndexService indexService = indicesService.indexService(shardId.index().getName()); + if (indexService != null && indexService.indexUUID().equals(request.indexUUID)) { + IndexShard indexShard = indexService.shard(shardId.getId()); + if (indexShard != null) { + return ACTIVE_STATES.contains(indexShard.state()); + } + } + return false; + } + } + + private static class ShardActiveRequest extends TransportRequest { + + private ClusterName clusterName; + private String indexUUID; + private ShardId shardId; + + ShardActiveRequest() { + } + + ShardActiveRequest(ClusterName clusterName, String indexUUID, ShardId shardId) { + this.shardId = shardId; + this.indexUUID = indexUUID; + this.clusterName = clusterName; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + clusterName = ClusterName.readClusterName(in); + indexUUID = in.readString(); + shardId = ShardId.readShardId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + clusterName.writeTo(out); + out.writeString(indexUUID); + 
shardId.writeTo(out); + } + } + + private static class ShardActiveResponse extends TransportResponse { + + private boolean shardActive; + private DiscoveryNode node; + + ShardActiveResponse() { + } + + ShardActiveResponse(boolean shardActive, DiscoveryNode node) { + this.shardActive = shardActive; + this.node = node; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardActive = in.readBoolean(); + node = DiscoveryNode.readNode(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(shardActive); + node.writeTo(out); + } + } } diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java new file mode 100644 index 00000000000..670d17c011e --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java @@ -0,0 +1,180 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.indices.store; + +import com.google.common.base.Predicate; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.DiscoveryService; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.junit.Test; + +import java.io.File; +import java.util.Arrays; + +import static org.elasticsearch.client.Requests.createIndexRequest; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import static org.hamcrest.Matchers.equalTo; + +/** + * + */ +@ClusterScope(scope= Scope.TEST, numDataNodes = 0) +public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { + private static final Settings SETTINGS = settingsBuilder().put("gateway.type", "local").build(); + + @Test + public void shardsCleanup() throws Exception { + final String node_1 = internalCluster().startNode(SETTINGS); + final String node_2 = internalCluster().startNode(SETTINGS); + logger.info("--> creating index [test] with one shard and on replica"); + 
client().admin().indices().create(createIndexRequest("test") + .settings(settingsBuilder().put("index.numberOfReplicas", 1).put("index.numberOfShards", 1))).actionGet(); + ensureGreen(); + + logger.info("--> making sure that shard and its replica are allocated on node_1 and node_2"); + assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true)); + assertThat(shardDirectory(node_2, "test", 0).exists(), equalTo(true)); + + logger.info("--> starting node server3"); + String node_3 = internalCluster().startNode(SETTINGS); + logger.info("--> running cluster_health"); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForNodes("3") + .setWaitForRelocatingShards(0) + .get(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + + logger.info("--> making sure that shard is not allocated on server3"); + assertThat(waitForShardDeletion(node_3, "test", 0), equalTo(false)); + + File server2Shard = shardDirectory(node_2, "test", 0); + logger.info("--> stopping node node_2"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2)); + + logger.info("--> running cluster_health"); + clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForGreenStatus() + .setWaitForNodes("2") + .setWaitForRelocatingShards(0) + .get(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); + + assertThat(server2Shard.exists(), equalTo(true)); + + logger.info("--> making sure that shard and its replica exist on server1, server2 and server3"); + assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true)); + assertThat(server2Shard.exists(), equalTo(true)); + assertThat(shardDirectory(node_3, "test", 0).exists(), equalTo(true)); + + logger.info("--> starting node node_4"); + final String node_4 = internalCluster().startNode(SETTINGS); + + logger.info("--> running cluster_health"); + ensureGreen(); + + logger.info("--> 
making sure that shard and its replica are allocated on server1 and server3 but not on server2"); + assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true)); + assertThat(shardDirectory(node_3, "test", 0).exists(), equalTo(true)); + assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false)); + } + + @Test + @TestLogging("indices.store:TRACE") + public void testShardActiveElseWhere() throws Exception { + String node_1 = internalCluster().startNode(SETTINGS); + String node_2 = internalCluster().startNode(SETTINGS); + final String node_1_id = internalCluster().getInstance(DiscoveryService.class, node_1).localNode().getId(); + final String node_2_id = internalCluster().getInstance(DiscoveryService.class, node_2).localNode().getId(); + + final int numShards = scaledRandomIntBetween(2, 20); + ElasticsearchAssertions.assertAcked(prepareCreate("test") + .setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)) + ); + ensureGreen("test"); + + ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get(); + RoutingNode routingNode = stateResponse.getState().routingNodes().node(node_2_id); + int[] node2Shards = new int[routingNode.numberOfOwningShards()]; + int i = 0; + for (MutableShardRouting mutableShardRouting : routingNode) { + node2Shards[i++] = mutableShardRouting.shardId().id(); + } + logger.info("Node 2 has shards: {}", Arrays.toString(node2Shards)); + waitNoPendingTasksOnAll(); + internalCluster().getInstance(ClusterService.class, node_2).submitStateUpdateTask("test", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder("test"); + for (int i = 0; i < numShards; i++) { + indexRoutingTableBuilder.addIndexShard( + new IndexShardRoutingTable.Builder(new ShardId("test", i), 
false) + .addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, 1)) + .build() + ); + } + return ClusterState.builder(currentState) + .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder).build()) + .build(); + } + + @Override + public void onFailure(String source, Throwable t) { + } + }); + waitNoPendingTasksOnAll(); + logger.info("Checking if shards aren't removed"); + for (int shard : node2Shards) { + assertTrue(waitForShardDeletion(node_2, "test", shard)); + } + } + + private File shardDirectory(String server, String index, int shard) { + NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server); + return env.shardLocations(new ShardId(index, shard))[0]; + } + + private boolean waitForShardDeletion(final String server, final String index, final int shard) throws InterruptedException { + awaitBusy(new Predicate() { + public boolean apply(Object o) { + return !shardDirectory(server, index, shard).exists(); + } + }); + return shardDirectory(server, index, shard).exists(); + } + + +} diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java b/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java index c963710a16d..e0e6fc051d7 100644 --- a/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java +++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java @@ -19,97 +19,189 @@ package org.elasticsearch.indices.store; -import com.google.common.base.Predicate; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import 
org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.ImmutableShardRouting; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Before; import org.junit.Test; -import java.io.File; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; -import static org.elasticsearch.client.Requests.clusterHealthRequest; -import static org.elasticsearch.client.Requests.createIndexRequest; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.*; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; /** - * */ -@ClusterScope(scope= Scope.TEST, numDataNodes =0) -public class IndicesStoreTests extends ElasticsearchIntegrationTest { - private static final Settings SETTINGS = settingsBuilder().put("gateway.type", "local").build(); +public class IndicesStoreTests extends ElasticsearchTestCase { + + private final static ShardRoutingState[] NOT_STARTED_STATES; + + static { + Set set = new HashSet<>(); + set.addAll(Arrays.asList(ShardRoutingState.values())); + set.remove(ShardRoutingState.STARTED); + NOT_STARTED_STATES = set.toArray(new ShardRoutingState[set.size()]); + } + + private IndicesStore indicesStore; + private DiscoveryNode localNode; + + @Before + public void before() { + localNode = new DiscoveryNode("abc", new LocalTransportAddress("abc"), Version.CURRENT); + indicesStore = new IndicesStore(); + } @Test - public void shardsCleanup() throws Exception { - 
final String node_1 = internalCluster().startNode(SETTINGS); - final String node_2 = internalCluster().startNode(SETTINGS); - logger.info("--> creating index [test] with one shard and on replica"); - client().admin().indices().create(createIndexRequest("test") - .settings(settingsBuilder().put("index.numberOfReplicas", 1).put("index.numberOfShards", 1))).actionGet(); + public void testShardCanBeDeleted_noShardRouting() throws Exception { + int numShards = randomIntBetween(1, 7); + int numReplicas = randomInt(2); - logger.info("--> running cluster_health"); - ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); + ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test")); + clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas))); + IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false); - - logger.info("--> making sure that shard and its replica are allocated on node_1 and node_2"); - assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true)); - assertThat(shardDirectory(node_2, "test", 0).exists(), equalTo(true)); - - logger.info("--> starting node server3"); - String node_3 = internalCluster().startNode(SETTINGS); - - logger.info("--> making sure that shard is not allocated on server3"); - assertThat(waitForShardDeletion(node_3, "test", 0), equalTo(false)); - - File server2Shard = shardDirectory(node_2, "test", 0); - logger.info("--> stopping node node_2"); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2)); - assertThat(server2Shard.exists(), equalTo(true)); - - logger.info("--> running cluster_health"); - clusterHealth = 
client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); - - logger.info("--> making sure that shard and its replica exist on server1, server2 and server3"); - assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true)); - assertThat(server2Shard.exists(), equalTo(true)); - assertThat(shardDirectory(node_3, "test", 0).exists(), equalTo(true)); - - logger.info("--> starting node node_4"); - final String node_4 = internalCluster().startNode(SETTINGS); - - logger.info("--> running cluster_health"); - clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); - - logger.info("--> making sure that shard and its replica are allocated on server1 and server3 but not on server2"); - assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true)); - assertThat(shardDirectory(node_3, "test", 0).exists(), equalTo(true)); - assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false)); + assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build())); } - private File shardDirectory(String server, String index, int shard) { - NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server); - return env.shardLocations(new ShardId(index, shard))[0]; - } + @Test + public void testShardCanBeDeleted_noShardStarted() throws Exception { + int numShards = randomIntBetween(1, 7); + int numReplicas = randomInt(2); - private boolean waitForShardDeletion(final String server, final String index, final int shard) throws InterruptedException { - awaitBusy(new Predicate() { - public boolean apply(Object o) { - return !shardDirectory(server, index, 
shard).exists(); + ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test")); + clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas))); + IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false); + + for (int i = 0; i < numShards; i++) { + int unStartedShard = randomInt(numReplicas); + for (int j=0; j <= numReplicas; j++) { + ShardRoutingState state; + if (j == unStartedShard) { + state = randomFrom(NOT_STARTED_STATES); + } else { + state = randomFrom(ShardRoutingState.values()); + } + routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", null, j == 0, state, 0)); } - }); - return shardDirectory(server, index, shard).exists(); + } + assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build())); } + @Test + public void testShardCanBeDeleted_shardExistsLocally() throws Exception { + int numShards = randomIntBetween(1, 7); + int numReplicas = randomInt(2); + + ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test")); + clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas))); + clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id()).put(localNode).put(new DiscoveryNode("xyz", new LocalTransportAddress("xyz"), Version.CURRENT))); + IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false); + int localShardId = randomInt(numShards - 1); + for (int i = 0; i < numShards; i++) { + String nodeId = i == localShardId ? localNode.getId() : randomBoolean() ? "abc" : "xyz"; + String relocationNodeId = randomBoolean() ? null : randomBoolean() ? 
localNode.getId() : "xyz"; + routingTable.addShard(new ImmutableShardRouting("test", i, nodeId, relocationNodeId, true, ShardRoutingState.STARTED, 0)); + for (int j = 0; j < numReplicas; j++) { + routingTable.addShard(new ImmutableShardRouting("test", i, nodeId, relocationNodeId, false, ShardRoutingState.STARTED, 0)); + } + } + + // Shard exists locally, can't delete shard + assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build())); + } + + @Test + public void testShardCanBeDeleted_nodeNotInList() throws Exception { + int numShards = randomIntBetween(1, 7); + int numReplicas = randomInt(2); + + ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test")); + clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas))); + clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id()).put(localNode)); + IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false); + for (int i = 0; i < numShards; i++) { + String relocatingNodeId = randomBoolean() ? null : "def"; + routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", relocatingNodeId, true, ShardRoutingState.STARTED, 0)); + for (int j = 0; j < numReplicas; j++) { + routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", relocatingNodeId, false, ShardRoutingState.STARTED, 0)); + } + } + + // null node -> false + assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build())); + } + + @Test + public void testShardCanBeDeleted_nodeVersion() throws Exception { + int numShards = randomIntBetween(1, 7); + int numReplicas = randomInt(2); + + // Most of the times don't test bwc and use current version + final Version nodeVersion = randomBoolean() ? 
Version.CURRENT : randomVersion(); + ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test")); + clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas))); + clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id()).put(localNode).put(new DiscoveryNode("xyz", new LocalTransportAddress("xyz"), nodeVersion))); + IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false); + for (int i = 0; i < numShards; i++) { + routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", null, true, ShardRoutingState.STARTED, 0)); + for (int j = 0; j < numReplicas; j++) { + routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", null, false, ShardRoutingState.STARTED, 0)); + } + } + + final boolean canBeDeleted; + if (nodeVersion.before(Version.V_1_3_0)) { + canBeDeleted = false; + } else { + canBeDeleted = true; + } + + // shards exist on another node (xyz) + assertThat(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()), is(canBeDeleted)); + } + + @Test + public void testShardCanBeDeleted_relocatingNode() throws Exception { + int numShards = randomIntBetween(1, 7); + int numReplicas = randomInt(2); + + ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test")); + clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas))); + final Version nodeVersion = randomBoolean() ? 
Version.CURRENT : randomVersion(); + + clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id()) + .put(localNode) + .put(new DiscoveryNode("xyz", new LocalTransportAddress("xyz"), Version.CURRENT)) + .put(new DiscoveryNode("def", new LocalTransportAddress("def"), nodeVersion) // <-- only the relocation target gets the randomized version, since that is what this test exercises + )); + IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false); + for (int i = 0; i < numShards; i++) { + routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", "def", true, ShardRoutingState.STARTED, 0)); + for (int j = 0; j < numReplicas; j++) { + routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", "def", false, ShardRoutingState.STARTED, 0)); + } + } + + final boolean canBeDeleted; + if (nodeVersion.before(Version.V_1_3_0)) { + canBeDeleted = false; + } else { + canBeDeleted = true; + } + // shards exist on other nodes (xyz and def) + assertThat(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()), is(canBeDeleted)); + } }