Remove legacy primary shard allocation mode based on versions (#23016)

Elasticsearch v5.0.0 uses allocation IDs to safely allocate primary shards whereas prior versions of ES used a version-based mode instead. Elasticsearch v5 still has support for version-based primary shard allocation as it needs to be able to load 2.x shards. ES v6 can drop the legacy support.
This commit is contained in:
Yannick Welsch 2017-02-08 10:00:55 +01:00 committed by GitHub
parent a512ab32fb
commit 9154686623
18 changed files with 167 additions and 623 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.shards;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
@ -55,7 +56,6 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
*/
public static class StoreStatus implements Streamable, ToXContent, Comparable<StoreStatus> {
private DiscoveryNode node;
private long legacyVersion;
private String allocationId;
private Exception storeException;
private AllocationStatus allocationStatus;
@ -116,9 +116,8 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
private StoreStatus() {
}
public StoreStatus(DiscoveryNode node, long legacyVersion, String allocationId, AllocationStatus allocationStatus, Exception storeException) {
public StoreStatus(DiscoveryNode node, String allocationId, AllocationStatus allocationStatus, Exception storeException) {
this.node = node;
this.legacyVersion = legacyVersion;
this.allocationId = allocationId;
this.allocationStatus = allocationStatus;
this.storeException = storeException;
@ -131,13 +130,6 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
return node;
}
/**
* Version of the store for pre-3.0 shards that have not yet been active
*/
public long getLegacyVersion() {
return legacyVersion;
}
/**
* AllocationStatus id of the store, used to select the store that will be
* used as a primary.
@ -173,7 +165,10 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
@Override
public void readFrom(StreamInput in) throws IOException {
node = new DiscoveryNode(in);
legacyVersion = in.readLong();
if (in.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
// legacy version
in.readLong();
}
allocationId = in.readOptionalString();
allocationStatus = AllocationStatus.readFrom(in);
if (in.readBoolean()) {
@ -184,7 +179,10 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeLong(legacyVersion);
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
// legacy version
out.writeLong(-1L);
}
out.writeOptionalString(allocationId);
allocationStatus.writeTo(out);
if (storeException != null) {
@ -198,9 +196,6 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
node.toXContent(builder, params);
if (legacyVersion != ShardStateMetaData.NO_VERSION) {
builder.field(Fields.LEGACY_VERSION, legacyVersion);
}
if (allocationId != null) {
builder.field(Fields.ALLOCATION_ID, allocationId);
}
@ -225,11 +220,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
} else if (allocationId == null && other.allocationId != null) {
return 1;
} else if (allocationId == null && other.allocationId == null) {
int compare = Long.compare(other.legacyVersion, legacyVersion);
if (compare == 0) {
return Integer.compare(allocationStatus.id, other.allocationStatus.id);
}
return compare;
return Integer.compare(allocationStatus.id, other.allocationStatus.id);
} else {
int compare = Integer.compare(allocationStatus.id, other.allocationStatus.id);
if (compare == 0) {
@ -405,7 +396,6 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
static final String FAILURES = "failures";
static final String STORES = "stores";
// StoreStatus fields
static final String LEGACY_VERSION = "legacy_version";
static final String ALLOCATION_ID = "allocation_id";
static final String STORE_EXCEPTION = "store_exception";
static final String ALLOCATED = "allocation";

View File

@ -180,7 +180,7 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
for (NodeGatewayStartedShards response : fetchResponse.responses) {
if (shardExistsInNode(response)) {
IndicesShardStoresResponse.StoreStatus.AllocationStatus allocationStatus = getAllocationStatus(fetchResponse.shardId.getIndexName(), fetchResponse.shardId.id(), response.getNode());
storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.legacyVersion(), response.allocationId(), allocationStatus, response.storeException()));
storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.allocationId(), allocationStatus, response.storeException()));
}
}
CollectionUtil.timSort(storeStatuses);
@ -213,7 +213,7 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
* A shard exists/existed in a node only if shard state file exists in the node
*/
private boolean shardExistsInNode(final NodeGatewayStartedShards response) {
return response.storeException() != null || response.legacyVersion() != -1 || response.allocationId() != null;
return response.storeException() != null || response.allocationId() != null;
}
@Override

View File

@ -212,12 +212,6 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
}
if (closeIndices.size() > 0 && closedSettings.get(IndexMetaData.SETTING_NUMBER_OF_REPLICAS) != null) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"Can't update [%s] on closed indices %s - can leave index in an unopenable state", IndexMetaData.SETTING_NUMBER_OF_REPLICAS,
closeIndices
));
}
if (!skippedSettigns.isEmpty() && !openIndices.isEmpty()) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"Can't update non dynamic settings [%s] for open indices %s",

View File

