diff --git a/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java b/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java
index a0beb6e6438..238b9f9b648 100644
--- a/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java
+++ b/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java
@@ -128,7 +128,7 @@ public abstract class TransportBroadcastOperationAction
-            shardsResponses = new AtomicReferenceArray<Object>(expectedOps);
+            shardsResponses = new AtomicReferenceArray<>(expectedOps);
         }
 
         public void start() {
diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 79297d50cda..b7082ffaba2 100644
--- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -22,6 +22,7 @@ package org.elasticsearch.indices.cluster;
 import com.carrotsearch.hppc.IntOpenHashSet;
 import com.carrotsearch.hppc.ObjectContainer;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchIllegalStateException;
@@ -64,6 +65,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.recovery.RecoveryFailedException;
 import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.indices.recovery.RecoveryStatus;
 import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -567,17 +569,20 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> implements ClusterStateListener {
+                    final Predicate<RecoveryStatus> shouldCancel = new Predicate<RecoveryStatus>() {
+                        @Override
+                        public boolean apply(@Nullable RecoveryStatus status) {
+                            return status.sourceNode().equals(sourceNode) == false;
                         }
+                    };
+                    if (recoveryTarget.cancelRecoveriesForShard(indexShard.shardId(), "recovery source node changed", shouldCancel)) {
+                        logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
+                        // closing the shard will also cancel any ongoing recovery.
+                        indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
+                        shardHasBeenRemoved = true;
+                    }
                 }
             if (shardHasBeenRemoved == false && (shardRouting.equals(indexShard.routingEntry()) == false || shardRouting.version() > indexShard.routingEntry().version())) {
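For readers following the hunk above: the new code builds a Guava Predicate that matches only recoveries streaming from a node other than the currently expected source, and it removes the shard only when cancelRecoveriesForShard(...) reports that such a recovery was actually cancelled. The helper below is an illustration, not part of the patch (the class name, method name and expectedSource parameter are invented); it uses only APIs that appear elsewhere in this diff:

import com.google.common.base.Predicate;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;

// Illustration only: condenses the decision made in IndicesClusterStateService into one helper.
public final class RecoverySourceCheck {

    private RecoverySourceCheck() {}

    /**
     * Returns true if the shard should be removed (and re-recovered) because its ongoing
     * recovery streams from a node other than the expected source.
     */
    public static boolean shouldRemoveShard(RecoveryTarget recoveryTarget, ShardId shardId, final DiscoveryNode expectedSource) {
        final Predicate<RecoveryStatus> recoversFromWrongNode = new Predicate<RecoveryStatus>() {
            @Override
            public boolean apply(RecoveryStatus status) {
                // match only recoveries whose source node differs from the one we expect now
                return status.sourceNode().equals(expectedSource) == false;
            }
        };
        // true only if at least one matching recovery was found and actually cancelled
        return recoveryTarget.cancelRecoveriesForShard(shardId, "recovery source node changed", recoversFromWrongNode);
    }
}

Returning false here corresponds to the branch in the hunk where the shard is left untouched because no recovery had to be restarted.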
@@ -777,7 +782,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> implements ClusterStateListener {
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java
+    public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
+        return cancelRecoveriesForShard(shardId, reason, Predicates.<RecoveryStatus>alwaysTrue());
+    }
+
+    /**
+     * cancel all ongoing recoveries for the given shard, if their status matches the given predicate
+     *
+     * @param reason       reason for cancellation
+     * @param shardId      shardId for which to cancel recoveries
+     * @param shouldCancel a predicate to check if a recovery should be cancelled or not.
+     *                     Note that the recovery state can change after this check, but before it is being cancelled via other
+     *                     already issued outstanding references.
+     * @return true if a recovery was cancelled
+     */
+    public boolean cancelRecoveriesForShard(ShardId shardId, String reason, Predicate<RecoveryStatus> shouldCancel) {
+        boolean cancelled = false;
         for (RecoveryStatus status : onGoingRecoveries.values()) {
             if (status.shardId().equals(shardId)) {
-                cancelRecovery(status.recoveryId(), reason);
+                boolean cancel = false;
+                // if we can't increment the status, the recovery is not there any more.
+                if (status.tryIncRef()) {
+                    try {
+                        cancel = shouldCancel.apply(status);
+                    } finally {
+                        status.decRef();
+                    }
+                }
+                if (cancel && cancelRecovery(status.recoveryId(), reason)) {
+                    cancelled = true;
+                }
             }
         }
+        return cancelled;
     }
+
     /**
      * a reference to {@link RecoveryStatus}, which implements {@link AutoCloseable}. closing the reference
      * causes {@link RecoveryStatus#decRef()} to be called. This makes sure that the underlying resources
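The predicate above is applied only after status.tryIncRef() succeeds because a recovery can be removed, and its resources released, concurrently; the StatusRef javadoc just above describes the same reference-counting contract from the caller's side. The snippet below is a sketch only (the class, method and logger parameter are invented); it uses the getStatus()/status()/sourceNode() accessors that appear elsewhere in this patch to show the try-with-resources form of that pinning:

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.indices.recovery.RecoveriesCollection;
import org.elasticsearch.indices.recovery.RecoveryStatus;

// Illustration only: the try-with-resources equivalent of the manual tryIncRef()/decRef() guard.
public final class StatusRefExample {

    private StatusRefExample() {}

    public static void logRecoverySource(RecoveriesCollection collection, long recoveryId, ESLogger logger) throws Exception {
        try (RecoveriesCollection.StatusRef ref = collection.getStatus(recoveryId)) {
            if (ref == null) {
                return; // the recovery is already gone, nothing to pin
            }
            RecoveryStatus status = ref.status();
            logger.trace("recovery [{}] is sourced from {}", status.recoveryId(), status.sourceNode());
        } // closing the ref calls RecoveryStatus#decRef(), which may release the underlying resources
    }
}

Once the try block exits, the status must not be touched again, which is exactly why cancelRecoveriesForShard evaluates the predicate only inside the incRef/decRef window.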
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 8c285908dc2..2d46c24f435 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -19,13 +19,13 @@ package org.elasticsearch.indices.recovery;
 
+import com.google.common.base.Predicate;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexFormatTooNewException;
 import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RateLimiter;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -114,17 +114,18 @@ public class RecoveryTarget extends AbstractComponent {
         });
     }
 
-    public RecoveryState recoveryState(IndexShard indexShard) {
-        try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.findRecoveryByShard(indexShard)) {
-            if (statusRef == null) {
-                return null;
-            }
-            final RecoveryStatus recoveryStatus = statusRef.status();
-            return recoveryStatus.state();
-        } catch (Exception e) {
-            // shouldn't really happen, but have to be here due to auto close
-            throw new ElasticsearchException("error while getting recovery state", e);
-        }
+    /**
+     * cancel all ongoing recoveries for the given shard, if their status matches the given predicate
+     *
+     * @param reason       reason for cancellation
+     * @param shardId      shardId for which to cancel recoveries
+     * @param shouldCancel a predicate to check if a recovery should be cancelled or not. Null means cancel without an extra check.
+     *                     Note that the recovery state can change after this check, but before it is being cancelled via other
+     *                     already issued outstanding references.
+     * @return true if a recovery was cancelled
+     */
+    public boolean cancelRecoveriesForShard(ShardId shardId, String reason, @Nullable Predicate<RecoveryStatus> shouldCancel) {
+        return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason, shouldCancel);
     }
 
     public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final RecoveryListener listener) {
diff --git a/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java
index abd2d20bf0b..db256bc9a19 100644
--- a/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java
+++ b/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java
@@ -18,6 +18,8 @@
  */
 package org.elasticsearch.recovery;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -25,19 +27,19 @@ import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.transport.DummyTransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.indices.recovery.RecoveriesCollection;
-import org.elasticsearch.indices.recovery.RecoveryFailedException;
-import org.elasticsearch.indices.recovery.RecoveryState;
-import org.elasticsearch.indices.recovery.RecoveryTarget;
+import org.elasticsearch.indices.recovery.*;
 import org.elasticsearch.test.ElasticsearchSingleNodeTest;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.lessThan;
 
 public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest {
@@ -56,11 +58,7 @@ public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest {
 
     @Test
     public void testLastAccessTimeUpdate() throws Exception {
-        createIndex("test",
-                ImmutableSettings.builder()
-                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                        .build());
-        ensureGreen();
+        createIndex();
         final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
         final long recoveryId = startRecovery(collection);
         try (RecoveriesCollection.StatusRef status = collection.getStatus(recoveryId)) {
@@ -80,11 +78,7 @@ public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest {
 
     @Test
     public void testRecoveryTimeout() throws InterruptedException {
-        createIndex("test",
-                ImmutableSettings.builder()
-                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                        .build());
-        ensureGreen();
+        createIndex();
         final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
         final AtomicBoolean failed = new AtomicBoolean();
         final CountDownLatch latch = new CountDownLatch(1);
@@ -109,6 +103,70 @@ public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest {
     }
 
+    @Test
+    public void testRecoveryCancellationNoPredicate() throws Exception {
+        createIndex();
+        final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
+        final long recoveryId = startRecovery(collection);
+        final long recoveryId2 = startRecovery(collection);
+        try (RecoveriesCollection.StatusRef statusRef = collection.getStatus(recoveryId)) {
+            ShardId shardId = statusRef.status().shardId();
+            assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test"));
+            assertThat("all recoveries should be cancelled", collection.size(), equalTo(0));
+        } finally {
+            collection.cancelRecovery(recoveryId, "meh");
+            collection.cancelRecovery(recoveryId2, "meh");
+        }
+    }
+
+    @Test
+    public void testRecoveryCancellationPredicate() throws Exception {
+        createIndex();
+        final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
+        final long recoveryId = startRecovery(collection);
+        final long recoveryId2 = startRecovery(collection);
+        final ArrayList<AutoCloseable> toClose = new ArrayList<>();
+        try {
+            RecoveriesCollection.StatusRef statusRef = collection.getStatus(recoveryId);
+            toClose.add(statusRef);
+            ShardId shardId = statusRef.status().shardId();
+            assertFalse("should not have cancelled recoveries", collection.cancelRecoveriesForShard(shardId, "test", Predicates.<RecoveryStatus>alwaysFalse()));
+            final Predicate<RecoveryStatus> shouldCancel = new Predicate<RecoveryStatus>() {
+                @Override
+                public boolean apply(RecoveryStatus status) {
+                    return status.recoveryId() == recoveryId;
+                }
+            };
+            assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test", shouldCancel));
+            assertThat("we should still have one recovery", collection.size(), equalTo(1));
+            statusRef = collection.getStatus(recoveryId);
+            toClose.add(statusRef);
+            assertNull("recovery should have been deleted", statusRef);
+            statusRef = collection.getStatus(recoveryId2);
+            toClose.add(statusRef);
+            assertNotNull("recovery should NOT have been deleted", statusRef);
+
+        } finally {
+            // TODO: do we want a lucene IOUtils version of this?
+            for (AutoCloseable closeable : toClose) {
+                if (closeable != null) {
+                    closeable.close();
+                }
+            }
+            collection.cancelRecovery(recoveryId, "meh");
+            collection.cancelRecovery(recoveryId2, "meh");
+        }
+    }
+
+    protected void createIndex() {
+        createIndex("test",
+                ImmutableSettings.builder()
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .build());
+        ensureGreen();
+    }
+
+
     long startRecovery(RecoveriesCollection collection) {
         return startRecovery(collection, listener, TimeValue.timeValueMinutes(60));
     }
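On the TODO in the finally block above: Lucene's IOUtils.close(...) helpers are written against java.io.Closeable, while StatusRef only implements AutoCloseable, which is why the test rolls its own null-tolerant loop. A possible shape for the utility the TODO hints at is sketched below; it is entirely hypothetical (name and behaviour chosen here, not part of this change), closing everything, skipping nulls, and rethrowing the first failure with the rest suppressed:

import java.util.Arrays;

// Hypothetical utility in the spirit of Lucene's IOUtils, but for AutoCloseable.
public final class AutoCloseables {

    private AutoCloseables() {}

    public static void close(Iterable<? extends AutoCloseable> closeables) throws Exception {
        Exception firstFailure = null;
        for (AutoCloseable closeable : closeables) {
            if (closeable == null) {
                continue; // the test adds null StatusRefs on purpose, so nulls are skipped
            }
            try {
                closeable.close();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    public static void close(AutoCloseable... closeables) throws Exception {
        close(Arrays.asList(closeables));
    }
}

With something along these lines, the finally block above could shrink to a single close(toClose) call followed by the two cancelRecovery(...) calls.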
diff --git a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java
index e3e0d532cf8..6805419b224 100644
--- a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java
+++ b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java
@@ -52,7 +52,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
     @Test
     @Slow
-    public void recoverWhileUnderLoadAllocateBackupsTest() throws Exception {
+    public void recoverWhileUnderLoadAllocateReplicasTest() throws Exception {
         logger.info("--> creating test index ...");
         int numberOfShards = numberOfShards();
         assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
@@ -87,7 +87,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             logger.info("--> waiting for GREEN health status ...");
             // make sure the cluster state is green, and all has been recovered
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=2"));
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus());
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
             waitForDocs(totalNumDocs, indexer);
@@ -107,7 +107,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
     @Test
     @Slow
-    public void recoverWhileUnderLoadAllocateBackupsRelocatePrimariesTest() throws Exception {
+    public void recoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() throws Exception {
         logger.info("--> creating test index ...");
         int numberOfShards = numberOfShards();
         assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
@@ -139,7 +139,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
             allowNodes("test", 4);
 
             logger.info("--> waiting for GREEN health status ...");
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=4"));
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus());
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
@@ -161,7 +161,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
     @Test
     @TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE")
     @Slow
-    public void recoverWhileUnderLoadWithNodeShutdown() throws Exception {
+    public void recoverWhileUnderLoadWithReducedAllowedNodes() throws Exception {
         logger.info("--> creating test index ...");
         int numberOfShards = numberOfShards();
         assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
@@ -194,8 +194,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
             allowNodes("test", 4);
 
             logger.info("--> waiting for GREEN health status ...");
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=4"));
-
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForRelocatingShards(0));
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
             waitForDocs(totalNumDocs, indexer);
@@ -205,24 +204,24 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
             // now, shutdown nodes
             logger.info("--> allow 3 nodes for index [test] ...");
             allowNodes("test", 3);
-            logger.info("--> waiting for GREEN health status ...");
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=3"));
+            logger.info("--> waiting for relocations ...");
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
 
             logger.info("--> allow 2 nodes for index [test] ...");
             allowNodes("test", 2);
-            logger.info("--> waiting for GREEN health status ...");
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNodes(">=2"));
+            logger.info("--> waiting for relocations ...");
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
 
             logger.info("--> allow 1 nodes for index [test] ...");
             allowNodes("test", 1);
-            logger.info("--> waiting for YELLOW health status ...");
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForYellowStatus().setWaitForNodes(">=1"));
+            logger.info("--> waiting for relocations ...");
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
 
             logger.info("--> marking and waiting for indexing threads to stop ...");
             indexer.stop();
             logger.info("--> indexing threads stopped");
 
-            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForYellowStatus().setWaitForNodes(">=1"));
+            assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForRelocatingShards(0));
 
             logger.info("--> refreshing the index");
             refreshAndAssert();
@@ -324,18 +323,13 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         logger.info("iteration [{}] - returned documents: {} (expected {})", iteration, searchResponse.getHits().totalHits(), numberOfDocs);
     }
 
-    private void refreshAndAssert() throws InterruptedException {
-        assertThat(awaitBusy(new Predicate<Object>() {
+    private void refreshAndAssert() throws Exception {
+        assertBusy(new Runnable() {
             @Override
-            public boolean apply(Object o) {
-                try {
-                    RefreshResponse actionGet = client().admin().indices().prepareRefresh().execute().actionGet();
-                    assertNoFailures(actionGet);
-                    return actionGet.getTotalShards() == actionGet.getSuccessfulShards();
-                } catch (Throwable e) {
-                    throw new RuntimeException(e);
-                }
+            public void run() {
+                RefreshResponse actionGet = client().admin().indices().prepareRefresh().get();
+                assertAllSuccessful(actionGet);
             }
-        }, 5, TimeUnit.MINUTES), equalTo(true));
+        }, 5, TimeUnit.MINUTES);
     }
 }
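For context on the refreshAndAssert() rewrite above: assertBusy repeatedly runs the given Runnable until it stops throwing (including AssertionErrors such as the one assertAllSuccessful may raise) or the timeout elapses, so the method no longer needs to return a boolean or wrap checked exceptions the way the old awaitBusy predicate did. The sketch below approximates that retry contract; it is not the real ElasticsearchTestCase implementation (the fixed 100 ms sleep in particular is an assumption):

import java.util.concurrent.TimeUnit;

// Rough approximation of what an assertBusy-style helper does; illustration only.
public final class AssertBusySketch {

    private AssertBusySketch() {}

    public static void assertBusy(Runnable codeBlock, long maxWaitTime, TimeUnit unit) throws InterruptedException {
        final long deadline = System.nanoTime() + unit.toNanos(maxWaitTime);
        AssertionError lastFailure = null;
        do {
            try {
                codeBlock.run();      // may throw an AssertionError, e.g. from assertAllSuccessful(...)
                return;               // success, stop retrying
            } catch (AssertionError e) {
                lastFailure = e;      // remember the latest failure and retry
            }
            Thread.sleep(100);        // assumed back-off; the real helper's timing differs
        } while (System.nanoTime() < deadline);
        throw lastFailure;            // timed out, surface the last assertion failure
    }
}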