Merge pull request #15708 from ywelsch/feature/reroute-stale-primary

Extend reroute with an option to force assign stale primary shard copies
This commit is contained in:
Yannick Welsch 2016-01-19 12:26:02 +01:00
commit 0a69f1e283
16 changed files with 966 additions and 308 deletions

View File

@ -319,7 +319,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
throw new IllegalArgumentException("resolved [" + node + "] into [" + resolvedNodeIds.length + "] nodes, where expected to be resolved to a single node");
}
if (resolvedNodeIds.length == 0) {
throw new IllegalArgumentException("failed to resolve [" + node + " ], no matching nodes");
throw new IllegalArgumentException("failed to resolve [" + node + "], no matching nodes");
}
return nodes.get(resolvedNodeIds[0]);
}

View File

@ -114,6 +114,16 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
return shard;
}
/**
* All shards for the provided {@link ShardId}
* @return All the shard routing entries for the given index and shard id
* @throws IndexNotFoundException if provided index does not exist
* @throws ShardNotFoundException if provided shard id is unknown
*/
public IndexShardRoutingTable shardRoutingTable(ShardId shardId) {
return shardRoutingTable(shardId.getIndex(), shardId.getId());
}
public RoutingTable validateRaiseException(MetaData metaData) throws RoutingValidationException {
RoutingTableValidation validation = validate(metaData);
if (!validation.valid()) {

View File

@ -32,7 +32,7 @@ import org.elasticsearch.gateway.GatewayAllocator;
/**
* The {@link ShardsAllocator} class offers methods for allocating shard within a cluster.
* These methods include moving shards and re-balancing the cluster. It also allows management
* of shards by their state.
* of shards by their state.
*/
public class ShardsAllocators extends AbstractComponent implements ShardsAllocator {

View File

@ -0,0 +1,240 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.StreamableReader;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.function.Consumer;
/**
* Abstract base class for allocating an unassigned shard to a node
*/
public abstract class AbstractAllocateAllocationCommand implements AllocationCommand, ToXContent {
private static final String INDEX_KEY = "index";
private static final String SHARD_KEY = "shard";
private static final String NODE_KEY = "node";
protected static <T extends Builder> ObjectParser<T, Void> createAllocateParser(String command) {
ObjectParser<T, Void> parser = new ObjectParser<>(command);
parser.declareString(Builder::setIndex, new ParseField(INDEX_KEY));
parser.declareInt(Builder::setShard, new ParseField(SHARD_KEY));
parser.declareString(Builder::setNode, new ParseField(NODE_KEY));
return parser;
}
protected static abstract class Builder<T extends AbstractAllocateAllocationCommand> implements StreamableReader<Builder<T>> {
protected String index;
protected int shard = -1;
protected String node;
public void setIndex(String index) {
this.index = index;
}
public void setShard(int shard) {
this.shard = shard;
}
public void setNode(String node) {
this.node = node;
}
@Override
public Builder<T> readFrom(StreamInput in) throws IOException {
index = in.readString();
shard = in.readVInt();
node = in.readString();
return this;
}
public abstract Builder<T> parse(XContentParser parser) throws IOException;
public abstract T build();
protected void validate() {
if (index == null) {
throw new IllegalArgumentException("Argument [index] must be defined");
}
if (shard < 0) {
throw new IllegalArgumentException("Argument [shard] must be defined and non-negative");
}
if (node == null) {
throw new IllegalArgumentException("Argument [node] must be defined");
}
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field(INDEX_KEY, shardId().index().name());
builder.field(SHARD_KEY, shardId().id());
builder.field(NODE_KEY, node());
return builder;
}
public void writeTo(StreamOutput out) throws IOException {
out.writeString(shardId.getIndex());
out.writeVInt(shardId.getId());
out.writeString(node);
}
public static abstract class Factory<T extends AbstractAllocateAllocationCommand> implements AllocationCommand.Factory<T> {
protected abstract Builder<T> newBuilder();
@Override
public T readFrom(StreamInput in) throws IOException {
return newBuilder().readFrom(in).build();
}
@Override
public void writeTo(T command, StreamOutput out) throws IOException {
command.writeTo(out);
}
@Override
public T fromXContent(XContentParser parser) throws IOException {
return newBuilder().parse(parser).build();
}
@Override
public void toXContent(T command, XContentBuilder builder, ToXContent.Params params, String objectName) throws IOException {
if (objectName == null) {
builder.startObject();
} else {
builder.startObject(objectName);
}
builder.endObject();
}
}
protected final ShardId shardId;
protected final String node;
protected AbstractAllocateAllocationCommand(ShardId shardId, String node) {
this.shardId = shardId;
this.node = node;
}
/**
* Get the shard id
*
* @return id of the shard
*/
public ShardId shardId() {
return this.shardId;
}
/**
* Get the id of the node
*
* @return id of the node
*/
public String node() {
return this.node;
}
/**
* Handle case where a disco node cannot be found in the routing table. Usually means that it's not a data node.
*/
protected RerouteExplanation explainOrThrowMissingRoutingNode(RoutingAllocation allocation, boolean explain, DiscoveryNode discoNode) {
if (!discoNode.dataNode()) {
return explainOrThrowRejectedCommand(explain, allocation, "allocation can only be done on data nodes, not [" + node + "]");
} else {
return explainOrThrowRejectedCommand(explain, allocation, "could not find [" + node + "] among the routing nodes");
}
}
/**
* Utility method for rejecting the current allocation command based on provided reason
*/
protected RerouteExplanation explainOrThrowRejectedCommand(boolean explain, RoutingAllocation allocation, String reason) {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, name() + " (allocation command)", reason));
}
throw new IllegalArgumentException("[" + name() + "] " + reason);
}
/**
* Utility method for rejecting the current allocation command based on provided exception
*/
protected RerouteExplanation explainOrThrowRejectedCommand(boolean explain, RoutingAllocation allocation, RuntimeException rte) {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, name() + " (allocation command)", rte.getMessage()));
}
throw rte;
}
/**
* Initializes an unassigned shard on a node and removes it from the unassigned
*
* @param allocation the allocation
* @param routingNodes the routing nodes
* @param routingNode the node to initialize it to
* @param shardRouting the shard routing that is to be matched in unassigned shards
*/
protected void initializeUnassignedShard(RoutingAllocation allocation, RoutingNodes routingNodes, RoutingNode routingNode, ShardRouting shardRouting) {
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, null);
}
/**
* Initializes an unassigned shard on a node and removes it from the unassigned
*
* @param allocation the allocation
* @param routingNodes the routing nodes
* @param routingNode the node to initialize it to
* @param shardRouting the shard routing that is to be matched in unassigned shards
* @param shardRoutingChanges changes to apply for shard routing in unassigned shards before initialization
*/
protected void initializeUnassignedShard(RoutingAllocation allocation, RoutingNodes routingNodes, RoutingNode routingNode,
ShardRouting shardRouting, @Nullable Consumer<ShardRouting> shardRoutingChanges) {
for (RoutingNodes.UnassignedShards.UnassignedIterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
ShardRouting unassigned = it.next();
if (!unassigned.equalsIgnoringMetaData(shardRouting)) {
continue;
}
if (shardRoutingChanges != null) {
shardRoutingChanges.accept(unassigned);
}
it.initialize(routingNode.nodeId(), unassigned.version(),
allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
return;
}
assert false : "shard to initialize not found in list of unassigned shards";
}
}

