Recovery: only cancel when primary completed relocation

When a primary shard moves to another node, we cancel ongoing recoveries and retry from the primary's new home. At the moment this happens when the primary relocation *starts*. This is wasteful, as we may cancel recoveries that are close to completion and would have finished before the primary was fully relocated. This commit triggers the cancellation only once the primary relocation has completed.
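
To illustrate, here is a minimal, self-contained sketch of the new check, with hypothetical stub types standing in for the production classes (the real wiring is in the diff below). A recovery is cancelled only when the source node derived from the current routing table differs from the node the recovery is copying from, and that is only the case once the relocation has completed:

    import com.google.common.base.Predicate;

    public class CancelOnSourceChangeSketch {

        // hypothetical stub standing in for the real RecoveryStatus
        static final class RecoveryStatus {
            private final String sourceNode;
            RecoveryStatus(String sourceNode) { this.sourceNode = sourceNode; }
            String sourceNode() { return sourceNode; }
        }

        // cancel only if the ongoing recovery copies from a node other than
        // the one the current routing table designates as the source
        static Predicate<RecoveryStatus> shouldCancel(final String routingSourceNode) {
            return new Predicate<RecoveryStatus>() {
                @Override
                public boolean apply(RecoveryStatus status) {
                    return status.sourceNode().equals(routingSourceNode) == false;
                }
            };
        }

        public static void main(String[] args) {
            RecoveryStatus ongoing = new RecoveryStatus("node_a");
            // primary still relocating: routing still points at node_a -> keep recovering
            System.out.println(shouldCancel("node_a").apply(ongoing)); // false
            // relocation completed: routing now points at node_b -> cancel and retry
            System.out.println(shouldCancel("node_b").apply(ongoing)); // true
        }
    }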

In addition, this commit fixes a race condition between recovery cancellation and recovery completion: at the moment we may remove a recovering shard just after its recovery has completed. Instead, we now funnel everything through the recovery cancellation logic, making sure only one code path is followed.
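
The "only one code path" guarantee comes down to the atomicity of ConcurrentMap.remove: whichever caller removes the recovery from the collection first owns its cleanup, and the loser simply sees null. A stripped-down sketch of the pattern (the real, boolean-returning cancelRecovery lives in RecoveriesCollection in the diff below):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class SingleCodePathSketch {

        private final ConcurrentMap<Long, Object> onGoingRecoveries = new ConcurrentHashMap<>();

        // remove() is atomic: of two racing callers (cancellation vs. completion),
        // exactly one gets a non-null result and performs the cleanup
        public boolean cancelRecovery(long id, String reason) {
            Object removed = onGoingRecoveries.remove(id);
            if (removed != null) {
                // ... cancel and clean up the recovery here ...
                return true;
            }
            return false; // the other code path already removed it
        }
    }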

All of the above caused the recoverWhileUnderLoadWithNodeShutdown test to fail (see http://build-us-00.elastic.co/job/es_core_15_debian/32/). The test creates an index and then progressively disallows nodes for it, until only one node is left in the allocation filtering rules. Normally this means we stay green, but the premature recovery cancellation plus the race condition mentioned above caused a shard to be failed and stay unassigned, making the test assertions fail. This happens due to the following sequence:

- The shard has finished recovering and sent the master a shard started command.
- The recovery is cancelled locally, removing the index shard.
- The master starts the shard (deleting its other copy).
- The local node gets a cluster state with the shard marked as started, which causes it to send a shard failed command (to make the master aware).
- The shard is failed and can't be reassigned due to the allocation filter.

The recoverWhileUnderLoadWithNodeShutdown test is also adapted a bit to fit the current behavior of allocation filtering (in the past it really shut down nodes). Lastly, all tests in that class are given better names to fit the current terminology.

Closes #10218
Boaz Leskes 2015-03-19 14:25:51 +01:00
parent cae2707375
commit f5f9739117
6 changed files with 156 additions and 88 deletions

View File

@@ -128,7 +128,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
shardsIts = shards(clusterState, request, concreteIndices);
expectedOps = shardsIts.size();
shardsResponses = new AtomicReferenceArray<Object>(expectedOps);
shardsResponses = new AtomicReferenceArray<>(expectedOps);
}
public void start() {

View File

@@ -22,6 +22,7 @@ package org.elasticsearch.indices.cluster;
import com.carrotsearch.hppc.IntOpenHashSet;
import com.carrotsearch.hppc.ObjectContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
@@ -64,6 +65,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
@@ -567,17 +569,20 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
shardHasBeenRemoved = true;
} else if (isPeerRecovery(shardRouting)) {
final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
// check if there is an existing recovery going on and, if the source node is not the same, cancel the recovery to restart it
RecoveryState recoveryState = recoveryTarget.recoveryState(indexShard);
if (recoveryState != null && recoveryState.getStage() != RecoveryState.Stage.DONE) {
// we have an ongoing recovery, find the source based on current routing and compare them
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
if (!recoveryState.getSourceNode().equals(sourceNode)) {
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
shardHasBeenRemoved = true;
final Predicate<RecoveryStatus> shouldCancel = new Predicate<RecoveryStatus>() {
@Override
public boolean apply(@Nullable RecoveryStatus status) {
return status.sourceNode().equals(sourceNode) == false;
}
};
if (recoveryTarget.cancelRecoveriesForShard(indexShard.shardId(), "recovery source node changed", shouldCancel)) {
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
shardHasBeenRemoved = true;
}
}
if (shardHasBeenRemoved == false && (shardRouting.equals(indexShard.routingEntry()) == false || shardRouting.version() > indexShard.routingEntry().version())) {
@@ -777,7 +782,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (!shardRouting.primary()) {
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
for (ShardRouting entry : shardRoutingTable) {
if (entry.primary() && entry.started()) {
if (entry.primary() && entry.active()) {
// only recover from started primary, if we can't find one, we will do it next round
sourceNode = nodes.get(entry.currentNodeId());
if (sourceNode == null) {

View File

@@ -19,9 +19,10 @@
package org.elasticsearch.indices.recovery;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -95,13 +96,16 @@ public class RecoveriesCollection {
}
/** cancel the recovery with the given id (if found) and remove it from the recovery collection */
public void cancelRecovery(long id, String reason) {
public boolean cancelRecovery(long id, String reason) {
RecoveryStatus removed = onGoingRecoveries.remove(id);
boolean cancelled = false;
if (removed != null) {
logger.trace("{} canceled recovery from {}, id [{}] (reason [{}])",
removed.shardId(), removed.sourceNode(), removed.recoveryId(), reason);
removed.cancel(reason);
cancelled = true;
}
return cancelled;
}
/**
@@ -128,42 +132,48 @@
}
}
/**
* Try to find an ongoing recovery for a given shard. returns null if not found.
*/
@Nullable
public StatusRef findRecoveryByShard(IndexShard indexShard) {
for (RecoveryStatus recoveryStatus : onGoingRecoveries.values()) {
// check if the recovery has already finished and if not protect
// against it being closed on us while we check
if (recoveryStatus.tryIncRef()) {
try {
if (recoveryStatus.indexShard() == indexShard) {
recoveryStatus.incRef();
return new StatusRef(recoveryStatus);
}
} finally {
recoveryStatus.decRef();
}
}
}
return null;
}
/** the number of ongoing recoveries */
public int size() {
return onGoingRecoveries.size();
}
/** cancel all ongoing recoveries for the given shard. typically because the shard is closed */
public void cancelRecoveriesForShard(ShardId shardId, String reason) {
public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
return cancelRecoveriesForShard(shardId, reason, Predicates.<RecoveryStatus>alwaysTrue());
}
/**
* cancel all ongoing recoveries for the given shard, if their status matches the given predicate
*
* @param shardId shardId for which to cancel recoveries
* @param reason reason for cancellation
* @param shouldCancel a predicate to check whether a recovery should be cancelled or not.
* Note that the recovery state can change after this check, but before the recovery is cancelled, via
* other already issued outstanding references.
* @return true if a recovery was cancelled
*/
public boolean cancelRecoveriesForShard(ShardId shardId, String reason, Predicate<RecoveryStatus> shouldCancel) {
boolean cancelled = false;
for (RecoveryStatus status : onGoingRecoveries.values()) {
if (status.shardId().equals(shardId)) {
cancelRecovery(status.recoveryId(), reason);
boolean cancel = false;
// if we can't increment the status, the recovery is not there any more.
if (status.tryIncRef()) {
try {
cancel = shouldCancel.apply(status);
} finally {
status.decRef();
}
}
if (cancel && cancelRecovery(status.recoveryId(), reason)) {
cancelled = true;
}
}
}
return cancelled;
}
/**
* a reference to {@link RecoveryStatus}, which implements {@link AutoCloseable}. closing the reference
* causes {@link RecoveryStatus#decRef()} to be called. This makes sure that the underlying resources

View File

@@ -19,13 +19,13 @@
package org.elasticsearch.indices.recovery;
import com.google.common.base.Predicate;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -114,17 +114,18 @@ public class RecoveryTarget extends AbstractComponent {
});
}
public RecoveryState recoveryState(IndexShard indexShard) {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.findRecoveryByShard(indexShard)) {
if (statusRef == null) {
return null;
}
final RecoveryStatus recoveryStatus = statusRef.status();
return recoveryStatus.state();
} catch (Exception e) {
// shouldn't really happen, but have to be here due to auto close
throw new ElasticsearchException("error while getting recovery state", e);
}
/**
* cancel all ongoing recoveries for the given shard, if their status matches the given predicate
*
* @param shardId shardId for which to cancel recoveries
* @param reason reason for cancellation
* @param shouldCancel a predicate to check whether a recovery should be cancelled or not. Null means cancel without an extra check.
* Note that the recovery state can change after this check, but before the recovery is cancelled, via
* other already issued outstanding references.
* @return true if a recovery was cancelled
*/
public boolean cancelRecoveriesForShard(ShardId shardId, String reason, @Nullable Predicate<RecoveryStatus> shouldCancel) {
return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason, shouldCancel);
}
public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final RecoveryListener listener) {

View File

@@ -18,6 +18,8 @@
*/
package org.elasticsearch.recovery;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -25,19 +27,19 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveriesCollection;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.*;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Test;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest {
@@ -56,11 +58,7 @@ public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest {
@Test
public void testLastAccessTimeUpdate() throws Exception {
createIndex("test",
ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
ensureGreen();
createIndex();
final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
final long recoveryId = startRecovery(collection);
try (RecoveriesCollection.StatusRef status = collection.getStatus(recoveryId)) {
@@ -80,11 +78,7 @@ public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest {
@Test
public void testRecoveryTimeout() throws InterruptedException {
createIndex("test",
ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
ensureGreen();
createIndex();
final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
final AtomicBoolean failed = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
@@ -109,6 +103,70 @@
}
@Test
public void testRecoveryCancellationNoPredicate() throws Exception {
createIndex();
final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
final long recoveryId = startRecovery(collection);
final long recoveryId2 = startRecovery(collection);
try (RecoveriesCollection.StatusRef statusRef = collection.getStatus(recoveryId)) {
ShardId shardId = statusRef.status().shardId();
assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test"));
assertThat("all recoveries should be cancelled", collection.size(), equalTo(0));
} finally {
collection.cancelRecovery(recoveryId, "meh");
collection.cancelRecovery(recoveryId2, "meh");
}
}
@Test
public void testRecoveryCancellationPredicate() throws Exception {
createIndex();
final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
final long recoveryId = startRecovery(collection);
final long recoveryId2 = startRecovery(collection);
final ArrayList<AutoCloseable> toClose = new ArrayList<>();
try {
RecoveriesCollection.StatusRef statusRef = collection.getStatus(recoveryId);
toClose.add(statusRef);
ShardId shardId = statusRef.status().shardId();
assertFalse("should not have cancelled recoveries", collection.cancelRecoveriesForShard(shardId, "test", Predicates.<RecoveryStatus>alwaysFalse()));
final Predicate<RecoveryStatus> shouldCancel = new Predicate<RecoveryStatus>() {
@Override
public boolean apply(RecoveryStatus status) {
return status.recoveryId() == recoveryId;
}
};
assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test", shouldCancel));
assertThat("we should still have on recovery", collection.size(), equalTo(1));
statusRef = collection.getStatus(recoveryId);
toClose.add(statusRef);
assertNull("recovery should have been deleted", statusRef);
statusRef = collection.getStatus(recoveryId2);
toClose.add(statusRef);
assertNotNull("recovery should NOT have been deleted", statusRef);
} finally {
// TODO: do we want a lucene IOUtils version of this?
for (AutoCloseable closeable : toClose) {
if (closeable != null) {
closeable.close();
}
}
collection.cancelRecovery(recoveryId, "meh");
collection.cancelRecovery(recoveryId2, "meh");
}
}
protected void createIndex() {
createIndex("test",
ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
ensureGreen();
}
long startRecovery(RecoveriesCollection collection) {
return startRecovery(collection, listener, TimeValue.timeValueMinutes(60));
}

View File

@@ -52,7 +52,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
@Test
@Slow
public void recoverWhileUnderLoadAllocateBackupsTest() throws Exception {
public void recoverWhileUnderLoadAllocateReplicasTest() throws Exception {
logger.info("--> creating test index ...");
int numberOfShards = numberOfShards();
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
@@ -87,7 +87,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
logger.info("--> waiting for GREEN health status ...");
// make sure the cluster state is green, and all has been recovered
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=2"));
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus());
logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
waitForDocs(totalNumDocs, indexer);
@@ -107,7 +107,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
@Test
@Slow
public void recoverWhileUnderLoadAllocateBackupsRelocatePrimariesTest() throws Exception {
public void recoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() throws Exception {
logger.info("--> creating test index ...");
int numberOfShards = numberOfShards();
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
@@ -139,7 +139,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
allowNodes("test", 4);
logger.info("--> waiting for GREEN health status ...");
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=4"));
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus());
logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
@@ -161,7 +161,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
@Test
@TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE")
@Slow
public void recoverWhileUnderLoadWithNodeShutdown() throws Exception {
public void recoverWhileUnderLoadWithReducedAllowedNodes() throws Exception {
logger.info("--> creating test index ...");
int numberOfShards = numberOfShards();
assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
@@ -194,8 +194,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
allowNodes("test", 4);
logger.info("--> waiting for GREEN health status ...");
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=4"));
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForRelocatingShards(0));
logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
waitForDocs(totalNumDocs, indexer);
@@ -205,24 +204,24 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
// now, reduce the number of allowed nodes
logger.info("--> allow 3 nodes for index [test] ...");
allowNodes("test", 3);
logger.info("--> waiting for GREEN health status ...");
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=3"));
logger.info("--> waiting for relocations ...");
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
logger.info("--> allow 2 nodes for index [test] ...");
allowNodes("test", 2);
logger.info("--> waiting for GREEN health status ...");
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=2"));
logger.info("--> waiting for relocations ...");
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
logger.info("--> allow 1 nodes for index [test] ...");
allowNodes("test", 1);
logger.info("--> waiting for YELLOW health status ...");
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForYellowStatus().setWaitForNodes(">=1"));
logger.info("--> waiting for relocations ...");
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
logger.info("--> marking and waiting for indexing threads to stop ...");
indexer.stop();
logger.info("--> indexing threads stopped");
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForYellowStatus().setWaitForNodes(">=1"));
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
logger.info("--> refreshing the index");
refreshAndAssert();
@@ -324,18 +323,13 @@
logger.info("iteration [{}] - returned documents: {} (expected {})", iteration, searchResponse.getHits().totalHits(), numberOfDocs);
}
private void refreshAndAssert() throws InterruptedException {
assertThat(awaitBusy(new Predicate<Object>() {
private void refreshAndAssert() throws Exception {
assertBusy(new Runnable() {
@Override
public boolean apply(Object o) {
try {
RefreshResponse actionGet = client().admin().indices().prepareRefresh().execute().actionGet();
assertNoFailures(actionGet);
return actionGet.getTotalShards() == actionGet.getSuccessfulShards();
} catch (Throwable e) {
throw new RuntimeException(e);
}
public void run() {
RefreshResponse actionGet = client().admin().indices().prepareRefresh().get();
assertAllSuccessful(actionGet);
}
}, 5, TimeUnit.MINUTES), equalTo(true));
}, 5, TimeUnit.MINUTES);
}
}