From f5f97391176d8fe22fc21325fc5b6dea13a46e4f Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 19 Mar 2015 14:25:51 +0100 Subject: [PATCH] Recovery: only cancel when primary completed relocation When a primary moves to another node, we cancel ongoing recoveries and retry from the primary's new home. At the moment this happens when the primary relocation *starts*. It's a shame as we cancel recoveries that may be close to completion and will finish before the primary has been fully relocated. This commit only triggers the cancelation once the primary relocation is completed. Next to this, it fixes a race condition between recovery cancellation and the recovery completion. At the moment we may trigger remove a recovered shard just after it was completed. Instead, we should use the recovery cancellation logic to make sure only one code path is followed. All of the above caused the recoverWhileUnderLoadWithNodeShutdown test to fail (see http://build-us-00.elastic.co/job/es_core_15_debian/32/ ). The test creates an index and then increasingly disallows nodes for it, until only 1 node is left in the allocation filtering rules. Normally, this means we stay in green, but the premature recovery cancellation plus the race condition mentioned above caused a shard to be failed and stay unassigned and the test asserts to fail. This happens due to the following sequence: - The shard has finished recovering and sent the master a shard started command. - The recovery is cancelled locally, removing the index shard. - Master starts shard (deleting it's other copy). - Local node gets a cluster state with the shard started in it, which cause it to send a shard failed (to make the master aware). - Shard is failed and can't be re-assigned due to the allocation filter. The recoverWhileUnderLoadWithNodeShutdown is also adapted a bit to fit the current behavior of allocation filtering (in the past it used to really shut down nodes). Last, all tests in that class are given better names to fit the current terminology. Clsoes #10218 --- .../TransportBroadcastOperationAction.java | 2 +- .../cluster/IndicesClusterStateService.java | 25 +++--- .../recovery/RecoveriesCollection.java | 62 +++++++------ .../indices/recovery/RecoveryTarget.java | 25 +++--- .../recovery/RecoveriesCollectionTests.java | 86 ++++++++++++++++--- .../recovery/RecoveryWhileUnderLoadTests.java | 44 ++++------ 6 files changed, 156 insertions(+), 88 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java b/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java index a0beb6e6438..238b9f9b648 100644 --- a/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java @@ -128,7 +128,7 @@ public abstract class TransportBroadcastOperationAction(expectedOps); + shardsResponses = new AtomicReferenceArray<>(expectedOps); } public void start() { diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 79297d50cda..b7082ffaba2 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -22,6 +22,7 @@ package org.elasticsearch.indices.cluster; import com.carrotsearch.hppc.IntOpenHashSet; import com.carrotsearch.hppc.ObjectContainer; import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.google.common.base.Predicate; import com.google.common.collect.Lists; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; @@ -64,6 +65,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.recovery.RecoveryStatus; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.threadpool.ThreadPool; @@ -567,17 +569,20 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent shouldCancel = new Predicate() { + @Override + public boolean apply(@Nullable RecoveryStatus status) { + return status.sourceNode().equals(sourceNode) == false; } + }; + if (recoveryTarget.cancelRecoveriesForShard(indexShard.shardId(), "recovery source node changed", shouldCancel)) { + logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting); + // closing the shard will also cancel any ongoing recovery. + indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)"); + shardHasBeenRemoved = true; + } } if (shardHasBeenRemoved == false && (shardRouting.equals(indexShard.routingEntry()) == false || shardRouting.version() > indexShard.routingEntry().version())) { @@ -777,7 +782,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponentalwaysTrue()); + } + + /** + * cancel all ongoing recoveries for the given shard, if their status match a predicate + * + * @param reason reason for cancellation + * @param shardId shardId for which to cancel recoveries + * @param shouldCancel a predicate to check if a recovery should be cancelled or not. + * Note that the recovery state can change after this check, but before it is being cancelled via other + * already issued outstanding references. + * @return true if a recovery was cancelled + */ + public boolean cancelRecoveriesForShard(ShardId shardId, String reason, Predicate shouldCancel) { + boolean cancelled = false; for (RecoveryStatus status : onGoingRecoveries.values()) { if (status.shardId().equals(shardId)) { - cancelRecovery(status.recoveryId(), reason); + boolean cancel = false; + // if we can't increment the status, the recovery is not there any more. + if (status.tryIncRef()) { + try { + cancel = shouldCancel.apply(status); + } finally { + status.decRef(); + } + } + if (cancel && cancelRecovery(status.recoveryId(), reason)) { + cancelled = true; + } } } + return cancelled; } + /** * a reference to {@link RecoveryStatus}, which implements {@link AutoCloseable}. closing the reference * causes {@link RecoveryStatus#decRef()} to be called. This makes sure that the underlying resources diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 8c285908dc2..2d46c24f435 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -19,13 +19,13 @@ package org.elasticsearch.indices.recovery; +import com.google.common.base.Predicate; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -114,17 +114,18 @@ public class RecoveryTarget extends AbstractComponent { }); } - public RecoveryState recoveryState(IndexShard indexShard) { - try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.findRecoveryByShard(indexShard)) { - if (statusRef == null) { - return null; - } - final RecoveryStatus recoveryStatus = statusRef.status(); - return recoveryStatus.state(); - } catch (Exception e) { - // shouldn't really happen, but have to be here due to auto close - throw new ElasticsearchException("error while getting recovery state", e); - } + /** + * cancel all ongoing recoveries for the given shard, if their status match a predicate + * + * @param reason reason for cancellation + * @param shardId shardId for which to cancel recoveries + * @param shouldCancel a predicate to check if a recovery should be cancelled or not. Null means cancel without an extra check. + * note that the recovery state can change after this check, but before it is being cancelled via other + * already issued outstanding references. + * @return true if a recovery was cancelled + */ + public boolean cancelRecoveriesForShard(ShardId shardId, String reason, @Nullable Predicate shouldCancel) { + return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason, shouldCancel); } public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final RecoveryListener listener) { diff --git a/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java index abd2d20bf0b..db256bc9a19 100644 --- a/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java +++ b/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.recovery; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -25,19 +27,19 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.recovery.RecoveriesCollection; -import org.elasticsearch.indices.recovery.RecoveryFailedException; -import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.indices.recovery.RecoveryTarget; +import org.elasticsearch.indices.recovery.*; import org.elasticsearch.test.ElasticsearchSingleNodeTest; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Test; +import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest { @@ -56,11 +58,7 @@ public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest { @Test public void testLastAccessTimeUpdate() throws Exception { - createIndex("test", - ImmutableSettings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .build()); - ensureGreen(); + createIndex(); final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class)); final long recoveryId = startRecovery(collection); try (RecoveriesCollection.StatusRef status = collection.getStatus(recoveryId)) { @@ -80,11 +78,7 @@ public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest { @Test public void testRecoveryTimeout() throws InterruptedException { - createIndex("test", - ImmutableSettings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .build()); - ensureGreen(); + createIndex(); final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class)); final AtomicBoolean failed = new AtomicBoolean(); final CountDownLatch latch = new CountDownLatch(1); @@ -109,6 +103,70 @@ public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest { } + @Test + public void testRecoveryCancellationNoPredicate() throws Exception { + createIndex(); + final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class)); + final long recoveryId = startRecovery(collection); + final long recoveryId2 = startRecovery(collection); + try (RecoveriesCollection.StatusRef statusRef = collection.getStatus(recoveryId)) { + ShardId shardId = statusRef.status().shardId(); + assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test")); + assertThat("all recoveries should be cancelled", collection.size(), equalTo(0)); + } finally { + collection.cancelRecovery(recoveryId, "meh"); + collection.cancelRecovery(recoveryId2, "meh"); + } + } + + @Test + public void testRecoveryCancellationPredicate() throws Exception { + createIndex(); + final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class)); + final long recoveryId = startRecovery(collection); + final long recoveryId2 = startRecovery(collection); + final ArrayList toClose = new ArrayList<>(); + try { + RecoveriesCollection.StatusRef statusRef = collection.getStatus(recoveryId); + toClose.add(statusRef); + ShardId shardId = statusRef.status().shardId(); + assertFalse("should not have cancelled recoveries", collection.cancelRecoveriesForShard(shardId, "test", Predicates.alwaysFalse())); + final Predicate shouldCancel = new Predicate() { + @Override + public boolean apply(RecoveryStatus status) { + return status.recoveryId() == recoveryId; + } + }; + assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test", shouldCancel)); + assertThat("we should still have on recovery", collection.size(), equalTo(1)); + statusRef = collection.getStatus(recoveryId); + toClose.add(statusRef); + assertNull("recovery should have been deleted", statusRef); + statusRef = collection.getStatus(recoveryId2); + toClose.add(statusRef); + assertNotNull("recovery should NOT have been deleted", statusRef); + + } finally { + // TODO: do we want a lucene IOUtils version of this? + for (AutoCloseable closeable : toClose) { + if (closeable != null) { + closeable.close(); + } + } + collection.cancelRecovery(recoveryId, "meh"); + collection.cancelRecovery(recoveryId2, "meh"); + } + } + + protected void createIndex() { + createIndex("test", + ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build()); + ensureGreen(); + } + + long startRecovery(RecoveriesCollection collection) { return startRecovery(collection, listener, TimeValue.timeValueMinutes(60)); } diff --git a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java index e3e0d532cf8..6805419b224 100644 --- a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java +++ b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java @@ -52,7 +52,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { @Test @Slow - public void recoverWhileUnderLoadAllocateBackupsTest() throws Exception { + public void recoverWhileUnderLoadAllocateReplicasTest() throws Exception { logger.info("--> creating test index ..."); int numberOfShards = numberOfShards(); assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1))); @@ -87,7 +87,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { logger.info("--> waiting for GREEN health status ..."); // make sure the cluster state is green, and all has been recovered - assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=2")); + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus()); logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs); waitForDocs(totalNumDocs, indexer); @@ -107,7 +107,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { @Test @Slow - public void recoverWhileUnderLoadAllocateBackupsRelocatePrimariesTest() throws Exception { + public void recoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() throws Exception { logger.info("--> creating test index ..."); int numberOfShards = numberOfShards(); assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1))); @@ -139,7 +139,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { allowNodes("test", 4); logger.info("--> waiting for GREEN health status ..."); - assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=4")); + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus()); logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs); @@ -161,7 +161,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { @Test @TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE") @Slow - public void recoverWhileUnderLoadWithNodeShutdown() throws Exception { + public void recoverWhileUnderLoadWithReducedAllowedNodes() throws Exception { logger.info("--> creating test index ..."); int numberOfShards = numberOfShards(); assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1))); @@ -194,8 +194,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { allowNodes("test", 4); logger.info("--> waiting for GREEN health status ..."); - assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=4")); - + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForRelocatingShards(0)); logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs); waitForDocs(totalNumDocs, indexer); @@ -205,24 +204,24 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { // now, shutdown nodes logger.info("--> allow 3 nodes for index [test] ..."); allowNodes("test", 3); - logger.info("--> waiting for GREEN health status ..."); - assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=3")); + logger.info("--> waiting for relocations ..."); + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0)); logger.info("--> allow 2 nodes for index [test] ..."); allowNodes("test", 2); - logger.info("--> waiting for GREEN health status ..."); - assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=2")); + logger.info("--> waiting for relocations ..."); + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0)); logger.info("--> allow 1 nodes for index [test] ..."); allowNodes("test", 1); - logger.info("--> waiting for YELLOW health status ..."); - assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForYellowStatus().setWaitForNodes(">=1")); + logger.info("--> waiting for relocations ..."); + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0)); logger.info("--> marking and waiting for indexing threads to stop ..."); indexer.stop(); logger.info("--> indexing threads stopped"); - assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForYellowStatus().setWaitForNodes(">=1")); + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0)); logger.info("--> refreshing the index"); refreshAndAssert(); @@ -324,18 +323,13 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { logger.info("iteration [{}] - returned documents: {} (expected {})", iteration, searchResponse.getHits().totalHits(), numberOfDocs); } - private void refreshAndAssert() throws InterruptedException { - assertThat(awaitBusy(new Predicate() { + private void refreshAndAssert() throws Exception { + assertBusy(new Runnable() { @Override - public boolean apply(Object o) { - try { - RefreshResponse actionGet = client().admin().indices().prepareRefresh().execute().actionGet(); - assertNoFailures(actionGet); - return actionGet.getTotalShards() == actionGet.getSuccessfulShards(); - } catch (Throwable e) { - throw new RuntimeException(e); - } + public void run() { + RefreshResponse actionGet = client().admin().indices().prepareRefresh().get(); + assertAllSuccessful(actionGet); } - }, 5, TimeUnit.MINUTES), equalTo(true)); + }, 5, TimeUnit.MINUTES); } }