Store: Before removing shard physically from disk verify that another node in the cluster actually holds an active shard copy.

Closes #6692
This commit is contained in:
Martijn van Groningen 2014-07-02 20:48:28 +02:00
parent 684e698627
commit 9abb7c45b4
3 changed files with 634 additions and 145 deletions

View File

@ -20,27 +20,36 @@
package org.elasticsearch.indices.store; package org.elasticsearch.indices.store;
import org.apache.lucene.store.StoreRateLimiting; import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* *
@ -50,6 +59,9 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type"; public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type";
public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec"; public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec";
private static final String ACTION_SHARD_EXISTS = "index/shard/exists";
private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED);
class ApplySettings implements NodeSettingsService.Listener { class ApplySettings implements NodeSettingsService.Listener {
@Override @Override
public void onRefreshSettings(Settings settings) { public void onRefreshSettings(Settings settings) {
@ -96,6 +108,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
this.indicesService = indicesService; this.indicesService = indicesService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.transportService = transportService; this.transportService = transportService;
transportService.registerHandler(ACTION_SHARD_EXISTS, new ShardActiveRequestHandler());
// we limit with 20MB / sec by default with a default type set to merge sice 0.90.1 // we limit with 20MB / sec by default with a default type set to merge sice 0.90.1
this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name()); this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name());
@ -109,6 +122,15 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
clusterService.addLast(this); clusterService.addLast(this);
} }
IndicesStore() {
super(ImmutableSettings.EMPTY);
nodeEnv = null;
nodeSettingsService = null;
indicesService = null;
this.clusterService = null;
this.transportService = null;
}
public StoreRateLimiting rateLimiting() { public StoreRateLimiting rateLimiting() {
return this.rateLimiting; return this.rateLimiting;
} }
@ -131,47 +153,147 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
// Note, closed indices will not have any routing information, so won't be deleted // Note, closed indices will not have any routing information, so won't be deleted
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) {
ShardId shardId = indexShardRoutingTable.shardId(); ShardId shardId = indexShardRoutingTable.shardId();
IndexService indexService = indicesService.indexService(shardId.getIndex());
if (indexService == null || !indexService.hasShard(shardId.getId())) {
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
}
}
}
}
}
boolean shardCanBeDeleted(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) {
// a shard can be deleted if all its copies are active, and its not allocated on this node // a shard can be deleted if all its copies are active, and its not allocated on this node
boolean shardCanBeDeleted = true;
if (indexShardRoutingTable.size() == 0) { if (indexShardRoutingTable.size() == 0) {
// should not really happen, there should always be at least 1 (primary) shard in a // should not really happen, there should always be at least 1 (primary) shard in a
// shard replication group, in any case, protected from deleting something by mistake // shard replication group, in any case, protected from deleting something by mistake
shardCanBeDeleted = false; return false;
} else { }
for (ShardRouting shardRouting : indexShardRoutingTable) { for (ShardRouting shardRouting : indexShardRoutingTable) {
// be conservative here, check on started, not even active // be conservative here, check on started, not even active
if (!shardRouting.started()) { if (!shardRouting.started()) {
shardCanBeDeleted = false; return false;
break;
} }
// if the allocated or relocation node id doesn't exists in the cluster state or we're not connected to it // if the allocated or relocation node id doesn't exists in the cluster state it may be a stale node,
// it may be a stale node, make sure we don't do anything with this until the routing table has properly been // make sure we don't do anything with this until the routing table has properly been rerouted to reflect
// rerouted to reflect the fact that the node does not exists // the fact that the node does not exists
DiscoveryNode node = event.state().nodes().get(shardRouting.currentNodeId()); DiscoveryNode node = state.nodes().get(shardRouting.currentNodeId());
if (node == null || !transportService.nodeConnected(node)) { if (node == null) {
shardCanBeDeleted = false; return false;
break; }
// If all nodes have been upgraded to >= 1.3.0 at some point we get back here and have the chance to
// run this api. (when cluster state is then updated)
if (node.getVersion().before(Version.V_1_3_0)) {
logger.debug("Skip deleting deleting shard instance [{}], a node holding a shard instance is < 1.3.0", shardRouting);
return false;
} }
if (shardRouting.relocatingNodeId() != null) { if (shardRouting.relocatingNodeId() != null) {
node = event.state().nodes().get(shardRouting.relocatingNodeId()); node = state.nodes().get(shardRouting.relocatingNodeId());
if (node == null || !transportService.nodeConnected(node)) { if (node == null) {
shardCanBeDeleted = false; return false;
break; }
if (node.getVersion().before(Version.V_1_3_0)) {
logger.debug("Skip deleting deleting shard instance [{}], a node holding a shard instance is < 1.3.0", shardRouting);
return false;
} }
} }
// check if shard is active on the current node or is getting relocated to the our node // check if shard is active on the current node or is getting relocated to the our node
String localNodeId = clusterService.localNode().id(); String localNodeId = state.getNodes().localNode().id();
if (localNodeId.equals(shardRouting.currentNodeId()) || localNodeId.equals(shardRouting.relocatingNodeId())) { if (localNodeId.equals(shardRouting.currentNodeId()) || localNodeId.equals(shardRouting.relocatingNodeId())) {
shardCanBeDeleted = false; return false;
break;
} }
} }
return true;
} }
if (shardCanBeDeleted) {
IndexService indexService = indicesService.indexService(indexRoutingTable.index()); private void deleteShardIfExistElseWhere(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) {
List<Tuple<DiscoveryNode, ShardActiveRequest>> requests = new ArrayList<>(indexShardRoutingTable.size());
String indexUUID = state.getMetaData().index(indexShardRoutingTable.shardId().getIndex()).getUUID();
ClusterName clusterName = state.getClusterName();
for (ShardRouting shardRouting : indexShardRoutingTable) {
// Node can't be null, because otherwise shardCanBeDeleted() would have returned false
DiscoveryNode currentNode = state.nodes().get(shardRouting.currentNodeId());
assert currentNode != null;
requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId())));
if (shardRouting.relocatingNodeId() != null) {
DiscoveryNode relocatingNode = state.nodes().get(shardRouting.relocatingNodeId());
assert relocatingNode != null;
requests.add(new Tuple<>(relocatingNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId())));
}
}
ShardActiveResponseHandler responseHandler = new ShardActiveResponseHandler(indexShardRoutingTable.shardId(), state, requests.size());
for (Tuple<DiscoveryNode, ShardActiveRequest> request : requests) {
transportService.submitRequest(request.v1(), ACTION_SHARD_EXISTS, request.v2(), responseHandler);
}
}
private class ShardActiveResponseHandler implements TransportResponseHandler<ShardActiveResponse> {
private final ShardId shardId;
private final int expectedActiveCopies;
private final ClusterState clusterState;
private final AtomicInteger awaitingResponses;
private final AtomicInteger activeCopies;
public ShardActiveResponseHandler(ShardId shardId, ClusterState clusterState, int expectedActiveCopies) {
this.shardId = shardId;
this.expectedActiveCopies = expectedActiveCopies;
this.clusterState = clusterState;
this.awaitingResponses = new AtomicInteger(expectedActiveCopies);
this.activeCopies = new AtomicInteger();
}
@Override
public ShardActiveResponse newInstance() {
return new ShardActiveResponse();
}
@Override
public void handleResponse(ShardActiveResponse response) {
if (response.shardActive) {
logger.trace("[{}] exists on node [{}]", shardId, response.node);
activeCopies.incrementAndGet();
}
if (awaitingResponses.decrementAndGet() == 0) {
allNodesResponded();
}
}
@Override
public void handleException(TransportException exp) {
logger.debug("shards active request failed for {}", exp, shardId);
if (awaitingResponses.decrementAndGet() == 0) {
allNodesResponded();
}
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
private void allNodesResponded() {
if (activeCopies.get() != expectedActiveCopies) {
logger.trace("not deleting shard [{}], expected {} active copies, but only {} found active copies", shardId, expectedActiveCopies, activeCopies.get());
return;
}
ClusterState latestClusterState = clusterService.state();
if (clusterState.getVersion() != latestClusterState.getVersion()) {
logger.trace("not deleting shard [{}], the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterState.getVersion());
return;
}
IndexService indexService = indicesService.indexService(shardId.getIndex());
if (indexService == null) { if (indexService == null) {
// not physical allocation of the index, delete it from the file system if applicable // not physical allocation of the index, delete it from the file system if applicable
if (nodeEnv.hasNodeFile()) { if (nodeEnv.hasNodeFile()) {
@ -186,9 +308,9 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
if (indexService.store().canDeleteUnallocated(shardId)) { if (indexService.store().canDeleteUnallocated(shardId)) {
logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id());
try { try {
indexService.store().deleteUnallocated(indexShardRoutingTable.shardId()); indexService.store().deleteUnallocated(shardId);
} catch (Exception e) { } catch (Exception e) {
logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id()); logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, shardId.index().name(), shardId.id());
} }
} }
} else { } else {
@ -199,7 +321,102 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
} }
} }
} }
} }
private class ShardActiveRequestHandler extends BaseTransportRequestHandler<ShardActiveRequest> {
@Override
public ShardActiveRequest newInstance() {
return new ShardActiveRequest();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void messageReceived(ShardActiveRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(new ShardActiveResponse(shardActive(request), clusterService.localNode()));
}
private boolean shardActive(ShardActiveRequest request) {
ClusterName thisClusterName = clusterService.state().getClusterName();
if (!thisClusterName.equals(request.clusterName)) {
logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", request.clusterName, thisClusterName);
return false;
}
ShardId shardId = request.shardId;
IndexService indexService = indicesService.indexService(shardId.index().getName());
if (indexService != null && indexService.indexUUID().equals(request.indexUUID)) {
IndexShard indexShard = indexService.shard(shardId.getId());
if (indexShard != null) {
return ACTIVE_STATES.contains(indexShard.state());
}
}
return false;
}
}
private static class ShardActiveRequest extends TransportRequest {
private ClusterName clusterName;
private String indexUUID;
private ShardId shardId;
ShardActiveRequest() {
}
ShardActiveRequest(ClusterName clusterName, String indexUUID, ShardId shardId) {
this.shardId = shardId;
this.indexUUID = indexUUID;
this.clusterName = clusterName;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
indexUUID = in.readString();
shardId = ShardId.readShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
clusterName.writeTo(out);
out.writeString(indexUUID);
shardId.writeTo(out);
}
}
private static class ShardActiveResponse extends TransportResponse {
private boolean shardActive;
private DiscoveryNode node;
ShardActiveResponse() {
}
ShardActiveResponse(boolean shardActive, DiscoveryNode node) {
this.shardActive = shardActive;
this.node = node;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardActive = in.readBoolean();
node = DiscoveryNode.readNode(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(shardActive);
node.writeTo(out);
} }
} }
} }