@ -139,8 +139,7 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
"allocation set " + inSyncAllocationIds);
}
if (indexMetaData.getCreationVersion().onOrAfter(Version.V_5_0_0_alpha1) &&
indexMetaData.isIndexUsingShadowReplicas() == false && // see #20650
if (indexMetaData.isIndexUsingShadowReplicas() == false && // see #20650
shardRouting.primary() && shardRouting.initializing() && shardRouting.relocating() == false &&
RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) == false &&
inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
@ -445,12 +444,6 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
if (indexMetaData.inSyncAllocationIds(shardNumber).isEmpty() == false) {
// we have previous valid copies for this shard. use them for recovery
primaryRecoverySource = StoreRecoverySource.EXISTING_STORE_INSTANCE;
} else if (indexMetaData.getCreationVersion().before(Version.V_5_0_0_alpha1) &&
unassignedInfo.getReason() != UnassignedInfo.Reason.INDEX_CREATED // tests can create old indices
) {
// the index is old and didn't maintain inSyncAllocationIds. Fall back to old behavior and require
// finding existing copies
primaryRecoverySource = StoreRecoverySource.EXISTING_STORE_INSTANCE;
} else if (indexMetaData.getMergeSourceIndex() != null) {
// this is a new index but the initial shards should merged from another index
primaryRecoverySource = LocalShardsRecoverySource.INSTANCE;

View File

@ -298,7 +298,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT,
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
PrimaryShardAllocator.NODE_INITIAL_SHARDS_SETTING,
ScriptService.SCRIPT_CACHE_SIZE_SETTING,
ScriptService.SCRIPT_CACHE_EXPIRE_SETTING,
ScriptService.SCRIPT_AUTO_RELOAD_ENABLED_SETTING,

View File

@ -139,7 +139,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING,
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING,
IndexModule.INDEX_QUERY_CACHE_TERM_QUERIES_SETTING,
PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,
FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
EngineConfig.INDEX_CODEC_SETTING,
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,

View File

@ -22,8 +22,6 @@ package org.elasticsearch.gateway;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
@ -37,8 +35,6 @@ import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult.ShardSt
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.gateway.AsyncShardFetch.FetchResult;
@ -52,7 +48,6 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -71,34 +66,8 @@ import java.util.stream.Stream;
*/
public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
private static final Function<String, String> INITIAL_SHARDS_PARSER = (value) -> {
switch (value) {
case "quorum":
case "quorum-1":
case "half":
case "one":
case "full":
case "full-1":
case "all-1":
case "all":
return value;
default:
Integer.parseInt(value); // it can be parsed that's all we care here?
return value;
}
};
public static final Setting<String> NODE_INITIAL_SHARDS_SETTING =
new Setting<>("gateway.initial_shards", (settings) -> settings.get("gateway.local.initial_shards", "quorum"), INITIAL_SHARDS_PARSER,
Property.Dynamic, Property.NodeScope);
@Deprecated
public static final Setting<String> INDEX_RECOVERY_INITIAL_SHARDS_SETTING =
new Setting<>("index.recovery.initial_shards", (settings) -> NODE_INITIAL_SHARDS_SETTING.get(settings) , INITIAL_SHARDS_PARSER,
Property.Dynamic, Property.IndexScope);
public PrimaryShardAllocator(Settings settings) {
super(settings);
logger.debug("using initial_shards [{}]", NODE_INITIAL_SHARDS_SETTING.get(settings));
}
/**
@ -139,34 +108,13 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;
final boolean recoverOnAnyNode = recoverOnAnyNode(indexMetaData);
final NodeShardsResult nodeShardsResult;
final boolean enoughAllocationsFound;
if (inSyncAllocationIds.isEmpty()) {
assert Version.indexCreated(indexMetaData.getSettings()).before(Version.V_5_0_0_alpha1) :
"trying to allocate a primary with an empty in sync allocation id set, but index is new. index: "
+ indexMetaData.getIndex();
// when we load an old index (after upgrading cluster) or restore a snapshot of an old index
// fall back to old version-based allocation mode
// Note that once the shard has been active, lastActiveAllocationIds will be non-empty
nodeShardsResult = buildVersionBasedNodeShardsResult(unassignedShard, snapshotRestore || recoverOnAnyNode,
allocation.getIgnoreNodes(unassignedShard.shardId()), shardState, logger);
if (snapshotRestore || recoverOnAnyNode) {
enoughAllocationsFound = nodeShardsResult.allocationsFound > 0;
} else {
enoughAllocationsFound = isEnoughVersionBasedAllocationsFound(indexMetaData, nodeShardsResult);
}
logger.debug("[{}][{}]: version-based allocation for pre-{} index found {} allocations of {}", unassignedShard.index(),
unassignedShard.id(), Version.V_5_0_0_alpha1, nodeShardsResult.allocationsFound, unassignedShard);
} else {
assert inSyncAllocationIds.isEmpty() == false;
// use allocation ids to select nodes
nodeShardsResult = buildAllocationIdBasedNodeShardsResult(unassignedShard, snapshotRestore || recoverOnAnyNode,
allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);
enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", unassignedShard.index(),
unassignedShard.id(), nodeShardsResult.orderedAllocationCandidates.size(), unassignedShard, inSyncAllocationIds);
}
assert inSyncAllocationIds.isEmpty() == false;
// use in-sync allocation ids to select nodes
final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore || recoverOnAnyNode,
allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);
final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", unassignedShard.index(),
unassignedShard.id(), nodeShardsResult.orderedAllocationCandidates.size(), unassignedShard, inSyncAllocationIds);
if (enoughAllocationsFound == false) {
if (snapshotRestore) {
@ -293,10 +241,10 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
* inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
* entries with matching allocation id are always at the front of the list.
*/
protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard,
Set<String> ignoreNodes, Set<String> inSyncAllocationIds,
FetchResult<NodeGatewayStartedShards> shardState,
Logger logger) {
protected static NodeShardsResult buildNodeShardsResult(ShardRouting shard, boolean matchAnyShard,
Set<String> ignoreNodes, Set<String> inSyncAllocationIds,
FetchResult<NodeGatewayStartedShards> shardState,
Logger logger) {
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
int numberOfAllocationsFound = 0;
for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
@ -308,13 +256,10 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
}
if (nodeShardState.storeException() == null) {
if (allocationId == null && nodeShardState.legacyVersion() == ShardStateMetaData.NO_VERSION) {
if (allocationId == null) {
logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
} else if (allocationId != null) {
assert nodeShardState.legacyVersion() == ShardStateMetaData.NO_VERSION : "Allocation id and legacy version cannot be both present";
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), allocationId);
} else {
logger.trace("[{}] on node [{}] has no allocation id, out-dated shard (shard state version: [{}])", shard, nodeShardState.getNode(), nodeShardState.legacyVersion());
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), allocationId);
}
} else {
final String finalAllocationId = allocationId;
@ -355,37 +300,6 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
}
/**
* used by old version-based allocation
*/
private boolean isEnoughVersionBasedAllocationsFound(IndexMetaData indexMetaData, NodeShardsResult nodeShardsResult) {
// check if the counts meets the minimum set
int requiredAllocation = 1;
// if we restore from a repository one copy is more then enough
String initialShards = INDEX_RECOVERY_INITIAL_SHARDS_SETTING.get(indexMetaData.getSettings(), settings);
if ("quorum".equals(initialShards)) {
if (indexMetaData.getNumberOfReplicas() > 1) {
requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2) + 1;
}
} else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
if (indexMetaData.getNumberOfReplicas() > 2) {
requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2);
}
} else if ("one".equals(initialShards)) {
requiredAllocation = 1;
} else if ("full".equals(initialShards) || "all".equals(initialShards)) {
requiredAllocation = indexMetaData.getNumberOfReplicas() + 1;
} else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
if (indexMetaData.getNumberOfReplicas() > 1) {
requiredAllocation = indexMetaData.getNumberOfReplicas();
}
} else {
requiredAllocation = Integer.parseInt(initialShards);
}
return nodeShardsResult.allocationsFound >= requiredAllocation;
}
/**
* Split the list of node shard states into groups yes/no/throttle based on allocation deciders
*/
@ -417,85 +331,6 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
Collections.unmodifiableList(noNodeShards));
}
/**
* Builds a list of previously started shards. If matchAnyShard is set to false, only shards with the highest shard version are added to
* the list. Otherwise, any existing shard is added to the list, but entries with highest version are always at the front of the list.
*/
static NodeShardsResult buildVersionBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
FetchResult<NodeGatewayStartedShards> shardState, Logger logger) {
final List<NodeGatewayStartedShards> allocationCandidates = new ArrayList<>();
int numberOfAllocationsFound = 0;
long highestVersion = ShardStateMetaData.NO_VERSION;
for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
long version = nodeShardState.legacyVersion();
DiscoveryNode node = nodeShardState.getNode();
if (ignoreNodes.contains(node.getId())) {
continue;
}
if (nodeShardState.storeException() == null) {
if (version == ShardStateMetaData.NO_VERSION && nodeShardState.allocationId() == null) {
logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
} else if (version != ShardStateMetaData.NO_VERSION) {
assert nodeShardState.allocationId() == null : "Allocation id and legacy version cannot be both present";
logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
} else {
// shard was already selected in a 5.x cluster as primary for recovery, was initialized (and wrote a new state file) but
// did not make it to STARTED state before the cluster crashed (otherwise list of active allocation ids would be
// non-empty and allocation id - based allocation mode would be chosen).
// Prefer this shard copy again.
version = Long.MAX_VALUE;
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), nodeShardState.allocationId());
}
} else {
final long finalVersion = version;
if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException());
if (nodeShardState.allocationId() != null) {
version = Long.MAX_VALUE; // shard was already selected in a 5.x cluster as primary, prefer this shard copy again.
} else {
version = 0L; // treat as lowest version so that this shard is the least likely to be selected as primary
}
} else {
// disregard the reported version and assign it as no version (same as shard does not exist)
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException());
version = ShardStateMetaData.NO_VERSION;
}
}
if (version != ShardStateMetaData.NO_VERSION) {
numberOfAllocationsFound++;
// If we've found a new "best" candidate, clear the
// current candidates and add it
if (version > highestVersion) {
highestVersion = version;
if (matchAnyShard == false) {
allocationCandidates.clear();
}
allocationCandidates.add(nodeShardState);
} else if (version == highestVersion) {
// If the candidate is the same, add it to the
// list, but keep the current candidate
allocationCandidates.add(nodeShardState);
}
}
}
// sort array so the node with the highest version is at the beginning
CollectionUtil.timSort(allocationCandidates, Comparator.comparing(NodeGatewayStartedShards::legacyVersion).reversed());
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("[");
for (NodeGatewayStartedShards n : allocationCandidates) {
sb.append("[").append(n.getNode().getName()).append("]").append(" -> ").append(n.legacyVersion()).append(", ");
}
sb.append("]");
logger.trace("{} candidates for allocation: {}", shard, sb.toString());
}
return new NodeShardsResult(Collections.unmodifiableList(allocationCandidates), numberOfAllocationsFound);
}
/**
* Return {@code true} if the index is configured to allow shards to be
* recovered on any node

View File

@ -22,6 +22,7 @@ package org.elasticsearch.gateway;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
@ -154,19 +155,18 @@ public class TransportNodesListGatewayStartedShards extends
exception);
String allocationId = shardStateMetaData.allocationId != null ?
shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
allocationId, shardStateMetaData.primary, exception);
return new NodeGatewayStartedShards(clusterService.localNode(), allocationId, shardStateMetaData.primary,
exception);
}
}
logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
String allocationId = shardStateMetaData.allocationId != null ?
shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
allocationId, shardStateMetaData.primary);
return new NodeGatewayStartedShards(clusterService.localNode(), allocationId, shardStateMetaData.primary);
}
logger.trace("{} no local shard info found", shardId);
return new NodeGatewayStartedShards(clusterService.localNode(), ShardStateMetaData.NO_VERSION, null, false);
return new NodeGatewayStartedShards(clusterService.localNode(), null, false);
} catch (Exception e) {
throw new ElasticsearchException("failed to load started shards", e);
}
@ -257,7 +257,6 @@ public class TransportNodesListGatewayStartedShards extends
public static class NodeGatewayStartedShards extends BaseNodeResponse {
private long legacyVersion = ShardStateMetaData.NO_VERSION; // for pre-3.0 shards that have not yet been active
private String allocationId = null;
private boolean primary = false;
private Exception storeException = null;
@ -265,23 +264,17 @@ public class TransportNodesListGatewayStartedShards extends
public NodeGatewayStartedShards() {
}
public NodeGatewayStartedShards(DiscoveryNode node, long legacyVersion, String allocationId, boolean primary) {
this(node, legacyVersion, allocationId, primary, null);
public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary) {
this(node, allocationId, primary, null);
}
public NodeGatewayStartedShards(DiscoveryNode node, long legacyVersion, String allocationId, boolean primary,
Exception storeException) {
public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary, Exception storeException) {
super(node);
this.legacyVersion = legacyVersion;
this.allocationId = allocationId;
this.primary = primary;
this.storeException = storeException;
}
public long legacyVersion() {
return this.legacyVersion;
}
public String allocationId() {
return this.allocationId;
}
@ -297,7 +290,10 @@ public class TransportNodesListGatewayStartedShards extends
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
legacyVersion = in.readLong();
if (in.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
// legacy version
in.readLong();
}
allocationId = in.readOptionalString();
primary = in.readBoolean();
if (in.readBoolean()) {
@ -308,7 +304,10 @@ public class TransportNodesListGatewayStartedShards extends
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(legacyVersion);
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
// legacy version
out.writeLong(-1L);
}
out.writeOptionalString(allocationId);
out.writeBoolean(primary);
if (storeException != null) {
@ -330,9 +329,6 @@ public class TransportNodesListGatewayStartedShards extends
NodeGatewayStartedShards that = (NodeGatewayStartedShards) o;
if (legacyVersion != that.legacyVersion) {
return false;
}
if (primary != that.primary) {
return false;
}
@ -345,8 +341,7 @@ public class TransportNodesListGatewayStartedShards extends
@Override
public int hashCode() {
int result = Long.hashCode(legacyVersion);
result = 31 * result + (allocationId != null ? allocationId.hashCode() : 0);
int result = (allocationId != null ? allocationId.hashCode() : 0);
result = 31 * result + (primary ? 1 : 0);
result = 31 * result + (storeException != null ? storeException.hashCode() : 0);
return result;
@ -357,8 +352,7 @@ public class TransportNodesListGatewayStartedShards extends
StringBuilder buf = new StringBuilder();
buf.append("NodeGatewayStartedShards[")
.append("allocationId=").append(allocationId)
.append(",primary=").append(primary)
.append(",legacyVersion=").append(legacyVersion);
.append(",primary=").append(primary);
if (storeException != null) {
buf.append(",storeException=").append(storeException);
}

View File

@ -35,25 +35,17 @@ public final class ShardStateMetaData {
private static final String SHARD_STATE_FILE_PREFIX = "state-";
private static final String PRIMARY_KEY = "primary";
private static final String VERSION_KEY = "version";
private static final String VERSION_KEY = "version"; // for pre-5.0 shards that have not yet been active
private static final String INDEX_UUID_KEY = "index_uuid";
private static final String ALLOCATION_ID_KEY = "allocation_id";
public static final long NO_VERSION = -1L;
public final long legacyVersion; // for pre-3.0 shards that have not yet been active
public final String indexUUID;
public final boolean primary;
@Nullable
public final AllocationId allocationId; // can be null if we read from legacy format (see fromXContent and MultiDataPathUpgrader)
public ShardStateMetaData(boolean primary, String indexUUID, AllocationId allocationId) {
this(NO_VERSION, primary, indexUUID, allocationId);
}
ShardStateMetaData(long legacyVersion, boolean primary, String indexUUID, AllocationId allocationId) {
assert indexUUID != null;
this.legacyVersion = legacyVersion;
this.primary = primary;
this.indexUUID = indexUUID;
this.allocationId = allocationId;
@ -73,9 +65,6 @@ public final class ShardStateMetaData {
if (primary != that.primary) {
return false;
}
if (legacyVersion != that.legacyVersion) {
return false;
}
if (indexUUID != null ? !indexUUID.equals(that.indexUUID) : that.indexUUID != null) {
return false;
}
@ -88,8 +77,7 @@ public final class ShardStateMetaData {
@Override
public int hashCode() {
int result = Long.hashCode(legacyVersion);
result = 31 * result + (indexUUID != null ? indexUUID.hashCode() : 0);
int result = (indexUUID != null ? indexUUID.hashCode() : 0);
result = 31 * result + (allocationId != null ? allocationId.hashCode() : 0);
result = 31 * result + (primary ? 1 : 0);
return result;
@ -97,7 +85,7 @@ public final class ShardStateMetaData {
@Override
public String toString() {
return "version [" + legacyVersion + "], primary [" + primary + "], allocation [" + allocationId + "]";
return "primary [" + primary + "], allocation [" + allocationId + "]";
}
public static final MetaDataStateFormat<ShardStateMetaData> FORMAT = new MetaDataStateFormat<ShardStateMetaData>(XContentType.SMILE, SHARD_STATE_FILE_PREFIX) {
@ -111,7 +99,6 @@ public final class ShardStateMetaData {
@Override
public void toXContent(XContentBuilder builder, ShardStateMetaData shardStateMetaData) throws IOException {
builder.field(VERSION_KEY, shardStateMetaData.legacyVersion);
builder.field(PRIMARY_KEY, shardStateMetaData.primary);
builder.field(INDEX_UUID_KEY, shardStateMetaData.indexUUID);
if (shardStateMetaData.allocationId != null) {
@ -125,7 +112,6 @@ public final class ShardStateMetaData {
if (token == null) {
return null;
}
long version = NO_VERSION;
Boolean primary = null;
String currentFieldName = null;
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
@ -134,12 +120,12 @@ public final class ShardStateMetaData {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (VERSION_KEY.equals(currentFieldName)) {
version = parser.longValue();
} else if (PRIMARY_KEY.equals(currentFieldName)) {
if (PRIMARY_KEY.equals(currentFieldName)) {
primary = parser.booleanValue();
} else if (INDEX_UUID_KEY.equals(currentFieldName)) {
indexUUID = parser.text();
} else if (VERSION_KEY.equals(currentFieldName)) {
// ES versions before 6.0 wrote this for legacy reasons, just ignore for now and remove in 7.0
} else {
throw new CorruptStateException("unexpected field in shard state [" + currentFieldName + "]");
}
@ -156,7 +142,7 @@ public final class ShardStateMetaData {
if (primary == null) {
throw new CorruptStateException("missing value for [primary] in shard state");
}
return new ShardStateMetaData(version, primary, indexUUID, allocationId);
return new ShardStateMetaData(primary, indexUUID, allocationId);
}
};
}

View File

@ -194,10 +194,8 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase {
for (IndicesShardStoresResponse.StoreStatus status : shardStatus.value) {
if (corruptedShardIDMap.containsKey(shardStatus.key)
&& corruptedShardIDMap.get(shardStatus.key).contains(status.getNode().getName())) {
assertThat(status.getLegacyVersion(), greaterThanOrEqualTo(0L));
assertThat(status.getStoreException(), notNullValue());
} else {
assertThat(status.getLegacyVersion(), greaterThanOrEqualTo(0L));
assertNull(status.getStoreException());
}
}

View File

@ -56,9 +56,9 @@ public class IndicesShardStoreResponseTests extends ESTestCase {
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
List<IndicesShardStoresResponse.StoreStatus> storeStatusList = new ArrayList<>();
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node2, ShardStateMetaData.NO_VERSION, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null));
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, ShardStateMetaData.NO_VERSION, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, new IOException("corrupted")));
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node2, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null));
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, new IOException("corrupted")));
storeStatuses.put(0, storeStatusList);
storeStatuses.put(1, storeStatusList);
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storesMap = storeStatuses.build();
@ -99,16 +99,7 @@ public class IndicesShardStoreResponseTests extends ESTestCase {
for (int i = 0; i < stores.size(); i++) {
HashMap storeInfo = ((HashMap) stores.get(i));
IndicesShardStoresResponse.StoreStatus storeStatus = storeStatusList.get(i);
boolean eitherLegacyVersionOrAllocationIdSet = false;
if (storeInfo.containsKey("legacy_version")) {
assertThat(((int) storeInfo.get("legacy_version")), equalTo(((int) storeStatus.getLegacyVersion())));
eitherLegacyVersionOrAllocationIdSet = true;
}
if (storeInfo.containsKey("allocation_id")) {
assertThat(((String) storeInfo.get("allocation_id")), equalTo((storeStatus.getAllocationId())));
eitherLegacyVersionOrAllocationIdSet = true;
}
assertThat(eitherLegacyVersionOrAllocationIdSet, equalTo(true));
assertThat(((String) storeInfo.get("allocation_id")), equalTo((storeStatus.getAllocationId())));
assertThat(storeInfo.containsKey("allocation"), equalTo(true));
assertThat(((String) storeInfo.get("allocation")), equalTo(storeStatus.getAllocationStatus().value()));
assertThat(storeInfo.containsKey(storeStatus.getNode().getId()), equalTo(true));
@ -124,15 +115,14 @@ public class IndicesShardStoreResponseTests extends ESTestCase {
public void testStoreStatusOrdering() throws Exception {
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
List<IndicesShardStoresResponse.StoreStatus> orderedStoreStatuses = new ArrayList<>();
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, ShardStateMetaData.NO_VERSION, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, ShardStateMetaData.NO_VERSION, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, ShardStateMetaData.NO_VERSION, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 2, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, ShardStateMetaData.NO_VERSION, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, new IOException("corrupted")));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, new IOException("corrupted")));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, UUIDs.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, new IOException("corrupted")));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, new IOException("corrupted")));
List<IndicesShardStoresResponse.StoreStatus> storeStatuses = new ArrayList<>(orderedStoreStatuses);
Collections.shuffle(storeStatuses, random());

View File

@ -48,7 +48,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardStateMetaData;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.junit.Before;
@ -62,7 +61,6 @@ import java.util.Map;
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED;
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.INDEX_CREATED;
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.INDEX_REOPENED;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -93,7 +91,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
final RoutingAllocation allocation;
// with old version, we can't know if a shard was allocated before or not
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(),
randomFrom(INDEX_CREATED, CLUSTER_RECOVERED, INDEX_REOPENED), Version.CURRENT);
randomFrom(INDEX_CREATED, CLUSTER_RECOVERED, INDEX_REOPENED));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
@ -105,12 +103,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* Tests that when async fetch returns that there is no data, the shard will not be allocated.
*/
public void testNoAsyncFetchData() {
final RoutingAllocation allocation;
if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.CURRENT, "allocId");
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_0);
}
final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
"allocId");
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -119,17 +113,13 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
}
/**
* Tests when the node returns that no data was found for it ({@link ShardStateMetaData#NO_VERSION} for version and null for allocation id),
* Tests when the node returns that no data was found for it (null for allocation id),
* it will be moved to ignore unassigned.
*/
public void testNoAllocationFound() {
final RoutingAllocation allocation;
if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.CURRENT, "allocId");
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_0);
}
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean());
final RoutingAllocation allocation =
routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, "allocId");
testAllocator.addData(node1, null, randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -141,8 +131,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore unassigned.
*/
public void testNoMatchingAllocationIdFound() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.CURRENT, "id2");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "id1", randomBoolean());
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, "id2");
testAllocator.addData(node1, "id1", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -150,34 +140,13 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
/**
* Tests that when there is a node to allocate the shard to, and there are no active allocation ids, it will be allocated to it.
* This is the case when we have old shards from pre-3.0 days.
*/
public void testNoActiveAllocationIds() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
testAllocator.addData(node1, 1, null, randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
/**
* Tests when the node returns that no data was found for it, it will be moved to ignore unassigned.
*/
public void testStoreException() {
final RoutingAllocation allocation;
if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(), new CorruptIndexException("test", "test"));
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
testAllocator.addData(node1, 3, null, randomBoolean(), new CorruptIndexException("test", "test"));
}
final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
"allocId1");
testAllocator.addData(node1, "allocId1", randomBoolean(), new CorruptIndexException("test", "test"));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -189,26 +158,16 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* Tests that when the node returns a ShardLockObtainFailedException, it will be considered as a valid shard copy
*/
public void testShardLockObtainFailedException() {
final RoutingAllocation allocation;
boolean useAllocationIds = randomBoolean();
if (useAllocationIds) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(),
new ShardLockObtainFailedException(shardId, "test"));
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test"));
}
final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
"allocId1");
testAllocator.addData(node1, "allocId1", randomBoolean(), new ShardLockObtainFailedException(shardId, "test"));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
if (useAllocationIds) {
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1"));
}
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1"));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
@ -217,34 +176,20 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* select the second node as target
*/
public void testShardLockObtainFailedExceptionPreferOtherValidCopies() {
final RoutingAllocation allocation;
boolean useAllocationIds = randomBoolean();
String allocId1 = randomAsciiOfLength(10);
String allocId2 = randomAsciiOfLength(10);
if (useAllocationIds) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
randomFrom(Version.V_2_0_0, Version.CURRENT), allocId1, allocId2);
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, allocId1, randomBoolean(),
new ShardLockObtainFailedException(shardId, "test"));
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, allocId2, randomBoolean(), null);
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test"));
if (randomBoolean()) {
testAllocator.addData(node2, randomIntBetween(2, 4), null, randomBoolean(), null);
} else {
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some alloc id", randomBoolean(), null);
}
}
final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
allocId1, allocId2);;
testAllocator.addData(node1, allocId1, randomBoolean(),
new ShardLockObtainFailedException(shardId, "test"));
testAllocator.addData(node2, allocId2, randomBoolean(), null);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
if (useAllocationIds) {
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId2));
}
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId2));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
@ -252,26 +197,16 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* Tests that when there is a node to allocate the shard to, it will be allocated to it.
*/
public void testFoundAllocationAndAllocating() {
final RoutingAllocation allocation;
boolean useAllocationIds = randomBoolean();
if (useAllocationIds) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED),
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED),
Version.V_2_2_0);
testAllocator.addData(node1, 3, null, randomBoolean());
}
final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED),
"allocId1");
testAllocator.addData(node1, "allocId1", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
if (useAllocationIds) {
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1"));
}
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1"));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
@ -281,13 +216,13 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* returns a YES decision for at least one of those NO nodes, then we force allocate to one of them
*/
public void testForceAllocatePrimary() {
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
testAllocator.addData(node1, "allocId1", randomBoolean());
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, Arrays.asList(
// since the deciders return a NO decision for allocating a shard (due to the guaranteed NO decision from the second decider),
// the allocator will see if it can force assign the primary, where the decision will be YES
new TestAllocateDecision(randomBoolean() ? Decision.YES : Decision.NO), getNoDeciderThatAllowsForceAllocate()
));
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, Version.CURRENT, "allocId1");
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, "allocId1");
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertTrue(allocation.routingNodes().unassigned().ignored().isEmpty());
@ -301,7 +236,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* returns a NO or THROTTLE decision for a node, then we do not force allocate to that node.
*/
public void testDontAllocateOnNoOrThrottleForceAllocationDecision() {
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
testAllocator.addData(node1, "allocId1", randomBoolean());
boolean forceDecisionNo = randomBoolean();
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, Arrays.asList(
// since both deciders here return a NO decision for allocating a shard,
@ -310,7 +245,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
new TestAllocateDecision(Decision.NO), forceDecisionNo ? getNoDeciderThatDeniesForceAllocate() :
getNoDeciderThatThrottlesForceAllocate()
));
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, Version.CURRENT, "allocId1");
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, "allocId1");
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
List<ShardRouting> ignored = allocation.routingNodes().unassigned().ignored();
@ -325,7 +260,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* then we do not force allocate to that node but instead throttle.
*/
public void testDontForceAllocateOnThrottleDecision() {
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
testAllocator.addData(node1, "allocId1", randomBoolean());
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, Arrays.asList(
// since we have a NO decision for allocating a shard (because the second decider returns a NO decision),
// the allocator will see if it can force assign the primary, and in this case,
@ -334,7 +269,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
// force allocating the shard, we still THROTTLE due to the decision from TestAllocateDecision
new TestAllocateDecision(Decision.THROTTLE), getNoDeciderThatAllowsForceAllocate()
));
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, Version.CURRENT, "allocId1");
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, "allocId1");
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
List<ShardRouting> ignored = allocation.routingNodes().unassigned().ignored();
@ -350,10 +285,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
String primaryAllocId = UUIDs.randomBase64UUID();
String replicaAllocId = UUIDs.randomBase64UUID();
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(),
randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), randomFrom(Version.V_2_0_0, Version.CURRENT), primaryAllocId, replicaAllocId);
randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), primaryAllocId, replicaAllocId);
boolean node1HasPrimaryShard = randomBoolean();
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard);
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard);
testAllocator.addData(node1, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard);
testAllocator.addData(node2, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -368,15 +303,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* it will be moved to ignore unassigned until it can be allocated to.
*/
public void testFoundAllocationButThrottlingDecider() {
final RoutingAllocation allocation;
if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), CLUSTER_RECOVERED,
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_2_0);
testAllocator.addData(node1, 3, null, randomBoolean());
}
final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), CLUSTER_RECOVERED,
"allocId1");
testAllocator.addData(node1, "allocId1", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -389,15 +318,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* force the allocation to it.
*/
public void testFoundAllocationButNoDecider() {
final RoutingAllocation allocation;
if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), CLUSTER_RECOVERED,
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_0_0);
testAllocator.addData(node1, 3, null, randomBoolean());
}
final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), CLUSTER_RECOVERED,
"allocId1");
testAllocator.addData(node1, "allocId1", randomBoolean());;
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -406,51 +329,13 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
/**
* Tests that the highest version node is chosen for allocation.
*/
public void testAllocateToTheHighestVersionOnLegacyIndex() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_0_0);
testAllocator.addData(node1, 10, null, randomBoolean()).addData(node2, 12, null, randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
/**
* Tests that shard with allocation id is chosen if such a shard is available in version-based allocation mode. This happens if a shard
* was already selected in a 5.x cluster as primary for recovery, was initialized (and wrote a new state file) but did not make it to
* STARTED state before the cluster crashed (otherwise list of active allocation ids would be non-empty and allocation id - based
* allocation mode would be chosen).
*/
public void testVersionBasedAllocationPrefersShardWithAllocationId() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_0_0);
testAllocator.addData(node1, 10, null, randomBoolean());
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some allocId", randomBoolean());
testAllocator.addData(node3, 12, null, randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("some allocId"));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
/**
* Tests that when restoring from a snapshot and we find a node with a shard copy and allocation
* deciders say yes, we allocate to that node.
*/
public void testRestore() {
boolean shardStateHasAllocationId = randomBoolean();
String allocationId = shardStateHasAllocationId ? "some allocId" : null;
long legacyVersion = shardStateHasAllocationId ? ShardStateMetaData.NO_VERSION : 1;
boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false;
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), clusterHasActiveAllocationIds);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), "allocId");
testAllocator.addData(node1, "some allocId", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -463,12 +348,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* deciders say throttle, we add it to ignored shards.
*/
public void testRestoreThrottle() {
boolean shardStateHasAllocationId = randomBoolean();
String allocationId = shardStateHasAllocationId ? "some allocId" : null;
long legacyVersion = shardStateHasAllocationId ? ShardStateMetaData.NO_VERSION : 1;
boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false;
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders(), clusterHasActiveAllocationIds);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders(), "allocId");
testAllocator.addData(node1, "some allocId", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
@ -480,12 +361,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* deciders say no, we still allocate to that node.
*/
public void testRestoreForcesAllocateIfShardAvailable() {
boolean shardStateHasAllocationId = randomBoolean();
String allocationId = shardStateHasAllocationId ? "some allocId" : null;
long legacyVersion = shardStateHasAllocationId ? ShardStateMetaData.NO_VERSION : 1;
boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false;
RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders(), clusterHasActiveAllocationIds);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders(), "allocId");
testAllocator.addData(node1, "some allocId", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -498,8 +375,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* the unassigned list to be allocated later.
*/
public void testRestoreDoesNotAssignIfNoShardAvailable() {
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), randomBoolean());
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, false);
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), "allocId");
testAllocator.addData(node1, null, false);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -507,16 +384,15 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocationDeciders, boolean hasActiveAllocation) {
Version version = hasActiveAllocation ? Version.CURRENT : Version.V_2_0_0;
private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocationDeciders, String... allocIds) {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0)
.putInSyncAllocationIds(0, hasActiveAllocation ? Sets.newHashSet("allocId") : Collections.emptySet()))
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)
.putInSyncAllocationIds(0, Sets.newHashSet(allocIds)))
.build();
final Snapshot snapshot = new Snapshot("test", new SnapshotId("test", UUIDs.randomBase64UUID()));
RoutingTable routingTable = RoutingTable.builder()
.addAsRestore(metaData.index(shardId.getIndex()), new SnapshotRecoverySource(snapshot, version, shardId.getIndexName()))
.addAsRestore(metaData.index(shardId.getIndex()), new SnapshotRecoverySource(snapshot, Version.CURRENT, shardId.getIndexName()))
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData)
@ -530,11 +406,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* deciders say yes, we allocate to that node.
*/
public void testRecoverOnAnyNode() {
boolean hasActiveAllocation = randomBoolean();
String allocationId = hasActiveAllocation ? "allocId" : null;
long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1;
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), hasActiveAllocation);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), "allocId");
testAllocator.addData(node1, "allocId", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -547,11 +420,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* deciders say throttle, we add it to ignored shards.
*/
public void testRecoverOnAnyNodeThrottle() {
boolean hasActiveAllocation = randomBoolean();
String allocationId = hasActiveAllocation ? "allocId" : null;
long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1;
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(throttleAllocationDeciders(), hasActiveAllocation);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(throttleAllocationDeciders(), "allocId");
testAllocator.addData(node1, "allocId", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
@ -563,11 +433,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* deciders say no, we still allocate to that node.
*/
public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() {
boolean hasActiveAllocation = randomBoolean();
String allocationId = hasActiveAllocation ? "allocId" : null;
long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1;
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders(), hasActiveAllocation);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders(), "allocId");
testAllocator.addData(node1, "allocId", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -580,8 +447,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* BalancedShardAllocator assign the shard
*/
public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), randomBoolean());
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean());
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), "allocId");
testAllocator.addData(node1, null, randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -589,13 +456,12 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
private RoutingAllocation getRecoverOnAnyNodeRoutingAllocation(AllocationDeciders allocationDeciders, boolean hasActiveAllocation) {
Version version = hasActiveAllocation ? Version.CURRENT : Version.V_2_0_0;
private RoutingAllocation getRecoverOnAnyNodeRoutingAllocation(AllocationDeciders allocationDeciders, String... allocIds) {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version)
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true))
.numberOfShards(1).numberOfReplicas(0).putInSyncAllocationIds(0, hasActiveAllocation ? Sets.newHashSet("allocId") : Collections.emptySet()))
.numberOfShards(1).numberOfReplicas(0).putInSyncAllocationIds(0, Sets.newHashSet(allocIds)))
.build();
RoutingTable routingTable = RoutingTable.builder()
@ -608,99 +474,10 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state, null, System.nanoTime(), false);
}
/**
* Tests that only when enough copies of the shard exists we are going to allocate it. This test
* verifies that with same version (1), and quorum allocation.
*/
public void testEnoughCopiesFoundForAllocationOnLegacyIndex() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.V_2_0_0)).numberOfShards(1).numberOfReplicas(2))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsRecovery(metaData.index(shardId.getIndex()))
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)).build();
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
testAllocator.addData(node1, 1, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
testAllocator.addData(node2, 1, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), anyOf(equalTo(node2.getId()), equalTo(node1.getId())));
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
}
/**
* Tests that only when enough copies of the shard exists we are going to allocate it. This test
* verifies that even with different version, we treat different versions as a copy, and count them.
*/
public void testEnoughCopiesFoundForAllocationOnLegacyIndexWithDifferentVersion() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.V_2_0_0)).numberOfShards(1).numberOfReplicas(2))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsRecovery(metaData.index(shardId.getIndex()))
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)).build();
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
testAllocator.addData(node1, 1, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
testAllocator.addData(node2, 2, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
}
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders,
UnassignedInfo.Reason reason, Version version,
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, UnassignedInfo.Reason reason,
String... activeAllocationIds) {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version))
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).putInSyncAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds)))
.build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
@ -773,15 +550,15 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
return this;
}
public TestAllocator addData(DiscoveryNode node, long version, String allocationId, boolean primary) {
return addData(node, version, allocationId, primary, null);
public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary) {
return addData(node, allocationId, primary, null);
}
public TestAllocator addData(DiscoveryNode node, long version, String allocationId, boolean primary, @Nullable Exception storeException) {
public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) {
if (data == null) {
data = new HashMap<>();
}
data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, allocationId, primary, storeException));
data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, allocationId, primary, storeException));
return this;
}

