From 6dacac49b394df9c96d20f3b4c74c8a8d7e22558 Mon Sep 17 00:00:00 2001
From: Yannick Welsch
Date: Wed, 18 May 2016 10:51:57 +0200
Subject: [PATCH] Simplify recovery logic in IndicesClusterStateService (#18405)

- Moves recovery logic into IndexShard
- Simplifies logic to cancel peer recovery of shard where recovery source node changed
- Ensures routing entry is set on initialization of IndexShard
---
 .../cluster/routing/ShardRouting.java         |   7 +
 .../org/elasticsearch/index/IndexService.java |   5 +-
 .../elasticsearch/index/shard/IndexShard.java |  74 ++++++--
 .../index/shard/ShadowIndexShard.java         |   8 +-
 .../cluster/IndicesClusterStateService.java   | 161 ++++++------------
 .../recovery/RecoveriesCollection.java        |  25 +--
 .../recovery/RecoveryTargetService.java       |   7 +-
 .../index/shard/IndexShardTests.java          |  43 ++---
 .../IndexingMemoryControllerTests.java        |   5 +-
 .../recovery/RecoveriesCollectionTests.java   |  35 +---
 10 files changed, 159 insertions(+), 211 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java
index ce31483f47f..60bb455dfe6 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java
@@ -252,6 +252,13 @@ public final class ShardRouting implements Writeable, ToXContent {
         return true;
     }
 
+    /**
+     * returns true for initializing shards that recover their data from another shard copy
+     */
+    public boolean isPeerRecovery() {
+        return state == ShardRoutingState.INITIALIZING && (primary() == false || relocatingNodeId != null);
+    }
+
     /**
      * A shard iterator with just this shard in it.
      */
diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java
index ea2cfe9f106..60e3250e49d 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexService.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -336,18 +336,17 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
         store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock,
             new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId)));
         if (useShadowEngine(primary, indexSettings)) {
-            indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService,
+            indexShard = new ShadowIndexShard(routing, this.indexSettings, path, store, indexCache, mapperService, similarityService,
                 indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
                 searchOperationListeners); // no indexing listeners - shadow engines don't index
         } else {
-            indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService,
+            indexShard = new IndexShard(routing, this.indexSettings, path, store, indexCache, mapperService, similarityService,
                 indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
                 searchOperationListeners, indexingOperationListeners);
         }
         eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
         eventListener.afterIndexShardCreated(indexShard);
-        indexShard.updateRoutingEntry(routing, true);
         shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
         success = true;
         return indexShard;
diff --git
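As a reading aid for the new ShardRouting#isPeerRecovery() predicate added above: a shard copy is peer-recovered exactly when it is INITIALIZING and either is a replica or is the target of a relocation. The standalone sketch below only restates that logic for illustration; the PeerRecoverySketch class is hypothetical and not part of the patch.

    import org.elasticsearch.cluster.routing.ShardRoutingState;

    // Illustrative restatement of ShardRouting#isPeerRecovery() (not part of the change)
    final class PeerRecoverySketch {
        static boolean isPeerRecovery(ShardRoutingState state, boolean primary, String relocatingNodeId) {
            return state == ShardRoutingState.INITIALIZING
                && (primary == false              // a replica always copies its data from the active primary
                    || relocatingNodeId != null); // a relocating primary copies its data from the node it leaves
        }

        public static void main(String[] args) {
            assert isPeerRecovery(ShardRoutingState.INITIALIZING, false, null);         // initializing replica
            assert isPeerRecovery(ShardRoutingState.INITIALIZING, true, "node_1");      // target of a primary relocation
            assert isPeerRecovery(ShardRoutingState.INITIALIZING, true, null) == false; // fresh primary: store/snapshot recovery
        }
    }
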
a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 59ea1a29bcf..768db935308 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RestoreSource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Booleans; @@ -103,6 +104,8 @@ import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.recovery.RecoveryTargetService; +import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.search.suggest.completion.CompletionFieldStats; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat; @@ -194,12 +197,14 @@ public class IndexShard extends AbstractIndexShardComponent { */ private final AtomicBoolean active = new AtomicBoolean(); - public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, + public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory, IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays, - Engine.Warmer warmer, List searchOperationListener, List listeners) { - super(shardId, indexSettings); + Engine.Warmer warmer, List searchOperationListener, List listeners) throws IOException { + super(shardRouting.shardId(), indexSettings); + assert shardRouting.initializing(); + this.shardRouting = shardRouting; final Settings settings = indexSettings.getSettings(); this.codecService = new CodecService(mapperService, logger); this.warmer = warmer; @@ -243,6 +248,7 @@ public class IndexShard extends AbstractIndexShardComponent { suspendableRefContainer = new SuspendableRefContainer(); searcherWrapper = indexSearcherWrapper; primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); + persistMetadata(shardRouting, null); } public Store store() { @@ -313,8 +319,7 @@ public class IndexShard extends AbstractIndexShardComponent { } /** - * Returns the latest cluster routing entry received with this shard. Might be null if the - * shard was just created. + * Returns the latest cluster routing entry received with this shard. 
*/ public ShardRouting routingEntry() { return this.shardRouting; @@ -1325,6 +1330,58 @@ public class IndexShard extends AbstractIndexShardComponent { return this.currentEngineReference.get(); } + public void startRecovery(DiscoveryNode localNode, DiscoveryNode sourceNode, RecoveryTargetService recoveryTargetService, + RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService) { + final RestoreSource restoreSource = shardRouting.restoreSource(); + + if (shardRouting.isPeerRecovery()) { + assert sourceNode != null : "peer recovery started but sourceNode is null"; + // we don't mark this one as relocated at the end. + // For primaries: requests in any case are routed to both when its relocating and that way we handle + // the edge case where its mark as relocated, and we might need to roll it back... + // For replicas: we are recovering a backup from a primary + RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.PRIMARY_RELOCATION : RecoveryState.Type.REPLICA; + RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), type, sourceNode, localNode); + try { + markAsRecovering("from " + sourceNode, recoveryState); + recoveryTargetService.startRecovery(this, type, sourceNode, recoveryListener); + } catch (Throwable e) { + failShard("corrupted preexisting index", e); + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, e), true); + } + } else if (restoreSource == null) { + // recover from filesystem store + final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), + RecoveryState.Type.STORE, localNode, localNode); + markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread + threadPool.generic().execute(() -> { + try { + if (recoverFromStore(localNode)) { + recoveryListener.onRecoveryDone(recoveryState); + } + } catch (Throwable t) { + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, t), true); + } + + }); + } else { + // recover from a restore + final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), + RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), localNode); + markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread + threadPool.generic().execute(() -> { + try { + final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository()); + if (restoreFromRepository(indexShardRepository, localNode)) { + recoveryListener.onRecoveryDone(recoveryState); + } + } catch (Throwable first) { + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, first), true); + } + }); + } + } + class ShardEventListener implements Engine.EventListener { private final CopyOnWriteArrayList> delegates = new CopyOnWriteArrayList<>(); @@ -1367,13 +1424,6 @@ public class IndexShard extends AbstractIndexShardComponent { return engineFactory.newReadWriteEngine(config); } - /** - * Returns true iff this shard allows primary promotion, otherwise false - */ - public boolean allowsPrimaryPromotion() { - return true; - } - // pkg private for testing void persistMetadata(ShardRouting newRouting, @Nullable ShardRouting currentRouting) throws IOException { assert newRouting != null : "newRouting must not be null"; diff --git 
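The new IndexShard#startRecovery(...) above folds the recovery bootstrap that used to live in IndicesClusterStateService into the shard itself and branches on three cases: peer recovery, recovery from the local store, and restore from a snapshot. The helper below is a hypothetical summary of how the recovery type is chosen in that method; it only restates the branches and is not part of the change.

    import org.elasticsearch.cluster.routing.RestoreSource;
    import org.elasticsearch.cluster.routing.ShardRouting;
    import org.elasticsearch.indices.recovery.RecoveryState;

    // Hypothetical summary of the dispatch inside IndexShard#startRecovery() (not part of the change)
    final class RecoveryTypeSketch {
        static RecoveryState.Type recoveryTypeFor(ShardRouting routing) {
            if (routing.isPeerRecovery()) {
                // a primary only peer-recovers while relocating; a replica always peer-recovers
                return routing.primary() ? RecoveryState.Type.PRIMARY_RELOCATION : RecoveryState.Type.REPLICA;
            }
            RestoreSource restoreSource = routing.restoreSource();
            return restoreSource == null
                ? RecoveryState.Type.STORE      // recover from the shard's own files on disk
                : RecoveryState.Type.SNAPSHOT;  // restore from a repository
        }
    }
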
a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index e75973f2ccc..bf35d02fea2 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -46,12 +46,12 @@ import java.util.List; */ public final class ShadowIndexShard extends IndexShard { - public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, + public ShadowIndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory, IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, ThreadPool threadPool, BigArrays bigArrays, Engine.Warmer engineWarmer, List searchOperationListeners) throws IOException { - super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, + super(shardRouting, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, threadPool, bigArrays, engineWarmer, searchOperationListeners, Collections.emptyList()); } @@ -92,10 +92,6 @@ public final class ShadowIndexShard extends IndexShard { return false; } - public boolean allowsPrimaryPromotion() { - return false; - } - @Override public TranslogStats translogStats() { return null; // shadow engine has no translog diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 4487a36c0ae..6f9b1684b9e 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -31,7 +31,6 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RestoreSource; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -40,6 +39,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Callback; @@ -57,7 +57,6 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; -import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.indices.recovery.RecoveryFailedException; @@ -512,21 +511,20 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent 
!status.sourceNode().equals(sourceNode))) { - logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting); - // closing the shard will also cancel any ongoing recovery. - indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)"); - shardHasBeenRemoved = true; + if (shardRouting.isPeerRecovery()) { + RecoveryState recoveryState = indexShard.recoveryState(); + final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); + if (recoveryState.getSourceNode().equals(sourceNode) == false) { + if (recoveryTargetService.cancelRecoveriesForShard(currentRoutingEntry.shardId(), "recovery source node changed")) { + // getting here means that the shard was still recovering + logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting); + indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)"); + shardHasBeenRemoved = true; + } } } if (shardHasBeenRemoved == false) { - // shadow replicas do not support primary promotion. The master would reinitialize the shard, giving it a new allocation, meaning we should be there. - assert (shardRouting.primary() && currentRoutingEntry.primary() == false) == false || indexShard.allowsPrimaryPromotion() : - "shard for doesn't support primary promotion but master promoted it with changing allocation. New routing " + shardRouting + ", current routing " + currentRoutingEntry; try { indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false); } catch (Throwable e) { @@ -536,12 +534,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { - try { - if (indexShard.recoverFromStore(nodes.getLocalNode())) { - shardStateAction.shardStarted(shardRouting, "after recovery from store", SHARD_STATE_ACTION_LISTENER); - } - } catch (Throwable t) { - handleRecoveryFailure(indexService, shardRouting, true, t); - } - - }); - } else { - // recover from a restore - final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(), - RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), nodes.getLocalNode()); - indexShard.markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread - threadPool.generic().execute(() -> { - final ShardId sId = indexShard.shardId(); - try { - final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository()); - if (indexShard.restoreFromRepository(indexShardRepository, nodes.getLocalNode())) { - restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId); - shardStateAction.shardStarted(shardRouting, "after recovery from repository", SHARD_STATE_ACTION_LISTENER); - } - } catch (Throwable first) { - try { - if (Lucene.isCorruptionException(first)) { - restoreService.failRestore(restoreSource.snapshotId(), sId); - } - } catch (Throwable second) { - first.addSuppressed(second); - } finally { - handleRecoveryFailure(indexService, shardRouting, true, first); - } - } - }); - } + indexShard.startRecovery(nodes.getLocalNode(), sourceNode, recoveryTargetService, + new RecoveryListener(shardRouting, indexService), repositoriesService); } /** * Finds the routing source node for peer recovery, return null if 
its not found. Note, this method expects the shard - * routing to *require* peer recovery, use {@link #isPeerRecovery(org.elasticsearch.cluster.routing.ShardRouting)} to + * routing to *require* peer recovery, use {@link ShardRouting#isPeerRecovery()} to * check if its needed or not. */ - private DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) { + private static DiscoveryNode findSourceNodeForPeerRecovery(ESLogger logger, RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) { DiscoveryNode sourceNode = null; if (!shardRouting.primary()) { - IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id()); - for (ShardRouting entry : shardRoutingTable) { - if (entry.primary() && entry.active()) { - // only recover from started primary, if we can't find one, we will do it next round - sourceNode = nodes.get(entry.currentNodeId()); - if (sourceNode == null) { - logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node.", entry); - return null; - } - break; + ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard(); + // only recover from started primary, if we can't find one, we will do it next round + if (primary.active()) { + sourceNode = nodes.get(primary.currentNodeId()); + if (sourceNode == null) { + logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node.", primary); } - } - - if (sourceNode == null) { - logger.trace("can't find replica source node for {} because a primary shard can not be found.", shardRouting.shardId()); + } else { + logger.trace("can't find replica source node because primary shard {} is not active.", primary); } } else if (shardRouting.relocatingNodeId() != null) { sourceNode = nodes.get(shardRouting.relocatingNodeId()); @@ -711,30 +643,49 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent true); - } - /** - * cancel all ongoing recoveries for the given shard, if their status match a predicate + * cancel all ongoing recoveries for the given shard * * @param reason reason for cancellation * @param shardId shardId for which to cancel recoveries - * @param shouldCancel a predicate to check if a recovery should be cancelled or not. - * Note that the recovery state can change after this check, but before it is being cancelled via other - * already issued outstanding references. * @return true if a recovery was cancelled */ - public boolean cancelRecoveriesForShard(ShardId shardId, String reason, Predicate shouldCancel) { + public boolean cancelRecoveriesForShard(ShardId shardId, String reason) { boolean cancelled = false; for (RecoveryTarget status : onGoingRecoveries.values()) { if (status.shardId().equals(shardId)) { - boolean cancel = false; - // if we can't increment the status, the recovery is not there any more. 
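Earlier in this hunk, findSourceNodeForPeerRecovery(...) is rewritten to look up the primary via shardRoutingTable(...).primaryShard() instead of scanning the shard routing table. The sketch below only restates its selection rule (a null result means "retry on a later cluster state update"); it is illustrative, assumes the same imports as the surrounding class, and is not part of the change.

    // Illustrative restatement of the rewritten findSourceNodeForPeerRecovery(...) (not part of the change)
    static DiscoveryNode sourceNodeSketch(ShardRouting shardRouting, RoutingTable routingTable, DiscoveryNodes nodes) {
        if (shardRouting.primary() == false) {
            // a replica recovers from the active primary; if none is started yet, wait for the next round
            ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();
            return primary.active() ? nodes.get(primary.currentNodeId()) : null;
        } else if (shardRouting.relocatingNodeId() != null) {
            // a relocating primary recovers from the node it is moving away from
            return nodes.get(shardRouting.relocatingNodeId());
        }
        return null; // the real method expects isPeerRecovery() to hold before it is called
    }
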
- if (status.tryIncRef()) { - try { - cancel = shouldCancel.test(status); - } finally { - status.decRef(); - } - } - if (cancel && cancelRecovery(status.recoveryId(), reason)) { - cancelled = true; - } + cancelled |= cancelRecovery(status.recoveryId(), reason); } } return cancelled; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java index f0a0b13b872..53feb8e6135 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java @@ -124,13 +124,10 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve * * @param reason reason for cancellation * @param shardId shardId for which to cancel recoveries - * @param shouldCancel a predicate to check if a recovery should be cancelled or not. Null means cancel without an extra check. - * note that the recovery state can change after this check, but before it is being cancelled via other - * already issued outstanding references. * @return true if a recovery was cancelled */ - public boolean cancelRecoveriesForShard(ShardId shardId, String reason, @Nullable Predicate shouldCancel) { - return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason, shouldCancel); + public boolean cancelRecoveriesForShard(ShardId shardId, String reason) { + return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason); } public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 551b6389b84..52c16dd74d9 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -980,8 +980,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - routing = ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing, true); + newShard.updateRoutingEntry(routing.moveToStarted(), true); SearchResponse response = client().prepareSearch().get(); assertHitCount(response, 1); } @@ -1009,8 +1008,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - routing = ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing, true); + newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true); SearchResponse response = client().prepareSearch().get(); assertHitCount(response, 0); } @@ -1060,8 +1058,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore(localNode)); - routing = 
ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing, true); + newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true); SearchResponse response = client().prepareSearch().get(); assertHitCount(response, 0); // we can't issue this request through a client because of the inconsistencies we created with the cluster state @@ -1147,8 +1144,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { } }, localNode)); - routing = ShardRoutingHelper.moveToStarted(routing); - test_target_shard.updateRoutingEntry(routing, true); + test_target_shard.updateRoutingEntry(routing.moveToStarted(), true); assertHitCount(client().prepareSearch("test_target").get(), 1); assertSearchHits(client().prepareSearch("test_target").get(), "0"); } @@ -1355,7 +1351,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { }; final IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener); shardRef.set(newShard); - recoverShard(newShard, shard.routingEntry()); + recoverShard(newShard); try { ExceptionsHelper.rethrowAndSuppress(failures); @@ -1396,22 +1392,20 @@ public class IndexShardTests extends ESSingleNodeTestCase { public static final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException { IndexShard newShard = newIndexShard(indexService, shard, wrapper, listeners); - return recoverShard(newShard, shard.routingEntry()); + return recoverShard(newShard); } - public static final IndexShard recoverShard(IndexShard newShard, ShardRouting oldRouting) throws IOException { - ShardRouting routing = ShardRoutingHelper.reinit(oldRouting); - newShard.updateRoutingEntry(routing, false); + public static final IndexShard recoverShard(IndexShard newShard) throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); - newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); + newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), newShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode)); assertTrue(newShard.recoverFromStore(localNode)); - routing = ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing, true); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted(), true); return newShard; } public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... 
listeners) throws IOException { - IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), + ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry()); + IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners) @@ -1419,6 +1413,14 @@ public class IndexShardTests extends ESSingleNodeTestCase { return newShard; } + private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) { + ShardRouting shardRouting = TestShardRouting.newShardRouting(existingShardRouting.shardId(), + existingShardRouting.currentNodeId(), null, existingShardRouting.primary(), ShardRoutingState.INITIALIZING, + existingShardRouting.allocationId()); + shardRouting = shardRouting.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "fake recovery")); + return shardRouting; + } + public void testTranslogRecoverySyncsTranslog() throws IOException { createIndex("testindexfortranslogsync"); client().admin().indices().preparePutMapping("testindexfortranslogsync").setType("testtype").setSource(jsonBuilder().startObject() @@ -1432,7 +1434,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("testindexfortranslogsync")); IndexShard shard = test.getShardOrNull(0); - ShardRouting routing = shard.routingEntry(); + ShardRouting routing = getInitializingShardRouting(shard.routingEntry()); test.removeShard(0, "b/c britta says so"); IndexShard newShard = test.createShard(routing); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); @@ -1459,7 +1461,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("index")); IndexShard shard = test.getShardOrNull(0); - ShardRouting routing = shard.routingEntry(); + ShardRouting routing = getInitializingShardRouting(shard.routingEntry()); test.removeShard(0, "b/c britta says so"); IndexShard newShard = test.createShard(routing); newShard.shardRouting = routing; @@ -1488,10 +1490,9 @@ public class IndexShardTests extends ESSingleNodeTestCase { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("index")); IndexShard shard = test.getShardOrNull(0); - ShardRouting routing = shard.routingEntry(); + ShardRouting routing = getInitializingShardRouting(shard.routingEntry()); test.removeShard(0, "b/c britta says so"); IndexShard newShard = test.createShard(routing); - newShard.shardRouting = routing; DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("for testing", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.REPLICA, localNode, localNode)); // Shard is still inactive since we haven't started recovering yet diff --git 
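Because IndexShard now takes its (initializing) routing entry in the constructor, the test flow for rebuilding a shard becomes: create an INITIALIZING routing, construct the shard, mark it as recovering, recover from store, then move the routing to STARTED. The condensed sketch below mirrors the recoverShard/newIndexShard helpers changed above and is meant only as a summary of that flow.

    // Condensed sketch of the post-change test flow (mirrors recoverShard(...) above)
    IndexShard newShard = newIndexShard(indexService, shard, wrapper);   // built internally from an INITIALIZING routing
    DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
    newShard.markAsRecovering("store",
        new RecoveryState(newShard.shardId(), newShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode));
    assertTrue(newShard.recoverFromStore(localNode));                            // local recovery from the shard's files
    newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted(), true);  // routing may only move to STARTED afterwards
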
a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index efd42f7f260..4b11bb1bc96 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -448,15 +448,14 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { shardRef.set(newShard); try { assertEquals(0, imc.availableShards().size()); - ShardRouting routing = ShardRoutingHelper.reinit(shard.routingEntry()); - newShard.updateRoutingEntry(routing, false); + ShardRouting routing = newShard.routingEntry(); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); assertEquals(1, imc.availableShards().size()); assertTrue(newShard.recoverFromStore(localNode)); assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); - newShard.updateRoutingEntry(ShardRoutingHelper.moveToStarted(routing), true); + newShard.updateRoutingEntry(routing.moveToStarted(), true); } finally { newShard.close("simon says", false); } diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java index 65bec885ea8..2564b31488b 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java @@ -104,7 +104,7 @@ public class RecoveriesCollectionTests extends ESSingleNodeTestCase { } - public void testRecoveryCancellationNoPredicate() throws Exception { + public void testRecoveryCancellation() throws Exception { createIndex(); final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class)); final long recoveryId = startRecovery(collection); @@ -119,39 +119,6 @@ public class RecoveriesCollectionTests extends ESSingleNodeTestCase { } } - public void testRecoveryCancellationPredicate() throws Exception { - createIndex(); - final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class)); - final long recoveryId = startRecovery(collection); - final long recoveryId2 = startRecovery(collection); - final ArrayList toClose = new ArrayList<>(); - try { - RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId); - toClose.add(recoveryRef); - ShardId shardId = recoveryRef.status().shardId(); - assertFalse("should not have cancelled recoveries", collection.cancelRecoveriesForShard(shardId, "test", status -> false)); - final Predicate shouldCancel = status -> status.recoveryId() == recoveryId; - assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test", shouldCancel)); - assertThat("we should still have on recovery", collection.size(), equalTo(1)); - recoveryRef = collection.getRecovery(recoveryId); - toClose.add(recoveryRef); - assertNull("recovery should have been deleted", recoveryRef); - recoveryRef = collection.getRecovery(recoveryId2); - toClose.add(recoveryRef); - assertNotNull("recovery should NOT have been deleted", recoveryRef); - - } finally { - // TODO: do we want a lucene IOUtils version of this? 
- for (AutoCloseable closeable : toClose) { - if (closeable != null) { - closeable.close(); - } - } - collection.cancelRecovery(recoveryId, "meh"); - collection.cancelRecovery(recoveryId2, "meh"); - } - } - protected void createIndex() { createIndex("test", Settings.builder()
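With the shouldCancel predicate removed, cancelling recoveries for a shard is now an all-or-nothing call on RecoveriesCollection. A minimal usage sketch in the style of the surviving testRecoveryCancellation above (startRecovery(collection) is that test's existing helper; the assertions are illustrative):

    final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
    final long recoveryId = startRecovery(collection);
    try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) {
        ShardId shardId = recoveryRef.status().shardId();
        // cancels every ongoing recovery of this shard; returns true if at least one recovery was cancelled
        assertTrue(collection.cancelRecoveriesForShard(shardId, "test"));
    }
    assertThat("recovery should have been removed", collection.size(), equalTo(0));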