From d76161d51a09452505e6789bfcc5df086c5a52ec Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 11 Feb 2016 20:08:21 +0100 Subject: [PATCH] Write shard state metadata as soon as shard is created / initializing As we rely on active allocation ids persisted in the cluster state to select the primary shard copy, we can write shard state metadata on the allocated node as soon as the node learns that it has been assigned this shard. This also ensures that in case of primary relocation, when the relocation target is marked as started by the master node, the shard state metadata with the correct allocation id has already been written on the relocation target. Before this change, shard state metadata was only written once the node knew that the shard was marked as started. In case of failures between the master marking the shard as started and the node receiving and processing this event, the relation between the shard copy on disk and the cluster state could get lost. In that situation, manual allocation of the shard using the reroute command allocate_stale_primary was necessary. 
Closes #16625 --- .../gateway/PrimaryShardAllocator.java | 27 +++- .../index/cache/bitset/BitsetFilterCache.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 150 ++++++++---------- .../index/shard/ShadowIndexShard.java | 4 +- .../elasticsearch/indices/IndicesService.java | 2 +- .../indices/recovery/RecoveryTarget.java | 2 - .../search/sort/GeoDistanceSortParser.java | 2 +- .../gateway/PrimaryShardAllocatorTests.java | 94 +++++++---- .../index/shard/IndexShardTests.java | 38 +---- 9 files changed, 159 insertions(+), 162 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java index 8809f68853b..32963d5189c 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -187,12 +187,14 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { } if (nodeShardState.storeException() == null) { - if (allocationId == null && nodeShardState.legacyVersion() != ShardStateMetaData.NO_VERSION) { - // old shard with no allocation id, assign dummy value so that it gets added below in case of matchAnyShard - allocationId = "_n/a_"; + if (allocationId == null && nodeShardState.legacyVersion() == ShardStateMetaData.NO_VERSION) { + logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode()); + } else if (allocationId != null) { + assert nodeShardState.legacyVersion() == ShardStateMetaData.NO_VERSION : "Allocation id and legacy version cannot be both present"; + logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), allocationId); + } else { + logger.trace("[{}] on node [{}] has no allocation id, out-dated shard (shard state version: [{}])", shard, nodeShardState.getNode(), nodeShardState.legacyVersion()); } - - logger.trace("[{}] on node [{}] has allocation id [{}] of 
shard", shard, nodeShardState.getNode(), allocationId); } else { logger.trace("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", nodeShardState.storeException(), shard, nodeShardState.getNode(), allocationId); allocationId = null; @@ -299,9 +301,20 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { continue; } - // no version means it does not exists, which is what the API returns, and what we expect to if (nodeShardState.storeException() == null) { - logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version); + if (version == ShardStateMetaData.NO_VERSION && nodeShardState.allocationId() == null) { + logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode()); + } else if (version != ShardStateMetaData.NO_VERSION) { + assert nodeShardState.allocationId() == null : "Allocation id and legacy version cannot be both present"; + logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version); + } else { + // shard was already selected in a 5.x cluster as primary for recovery, was initialized (and wrote a new state file) but + // did not make it to STARTED state before the cluster crashed (otherwise list of active allocation ids would be + // non-empty and allocation id - based allocation mode would be chosen). + // Prefer this shard copy again. 
+ version = Long.MAX_VALUE; + logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), nodeShardState.allocationId()); + } } else { // when there is an store exception, we disregard the reported version and assign it as no version (same as shard does not exist) logger.trace("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", nodeShardState.storeException(), shard, nodeShardState.getNode(), version); diff --git a/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java b/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java index 7d5540b6224..f7802330ab7 100644 --- a/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java +++ b/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java @@ -215,7 +215,7 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L @Override public IndexWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) { - if (indexSettings.getIndex().equals(indexShard.getIndexSettings().getIndex()) == false) { + if (indexSettings.getIndex().equals(indexShard.indexSettings().getIndex()) == false) { // this is from a different index return TerminationHandle.NO_WAIT; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 9ee48a5a006..14b1278e51b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -153,7 +153,6 @@ public class IndexShard extends AbstractIndexShardComponent { private final EngineConfig engineConfig; private final TranslogConfig translogConfig; private final IndexEventListener indexEventListener; - private final IndexSettings idxSettings; /** How many bytes we are currently moving to disk, via either 
IndexWriter.flush or refresh. IndexingMemoryController polls this * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents @@ -205,7 +204,6 @@ public class IndexShard extends AbstractIndexShardComponent { IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, SearchSlowLog slowLog, Engine.Warmer warmer, IndexingOperationListener... listeners) { super(shardId, indexSettings); final Settings settings = indexSettings.getSettings(); - this.idxSettings = indexSettings; this.codecService = new CodecService(mapperService, logger); this.warmer = warmer; this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); @@ -248,7 +246,7 @@ public class IndexShard extends AbstractIndexShardComponent { this.engineConfig = newEngineConfig(translogConfig, cachingPolicy); this.suspendableRefContainer = new SuspendableRefContainer(); this.searcherWrapper = indexSearcherWrapper; - QueryShardContext queryShardContext = new QueryShardContext(idxSettings, indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry()); + QueryShardContext queryShardContext = new QueryShardContext(indexSettings, indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry()); this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryShardContext); } @@ -256,10 +254,6 @@ public class IndexShard extends AbstractIndexShardComponent { return this.store; } - public IndexSettings getIndexSettings() { - return idxSettings; - } - /** returns true if this shard supports indexing (i.e., write) operations. 
*/ public boolean canIndex() { return true; @@ -319,8 +313,9 @@ public class IndexShard extends AbstractIndexShardComponent { * unless explicitly disabled. * * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted + * @throws IOException if shard state could not be persisted */ - public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) { + public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException { final ShardRouting currentRouting = this.shardRouting; if (!newRouting.shardId().equals(shardId())) { throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]"); @@ -328,57 +323,54 @@ public class IndexShard extends AbstractIndexShardComponent { if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) { throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. 
Current " + currentRouting + ", new " + newRouting); } - try { - if (currentRouting != null) { - if (!newRouting.primary() && currentRouting.primary()) { - logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode"); - } - // if its the same routing, return - if (currentRouting.equals(newRouting)) { - return; - } + if (currentRouting != null) { + if (!newRouting.primary() && currentRouting.primary()) { + logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode"); } + // if its the same routing, return + if (currentRouting.equals(newRouting)) { + return; + } + } - if (state == IndexShardState.POST_RECOVERY) { - // if the state is started or relocating (cause it might move right away from started to relocating) - // then move to STARTED - if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) { - // we want to refresh *before* we move to internal STARTED state - try { - getEngine().refresh("cluster_state_started"); - } catch (Throwable t) { - logger.debug("failed to refresh due to move to cluster wide started", t); - } + if (state == IndexShardState.POST_RECOVERY) { + // if the state is started or relocating (cause it might move right away from started to relocating) + // then move to STARTED + if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) { + // we want to refresh *before* we move to internal STARTED state + try { + getEngine().refresh("cluster_state_started"); + } catch (Throwable t) { + logger.debug("failed to refresh due to move to cluster wide started", t); + } - boolean movedToStarted = false; - synchronized (mutex) { - // do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY - if (state == IndexShardState.POST_RECOVERY) { - changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); - movedToStarted = true; - } else { - 
logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state()); - } - } - if (movedToStarted) { - indexEventListener.afterIndexShardStarted(this); + boolean movedToStarted = false; + synchronized (mutex) { + // do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY + if (state == IndexShardState.POST_RECOVERY) { + changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); + movedToStarted = true; + } else { + logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state()); } } + if (movedToStarted) { + indexEventListener.afterIndexShardStarted(this); + } } + } - if (state == IndexShardState.RELOCATED && - (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) { - // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery - // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two - // active primaries. - throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state()); - } - this.shardRouting = newRouting; - indexEventListener.shardRoutingChanged(this, currentRouting, newRouting); - } finally { - if (persistState) { - persistMetadata(newRouting, currentRouting); - } + if (state == IndexShardState.RELOCATED && + (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) { + // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery + // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two + // active primaries. 
+ throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state()); + } + this.shardRouting = newRouting; + indexEventListener.shardRoutingChanged(this, currentRouting, newRouting); + if (persistState) { + persistMetadata(newRouting, currentRouting); } } @@ -733,7 +725,7 @@ public class IndexShard extends AbstractIndexShardComponent { luceneVersion = segment.getVersion(); } } - return luceneVersion == null ? idxSettings.getIndexVersionCreated().luceneVersion : luceneVersion; + return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion; } /** @@ -1046,18 +1038,6 @@ public class IndexShard extends AbstractIndexShardComponent { } } - /** - * Deletes the shards metadata state. This method can only be executed if the shard is not active. - * - * @throws IOException if the delete fails - */ - public void deleteShardState() throws IOException { - if (this.routingEntry() != null && this.routingEntry().active()) { - throw new IllegalStateException("Can't delete shard state on an active shard"); - } - MetaDataStateFormat.deleteMetaState(shardPath().getDataPath()); - } - public boolean isActive() { return active.get(); } @@ -1070,7 +1050,7 @@ public class IndexShard extends AbstractIndexShardComponent { // we are the first primary, recover from the gateway // if its post api allocation, the index should exists assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; - boolean shouldExist = shardRouting.allocatedPostIndexCreate(idxSettings.getIndexMetaData()); + boolean shouldExist = shardRouting.allocatedPostIndexCreate(indexSettings.getIndexMetaData()); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); return storeRecovery.recoverFromStore(this, shouldExist, localNode); @@ -1344,27 +1324,25 @@ public class IndexShard extends AbstractIndexShardComponent { } // pkg private for testing - void 
persistMetadata(ShardRouting newRouting, ShardRouting currentRouting) { + void persistMetadata(ShardRouting newRouting, @Nullable ShardRouting currentRouting) throws IOException { assert newRouting != null : "newRouting must not be null"; - if (newRouting.active()) { - try { - final String writeReason; - if (currentRouting == null) { - writeReason = "freshly started, allocation id [" + newRouting.allocationId() + "]"; - } else if (currentRouting.equals(newRouting) == false) { - writeReason = "routing changed from " + currentRouting + " to " + newRouting; - } else { - logger.trace("{} skip writing shard state, has been written before", shardId); - return; - } - final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId()); - logger.trace("{} writing shard state, reason [{}]", shardId, writeReason); - ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.legacyVersion, shardPath().getShardStatePath()); - } catch (IOException e) { // this is how we used to handle it.... :( - logger.warn("failed to write shard state", e); - // we failed to write the shard state, we will try and write - // it next time... 
+ + // only persist metadata if routing information that is persisted in shard state metadata actually changed + if (currentRouting == null + || currentRouting.primary() != newRouting.primary() + || currentRouting.allocationId().equals(newRouting.allocationId()) == false) { + assert currentRouting == null || currentRouting.isSameAllocation(newRouting); + final String writeReason; + if (currentRouting == null) { + writeReason = "initial state with allocation id [" + newRouting.allocationId() + "]"; + } else { + writeReason = "routing changed from " + currentRouting + " to " + newRouting; } + logger.trace("{} writing shard state, reason [{}]", shardId, writeReason); + final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId()); + ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.legacyVersion, shardPath().getShardStatePath()); + } else { + logger.trace("{} skip writing shard state, has been written before", shardId); } } @@ -1396,7 +1374,7 @@ public class IndexShard extends AbstractIndexShardComponent { return new EngineConfig(shardId, threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, - idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME)); + indexSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME)); } public Releasable acquirePrimaryOperationLock() { diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 355e4ee0cad..5518d1b1273 100644 --- 
a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -51,12 +51,12 @@ public final class ShadowIndexShard extends IndexShard { /** * In addition to the regular accounting done in - * {@link IndexShard#updateRoutingEntry(org.elasticsearch.cluster.routing.ShardRouting, boolean)}, + * {@link IndexShard#updateRoutingEntry(ShardRouting, boolean)}, * if this shadow replica needs to be promoted to a primary, the shard is * failed in order to allow a new primary to be re-allocated. */ @Override - public void updateRoutingEntry(ShardRouting newRouting, boolean persistState) { + public void updateRoutingEntry(ShardRouting newRouting, boolean persistState) throws IOException { if (newRouting.primary() == true) {// becoming a primary throw new IllegalStateException("can't promote shard to primary"); } diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index c7d1be4bf71..440a11a1904 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -904,7 +904,7 @@ public class IndicesService extends AbstractLifecycleComponent i if (!CACHEABLE_SEARCH_TYPES.contains(context.searchType())) { return false; } - IndexSettings settings = context.indexShard().getIndexSettings(); + IndexSettings settings = context.indexShard().indexSettings(); // if not explicitly set in the request, use the index setting, if not, use the request if (request.requestCache() == null) { if (settings.getValue(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING) == false) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index ec40a0431c1..661c8371537 100644 --- 
a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -342,8 +342,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget // first, we go and move files that were created with the recovery id suffix to // the actual names, its ok if we have a corrupted index here, since we have replicas // to recover from in case of a full cluster shutdown just when this code executes... - indexShard().deleteShardState(); // we have to delete it first since even if we fail to rename the shard - // might be invalid renameAllTempFiles(); final Store store = store(); // now write checksums diff --git a/core/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortParser.java b/core/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortParser.java index 248a051021e..27c8b8e0ed5 100644 --- a/core/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortParser.java +++ b/core/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortParser.java @@ -71,7 +71,7 @@ public class GeoDistanceSortParser implements SortParser { MultiValueMode sortMode = null; NestedInnerQueryParseSupport nestedHelper = null; - final boolean indexCreatedBeforeV2_0 = context.indexShard().getIndexSettings().getIndexVersionCreated().before(Version.V_2_0_0); + final boolean indexCreatedBeforeV2_0 = context.indexShard().indexSettings().getIndexVersionCreated().before(Version.V_2_0_0); boolean coerce = GeoDistanceSortBuilder.DEFAULT_COERCE; boolean ignoreMalformed = GeoDistanceSortBuilder.DEFAULT_IGNORE_MALFORMED; diff --git a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index d1cd8d974c6..572b87dfa6b 100644 --- a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ 
b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -117,7 +117,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { */ public void testNoMatchingAllocationIdFound() { RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2"); - testAllocator.addData(node1, 1, "id1", randomBoolean()); + testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "id1", randomBoolean()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -145,7 +145,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { final RoutingAllocation allocation; if (randomBoolean()) { allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); - testAllocator.addData(node1, 1, "allocId1", randomBoolean(), new CorruptIndexException("test", "test")); + testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(), new CorruptIndexException("test", "test")); } else { allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); testAllocator.addData(node1, 3, null, randomBoolean(), new CorruptIndexException("test", "test")); @@ -164,7 +164,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { boolean useAllocationIds = randomBoolean(); if (useAllocationIds) { allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); - testAllocator.addData(node1, 1, "allocId1", randomBoolean()); + testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean()); } else { allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, 
Version.V_2_2_0); testAllocator.addData(node1, 3, null, randomBoolean()); @@ -188,8 +188,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { String replicaAllocId = Strings.randomBase64UUID(); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), primaryAllocId, replicaAllocId); boolean node1HasPrimaryShard = randomBoolean(); - testAllocator.addData(node1, 1, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard); - testAllocator.addData(node2, 1, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard); + testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard); + testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -206,7 +206,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { final RoutingAllocation allocation; if (randomBoolean()) { allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); - testAllocator.addData(node1, 1, "allocId1", randomBoolean()); + testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean()); } else { allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0); testAllocator.addData(node1, 3, null, randomBoolean()); @@ -225,7 +225,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { final RoutingAllocation allocation; if (randomBoolean()) { allocation = 
routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); - testAllocator.addData(node1, 1, "allocId1", randomBoolean()); + testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean()); } else { allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0); testAllocator.addData(node1, 3, null, randomBoolean()); @@ -250,13 +250,36 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id())); } + /** + * Tests that shard with allocation id is chosen if such a shard is available in version-based allocation mode. This happens if a shard + * was already selected in a 5.x cluster as primary for recovery, was initialized (and wrote a new state file) but did not make it to + * STARTED state before the cluster crashed (otherwise list of active allocation ids would be non-empty and allocation id - based + * allocation mode would be chosen). 
+ */ + public void testVersionBasedAllocationPrefersShardWithAllocationId() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0); + testAllocator.addData(node1, 10, null, randomBoolean()); + testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some allocId", randomBoolean()); + testAllocator.addData(node3, 12, null, randomBoolean()); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id())); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("some allocId")); + } + /** * Tests that when restoring from a snapshot and we find a node with a shard copy and allocation * deciders say yes, we allocate to that node. */ public void testRestore() { - RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders()); - testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean()); + boolean shardStateHasAllocationId = randomBoolean(); + String allocationId = shardStateHasAllocationId ? "some allocId" : null; + long legacyVersion = shardStateHasAllocationId ? ShardStateMetaData.NO_VERSION : 1; + boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? 
randomBoolean() : false; + RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), clusterHasActiveAllocationIds); + testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -268,8 +291,12 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * deciders say throttle, we add it to ignored shards. */ public void testRestoreThrottle() { - RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders()); - testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean()); + boolean shardStateHasAllocationId = randomBoolean(); + String allocationId = shardStateHasAllocationId ? "some allocId" : null; + long legacyVersion = shardStateHasAllocationId ? ShardStateMetaData.NO_VERSION : 1; + boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false; + RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders(), clusterHasActiveAllocationIds); + testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false)); @@ -280,8 +307,12 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * deciders say no, we still allocate to that node. */ public void testRestoreForcesAllocateIfShardAvailable() { - RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders()); - testAllocator.addData(node1, 1, randomFrom(null, "some allocId"), randomBoolean()); + boolean shardStateHasAllocationId = randomBoolean(); + String allocationId = shardStateHasAllocationId ? 
"some allocId" : null; + long legacyVersion = shardStateHasAllocationId ? ShardStateMetaData.NO_VERSION : 1; + boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false; + RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders(), clusterHasActiveAllocationIds); + testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -293,7 +324,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * the unassigned list to be allocated later. */ public void testRestoreDoesNotAssignIfNoShardAvailable() { - RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders()); + RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), randomBoolean()); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, false); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); @@ -301,11 +332,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); } - private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocationDeciders) { - Version version = randomFrom(Version.CURRENT, Version.V_2_0_0); + private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocationDeciders, boolean hasActiveAllocation) { + Version version = hasActiveAllocation ? Version.CURRENT : Version.V_2_0_0; MetaData metaData = MetaData.builder() .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0) - .putActiveAllocationIds(0, version == Version.CURRENT ? 
Sets.newHashSet("allocId") : Collections.emptySet())) + .putActiveAllocationIds(0, hasActiveAllocation ? Sets.newHashSet("allocId") : Collections.emptySet())) .build(); RoutingTable routingTable = RoutingTable.builder() @@ -323,8 +354,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * deciders say yes, we allocate to that node. */ public void testRecoverOnAnyNode() { - RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders()); - testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean()); + boolean hasActiveAllocation = randomBoolean(); + String allocationId = hasActiveAllocation ? "allocId" : null; + long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1; + RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), hasActiveAllocation); + testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -336,8 +370,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * deciders say throttle, we add it to ignored shards. */ public void testRecoverOnAnyNodeThrottle() { - RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders()); - testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean()); + boolean hasActiveAllocation = randomBoolean(); + String allocationId = hasActiveAllocation ? "allocId" : null; + long legacyVersion = hasActiveAllocation ? 
ShardStateMetaData.NO_VERSION : 1; + RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(throttleAllocationDeciders(), hasActiveAllocation); + testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false)); @@ -348,8 +385,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * deciders say no, we still allocate to that node. */ public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() { - RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders()); - testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean()); + boolean hasActiveAllocation = randomBoolean(); + String allocationId = hasActiveAllocation ? "allocId" : null; + long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1; + RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders(), hasActiveAllocation); + testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -361,7 +401,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * BalancedShardAllocator assign the shard */ public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() { - RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders()); + RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), randomBoolean()); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); @@ -369,13 
+409,13 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); } - private RoutingAllocation getRecoverOnAnyNodeRoutingAllocation(AllocationDeciders allocationDeciders) { - Version version = randomFrom(Version.CURRENT, Version.V_2_0_0); + private RoutingAllocation getRecoverOnAnyNodeRoutingAllocation(AllocationDeciders allocationDeciders, boolean hasActiveAllocation) { + Version version = hasActiveAllocation ? Version.CURRENT : Version.V_2_0_0; MetaData metaData = MetaData.builder() .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version) .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) .put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true)) - .numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, version == Version.CURRENT ? Sets.newHashSet("allocId") : Collections.emptySet())) + .numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, hasActiveAllocation ? 
Sets.newHashSet("allocId") : Collections.emptySet())) .build(); RoutingTable routingTable = RoutingTable.builder() diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index bb670837ea0..e70ca9ec6de 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -204,13 +204,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(shardStateMetaData, getShardStateMetadata(shard)); assertEquals(shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); - // test if we still write it even if the shard is not active - ShardRouting inactiveRouting = TestShardRouting.newShardRouting(shard.shardRouting.index(), shard.shardRouting.shardId().id(), shard.shardRouting.currentNodeId(), null, null, true, ShardRoutingState.INITIALIZING); - shard.persistMetadata(inactiveRouting, shard.shardRouting); - shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); - assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, getShardStateMetadata(shard)); - assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); - + // check that we don't write shard state metadata if persist == false ShardRouting updatedRouting = new ShardRouting(shard.shardRouting); TestShardRouting.relocate(updatedRouting, "some node", 42L); shard.updateRoutingEntry(updatedRouting, false); @@ -218,6 +212,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertFalse("shard state persisted despite of persist=false", shardStateMetaData.equals(getShardStateMetadata(shard))); assertEquals("shard state persisted despite of persist=false", shardStateMetaData, new 
ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); + // check that we write shard state metadata if persist == true shard.updateRoutingEntry(routing, false); // move back state in IndexShard routing = new ShardRouting(updatedRouting); shard.updateRoutingEntry(routing, true); @@ -226,33 +221,6 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); } - public void testDeleteShardState() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class); - IndexService test = indicesService.indexService("test"); - IndexShard shard = test.getShardOrNull(0); - try { - shard.deleteShardState(); - fail("shard is active metadata delete must fail"); - } catch (IllegalStateException ex) { - // fine - only delete if non-active - } - - ShardRouting routing = shard.routingEntry(); - ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); - assertEquals(shardStateMetaData, getShardStateMetadata(shard)); - - routing = TestShardRouting.newShardRouting(shard.shardId.getIndex(), shard.shardId.id(), routing.currentNodeId(), null, routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.allocationId()); - shard.updateRoutingEntry(routing, true); - shard.deleteShardState(); - - assertNull("no shard state expected after delete on initializing", load(logger, env.availableShardPaths(shard.shardId))); - - - } - public void testFailShard() throws Exception { createIndex("test"); ensureGreen(); @@ -973,7 +941,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertHitCount(client().prepareSearch().get(), 1); } - public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedException { + 
public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedException, IOException { createIndex("test"); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class);