From 3a442db9bd4843bb2b12bad9279ecb35f05315cc Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 7 Dec 2015 15:09:45 +0100 Subject: [PATCH] Allocate primary shards based on allocation ids Closes #15281 --- .../shards/IndicesShardStoresResponse.java | 55 ++- .../TransportIndicesShardStoresAction.java | 16 +- .../cluster/metadata/IndexMetaData.java | 2 +- .../metadata/MetaDataIndexStateService.java | 9 - .../cluster/routing/ShardRouting.java | 9 +- .../decider/DiskThresholdDecider.java | 5 +- .../decider/EnableAllocationDecider.java | 7 +- .../gateway/PrimaryShardAllocator.java | 196 +++++++--- .../gateway/ReplicaShardAllocator.java | 11 +- ...ransportNodesListGatewayStartedShards.java | 23 +- .../elasticsearch/index/shard/IndexShard.java | 3 +- .../admin/indices/create/CreateIndexIT.java | 7 + .../shards/IndicesShardStoreRequestIT.java | 3 +- .../IndicesShardStoreResponseTests.java | 21 +- .../cluster/routing/PrimaryAllocationIT.java | 103 +++++ .../gateway/PrimaryShardAllocatorTests.java | 358 ++++++++++++------ .../gateway/QuorumGatewayIT.java | 74 +--- .../gateway/ReplicaShardAllocatorTests.java | 18 +- .../index/shard/IndexShardTests.java | 8 +- .../indices/state/SimpleIndexStateIT.java | 11 +- docs/reference/index-modules.asciidoc | 13 - .../indices/shadow-replicas.asciidoc | 5 +- docs/reference/indices/shard-stores.asciidoc | 8 +- docs/reference/migration/migrate_3_0.asciidoc | 21 + .../cluster/shards_allocation.asciidoc | 5 +- .../test/ESAllocationTestCase.java | 4 +- 26 files changed, 645 insertions(+), 350 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java index 84b39d4c689..380f6e00890 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java @@ -56,13 +56,14 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon public static class StoreStatus implements Streamable, ToXContent, Comparable { private DiscoveryNode node; private long version; + private String allocationId; private Throwable storeException; - private Allocation allocation; + private AllocationStatus allocationStatus; /** * The status of the shard store with respect to the cluster */ - public enum Allocation { + public enum AllocationStatus { /** * Allocated as primary @@ -81,16 +82,16 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon private final byte id; - Allocation(byte id) { + AllocationStatus(byte id) { this.id = id; } - private static Allocation fromId(byte id) { + private static AllocationStatus fromId(byte id) { switch (id) { case 0: return PRIMARY; case 1: return REPLICA; case 2: return UNUSED; - default: throw new IllegalArgumentException("unknown id for allocation [" + id + "]"); + default: throw new IllegalArgumentException("unknown id for allocation status [" + id + "]"); } } @@ -99,11 +100,11 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon case 0: return "primary"; case 1: return "replica"; case 2: return "unused"; - default: throw new IllegalArgumentException("unknown id for allocation [" + id + "]"); + default: throw new IllegalArgumentException("unknown id for allocation status [" 
+ id + "]"); } } - private static Allocation readFrom(StreamInput in) throws IOException { + private static AllocationStatus readFrom(StreamInput in) throws IOException { return fromId(in.readByte()); } @@ -115,10 +116,11 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon private StoreStatus() { } - public StoreStatus(DiscoveryNode node, long version, Allocation allocation, Throwable storeException) { + public StoreStatus(DiscoveryNode node, long version, String allocationId, AllocationStatus allocationStatus, Throwable storeException) { this.node = node; this.version = version; - this.allocation = allocation; + this.allocationId = allocationId; + this.allocationStatus = allocationStatus; this.storeException = storeException; } @@ -130,13 +132,20 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon } /** - * Version of the store, used to select the store that will be - * used as a primary. + * Version of the store */ public long getVersion() { return version; } + /** + * AllocationStatus id of the store, used to select the store that will be + * used as a primary. + */ + public String getAllocationId() { + return allocationId; + } + /** * Exception while trying to open the * shard index or from when the shard failed @@ -146,13 +155,13 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon } /** - * The allocation status of the store. - * {@link Allocation#PRIMARY} indicates a primary shard copy - * {@link Allocation#REPLICA} indicates a replica shard copy - * {@link Allocation#UNUSED} indicates an unused shard copy + * The allocationStatus status of the store. + * {@link AllocationStatus#PRIMARY} indicates a primary shard copy + * {@link AllocationStatus#REPLICA} indicates a replica shard copy + * {@link AllocationStatus#UNUSED} indicates an unused shard copy */ - public Allocation getAllocation() { - return allocation; + public AllocationStatus getAllocationStatus() { + return allocationStatus; } static StoreStatus readStoreStatus(StreamInput in) throws IOException { @@ -165,7 +174,8 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon public void readFrom(StreamInput in) throws IOException { node = DiscoveryNode.readNode(in); version = in.readLong(); - allocation = Allocation.readFrom(in); + allocationId = in.readOptionalString(); + allocationStatus = AllocationStatus.readFrom(in); if (in.readBoolean()) { storeException = in.readThrowable(); } @@ -175,7 +185,8 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon public void writeTo(StreamOutput out) throws IOException { node.writeTo(out); out.writeLong(version); - allocation.writeTo(out); + out.writeOptionalString(allocationId); + allocationStatus.writeTo(out); if (storeException != null) { out.writeBoolean(true); out.writeThrowable(storeException); @@ -188,7 +199,8 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { node.toXContent(builder, params); builder.field(Fields.VERSION, version); - builder.field(Fields.ALLOCATED, allocation.value()); + builder.field(Fields.ALLOCATION_ID, allocationId); + builder.field(Fields.ALLOCATED, allocationStatus.value()); if (storeException != null) { builder.startObject(Fields.STORE_EXCEPTION); ElasticsearchException.toXContent(builder, params, storeException); @@ -206,7 +218,7 @@ public class IndicesShardStoresResponse 
extends ActionResponse implements ToXCon } else { int compare = Long.compare(other.version, version); if (compare == 0) { - return Integer.compare(allocation.id, other.allocation.id); + return Integer.compare(allocationStatus.id, other.allocationStatus.id); } return compare; } @@ -379,6 +391,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon static final XContentBuilderString STORES = new XContentBuilderString("stores"); // StoreStatus fields static final XContentBuilderString VERSION = new XContentBuilderString("version"); + static final XContentBuilderString ALLOCATION_ID = new XContentBuilderString("allocation_id"); static final XContentBuilderString STORE_EXCEPTION = new XContentBuilderString("store_exception"); static final XContentBuilderString ALLOCATED = new XContentBuilderString("allocation"); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java index 336ebc254b4..d345c0e7d45 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java @@ -179,8 +179,8 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc } for (NodeGatewayStartedShards response : fetchResponse.responses) { if (shardExistsInNode(response)) { - IndicesShardStoresResponse.StoreStatus.Allocation allocation = getAllocation(fetchResponse.shardId.getIndex(), fetchResponse.shardId.id(), response.getNode()); - storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.version(), allocation, response.storeException())); + IndicesShardStoresResponse.StoreStatus.AllocationStatus allocationStatus = getAllocationStatus(fetchResponse.shardId.getIndex(), fetchResponse.shardId.id(), response.getNode()); + storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.version(), response.allocationId(), allocationStatus, response.storeException())); } } CollectionUtil.timSort(storeStatuses); @@ -193,27 +193,27 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc listener.onResponse(new IndicesShardStoresResponse(indicesStoreStatusesBuilder.build(), Collections.unmodifiableList(failureBuilder))); } - private IndicesShardStoresResponse.StoreStatus.Allocation getAllocation(String index, int shardID, DiscoveryNode node) { + private IndicesShardStoresResponse.StoreStatus.AllocationStatus getAllocationStatus(String index, int shardID, DiscoveryNode node) { for (ShardRouting shardRouting : routingNodes.node(node.id())) { ShardId shardId = shardRouting.shardId(); if (shardId.id() == shardID && shardId.getIndex().equals(index)) { if (shardRouting.primary()) { - return IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY; + return IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY; } else if (shardRouting.assignedToNode()) { - return IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA; + return IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA; } else { - return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED; + return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED; } } } - return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED; + return 
IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED; } /** * A shard exists/existed in a node only if shard state file exists in the node */ private boolean shardExistsInNode(final NodeGatewayStartedShards response) { - return response.storeException() != null || response.version() != -1; + return response.storeException() != null || response.version() != -1 || response.allocationId() != null; } @Override diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 669d71477ca..93961bf1fbb 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -621,7 +621,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild public int numberOfReplicas() { return settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1); } - + public Builder creationDate(long creationDate) { settings = settingsBuilder().put(settings).put(SETTING_CREATION_DATE, creationDate).build(); return this; diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 1fa1b702f66..b38e99d4493 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -47,7 +47,6 @@ import org.elasticsearch.rest.RestStatus; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Locale; /** * Service responsible for submitting open/close index requests @@ -92,14 +91,6 @@ public class MetaDataIndexStateService extends AbstractComponent { } if (indexMetaData.getState() != IndexMetaData.State.CLOSE) { - IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); - for (IndexShardRoutingTable shard : indexRoutingTable) { - for (ShardRouting shardRouting : shard) { - if (shardRouting.primary() == true && shardRouting.allocatedPostIndexCreate() == false) { - throw new IndexPrimaryShardNotAllocatedException(new Index(index)); - } - } - } indicesToClose.add(index); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 8dd71e3fba5..5ffaee0f2f9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -267,7 +269,7 @@ public final class ShardRouting implements Streamable, ToXContent { return shardIdentifier; } - public boolean allocatedPostIndexCreate() { + public boolean allocatedPostIndexCreate(IndexMetaData indexMetaData) { if (active()) { return true; } @@ -279,6 +281,11 @@ public final class ShardRouting implements Streamable, ToXContent { return false; } + if (indexMetaData.activeAllocationIds(id()).isEmpty() && indexMetaData.getCreationVersion().onOrAfter(Version.V_3_0_0)) { + // when no shards with this id have ever been active for this index + return false; + } + return true; } diff --git 
a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index a02c72c5745..0df4959a5c5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -22,13 +22,13 @@ package org.elasticsearch.cluster.routing.allocation.decider; import com.carrotsearch.hppc.ObjectLookupContainer; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -360,7 +360,8 @@ public class DiskThresholdDecider extends AllocationDecider { } // a flag for whether the primary shard has been previously allocated - boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate(); + IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.getIndex()); + boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate(indexMetaData); // checks for exact byte comparisons if (freeBytes < freeBytesThresholdLow.bytes()) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java index 0bbd4935044..a34cd33f7a1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation.decider; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -82,8 +83,8 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored"); } - Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).getSettings(); - String enableIndexValue = indexSettings.get(INDEX_ROUTING_ALLOCATION_ENABLE); + IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.getIndex()); + String enableIndexValue = indexMetaData.getSettings().get(INDEX_ROUTING_ALLOCATION_ENABLE); final Allocation enable; if (enableIndexValue != null) { enable = Allocation.parse(enableIndexValue); @@ -96,7 +97,7 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe case NONE: return allocation.decision(Decision.NO, NAME, "no allocations are allowed"); case NEW_PRIMARIES: - if (shardRouting.primary() && shardRouting.allocatedPostIndexCreate() == false) { + if (shardRouting.primary() && 
shardRouting.allocatedPostIndexCreate(indexMetaData) == false) { return allocation.decision(Decision.YES, NAME, "new primary allocations are allowed"); } else { return allocation.decision(Decision.NO, NAME, "non-new primary allocations are forbidden"); diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java index e560b4458b7..79bfbdac8c2 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -20,6 +20,7 @@ package org.elasticsearch.gateway; import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -30,8 +31,10 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; import java.util.*; +import java.util.stream.Collectors; /** * The primary shard allocator allocates primary shard that were not created as @@ -39,6 +42,7 @@ import java.util.*; */ public abstract class PrimaryShardAllocator extends AbstractComponent { + @Deprecated public static final String INDEX_RECOVERY_INITIAL_SHARDS = "index.recovery.initial_shards"; private final String initialShards; @@ -56,13 +60,21 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); while (unassignedIterator.hasNext()) { - ShardRouting shard = unassignedIterator.next(); + final ShardRouting shard = unassignedIterator.next(); - if (needToFindPrimaryCopy(shard) == false) { + if (shard.primary() == false) { continue; } - AsyncShardFetch.FetchResult shardState = fetchData(shard, allocation); + final IndexMetaData indexMetaData = metaData.index(shard.getIndex()); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings, Collections.emptyList()); + + if (shard.allocatedPostIndexCreate(indexMetaData) == false) { + // when we create a fresh index + continue; + } + + final AsyncShardFetch.FetchResult shardState = fetchData(shard, allocation); if (shardState.hasData() == false) { logger.trace("{}: ignoring allocation, still fetching shard started state", shard); allocation.setHasPendingAsyncFetch(); @@ -70,25 +82,50 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { continue; } - IndexMetaData indexMetaData = metaData.index(shard.getIndex()); - Settings indexSettings = Settings.builder().put(settings).put(indexMetaData.getSettings()).build(); + final Set lastActiveAllocationIds = indexMetaData.activeAllocationIds(shard.id()); + final boolean snapshotRestore = shard.restoreSource() != null; + final boolean recoverOnAnyNode = recoverOnAnyNode(indexSettings); - NodesAndVersions nodesAndVersions = buildNodesAndVersions(shard, recoverOnAnyNode(indexSettings), allocation.getIgnoreNodes(shard.shardId()), shardState); - logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion); + final NodesAndVersions nodesAndVersions; + final boolean 
enoughAllocationsFound; - if (isEnoughAllocationsFound(shard, indexMetaData, nodesAndVersions) == false) { - // if we are restoring this shard we still can allocate - if (shard.restoreSource() == null) { + if (lastActiveAllocationIds.isEmpty()) { + assert indexSettings.getIndexVersionCreated().before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new"; + // when we load an old index (after upgrading cluster) or restore a snapshot of an old index + // fall back to old version-based allocation mode + // Note that once the shard has been active, lastActiveAllocationIds will be non-empty + nodesAndVersions = buildNodesAndVersions(shard, snapshotRestore || recoverOnAnyNode, allocation.getIgnoreNodes(shard.shardId()), shardState); + if (snapshotRestore || recoverOnAnyNode) { + enoughAllocationsFound = nodesAndVersions.allocationsFound > 0; + } else { + enoughAllocationsFound = isEnoughVersionBasedAllocationsFound(shard, indexMetaData, nodesAndVersions); + } + logger.debug("[{}][{}]: version-based allocation for pre-{} index found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), Version.V_3_0_0, nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion); + } else { + assert lastActiveAllocationIds.isEmpty() == false; + // use allocation ids to select nodes + nodesAndVersions = buildAllocationIdBasedNodes(shard, snapshotRestore || recoverOnAnyNode, + allocation.getIgnoreNodes(shard.shardId()), lastActiveAllocationIds, shardState); + enoughAllocationsFound = nodesAndVersions.allocationsFound > 0; + logger.debug("[{}][{}]: found {} allocations of {} based on allocation ids: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, lastActiveAllocationIds); + } + + if (enoughAllocationsFound == false){ + if (snapshotRestore) { + // let BalancedShardsAllocator take care of allocating this shard + logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource()); + } else if (recoverOnAnyNode) { + // let BalancedShardsAllocator take care of allocating this shard + logger.debug("[{}][{}]: missing local data, recover from any node", shard.index(), shard.id()); + } else { // we can't really allocate, so ignore it and continue unassignedIterator.removeAndIgnore(); logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound); - } else { - logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource()); } continue; } - NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodesAndVersions); + final NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodesAndVersions.nodes); if (nodesToAllocate.yesNodes.isEmpty() == false) { DiscoveryNode node = nodesToAllocate.yesNodes.get(0); logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node); @@ -109,63 +146,99 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { } /** - * Does the shard need to find a primary copy? + * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching + * lastActiveAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but + * entries with matching allocation id are always at the front of the list. 
*/ - boolean needToFindPrimaryCopy(ShardRouting shard) { - if (shard.primary() == false) { - return false; + protected NodesAndVersions buildAllocationIdBasedNodes(ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, + Set lastActiveAllocationIds, AsyncShardFetch.FetchResult shardState) { + List matchingNodes = new ArrayList<>(); + List nonMatchingNodes = new ArrayList<>(); + long highestVersion = -1; + for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { + DiscoveryNode node = nodeShardState.getNode(); + String allocationId = nodeShardState.allocationId(); + + if (ignoreNodes.contains(node.id())) { + continue; + } + + if (nodeShardState.storeException() == null) { + if (allocationId == null && nodeShardState.version() != -1) { + // old shard with no allocation id, assign dummy value so that it gets added below in case of matchAnyShard + allocationId = "_n/a_"; + } + + logger.trace("[{}] on node [{}] has allocation id [{}] of shard", shard, nodeShardState.getNode(), allocationId); + } else { + logger.trace("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", nodeShardState.storeException(), shard, nodeShardState.getNode(), allocationId); + allocationId = null; + } + + if (allocationId != null) { + if (lastActiveAllocationIds.contains(allocationId)) { + matchingNodes.add(node); + highestVersion = Math.max(highestVersion, nodeShardState.version()); + } else if (matchAnyShard) { + nonMatchingNodes.add(node); + highestVersion = Math.max(highestVersion, nodeShardState.version()); + } + } } - // this is an API allocation, ignore since we know there is no data... - if (shard.allocatedPostIndexCreate() == false) { - return false; - } + List nodes = new ArrayList<>(); + nodes.addAll(matchingNodes); + nodes.addAll(nonMatchingNodes); - return true; + if (logger.isTraceEnabled()) { + logger.trace("{} candidates for allocation: {}", shard, nodes.stream().map(DiscoveryNode::name).collect(Collectors.joining(", "))); + } + return new NodesAndVersions(nodes, nodes.size(), highestVersion); } - private boolean isEnoughAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesAndVersions nodesAndVersions) { + /** + * used by old version-based allocation + */ + private boolean isEnoughVersionBasedAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesAndVersions nodesAndVersions) { // check if the counts meets the minimum set int requiredAllocation = 1; // if we restore from a repository one copy is more then enough - if (shard.restoreSource() == null) { - try { - String initialShards = indexMetaData.getSettings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards)); - if ("quorum".equals(initialShards)) { - if (indexMetaData.getNumberOfReplicas() > 1) { - requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2) + 1; - } - } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) { - if (indexMetaData.getNumberOfReplicas() > 2) { - requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2); - } - } else if ("one".equals(initialShards)) { - requiredAllocation = 1; - } else if ("full".equals(initialShards) || "all".equals(initialShards)) { - requiredAllocation = indexMetaData.getNumberOfReplicas() + 1; - } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) { - if (indexMetaData.getNumberOfReplicas() > 1) { - requiredAllocation = 
indexMetaData.getNumberOfReplicas(); - } - } else { - requiredAllocation = Integer.parseInt(initialShards); + try { + String initialShards = indexMetaData.getSettings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards)); + if ("quorum".equals(initialShards)) { + if (indexMetaData.getNumberOfReplicas() > 1) { + requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2) + 1; } - } catch (Exception e) { - logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard); + } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) { + if (indexMetaData.getNumberOfReplicas() > 2) { + requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2); + } + } else if ("one".equals(initialShards)) { + requiredAllocation = 1; + } else if ("full".equals(initialShards) || "all".equals(initialShards)) { + requiredAllocation = indexMetaData.getNumberOfReplicas() + 1; + } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) { + if (indexMetaData.getNumberOfReplicas() > 1) { + requiredAllocation = indexMetaData.getNumberOfReplicas(); + } + } else { + requiredAllocation = Integer.parseInt(initialShards); } + } catch (Exception e) { + logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard); } return nodesAndVersions.allocationsFound >= requiredAllocation; } /** - * Based on the nodes and versions, build the list of yes/no/throttle nodes that the shard applies to. + * Split the list of nodes to lists of yes/no/throttle nodes based on allocation deciders */ - private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, NodesAndVersions nodesAndVersions) { + private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, List nodes) { List yesNodes = new ArrayList<>(); List throttledNodes = new ArrayList<>(); List noNodes = new ArrayList<>(); - for (DiscoveryNode discoNode : nodesAndVersions.nodes) { + for (DiscoveryNode discoNode : nodes) { RoutingNode node = allocation.routingNodes().node(discoNode.id()); if (node == null) { continue; @@ -184,9 +257,11 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { } /** - * Builds a list of nodes and version + * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have the highest shard version + * are added to the list. Otherwise, any node that has a shard is added to the list, but entries with highest + * version are always at the front of the list. 
*/ - NodesAndVersions buildNodesAndVersions(ShardRouting shard, boolean recoveryOnAnyNode, Set ignoreNodes, + NodesAndVersions buildNodesAndVersions(ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, AsyncShardFetch.FetchResult shardState) { final Map nodesWithVersion = new HashMap<>(); int numberOfAllocationsFound = 0; @@ -208,20 +283,15 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { version = -1; } - if (recoveryOnAnyNode) { - numberOfAllocationsFound++; - if (version > highestVersion) { - highestVersion = version; - } - // We always put the node without clearing the map - nodesWithVersion.put(node, version); - } else if (version != -1) { + if (version != -1) { numberOfAllocationsFound++; // If we've found a new "best" candidate, clear the // current candidates and add it if (version > highestVersion) { highestVersion = version; - nodesWithVersion.clear(); + if (matchAnyShard == false) { + nodesWithVersion.clear(); + } nodesWithVersion.put(node, version); } else if (version == highestVersion) { // If the candidate is the same, add it to the @@ -258,9 +328,9 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { * Return {@code true} if the index is configured to allow shards to be * recovered on any node */ - private boolean recoverOnAnyNode(Settings idxSettings) { - return IndexMetaData.isOnSharedFilesystem(idxSettings) && - idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false); + private boolean recoverOnAnyNode(IndexSettings indexSettings) { + return indexSettings.isOnSharedFilesystem() + && indexSettings.getSettings().getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false); } protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index c87f4d94755..0b5f2bc58d9 100644 --- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -24,6 +24,8 @@ import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectLongCursor; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; @@ -56,6 +58,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { */ public boolean processExistingRecoveries(RoutingAllocation allocation) { boolean changed = false; + MetaData metaData = allocation.metaData(); for (RoutingNodes.RoutingNodesIterator nodes = allocation.routingNodes().nodes(); nodes.hasNext(); ) { nodes.next(); for (RoutingNodes.RoutingNodeIterator it = nodes.nodeShards(); it.hasNext(); ) { @@ -69,8 +72,10 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { if (shard.relocatingNodeId() != null) { continue; } + // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one... 
- if (shard.allocatedPostIndexCreate() == false) { + IndexMetaData indexMetaData = metaData.index(shard.getIndex()); + if (shard.allocatedPostIndexCreate(indexMetaData) == false) { continue; } @@ -114,6 +119,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { boolean changed = false; final RoutingNodes routingNodes = allocation.routingNodes(); final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); + MetaData metaData = allocation.metaData(); while (unassignedIterator.hasNext()) { ShardRouting shard = unassignedIterator.next(); if (shard.primary()) { @@ -121,7 +127,8 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { } // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one... - if (shard.allocatedPostIndexCreate() == false) { + IndexMetaData indexMetaData = metaData.index(shard.getIndex()); + if (shard.allocatedPostIndexCreate(indexMetaData) == false) { continue; } diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index d91b4bd8cdd..539ac924262 100644 --- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -139,7 +139,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction Store.tryOpenIndex(shardPath.resolveIndex()); } catch (Exception exception) { logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : ""); - return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, exception); + String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null; + return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId, exception); } } // old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata @@ -149,11 +150,12 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction logger.warn("{} shard state info found but indexUUID didn't match expected [{}] actual [{}]", shardId, indexUUID, shardStateMetaData.indexUUID); } else { logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData); - return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version); + String allocationId = shardStateMetaData.allocationId != null ? 
shardStateMetaData.allocationId.getId() : null; + return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId); } } logger.trace("{} no local shard info found", shardId); - return new NodeGatewayStartedShards(clusterService.localNode(), -1); + return new NodeGatewayStartedShards(clusterService.localNode(), -1, null); } catch (Exception e) { throw new ElasticsearchException("failed to load started shards", e); } @@ -277,17 +279,19 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction public static class NodeGatewayStartedShards extends BaseNodeResponse { private long version = -1; + private String allocationId = null; private Throwable storeException = null; public NodeGatewayStartedShards() { } - public NodeGatewayStartedShards(DiscoveryNode node, long version) { - this(node, version, null); + public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId) { + this(node, version, allocationId, null); } - public NodeGatewayStartedShards(DiscoveryNode node, long version, Throwable storeException) { + public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId, Throwable storeException) { super(node); this.version = version; + this.allocationId = allocationId; this.storeException = storeException; } @@ -295,6 +299,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction return this.version; } + public String allocationId() { + return this.allocationId; + } + public Throwable storeException() { return this.storeException; } @@ -303,16 +311,17 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction public void readFrom(StreamInput in) throws IOException { super.readFrom(in); version = in.readLong(); + allocationId = in.readOptionalString(); if (in.readBoolean()) { storeException = in.readThrowable(); } - } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(version); + out.writeOptionalString(allocationId); if (storeException != null) { out.writeBoolean(true); out.writeThrowable(storeException); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c0bf9244673..03c0611d172 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1099,7 +1099,8 @@ public class IndexShard extends AbstractIndexShardComponent { // we are the first primary, recover from the gateway // if its post api allocation, the index should exists assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; - final boolean shouldExist = shardRouting.allocatedPostIndexCreate(); + boolean shouldExist = shardRouting.allocatedPostIndexCreate(idxSettings.getIndexMetaData()); + StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); return storeRecovery.recoverFromStore(this, shouldExist, localNode); } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index bb154218215..3ce9e99f4dc 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -285,4 +285,11 @@ public class CreateIndexIT extends 
ESIntegTestCase { assertThat(messages.toString(), containsString("mapper [text] is used by multiple types")); } } + + public void testRestartIndexCreationAfterFullClusterRestart() throws Exception { + client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get(); + client().admin().indices().prepareCreate("test").setSettings(indexSettings()).get(); + internalCluster().fullRestart(); + ensureGreen("test"); + } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java index ffb9e630b70..ebd32ccb482 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java @@ -87,6 +87,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase { for (ObjectCursor> shardStoreStatuses : shardStores.values()) { for (IndicesShardStoresResponse.StoreStatus storeStatus : shardStoreStatuses.value) { assertThat(storeStatus.getVersion(), greaterThan(-1l)); + assertThat(storeStatus.getAllocationId(), notNullValue()); assertThat(storeStatus.getNode(), notNullValue()); assertThat(storeStatus.getStoreException(), nullValue()); } @@ -108,7 +109,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase { assertThat(shardStoresStatuses.size(), equalTo(unassignedShards.size())); for (IntObjectCursor> storesStatus : shardStoresStatuses) { assertThat("must report for one store", storesStatus.value.size(), equalTo(1)); - assertThat("reported store should be primary", storesStatus.value.get(0).getAllocation(), equalTo(IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY)); + assertThat("reported store should be primary", storesStatus.value.get(0).getAllocationStatus(), equalTo(IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY)); } logger.info("--> enable allocation"); enableAllocation(index); diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreResponseTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreResponseTests.java index cf197a27faf..6d1159c82a5 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreResponseTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreResponseTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.shards; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -44,9 +45,9 @@ public class IndicesShardStoreResponseTests extends ESTestCase { DiscoveryNode node1 = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT); DiscoveryNode node2 = new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT); List storeStatusList = new ArrayList<>(); - storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null)); - storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node2, 2, 
IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, null)); - storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED, new IOException("corrupted"))); + storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null)); + storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node2, 2, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null)); + storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, new IOException("corrupted"))); storeStatuses.put(0, storeStatusList); storeStatuses.put(1, storeStatusList); ImmutableOpenIntMap> storesMap = storeStatuses.build(); @@ -89,8 +90,10 @@ public class IndicesShardStoreResponseTests extends ESTestCase { IndicesShardStoresResponse.StoreStatus storeStatus = storeStatusList.get(i); assertThat(storeInfo.containsKey("version"), equalTo(true)); assertThat(((int) storeInfo.get("version")), equalTo(((int) storeStatus.getVersion()))); + assertThat(storeInfo.containsKey("allocation_id"), equalTo(true)); + assertThat(((String) storeInfo.get("allocation_id")), equalTo((storeStatus.getAllocationId()))); assertThat(storeInfo.containsKey("allocation"), equalTo(true)); - assertThat(((String) storeInfo.get("allocation")), equalTo(storeStatus.getAllocation().value())); + assertThat(((String) storeInfo.get("allocation")), equalTo(storeStatus.getAllocationStatus().value())); assertThat(storeInfo.containsKey(storeStatus.getNode().id()), equalTo(true)); if (storeStatus.getStoreException() != null) { assertThat(storeInfo.containsKey("store_exception"), equalTo(true)); @@ -104,11 +107,11 @@ public class IndicesShardStoreResponseTests extends ESTestCase { public void testStoreStatusOrdering() throws Exception { DiscoveryNode node1 = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT); List orderedStoreStatuses = new ArrayList<>(); - orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 2, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null)); - orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null)); - orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, null)); - orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED, null)); - orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, new IOException("corrupted"))); + orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 2, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null)); + orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null)); + orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null)); + orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), 
IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, null)); + orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, new IOException("corrupted"))); List storeStatuses = new ArrayList<>(orderedStoreStatuses); Collections.shuffle(storeStatuses, random()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java new file mode 100644 index 00000000000..dcd35303b75 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -0,0 +1,103 @@ +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.disruption.NetworkDisconnectPartition; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +@ESIntegTestCase.SuppressLocalMode +public class PrimaryAllocationIT extends ESIntegTestCase { + + public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception { + logger.info("--> starting 3 nodes, 1 master, 2 data"); + String master = internalCluster().startMasterOnlyNode(Settings.EMPTY); + internalCluster().startDataOnlyNodesAsync(2).get(); + + assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder() + .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get()); + ensureGreen(); + logger.info("--> indexing..."); + client().prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get(); + refresh(); + + ClusterState state = client().admin().cluster().prepareState().all().get().getState(); + List shards = state.routingTable().allShards("test"); + assertThat(shards.size(), equalTo(2)); + + final String primaryNode; + final String replicaNode; + if (shards.get(0).primary()) { + primaryNode = state.getRoutingNodes().node(shards.get(0).currentNodeId()).node().name(); + replicaNode = state.getRoutingNodes().node(shards.get(1).currentNodeId()).node().name(); + } else { + primaryNode = state.getRoutingNodes().node(shards.get(1).currentNodeId()).node().name(); + replicaNode = state.getRoutingNodes().node(shards.get(0).currentNodeId()).node().name(); + } + + NetworkDisconnectPartition partition = new NetworkDisconnectPartition( + new HashSet<>(Arrays.asList(master, replicaNode)), Collections.singleton(primaryNode), random()); + internalCluster().setDisruptionScheme(partition); + logger.info("--> partitioning node with primary shard from rest of cluster"); + partition.startDisrupting(); + + ensureStableCluster(2, master); + + logger.info("--> index a document into previous replica shard (that is now primary)"); + client(replicaNode).prepareIndex("test", 
"type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get(); + + logger.info("--> shut down node that has new acknowledged document"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode)); + + ensureStableCluster(1, master); + + partition.stopDisrupting(); + + logger.info("--> waiting for node with old primary shard to rejoin the cluster"); + ensureStableCluster(2, master); + + logger.info("--> check that old primary shard does not get promoted to primary again"); + // kick reroute and wait for all shard states to be fetched + client(master).admin().cluster().prepareReroute().get(); + assertBusy(() -> assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0))); + // kick reroute a second time and check that all shards are unassigned + assertThat(client(master).admin().cluster().prepareReroute().get().getState().getRoutingNodes().unassigned().size(), equalTo(2)); + + logger.info("--> starting node that reuses data folder with the up-to-date primary shard"); + internalCluster().startDataOnlyNode(Settings.EMPTY); + + logger.info("--> check that the up-to-date primary shard gets promoted and that documents are available"); + ensureYellow("test"); + assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l); + } + + public void testNotWaitForQuorumCopies() throws Exception { + logger.info("--> starting 3 nodes"); + internalCluster().startNodesAsync(3).get(); + logger.info("--> creating index with 1 primary and 2 replicas"); + assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder() + .put("index.number_of_shards", randomIntBetween(1, 3)).put("index.number_of_replicas", 2)).get()); + ensureGreen("test"); + client().prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get(); + logger.info("--> removing 2 nodes from cluster"); + internalCluster().stopRandomDataNode(); + internalCluster().stopRandomDataNode(); + internalCluster().fullRestart(); + logger.info("--> checking that index still gets allocated with only 1 shard copy being available"); + ensureYellow("test"); + assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 1l); + } +} diff --git a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index 73cbb51faed..193985a1c68 100644 --- a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -36,7 +36,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESAllocationTestCase; import org.junit.Before; -import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -59,25 +59,29 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { this.testAllocator = new TestAllocator(); } - /** - * Verifies that the canProcess method of primary allocation behaves correctly - * and processes only the applicable shard. 
- */ - public void testNoProcessReplica() { - ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)); - assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false)); - } - - public void testNoProcessPrimayNotAllcoatedBefore() { - ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, true, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); - assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false)); + public void testNoProcessPrimaryNotAllocatedBefore() { + final RoutingAllocation allocation; + if (randomBoolean()) { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomBoolean(), Version.CURRENT); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), true, Version.V_2_1_0); + } + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); + assertThat(allocation.routingNodes().unassigned().iterator().next().shardId(), equalTo(shardId)); } /** * Tests that when async fetch returns that there is no data, the shard will not be allocated. */ public void testNoAsyncFetchData() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders()); + final RoutingAllocation allocation; + if (randomBoolean()) { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "allocId"); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0); + } boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -85,11 +89,17 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { } /** - * Tests when the node returns that no data was found for it (-1), it will be moved to ignore unassigned. + * Tests when the node returns that no data was found for it (-1 for version and null for allocation id), + * it will be moved to ignore unassigned. */ public void testNoAllocationFound() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders()); - testAllocator.addData(node1, -1); + final RoutingAllocation allocation; + if (randomBoolean()) { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "allocId"); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0); + } + testAllocator.addData(node1, -1, null); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -97,11 +107,43 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { } /** - * Tests when the node returns that no data was found for it (-1), it will be moved to ignore unassigned. + * Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore unassigned. 
+ */ + public void testNoMatchingAllocationIdFound() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2"); + testAllocator.addData(node1, 1, "id1"); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); + assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); + } + + /** + * Tests that when there is a node to allocate the shard to, and there are no active allocation ids, it will be allocated to it. + * This is the case when we have old shards from pre-3.0 days. + */ + public void testNoActiveAllocationIds() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); + testAllocator.addData(node1, 1, null); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id())); + } + + /** + * Tests when the node returns that no data was found for it, it will be moved to ignore unassigned. */ public void testStoreException() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders()); - testAllocator.addData(node1, 3, new CorruptIndexException("test", "test")); + final RoutingAllocation allocation; + if (randomBoolean()) { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); + testAllocator.addData(node1, 1, "allocId1", new CorruptIndexException("test", "test")); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); + testAllocator.addData(node1, 3, null, new CorruptIndexException("test", "test")); + } boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -112,8 +154,14 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * Tests that when there is a node to allocate the shard to, it will be allocated to it. */ public void testFoundAllocationAndAllocating() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders()); - testAllocator.addData(node1, 10); + final RoutingAllocation allocation; + if (randomBoolean()) { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); + testAllocator.addData(node1, 1, "allocId1"); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0); + testAllocator.addData(node1, 3, null); + } boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -126,8 +174,14 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * it will be moved to ignore unassigned until it can be allocated to. 
*/ public void testFoundAllocationButThrottlingDecider() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders()); - testAllocator.addData(node1, 10); + final RoutingAllocation allocation; + if (randomBoolean()) { + allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); + testAllocator.addData(node1, 1, "allocId1"); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0); + testAllocator.addData(node1, 3, null); + } boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -139,8 +193,14 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * force the allocation to it. */ public void testFoundAllocationButNoDecider() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders()); - testAllocator.addData(node1, 10); + final RoutingAllocation allocation; + if (randomBoolean()) { + allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); + testAllocator.addData(node1, 1, "allocId1"); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0); + testAllocator.addData(node1, 3, null); + } boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -149,11 +209,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { } /** - * Tests that the highest version node is chosed for allocation. + * Tests that the highest version node is chosen for allocation. */ - public void testAllocateToTheHighestVersion() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders()); - testAllocator.addData(node1, 10).addData(node2, 12); + public void testAllocateToTheHighestVersionOnLegacyIndex() { + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0); + testAllocator.addData(node1, 10, null).addData(node2, 12, null); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -162,35 +222,150 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { } /** - * Tests that when restoring from snapshot, even if we didn't find any node to allocate on, the shard - * will remain in the unassigned list to be allocated later. + * Tests that when restoring from a snapshot and we find a node with a shard copy and allocation + * deciders say yes, we allocate to that node. 
*/ - public void testRestoreIgnoresNoNodesToAllocate() { - MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) - .build(); - RoutingTable routingTable = RoutingTable.builder() - .addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), Version.CURRENT, shardId.getIndex())) - .build(); - ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) - .metaData(metaData) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null, System.nanoTime()); + public void testRestore() { + RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders()); + testAllocator.addData(node1, 1, randomFrom(null, "allocId")); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + } - testAllocator.addData(node1, -1).addData(node2, -1); + /** + * Tests that when restoring from a snapshot and we find a node with a shard copy and allocation + * deciders say throttle, we add it to ignored shards. + */ + public void testRestoreThrottle() { + RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders()); + testAllocator.addData(node1, 1, randomFrom(null, "allocId")); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false)); + } + + /** + * Tests that when restoring from a snapshot and we find a node with a shard copy but allocation + * deciders say no, we still allocate to that node. + */ + public void testRestoreForcesAllocateIfShardAvailable() { + RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders()); + testAllocator.addData(node1, 1, randomFrom(null, "some allocId")); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + } + + /** + * Tests that when restoring from a snapshot and we don't find a node with a shard copy, the shard will remain in + * the unassigned list to be allocated later. 
+ */ + public void testRestoreDoesNotAssignIfNoShardAvailable() { + RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders()); + testAllocator.addData(node1, -1, null); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); + } + + private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocationDeciders) { + Version version = randomFrom(Version.CURRENT, Version.V_2_0_0); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0) + .putActiveAllocationIds(0, version == Version.CURRENT ? new HashSet<>(Arrays.asList("allocId")) : Collections.emptySet())) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), version, shardId.getIndex())) + .build(); + ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); + return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); + } + + /** + * Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy and allocation + * deciders say yes, we allocate to that node. + */ + public void testRecoverOnAnyNode() { + RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders()); + testAllocator.addData(node1, 1, randomFrom(null, "allocId")); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + } + + /** + * Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy and allocation + * deciders say throttle, we add it to ignored shards. + */ + public void testRecoverOnAnyNodeThrottle() { + RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders()); + testAllocator.addData(node1, 1, randomFrom(null, "allocId")); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false)); + } + + /** + * Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy but allocation + * deciders say no, we still allocate to that node. 
+ */ + public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() { + RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders()); + testAllocator.addData(node1, 1, randomFrom(null, "allocId")); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + } + + /** + * Tests that when recovering using "recover_on_any_node" and we don't find a node with a shard copy we let + * BalancedShardAllocator assign the shard + */ + public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() { + RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders()); + testAllocator.addData(node1, -1, null); + boolean changed = testAllocator.allocateUnassigned(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); + } + + private RoutingAllocation getRecoverOnAnyNodeRoutingAllocation(AllocationDeciders allocationDeciders) { + Version version = randomFrom(Version.CURRENT, Version.V_2_0_0); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version) + .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) + .put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true)) + .numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, version == Version.CURRENT ? new HashSet<>(Arrays.asList("allocId")) : Collections.emptySet())) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), Version.CURRENT, shardId.getIndex())) + .build(); + ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); + return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); } /** * Tests that only when enough copies of the shard exists we are going to allocate it. This test * verifies that with same version (1), and quorum allocation. 
*/ - public void testEnoughCopiesFoundForAllocation() { + public void testEnoughCopiesFoundForAllocationOnLegacyIndex() { MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.V_2_0_0)).numberOfShards(1).numberOfReplicas(2)) .build(); RoutingTable routingTable = RoutingTable.builder() .addAsRecovery(metaData.index(shardId.getIndex())) @@ -207,7 +382,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas - testAllocator.addData(node1, 1); + testAllocator.addData(node1, 1, null); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); @@ -215,7 +390,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas - testAllocator.addData(node2, 1); + testAllocator.addData(node2, 1, null); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); @@ -229,9 +404,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * Tests that only when enough copies of the shard exists we are going to allocate it. This test * verifies that even with different version, we treat different versions as a copy, and count them. 
*/ - public void testEnoughCopiesFoundForAllocationWithDifferentVersion() { + public void testEnoughCopiesFoundForAllocationOnLegacyIndexWithDifferentVersion() { MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.V_2_0_0)).numberOfShards(1).numberOfReplicas(2)) .build(); RoutingTable routingTable = RoutingTable.builder() .addAsRecovery(metaData.index(shardId.getIndex())) @@ -248,7 +423,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas - testAllocator.addData(node1, 1); + testAllocator.addData(node1, 1, null); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); @@ -256,7 +431,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas - testAllocator.addData(node2, 2); + testAllocator.addData(node2, 2, null); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); @@ -266,67 +441,20 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id())); } - public void testAllocationOnAnyNodeWithSharedFs() { - ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false, - ShardRoutingState.UNASSIGNED, 0, - new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)); - - Map data = new HashMap<>(); - data.put(node1, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node1, 1)); - data.put(node2, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node2, 5)); - data.put(node3, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node3, -1)); - AsyncShardFetch.FetchResult fetches = - new AsyncShardFetch.FetchResult(shardId, data, new HashSet<>(), new HashSet<>()); - - PrimaryShardAllocator.NodesAndVersions nAndV = testAllocator.buildNodesAndVersions(shard, false, new HashSet(), fetches); - assertThat(nAndV.allocationsFound, equalTo(2)); - assertThat(nAndV.highestVersion, equalTo(5L)); - assertThat(nAndV.nodes, contains(node2)); - - nAndV = testAllocator.buildNodesAndVersions(shard, true, new HashSet(), fetches); - assertThat(nAndV.allocationsFound, equalTo(3)); - assertThat(nAndV.highestVersion, equalTo(5L)); - // All three nodes are potential candidates because shards can be recovered on any node - assertThat(nAndV.nodes, contains(node2, node1, node3)); - } - - public void testAllocationOnAnyNodeShouldPutNodesWithExceptionsLast() { - ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false, - ShardRoutingState.UNASSIGNED, 0, - new 
UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)); - - Map data = new HashMap<>(); - data.put(node1, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node1, 1)); - data.put(node2, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node2, 1)); - data.put(node3, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node3, 1, new IOException("I failed to open"))); - HashSet ignoredNodes = new HashSet<>(); - ignoredNodes.add(node2.id()); - AsyncShardFetch.FetchResult fetches = - new AsyncShardFetch.FetchResult(shardId, data, new HashSet<>(), ignoredNodes); - - PrimaryShardAllocator.NodesAndVersions nAndV = testAllocator.buildNodesAndVersions(shard, false, ignoredNodes, fetches); - assertThat(nAndV.allocationsFound, equalTo(1)); - assertThat(nAndV.highestVersion, equalTo(1L)); - assertThat(nAndV.nodes, contains(node1)); - - nAndV = testAllocator.buildNodesAndVersions(shard, true, ignoredNodes, fetches); - assertThat(nAndV.allocationsFound, equalTo(2)); - assertThat(nAndV.highestVersion, equalTo(1L)); - // node3 should be last here - assertThat(nAndV.nodes.size(), equalTo(2)); - assertThat(nAndV.nodes, contains(node1, node3)); - } - - private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders) { + private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version, String... activeAllocationIds) { MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) - .build(); - RoutingTable routingTable = RoutingTable.builder() - .addAsRecovery(metaData.index(shardId.getIndex())) - .build(); + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)) + .numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, new HashSet<>(Arrays.asList(activeAllocationIds)))) + .build(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + if (asNew) { + routingTableBuilder.addAsNew(metaData.index(shardId.getIndex())); + } else { + routingTableBuilder.addAsRecovery(metaData.index(shardId.getIndex())); + } ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) .metaData(metaData) - .routingTable(routingTable) + .routingTable(routingTableBuilder.build()) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); } @@ -344,15 +472,15 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { return this; } - public TestAllocator addData(DiscoveryNode node, long version) { - return addData(node, version, null); + public TestAllocator addData(DiscoveryNode node, long version, String allocationId) { + return addData(node, version, allocationId, null); } - public TestAllocator addData(DiscoveryNode node, long version, @Nullable Throwable storeException) { + public TestAllocator addData(DiscoveryNode node, long version, String allocationId, @Nullable Throwable storeException) { if (data == null) { data = new HashMap<>(); } - data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, storeException)); + data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, allocationId, storeException)); return this; } diff --git 
a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java index edde1720474..69c518eb9c6 100644 --- a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java @@ -20,10 +20,10 @@ package org.elasticsearch.gateway; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; @@ -32,14 +32,10 @@ import org.elasticsearch.test.InternalTestCluster.RestartCallback; import java.util.concurrent.TimeUnit; import static org.elasticsearch.client.Requests.clusterHealthRequest; -import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; /** * @@ -51,72 +47,12 @@ public class QuorumGatewayIT extends ESIntegTestCase { return 2; } - public void testChangeInitialShardsRecovery() throws Exception { - logger.info("--> starting 3 nodes"); - final String[] nodes = internalCluster().startNodesAsync(3).get().toArray(new String[0]); - - createIndex("test"); - ensureGreen(); - NumShards test = getNumShards("test"); - - logger.info("--> indexing..."); - client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get(); - //We don't check for failures in the flush response: if we do we might get the following: - // FlushNotAllowedEngineException[[test][1] recovery is in progress, flush [COMMIT_TRANSLOG] is not allowed] - flush(); - client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get(); - refresh(); - - for (int i = 0; i < 10; i++) { - assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l); - } - - final String nodeToRemove = nodes[between(0,2)]; - logger.info("--> restarting 1 nodes -- kill 2"); - internalCluster().fullRestart(new RestartCallback() { - @Override - public Settings onNodeStopped(String nodeName) throws Exception { - return Settings.EMPTY; - } - - @Override - public boolean doRestart(String nodeName) { - return nodeToRemove.equals(nodeName); - } - }); - if (randomBoolean()) { - Thread.sleep(between(1, 400)); // wait a bit and give is a chance to try to allocate - } - ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForNodes("1")).actionGet(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.RED)); // nothing allocated yet - assertTrue(awaitBusy(() -> { - 
ClusterStateResponse clusterStateResponse = internalCluster().smartClient().admin().cluster().prepareState().setMasterNodeTimeout("500ms").get(); - return clusterStateResponse.getState() != null && clusterStateResponse.getState().routingTable().index("test") != null; - })); // wait until we get a cluster state - could be null if we quick enough. - final ClusterStateResponse clusterStateResponse = internalCluster().smartClient().admin().cluster().prepareState().setMasterNodeTimeout("500ms").get(); - assertThat(clusterStateResponse.getState(), notNullValue()); - assertThat(clusterStateResponse.getState().routingTable().index("test"), notNullValue()); - assertThat(clusterStateResponse.getState().routingTable().index("test").allPrimaryShardsActive(), is(false)); - logger.info("--> change the recovery.initial_shards setting, and make sure its recovered"); - client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("recovery.initial_shards", 1)).get(); - - logger.info("--> running cluster_health (wait for the shards to startup), primaries only since we only have 1 node"); - clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(test.numPrimaries)).actionGet(); - logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - - for (int i = 0; i < 10; i++) { - assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l); - } - } - public void testQuorumRecovery() throws Exception { logger.info("--> starting 3 nodes"); - internalCluster().startNodesAsync(3).get(); // we are shutting down nodes - make sure we don't have 2 clusters if we test network - setMinimumMasterNodes(2); + internalCluster().startNodesAsync(3, + Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2).build()).get(); + createIndex("test"); ensureGreen(); diff --git a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index 9a053b36527..0818999ea7e 100644 --- a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -43,9 +43,11 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.test.ESAllocationTestCase; import org.junit.Before; +import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -275,13 +277,16 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { } private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders, Settings settings, UnassignedInfo.Reason reason) { + ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10); MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings)).numberOfShards(1).numberOfReplicas(0)) - .build(); + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings)) + .numberOfShards(1).numberOfReplicas(1) + .putActiveAllocationIds(0, new 
HashSet<>(Arrays.asList(primaryShard.allocationId().getId())))) + .build(); RoutingTable routingTable = RoutingTable.builder() .add(IndexRoutingTable.builder(shardId.getIndex()) .addIndexShard(new IndexShardRoutingTable.Builder(shardId) - .addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10)) + .addShard(primaryShard) .addShard(ShardRouting.newUnassigned(shardId.getIndex(), shardId.getId(), null, false, new UnassignedInfo(reason, null))) .build()) ) @@ -294,13 +299,16 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { } private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { + ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10); MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)) + .numberOfShards(1).numberOfReplicas(1) + .putActiveAllocationIds(0, new HashSet<>(Arrays.asList(primaryShard.allocationId().getId())))) .build(); RoutingTable routingTable = RoutingTable.builder() .add(IndexRoutingTable.builder(shardId.getIndex()) .addIndexShard(new IndexShardRoutingTable.Builder(shardId) - .addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10)) + .addShard(primaryShard) .addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node2.id(), null, null, false, ShardRoutingState.INITIALIZING, 10, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null))) .build()) ) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b5f33afa94c..8dce7f1f954 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -133,7 +133,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { ShardId id = new ShardId("foo", 1); long version = between(1, Integer.MAX_VALUE / 2); boolean primary = randomBoolean(); - AllocationId allocationId = randomAllocationId(); + AllocationId allocationId = randomBoolean() ? null : randomAllocationId(); ShardStateMetaData state1 = new ShardStateMetaData(version, primary, "foo", allocationId); write(state1, env.availableShardPaths(id)); ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(id)); @@ -288,7 +288,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { } public void testShardStateMetaHashCodeEquals() { - ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), randomAllocationId()); + AllocationId allocationId = randomBoolean() ? 
null : randomAllocationId(); + ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId); assertEquals(meta, new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID, meta.allocationId)); assertEquals(meta.hashCode(), new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID, meta.allocationId).hashCode()); @@ -299,7 +300,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID + "foo", randomAllocationId()))); Set hashCodes = new HashSet<>(); for (int i = 0; i < 30; i++) { // just a sanity check that we impl hashcode - meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), randomAllocationId()); + allocationId = randomBoolean() ? null : randomAllocationId(); + meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId); hashCodes.add(meta.hashCode()); } assertTrue("more than one unique hashcode expected but got: " + hashCodes.size(), hashCodes.size() > 1); diff --git a/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java index a5cfa816455..af9cfeb94c1 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java @@ -97,7 +97,7 @@ public class SimpleIndexStateIT extends ESIntegTestCase { client().prepareIndex("test", "type1", "1").setSource("field1", "value1").get(); } - public void testFastCloseAfterCreateDoesNotClose() { + public void testFastCloseAfterCreateContinuesCreateAfterOpen() { logger.info("--> creating test index that cannot be allocated"); client().admin().indices().prepareCreate("test").setSettings(Settings.settingsBuilder() .put("index.routing.allocation.include.tag", "no_such_node").build()).get(); @@ -106,17 +106,14 @@ public class SimpleIndexStateIT extends ESIntegTestCase { assertThat(health.isTimedOut(), equalTo(false)); assertThat(health.getStatus(), equalTo(ClusterHealthStatus.RED)); - try { - client().admin().indices().prepareClose("test").get(); - fail("Exception should have been thrown"); - } catch(IndexPrimaryShardNotAllocatedException e) { - // expected - } + client().admin().indices().prepareClose("test").get(); logger.info("--> updating test index settings to allow allocation"); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.settingsBuilder() .put("index.routing.allocation.include.tag", "").build()).get(); + client().admin().indices().prepareOpen("test").get(); + logger.info("--> waiting for green status"); ensureGreen(); diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index a2b73a44842..56e9d4ddb91 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -129,19 +129,6 @@ specific index module: experimental[] Disables the purge of <> on the current index. -[[index.recovery.initial_shards]]`index.recovery.initial_shards`:: -+ --- -A primary shard is only recovered only if there are enough nodes available to -allocate sufficient replicas to form a quorum. It can be set to: - - * `quorum` (default) - * `quorum-1` (or `half`) - * `full` - * `full-1`. - * Number values are also supported, e.g. `1`. 
--- - [float] === Settings in other index modules diff --git a/docs/reference/indices/shadow-replicas.asciidoc b/docs/reference/indices/shadow-replicas.asciidoc index da74a651242..0d589adb64a 100644 --- a/docs/reference/indices/shadow-replicas.asciidoc +++ b/docs/reference/indices/shadow-replicas.asciidoc @@ -104,9 +104,8 @@ settings API: `index.shared_filesystem.recover_on_any_node`:: Boolean value indicating whether the primary shards for the index should be - allowed to recover on any node in the cluster, regardless of the number of - replicas or whether the node has previously had the shard allocated to it - before. Defaults to `false`. + allowed to recover on any node in the cluster. If a node holding a copy of + the shard is found, recovery prefers that node. Defaults to `false`. === Node level settings related to shadow replicas diff --git a/docs/reference/indices/shard-stores.asciidoc b/docs/reference/indices/shard-stores.asciidoc index d4d385bd6dc..19acbc44d3f 100644 --- a/docs/reference/indices/shard-stores.asciidoc +++ b/docs/reference/indices/shard-stores.asciidoc @@ -52,8 +52,9 @@ The shard stores information is grouped by indices and shard ids. } }, "version": 4, <4> + "allocation_id": "2iNySv_OQVePRX-yaRH_lQ", <5> "allocation" : "primary" | "replica" | "unused", <6> - "store_exception": ... <5> + "store_exception": ... <7> }, ... ] @@ -66,7 +67,8 @@ The shard stores information is grouped by indices and shard ids. <3> The node information that hosts a copy of the store, the key is the unique node id. <4> The version of the store copy -<5> The status of the store copy, whether it is used as a +<5> The allocation id of the store copy +<6> The status of the store copy, whether it is used as a primary, replica or not used at all -<6> Any exception encountered while opening the shard index or +<7> Any exception encountered while opening the shard index or from earlier engine failure diff --git a/docs/reference/migration/migrate_3_0.asciidoc b/docs/reference/migration/migrate_3_0.asciidoc index 6588f22a85a..0179e289b99 100644 --- a/docs/reference/migration/migrate_3_0.asciidoc +++ b/docs/reference/migration/migrate_3_0.asciidoc @@ -14,6 +14,7 @@ your application to Elasticsearch 3.0. * <> * <> * <> +* <> [[breaking_30_search_changes]] === Search changes @@ -515,3 +516,23 @@ from `OsStats.Cpu#getPercent`. Only stored fields are retrievable with this option. The fields option won't be able to load non stored fields from _source anymore. +[[breaking_30_allocation]] +=== Primary shard allocation + +Previously, primary shards were only assigned if a quorum of shard copies were found (configurable using +`index.recovery.initial_shards`, now deprecated). In the case where a primary had only a single replica, quorum was defined +to be a single shard. This meant that any shard copy of an index with replication factor 1 could become primary, even if it +was a stale copy of the data on disk. This is now fixed by using allocation IDs. + +Allocation IDs assign unique identifiers to shard copies. This allows the cluster to differentiate between multiple +copies of the same data and track which shards have been active, so that after a cluster restart, only shard copies +containing the most recent data can become primaries. + +==== `index.shared_filesystem.recover_on_any_node` changes + +The behavior of `index.shared_filesystem.recover_on_any_node = true` has been changed. 
Previously, in the case where no +shard copies could be found, an arbitrary node was chosen, potentially ignoring the allocation deciders. Now, we take +balancing into account but don't assign the shard if the allocation deciders are not satisfied. The behavior has also changed +in the case where shard copies can be found. Previously, a node not holding the shard copy was chosen if none of the nodes +holding shard copies satisfied the allocation deciders. Now, the shard will be assigned to a node holding a shard copy, +even if none of the nodes holding a shard copy satisfy the allocation deciders. diff --git a/docs/reference/modules/cluster/shards_allocation.asciidoc b/docs/reference/modules/cluster/shards_allocation.asciidoc index 1daf131106d..b8073927a0f 100644 --- a/docs/reference/modules/cluster/shards_allocation.asciidoc +++ b/docs/reference/modules/cluster/shards_allocation.asciidoc @@ -22,9 +22,8 @@ Enable or disable allocation for specific kinds of shards: This setting does not affect the recovery of local primary shards when restarting a node. A restarted node that has a copy of an unassigned primary -shard will recover that primary immediately, assuming that the -<> setting is -satisfied. +shard will recover that primary immediately, assuming that its allocation id matches +one of the active allocation ids in the cluster state. -- diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java index e82823ae997..e6a25a3956a 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; @@ -230,7 +231,8 @@ public abstract class ESAllocationTestCase extends ESTestCase { boolean changed = false; while (unassignedIterator.hasNext()) { ShardRouting shard = unassignedIterator.next(); - if (shard.primary() || shard.allocatedPostIndexCreate() == false) { + IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex()); + if (shard.primary() || shard.allocatedPostIndexCreate(indexMetaData) == false) { continue; } changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard);
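As a reading aid for the allocation-id rule described in the migration notes above, the following is a minimal, self-contained Java sketch. The NodeShardCopy class and primaryCandidates helper are hypothetical names invented for illustration only (they are not the PrimaryShardAllocator or TransportNodesListGatewayStartedShards API); the rule itself follows the behaviour exercised by the tests in this patch: if the index metadata tracks active allocation ids, only copies whose allocation id is in that set may become the primary, while legacy pre-3.0 indices (no active allocation ids) fall back to picking the copies with the highest legacy version.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Hypothetical, simplified stand-in for the per-node shard info that the gateway
    // reports back (a real response carries more fields, e.g. a store exception).
    final class NodeShardCopy {
        final String nodeId;
        final long legacyVersion;   // -1 means no shard data was found on that node
        final String allocationId;  // null for copies written by pre-3.0 nodes

        NodeShardCopy(String nodeId, long legacyVersion, String allocationId) {
            this.nodeId = nodeId;
            this.legacyVersion = legacyVersion;
            this.allocationId = allocationId;
        }
    }

    public class PrimaryCandidateSketch {

        /**
         * Nodes whose copy may be promoted to primary. When active allocation ids are tracked
         * in the index metadata, only copies whose allocation id is in that set qualify; for a
         * legacy index (empty set) the copies with the highest legacy version qualify instead.
         */
        static List<String> primaryCandidates(Set<String> activeAllocationIds, List<NodeShardCopy> copies) {
            List<String> candidates = new ArrayList<>();
            if (activeAllocationIds.isEmpty()) {
                long highestVersion = -1;
                for (NodeShardCopy copy : copies) {
                    highestVersion = Math.max(highestVersion, copy.legacyVersion);
                }
                for (NodeShardCopy copy : copies) {
                    if (copy.legacyVersion != -1 && copy.legacyVersion == highestVersion) {
                        candidates.add(copy.nodeId);
                    }
                }
            } else {
                for (NodeShardCopy copy : copies) {
                    if (copy.allocationId != null && activeAllocationIds.contains(copy.allocationId)) {
                        candidates.add(copy.nodeId);
                    }
                }
            }
            return candidates;
        }

        public static void main(String[] args) {
            // Mirrors testNoMatchingAllocationIdFound: the stale copy "id1" is not eligible
            // when only "id2" is active, so the shard stays unassigned.
            System.out.println(primaryCandidates(
                    new HashSet<>(Collections.singletonList("id2")),
                    Arrays.asList(new NodeShardCopy("node1", 1, "id1"))));        // []

            // Mirrors testAllocateToTheHighestVersionOnLegacyIndex: no allocation ids are
            // tracked, so the copy with the highest legacy version (node2) wins.
            System.out.println(primaryCandidates(
                    Collections.emptySet(),
                    Arrays.asList(new NodeShardCopy("node1", 10, null),
                                  new NodeShardCopy("node2", 12, null))));        // [node2]
        }
    }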