View File

@ -1,240 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
/**
* Allocates an unassigned shard to a specific node. Note, primary allocation will "force"
* allocation which might mean one will loose data if using local gateway..., use with care
* with the <tt>allowPrimary</tt> flag.
*/
public class AllocateAllocationCommand implements AllocationCommand {
public static final String NAME = "allocate";
public static class Factory implements AllocationCommand.Factory<AllocateAllocationCommand> {
@Override
public AllocateAllocationCommand readFrom(StreamInput in) throws IOException {
return new AllocateAllocationCommand(ShardId.readShardId(in), in.readString(), in.readBoolean());
}
@Override
public void writeTo(AllocateAllocationCommand command, StreamOutput out) throws IOException {
command.shardId().writeTo(out);
out.writeString(command.node());
out.writeBoolean(command.allowPrimary());
}
@Override
public AllocateAllocationCommand fromXContent(XContentParser parser) throws IOException {
String index = null;
int shardId = -1;
String nodeId = null;
boolean allowPrimary = false;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
index = parser.text();
} else if ("shard".equals(currentFieldName)) {
shardId = parser.intValue();
} else if ("node".equals(currentFieldName)) {
nodeId = parser.text();
} else if ("allow_primary".equals(currentFieldName) || "allowPrimary".equals(currentFieldName)) {
allowPrimary = parser.booleanValue();
} else {
throw new ElasticsearchParseException("[{}] command does not support field [{}]", NAME, currentFieldName);
}
} else {
throw new ElasticsearchParseException("[{}] command does not support complex json tokens [{}]", NAME, token);
}
}
if (index == null) {
throw new ElasticsearchParseException("[{}] command missing the index parameter", NAME);
}
if (shardId == -1) {
throw new ElasticsearchParseException("[{}] command missing the shard parameter", NAME);
}
if (nodeId == null) {
throw new ElasticsearchParseException("[{}] command missing the node parameter", NAME);
}
return new AllocateAllocationCommand(new ShardId(index, shardId), nodeId, allowPrimary);
}
@Override
public void toXContent(AllocateAllocationCommand command, XContentBuilder builder, ToXContent.Params params, String objectName) throws IOException {
if (objectName == null) {
builder.startObject();
} else {
builder.startObject(objectName);
}
builder.field("index", command.shardId().index().name());
builder.field("shard", command.shardId().id());
builder.field("node", command.node());
builder.field("allow_primary", command.allowPrimary());
builder.endObject();
}
}
private final ShardId shardId;
private final String node;
private final boolean allowPrimary;
/**
* Create a new {@link AllocateAllocationCommand}
*
* @param shardId {@link ShardId} of the shrad to assign
* @param node Node to assign the shard to
* @param allowPrimary should the node be allow to allocate the shard as primary
*/
public AllocateAllocationCommand(ShardId shardId, String node, boolean allowPrimary) {
this.shardId = shardId;
this.node = node;
this.allowPrimary = allowPrimary;
}
@Override
public String name() {
return NAME;
}
/**
* Get the shards id
*
* @return id of the shard
*/
public ShardId shardId() {
return this.shardId;
}
/**
* Get the id of the Node
*
* @return id of the Node
*/
public String node() {
return this.node;
}
/**
* Determine if primary allocation is allowed
*
* @return <code>true</code> if primary allocation is allowed. Otherwise <code>false</code>
*/
public boolean allowPrimary() {
return this.allowPrimary;
}
@Override
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
final DiscoveryNode discoNode = allocation.nodes().resolveNode(node);
final RoutingNodes routingNodes = allocation.routingNodes();
ShardRouting shardRouting = null;
for (ShardRouting routing : routingNodes.unassigned()) {
if (routing.shardId().equals(shardId)) {
// prefer primaries first to allocate
if (shardRouting == null || routing.primary()) {
shardRouting = routing;
}
}
}
if (shardRouting == null) {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
"failed to find " + shardId + " on the list of unassigned shards"));
}
throw new IllegalArgumentException("[allocate] failed to find " + shardId + " on the list of unassigned shards");
}
if (shardRouting.primary() && !allowPrimary) {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
"trying to allocate a primary shard " + shardId + ", which is disabled"));
}
throw new IllegalArgumentException("[allocate] trying to allocate a primary shard " + shardId + ", which is disabled");
}
RoutingNode routingNode = routingNodes.node(discoNode.id());
if (routingNode == null) {
if (!discoNode.dataNode()) {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
"Allocation can only be done on data nodes, not [" + node + "]"));
}
throw new IllegalArgumentException("Allocation can only be done on data nodes, not [" + node + "]");
} else {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
"Could not find [" + node + "] among the routing nodes"));
}
throw new IllegalStateException("Could not find [" + node + "] among the routing nodes");
}
}
Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
if (explain) {
return new RerouteExplanation(this, decision);
}
throw new IllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
}
// go over and remove it from the unassigned
for (RoutingNodes.UnassignedShards.UnassignedIterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
ShardRouting unassigned = it.next();
if (unassigned != shardRouting) {
continue;
}
// if we force allocation of a primary, we need to move the unassigned info back to treat it as if
// it was index creation
if (unassigned.primary() && unassigned.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) {
unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(),
unassigned.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis()));
}
it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
break;
}
return new RerouteExplanation(this, decision);
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import java.io.IOException;
/**
* Allocates an unassigned empty primary shard to a specific node. Use with extreme care as this will result in data loss.
* Allocation deciders are ignored.
*/
public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocationCommand {
public static final String NAME = "allocate_empty_primary";
private static final ObjectParser<Builder, Void> EMPTY_PRIMARY_PARSER = BasePrimaryAllocationCommand.createAllocatePrimaryParser(NAME);
/**
* Creates a new {@link AllocateEmptyPrimaryAllocationCommand}
*
* @param shardId {@link ShardId} of the shard to assign
* @param node node id of the node to assign the shard to
* @param acceptDataLoss whether the user agrees to data loss
*/
public AllocateEmptyPrimaryAllocationCommand(ShardId shardId, String node, boolean acceptDataLoss) {
super(shardId, node, acceptDataLoss);
}
@Override
public String name() {
return NAME;
}
public static class Builder extends BasePrimaryAllocationCommand.Builder<AllocateEmptyPrimaryAllocationCommand> {
@Override
public Builder parse(XContentParser parser) throws IOException {
return EMPTY_PRIMARY_PARSER.parse(parser, this);
}
@Override
public AllocateEmptyPrimaryAllocationCommand build() {
validate();
return new AllocateEmptyPrimaryAllocationCommand(new ShardId(index, shard), node, acceptDataLoss);
}
}
public static class Factory extends AbstractAllocateAllocationCommand.Factory<AllocateEmptyPrimaryAllocationCommand> {
@Override
protected Builder newBuilder() {
return new Builder();
}
}
@Override
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
final DiscoveryNode discoNode;
try {
discoNode = allocation.nodes().resolveNode(node);
} catch (IllegalArgumentException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
final RoutingNodes routingNodes = allocation.routingNodes();
RoutingNode routingNode = routingNodes.node(discoNode.id());
if (routingNode == null) {
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}
final ShardRouting shardRouting;
try {
shardRouting = allocation.routingTable().shardRoutingTable(shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (shardRouting.unassigned() == false) {
return explainOrThrowRejectedCommand(explain, allocation, "primary " + shardId + " is already assigned");
}
if (shardRouting.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED && acceptDataLoss == false) {
return explainOrThrowRejectedCommand(explain, allocation,
"allocating an empty primary for " + shardId + " can result in data loss. Please confirm by setting the accept_data_loss parameter to true");
}
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting,
shr -> {
if (shr.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) {
// we need to move the unassigned info back to treat it as if it was index creation
shr.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + ", " + shardRouting.unassignedInfo().getMessage(),
shardRouting.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis()));
}
});
return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
}
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import java.io.IOException;
import java.util.List;
/**
* Allocates an unassigned replica shard to a specific node. Checks if allocation deciders allow allocation.
*/
public class AllocateReplicaAllocationCommand extends AbstractAllocateAllocationCommand {
public static final String NAME = "allocate_replica";
private static final ObjectParser<AllocateReplicaAllocationCommand.Builder, Void> REPLICA_PARSER = createAllocateParser(NAME);
/**
* Creates a new {@link AllocateReplicaAllocationCommand}
*
* @param shardId {@link ShardId} of the shard to assign
* @param node node id of the node to assign the shard to
*/
public AllocateReplicaAllocationCommand(ShardId shardId, String node) {
super(shardId, node);
}
@Override
public String name() {
return NAME;
}
protected static class Builder extends AbstractAllocateAllocationCommand.Builder<AllocateReplicaAllocationCommand> {
@Override
public Builder parse(XContentParser parser) throws IOException {
return REPLICA_PARSER.parse(parser, this);
}
@Override
public AllocateReplicaAllocationCommand build() {
validate();
return new AllocateReplicaAllocationCommand(new ShardId(index, shard), node);
}
}
public static class Factory extends AbstractAllocateAllocationCommand.Factory<AllocateReplicaAllocationCommand> {
@Override
protected Builder newBuilder() {
return new Builder();
}
}
@Override
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
final DiscoveryNode discoNode;
try {
discoNode = allocation.nodes().resolveNode(node);
} catch (IllegalArgumentException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
final RoutingNodes routingNodes = allocation.routingNodes();
RoutingNode routingNode = routingNodes.node(discoNode.id());
if (routingNode == null) {
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}
final ShardRouting primaryShardRouting;
try {
primaryShardRouting = allocation.routingTable().shardRoutingTable(shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (primaryShardRouting.unassigned()) {
return explainOrThrowRejectedCommand(explain, allocation,
"trying to allocate a replica shard " + shardId + ", while corresponding primary shard is still unassigned");
}
List<ShardRouting> replicaShardRoutings = allocation.routingTable().shardRoutingTable(shardId).replicaShardsWithState(ShardRoutingState.UNASSIGNED);
ShardRouting shardRouting;
if (replicaShardRoutings.isEmpty()) {
return explainOrThrowRejectedCommand(explain, allocation,
"all copies of " + shardId +" are already assigned. Use the move allocation command instead");
} else {
shardRouting = replicaShardRoutings.get(0);
}
Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
// don't use explainOrThrowRejectedCommand to keep the original "NO" decision
if (explain) {
return new RerouteExplanation(this, decision);
}
throw new IllegalArgumentException("[" + name() + "] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
}
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting);
return new RerouteExplanation(this, decision);
}
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import java.io.IOException;
/**
* Allocates an unassigned stale primary shard to a specific node. Use with extreme care as this will result in data loss.
* Allocation deciders are ignored.
*/
public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocationCommand {
public static final String NAME = "allocate_stale_primary";
private static final ObjectParser<Builder, Void> STALE_PRIMARY_PARSER = BasePrimaryAllocationCommand.createAllocatePrimaryParser(NAME);
/**
* Creates a new {@link AllocateStalePrimaryAllocationCommand}
*
* @param shardId {@link ShardId} of the shard to assign
* @param node node id of the node to assign the shard to
* @param acceptDataLoss whether the user agrees to data loss
*/
public AllocateStalePrimaryAllocationCommand(ShardId shardId, String node, boolean acceptDataLoss) {
super(shardId, node, acceptDataLoss);
}
@Override
public String name() {
return NAME;
}
public static class Builder extends BasePrimaryAllocationCommand.Builder<AllocateStalePrimaryAllocationCommand> {
@Override
public Builder parse(XContentParser parser) throws IOException {
return STALE_PRIMARY_PARSER.parse(parser, this);
}
@Override
public AllocateStalePrimaryAllocationCommand build() {
validate();
return new AllocateStalePrimaryAllocationCommand(new ShardId(index, shard), node, acceptDataLoss);
}
}
public static class Factory extends AbstractAllocateAllocationCommand.Factory<AllocateStalePrimaryAllocationCommand> {
@Override
protected Builder newBuilder() {
return new Builder();
}
}
@Override
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
final DiscoveryNode discoNode;
try {
discoNode = allocation.nodes().resolveNode(node);
} catch (IllegalArgumentException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
final RoutingNodes routingNodes = allocation.routingNodes();
RoutingNode routingNode = routingNodes.node(discoNode.id());
if (routingNode == null) {
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}
final ShardRouting shardRouting;
try {
shardRouting = allocation.routingTable().shardRoutingTable(shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (shardRouting.unassigned() == false) {
return explainOrThrowRejectedCommand(explain, allocation, "primary " + shardId + " is already assigned");
}
if (acceptDataLoss == false) {
return explainOrThrowRejectedCommand(explain, allocation,
"allocating an empty primary for " + shardId + " can result in data loss. Please confirm by setting the accept_data_loss parameter to true");
}
final IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.getIndex());
if (shardRouting.allocatedPostIndexCreate(indexMetaData) == false) {
return explainOrThrowRejectedCommand(explain, allocation,
"trying to allocate an existing primary shard " + shardId + ", while no such shard has ever been active");
}
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting);
return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
}
}

View File

@ -67,7 +67,9 @@ public class AllocationCommands {
}
static {
registerFactory(AllocateAllocationCommand.NAME, new AllocateAllocationCommand.Factory());
registerFactory(AllocateEmptyPrimaryAllocationCommand.NAME, new AllocateEmptyPrimaryAllocationCommand.Factory());
registerFactory(AllocateStalePrimaryAllocationCommand.NAME, new AllocateStalePrimaryAllocationCommand.Factory());
registerFactory(AllocateReplicaAllocationCommand.NAME, new AllocateReplicaAllocationCommand.Factory());
registerFactory(CancelAllocationCommand.NAME, new CancelAllocationCommand.Factory());
registerFactory(MoveAllocationCommand.NAME, new MoveAllocationCommand.Factory());
}
@ -76,7 +78,7 @@ public class AllocationCommands {
/**
* Creates a new set of {@link AllocationCommands}
*
*
* @param commands {@link AllocationCommand}s that are wrapped by this instance
*/
public AllocationCommands(AllocationCommand... commands) {
@ -122,7 +124,7 @@ public class AllocationCommands {
* Reads a {@link AllocationCommands} from a {@link StreamInput}
* @param in {@link StreamInput} to read from
* @return {@link AllocationCommands} read
*
*
* @throws IOException if something happens during read
*/
public static AllocationCommands readFrom(StreamInput in) throws IOException {
@ -137,7 +139,7 @@ public class AllocationCommands {
/**
* Writes {@link AllocationCommands} to a {@link StreamOutput}
*
*
* @param commands Commands to write
* @param out {@link StreamOutput} to write the commands to
* @throws IOException if something happens during write
@ -149,7 +151,7 @@ public class AllocationCommands {
lookupFactorySafe(command.name()).writeTo(command, out);
}
}
/**
* Reads {@link AllocationCommands} from a {@link XContentParser}
* <pre>
@ -161,7 +163,7 @@ public class AllocationCommands {
* </pre>
* @param parser {@link XContentParser} to read the commands from
* @return {@link AllocationCommands} read
* @throws IOException if something bad happens while reading the stream
* @throws IOException if something bad happens while reading the stream
*/
public static AllocationCommands fromXContent(XContentParser parser) throws IOException {
AllocationCommands commands = new AllocationCommands();
@ -203,10 +205,10 @@ public class AllocationCommands {
}
return commands;
}
/**
* Writes {@link AllocationCommands} to a {@link XContentBuilder}
*
*
* @param commands {@link AllocationCommands} to write
* @param builder {@link XContentBuilder} to use
* @param params Parameters to use for building

View File

@ -0,0 +1,88 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
/**
* Abstract base class for allocating an unassigned primary shard to a node
*/
public abstract class BasePrimaryAllocationCommand extends AbstractAllocateAllocationCommand {
private static final String ACCEPT_DATA_LOSS_KEY = "accept_data_loss";
protected static <T extends Builder> ObjectParser<T, Void> createAllocatePrimaryParser(String command) {
ObjectParser<T, Void> parser = AbstractAllocateAllocationCommand.createAllocateParser(command);
parser.declareBoolean(Builder::setAcceptDataLoss, new ParseField(ACCEPT_DATA_LOSS_KEY));
return parser;
}
protected final boolean acceptDataLoss;
protected BasePrimaryAllocationCommand(ShardId shardId, String node, boolean acceptDataLoss) {
super(shardId, node);
this.acceptDataLoss = acceptDataLoss;
}
/**
* The operation only executes if the user explicitly agrees to possible data loss
*
* @return whether data loss is acceptable
*/
public boolean acceptDataLoss() {
return acceptDataLoss;
}
protected static abstract class Builder<T extends BasePrimaryAllocationCommand> extends AbstractAllocateAllocationCommand.Builder<T> {
protected boolean acceptDataLoss;
public void setAcceptDataLoss(boolean acceptDataLoss) {
this.acceptDataLoss = acceptDataLoss;
}
@Override
public Builder readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acceptDataLoss = in.readBoolean();
return this;
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.toXContent(builder, params);
builder.field(ACCEPT_DATA_LOSS_KEY, acceptDataLoss);
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acceptDataLoss);
}
}

View File

@ -125,7 +125,7 @@ public class CancelAllocationCommand implements AllocationCommand {
/**
* Creates a new {@link CancelAllocationCommand}
*
*
* @param shardId id of the shard which allocation should be canceled
* @param node id of the node that manages the shard which allocation should be canceled
*/

View File

@ -29,7 +29,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@ -100,7 +100,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
logger.info("--> explicitly allocate shard 1, *under dry_run*");
state = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
.add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), node_1, true))
.setDryRun(true)
.execute().actionGet().getState();
assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1));
@ -113,7 +113,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
logger.info("--> explicitly allocate shard 1, actually allocating, no dry run");
state = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
.add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), node_1, true))
.execute().actionGet().getState();
assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1));
assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).id()).get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
@ -212,7 +212,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
logger.info("--> explicitly allocate shard 1, actually allocating, no dry run");
state = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
.add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), node_1, true))
.execute().actionGet().getState();
assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1));
assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).id()).get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
@ -246,7 +246,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
logger.info("--> explicitly allocate primary");
state = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
.add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), node_1, true))
.execute().actionGet().getState();
assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1));
assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).id()).get(0).state(), equalTo(ShardRoutingState.INITIALIZING));