View File

@ -158,20 +158,19 @@ public class IndexShardTests extends IndexShardTestCase {
public void testWriteShardState() throws Exception {
try (NodeEnvironment env = newNodeEnvironment()) {
ShardId id = new ShardId("foo", "fooUUID", 1);
long version = between(1, Integer.MAX_VALUE / 2);
boolean primary = randomBoolean();
AllocationId allocationId = randomBoolean() ? null : randomAllocationId();
ShardStateMetaData state1 = new ShardStateMetaData(version, primary, "fooUUID", allocationId);
ShardStateMetaData state1 = new ShardStateMetaData(primary, "fooUUID", allocationId);
write(state1, env.availableShardPaths(id));
ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(id));
assertEquals(shardStateMetaData, state1);
ShardStateMetaData state2 = new ShardStateMetaData(version, primary, "fooUUID", allocationId);
ShardStateMetaData state2 = new ShardStateMetaData(primary, "fooUUID", allocationId);
write(state2, env.availableShardPaths(id));
shardStateMetaData = load(logger, env.availableShardPaths(id));
assertEquals(shardStateMetaData, state1);
ShardStateMetaData state3 = new ShardStateMetaData(version + 1, primary, "fooUUID", allocationId);
ShardStateMetaData state3 = new ShardStateMetaData(primary, "fooUUID", allocationId);
write(state3, env.availableShardPaths(id));
shardStateMetaData = load(logger, env.availableShardPaths(id));
assertEquals(shardStateMetaData, state3);
@ -236,21 +235,20 @@ public class IndexShardTests extends IndexShardTestCase {
public void testShardStateMetaHashCodeEquals() {
AllocationId allocationId = randomBoolean() ? null : randomAllocationId();
ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(),
ShardStateMetaData meta = new ShardStateMetaData(randomBoolean(),
randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId);
assertEquals(meta, new ShardStateMetaData(meta.legacyVersion, meta.primary, meta.indexUUID, meta.allocationId));
assertEquals(meta, new ShardStateMetaData(meta.primary, meta.indexUUID, meta.allocationId));
assertEquals(meta.hashCode(),
new ShardStateMetaData(meta.legacyVersion, meta.primary, meta.indexUUID, meta.allocationId).hashCode());
new ShardStateMetaData(meta.primary, meta.indexUUID, meta.allocationId).hashCode());
assertFalse(meta.equals(new ShardStateMetaData(meta.legacyVersion, !meta.primary, meta.indexUUID, meta.allocationId)));
assertFalse(meta.equals(new ShardStateMetaData(meta.legacyVersion + 1, meta.primary, meta.indexUUID, meta.allocationId)));
assertFalse(meta.equals(new ShardStateMetaData(meta.legacyVersion, !meta.primary, meta.indexUUID + "foo", meta.allocationId)));
assertFalse(meta.equals(new ShardStateMetaData(meta.legacyVersion, !meta.primary, meta.indexUUID + "foo", randomAllocationId())));
assertFalse(meta.equals(new ShardStateMetaData(!meta.primary, meta.indexUUID, meta.allocationId)));
assertFalse(meta.equals(new ShardStateMetaData(!meta.primary, meta.indexUUID + "foo", meta.allocationId)));
assertFalse(meta.equals(new ShardStateMetaData(!meta.primary, meta.indexUUID + "foo", randomAllocationId())));
Set<Integer> hashCodes = new HashSet<>();
for (int i = 0; i < 30; i++) { // just a sanity check that we impl hashcode
allocationId = randomBoolean() ? null : randomAllocationId();
meta = new ShardStateMetaData(randomLong(), randomBoolean(),
meta = new ShardStateMetaData(randomBoolean(),
randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId);
hashCodes.add(meta.hashCode());
}

View File

@ -43,7 +43,7 @@ public class ShardPathTests extends ESTestCase {
ShardId shardId = new ShardId("foo", "0xDEADBEEF", 0);
Path[] paths = env.availableShardPaths(shardId);
Path path = randomFrom(paths);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(2, true, "0xDEADBEEF", AllocationId.newInitializing()), path);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path);
ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings));
assertEquals(path, shardPath.getDataPath());
assertEquals("0xDEADBEEF", shardPath.getShardId().getIndex().getUUID());
@ -62,8 +62,7 @@ public class ShardPathTests extends ESTestCase {
ShardId shardId = new ShardId("foo", indexUUID, 0);
Path[] paths = env.availableShardPaths(shardId);
assumeTrue("This test tests multi data.path but we only got one", paths.length > 1);
int id = randomIntBetween(1, 10);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(id, true, indexUUID, AllocationId.newInitializing()), paths);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), paths);
Exception e = expectThrows(IllegalStateException.class, () ->
ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings)));
assertThat(e.getMessage(), containsString("more than one shard state found"));
@ -78,8 +77,7 @@ public class ShardPathTests extends ESTestCase {
ShardId shardId = new ShardId("foo", "foobar", 0);
Path[] paths = env.availableShardPaths(shardId);
Path path = randomFrom(paths);
int id = randomIntBetween(1, 10);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(id, true, "0xDEADBEEF", AllocationId.newInitializing()), path);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path);
Exception e = expectThrows(IllegalStateException.class, () ->
ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings)));
assertThat(e.getMessage(), containsString("expected: foobar on shard path"));
@ -131,7 +129,7 @@ public class ShardPathTests extends ESTestCase {
ShardId shardId = new ShardId("foo", indexUUID, 0);
Path[] paths = env.availableShardPaths(shardId);
Path path = randomFrom(paths);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(2, true, indexUUID, AllocationId.newInitializing()), path);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), path);
ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId,
IndexSettingsModule.newIndexSettings(shardId.getIndex(), indexSettings, nodeSettings));
boolean found = false;

