Merge pull request #15281 from ywelsch/feature/alloc-ids-primary
Allocate primary shards based on allocation IDs
This commit is contained in:
commit
8f14b10863
|
@ -56,13 +56,14 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
public static class StoreStatus implements Streamable, ToXContent, Comparable<StoreStatus> {
|
||||
private DiscoveryNode node;
|
||||
private long version;
|
||||
private String allocationId;
|
||||
private Throwable storeException;
|
||||
private Allocation allocation;
|
||||
private AllocationStatus allocationStatus;
|
||||
|
||||
/**
|
||||
* The status of the shard store with respect to the cluster
|
||||
*/
|
||||
public enum Allocation {
|
||||
public enum AllocationStatus {
|
||||
|
||||
/**
|
||||
* Allocated as primary
|
||||
|
@ -81,16 +82,16 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
|
||||
private final byte id;
|
||||
|
||||
Allocation(byte id) {
|
||||
AllocationStatus(byte id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
private static Allocation fromId(byte id) {
|
||||
private static AllocationStatus fromId(byte id) {
|
||||
switch (id) {
|
||||
case 0: return PRIMARY;
|
||||
case 1: return REPLICA;
|
||||
case 2: return UNUSED;
|
||||
default: throw new IllegalArgumentException("unknown id for allocation [" + id + "]");
|
||||
default: throw new IllegalArgumentException("unknown id for allocation status [" + id + "]");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,11 +100,11 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
case 0: return "primary";
|
||||
case 1: return "replica";
|
||||
case 2: return "unused";
|
||||
default: throw new IllegalArgumentException("unknown id for allocation [" + id + "]");
|
||||
default: throw new IllegalArgumentException("unknown id for allocation status [" + id + "]");
|
||||
}
|
||||
}
|
||||
|
||||
private static Allocation readFrom(StreamInput in) throws IOException {
|
||||
private static AllocationStatus readFrom(StreamInput in) throws IOException {
|
||||
return fromId(in.readByte());
|
||||
}
|
||||
|
||||
|
@ -115,10 +116,11 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
private StoreStatus() {
|
||||
}
|
||||
|
||||
public StoreStatus(DiscoveryNode node, long version, Allocation allocation, Throwable storeException) {
|
||||
public StoreStatus(DiscoveryNode node, long version, String allocationId, AllocationStatus allocationStatus, Throwable storeException) {
|
||||
this.node = node;
|
||||
this.version = version;
|
||||
this.allocation = allocation;
|
||||
this.allocationId = allocationId;
|
||||
this.allocationStatus = allocationStatus;
|
||||
this.storeException = storeException;
|
||||
}
|
||||
|
||||
|
@ -130,13 +132,20 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
}
|
||||
|
||||
/**
|
||||
* Version of the store, used to select the store that will be
|
||||
* used as a primary.
|
||||
* Version of the store
|
||||
*/
|
||||
public long getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
/**
|
||||
* AllocationStatus id of the store, used to select the store that will be
|
||||
* used as a primary.
|
||||
*/
|
||||
public String getAllocationId() {
|
||||
return allocationId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Exception while trying to open the
|
||||
* shard index or from when the shard failed
|
||||
|
@ -146,13 +155,13 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
}
|
||||
|
||||
/**
|
||||
* The allocation status of the store.
|
||||
* {@link Allocation#PRIMARY} indicates a primary shard copy
|
||||
* {@link Allocation#REPLICA} indicates a replica shard copy
|
||||
* {@link Allocation#UNUSED} indicates an unused shard copy
|
||||
* The allocationStatus status of the store.
|
||||
* {@link AllocationStatus#PRIMARY} indicates a primary shard copy
|
||||
* {@link AllocationStatus#REPLICA} indicates a replica shard copy
|
||||
* {@link AllocationStatus#UNUSED} indicates an unused shard copy
|
||||
*/
|
||||
public Allocation getAllocation() {
|
||||
return allocation;
|
||||
public AllocationStatus getAllocationStatus() {
|
||||
return allocationStatus;
|
||||
}
|
||||
|
||||
static StoreStatus readStoreStatus(StreamInput in) throws IOException {
|
||||
|
@ -165,7 +174,8 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
public void readFrom(StreamInput in) throws IOException {
|
||||
node = DiscoveryNode.readNode(in);
|
||||
version = in.readLong();
|
||||
allocation = Allocation.readFrom(in);
|
||||
allocationId = in.readOptionalString();
|
||||
allocationStatus = AllocationStatus.readFrom(in);
|
||||
if (in.readBoolean()) {
|
||||
storeException = in.readThrowable();
|
||||
}
|
||||
|
@ -175,7 +185,8 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
node.writeTo(out);
|
||||
out.writeLong(version);
|
||||
allocation.writeTo(out);
|
||||
out.writeOptionalString(allocationId);
|
||||
allocationStatus.writeTo(out);
|
||||
if (storeException != null) {
|
||||
out.writeBoolean(true);
|
||||
out.writeThrowable(storeException);
|
||||
|
@ -188,7 +199,8 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
node.toXContent(builder, params);
|
||||
builder.field(Fields.VERSION, version);
|
||||
builder.field(Fields.ALLOCATED, allocation.value());
|
||||
builder.field(Fields.ALLOCATION_ID, allocationId);
|
||||
builder.field(Fields.ALLOCATED, allocationStatus.value());
|
||||
if (storeException != null) {
|
||||
builder.startObject(Fields.STORE_EXCEPTION);
|
||||
ElasticsearchException.toXContent(builder, params, storeException);
|
||||
|
@ -206,7 +218,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
} else {
|
||||
int compare = Long.compare(other.version, version);
|
||||
if (compare == 0) {
|
||||
return Integer.compare(allocation.id, other.allocation.id);
|
||||
return Integer.compare(allocationStatus.id, other.allocationStatus.id);
|
||||
}
|
||||
return compare;
|
||||
}
|
||||
|
@ -379,6 +391,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
|
|||
static final XContentBuilderString STORES = new XContentBuilderString("stores");
|
||||
// StoreStatus fields
|
||||
static final XContentBuilderString VERSION = new XContentBuilderString("version");
|
||||
static final XContentBuilderString ALLOCATION_ID = new XContentBuilderString("allocation_id");
|
||||
static final XContentBuilderString STORE_EXCEPTION = new XContentBuilderString("store_exception");
|
||||
static final XContentBuilderString ALLOCATED = new XContentBuilderString("allocation");
|
||||
}
|
||||
|
|
|
@ -179,8 +179,8 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
|
|||
}
|
||||
for (NodeGatewayStartedShards response : fetchResponse.responses) {
|
||||
if (shardExistsInNode(response)) {
|
||||
IndicesShardStoresResponse.StoreStatus.Allocation allocation = getAllocation(fetchResponse.shardId.getIndex(), fetchResponse.shardId.id(), response.getNode());
|
||||
storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.version(), allocation, response.storeException()));
|
||||
IndicesShardStoresResponse.StoreStatus.AllocationStatus allocationStatus = getAllocationStatus(fetchResponse.shardId.getIndex(), fetchResponse.shardId.id(), response.getNode());
|
||||
storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.version(), response.allocationId(), allocationStatus, response.storeException()));
|
||||
}
|
||||
}
|
||||
CollectionUtil.timSort(storeStatuses);
|
||||
|
@ -193,27 +193,27 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
|
|||
listener.onResponse(new IndicesShardStoresResponse(indicesStoreStatusesBuilder.build(), Collections.unmodifiableList(failureBuilder)));
|
||||
}
|
||||
|
||||
private IndicesShardStoresResponse.StoreStatus.Allocation getAllocation(String index, int shardID, DiscoveryNode node) {
|
||||
private IndicesShardStoresResponse.StoreStatus.AllocationStatus getAllocationStatus(String index, int shardID, DiscoveryNode node) {
|
||||
for (ShardRouting shardRouting : routingNodes.node(node.id())) {
|
||||
ShardId shardId = shardRouting.shardId();
|
||||
if (shardId.id() == shardID && shardId.getIndex().equals(index)) {
|
||||
if (shardRouting.primary()) {
|
||||
return IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY;
|
||||
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY;
|
||||
} else if (shardRouting.assignedToNode()) {
|
||||
return IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA;
|
||||
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA;
|
||||
} else {
|
||||
return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED;
|
||||
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
|
||||
}
|
||||
}
|
||||
}
|
||||
return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED;
|
||||
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
|
||||
}
|
||||
|
||||
/**
|
||||
* A shard exists/existed in a node only if shard state file exists in the node
|
||||
*/
|
||||
private boolean shardExistsInNode(final NodeGatewayStartedShards response) {
|
||||
return response.storeException() != null || response.version() != -1;
|
||||
return response.storeException() != null || response.version() != -1 || response.allocationId() != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -621,7 +621,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
|||
public int numberOfReplicas() {
|
||||
return settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1);
|
||||
}
|
||||
|
||||
|
||||
public Builder creationDate(long creationDate) {
|
||||
settings = settingsBuilder().put(settings).put(SETTING_CREATION_DATE, creationDate).build();
|
||||
return this;
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.elasticsearch.rest.RestStatus;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
||||
/**
|
||||
* Service responsible for submitting open/close index requests
|
||||
|
@ -92,14 +91,6 @@ public class MetaDataIndexStateService extends AbstractComponent {
|
|||
}
|
||||
|
||||
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
|
||||
IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index);
|
||||
for (IndexShardRoutingTable shard : indexRoutingTable) {
|
||||
for (ShardRouting shardRouting : shard) {
|
||||
if (shardRouting.primary() == true && shardRouting.allocatedPostIndexCreate() == false) {
|
||||
throw new IndexPrimaryShardNotAllocatedException(new Index(index));
|
||||
}
|
||||
}
|
||||
}
|
||||
indicesToClose.add(index);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.elasticsearch.cluster.routing;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -267,7 +269,7 @@ public final class ShardRouting implements Streamable, ToXContent {
|
|||
return shardIdentifier;
|
||||
}
|
||||
|
||||
public boolean allocatedPostIndexCreate() {
|
||||
public boolean allocatedPostIndexCreate(IndexMetaData indexMetaData) {
|
||||
if (active()) {
|
||||
return true;
|
||||
}
|
||||
|
@ -279,6 +281,11 @@ public final class ShardRouting implements Streamable, ToXContent {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (indexMetaData.activeAllocationIds(id()).isEmpty() && indexMetaData.getCreationVersion().onOrAfter(Version.V_3_0_0)) {
|
||||
// when no shards with this id have ever been active for this index
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,13 +22,13 @@ package org.elasticsearch.cluster.routing.allocation.decider;
|
|||
import com.carrotsearch.hppc.ObjectLookupContainer;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterInfo;
|
||||
import org.elasticsearch.cluster.ClusterInfoService;
|
||||
import org.elasticsearch.cluster.DiskUsage;
|
||||
import org.elasticsearch.cluster.EmptyClusterInfoService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
|
@ -360,7 +360,8 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
}
|
||||
|
||||
// a flag for whether the primary shard has been previously allocated
|
||||
boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate();
|
||||
IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.getIndex());
|
||||
boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate(indexMetaData);
|
||||
|
||||
// checks for exact byte comparisons
|
||||
if (freeBytes < freeBytesThresholdLow.bytes()) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
|
@ -82,8 +83,8 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
|
|||
return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored");
|
||||
}
|
||||
|
||||
Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).getSettings();
|
||||
String enableIndexValue = indexSettings.get(INDEX_ROUTING_ALLOCATION_ENABLE);
|
||||
IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.getIndex());
|
||||
String enableIndexValue = indexMetaData.getSettings().get(INDEX_ROUTING_ALLOCATION_ENABLE);
|
||||
final Allocation enable;
|
||||
if (enableIndexValue != null) {
|
||||
enable = Allocation.parse(enableIndexValue);
|
||||
|
@ -96,7 +97,7 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
|
|||
case NONE:
|
||||
return allocation.decision(Decision.NO, NAME, "no allocations are allowed");
|
||||
case NEW_PRIMARIES:
|
||||
if (shardRouting.primary() && shardRouting.allocatedPostIndexCreate() == false) {
|
||||
if (shardRouting.primary() && shardRouting.allocatedPostIndexCreate(indexMetaData) == false) {
|
||||
return allocation.decision(Decision.YES, NAME, "new primary allocations are allowed");
|
||||
} else {
|
||||
return allocation.decision(Decision.NO, NAME, "non-new primary allocations are forbidden");
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.gateway;
|
||||
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -30,8 +31,10 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The primary shard allocator allocates primary shard that were not created as
|
||||
|
@ -39,6 +42,7 @@ import java.util.*;
|
|||
*/
|
||||
public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||
|
||||
@Deprecated
|
||||
public static final String INDEX_RECOVERY_INITIAL_SHARDS = "index.recovery.initial_shards";
|
||||
|
||||
private final String initialShards;
|
||||
|
@ -56,13 +60,21 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
|
||||
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
|
||||
while (unassignedIterator.hasNext()) {
|
||||
ShardRouting shard = unassignedIterator.next();
|
||||
final ShardRouting shard = unassignedIterator.next();
|
||||
|
||||
if (needToFindPrimaryCopy(shard) == false) {
|
||||
if (shard.primary() == false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
|
||||
final IndexMetaData indexMetaData = metaData.index(shard.getIndex());
|
||||
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings, Collections.emptyList());
|
||||
|
||||
if (shard.allocatedPostIndexCreate(indexMetaData) == false) {
|
||||
// when we create a fresh index
|
||||
continue;
|
||||
}
|
||||
|
||||
final AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
|
||||
if (shardState.hasData() == false) {
|
||||
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
|
||||
allocation.setHasPendingAsyncFetch();
|
||||
|
@ -70,25 +82,50 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
continue;
|
||||
}
|
||||
|
||||
IndexMetaData indexMetaData = metaData.index(shard.getIndex());
|
||||
Settings indexSettings = Settings.builder().put(settings).put(indexMetaData.getSettings()).build();
|
||||
final Set<String> lastActiveAllocationIds = indexMetaData.activeAllocationIds(shard.id());
|
||||
final boolean snapshotRestore = shard.restoreSource() != null;
|
||||
final boolean recoverOnAnyNode = recoverOnAnyNode(indexSettings);
|
||||
|
||||
NodesAndVersions nodesAndVersions = buildNodesAndVersions(shard, recoverOnAnyNode(indexSettings), allocation.getIgnoreNodes(shard.shardId()), shardState);
|
||||
logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion);
|
||||
final NodesAndVersions nodesAndVersions;
|
||||
final boolean enoughAllocationsFound;
|
||||
|
||||
if (isEnoughAllocationsFound(shard, indexMetaData, nodesAndVersions) == false) {
|
||||
// if we are restoring this shard we still can allocate
|
||||
if (shard.restoreSource() == null) {
|
||||
if (lastActiveAllocationIds.isEmpty()) {
|
||||
assert indexSettings.getIndexVersionCreated().before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new";
|
||||
// when we load an old index (after upgrading cluster) or restore a snapshot of an old index
|
||||
// fall back to old version-based allocation mode
|
||||
// Note that once the shard has been active, lastActiveAllocationIds will be non-empty
|
||||
nodesAndVersions = buildNodesAndVersions(shard, snapshotRestore || recoverOnAnyNode, allocation.getIgnoreNodes(shard.shardId()), shardState);
|
||||
if (snapshotRestore || recoverOnAnyNode) {
|
||||
enoughAllocationsFound = nodesAndVersions.allocationsFound > 0;
|
||||
} else {
|
||||
enoughAllocationsFound = isEnoughVersionBasedAllocationsFound(shard, indexMetaData, nodesAndVersions);
|
||||
}
|
||||
logger.debug("[{}][{}]: version-based allocation for pre-{} index found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), Version.V_3_0_0, nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion);
|
||||
} else {
|
||||
assert lastActiveAllocationIds.isEmpty() == false;
|
||||
// use allocation ids to select nodes
|
||||
nodesAndVersions = buildAllocationIdBasedNodes(shard, snapshotRestore || recoverOnAnyNode,
|
||||
allocation.getIgnoreNodes(shard.shardId()), lastActiveAllocationIds, shardState);
|
||||
enoughAllocationsFound = nodesAndVersions.allocationsFound > 0;
|
||||
logger.debug("[{}][{}]: found {} allocations of {} based on allocation ids: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, lastActiveAllocationIds);
|
||||
}
|
||||
|
||||
if (enoughAllocationsFound == false){
|
||||
if (snapshotRestore) {
|
||||
// let BalancedShardsAllocator take care of allocating this shard
|
||||
logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
|
||||
} else if (recoverOnAnyNode) {
|
||||
// let BalancedShardsAllocator take care of allocating this shard
|
||||
logger.debug("[{}][{}]: missing local data, recover from any node", shard.index(), shard.id());
|
||||
} else {
|
||||
// we can't really allocate, so ignore it and continue
|
||||
unassignedIterator.removeAndIgnore();
|
||||
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound);
|
||||
} else {
|
||||
logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodesAndVersions);
|
||||
final NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodesAndVersions.nodes);
|
||||
if (nodesToAllocate.yesNodes.isEmpty() == false) {
|
||||
DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
|
||||
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
|
||||
|
@ -109,63 +146,99 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
}
|
||||
|
||||
/**
|
||||
* Does the shard need to find a primary copy?
|
||||
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
|
||||
* lastActiveAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
|
||||
* entries with matching allocation id are always at the front of the list.
|
||||
*/
|
||||
boolean needToFindPrimaryCopy(ShardRouting shard) {
|
||||
if (shard.primary() == false) {
|
||||
return false;
|
||||
protected NodesAndVersions buildAllocationIdBasedNodes(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
|
||||
Set<String> lastActiveAllocationIds, AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
|
||||
List<DiscoveryNode> matchingNodes = new ArrayList<>();
|
||||
List<DiscoveryNode> nonMatchingNodes = new ArrayList<>();
|
||||
long highestVersion = -1;
|
||||
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
|
||||
DiscoveryNode node = nodeShardState.getNode();
|
||||
String allocationId = nodeShardState.allocationId();
|
||||
|
||||
if (ignoreNodes.contains(node.id())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nodeShardState.storeException() == null) {
|
||||
if (allocationId == null && nodeShardState.version() != -1) {
|
||||
// old shard with no allocation id, assign dummy value so that it gets added below in case of matchAnyShard
|
||||
allocationId = "_n/a_";
|
||||
}
|
||||
|
||||
logger.trace("[{}] on node [{}] has allocation id [{}] of shard", shard, nodeShardState.getNode(), allocationId);
|
||||
} else {
|
||||
logger.trace("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", nodeShardState.storeException(), shard, nodeShardState.getNode(), allocationId);
|
||||
allocationId = null;
|
||||
}
|
||||
|
||||
if (allocationId != null) {
|
||||
if (lastActiveAllocationIds.contains(allocationId)) {
|
||||
matchingNodes.add(node);
|
||||
highestVersion = Math.max(highestVersion, nodeShardState.version());
|
||||
} else if (matchAnyShard) {
|
||||
nonMatchingNodes.add(node);
|
||||
highestVersion = Math.max(highestVersion, nodeShardState.version());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// this is an API allocation, ignore since we know there is no data...
|
||||
if (shard.allocatedPostIndexCreate() == false) {
|
||||
return false;
|
||||
}
|
||||
List<DiscoveryNode> nodes = new ArrayList<>();
|
||||
nodes.addAll(matchingNodes);
|
||||
nodes.addAll(nonMatchingNodes);
|
||||
|
||||
return true;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("{} candidates for allocation: {}", shard, nodes.stream().map(DiscoveryNode::name).collect(Collectors.joining(", ")));
|
||||
}
|
||||
return new NodesAndVersions(nodes, nodes.size(), highestVersion);
|
||||
}
|
||||
|
||||
private boolean isEnoughAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesAndVersions nodesAndVersions) {
|
||||
/**
|
||||
* used by old version-based allocation
|
||||
*/
|
||||
private boolean isEnoughVersionBasedAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesAndVersions nodesAndVersions) {
|
||||
// check if the counts meets the minimum set
|
||||
int requiredAllocation = 1;
|
||||
// if we restore from a repository one copy is more then enough
|
||||
if (shard.restoreSource() == null) {
|
||||
try {
|
||||
String initialShards = indexMetaData.getSettings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
|
||||
if ("quorum".equals(initialShards)) {
|
||||
if (indexMetaData.getNumberOfReplicas() > 1) {
|
||||
requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2) + 1;
|
||||
}
|
||||
} else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
|
||||
if (indexMetaData.getNumberOfReplicas() > 2) {
|
||||
requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2);
|
||||
}
|
||||
} else if ("one".equals(initialShards)) {
|
||||
requiredAllocation = 1;
|
||||
} else if ("full".equals(initialShards) || "all".equals(initialShards)) {
|
||||
requiredAllocation = indexMetaData.getNumberOfReplicas() + 1;
|
||||
} else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
|
||||
if (indexMetaData.getNumberOfReplicas() > 1) {
|
||||
requiredAllocation = indexMetaData.getNumberOfReplicas();
|
||||
}
|
||||
} else {
|
||||
requiredAllocation = Integer.parseInt(initialShards);
|
||||
try {
|
||||
String initialShards = indexMetaData.getSettings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
|
||||
if ("quorum".equals(initialShards)) {
|
||||
if (indexMetaData.getNumberOfReplicas() > 1) {
|
||||
requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2) + 1;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
|
||||
} else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
|
||||
if (indexMetaData.getNumberOfReplicas() > 2) {
|
||||
requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2);
|
||||
}
|
||||
} else if ("one".equals(initialShards)) {
|
||||
requiredAllocation = 1;
|
||||
} else if ("full".equals(initialShards) || "all".equals(initialShards)) {
|
||||
requiredAllocation = indexMetaData.getNumberOfReplicas() + 1;
|
||||
} else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
|
||||
if (indexMetaData.getNumberOfReplicas() > 1) {
|
||||
requiredAllocation = indexMetaData.getNumberOfReplicas();
|
||||
}
|
||||
} else {
|
||||
requiredAllocation = Integer.parseInt(initialShards);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
|
||||
}
|
||||
|
||||
return nodesAndVersions.allocationsFound >= requiredAllocation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Based on the nodes and versions, build the list of yes/no/throttle nodes that the shard applies to.
|
||||
* Split the list of nodes to lists of yes/no/throttle nodes based on allocation deciders
|
||||
*/
|
||||
private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, NodesAndVersions nodesAndVersions) {
|
||||
private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, List<DiscoveryNode> nodes) {
|
||||
List<DiscoveryNode> yesNodes = new ArrayList<>();
|
||||
List<DiscoveryNode> throttledNodes = new ArrayList<>();
|
||||
List<DiscoveryNode> noNodes = new ArrayList<>();
|
||||
for (DiscoveryNode discoNode : nodesAndVersions.nodes) {
|
||||
for (DiscoveryNode discoNode : nodes) {
|
||||
RoutingNode node = allocation.routingNodes().node(discoNode.id());
|
||||
if (node == null) {
|
||||
continue;
|
||||
|
@ -184,9 +257,11 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
}
|
||||
|
||||
/**
|
||||
* Builds a list of nodes and version
|
||||
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have the highest shard version
|
||||
* are added to the list. Otherwise, any node that has a shard is added to the list, but entries with highest
|
||||
* version are always at the front of the list.
|
||||
*/
|
||||
NodesAndVersions buildNodesAndVersions(ShardRouting shard, boolean recoveryOnAnyNode, Set<String> ignoreNodes,
|
||||
NodesAndVersions buildNodesAndVersions(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
|
||||
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
|
||||
final Map<DiscoveryNode, Long> nodesWithVersion = new HashMap<>();
|
||||
int numberOfAllocationsFound = 0;
|
||||
|
@ -208,20 +283,15 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
version = -1;
|
||||
}
|
||||
|
||||
if (recoveryOnAnyNode) {
|
||||
numberOfAllocationsFound++;
|
||||
if (version > highestVersion) {
|
||||
highestVersion = version;
|
||||
}
|
||||
// We always put the node without clearing the map
|
||||
nodesWithVersion.put(node, version);
|
||||
} else if (version != -1) {
|
||||
if (version != -1) {
|
||||
numberOfAllocationsFound++;
|
||||
// If we've found a new "best" candidate, clear the
|
||||
// current candidates and add it
|
||||
if (version > highestVersion) {
|
||||
highestVersion = version;
|
||||
nodesWithVersion.clear();
|
||||
if (matchAnyShard == false) {
|
||||
nodesWithVersion.clear();
|
||||
}
|
||||
nodesWithVersion.put(node, version);
|
||||
} else if (version == highestVersion) {
|
||||
// If the candidate is the same, add it to the
|
||||
|
@ -258,9 +328,9 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
* Return {@code true} if the index is configured to allow shards to be
|
||||
* recovered on any node
|
||||
*/
|
||||
private boolean recoverOnAnyNode(Settings idxSettings) {
|
||||
return IndexMetaData.isOnSharedFilesystem(idxSettings) &&
|
||||
idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false);
|
||||
private boolean recoverOnAnyNode(IndexSettings indexSettings) {
|
||||
return indexSettings.isOnSharedFilesystem()
|
||||
&& indexSettings.getSettings().getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false);
|
||||
}
|
||||
|
||||
protected abstract AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);
|
||||
|
|
|
@ -24,6 +24,8 @@ import com.carrotsearch.hppc.ObjectLongMap;
|
|||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
|
@ -56,6 +58,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
*/
|
||||
public boolean processExistingRecoveries(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
MetaData metaData = allocation.metaData();
|
||||
for (RoutingNodes.RoutingNodesIterator nodes = allocation.routingNodes().nodes(); nodes.hasNext(); ) {
|
||||
nodes.next();
|
||||
for (RoutingNodes.RoutingNodeIterator it = nodes.nodeShards(); it.hasNext(); ) {
|
||||
|
@ -69,8 +72,10 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
if (shard.relocatingNodeId() != null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
|
||||
if (shard.allocatedPostIndexCreate() == false) {
|
||||
IndexMetaData indexMetaData = metaData.index(shard.getIndex());
|
||||
if (shard.allocatedPostIndexCreate(indexMetaData) == false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -114,6 +119,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
boolean changed = false;
|
||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
||||
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
|
||||
MetaData metaData = allocation.metaData();
|
||||
while (unassignedIterator.hasNext()) {
|
||||
ShardRouting shard = unassignedIterator.next();
|
||||
if (shard.primary()) {
|
||||
|
@ -121,7 +127,8 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
}
|
||||
|
||||
// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
|
||||
if (shard.allocatedPostIndexCreate() == false) {
|
||||
IndexMetaData indexMetaData = metaData.index(shard.getIndex());
|
||||
if (shard.allocatedPostIndexCreate(indexMetaData) == false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -139,7 +139,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
|||
Store.tryOpenIndex(shardPath.resolveIndex());
|
||||
} catch (Exception exception) {
|
||||
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
|
||||
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, exception);
|
||||
String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
|
||||
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId, exception);
|
||||
}
|
||||
}
|
||||
// old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
|
||||
|
@ -149,11 +150,12 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
|||
logger.warn("{} shard state info found but indexUUID didn't match expected [{}] actual [{}]", shardId, indexUUID, shardStateMetaData.indexUUID);
|
||||
} else {
|
||||
logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
|
||||
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version);
|
||||
String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
|
||||
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId);
|
||||
}
|
||||
}
|
||||
logger.trace("{} no local shard info found", shardId);
|
||||
return new NodeGatewayStartedShards(clusterService.localNode(), -1);
|
||||
return new NodeGatewayStartedShards(clusterService.localNode(), -1, null);
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchException("failed to load started shards", e);
|
||||
}
|
||||
|
@ -277,17 +279,19 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
|||
public static class NodeGatewayStartedShards extends BaseNodeResponse {
|
||||
|
||||
private long version = -1;
|
||||
private String allocationId = null;
|
||||
private Throwable storeException = null;
|
||||
|
||||
public NodeGatewayStartedShards() {
|
||||
}
|
||||
public NodeGatewayStartedShards(DiscoveryNode node, long version) {
|
||||
this(node, version, null);
|
||||
public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId) {
|
||||
this(node, version, allocationId, null);
|
||||
}
|
||||
|
||||
public NodeGatewayStartedShards(DiscoveryNode node, long version, Throwable storeException) {
|
||||
public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId, Throwable storeException) {
|
||||
super(node);
|
||||
this.version = version;
|
||||
this.allocationId = allocationId;
|
||||
this.storeException = storeException;
|
||||
}
|
||||
|
||||
|
@ -295,6 +299,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
|||
return this.version;
|
||||
}
|
||||
|
||||
public String allocationId() {
|
||||
return this.allocationId;
|
||||
}
|
||||
|
||||
public Throwable storeException() {
|
||||
return this.storeException;
|
||||
}
|
||||
|
@ -303,16 +311,17 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
|
|||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
version = in.readLong();
|
||||
allocationId = in.readOptionalString();
|
||||
if (in.readBoolean()) {
|
||||
storeException = in.readThrowable();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeLong(version);
|
||||
out.writeOptionalString(allocationId);
|
||||
if (storeException != null) {
|
||||
out.writeBoolean(true);
|
||||
out.writeThrowable(storeException);
|
||||
|
|
|
@ -1119,7 +1119,8 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
// we are the first primary, recover from the gateway
|
||||
// if its post api allocation, the index should exists
|
||||
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
|
||||
final boolean shouldExist = shardRouting.allocatedPostIndexCreate();
|
||||
boolean shouldExist = shardRouting.allocatedPostIndexCreate(idxSettings.getIndexMetaData());
|
||||
|
||||
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
|
||||
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
|
||||
}
|
||||
|
|
|
@ -285,4 +285,11 @@ public class CreateIndexIT extends ESIntegTestCase {
|
|||
assertThat(messages.toString(), containsString("mapper [text] is used by multiple types"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testRestartIndexCreationAfterFullClusterRestart() throws Exception {
|
||||
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get();
|
||||
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).get();
|
||||
internalCluster().fullRestart();
|
||||
ensureGreen("test");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,6 +87,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase {
|
|||
for (ObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : shardStores.values()) {
|
||||
for (IndicesShardStoresResponse.StoreStatus storeStatus : shardStoreStatuses.value) {
|
||||
assertThat(storeStatus.getVersion(), greaterThan(-1l));
|
||||
assertThat(storeStatus.getAllocationId(), notNullValue());
|
||||
assertThat(storeStatus.getNode(), notNullValue());
|
||||
assertThat(storeStatus.getStoreException(), nullValue());
|
||||
}
|
||||
|
@ -108,7 +109,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase {
|
|||
assertThat(shardStoresStatuses.size(), equalTo(unassignedShards.size()));
|
||||
for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> storesStatus : shardStoresStatuses) {
|
||||
assertThat("must report for one store", storesStatus.value.size(), equalTo(1));
|
||||
assertThat("reported store should be primary", storesStatus.value.get(0).getAllocation(), equalTo(IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY));
|
||||
assertThat("reported store should be primary", storesStatus.value.get(0).getAllocationStatus(), equalTo(IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY));
|
||||
}
|
||||
logger.info("--> enable allocation");
|
||||
enableAllocation(index);
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.shards;
|
|||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
|
@ -44,9 +45,9 @@ public class IndicesShardStoreResponseTests extends ESTestCase {
|
|||
DiscoveryNode node1 = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
DiscoveryNode node2 = new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
List<IndicesShardStoresResponse.StoreStatus> storeStatusList = new ArrayList<>();
|
||||
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null));
|
||||
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node2, 2, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, null));
|
||||
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED, new IOException("corrupted")));
|
||||
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
|
||||
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node2, 2, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null));
|
||||
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, new IOException("corrupted")));
|
||||
storeStatuses.put(0, storeStatusList);
|
||||
storeStatuses.put(1, storeStatusList);
|
||||
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storesMap = storeStatuses.build();
|
||||
|
@ -89,8 +90,10 @@ public class IndicesShardStoreResponseTests extends ESTestCase {
|
|||
IndicesShardStoresResponse.StoreStatus storeStatus = storeStatusList.get(i);
|
||||
assertThat(storeInfo.containsKey("version"), equalTo(true));
|
||||
assertThat(((int) storeInfo.get("version")), equalTo(((int) storeStatus.getVersion())));
|
||||
assertThat(storeInfo.containsKey("allocation_id"), equalTo(true));
|
||||
assertThat(((String) storeInfo.get("allocation_id")), equalTo((storeStatus.getAllocationId())));
|
||||
assertThat(storeInfo.containsKey("allocation"), equalTo(true));
|
||||
assertThat(((String) storeInfo.get("allocation")), equalTo(storeStatus.getAllocation().value()));
|
||||
assertThat(((String) storeInfo.get("allocation")), equalTo(storeStatus.getAllocationStatus().value()));
|
||||
assertThat(storeInfo.containsKey(storeStatus.getNode().id()), equalTo(true));
|
||||
if (storeStatus.getStoreException() != null) {
|
||||
assertThat(storeInfo.containsKey("store_exception"), equalTo(true));
|
||||
|
@ -104,11 +107,11 @@ public class IndicesShardStoreResponseTests extends ESTestCase {
|
|||
public void testStoreStatusOrdering() throws Exception {
|
||||
DiscoveryNode node1 = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
List<IndicesShardStoresResponse.StoreStatus> orderedStoreStatuses = new ArrayList<>();
|
||||
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 2, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null));
|
||||
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null));
|
||||
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, null));
|
||||
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED, null));
|
||||
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, new IOException("corrupted")));
|
||||
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 2, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
|
||||
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
|
||||
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null));
|
||||
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, null));
|
||||
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, new IOException("corrupted")));
|
||||
|
||||
List<IndicesShardStoresResponse.StoreStatus> storeStatuses = new ArrayList<>(orderedStoreStatuses);
|
||||
Collections.shuffle(storeStatuses, random());
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
package org.elasticsearch.cluster.routing;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.gateway.GatewayAllocator;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
@ESIntegTestCase.SuppressLocalMode
|
||||
public class PrimaryAllocationIT extends ESIntegTestCase {
|
||||
|
||||
public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception {
|
||||
logger.info("--> starting 3 nodes, 1 master, 2 data");
|
||||
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
|
||||
internalCluster().startDataOnlyNodesAsync(2).get();
|
||||
|
||||
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get());
|
||||
ensureGreen();
|
||||
logger.info("--> indexing...");
|
||||
client().prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
|
||||
refresh();
|
||||
|
||||
ClusterState state = client().admin().cluster().prepareState().all().get().getState();
|
||||
List<ShardRouting> shards = state.routingTable().allShards("test");
|
||||
assertThat(shards.size(), equalTo(2));
|
||||
|
||||
final String primaryNode;
|
||||
final String replicaNode;
|
||||
if (shards.get(0).primary()) {
|
||||
primaryNode = state.getRoutingNodes().node(shards.get(0).currentNodeId()).node().name();
|
||||
replicaNode = state.getRoutingNodes().node(shards.get(1).currentNodeId()).node().name();
|
||||
} else {
|
||||
primaryNode = state.getRoutingNodes().node(shards.get(1).currentNodeId()).node().name();
|
||||
replicaNode = state.getRoutingNodes().node(shards.get(0).currentNodeId()).node().name();
|
||||
}
|
||||
|
||||
NetworkDisconnectPartition partition = new NetworkDisconnectPartition(
|
||||
new HashSet<>(Arrays.asList(master, replicaNode)), Collections.singleton(primaryNode), random());
|
||||
internalCluster().setDisruptionScheme(partition);
|
||||
logger.info("--> partitioning node with primary shard from rest of cluster");
|
||||
partition.startDisrupting();
|
||||
|
||||
ensureStableCluster(2, master);
|
||||
|
||||
logger.info("--> index a document into previous replica shard (that is now primary)");
|
||||
client(replicaNode).prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
|
||||
|
||||
logger.info("--> shut down node that has new acknowledged document");
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
|
||||
|
||||
ensureStableCluster(1, master);
|
||||
|
||||
partition.stopDisrupting();
|
||||
|
||||
logger.info("--> waiting for node with old primary shard to rejoin the cluster");
|
||||
ensureStableCluster(2, master);
|
||||
|
||||
logger.info("--> check that old primary shard does not get promoted to primary again");
|
||||
// kick reroute and wait for all shard states to be fetched
|
||||
client(master).admin().cluster().prepareReroute().get();
|
||||
assertBusy(() -> assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)));
|
||||
// kick reroute a second time and check that all shards are unassigned
|
||||
assertThat(client(master).admin().cluster().prepareReroute().get().getState().getRoutingNodes().unassigned().size(), equalTo(2));
|
||||
|
||||
logger.info("--> starting node that reuses data folder with the up-to-date primary shard");
|
||||
internalCluster().startDataOnlyNode(Settings.EMPTY);
|
||||
|
||||
logger.info("--> check that the up-to-date primary shard gets promoted and that documents are available");
|
||||
ensureYellow("test");
|
||||
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l);
|
||||
}
|
||||
|
||||
public void testNotWaitForQuorumCopies() throws Exception {
|
||||
logger.info("--> starting 3 nodes");
|
||||
internalCluster().startNodesAsync(3).get();
|
||||
logger.info("--> creating index with 1 primary and 2 replicas");
|
||||
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
|
||||
.put("index.number_of_shards", randomIntBetween(1, 3)).put("index.number_of_replicas", 2)).get());
|
||||
ensureGreen("test");
|
||||
client().prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
|
||||
logger.info("--> removing 2 nodes from cluster");
|
||||
internalCluster().stopRandomDataNode();
|
||||
internalCluster().stopRandomDataNode();
|
||||
internalCluster().fullRestart();
|
||||
logger.info("--> checking that index still gets allocated with only 1 shard copy being available");
|
||||
ensureYellow("test");
|
||||
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 1l);
|
||||
}
|
||||
}
|
|
@ -36,7 +36,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.test.ESAllocationTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -59,25 +59,29 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
this.testAllocator = new TestAllocator();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the canProcess method of primary allocation behaves correctly
|
||||
* and processes only the applicable shard.
|
||||
*/
|
||||
public void testNoProcessReplica() {
|
||||
ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
|
||||
assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false));
|
||||
}
|
||||
|
||||
public void testNoProcessPrimayNotAllcoatedBefore() {
|
||||
ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, true, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
|
||||
assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false));
|
||||
public void testNoProcessPrimaryNotAllocatedBefore() {
|
||||
final RoutingAllocation allocation;
|
||||
if (randomBoolean()) {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomBoolean(), Version.CURRENT);
|
||||
} else {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), true, Version.V_2_1_0);
|
||||
}
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
|
||||
assertThat(allocation.routingNodes().unassigned().iterator().next().shardId(), equalTo(shardId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when async fetch returns that there is no data, the shard will not be allocated.
|
||||
*/
|
||||
public void testNoAsyncFetchData() {
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
|
||||
final RoutingAllocation allocation;
|
||||
if (randomBoolean()) {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "allocId");
|
||||
} else {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0);
|
||||
}
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -85,11 +89,17 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
/**
|
||||
* Tests when the node returns that no data was found for it (-1), it will be moved to ignore unassigned.
|
||||
* Tests when the node returns that no data was found for it (-1 for version and null for allocation id),
|
||||
* it will be moved to ignore unassigned.
|
||||
*/
|
||||
public void testNoAllocationFound() {
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
|
||||
testAllocator.addData(node1, -1);
|
||||
final RoutingAllocation allocation;
|
||||
if (randomBoolean()) {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "allocId");
|
||||
} else {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0);
|
||||
}
|
||||
testAllocator.addData(node1, -1, null);
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -97,11 +107,43 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
/**
|
||||
* Tests when the node returns that no data was found for it (-1), it will be moved to ignore unassigned.
|
||||
* Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore unassigned.
|
||||
*/
|
||||
public void testNoMatchingAllocationIdFound() {
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2");
|
||||
testAllocator.addData(node1, 1, "id1");
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when there is a node to allocate the shard to, and there are no active allocation ids, it will be allocated to it.
|
||||
* This is the case when we have old shards from pre-3.0 days.
|
||||
*/
|
||||
public void testNoActiveAllocationIds() {
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
|
||||
testAllocator.addData(node1, 1, null);
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests when the node returns that no data was found for it, it will be moved to ignore unassigned.
|
||||
*/
|
||||
public void testStoreException() {
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
|
||||
testAllocator.addData(node1, 3, new CorruptIndexException("test", "test"));
|
||||
final RoutingAllocation allocation;
|
||||
if (randomBoolean()) {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
||||
testAllocator.addData(node1, 1, "allocId1", new CorruptIndexException("test", "test"));
|
||||
} else {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
|
||||
testAllocator.addData(node1, 3, null, new CorruptIndexException("test", "test"));
|
||||
}
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -112,8 +154,14 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
* Tests that when there is a node to allocate the shard to, it will be allocated to it.
|
||||
*/
|
||||
public void testFoundAllocationAndAllocating() {
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
|
||||
testAllocator.addData(node1, 10);
|
||||
final RoutingAllocation allocation;
|
||||
if (randomBoolean()) {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
||||
testAllocator.addData(node1, 1, "allocId1");
|
||||
} else {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0);
|
||||
testAllocator.addData(node1, 3, null);
|
||||
}
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
|
@ -126,8 +174,14 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
* it will be moved to ignore unassigned until it can be allocated to.
|
||||
*/
|
||||
public void testFoundAllocationButThrottlingDecider() {
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders());
|
||||
testAllocator.addData(node1, 10);
|
||||
final RoutingAllocation allocation;
|
||||
if (randomBoolean()) {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
||||
testAllocator.addData(node1, 1, "allocId1");
|
||||
} else {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0);
|
||||
testAllocator.addData(node1, 3, null);
|
||||
}
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -139,8 +193,14 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
* force the allocation to it.
|
||||
*/
|
||||
public void testFoundAllocationButNoDecider() {
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders());
|
||||
testAllocator.addData(node1, 10);
|
||||
final RoutingAllocation allocation;
|
||||
if (randomBoolean()) {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
||||
testAllocator.addData(node1, 1, "allocId1");
|
||||
} else {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0);
|
||||
testAllocator.addData(node1, 3, null);
|
||||
}
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
|
@ -149,11 +209,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
/**
|
||||
* Tests that the highest version node is chosed for allocation.
|
||||
* Tests that the highest version node is chosen for allocation.
|
||||
*/
|
||||
public void testAllocateToTheHighestVersion() {
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
|
||||
testAllocator.addData(node1, 10).addData(node2, 12);
|
||||
public void testAllocateToTheHighestVersionOnLegacyIndex() {
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0);
|
||||
testAllocator.addData(node1, 10, null).addData(node2, 12, null);
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
|
@ -162,35 +222,150 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
/**
|
||||
* Tests that when restoring from snapshot, even if we didn't find any node to allocate on, the shard
|
||||
* will remain in the unassigned list to be allocated later.
|
||||
* Tests that when restoring from a snapshot and we find a node with a shard copy and allocation
|
||||
* deciders say yes, we allocate to that node.
|
||||
*/
|
||||
public void testRestoreIgnoresNoNodesToAllocate() {
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
|
||||
.build();
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), Version.CURRENT, shardId.getIndex()))
|
||||
.build();
|
||||
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
|
||||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null, System.nanoTime());
|
||||
public void testRestore() {
|
||||
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
|
||||
testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||
}
|
||||
|
||||
testAllocator.addData(node1, -1).addData(node2, -1);
|
||||
/**
|
||||
* Tests that when restoring from a snapshot and we find a node with a shard copy and allocation
|
||||
* deciders say throttle, we add it to ignored shards.
|
||||
*/
|
||||
public void testRestoreThrottle() {
|
||||
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
|
||||
testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when restoring from a snapshot and we find a node with a shard copy but allocation
|
||||
* deciders say no, we still allocate to that node.
|
||||
*/
|
||||
public void testRestoreForcesAllocateIfShardAvailable() {
|
||||
RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders());
|
||||
testAllocator.addData(node1, 1, randomFrom(null, "some allocId"));
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when restoring from a snapshot and we don't find a node with a shard copy, the shard will remain in
|
||||
* the unassigned list to be allocated later.
|
||||
*/
|
||||
public void testRestoreDoesNotAssignIfNoShardAvailable() {
|
||||
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
|
||||
testAllocator.addData(node1, -1, null);
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
|
||||
}
|
||||
|
||||
private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocationDeciders) {
|
||||
Version version = randomFrom(Version.CURRENT, Version.V_2_0_0);
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0)
|
||||
.putActiveAllocationIds(0, version == Version.CURRENT ? new HashSet<>(Arrays.asList("allocId")) : Collections.emptySet()))
|
||||
.build();
|
||||
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), version, shardId.getIndex()))
|
||||
.build();
|
||||
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
|
||||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy and allocation
|
||||
* deciders say yes, we allocate to that node.
|
||||
*/
|
||||
public void testRecoverOnAnyNode() {
|
||||
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
|
||||
testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy and allocation
|
||||
* deciders say throttle, we add it to ignored shards.
|
||||
*/
|
||||
public void testRecoverOnAnyNodeThrottle() {
|
||||
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
|
||||
testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy but allocation
|
||||
* deciders say no, we still allocate to that node.
|
||||
*/
|
||||
public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() {
|
||||
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders());
|
||||
testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when recovering using "recover_on_any_node" and we don't find a node with a shard copy we let
|
||||
* BalancedShardAllocator assign the shard
|
||||
*/
|
||||
public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() {
|
||||
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
|
||||
testAllocator.addData(node1, -1, null);
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
|
||||
}
|
||||
|
||||
private RoutingAllocation getRecoverOnAnyNodeRoutingAllocation(AllocationDeciders allocationDeciders) {
|
||||
Version version = randomFrom(Version.CURRENT, Version.V_2_0_0);
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)
|
||||
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
|
||||
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true))
|
||||
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, version == Version.CURRENT ? new HashSet<>(Arrays.asList("allocId")) : Collections.emptySet()))
|
||||
.build();
|
||||
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), Version.CURRENT, shardId.getIndex()))
|
||||
.build();
|
||||
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
|
||||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that only when enough copies of the shard exists we are going to allocate it. This test
|
||||
* verifies that with same version (1), and quorum allocation.
|
||||
*/
|
||||
public void testEnoughCopiesFoundForAllocation() {
|
||||
public void testEnoughCopiesFoundForAllocationOnLegacyIndex() {
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.V_2_0_0)).numberOfShards(1).numberOfReplicas(2))
|
||||
.build();
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsRecovery(metaData.index(shardId.getIndex()))
|
||||
|
@ -207,7 +382,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node1, 1);
|
||||
testAllocator.addData(node1, 1, null);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
|
@ -215,7 +390,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node2, 1);
|
||||
testAllocator.addData(node2, 1, null);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
|
@ -229,9 +404,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
* Tests that only when enough copies of the shard exists we are going to allocate it. This test
|
||||
* verifies that even with different version, we treat different versions as a copy, and count them.
|
||||
*/
|
||||
public void testEnoughCopiesFoundForAllocationWithDifferentVersion() {
|
||||
public void testEnoughCopiesFoundForAllocationOnLegacyIndexWithDifferentVersion() {
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.V_2_0_0)).numberOfShards(1).numberOfReplicas(2))
|
||||
.build();
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsRecovery(metaData.index(shardId.getIndex()))
|
||||
|
@ -248,7 +423,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node1, 1);
|
||||
testAllocator.addData(node1, 1, null);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
|
@ -256,7 +431,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node2, 2);
|
||||
testAllocator.addData(node2, 2, null);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
|
@ -266,67 +441,20 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
|
||||
}
|
||||
|
||||
public void testAllocationOnAnyNodeWithSharedFs() {
|
||||
ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false,
|
||||
ShardRoutingState.UNASSIGNED, 0,
|
||||
new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
|
||||
|
||||
Map<DiscoveryNode, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> data = new HashMap<>();
|
||||
data.put(node1, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node1, 1));
|
||||
data.put(node2, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node2, 5));
|
||||
data.put(node3, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node3, -1));
|
||||
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetches =
|
||||
new AsyncShardFetch.FetchResult(shardId, data, new HashSet<>(), new HashSet<>());
|
||||
|
||||
PrimaryShardAllocator.NodesAndVersions nAndV = testAllocator.buildNodesAndVersions(shard, false, new HashSet<String>(), fetches);
|
||||
assertThat(nAndV.allocationsFound, equalTo(2));
|
||||
assertThat(nAndV.highestVersion, equalTo(5L));
|
||||
assertThat(nAndV.nodes, contains(node2));
|
||||
|
||||
nAndV = testAllocator.buildNodesAndVersions(shard, true, new HashSet<String>(), fetches);
|
||||
assertThat(nAndV.allocationsFound, equalTo(3));
|
||||
assertThat(nAndV.highestVersion, equalTo(5L));
|
||||
// All three nodes are potential candidates because shards can be recovered on any node
|
||||
assertThat(nAndV.nodes, contains(node2, node1, node3));
|
||||
}
|
||||
|
||||
public void testAllocationOnAnyNodeShouldPutNodesWithExceptionsLast() {
|
||||
ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false,
|
||||
ShardRoutingState.UNASSIGNED, 0,
|
||||
new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
|
||||
|
||||
Map<DiscoveryNode, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> data = new HashMap<>();
|
||||
data.put(node1, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node1, 1));
|
||||
data.put(node2, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node2, 1));
|
||||
data.put(node3, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node3, 1, new IOException("I failed to open")));
|
||||
HashSet<String> ignoredNodes = new HashSet<>();
|
||||
ignoredNodes.add(node2.id());
|
||||
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetches =
|
||||
new AsyncShardFetch.FetchResult(shardId, data, new HashSet<>(), ignoredNodes);
|
||||
|
||||
PrimaryShardAllocator.NodesAndVersions nAndV = testAllocator.buildNodesAndVersions(shard, false, ignoredNodes, fetches);
|
||||
assertThat(nAndV.allocationsFound, equalTo(1));
|
||||
assertThat(nAndV.highestVersion, equalTo(1L));
|
||||
assertThat(nAndV.nodes, contains(node1));
|
||||
|
||||
nAndV = testAllocator.buildNodesAndVersions(shard, true, ignoredNodes, fetches);
|
||||
assertThat(nAndV.allocationsFound, equalTo(2));
|
||||
assertThat(nAndV.highestVersion, equalTo(1L));
|
||||
// node3 should be last here
|
||||
assertThat(nAndV.nodes.size(), equalTo(2));
|
||||
assertThat(nAndV.nodes, contains(node1, node3));
|
||||
}
|
||||
|
||||
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders) {
|
||||
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version, String... activeAllocationIds) {
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
|
||||
.build();
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsRecovery(metaData.index(shardId.getIndex()))
|
||||
.build();
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version))
|
||||
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, new HashSet<>(Arrays.asList(activeAllocationIds))))
|
||||
.build();
|
||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||
if (asNew) {
|
||||
routingTableBuilder.addAsNew(metaData.index(shardId.getIndex()));
|
||||
} else {
|
||||
routingTableBuilder.addAsRecovery(metaData.index(shardId.getIndex()));
|
||||
}
|
||||
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
|
||||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.routingTable(routingTableBuilder.build())
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
}
|
||||
|
@ -344,15 +472,15 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
return this;
|
||||
}
|
||||
|
||||
public TestAllocator addData(DiscoveryNode node, long version) {
|
||||
return addData(node, version, null);
|
||||
public TestAllocator addData(DiscoveryNode node, long version, String allocationId) {
|
||||
return addData(node, version, allocationId, null);
|
||||
}
|
||||
|
||||
public TestAllocator addData(DiscoveryNode node, long version, @Nullable Throwable storeException) {
|
||||
public TestAllocator addData(DiscoveryNode node, long version, String allocationId, @Nullable Throwable storeException) {
|
||||
if (data == null) {
|
||||
data = new HashMap<>();
|
||||
}
|
||||
data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, storeException));
|
||||
data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, allocationId, storeException));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,10 +20,10 @@
|
|||
package org.elasticsearch.gateway;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
|
@ -32,14 +32,10 @@ import org.elasticsearch.test.InternalTestCluster.RestartCallback;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.client.Requests.clusterHealthRequest;
|
||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -51,72 +47,12 @@ public class QuorumGatewayIT extends ESIntegTestCase {
|
|||
return 2;
|
||||
}
|
||||
|
||||
public void testChangeInitialShardsRecovery() throws Exception {
|
||||
logger.info("--> starting 3 nodes");
|
||||
final String[] nodes = internalCluster().startNodesAsync(3).get().toArray(new String[0]);
|
||||
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
NumShards test = getNumShards("test");
|
||||
|
||||
logger.info("--> indexing...");
|
||||
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
|
||||
//We don't check for failures in the flush response: if we do we might get the following:
|
||||
// FlushNotAllowedEngineException[[test][1] recovery is in progress, flush [COMMIT_TRANSLOG] is not allowed]
|
||||
flush();
|
||||
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get();
|
||||
refresh();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l);
|
||||
}
|
||||
|
||||
final String nodeToRemove = nodes[between(0,2)];
|
||||
logger.info("--> restarting 1 nodes -- kill 2");
|
||||
internalCluster().fullRestart(new RestartCallback() {
|
||||
@Override
|
||||
public Settings onNodeStopped(String nodeName) throws Exception {
|
||||
return Settings.EMPTY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean doRestart(String nodeName) {
|
||||
return nodeToRemove.equals(nodeName);
|
||||
}
|
||||
});
|
||||
if (randomBoolean()) {
|
||||
Thread.sleep(between(1, 400)); // wait a bit and give is a chance to try to allocate
|
||||
}
|
||||
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForNodes("1")).actionGet();
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.RED)); // nothing allocated yet
|
||||
assertTrue(awaitBusy(() -> {
|
||||
ClusterStateResponse clusterStateResponse = internalCluster().smartClient().admin().cluster().prepareState().setMasterNodeTimeout("500ms").get();
|
||||
return clusterStateResponse.getState() != null && clusterStateResponse.getState().routingTable().index("test") != null;
|
||||
})); // wait until we get a cluster state - could be null if we quick enough.
|
||||
final ClusterStateResponse clusterStateResponse = internalCluster().smartClient().admin().cluster().prepareState().setMasterNodeTimeout("500ms").get();
|
||||
assertThat(clusterStateResponse.getState(), notNullValue());
|
||||
assertThat(clusterStateResponse.getState().routingTable().index("test"), notNullValue());
|
||||
assertThat(clusterStateResponse.getState().routingTable().index("test").allPrimaryShardsActive(), is(false));
|
||||
logger.info("--> change the recovery.initial_shards setting, and make sure its recovered");
|
||||
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("recovery.initial_shards", 1)).get();
|
||||
|
||||
logger.info("--> running cluster_health (wait for the shards to startup), primaries only since we only have 1 node");
|
||||
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(test.numPrimaries)).actionGet();
|
||||
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l);
|
||||
}
|
||||
}
|
||||
|
||||
public void testQuorumRecovery() throws Exception {
|
||||
logger.info("--> starting 3 nodes");
|
||||
internalCluster().startNodesAsync(3).get();
|
||||
// we are shutting down nodes - make sure we don't have 2 clusters if we test network
|
||||
setMinimumMasterNodes(2);
|
||||
internalCluster().startNodesAsync(3,
|
||||
Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2).build()).get();
|
||||
|
||||
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
|
|
|
@ -43,9 +43,11 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
|||
import org.elasticsearch.test.ESAllocationTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -275,13 +277,16 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders, Settings settings, UnassignedInfo.Reason reason) {
|
||||
ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10);
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings)).numberOfShards(1).numberOfReplicas(0))
|
||||
.build();
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings))
|
||||
.numberOfShards(1).numberOfReplicas(1)
|
||||
.putActiveAllocationIds(0, new HashSet<>(Arrays.asList(primaryShard.allocationId().getId()))))
|
||||
.build();
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.add(IndexRoutingTable.builder(shardId.getIndex())
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
|
||||
.addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10))
|
||||
.addShard(primaryShard)
|
||||
.addShard(ShardRouting.newUnassigned(shardId.getIndex(), shardId.getId(), null, false, new UnassignedInfo(reason, null)))
|
||||
.build())
|
||||
)
|
||||
|
@ -294,13 +299,16 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
|
||||
ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10);
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
|
||||
.put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1).numberOfReplicas(1)
|
||||
.putActiveAllocationIds(0, new HashSet<>(Arrays.asList(primaryShard.allocationId().getId()))))
|
||||
.build();
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.add(IndexRoutingTable.builder(shardId.getIndex())
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
|
||||
.addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10))
|
||||
.addShard(primaryShard)
|
||||
.addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node2.id(), null, null, false, ShardRoutingState.INITIALIZING, 10, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)))
|
||||
.build())
|
||||
)
|
||||
|
|
|
@ -133,7 +133,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
ShardId id = new ShardId("foo", 1);
|
||||
long version = between(1, Integer.MAX_VALUE / 2);
|
||||
boolean primary = randomBoolean();
|
||||
AllocationId allocationId = randomAllocationId();
|
||||
AllocationId allocationId = randomBoolean() ? null : randomAllocationId();
|
||||
ShardStateMetaData state1 = new ShardStateMetaData(version, primary, "foo", allocationId);
|
||||
write(state1, env.availableShardPaths(id));
|
||||
ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(id));
|
||||
|
@ -288,7 +288,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
}
|
||||
|
||||
public void testShardStateMetaHashCodeEquals() {
|
||||
ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), randomAllocationId());
|
||||
AllocationId allocationId = randomBoolean() ? null : randomAllocationId();
|
||||
ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId);
|
||||
|
||||
assertEquals(meta, new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID, meta.allocationId));
|
||||
assertEquals(meta.hashCode(), new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID, meta.allocationId).hashCode());
|
||||
|
@ -299,7 +300,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID + "foo", randomAllocationId())));
|
||||
Set<Integer> hashCodes = new HashSet<>();
|
||||
for (int i = 0; i < 30; i++) { // just a sanity check that we impl hashcode
|
||||
meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), randomAllocationId());
|
||||
allocationId = randomBoolean() ? null : randomAllocationId();
|
||||
meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId);
|
||||
hashCodes.add(meta.hashCode());
|
||||
}
|
||||
assertTrue("more than one unique hashcode expected but got: " + hashCodes.size(), hashCodes.size() > 1);
|
||||
|
|
|
@ -97,7 +97,7 @@ public class SimpleIndexStateIT extends ESIntegTestCase {
|
|||
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").get();
|
||||
}
|
||||
|
||||
public void testFastCloseAfterCreateDoesNotClose() {
|
||||
public void testFastCloseAfterCreateContinuesCreateAfterOpen() {
|
||||
logger.info("--> creating test index that cannot be allocated");
|
||||
client().admin().indices().prepareCreate("test").setSettings(Settings.settingsBuilder()
|
||||
.put("index.routing.allocation.include.tag", "no_such_node").build()).get();
|
||||
|
@ -106,17 +106,14 @@ public class SimpleIndexStateIT extends ESIntegTestCase {
|
|||
assertThat(health.isTimedOut(), equalTo(false));
|
||||
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.RED));
|
||||
|
||||
try {
|
||||
client().admin().indices().prepareClose("test").get();
|
||||
fail("Exception should have been thrown");
|
||||
} catch(IndexPrimaryShardNotAllocatedException e) {
|
||||
// expected
|
||||
}
|
||||
client().admin().indices().prepareClose("test").get();
|
||||
|
||||
logger.info("--> updating test index settings to allow allocation");
|
||||
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.settingsBuilder()
|
||||
.put("index.routing.allocation.include.tag", "").build()).get();
|
||||
|
||||
client().admin().indices().prepareOpen("test").get();
|
||||
|
||||
logger.info("--> waiting for green status");
|
||||
ensureGreen();
|
||||
|
||||
|
|
|
@ -129,19 +129,6 @@ specific index module:
|
|||
experimental[] Disables the purge of <<mapping-ttl-field,expired docs>> on
|
||||
the current index.
|
||||
|
||||
[[index.recovery.initial_shards]]`index.recovery.initial_shards`::
|
||||
+
|
||||
--
|
||||
A primary shard is only recovered only if there are enough nodes available to
|
||||
allocate sufficient replicas to form a quorum. It can be set to:
|
||||
|
||||
* `quorum` (default)
|
||||
* `quorum-1` (or `half`)
|
||||
* `full`
|
||||
* `full-1`.
|
||||
* Number values are also supported, e.g. `1`.
|
||||
--
|
||||
|
||||
|
||||
[float]
|
||||
=== Settings in other index modules
|
||||
|
|
|
@ -104,9 +104,8 @@ settings API:
|
|||
|
||||
`index.shared_filesystem.recover_on_any_node`::
|
||||
Boolean value indicating whether the primary shards for the index should be
|
||||
allowed to recover on any node in the cluster, regardless of the number of
|
||||
replicas or whether the node has previously had the shard allocated to it
|
||||
before. Defaults to `false`.
|
||||
allowed to recover on any node in the cluster. If a node holding a copy of
|
||||
the shard is found, recovery prefers that node. Defaults to `false`.
|
||||
|
||||
=== Node level settings related to shadow replicas
|
||||
|
||||
|
|
|
@ -52,8 +52,9 @@ The shard stores information is grouped by indices and shard ids.
|
|||
}
|
||||
},
|
||||
"version": 4, <4>
|
||||
"allocation_id": "2iNySv_OQVePRX-yaRH_lQ", <5>
|
||||
"allocation" : "primary" | "replica" | "unused", <6>
|
||||
"store_exception": ... <5>
|
||||
"store_exception": ... <7>
|
||||
},
|
||||
...
|
||||
]
|
||||
|
@ -66,7 +67,8 @@ The shard stores information is grouped by indices and shard ids.
|
|||
<3> The node information that hosts a copy of the store, the key
|
||||
is the unique node id.
|
||||
<4> The version of the store copy
|
||||
<5> The status of the store copy, whether it is used as a
|
||||
<5> The allocation id of the store copy
|
||||
<6> The status of the store copy, whether it is used as a
|
||||
primary, replica or not used at all
|
||||
<6> Any exception encountered while opening the shard index or
|
||||
<7> Any exception encountered while opening the shard index or
|
||||
from earlier engine failure
|
||||
|
|
|
@ -14,6 +14,7 @@ your application to Elasticsearch 3.0.
|
|||
* <<breaking_30_cache_concurrency>>
|
||||
* <<breaking_30_non_loopback>>
|
||||
* <<breaking_30_thread_pool>>
|
||||
* <<breaking_30_allocation>>
|
||||
|
||||
[[breaking_30_search_changes]]
|
||||
=== Search changes
|
||||
|
@ -515,3 +516,23 @@ from `OsStats.Cpu#getPercent`.
|
|||
Only stored fields are retrievable with this option.
|
||||
The fields option won't be able to load non stored fields from _source anymore.
|
||||
|
||||
[[breaking_30_allocation]]
|
||||
=== Primary shard allocation
|
||||
|
||||
Previously, primary shards were only assigned if a quorum of shard copies were found (configurable using
|
||||
`index.recovery.initial_shards`, now deprecated). In case where a primary had only a single replica, quorum was defined
|
||||
to be a single shard. This meant that any shard copy of an index with replication factor 1 could become primary, even it
|
||||
was a stale copy of the data on disk. This is now fixed by using allocation IDs.
|
||||
|
||||
Allocation IDs assign unique identifiers to shard copies. This allows the cluster to differentiate between multiple
|
||||
copies of the same data and track which shards have been active, so that after a cluster restart, shard copies
|
||||
containing only the most recent data can become primaries.
|
||||
|
||||
==== `index.shared_filesystem.recover_on_any_node` changes
|
||||
|
||||
The behavior of `index.shared_filesystem.recover_on_any_node = true` has been changed. Previously, in the case where no
|
||||
shard copies could be found, an arbitrary node was chosen by potentially ignoring allocation deciders. Now, we take
|
||||
balancing into account but don't assign the shard if the allocation deciders are not satisfied. The behavior has also changed
|
||||
in the case where shard copies can be found. Previously, a node not holding the shard copy was chosen if none of the nodes
|
||||
holding shard copies were satisfying the allocation deciders. Now, the shard will be assigned to a node having a shard copy,
|
||||
even if none of the nodes holding a shard copy satisfy the allocation deciders.
|
||||
|
|
|
@ -22,9 +22,8 @@ Enable or disable allocation for specific kinds of shards:
|
|||
|
||||
This setting does not affect the recovery of local primary shards when
|
||||
restarting a node. A restarted node that has a copy of an unassigned primary
|
||||
shard will recover that primary immediately, assuming that the
|
||||
<<index.recovery.initial_shards,`index.recovery.initial_shards`>> setting is
|
||||
satisfied.
|
||||
shard will recover that primary immediately, assuming that its allocation id matches
|
||||
one of the active allocation ids in the cluster state.
|
||||
|
||||
--
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterInfoService;
|
|||
import org.elasticsearch.cluster.ClusterModule;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.EmptyClusterInfoService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
|
@ -230,7 +231,8 @@ public abstract class ESAllocationTestCase extends ESTestCase {
|
|||
boolean changed = false;
|
||||
while (unassignedIterator.hasNext()) {
|
||||
ShardRouting shard = unassignedIterator.next();
|
||||
if (shard.primary() || shard.allocatedPostIndexCreate() == false) {
|
||||
IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex());
|
||||
if (shard.primary() || shard.allocatedPostIndexCreate(indexMetaData) == false) {
|
||||
continue;
|
||||
}
|
||||
changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard);
|
||||
|
|
Loading…
Reference in New Issue