View File

@ -19,10 +19,17 @@ package org.elasticsearch.cluster.routing;
* under the License.
*/
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
@ -33,11 +40,13 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
@ -50,7 +59,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
return pluginList(MockTransportService.TestPlugin.class);
}
public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception {
private void createStaleReplicaScenario() throws Exception {
logger.info("--> starting 3 nodes, 1 master, 2 data");
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodesAsync(2).get();
@ -103,6 +112,10 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
assertBusy(() -> assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)));
// kick reroute a second time and check that all shards are unassigned
assertThat(client(master).admin().cluster().prepareReroute().get().getState().getRoutingNodes().unassigned().size(), equalTo(2));
}
public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception {
createStaleReplicaScenario();
logger.info("--> starting node that reuses data folder with the up-to-date primary shard");
internalCluster().startDataOnlyNode(Settings.EMPTY);
@ -112,6 +125,67 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l);
}
public void testFailedAllocationOfStalePrimaryToDataNodeWithNoData() throws Exception {
String dataNodeWithShardCopy = internalCluster().startNode();
logger.info("--> create single shard index");
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
ensureGreen("test");
String dataNodeWithNoShardCopy = internalCluster().startNode();
ensureStableCluster(2);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataNodeWithShardCopy));
ensureStableCluster(1);
assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));
logger.info("--> force allocation of stale copy to node that does not have shard copy");
client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand(new ShardId("test", 0), dataNodeWithNoShardCopy, true)).get();
logger.info("--> wait until shard is failed and becomes unassigned again");
assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned()));
assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
}
public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
boolean useStaleReplica = randomBoolean(); // if true, use stale replica, otherwise a completely empty copy
createStaleReplicaScenario();
logger.info("--> explicitly promote old primary shard");
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = client().admin().indices().prepareShardStores("test").get().getStoreStatuses().get("test");
ClusterRerouteRequestBuilder rerouteBuilder = client().admin().cluster().prepareReroute();
for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : storeStatuses) {
int shardId = shardStoreStatuses.key;
IndicesShardStoresResponse.StoreStatus storeStatus = randomFrom(shardStoreStatuses.value);
logger.info("--> adding allocation command for shard " + shardId);
// force allocation based on node id
if (useStaleReplica) {
rerouteBuilder.add(new AllocateStalePrimaryAllocationCommand(new ShardId("test", shardId), storeStatus.getNode().getId(), true));
} else {
rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", shardId), storeStatus.getNode().getId(), true));
}
}
rerouteBuilder.get();
logger.info("--> check that the stale primary shard gets allocated and that documents are available");
ensureYellow("test");
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1l : 0l);
}
public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException {
String node = internalCluster().startNode();
client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.routing.allocation.exclude._name", node)
.put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get();
assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().shardRoutingTable("test", 0).assignedShards(), empty());
client().admin().cluster().prepareReroute().add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), node, true)).get();
ensureGreen("test");
}
public void testNotWaitForQuorumCopies() throws Exception {
logger.info("--> starting 3 nodes");
internalCluster().startNodesAsync(3).get();

View File

@ -26,7 +26,10 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
@ -38,7 +41,9 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.test.ESAllocationTestCase;
import static java.util.Collections.singletonMap;
@ -46,6 +51,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
/**
@ -96,6 +102,14 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().node(toNodeId).get(0).state(), equalTo(ShardRoutingState.STARTED));
}
private AbstractAllocateAllocationCommand randomAllocateCommand(ShardId shardId, String node) {
return randomFrom(
new AllocateReplicaAllocationCommand(shardId, node),
new AllocateEmptyPrimaryAllocationCommand(shardId, node, true),
new AllocateStalePrimaryAllocationCommand(shardId, node, true)
);
}
public void testAllocateCommand() {
AllocationService allocation = createAllocationService(settingsBuilder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
@ -106,10 +120,13 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
// shard routing is added as "from recovery" instead of "new index creation" so that we can test below that allocating an empty
// primary with accept_data_loss flag set to false fails
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.addAsRecovery(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
ShardId shardId = new ShardId("test", 0);
logger.info("--> adding 3 nodes on same rack and do rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
@ -122,22 +139,56 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
logger.info("--> allocating with primary flag set to false, should fail");
logger.info("--> allocating to non-existent node, should fail");
try {
allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", false)));
fail();
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(shardId, "node42")));
fail("expected IllegalArgumentException when allocating to non-existing node");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("failed to resolve [node42], no matching nodes"));
}
logger.info("--> allocating to non-data node, should fail");
try {
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node4", true)));
fail();
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(shardId, "node4")));
fail("expected IllegalArgumentException when allocating to non-data node");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("allocation can only be done on data nodes"));
}
logger.info("--> allocating with primary flag set to true");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true)));
logger.info("--> allocating non-existing shard, should fail");
try {
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(new ShardId("test", 1), "node2")));
fail("expected ShardNotFoundException when allocating non-existing shard");
} catch (ShardNotFoundException e) {
assertThat(e.getMessage(), containsString("no such shard"));
}
logger.info("--> allocating non-existing index, should fail");
try {
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(new ShardId("test2", 0), "node2")));
fail("expected ShardNotFoundException when allocating non-existing index");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), containsString("no such index"));
}
logger.info("--> allocating empty primary with acceptDataLoss flag set to false");
try {
allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), "node1", false)));
fail("expected IllegalArgumentException when allocating empty primary with acceptDataLoss flag set to false");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("allocating an empty primary for " + shardId + " can result in data loss. Please confirm by setting the accept_data_loss parameter to true"));
}
logger.info("--> allocating stale primary with acceptDataLoss flag set to false");
try {
allocation.reroute(clusterState, new AllocationCommands(new AllocateStalePrimaryAllocationCommand(shardId, "node1", false)));
fail("expected IllegalArgumentException when allocating stale primary with acceptDataLoss flag set to false");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("allocating an empty primary for " + shardId + " can result in data loss. Please confirm by setting the accept_data_loss parameter to true"));
}
logger.info("--> allocating empty primary with acceptDataLoss flag set to true");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), "node1", true)));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@ -153,13 +204,13 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
logger.info("--> allocate the replica shard on the primary shard node, should fail");
try {
allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", false)));
fail();
allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand(new ShardId("test", 0), "node1")));
fail("expected IllegalArgumentException when allocating replica shard on the primary shard node");
} catch (IllegalArgumentException e) {
}
logger.info("--> allocate the replica shard on on the second node");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand(new ShardId("test", 0), "node2")));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@ -178,8 +229,8 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
logger.info("--> verify that we fail when there are no unassigned shards");
try {
allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node3", false)));
fail();
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(new ShardId("test", 0), "node3")));
fail("expected IllegalArgumentException when allocating shard while no unassigned shard available");
} catch (IllegalArgumentException e) {
}
}
@ -209,8 +260,8 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
logger.info("--> allocating with primary flag set to true");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true)));
logger.info("--> allocating empty primary shard with accept_data_loss flag set to true");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), "node1", true)));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@ -239,7 +290,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
}
logger.info("--> allocate the replica shard on on the second node");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand(new ShardId("test", 0), "node2")));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@ -257,7 +308,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0));
logger.info("--> allocate the replica shard on on the second node");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand(new ShardId("test", 0), "node2")));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@ -290,7 +341,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0));
logger.info("--> allocate the replica shard on on the second node");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand(new ShardId("test", 0), "node2")));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(rerouteResult.changed(), equalTo(true));
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@ -335,7 +386,9 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
public void testSerialization() throws Exception {
AllocationCommands commands = new AllocationCommands(
new AllocateAllocationCommand(new ShardId("test", 1), "node1", true),
new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 1), "node1", true),
new AllocateStalePrimaryAllocationCommand(new ShardId("test", 2), "node1", true),
new AllocateReplicaAllocationCommand(new ShardId("test", 2), "node1"),
new MoveAllocationCommand(new ShardId("test", 3), "node2", "node3"),
new CancelAllocationCommand(new ShardId("test", 4), "node5", true)
);
@ -343,24 +396,33 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
AllocationCommands.writeTo(commands, bytes);
AllocationCommands sCommands = AllocationCommands.readFrom(StreamInput.wrap(bytes.bytes()));
assertThat(sCommands.commands().size(), equalTo(3));
assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(new ShardId("test", 1)));
assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).node(), equalTo("node1"));
assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).allowPrimary(), equalTo(true));
assertThat(sCommands.commands().size(), equalTo(5));
assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(new ShardId("test", 1)));
assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).node(), equalTo("node1"));
assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).acceptDataLoss(), equalTo(true));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).shardId(), equalTo(new ShardId("test", 3)));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).fromNode(), equalTo("node2"));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).toNode(), equalTo("node3"));
assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).shardId(), equalTo(new ShardId("test", 2)));
assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).node(), equalTo("node1"));
assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).acceptDataLoss(), equalTo(true));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 4)));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node5"));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).allowPrimary(), equalTo(true));
assertThat(((AllocateReplicaAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 2)));
assertThat(((AllocateReplicaAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node1"));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).shardId(), equalTo(new ShardId("test", 3)));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).fromNode(), equalTo("node2"));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).toNode(), equalTo("node3"));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).shardId(), equalTo(new ShardId("test", 4)));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).node(), equalTo("node5"));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).allowPrimary(), equalTo(true));
}
public void testXContent() throws Exception {
String commands = "{\n" +
" \"commands\" : [\n" +
" {\"allocate\" : {\"index\" : \"test\", \"shard\" : 1, \"node\" : \"node1\", \"allow_primary\" : true}}\n" +
" {\"allocate_empty_primary\" : {\"index\" : \"test\", \"shard\" : 1, \"node\" : \"node1\", \"accept_data_loss\" : true}}\n" +
" ,{\"allocate_stale_primary\" : {\"index\" : \"test\", \"shard\" : 2, \"node\" : \"node1\", \"accept_data_loss\" : true}}\n" +
" ,{\"allocate_replica\" : {\"index\" : \"test\", \"shard\" : 2, \"node\" : \"node1\"}}\n" +
" ,{\"move\" : {\"index\" : \"test\", \"shard\" : 3, \"from_node\" : \"node2\", \"to_node\" : \"node3\"}} \n" +
" ,{\"cancel\" : {\"index\" : \"test\", \"shard\" : 4, \"node\" : \"node5\", \"allow_primary\" : true}} \n" +
" ]\n" +
@ -371,17 +433,24 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
parser.nextToken();
AllocationCommands sCommands = AllocationCommands.fromXContent(parser);
assertThat(sCommands.commands().size(), equalTo(3));
assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(new ShardId("test", 1)));
assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).node(), equalTo("node1"));
assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).allowPrimary(), equalTo(true));
assertThat(sCommands.commands().size(), equalTo(5));
assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(new ShardId("test", 1)));
assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).node(), equalTo("node1"));
assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).acceptDataLoss(), equalTo(true));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).shardId(), equalTo(new ShardId("test", 3)));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).fromNode(), equalTo("node2"));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).toNode(), equalTo("node3"));
assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).shardId(), equalTo(new ShardId("test", 2)));
assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).node(), equalTo("node1"));
assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).acceptDataLoss(), equalTo(true));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 4)));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node5"));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).allowPrimary(), equalTo(true));
assertThat(((AllocateReplicaAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 2)));
assertThat(((AllocateReplicaAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node1"));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).shardId(), equalTo(new ShardId("test", 3)));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).fromNode(), equalTo("node2"));
assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).toNode(), equalTo("node3"));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).shardId(), equalTo(new ShardId("test", 4)));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).node(), equalTo("node5"));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).allowPrimary(), equalTo(true));
}
}

View File

@ -20,7 +20,7 @@ curl -XPOST 'localhost:9200/_cluster/reroute' -d '{
}
},
{
"allocate" : {
"allocate_replica" : {
"index" : "test", "shard" : 1, "node" : "node3"
}
}
@ -64,14 +64,42 @@ The commands supported are:
from the primary shard by cancelling them and allowing them to be
reinitialized through the standard reallocation process.
`allocate`::
Allocate an unassigned shard to a node. Accepts the
`allocate_replica`::
Allocate an unassigned replica shard to a node. Accepts the
`index` and `shard` for index name and shard number, and `node` to
allocate the shard to. It also accepts `allow_primary` flag to
explicitly specify that it is allowed to explicitly allocate a primary
shard (might result in data loss).
allocate the shard to. Takes <<modules-cluster,allocation deciders>> into account.
WARNING: The `allow_primary` parameter will force a new _empty_ primary shard
to be allocated *without any data*. If a node which has a copy of the original
primary shard (including data) rejoins the cluster later on, that data will be
deleted: the old shard copy will be replaced by the new live shard copy.
Two more commands are available that allow the allocation of a primary shard
to a node. These commands should however be used with extreme care, as primary
shard allocation is usually fully automatically handled by Elasticsearch.
Reasons why a primary shard cannot be automatically allocated include the following:
- A new index was created but there is no node which satisfies the allocation deciders.
- An up-to-date shard copy of the data cannot be found on the current data nodes in
the cluster. To prevent data loss, the system does not automatically promote a stale
shard copy to primary.
As a manual override, two commands to forcefully allocate primary shards
are available:
`allocate_stale_primary`::
Allocate a primary shard to a node that holds a stale copy. Accepts the
`index` and `shard` for index name and shard number, and `node` to
allocate the shard to. Using this command may lead to data loss
for the provided shard id. If a node which has the good copy of the
data rejoins the cluster later on, that data will be overwritten with
the data of the stale copy that was forcefully allocated with this
command. To ensure that these implications are well-understood,
this command requires the special field `accept_data_loss` to be
explicitly set to `true` for it to work.
`allocate_empty_primary`::
Allocate an empty primary shard to a node. Accepts the
`index` and `shard` for index name and shard number, and `node` to
allocate the shard to. Using this command leads to a complete loss
of all data that was indexed into this shard, if it was previously
started. If a node which has a copy of the
data rejoins the cluster later on, that data will be deleted!
To ensure that these implications are well-understood,
this command requires the special field `accept_data_loss` to be
explicitly set to `true` for it to work.

View File

@ -603,6 +603,13 @@ Allocation IDs assign unique identifiers to shard copies. This allows the cluste
copies of the same data and track which shards have been active, so that after a cluster restart, shard copies
containing only the most recent data can become primaries.
=== Reroute commands
The reroute command `allocate` has been split into two distinct commands `allocate_replica` and `allocate_empty_primary`.
This was done as we introduced a new `allocate_stale_primary` command. The new `allocate_replica` command corresponds to the
old `allocate` command with `allow_primary` set to false. The new `allocate_empty_primary` command corresponds to the old
`allocate` command with `allow_primary` set to true.
==== `index.shared_filesystem.recover_on_any_node` changes
The behavior of `index.shared_filesystem.recover_on_any_node = true` has been changed. Previously, in the case where no