View File

@ -0,0 +1,180 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.store;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.io.File;
import java.util.Arrays;
import static org.elasticsearch.client.Requests.createIndexRequest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.equalTo;
/**
*
*/
@ClusterScope(scope= Scope.TEST, numDataNodes = 0)
public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
private static final Settings SETTINGS = settingsBuilder().put("gateway.type", "local").build();
@Test
public void shardsCleanup() throws Exception {
final String node_1 = internalCluster().startNode(SETTINGS);
final String node_2 = internalCluster().startNode(SETTINGS);
logger.info("--> creating index [test] with one shard and on replica");
client().admin().indices().create(createIndexRequest("test")
.settings(settingsBuilder().put("index.numberOfReplicas", 1).put("index.numberOfShards", 1))).actionGet();
ensureGreen();
logger.info("--> making sure that shard and its replica are allocated on node_1 and node_2");
assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true));
assertThat(shardDirectory(node_2, "test", 0).exists(), equalTo(true));
logger.info("--> starting node server3");
String node_3 = internalCluster().startNode(SETTINGS);
logger.info("--> running cluster_health");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setWaitForNodes("3")
.setWaitForRelocatingShards(0)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
logger.info("--> making sure that shard is not allocated on server3");
assertThat(waitForShardDeletion(node_3, "test", 0), equalTo(false));
File server2Shard = shardDirectory(node_2, "test", 0);
logger.info("--> stopping node node_2");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2));
logger.info("--> running cluster_health");
clusterHealth = client().admin().cluster().prepareHealth()
.setWaitForGreenStatus()
.setWaitForNodes("2")
.setWaitForRelocatingShards(0)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
assertThat(server2Shard.exists(), equalTo(true));
logger.info("--> making sure that shard and its replica exist on server1, server2 and server3");
assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true));
assertThat(server2Shard.exists(), equalTo(true));
assertThat(shardDirectory(node_3, "test", 0).exists(), equalTo(true));
logger.info("--> starting node node_4");
final String node_4 = internalCluster().startNode(SETTINGS);
logger.info("--> running cluster_health");
ensureGreen();
logger.info("--> making sure that shard and its replica are allocated on server1 and server3 but not on server2");
assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true));
assertThat(shardDirectory(node_3, "test", 0).exists(), equalTo(true));
assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false));
}
@Test
@TestLogging("indices.store:TRACE")
public void testShardActiveElseWhere() throws Exception {
String node_1 = internalCluster().startNode(SETTINGS);
String node_2 = internalCluster().startNode(SETTINGS);
final String node_1_id = internalCluster().getInstance(DiscoveryService.class, node_1).localNode().getId();
final String node_2_id = internalCluster().getInstance(DiscoveryService.class, node_2).localNode().getId();
final int numShards = scaledRandomIntBetween(2, 20);
ElasticsearchAssertions.assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards))
);
ensureGreen("test");
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
RoutingNode routingNode = stateResponse.getState().routingNodes().node(node_2_id);
int[] node2Shards = new int[routingNode.numberOfOwningShards()];
int i = 0;
for (MutableShardRouting mutableShardRouting : routingNode) {
node2Shards[i++] = mutableShardRouting.shardId().id();
}
logger.info("Node 2 has shards: {}", Arrays.toString(node2Shards));
waitNoPendingTasksOnAll();
internalCluster().getInstance(ClusterService.class, node_2).submitStateUpdateTask("test", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder("test");
for (int i = 0; i < numShards; i++) {
indexRoutingTableBuilder.addIndexShard(
new IndexShardRoutingTable.Builder(new ShardId("test", i), false)
.addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, 1))
.build()
);
}
return ClusterState.builder(currentState)
.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder).build())
.build();
}
@Override
public void onFailure(String source, Throwable t) {
}
});
waitNoPendingTasksOnAll();
logger.info("Checking if shards aren't removed");
for (int shard : node2Shards) {
assertTrue(waitForShardDeletion(node_2, "test", shard));
}
}
private File shardDirectory(String server, String index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
return env.shardLocations(new ShardId(index, shard))[0];
}
private boolean waitForShardDeletion(final String server, final String index, final int shard) throws InterruptedException {
awaitBusy(new Predicate<Object>() {
public boolean apply(Object o) {
return !shardDirectory(server, index, shard).exists();
}
});
return shardDirectory(server, index, shard).exists();
}
}

