[store] make sure shard active request works even if shard is still in post recovery

When deleting a shard th node that deletes th shard first checks if all shard copies are
started on other nodes. A message is sent to each node tand each node checks locally for
STARTED or RELOCATED.
However, it might happen that the shard is still in state POST_RECOVERY, like this:

shard is relocating from node1 to node2
1. relocated shard on node2 goes in POST_RECOVERY and node2 sends shard started to master
2. master updates routing table and sends new cluster state to node1 and node2
3. node1 processes the cluster state and asks node2 if it has the active shard
  before node2 processes the new cluster state (which would cause it to set the shard to started)
4. node2 sends back it does not have the shard started and so node1 does not delete it

This can be avoided by waiting until cluster state that sets the shard to started is actually processed.

closes #10018
This commit is contained in:
Britta Weber 2015-03-19 14:55:05 -07:00
parent 8d204846df
commit 17dffe222b
3 changed files with 100 additions and 23 deletions

View File

@ -35,6 +35,8 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
@ -50,6 +52,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -59,6 +62,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type";
public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec";
public static final String INDICES_STORE_DELETE_SHARD_TIMEOUT = "indices.store.delete.shard.timeout";
public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists";
@ -100,6 +104,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final ApplySettings applySettings = new ApplySettings();
private TimeValue deleteShardTimeout;
@Inject
public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService,
ClusterService clusterService, TransportService transportService) {
@ -117,6 +123,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
this.rateLimitingThrottle = settings.getAsBytesSize("indices.store.throttle.max_bytes_per_sec", new ByteSizeValue(10240, ByteSizeUnit.MB));
rateLimiting.setMaxRate(rateLimitingThrottle);
this.deleteShardTimeout = settings.getAsTime(INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS));
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
nodeSettingsService.addListener(applySettings);
@ -213,11 +221,11 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
DiscoveryNode currentNode = state.nodes().get(shardRouting.currentNodeId());
assert currentNode != null;
requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId())));
requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId(), deleteShardTimeout)));
if (shardRouting.relocatingNodeId() != null) {
DiscoveryNode relocatingNode = state.nodes().get(shardRouting.relocatingNodeId());
assert relocatingNode != null;
requests.add(new Tuple<>(relocatingNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId())));
requests.add(new Tuple<>(relocatingNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId(), deleteShardTimeout)));
}
}
@ -333,31 +341,89 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
@Override
public void messageReceived(ShardActiveRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(new ShardActiveResponse(shardActive(request), clusterService.localNode()));
public void messageReceived(final ShardActiveRequest request, final TransportChannel channel) throws Exception {
IndexShard indexShard = getShard(request);
// make sure shard is really there before register cluster state observer
if (indexShard == null) {
channel.sendResponse(new ShardActiveResponse(false, clusterService.localNode()));
}
// create observer here. we need to register it here because we need to capture the current cluster state
// which will then be compared to the one that is applied when we call waitForNextChange(). if we create it
// later we might miss an update and wait forever in case no new cluster state comes in.
// in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly.
// instead we wait for the cluster state changes because we know any shard state change will trigger or be
// triggered by a cluster state change.
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger);
// check if shard is active. if so, all is good
boolean shardActive = shardActive(indexShard);
if (shardActive) {
channel.sendResponse(new ShardActiveResponse(true, clusterService.localNode()));
} else {
// shard is not active, might be POST_RECOVERY so check if cluster state changed inbetween or wait for next change
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
sendResult(shardActive(getShard(request)));
}
@Override
public void onClusterServiceClose() {
sendResult(false);
}
@Override
public void onTimeout(TimeValue timeout) {
sendResult(shardActive(getShard(request)));
}
public void sendResult(boolean shardActive) {
try {
channel.sendResponse(new ShardActiveResponse(shardActive, clusterService.localNode()));
} catch (IOException e) {
logger.error("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", e, request.shardId);
} catch (EsRejectedExecutionException e) {
logger.error("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", e, request.shardId);
}
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
// the shard is not there in which case we want to send back a false (shard is not active), so the cluster state listener must be notified
// or the shard is active in which case we want to send back that the shard is active
// here we could also evaluate the cluster state and get the information from there. we
// don't do it because we would have to write another method for this that would have the same effect
IndexShard indexShard = getShard(request);
return indexShard == null || shardActive(indexShard);
}
});
}
}
private boolean shardActive(ShardActiveRequest request) {
private boolean shardActive(IndexShard indexShard) {
if (indexShard != null) {
return ACTIVE_STATES.contains(indexShard.state());
}
return false;
}
private IndexShard getShard(ShardActiveRequest request) {
ClusterName thisClusterName = clusterService.state().getClusterName();
if (!thisClusterName.equals(request.clusterName)) {
logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", request.clusterName, thisClusterName);
return false;
return null;
}
ShardId shardId = request.shardId;
IndexService indexService = indicesService.indexService(shardId.index().getName());
if (indexService != null && indexService.indexUUID().equals(request.indexUUID)) {
IndexShard indexShard = indexService.shard(shardId.getId());
if (indexShard != null) {
return ACTIVE_STATES.contains(indexShard.state());
}
return indexService.shard(shardId.id());
}
return false;
return null;
}
}
private static class ShardActiveRequest extends TransportRequest {
protected TimeValue timeout = null;
private ClusterName clusterName;
private String indexUUID;
private ShardId shardId;
@ -365,10 +431,11 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
ShardActiveRequest() {
}
ShardActiveRequest(ClusterName clusterName, String indexUUID, ShardId shardId) {
ShardActiveRequest(ClusterName clusterName, String indexUUID, ShardId shardId, TimeValue timeout) {
this.shardId = shardId;
this.indexUUID = indexUUID;
this.clusterName = clusterName;
this.timeout = timeout;
}
@Override
@ -377,6 +444,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
clusterName = ClusterName.readClusterName(in);
indexUUID = in.readString();
shardId = ShardId.readShardId(in);
timeout = new TimeValue(in.readLong(), TimeUnit.MILLISECONDS);
}
@Override
@ -385,6 +453,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
clusterName.writeTo(out);
out.writeString(indexUUID);
shardId.writeTo(out);
out.writeLong(timeout.millis());
}
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
import org.junit.Test;
import java.nio.file.Files;
@ -55,11 +56,10 @@ import static org.hamcrest.Matchers.equalTo;
/**
*
*/
@ClusterScope(scope= Scope.TEST, numDataNodes = 0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
@Test
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/10018")
public void indexCleanup() throws Exception {
final String masterNode = internalCluster().startNode(ImmutableSettings.builder().put("node.data", false));
final String node_1 = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false));
@ -95,6 +95,12 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false));
logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish");
SlowClusterStateProcessing disruption = null;
if (randomBoolean()) {
disruption = new SlowClusterStateProcessing(node_3, getRandom(), 0, 0, 1000, 2000);
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();
}
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get();
clusterHealth = client().admin().cluster().prepareHealth()
.setWaitForNodes("4")
@ -203,7 +209,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
logger.info("Node 2 has shards: {}", Arrays.toString(node2Shards));
final long shardVersions[] = new long[numShards];
final int shardIds[] = new int[numShards];
i=0;
i = 0;
for (ShardRouting shardRouting : stateResponse.getState().getRoutingTable().allShards("test")) {
shardVersions[i] = shardRouting.version();
shardIds[i] = shardRouting.getId();
@ -214,11 +220,11 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
public ClusterState execute(ClusterState currentState) throws Exception {
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder("test");
for (int i = 0; i < numShards; i++) {
indexRoutingTableBuilder.addIndexShard(
new IndexShardRoutingTable.Builder(new ShardId("test", i), false)
.addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, shardVersions[shardIds[i]]))
.build()
);
indexRoutingTableBuilder.addIndexShard(
new IndexShardRoutingTable.Builder(new ShardId("test", i), false)
.addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, shardVersions[shardIds[i]]))
.build()
);
}
return ClusterState.builder(currentState)
.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder).build())
@ -250,7 +256,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
return env.shardPaths(new ShardId(index, shard))[0];
}
private boolean waitForShardDeletion(final String server, final String index, final int shard) throws InterruptedException {
private boolean waitForShardDeletion(final String server, final String index, final int shard) throws InterruptedException {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
@ -260,7 +266,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
return Files.exists(shardDirectory(server, index, shard));
}
private boolean waitForIndexDeletion(final String server, final String index) throws InterruptedException {
private boolean waitForIndexDeletion(final String server, final String index) throws InterruptedException {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {

View File

@ -1608,6 +1608,8 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "1b")
.put("script.indexed", "on")
.put("script.inline", "on")
// wait short time for other active shards before actually deleting, default 30s not needed in tests
.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(1, TimeUnit.SECONDS))
.build();
}