Core: cancel the entire recovery if the shard closes on the target node during recovery operations.

Closes #6645
commit 5668b1cfc5 (parent fd1d02fd07)
Author: Martijn van Groningen
Date: 2014-06-30 11:27:12 +02:00

2 changed files with 119 additions and 52 deletions
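In short, every recovery request handler used to repeat the same null/cancelled checks inline; this commit replaces them with one shared guard that also cancels the entire recovery when the target shard has been closed. A condensed sketch of that guard, with names taken from the diff below and the handler/class wiring omitted:

    // Minimal sketch of the consolidated check added in this commit (see the
    // RecoveryTarget.java hunks below for the actual code and surrounding handlers).
    private void validateRecoveryStatus(RecoveryStatus onGoingRecovery, ShardId shardId) {
        if (onGoingRecovery == null) {
            // no ongoing recovery found: the shard is getting closed on us
            throw new IndexShardClosedException(shardId);
        }
        if (onGoingRecovery.indexShard.state() == IndexShardState.CLOSED) {
            // shard closed on the target while recovering: cancel the whole recovery
            cancelRecovery(onGoingRecovery.indexShard);
            onGoingRecovery.sentCanceledToSource = true;
            throw new IndexShardClosedException(shardId);
        }
        if (onGoingRecovery.isCanceled()) {
            // recovery was cancelled elsewhere: tell the source and bail out
            onGoingRecovery.sentCanceledToSource = true;
            throw new IndexShardClosedException(shardId);
        }
    }

Each messageReceived handler now calls validateRecoveryStatus(onGoingRecovery, request.shardId()) right after looking up the ongoing recovery, instead of repeating the checks itself.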

RecoveryTarget.java

@@ -378,14 +378,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
- if (onGoingRecovery == null) {
- // shard is getting closed on us
- throw new IndexShardClosedException(request.shardId());
- }
- if (onGoingRecovery.isCanceled()) {
- onGoingRecovery.sentCanceledToSource = true;
- throw new IndexShardClosedException(request.shardId());
- }
+ validateRecoveryStatus(onGoingRecovery, request.shardId());
onGoingRecovery.indexShard.performRecoveryPrepareForTranslog();
onGoingRecovery.stage(RecoveryState.Stage.TRANSLOG);
@@ -409,14 +402,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
- if (onGoingRecovery == null) {
- // shard is getting closed on us
- throw new IndexShardClosedException(request.shardId());
- }
- if (onGoingRecovery.isCanceled()) {
- onGoingRecovery.sentCanceledToSource = true;
- throw new IndexShardClosedException(request.shardId());
- }
+ validateRecoveryStatus(onGoingRecovery, request.shardId());
onGoingRecovery.stage(RecoveryState.Stage.FINALIZE);
onGoingRecovery.indexShard.performRecoveryFinalization(false, onGoingRecovery);
@@ -442,21 +428,11 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
- if (onGoingRecovery == null) {
- // shard is getting closed on us
- throw new IndexShardClosedException(request.shardId());
- }
- if (onGoingRecovery.isCanceled()) {
- onGoingRecovery.sentCanceledToSource = true;
- throw new IndexShardClosedException(request.shardId());
- }
+ validateRecoveryStatus(onGoingRecovery, request.shardId());
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
for (Translog.Operation operation : request.operations()) {
- if (onGoingRecovery.isCanceled()) {
- onGoingRecovery.sentCanceledToSource = true;
- throw new IndexShardClosedException(request.shardId());
- }
+ validateRecoveryStatus(onGoingRecovery, request.shardId());
shard.performRecoveryOperation(operation);
onGoingRecovery.recoveryState.getTranslog().incrementTranslogOperations();
}
@@ -479,14 +455,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
- if (onGoingRecovery == null) {
- // shard is getting closed on us
- throw new IndexShardClosedException(request.shardId());
- }
- if (onGoingRecovery.isCanceled()) {
- onGoingRecovery.sentCanceledToSource = true;
- throw new IndexShardClosedException(request.shardId());
- }
+ validateRecoveryStatus(onGoingRecovery, request.shardId());
onGoingRecovery.recoveryState().getIndex().addFileDetails(request.phase1FileNames, request.phase1FileSizes);
onGoingRecovery.recoveryState().getIndex().addReusedFileDetails(request.phase1ExistingFileNames, request.phase1ExistingFileSizes);
@@ -513,14 +482,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
- if (onGoingRecovery == null) {
- // shard is getting closed on us
- throw new IndexShardClosedException(request.shardId());
- }
- if (onGoingRecovery.isCanceled()) {
- onGoingRecovery.sentCanceledToSource = true;
- throw new IndexShardClosedException(request.shardId());
- }
+ validateRecoveryStatus(onGoingRecovery, request.shardId());
final Store store = onGoingRecovery.indexShard.store();
store.incRef();
@@ -586,14 +548,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
- if (onGoingRecovery == null) {
- // shard is getting closed on us
- throw new IndexShardClosedException(request.shardId());
- }
- if (onGoingRecovery.isCanceled()) {
- onGoingRecovery.sentCanceledToSource = true;
- throw new IndexShardClosedException(request.shardId());
- }
+ validateRecoveryStatus(onGoingRecovery, request.shardId());
Store store = onGoingRecovery.indexShard.store();
store.incRef();
@@ -671,4 +626,20 @@ public class RecoveryTarget extends AbstractComponent {
}
}
}
+ private void validateRecoveryStatus(RecoveryStatus onGoingRecovery, ShardId shardId) {
+ if (onGoingRecovery == null) {
+ // shard is getting closed on us
+ throw new IndexShardClosedException(shardId);
+ }
+ if (onGoingRecovery.indexShard.state() == IndexShardState.CLOSED) {
+ cancelRecovery(onGoingRecovery.indexShard);
+ onGoingRecovery.sentCanceledToSource = true;
+ throw new IndexShardClosedException(shardId);
+ }
+ if (onGoingRecovery.isCanceled()) {
+ onGoingRecovery.sentCanceledToSource = true;
+ throw new IndexShardClosedException(shardId);
+ }
+ }
}

RelocationTests.java

@@ -21,22 +21,31 @@ package org.elasticsearch.recovery;
import com.carrotsearch.hppc.IntOpenHashSet;
import com.carrotsearch.hppc.procedures.IntProcedure;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.BackgroundIndexer;
@@ -52,8 +61,10 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
*/
@@ -321,4 +332,89 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
}
}
+ @Test
+ public void testMoveShardsWhileRelocation() throws Exception {
+ final String indexName = "test";
+ ListenableFuture<String> blueFuture = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.color", "blue").build());
+ internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.color", "green").build());
+ ListenableFuture<String> redFuture = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.color", "red").build());
+ ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
+ assertThat(response.isTimedOut(), is(false));
+ String blueNodeName = blueFuture.get();
+ final String redNodeName = redFuture.get();
+ client().admin().indices().prepareCreate(indexName)
+ .setSettings(
+ ImmutableSettings.builder()
+ .put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+ ).get();
+ List<IndexRequestBuilder> requests = new ArrayList<>();
+ int numDocs = scaledRandomIntBetween(25, 250);
+ for (int i = 0; i < numDocs; i++) {
+ requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
+ }
+ indexRandom(true, requests);
+ ensureSearchable(indexName);
+ ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
+ String blueNodeId = internalCluster().getInstance(DiscoveryService.class, blueNodeName).localNode().id();
+ assertFalse(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty());
+ SearchResponse searchResponse = client().prepareSearch(indexName).get();
+ assertHitCount(searchResponse, numDocs);
+ // Slow down recovery in order to make recovery cancellations more likely
+ IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(indexName).get();
+ long chunkSize = statsResponse.getIndex(indexName).getShards()[0].getStats().getStore().size().bytes() / 10;
+ assertTrue(client().admin().cluster().prepareUpdateSettings()
+ .setTransientSettings(ImmutableSettings.builder()
+ // one chunk per sec..
+ .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, chunkSize)
+ .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, chunkSize)
+ )
+ .get().isAcknowledged());
+ client().admin().indices().prepareUpdateSettings(indexName).setSettings(
+ ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")
+ ).get();
+ // Let's wait a bit and then move again to hopefully trigger recovery cancellations.
+ boolean applied = awaitBusy(
+ new Predicate<Object>() {
+ @Override
+ public boolean apply(Object input) {
+ RecoveryResponse recoveryResponse = internalCluster().client(redNodeName).admin().indices().prepareRecoveries(indexName)
+ .get();
+ return !recoveryResponse.shardResponses().get(indexName).isEmpty();
+ }
+ }
+ );
+ assertTrue(applied);
+ client().admin().indices().prepareUpdateSettings(indexName).setSettings(
+ ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "green")
+ ).get();
+ // Restore the recovery speed so that the cluster health call does not time out
+ assertTrue(client().admin().cluster().prepareUpdateSettings()
+ .setTransientSettings(ImmutableSettings.builder()
+ .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "20mb")
+ .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, "512kb")
+ )
+ .get().isAcknowledged());
+ // this also waits for all ongoing recoveries to complete:
+ ensureSearchable(indexName);
+ searchResponse = client().prepareSearch(indexName).get();
+ assertHitCount(searchResponse, numDocs);
+ stateResponse = client().admin().cluster().prepareState().get();
+ assertTrue(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty());
+ }
}