View File

@ -19,97 +19,189 @@
package org.elasticsearch.indices.store; package org.elasticsearch.indices.store;
import com.google.common.base.Predicate; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.junit.Before;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.client.Requests.clusterHealthRequest; import static org.hamcrest.Matchers.is;
import static org.elasticsearch.client.Requests.createIndexRequest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
import static org.hamcrest.Matchers.equalTo;
/** /**
*
*/ */
@ClusterScope(scope= Scope.TEST, numDataNodes =0) public class IndicesStoreTests extends ElasticsearchTestCase {
public class IndicesStoreTests extends ElasticsearchIntegrationTest {
private static final Settings SETTINGS = settingsBuilder().put("gateway.type", "local").build(); private final static ShardRoutingState[] NOT_STARTED_STATES;
static {
Set<ShardRoutingState> set = new HashSet<>();
set.addAll(Arrays.asList(ShardRoutingState.values()));
set.remove(ShardRoutingState.STARTED);
NOT_STARTED_STATES = set.toArray(new ShardRoutingState[set.size()]);
}
private IndicesStore indicesStore;
private DiscoveryNode localNode;
@Before
public void before() {
localNode = new DiscoveryNode("abc", new LocalTransportAddress("abc"), Version.CURRENT);
indicesStore = new IndicesStore();
}
@Test @Test
public void shardsCleanup() throws Exception { public void testShardCanBeDeleted_noShardRouting() throws Exception {
final String node_1 = internalCluster().startNode(SETTINGS); int numShards = randomIntBetween(1, 7);
final String node_2 = internalCluster().startNode(SETTINGS); int numReplicas = randomInt(2);
logger.info("--> creating index [test] with one shard and on replica");
client().admin().indices().create(createIndexRequest("test")
.settings(settingsBuilder().put("index.numberOfReplicas", 1).put("index.numberOfShards", 1))).actionGet();
logger.info("--> running cluster_health"); ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas)));
assertThat(clusterHealth.isTimedOut(), equalTo(false)); IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
logger.info("--> making sure that shard and its replica are allocated on node_1 and node_2");
assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true));
assertThat(shardDirectory(node_2, "test", 0).exists(), equalTo(true));
logger.info("--> starting node server3");
String node_3 = internalCluster().startNode(SETTINGS);
logger.info("--> making sure that shard is not allocated on server3");
assertThat(waitForShardDeletion(node_3, "test", 0), equalTo(false));
File server2Shard = shardDirectory(node_2, "test", 0);
logger.info("--> stopping node node_2");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2));
assertThat(server2Shard.exists(), equalTo(true));
logger.info("--> running cluster_health");
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
logger.info("--> making sure that shard and its replica exist on server1, server2 and server3");
assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true));
assertThat(server2Shard.exists(), equalTo(true));
assertThat(shardDirectory(node_3, "test", 0).exists(), equalTo(true));
logger.info("--> starting node node_4");
final String node_4 = internalCluster().startNode(SETTINGS);
logger.info("--> running cluster_health");
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
logger.info("--> making sure that shard and its replica are allocated on server1 and server3 but not on server2");
assertThat(shardDirectory(node_1, "test", 0).exists(), equalTo(true));
assertThat(shardDirectory(node_3, "test", 0).exists(), equalTo(true));
assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false));
} }
private File shardDirectory(String server, String index, int shard) { @Test
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server); public void testShardCanBeDeleted_noShardStarted() throws Exception {
return env.shardLocations(new ShardId(index, shard))[0]; int numShards = randomIntBetween(1, 7);
int numReplicas = randomInt(2);
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas)));
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
for (int i = 0; i < numShards; i++) {
int unStartedShard = randomInt(numReplicas);
for (int j=0; j <= numReplicas; j++) {
ShardRoutingState state;
if (j == unStartedShard) {
state = randomFrom(NOT_STARTED_STATES);
} else {
state = randomFrom(ShardRoutingState.values());
}
routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", null, j == 0, state, 0));
}
}
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
} }
private boolean waitForShardDeletion(final String server, final String index, final int shard) throws InterruptedException { @Test
awaitBusy(new Predicate<Object>() { public void testShardCanBeDeleted_shardExistsLocally() throws Exception {
public boolean apply(Object o) { int numShards = randomIntBetween(1, 7);
return !shardDirectory(server, index, shard).exists(); int numReplicas = randomInt(2);
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas)));
clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id()).put(localNode).put(new DiscoveryNode("xyz", new LocalTransportAddress("xyz"), Version.CURRENT)));
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
int localShardId = randomInt(numShards - 1);
for (int i = 0; i < numShards; i++) {
String nodeId = i == localShardId ? localNode.getId() : randomBoolean() ? "abc" : "xyz";
String relocationNodeId = randomBoolean() ? null : randomBoolean() ? localNode.getId() : "xyz";
routingTable.addShard(new ImmutableShardRouting("test", i, nodeId, relocationNodeId, true, ShardRoutingState.STARTED, 0));
for (int j = 0; j < numReplicas; j++) {
routingTable.addShard(new ImmutableShardRouting("test", i, nodeId, relocationNodeId, false, ShardRoutingState.STARTED, 0));
} }
});
return shardDirectory(server, index, shard).exists();
} }
// Shard exists locally, can't delete shard
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
}
@Test
public void testShardCanBeDeleted_nodeNotInList() throws Exception {
int numShards = randomIntBetween(1, 7);
int numReplicas = randomInt(2);
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas)));
clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id()).put(localNode));
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
for (int i = 0; i < numShards; i++) {
String relocatingNodeId = randomBoolean() ? null : "def";
routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", relocatingNodeId, true, ShardRoutingState.STARTED, 0));
for (int j = 0; j < numReplicas; j++) {
routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", relocatingNodeId, false, ShardRoutingState.STARTED, 0));
}
}
// null node -> false
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
}
@Test
public void testShardCanBeDeleted_nodeVersion() throws Exception {
int numShards = randomIntBetween(1, 7);
int numReplicas = randomInt(2);
// Most of the times don't test bwc and use current version
final Version nodeVersion = randomBoolean() ? Version.CURRENT : randomVersion();
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas)));
clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id()).put(localNode).put(new DiscoveryNode("xyz", new LocalTransportAddress("xyz"), nodeVersion)));
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
for (int i = 0; i < numShards; i++) {
routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", null, true, ShardRoutingState.STARTED, 0));
for (int j = 0; j < numReplicas; j++) {
routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", null, false, ShardRoutingState.STARTED, 0));
}
}
final boolean canBeDeleted;
if (nodeVersion.before(Version.V_1_3_0)) {
canBeDeleted = false;
} else {
canBeDeleted = true;
}
// shard exist on other node (abc)
assertThat(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()), is(canBeDeleted));
}
@Test
public void testShardCanBeDeleted_relocatingNode() throws Exception {
int numShards = randomIntBetween(1, 7);
int numReplicas = randomInt(2);
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").numberOfShards(numShards).numberOfReplicas(numReplicas)));
final Version nodeVersion = randomBoolean() ? Version.CURRENT : randomVersion();
clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id())
.put(localNode)
.put(new DiscoveryNode("xyz", new LocalTransportAddress("xyz"), Version.CURRENT))
.put(new DiscoveryNode("def", new LocalTransportAddress("def"), nodeVersion) // <-- only set relocating, since we're testing that in this test
));
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
for (int i = 0; i < numShards; i++) {
routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", "def", true, ShardRoutingState.STARTED, 0));
for (int j = 0; j < numReplicas; j++) {
routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", "def", false, ShardRoutingState.STARTED, 0));
}
}
final boolean canBeDeleted;
if (nodeVersion.before(Version.V_1_3_0)) {
canBeDeleted = false;
} else {
canBeDeleted = true;
}
// shard exist on other node (abc and def)
assertThat(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()), is(canBeDeleted));
}
} }