Limit retries of failed allocations per index (#18467)
Today if a shard fails during initialization phase due to misconfiguration, broken disks, missing analyzers, not installed plugins etc. elasticsaerch keeps on trying to initialize or rather allocate that shard. Yet, in the worst case scenario this ends in an endless allocation loop. To prevent this loop and all it's sideeffects like spamming log files over and over again this commit adds an allocation decider that stops allocating a shard that failed more than N times in a row to allocate. The number or retries can be configured via `index.allocation.max_retry` and it's default is set to `5`. Once the setting is updated shards with less failures than the number set per index will be allowed to allocate again. Internally we maintain a counter on the UnassignedInfo that is reset to `0` once the shards has been started. Relates to #18417
This commit is contained in:
parent
a2ff002e2b
commit
35e705877b
|
@ -250,7 +250,7 @@ public class TransportClusterAllocationExplainAction
|
|||
final ActionListener<ClusterAllocationExplainResponse> listener) {
|
||||
final RoutingNodes routingNodes = state.getRoutingNodes();
|
||||
final RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, state,
|
||||
clusterInfoService.getClusterInfo(), System.nanoTime());
|
||||
clusterInfoService.getClusterInfo(), System.nanoTime(), false);
|
||||
|
||||
ShardRouting foundShard = null;
|
||||
if (request.useAnyUnassignedShard()) {
|
||||
|
|
|
@ -38,9 +38,10 @@ import java.io.IOException;
|
|||
* Request to submit cluster reroute allocation commands
|
||||
*/
|
||||
public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteRequest> {
|
||||
AllocationCommands commands = new AllocationCommands();
|
||||
boolean dryRun;
|
||||
boolean explain;
|
||||
private AllocationCommands commands = new AllocationCommands();
|
||||
private boolean dryRun;
|
||||
private boolean explain;
|
||||
private boolean retryFailed;
|
||||
|
||||
public ClusterRerouteRequest() {
|
||||
}
|
||||
|
@ -81,6 +82,15 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the retry failed flag (defaults to <tt>false</tt>). If true, the
|
||||
* request will retry allocating shards that can't currently be allocated due to too many allocation failures.
|
||||
*/
|
||||
public ClusterRerouteRequest setRetryFailed(boolean retryFailed) {
|
||||
this.retryFailed = retryFailed;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current explain flag
|
||||
*/
|
||||
|
@ -88,6 +98,14 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
|
|||
return this.explain;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current retry failed flag
|
||||
*/
|
||||
public boolean isRetryFailed() {
|
||||
return this.retryFailed;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the allocation commands to execute.
|
||||
*/
|
||||
|
@ -96,6 +114,13 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the allocation commands to execute
|
||||
*/
|
||||
public AllocationCommands getCommands() {
|
||||
return commands;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the source for the request.
|
||||
*/
|
||||
|
@ -136,6 +161,7 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
|
|||
commands = AllocationCommands.readFrom(in);
|
||||
dryRun = in.readBoolean();
|
||||
explain = in.readBoolean();
|
||||
retryFailed = in.readBoolean();
|
||||
readTimeout(in);
|
||||
}
|
||||
|
||||
|
@ -145,6 +171,7 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
|
|||
AllocationCommands.writeTo(commands, out);
|
||||
out.writeBoolean(dryRun);
|
||||
out.writeBoolean(explain);
|
||||
out.writeBoolean(retryFailed);
|
||||
writeTimeout(out);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,6 +60,15 @@ public class ClusterRerouteRequestBuilder extends AcknowledgedRequestBuilder<Clu
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the retry failed flag (defaults to <tt>false</tt>). If true, the
|
||||
* request will retry allocating shards that can't currently be allocated due to too many allocation failures.
|
||||
*/
|
||||
public ClusterRerouteRequestBuilder setRetryFailed(boolean retryFailed) {
|
||||
request.setRetryFailed(retryFailed);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the commands for the request to execute.
|
||||
*/
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -68,11 +69,28 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction<Clu
|
|||
|
||||
@Override
|
||||
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) {
|
||||
clusterService.submitStateUpdateTask("cluster_reroute (api)", new AckedClusterStateUpdateTask<ClusterRerouteResponse>(Priority.IMMEDIATE, request, listener) {
|
||||
clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
|
||||
allocationService, request, listener));
|
||||
}
|
||||
|
||||
static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> {
|
||||
|
||||
private final ClusterRerouteRequest request;
|
||||
private final ActionListener<ClusterRerouteResponse> listener;
|
||||
private final ESLogger logger;
|
||||
private final AllocationService allocationService;
|
||||
private volatile ClusterState clusterStateToSend;
|
||||
private volatile RoutingExplanations explanations;
|
||||
|
||||
ClusterRerouteResponseAckedClusterStateUpdateTask(ESLogger logger, AllocationService allocationService, ClusterRerouteRequest request,
|
||||
ActionListener<ClusterRerouteResponse> listener) {
|
||||
super(Priority.IMMEDIATE, request, listener);
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
this.logger = logger;
|
||||
this.allocationService = allocationService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterRerouteResponse newResponse(boolean acknowledged) {
|
||||
return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations);
|
||||
|
@ -91,15 +109,15 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction<Clu
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands, request.explain());
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.getCommands(), request.explain(),
|
||||
request.isRetryFailed());
|
||||
ClusterState newState = ClusterState.builder(currentState).routingResult(routingResult).build();
|
||||
clusterStateToSend = newState;
|
||||
explanations = routingResult.explanations();
|
||||
if (request.dryRun) {
|
||||
if (request.dryRun()) {
|
||||
return currentState;
|
||||
}
|
||||
return newState;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -49,6 +49,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDeci
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
|
||||
|
@ -79,6 +80,7 @@ public class ClusterModule extends AbstractModule {
|
|||
new Setting<>("cluster.routing.allocation.type", BALANCED_ALLOCATOR, Function.identity(), Property.NodeScope);
|
||||
public static final List<Class<? extends AllocationDecider>> DEFAULT_ALLOCATION_DECIDERS =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
MaxRetryAllocationDecider.class,
|
||||
SameShardAllocationDecider.class,
|
||||
FilterAllocationDecider.class,
|
||||
ReplicaAfterPrimaryActiveAllocationDecider.class,
|
||||
|
|
|
@ -48,7 +48,6 @@ public final class UnassignedInfo implements ToXContent, Writeable {
|
|||
public static final Setting<TimeValue> INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING =
|
||||
Setting.timeSetting("index.unassigned.node_left.delayed_timeout", DEFAULT_DELAYED_NODE_LEFT_TIMEOUT, Property.Dynamic,
|
||||
Property.IndexScope);
|
||||
|
||||
/**
|
||||
* Reason why the shard is in unassigned state.
|
||||
* <p>
|
||||
|
@ -103,7 +102,11 @@ public final class UnassignedInfo implements ToXContent, Writeable {
|
|||
/**
|
||||
* A better replica location is identified and causes the existing replica allocation to be cancelled.
|
||||
*/
|
||||
REALLOCATED_REPLICA;
|
||||
REALLOCATED_REPLICA,
|
||||
/**
|
||||
* Unassigned as a result of a failed primary while the replica was initializing.
|
||||
*/
|
||||
PRIMARY_FAILED;
|
||||
}
|
||||
|
||||
private final Reason reason;
|
||||
|
@ -112,6 +115,7 @@ public final class UnassignedInfo implements ToXContent, Writeable {
|
|||
private final long lastComputedLeftDelayNanos; // how long to delay shard allocation, not serialized (always positive, 0 means no delay)
|
||||
private final String message;
|
||||
private final Throwable failure;
|
||||
private final int failedAllocations;
|
||||
|
||||
/**
|
||||
* creates an UnassingedInfo object based **current** time
|
||||
|
@ -120,7 +124,7 @@ public final class UnassignedInfo implements ToXContent, Writeable {
|
|||
* @param message more information about cause.
|
||||
**/
|
||||
public UnassignedInfo(Reason reason, String message) {
|
||||
this(reason, message, null, System.nanoTime(), System.currentTimeMillis());
|
||||
this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -130,13 +134,16 @@ public final class UnassignedInfo implements ToXContent, Writeable {
|
|||
* @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation
|
||||
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
|
||||
*/
|
||||
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, long unassignedTimeNanos, long unassignedTimeMillis) {
|
||||
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, int failedAllocations, long unassignedTimeNanos, long unassignedTimeMillis) {
|
||||
this.reason = reason;
|
||||
this.unassignedTimeMillis = unassignedTimeMillis;
|
||||
this.unassignedTimeNanos = unassignedTimeNanos;
|
||||
this.lastComputedLeftDelayNanos = 0L;
|
||||
this.message = message;
|
||||
this.failure = failure;
|
||||
this.failedAllocations = failedAllocations;
|
||||
assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED):
|
||||
"failedAllocations: " + failedAllocations + " for reason " + reason;
|
||||
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
|
||||
}
|
||||
|
||||
|
@ -147,17 +154,19 @@ public final class UnassignedInfo implements ToXContent, Writeable {
|
|||
this.lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
|
||||
this.message = unassignedInfo.message;
|
||||
this.failure = unassignedInfo.failure;
|
||||
this.failedAllocations = unassignedInfo.failedAllocations;
|
||||
}
|
||||
|
||||
public UnassignedInfo(StreamInput in) throws IOException {
|
||||
this.reason = Reason.values()[(int) in.readByte()];
|
||||
this.unassignedTimeMillis = in.readLong();
|
||||
// As System.nanoTime() cannot be compared across different JVMs, reset it to now.
|
||||
// This means that in master failover situations, elapsed delay time is forgotten.
|
||||
// This means that in master fail-over situations, elapsed delay time is forgotten.
|
||||
this.unassignedTimeNanos = System.nanoTime();
|
||||
this.lastComputedLeftDelayNanos = 0L;
|
||||
this.message = in.readOptionalString();
|
||||
this.failure = in.readThrowable();
|
||||
this.failedAllocations = in.readVInt();
|
||||
}
|
||||
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
|
@ -166,12 +175,18 @@ public final class UnassignedInfo implements ToXContent, Writeable {
|
|||
// Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs
|
||||
out.writeOptionalString(message);
|
||||
out.writeThrowable(failure);
|
||||
out.writeVInt(failedAllocations);
|
||||
}
|
||||
|
||||
public UnassignedInfo readFrom(StreamInput in) throws IOException {
|
||||
return new UnassignedInfo(in);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of previously failed allocations of this shard.
|
||||
*/
|
||||
public int getNumFailedAllocations() { return failedAllocations; }
|
||||
|
||||
/**
|
||||
* The reason why the shard is unassigned.
|
||||
*/
|
||||
|
@ -325,7 +340,11 @@ public final class UnassignedInfo implements ToXContent, Writeable {
|
|||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("[reason=").append(reason).append("]");
|
||||
sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis)).append("]");
|
||||
if (failedAllocations > 0) {
|
||||
sb.append(", failed_attempts[").append(failedAllocations).append("]");
|
||||
}
|
||||
String details = getDetails();
|
||||
|
||||
if (details != null) {
|
||||
sb.append(", details[").append(details).append("]");
|
||||
}
|
||||
|
@ -342,6 +361,9 @@ public final class UnassignedInfo implements ToXContent, Writeable {
|
|||
builder.startObject("unassigned_info");
|
||||
builder.field("reason", reason);
|
||||
builder.field("at", DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis));
|
||||
if (failedAllocations > 0) {
|
||||
builder.field("failed_attempts", failedAllocations);
|
||||
}
|
||||
String details = getDetails();
|
||||
if (details != null) {
|
||||
builder.field("details", details);
|
||||
|
|
|
@ -222,8 +222,10 @@ public class AllocationService extends AbstractComponent {
|
|||
List<FailedRerouteAllocation.FailedShard> orderedFailedShards = new ArrayList<>(failedShards);
|
||||
orderedFailedShards.sort(Comparator.comparing(failedShard -> failedShard.shard.primary()));
|
||||
for (FailedRerouteAllocation.FailedShard failedShard : orderedFailedShards) {
|
||||
UnassignedInfo unassignedInfo = failedShard.shard.unassignedInfo();
|
||||
final int failedAllocations = unassignedInfo != null ? unassignedInfo.getNumFailedAllocations() : 0;
|
||||
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure,
|
||||
System.nanoTime(), System.currentTimeMillis()));
|
||||
failedAllocations + 1, System.nanoTime(), System.currentTimeMillis()));
|
||||
}
|
||||
if (!changed) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
|
@ -257,16 +259,13 @@ public class AllocationService extends AbstractComponent {
|
|||
.collect(Collectors.joining(", "));
|
||||
}
|
||||
|
||||
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands) {
|
||||
return reroute(clusterState, commands, false);
|
||||
}
|
||||
|
||||
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean explain) {
|
||||
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean explain, boolean retryFailed) {
|
||||
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
|
||||
// we don't shuffle the unassigned shards here, to try and get as close as possible to
|
||||
// a consistent result of the effect the commands have on the routing
|
||||
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, clusterInfoService.getClusterInfo(), currentNanoTime());
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
|
||||
clusterInfoService.getClusterInfo(), currentNanoTime(), retryFailed);
|
||||
// don't short circuit deciders, we want a full explanation
|
||||
allocation.debugDecision(true);
|
||||
// we ignore disable allocation, because commands are explicit
|
||||
|
@ -305,7 +304,8 @@ public class AllocationService extends AbstractComponent {
|
|||
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
|
||||
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
|
||||
routingNodes.unassigned().shuffle();
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, clusterInfoService.getClusterInfo(), currentNanoTime());
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
|
||||
clusterInfoService.getClusterInfo(), currentNanoTime(), false);
|
||||
allocation.debugDecision(debug);
|
||||
if (!reroute(allocation)) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
|
@ -437,7 +437,7 @@ public class AllocationService extends AbstractComponent {
|
|||
// now, go over all the shards routing on the node, and fail them
|
||||
for (ShardRouting shardRouting : node.copyShards()) {
|
||||
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null,
|
||||
allocation.getCurrentNanoTime(), System.currentTimeMillis());
|
||||
0, allocation.getCurrentNanoTime(), System.currentTimeMillis());
|
||||
applyFailedShard(allocation, shardRouting, false, unassignedInfo);
|
||||
}
|
||||
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
|
||||
|
@ -457,8 +457,8 @@ public class AllocationService extends AbstractComponent {
|
|||
boolean changed = false;
|
||||
for (ShardRouting routing : replicas) {
|
||||
changed |= applyFailedShard(allocation, routing, false,
|
||||
new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
|
||||
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
|
||||
new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing",
|
||||
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ public class FailedRerouteAllocation extends RoutingAllocation {
|
|||
private final List<FailedShard> failedShards;
|
||||
|
||||
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List<FailedShard> failedShards, ClusterInfo clusterInfo) {
|
||||
super(deciders, routingNodes, clusterState, clusterInfo, System.nanoTime());
|
||||
super(deciders, routingNodes, clusterState, clusterInfo, System.nanoTime(), false);
|
||||
this.failedShards = failedShards;
|
||||
}
|
||||
|
||||
|
|
|
@ -134,6 +134,8 @@ public class RoutingAllocation {
|
|||
|
||||
private boolean ignoreDisable = false;
|
||||
|
||||
private final boolean retryFailed;
|
||||
|
||||
private boolean debugDecision = false;
|
||||
|
||||
private boolean hasPendingAsyncFetch = false;
|
||||
|
@ -148,7 +150,7 @@ public class RoutingAllocation {
|
|||
* @param clusterState cluster state before rerouting
|
||||
* @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()})
|
||||
*/
|
||||
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, ClusterInfo clusterInfo, long currentNanoTime) {
|
||||
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, ClusterInfo clusterInfo, long currentNanoTime, boolean retryFailed) {
|
||||
this.deciders = deciders;
|
||||
this.routingNodes = routingNodes;
|
||||
this.metaData = clusterState.metaData();
|
||||
|
@ -156,6 +158,7 @@ public class RoutingAllocation {
|
|||
this.customs = clusterState.customs();
|
||||
this.clusterInfo = clusterInfo;
|
||||
this.currentNanoTime = currentNanoTime;
|
||||
this.retryFailed = retryFailed;
|
||||
}
|
||||
|
||||
/** returns the nano time captured at the beginning of the allocation. used to make sure all time based decisions are aligned */
|
||||
|
@ -297,4 +300,8 @@ public class RoutingAllocation {
|
|||
public void setHasPendingAsyncFetch() {
|
||||
this.hasPendingAsyncFetch = true;
|
||||
}
|
||||
|
||||
public boolean isRetryFailed() {
|
||||
return retryFailed;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ public class StartedRerouteAllocation extends RoutingAllocation {
|
|||
private final List<? extends ShardRouting> startedShards;
|
||||
|
||||
public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List<? extends ShardRouting> startedShards, ClusterInfo clusterInfo) {
|
||||
super(deciders, routingNodes, clusterState, clusterInfo, System.nanoTime());
|
||||
super(deciders, routingNodes, clusterState, clusterInfo, System.nanoTime(), false);
|
||||
this.startedShards = startedShards;
|
||||
}
|
||||
|
||||
|
|
|
@ -125,7 +125,7 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
|
|||
// we need to move the unassigned info back to treat it as if it was index creation
|
||||
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
|
||||
"force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + ", " + shardRouting.unassignedInfo().getMessage(),
|
||||
shardRouting.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis());
|
||||
shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis());
|
||||
}
|
||||
|
||||
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate);
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
/**
|
||||
* An allocation decider that prevents shards from being allocated on any node if the shards allocation has been retried N times without
|
||||
* success. This means if a shard has been INITIALIZING N times in a row without being moved to STARTED the shard will be ignored until
|
||||
* the setting for <tt>index.allocation.max_retry</tt> is raised. The default value is <tt>5</tt>.
|
||||
* Note: This allocation decider also allows allocation of repeatedly failing shards when the <tt>/_cluster/reroute?retry_failed=true</tt>
|
||||
* API is manually invoked. This allows single retries without raising the limits.
|
||||
*
|
||||
* @see RoutingAllocation#isRetryFailed()
|
||||
*/
|
||||
public class MaxRetryAllocationDecider extends AllocationDecider {
|
||||
|
||||
public static final Setting<Integer> SETTING_ALLOCATION_MAX_RETRY = Setting.intSetting("index.allocation.max_retries", 5, 0,
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
|
||||
public static final String NAME = "max_retry";
|
||||
|
||||
/**
|
||||
* Initializes a new {@link MaxRetryAllocationDecider}
|
||||
*
|
||||
* @param settings {@link Settings} used by this {@link AllocationDecider}
|
||||
*/
|
||||
@Inject
|
||||
public MaxRetryAllocationDecider(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
|
||||
UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
|
||||
if (unassignedInfo != null && unassignedInfo.getNumFailedAllocations() > 0) {
|
||||
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
|
||||
final int maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(indexMetaData.getSettings());
|
||||
if (allocation.isRetryFailed()) { // manual allocation - retry
|
||||
// if we are called via the _reroute API we ignore the failure counter and try to allocate
|
||||
// this improves the usability since people don't need to raise the limits to issue retries since a simple _reroute call is
|
||||
// enough to manually retry.
|
||||
return allocation.decision(Decision.YES, NAME, "shard has already failed allocating ["
|
||||
+ unassignedInfo.getNumFailedAllocations() + "] times vs. [" + maxRetry + "] retries allowed "
|
||||
+ unassignedInfo.toString() + " - retrying once on manual allocation");
|
||||
} else if (unassignedInfo.getNumFailedAllocations() >= maxRetry) {
|
||||
return allocation.decision(Decision.NO, NAME, "shard has already failed allocating ["
|
||||
+ unassignedInfo.getNumFailedAllocations() + "] times vs. [" + maxRetry + "] retries allowed "
|
||||
+ unassignedInfo.toString() + " - manually call [/_cluster/reroute?retry_failed=true] to retry");
|
||||
}
|
||||
}
|
||||
return allocation.decision(Decision.YES, NAME, "shard has no previous failures");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
return canAllocate(shardRouting, allocation);
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.common.settings;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.gateway.PrimaryShardAllocator;
|
||||
|
@ -40,7 +41,6 @@ import org.elasticsearch.index.similarity.SimilarityService;
|
|||
import org.elasticsearch.index.store.FsDirectoryService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.IndexWarmer;
|
||||
import org.elasticsearch.indices.IndicesRequestCache;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -59,6 +59,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
|
|||
public static final Predicate<String> INDEX_SETTINGS_KEY_PREDICATE = (s) -> s.startsWith(IndexMetaData.INDEX_SETTING_PREFIX);
|
||||
|
||||
public static final Set<Setting<?>> BUILT_IN_INDEX_SETTINGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY,
|
||||
IndexSettings.INDEX_TTL_DISABLE_PURGE_SETTING,
|
||||
IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING,
|
||||
IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
|
||||
|
|
|
@ -108,7 +108,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
currentNode, nodeWithHighestMatch);
|
||||
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
|
||||
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]",
|
||||
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
|
||||
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,6 +64,7 @@ public class RestClusterRerouteAction extends BaseRestHandler {
|
|||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
|
||||
final ClusterRerouteRequest clusterRerouteRequest = Requests.clusterRerouteRequest();
|
||||
clusterRerouteRequest.dryRun(request.paramAsBoolean("dry_run", clusterRerouteRequest.dryRun()));
|
||||
clusterRerouteRequest.setRetryFailed(request.paramAsBoolean("retry_failed", clusterRerouteRequest.isRetryFailed()));
|
||||
clusterRerouteRequest.explain(request.paramAsBoolean("explain", clusterRerouteRequest.explain()));
|
||||
clusterRerouteRequest.timeout(request.paramAsTime("timeout", clusterRerouteRequest.timeout()));
|
||||
clusterRerouteRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterRerouteRequest.masterNodeTimeout()));
|
||||
|
|
|
@ -0,0 +1,181 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.reroute;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.EmptyClusterInfoService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESAllocationTestCase;
|
||||
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
|
||||
|
||||
public class ClusterRerouteTests extends ESAllocationTestCase {
|
||||
|
||||
public void testSerializeRequest() throws IOException {
|
||||
ClusterRerouteRequest req = new ClusterRerouteRequest();
|
||||
req.setRetryFailed(randomBoolean());
|
||||
req.dryRun(randomBoolean());
|
||||
req.explain(randomBoolean());
|
||||
req.commands(new AllocateEmptyPrimaryAllocationCommand("foo", 1, "bar", randomBoolean()));
|
||||
req.timeout(TimeValue.timeValueMillis(randomIntBetween(0, 100)));
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
req.writeTo(out);
|
||||
BytesReference bytes = out.bytes();
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
new NetworkModule(null, Settings.EMPTY, true, namedWriteableRegistry);
|
||||
StreamInput wrap = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes.toBytes()),
|
||||
namedWriteableRegistry);
|
||||
ClusterRerouteRequest deserializedReq = new ClusterRerouteRequest();
|
||||
deserializedReq.readFrom(wrap);
|
||||
|
||||
assertEquals(req.isRetryFailed(), deserializedReq.isRetryFailed());
|
||||
assertEquals(req.dryRun(), deserializedReq.dryRun());
|
||||
assertEquals(req.explain(), deserializedReq.explain());
|
||||
assertEquals(req.timeout(), deserializedReq.timeout());
|
||||
assertEquals(1, deserializedReq.getCommands().commands().size()); // allocation commands have their own tests
|
||||
assertEquals(req.getCommands().commands().size(), deserializedReq.getCommands().commands().size());
|
||||
}
|
||||
|
||||
public void testClusterStateUpdateTask() {
|
||||
AllocationService allocationService = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
|
||||
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
|
||||
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
|
||||
ClusterState clusterState = createInitialClusterState(allocationService);
|
||||
ClusterRerouteRequest req = new ClusterRerouteRequest();
|
||||
req.dryRun(true);
|
||||
AtomicReference<ClusterRerouteResponse> responseRef = new AtomicReference<>();
|
||||
ActionListener<ClusterRerouteResponse> responseActionListener = new ActionListener<ClusterRerouteResponse>() {
|
||||
@Override
|
||||
public void onResponse(ClusterRerouteResponse clusterRerouteResponse) {
|
||||
responseRef.set(clusterRerouteResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
|
||||
}
|
||||
};
|
||||
TransportClusterRerouteAction.ClusterRerouteResponseAckedClusterStateUpdateTask task =
|
||||
new TransportClusterRerouteAction.ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, req,
|
||||
responseActionListener );
|
||||
ClusterState execute = task.execute(clusterState);
|
||||
assertSame(execute, clusterState); // dry-run
|
||||
task.onAllNodesAcked(null);
|
||||
assertNotSame(responseRef.get().getState(), execute);
|
||||
|
||||
req.dryRun(false);// now we allocate
|
||||
|
||||
final int retries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY);
|
||||
// now fail it N-1 times
|
||||
for (int i = 0; i < retries; i++) {
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertNotSame(newState, clusterState); // dry-run=false
|
||||
clusterState = newState;
|
||||
RoutingTable routingTable = clusterState.routingTable();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i);
|
||||
List<FailedRerouteAllocation.FailedShard> failedShards = Collections.singletonList(
|
||||
new FailedRerouteAllocation.FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
|
||||
new UnsupportedOperationException()));
|
||||
RoutingAllocation.Result result = allocationService.applyFailedShards(clusterState, failedShards);
|
||||
assertTrue(result.changed());
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(result.routingTable()).build();
|
||||
routingTable = clusterState.routingTable();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
if (i == retries-1) {
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
|
||||
} else {
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
}
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i+1);
|
||||
}
|
||||
|
||||
|
||||
// without retry_failed we won't allocate that shard
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertNotSame(newState, clusterState); // dry-run=false
|
||||
task.onAllNodesAcked(null);
|
||||
assertSame(responseRef.get().getState(), newState);
|
||||
RoutingTable routingTable = clusterState.routingTable();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries);
|
||||
|
||||
req.setRetryFailed(true); // now we manually retry and get the shard back into initializing
|
||||
newState = task.execute(clusterState);
|
||||
assertNotSame(newState, clusterState); // dry-run=false
|
||||
clusterState = newState;
|
||||
routingTable = clusterState.routingTable();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries);
|
||||
}
|
||||
|
||||
private ClusterState createInitialClusterState(AllocationService service) {
|
||||
MetaData.Builder metaBuilder = MetaData.builder();
|
||||
metaBuilder.put(IndexMetaData.builder("idx").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0));
|
||||
MetaData metaData = metaBuilder.build();
|
||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||
routingTableBuilder.addAsNew(metaData.index("idx"));
|
||||
|
||||
RoutingTable routingTable = routingTableBuilder.build();
|
||||
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
|
||||
.metaData(metaData).routingTable(routingTable).build();
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")))
|
||||
.build();
|
||||
RoutingTable prevRoutingTable = routingTable;
|
||||
routingTable = service.reroute(clusterState, "reroute").routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
|
||||
assertEquals(prevRoutingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(prevRoutingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
|
||||
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
return clusterState;
|
||||
}
|
||||
}
|
|
@ -64,7 +64,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
|
|||
UnassignedInfo.Reason.NODE_LEFT,
|
||||
UnassignedInfo.Reason.REROUTE_CANCELLED,
|
||||
UnassignedInfo.Reason.REINITIALIZED,
|
||||
UnassignedInfo.Reason.REALLOCATED_REPLICA};
|
||||
UnassignedInfo.Reason.REALLOCATED_REPLICA,
|
||||
UnassignedInfo.Reason.PRIMARY_FAILED};
|
||||
for (int i = 0; i < order.length; i++) {
|
||||
assertThat(order[i].ordinal(), equalTo(i));
|
||||
}
|
||||
|
@ -72,7 +73,10 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
public void testSerialization() throws Exception {
|
||||
UnassignedInfo meta = new UnassignedInfo(RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()), randomBoolean() ? randomAsciiOfLength(4) : null);
|
||||
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values());
|
||||
UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
|
||||
new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis()):
|
||||
new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null);
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
meta.writeTo(out);
|
||||
out.close();
|
||||
|
@ -82,6 +86,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
|
|||
assertThat(read.getUnassignedTimeInMillis(), equalTo(meta.getUnassignedTimeInMillis()));
|
||||
assertThat(read.getMessage(), equalTo(meta.getMessage()));
|
||||
assertThat(read.getDetails(), equalTo(meta.getDetails()));
|
||||
assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations()));
|
||||
}
|
||||
|
||||
public void testIndexCreated() {
|
||||
|
@ -273,7 +278,10 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
|
|||
public void testUnassignedDelayOnlyNodeLeftNonNodeLeftReason() throws Exception {
|
||||
EnumSet<UnassignedInfo.Reason> reasons = EnumSet.allOf(UnassignedInfo.Reason.class);
|
||||
reasons.remove(UnassignedInfo.Reason.NODE_LEFT);
|
||||
UnassignedInfo unassignedInfo = new UnassignedInfo(RandomPicks.randomFrom(random(), reasons), null);
|
||||
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), reasons);
|
||||
UnassignedInfo unassignedInfo = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
|
||||
new UnassignedInfo(reason, null, null, 1, System.nanoTime(), System.currentTimeMillis()):
|
||||
new UnassignedInfo(reason, null);
|
||||
unassignedInfo = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
|
||||
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "10h").build(), Settings.EMPTY);
|
||||
long delay = unassignedInfo.getLastComputedLeftDelayNanos();
|
||||
|
@ -287,7 +295,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
|
|||
*/
|
||||
public void testLeftDelayCalculation() throws Exception {
|
||||
final long baseTime = System.nanoTime();
|
||||
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, baseTime, System.currentTimeMillis());
|
||||
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, System.currentTimeMillis());
|
||||
final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos();
|
||||
final Settings settings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build();
|
||||
unassignedInfo = unassignedInfo.updateDelay(baseTime, settings, Settings.EMPTY);
|
||||
|
|
|
@ -94,7 +94,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
} else {
|
||||
toNodeId = "node1";
|
||||
}
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", 0, existingNodeId, toNodeId)));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", 0, existingNodeId, toNodeId)), false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node(existingNodeId).iterator().next().state(), equalTo(ShardRoutingState.RELOCATING));
|
||||
|
@ -148,7 +148,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> allocating to non-existent node, should fail");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(index, shardId.id(), "node42")));
|
||||
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(index, shardId.id(), "node42")), false, false);
|
||||
fail("expected IllegalArgumentException when allocating to non-existing node");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("failed to resolve [node42], no matching nodes"));
|
||||
|
@ -156,7 +156,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> allocating to non-data node, should fail");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(index, shardId.id(), "node4")));
|
||||
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(index, shardId.id(), "node4")), false, false);
|
||||
fail("expected IllegalArgumentException when allocating to non-data node");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("allocation can only be done on data nodes"));
|
||||
|
@ -164,7 +164,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> allocating non-existing shard, should fail");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand("test", 1, "node2")));
|
||||
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand("test", 1, "node2")), false, false);
|
||||
fail("expected ShardNotFoundException when allocating non-existing shard");
|
||||
} catch (ShardNotFoundException e) {
|
||||
assertThat(e.getMessage(), containsString("no such shard"));
|
||||
|
@ -172,7 +172,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> allocating non-existing index, should fail");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand("test2", 0, "node2")));
|
||||
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand("test2", 0, "node2")), false, false);
|
||||
fail("expected ShardNotFoundException when allocating non-existing index");
|
||||
} catch (IndexNotFoundException e) {
|
||||
assertThat(e.getMessage(), containsString("no such index"));
|
||||
|
@ -180,7 +180,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> allocating empty primary with acceptDataLoss flag set to false");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand("test", 0, "node1", false)));
|
||||
allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand("test", 0, "node1", false)), false, false);
|
||||
fail("expected IllegalArgumentException when allocating empty primary with acceptDataLoss flag set to false");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("allocating an empty primary for " + shardId + " can result in data loss. Please confirm by setting the accept_data_loss parameter to true"));
|
||||
|
@ -188,14 +188,14 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> allocating stale primary with acceptDataLoss flag set to false");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(new AllocateStalePrimaryAllocationCommand(index, shardId.id(), "node1", false)));
|
||||
allocation.reroute(clusterState, new AllocationCommands(new AllocateStalePrimaryAllocationCommand(index, shardId.id(), "node1", false)), false, false);
|
||||
fail("expected IllegalArgumentException when allocating stale primary with acceptDataLoss flag set to false");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("allocating an empty primary for " + shardId + " can result in data loss. Please confirm by setting the accept_data_loss parameter to true"));
|
||||
}
|
||||
|
||||
logger.info("--> allocating empty primary with acceptDataLoss flag set to true");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand("test", 0, "node1", true)));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand("test", 0, "node1", true)), false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
|
||||
|
@ -211,13 +211,13 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> allocate the replica shard on the primary shard node, should fail");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand("test", 0, "node1")));
|
||||
allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand("test", 0, "node1")), false, false);
|
||||
fail("expected IllegalArgumentException when allocating replica shard on the primary shard node");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
||||
logger.info("--> allocate the replica shard on on the second node");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand("test", 0, "node2")));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand("test", 0, "node2")), false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
|
||||
|
@ -236,7 +236,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> verify that we fail when there are no unassigned shards");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand("test", 0, "node3")));
|
||||
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand("test", 0, "node3")), false, false);
|
||||
fail("expected IllegalArgumentException when allocating shard while no unassigned shard available");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
@ -268,7 +268,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
|
||||
|
||||
logger.info("--> allocating empty primary shard with accept_data_loss flag set to true");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand("test", 0, "node1", true)));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand("test", 0, "node1", true)), false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
|
||||
|
@ -277,7 +277,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> cancel primary allocation, make sure it fails...");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node1", false)));
|
||||
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node1", false)), false, false);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
@ -291,13 +291,13 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> cancel primary allocation, make sure it fails...");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node1", false)));
|
||||
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node1", false)), false, false);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
||||
logger.info("--> allocate the replica shard on on the second node");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand("test", 0, "node2")));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand("test", 0, "node2")), false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
|
||||
|
@ -306,7 +306,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1));
|
||||
|
||||
logger.info("--> cancel the relocation allocation");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node2", false)));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node2", false)), false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
|
||||
|
@ -315,7 +315,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0));
|
||||
|
||||
logger.info("--> allocate the replica shard on on the second node");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand("test", 0, "node2")));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand("test", 0, "node2")), false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
|
||||
|
@ -325,7 +325,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> cancel the primary being replicated, make sure it fails");
|
||||
try {
|
||||
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node1", false)));
|
||||
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node1", false)), false, false);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
@ -339,7 +339,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1));
|
||||
|
||||
logger.info("--> cancel allocation of the replica shard");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node2", false)));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node2", false)), false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
|
||||
|
@ -348,7 +348,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0));
|
||||
|
||||
logger.info("--> allocate the replica shard on on the second node");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand("test", 0, "node2")));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand("test", 0, "node2")), false, false);
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
|
||||
|
@ -364,7 +364,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1));
|
||||
|
||||
logger.info("--> move the replica shard");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", 0, "node2", "node3")));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", 0, "node2", "node3")), false, false);
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
|
||||
|
@ -374,7 +374,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(INITIALIZING).size(), equalTo(1));
|
||||
|
||||
logger.info("--> cancel the move of the replica shard");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node3", false)));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node3", false)), false, false);
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
|
||||
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
|
||||
|
@ -383,7 +383,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
|
|||
|
||||
|
||||
logger.info("--> cancel the primary allocation (with allow_primary set to true)");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node1", true)));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node1", true)), false, false);
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
logger.error(clusterState.prettyPrint());
|
||||
|
|
|
@ -868,7 +868,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase {
|
|||
}
|
||||
commands.add(new MoveAllocationCommand("test", 0, primaryNode, "A-4"));
|
||||
|
||||
routingTable = strategy.reroute(clusterState, commands).routingTable();
|
||||
routingTable = strategy.reroute(clusterState, commands, false, false).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
|
||||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0));
|
||||
|
|
|
@ -149,8 +149,8 @@ public class DeadNodesAllocationTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> moving primary shard to node3");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
|
||||
new MoveAllocationCommand("test", 0, clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
|
||||
);
|
||||
new MoveAllocationCommand("test", 0, clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3")),
|
||||
false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next().state(), equalTo(RELOCATING));
|
||||
|
@ -223,8 +223,8 @@ public class DeadNodesAllocationTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> moving primary shard to node3");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
|
||||
new MoveAllocationCommand("test",0 , clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
|
||||
);
|
||||
new MoveAllocationCommand("test",0 , clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3")),
|
||||
false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next().state(), equalTo(RELOCATING));
|
||||
|
|
|
@ -149,7 +149,7 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {
|
|||
} else {
|
||||
toNodeId = "node1";
|
||||
}
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", 0, existingNodeId, toNodeId)));
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", 0, existingNodeId, toNodeId)), false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertEquals(clusterState.getRoutingNodes().node(existingNodeId).iterator().next().state(), ShardRoutingState.RELOCATING);
|
||||
|
|
|
@ -109,8 +109,8 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> moving primary shard to node3");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
|
||||
new MoveAllocationCommand("test", 0, clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
|
||||
);
|
||||
new MoveAllocationCommand("test", 0, clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3")),
|
||||
false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next().state(), equalTo(RELOCATING));
|
||||
|
@ -125,8 +125,8 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
|
|||
|
||||
logger.info("--> moving primary shard to node3");
|
||||
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
|
||||
new MoveAllocationCommand("test", 0, clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
|
||||
);
|
||||
new MoveAllocationCommand("test", 0, clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3")),
|
||||
false, false);
|
||||
assertThat(rerouteResult.changed(), equalTo(true));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.getRoutingNodes().node(origPrimaryNodeId).iterator().next().state(), equalTo(RELOCATING));
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.EmptyClusterInfoService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESAllocationTestCase;
|
||||
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
|
||||
|
||||
public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
|
||||
|
||||
private AllocationService strategy;
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
strategy = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
|
||||
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
|
||||
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
|
||||
}
|
||||
|
||||
private ClusterState createInitialClusterState() {
|
||||
MetaData.Builder metaBuilder = MetaData.builder();
|
||||
metaBuilder.put(IndexMetaData.builder("idx").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0));
|
||||
MetaData metaData = metaBuilder.build();
|
||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||
routingTableBuilder.addAsNew(metaData.index("idx"));
|
||||
|
||||
RoutingTable routingTable = routingTableBuilder.build();
|
||||
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
|
||||
.metaData(metaData).routingTable(routingTable).build();
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")))
|
||||
.build();
|
||||
RoutingTable prevRoutingTable = routingTable;
|
||||
routingTable = strategy.reroute(clusterState, "reroute", false).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
|
||||
assertEquals(prevRoutingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(prevRoutingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
|
||||
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
return clusterState;
|
||||
}
|
||||
|
||||
public void testSingleRetryOnIgnore() {
|
||||
ClusterState clusterState = createInitialClusterState();
|
||||
RoutingTable routingTable = clusterState.routingTable();
|
||||
final int retries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY);
|
||||
// now fail it N-1 times
|
||||
for (int i = 0; i < retries-1; i++) {
|
||||
List<FailedRerouteAllocation.FailedShard> failedShards = Collections.singletonList(
|
||||
new FailedRerouteAllocation.FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
|
||||
new UnsupportedOperationException()));
|
||||
RoutingAllocation.Result result = strategy.applyFailedShards(clusterState, failedShards);
|
||||
assertTrue(result.changed());
|
||||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i+1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "boom" + i);
|
||||
}
|
||||
// now we go and check that we are actually stick to unassigned on the next failure
|
||||
List<FailedRerouteAllocation.FailedShard> failedShards = Collections.singletonList(
|
||||
new FailedRerouteAllocation.FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
|
||||
new UnsupportedOperationException()));
|
||||
RoutingAllocation.Result result = strategy.applyFailedShards(clusterState, failedShards);
|
||||
assertTrue(result.changed());
|
||||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "boom");
|
||||
|
||||
result = strategy.reroute(clusterState, new AllocationCommands(), false, true); // manual reroute should retry once
|
||||
assertTrue(result.changed());
|
||||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "boom");
|
||||
|
||||
// now we go and check that we are actually stick to unassigned on the next failure ie. no retry
|
||||
failedShards = Collections.singletonList(
|
||||
new FailedRerouteAllocation.FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
|
||||
new UnsupportedOperationException()));
|
||||
result = strategy.applyFailedShards(clusterState, failedShards);
|
||||
assertTrue(result.changed());
|
||||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries+1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "boom");
|
||||
|
||||
}
|
||||
|
||||
public void testFailedAllocation() {
|
||||
ClusterState clusterState = createInitialClusterState();
|
||||
RoutingTable routingTable = clusterState.routingTable();
|
||||
final int retries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY);
|
||||
// now fail it N-1 times
|
||||
for (int i = 0; i < retries-1; i++) {
|
||||
List<FailedRerouteAllocation.FailedShard> failedShards = Collections.singletonList(
|
||||
new FailedRerouteAllocation.FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom" + i,
|
||||
new UnsupportedOperationException()));
|
||||
RoutingAllocation.Result result = strategy.applyFailedShards(clusterState, failedShards);
|
||||
assertTrue(result.changed());
|
||||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i+1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "boom" + i);
|
||||
}
|
||||
// now we go and check that we are actually stick to unassigned on the next failure
|
||||
{
|
||||
List<FailedRerouteAllocation.FailedShard> failedShards = Collections.singletonList(
|
||||
new FailedRerouteAllocation.FailedShard(routingTable.index("idx").shard(0).shards().get(0), "boom",
|
||||
new UnsupportedOperationException()));
|
||||
RoutingAllocation.Result result = strategy.applyFailedShards(clusterState, failedShards);
|
||||
assertTrue(result.changed());
|
||||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "boom");
|
||||
}
|
||||
|
||||
// change the settings and ensure we can do another round of allocation for that index.
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable)
|
||||
.metaData(MetaData.builder(clusterState.metaData())
|
||||
.put(IndexMetaData.builder(clusterState.metaData().index("idx")).settings(
|
||||
Settings.builder().put(clusterState.metaData().index("idx").getSettings()).put("index.allocation.max_retries",
|
||||
retries+1).build()
|
||||
).build(), true).build()).build();
|
||||
RoutingAllocation.Result result = strategy.reroute(clusterState, "settings changed", false);
|
||||
assertTrue(result.changed());
|
||||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
// good we are initializing and we are maintaining failure information
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "boom");
|
||||
|
||||
// now we start the shard
|
||||
routingTable = strategy.applyStartedShards(clusterState, Collections.singletonList(routingTable.index("idx")
|
||||
.shard(0).shards().get(0))).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
|
||||
// all counters have been reset to 0 ie. no unassigned info
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertNull(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo());
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), STARTED);
|
||||
|
||||
// now fail again and see if it has a new counter
|
||||
List<FailedRerouteAllocation.FailedShard> failedShards = Collections.singletonList(
|
||||
new FailedRerouteAllocation.FailedShard(routingTable.index("idx").shard(0).shards().get(0), "ZOOOMG",
|
||||
new UnsupportedOperationException()));
|
||||
result = strategy.applyFailedShards(clusterState, failedShards);
|
||||
assertTrue(result.changed());
|
||||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "ZOOOMG");
|
||||
}
|
||||
}
|
|
@ -337,7 +337,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
|
|||
AllocationService strategy = new MockAllocationService(Settings.EMPTY,
|
||||
allocationDeciders,
|
||||
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
|
||||
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
|
||||
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true, false);
|
||||
// the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match
|
||||
state = ClusterState.builder(state).routingResult(result).build();
|
||||
assertThat(result.routingTable().index(shard2.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));
|
||||
|
@ -369,7 +369,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
|
|||
AllocationService strategy = new MockAllocationService(Settings.EMPTY,
|
||||
allocationDeciders,
|
||||
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
|
||||
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
|
||||
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true, false);
|
||||
|
||||
// Make sure that primary shards are only allocated on the new node
|
||||
for (int i = 0; i < numberOfShards; i++) {
|
||||
|
|
|
@ -283,7 +283,7 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
|
|||
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
|
||||
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
|
||||
|
||||
RoutingAllocation.Result reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", clusterState.getRoutingNodes().node("node1").iterator().next().shardId().id(), "node1", "node2")));
|
||||
RoutingAllocation.Result reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", clusterState.getRoutingNodes().node("node1").iterator().next().shardId().id(), "node1", "node2")), false, false);
|
||||
assertEquals(reroute.explanations().explanations().size(), 1);
|
||||
assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.YES);
|
||||
routingTable = reroute.routingTable();
|
||||
|
@ -296,7 +296,7 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
|
|||
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
|
||||
|
||||
// outgoing throttles
|
||||
reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", clusterState.getRoutingNodes().node("node3").iterator().next().shardId().id(), "node3", "node1")), true);
|
||||
reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", clusterState.getRoutingNodes().node("node3").iterator().next().shardId().id(), "node3", "node1")), true, false);
|
||||
assertEquals(reroute.explanations().explanations().size(), 1);
|
||||
assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.THROTTLE);
|
||||
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
|
||||
|
@ -311,7 +311,7 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
|
|||
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
|
||||
|
||||
// incoming throttles
|
||||
reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", clusterState.getRoutingNodes().node("node3").iterator().next().shardId().id(), "node3", "node2")), true);
|
||||
reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", clusterState.getRoutingNodes().node("node3").iterator().next().shardId().id(), "node3", "node2")), true, false);
|
||||
assertEquals(reroute.explanations().explanations().size(), 1);
|
||||
assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.THROTTLE);
|
||||
|
||||
|
|
|
@ -796,7 +796,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
AllocationCommand relocate1 = new MoveAllocationCommand("test", 0, "node2", "node3");
|
||||
AllocationCommands cmds = new AllocationCommands(relocate1);
|
||||
|
||||
routingTable = strategy.reroute(clusterState, cmds).routingTable();
|
||||
routingTable = strategy.reroute(clusterState, cmds, false, false).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
logShardStates(clusterState);
|
||||
|
||||
|
@ -808,7 +808,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
// node3, which will put it over the low watermark when it
|
||||
// completes, with shard relocations taken into account this should
|
||||
// throw an exception about not being able to complete
|
||||
strategy.reroute(clusterState, cmds).routingTable();
|
||||
strategy.reroute(clusterState, cmds, false, false).routingTable();
|
||||
fail("should not have been able to reroute the shard");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat("can't allocated because there isn't enough room: " + e.getMessage(),
|
||||
|
@ -876,7 +876,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
);
|
||||
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
||||
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo,
|
||||
System.nanoTime());
|
||||
System.nanoTime(), false);
|
||||
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||
assertThat(decision.type(), equalTo(Decision.Type.NO));
|
||||
|
||||
|
@ -896,7 +896,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
)
|
||||
);
|
||||
clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
||||
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo, System.nanoTime());
|
||||
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo, System.nanoTime(),
|
||||
false);
|
||||
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
||||
|
||||
|
@ -992,7 +993,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
);
|
||||
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
||||
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo,
|
||||
System.nanoTime());
|
||||
System.nanoTime(), false);
|
||||
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||
|
||||
// Two shards should start happily
|
||||
|
@ -1051,7 +1052,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
);
|
||||
|
||||
clusterState = ClusterState.builder(updateClusterState).routingTable(builder.build()).build();
|
||||
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo, System.nanoTime());
|
||||
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), clusterState, clusterInfo, System.nanoTime(),
|
||||
false);
|
||||
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
||||
|
||||
|
|
|
@ -136,7 +136,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
|
|||
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
|
||||
shardSizes.put("[test][0][p]", 10L); // 10 bytes
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of());
|
||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime());
|
||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime(), false);
|
||||
assertEquals(mostAvailableUsage.toString(), Decision.YES, decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation));
|
||||
assertEquals(mostAvailableUsage.toString(), Decision.NO, decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation));
|
||||
}
|
||||
|
@ -204,7 +204,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
|
|||
shardSizes.put("[test][2][p]", 10L);
|
||||
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), shardRoutingMap.build());
|
||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime());
|
||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime(), false);
|
||||
assertEquals(Decision.YES, decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation));
|
||||
assertEquals(Decision.NO, decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation));
|
||||
try {
|
||||
|
|
|
@ -346,7 +346,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state, null, System.nanoTime());
|
||||
return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state, null, System.nanoTime(), false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -425,7 +425,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state, null, System.nanoTime());
|
||||
return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state, null, System.nanoTime(), false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -444,7 +444,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime());
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -452,7 +452,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node1, 1, null, randomBoolean());
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime());
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -460,7 +460,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node2, 1, null, randomBoolean());
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime());
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
|
||||
|
@ -485,7 +485,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime());
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -493,7 +493,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node1, 1, null, randomBoolean());
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime());
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -501,7 +501,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node2, 2, null, randomBoolean());
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime());
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
|
||||
|
@ -525,7 +525,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
.metaData(metaData)
|
||||
.routingTable(routingTableBuilder.build())
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, System.nanoTime());
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, System.nanoTime(), false);
|
||||
}
|
||||
|
||||
class TestAllocator extends PrimaryShardAllocator {
|
||||
|
|
|
@ -302,7 +302,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime());
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime(), false);
|
||||
}
|
||||
|
||||
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
|
||||
|
@ -324,7 +324,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime());
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime(), false);
|
||||
}
|
||||
|
||||
class TestAllocator extends ReplicaShardAllocator {
|
||||
|
|
|
@ -103,7 +103,7 @@ public class RareClusterStateIT extends ESIntegTestCase {
|
|||
.nodes(DiscoveryNodes.EMPTY_NODES)
|
||||
.build(), false
|
||||
);
|
||||
RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current, ClusterInfo.EMPTY, System.nanoTime());
|
||||
RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current, ClusterInfo.EMPTY, System.nanoTime(), false);
|
||||
allocator.allocateUnassigned(routingAllocation);
|
||||
}
|
||||
|
||||
|
|
|
@ -640,7 +640,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs").setSettings(Settings.builder().put("location", repositoryLocation)));
|
||||
|
||||
createIndex("test-idx");
|
||||
prepareCreate("test-idx").setSettings(Settings.builder().put("index.allocation.max_retries", Integer.MAX_VALUE)).get();
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> indexing some data");
|
||||
|
|
|
@ -103,3 +103,16 @@ are available:
|
|||
To ensure that these implications are well-understood,
|
||||
this command requires the special field `accept_data_loss` to be
|
||||
explicitly set to `true` for it to work.
|
||||
|
||||
[float]
|
||||
=== Retry failed shards
|
||||
|
||||
The cluster will attempt to allocate a shard a maximum of
|
||||
`index.allocation.max_retries` times in a row (defaults to `5`), before giving
|
||||
up and leaving the shard unallocated. This scenario can be caused by
|
||||
structural problems such as having an analyzer which refers to a stopwords
|
||||
file which doesn't exist on all nodes.
|
||||
|
||||
Once the problem has been corrected, allocation can be manually retried by
|
||||
calling the <<cluster-reroute,`_reroute`>> API with `?retry_failed`, which
|
||||
will attempt a single retry round for these shards.
|
|
@ -16,6 +16,10 @@
|
|||
"type" : "boolean",
|
||||
"description" : "Return an explanation of why the commands can or cannot be executed"
|
||||
},
|
||||
"retry_failed": {
|
||||
"type" : "boolean",
|
||||
"description" : "Retries allocation of shards that are blocked due to too many subsequent allocation failures"
|
||||
},
|
||||
"metric": {
|
||||
"type": "list",
|
||||
"options": ["_all", "blocks", "metadata", "nodes", "routing_table", "master_node", "version"],
|
||||
|
|
Loading…
Reference in New Issue