View File

@ -537,7 +537,6 @@ public class CorruptedFileIT extends ESIntegTestCase {
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING.getKey(), "one")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on purpose

View File

@ -191,20 +191,17 @@ public class UpdateSettingsIT extends ESIntegTestCase {
client().admin().indices().prepareClose("test").execute().actionGet();
try {
client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))
.execute()
.actionGet();
fail("can't change number of replicas on a closed index");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getMessage(), ex.getMessage().startsWith("Can't update [index.number_of_replicas] on closed indices [[test/"));
assertTrue(ex.getMessage(), ex.getMessage().endsWith("]] - can leave index in an unopenable state"));
// expected
}
client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))
.execute()
.actionGet();
indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
assertThat(indexMetaData.getNumberOfReplicas(), equalTo(1));
client()
.admin()
.indices()

View File

@ -52,9 +52,8 @@ The shard stores information is grouped by indices and shard ids.
}
},
"allocation_id": "2iNySv_OQVePRX-yaRH_lQ", <4>
"legacy_version": 42, <5>
"allocation" : "primary" | "replica" | "unused", <6>
"store_exception": ... <7>
"allocation" : "primary" | "replica" | "unused", <5>
"store_exception": ... <6>
},
...
]
@ -67,9 +66,7 @@ The shard stores information is grouped by indices and shard ids.
<3> The node information that hosts a copy of the store, the key
is the unique node id.
<4> The allocation id of the store copy
<5> The version of the store copy (available only for legacy shard copies that have
not yet been active in a current version of Elasticsearch)
<6> The status of the store copy, whether it is used as a
<5> The status of the store copy, whether it is used as a
primary, replica or not used at all
<7> Any exception encountered while opening the shard index or
<6> Any exception encountered while opening the shard index or
from earlier engine failure

View File

@ -75,7 +75,7 @@ public class TestGatewayAllocator extends GatewayAllocator {
routing -> currentNodes.get(routing.currentNodeId()),
routing ->
new NodeGatewayStartedShards(
currentNodes.get(routing.currentNodeId()), -1, routing.allocationId().getId(), routing.primary())));
currentNodes.get(routing.currentNodeId()), routing.allocationId().getId(), routing.primary())));
return new AsyncShardFetch.FetchResult<>(shardId, foundShards, Collections.emptySet(), ignoreNodes);
}