merge master
This commit is contained in:
commit
e2e38b3d71
|
@ -55,7 +55,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
|
|||
private final AllocationService allocationService;
|
||||
|
||||
private AtomicBoolean rerouting = new AtomicBoolean();
|
||||
private volatile long minDelaySettingAtLastScheduling = Long.MAX_VALUE;
|
||||
private volatile long minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
|
||||
private volatile ScheduledFuture registeredNextDelayFuture;
|
||||
|
||||
@Inject
|
||||
|
@ -100,14 +100,14 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
|
|||
// Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule.
|
||||
// If the minimum of the currently relevant delay settings is larger than something we scheduled in the past,
|
||||
// we are guaranteed that the planned schedule will happen before any of the current shard delays are expired.
|
||||
long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state());
|
||||
long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(settings, event.state());
|
||||
if (minDelaySetting <= 0) {
|
||||
logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling);
|
||||
minDelaySettingAtLastScheduling = Long.MAX_VALUE;
|
||||
logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos);
|
||||
minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
|
||||
FutureUtils.cancel(registeredNextDelayFuture);
|
||||
} else if (minDelaySetting < minDelaySettingAtLastScheduling) {
|
||||
} else if (minDelaySetting < minDelaySettingAtLastSchedulingNanos) {
|
||||
FutureUtils.cancel(registeredNextDelayFuture);
|
||||
minDelaySettingAtLastScheduling = minDelaySetting;
|
||||
minDelaySettingAtLastSchedulingNanos = minDelaySetting;
|
||||
TimeValue nextDelay = TimeValue.timeValueNanos(UnassignedInfo.findNextDelayedAllocationIn(event.state()));
|
||||
assert nextDelay.nanos() > 0 : "next delay must be non 0 as minDelaySetting is [" + minDelaySetting + "]";
|
||||
logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]",
|
||||
|
@ -115,25 +115,25 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
|
|||
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
minDelaySettingAtLastScheduling = Long.MAX_VALUE;
|
||||
minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
|
||||
reroute("assign delayed unassigned shards");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.warn("failed to schedule/execute reroute post unassigned shard", t);
|
||||
minDelaySettingAtLastScheduling = Long.MAX_VALUE;
|
||||
minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
|
||||
}
|
||||
});
|
||||
} else {
|
||||
logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling);
|
||||
logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
long getMinDelaySettingAtLastScheduling() {
|
||||
return this.minDelaySettingAtLastScheduling;
|
||||
long getMinDelaySettingAtLastSchedulingNanos() {
|
||||
return this.minDelaySettingAtLastSchedulingNanos;
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Holds additional information as to why the shard is in unassigned state.
|
||||
|
@ -110,18 +109,27 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
|
|||
private final String message;
|
||||
private final Throwable failure;
|
||||
|
||||
/**
|
||||
* creates an UnassingedInfo object based **current** time
|
||||
*
|
||||
* @param reason the cause for making this shard unassigned. See {@link Reason} for more information.
|
||||
* @param message more information about cause.
|
||||
**/
|
||||
public UnassignedInfo(Reason reason, String message) {
|
||||
this(reason, System.currentTimeMillis(), System.nanoTime(), message, null);
|
||||
this(reason, message, null, System.nanoTime(), System.currentTimeMillis());
|
||||
}
|
||||
|
||||
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure) {
|
||||
this(reason, System.currentTimeMillis(), System.nanoTime(), message, failure);
|
||||
}
|
||||
|
||||
private UnassignedInfo(Reason reason, long unassignedTimeMillis, long timestampNanos, String message, Throwable failure) {
|
||||
/**
|
||||
* @param reason the cause for making this shard unassigned. See {@link Reason} for more information.
|
||||
* @param message more information about cause.
|
||||
* @param failure the shard level failure that caused this shard to be unassigned, if exists.
|
||||
* @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation
|
||||
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
|
||||
*/
|
||||
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, long unassignedTimeNanos, long unassignedTimeMillis) {
|
||||
this.reason = reason;
|
||||
this.unassignedTimeMillis = unassignedTimeMillis;
|
||||
this.unassignedTimeNanos = timestampNanos;
|
||||
this.unassignedTimeNanos = unassignedTimeNanos;
|
||||
this.message = message;
|
||||
this.failure = failure;
|
||||
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
|
||||
|
@ -201,14 +209,14 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
|
|||
}
|
||||
|
||||
/**
|
||||
* The allocation delay value in milliseconds associated with the index (defaulting to node settings if not set).
|
||||
* The allocation delay value in nano seconds associated with the index (defaulting to node settings if not set).
|
||||
*/
|
||||
public long getAllocationDelayTimeoutSetting(Settings settings, Settings indexSettings) {
|
||||
public long getAllocationDelayTimeoutSettingNanos(Settings settings, Settings indexSettings) {
|
||||
if (reason != Reason.NODE_LEFT) {
|
||||
return 0;
|
||||
}
|
||||
TimeValue delayTimeout = indexSettings.getAsTime(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, settings.getAsTime(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, DEFAULT_DELAYED_NODE_LEFT_TIMEOUT));
|
||||
return Math.max(0l, delayTimeout.millis());
|
||||
return Math.max(0l, delayTimeout.nanos());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -221,18 +229,17 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
|
|||
|
||||
/**
|
||||
* Updates delay left based on current time (in nanoseconds) and index/node settings.
|
||||
* Should only be called from ReplicaShardAllocator.
|
||||
*
|
||||
* @return updated delay in nanoseconds
|
||||
*/
|
||||
public long updateDelay(long nanoTimeNow, Settings settings, Settings indexSettings) {
|
||||
long delayTimeoutMillis = getAllocationDelayTimeoutSetting(settings, indexSettings);
|
||||
long delayTimeoutNanos = getAllocationDelayTimeoutSettingNanos(settings, indexSettings);
|
||||
final long newComputedLeftDelayNanos;
|
||||
if (delayTimeoutMillis == 0l) {
|
||||
if (delayTimeoutNanos == 0l) {
|
||||
newComputedLeftDelayNanos = 0l;
|
||||
} else {
|
||||
assert nanoTimeNow >= unassignedTimeNanos;
|
||||
long delayTimeoutNanos = TimeUnit.NANOSECONDS.convert(delayTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
newComputedLeftDelayNanos = Math.max(0l, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
|
||||
newComputedLeftDelayNanos = Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
|
||||
}
|
||||
lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
|
||||
return newComputedLeftDelayNanos;
|
||||
|
@ -255,21 +262,21 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Finds the smallest delay expiration setting in milliseconds of all unassigned shards that are still delayed. Returns 0 if there are none.
|
||||
* Finds the smallest delay expiration setting in nanos of all unassigned shards that are still delayed. Returns 0 if there are none.
|
||||
*/
|
||||
public static long findSmallestDelayedAllocationSetting(Settings settings, ClusterState state) {
|
||||
long nextDelaySetting = Long.MAX_VALUE;
|
||||
public static long findSmallestDelayedAllocationSettingNanos(Settings settings, ClusterState state) {
|
||||
long minDelaySetting = Long.MAX_VALUE;
|
||||
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
|
||||
if (shard.primary() == false) {
|
||||
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
|
||||
long leftDelayNanos = shard.unassignedInfo().getLastComputedLeftDelayNanos();
|
||||
long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSetting(settings, indexMetaData.getSettings());
|
||||
if (leftDelayNanos > 0 && delayTimeoutSetting > 0 && delayTimeoutSetting < nextDelaySetting) {
|
||||
nextDelaySetting = delayTimeoutSetting;
|
||||
boolean delayed = shard.unassignedInfo().getLastComputedLeftDelayNanos() > 0;
|
||||
long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSettingNanos(settings, indexMetaData.getSettings());
|
||||
if (delayed && delayTimeoutSetting > 0 && delayTimeoutSetting < minDelaySetting) {
|
||||
minDelaySetting = delayTimeoutSetting;
|
||||
}
|
||||
}
|
||||
}
|
||||
return nextDelaySetting == Long.MAX_VALUE ? 0l : nextDelaySetting;
|
||||
return minDelaySetting == Long.MAX_VALUE ? 0l : minDelaySetting;
|
||||
}
|
||||
|
||||
|
||||
|
@ -320,14 +327,24 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
UnassignedInfo that = (UnassignedInfo) o;
|
||||
|
||||
if (unassignedTimeMillis != that.unassignedTimeMillis) return false;
|
||||
if (reason != that.reason) return false;
|
||||
if (message != null ? !message.equals(that.message) : that.message != null) return false;
|
||||
if (unassignedTimeMillis != that.unassignedTimeMillis) {
|
||||
return false;
|
||||
}
|
||||
if (reason != that.reason) {
|
||||
return false;
|
||||
}
|
||||
if (message != null ? !message.equals(that.message) : that.message != null) {
|
||||
return false;
|
||||
}
|
||||
return !(failure != null ? !failure.equals(that.failure) : that.failure != null);
|
||||
|
||||
}
|
||||
|
|
|
@ -172,7 +172,8 @@ public class AllocationService extends AbstractComponent {
|
|||
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShards, clusterInfoService.getClusterInfo());
|
||||
boolean changed = false;
|
||||
for (FailedRerouteAllocation.FailedShard failedShard : failedShards) {
|
||||
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure));
|
||||
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure,
|
||||
System.nanoTime(), System.currentTimeMillis()));
|
||||
}
|
||||
if (!changed) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
|
@ -216,7 +217,7 @@ public class AllocationService extends AbstractComponent {
|
|||
// we don't shuffle the unassigned shards here, to try and get as close as possible to
|
||||
// a consistent result of the effect the commands have on the routing
|
||||
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime());
|
||||
// don't short circuit deciders, we want a full explanation
|
||||
allocation.debugDecision(true);
|
||||
// we ignore disable allocation, because commands are explicit
|
||||
|
@ -255,7 +256,7 @@ public class AllocationService extends AbstractComponent {
|
|||
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
|
||||
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
|
||||
routingNodes.unassigned().shuffle();
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime());
|
||||
allocation.debugDecision(debug);
|
||||
if (!reroute(allocation)) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
|
@ -291,6 +292,8 @@ public class AllocationService extends AbstractComponent {
|
|||
|
||||
// now allocate all the unassigned to available nodes
|
||||
if (allocation.routingNodes().unassigned().size() > 0) {
|
||||
updateLeftDelayOfUnassignedShards(allocation, settings);
|
||||
|
||||
changed |= shardsAllocators.allocateUnassigned(allocation);
|
||||
}
|
||||
|
||||
|
@ -303,6 +306,15 @@ public class AllocationService extends AbstractComponent {
|
|||
return changed;
|
||||
}
|
||||
|
||||
// public for testing
|
||||
public static void updateLeftDelayOfUnassignedShards(RoutingAllocation allocation, Settings settings) {
|
||||
for (ShardRouting shardRouting : allocation.routingNodes().unassigned()) {
|
||||
final MetaData metaData = allocation.metaData();
|
||||
final IndexMetaData indexMetaData = metaData.index(shardRouting.index());
|
||||
shardRouting.unassignedInfo().updateDelay(allocation.getCurrentNanoTime(), settings, indexMetaData.getSettings());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean moveShards(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
|
||||
|
@ -364,7 +376,9 @@ public class AllocationService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
for (ShardRouting shardToFail : shardsToFail) {
|
||||
changed |= applyFailedShard(allocation, shardToFail, false, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing"));
|
||||
changed |= applyFailedShard(allocation, shardToFail, false,
|
||||
new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
|
||||
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
|
||||
}
|
||||
|
||||
// now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
|
||||
|
@ -424,8 +438,9 @@ public class AllocationService extends AbstractComponent {
|
|||
}
|
||||
changed = true;
|
||||
// now, go over all the shards routing on the node, and fail them
|
||||
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]");
|
||||
for (ShardRouting shardRouting : node.copyShards()) {
|
||||
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null,
|
||||
allocation.getCurrentNanoTime(), System.currentTimeMillis());
|
||||
applyFailedShard(allocation, shardRouting, false, unassignedInfo);
|
||||
}
|
||||
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
|
||||
|
@ -583,4 +598,9 @@ public class AllocationService extends AbstractComponent {
|
|||
RoutingNodes routingNodes = new RoutingNodes(clusterState, false); // this is a costly operation - only call this once!
|
||||
return routingNodes;
|
||||
}
|
||||
|
||||
/** ovrride this to control time based decisions during allocation */
|
||||
protected long currentNanoTime() {
|
||||
return System.nanoTime();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ public class FailedRerouteAllocation extends RoutingAllocation {
|
|||
private final List<FailedShard> failedShards;
|
||||
|
||||
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<FailedShard> failedShards, ClusterInfo clusterInfo) {
|
||||
super(deciders, routingNodes, nodes, clusterInfo);
|
||||
super(deciders, routingNodes, nodes, clusterInfo, System.nanoTime());
|
||||
this.failedShards = failedShards;
|
||||
}
|
||||
|
||||
|
|
|
@ -132,19 +132,27 @@ public class RoutingAllocation {
|
|||
|
||||
private boolean hasPendingAsyncFetch = false;
|
||||
|
||||
private final long currentNanoTime;
|
||||
|
||||
|
||||
/**
|
||||
* Creates a new {@link RoutingAllocation}
|
||||
*
|
||||
* @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations
|
||||
* @param routingNodes Routing nodes in the current cluster
|
||||
* @param nodes TODO: Documentation
|
||||
* @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()})
|
||||
*/
|
||||
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ClusterInfo clusterInfo) {
|
||||
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ClusterInfo clusterInfo, long currentNanoTime) {
|
||||
this.deciders = deciders;
|
||||
this.routingNodes = routingNodes;
|
||||
this.nodes = nodes;
|
||||
this.clusterInfo = clusterInfo;
|
||||
this.currentNanoTime = currentNanoTime;
|
||||
}
|
||||
|
||||
/** returns the nano time captured at the beginning of the allocation. used to make sure all time based decisions are aligned */
|
||||
public long getCurrentNanoTime() {
|
||||
return currentNanoTime;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -36,7 +36,7 @@ public class StartedRerouteAllocation extends RoutingAllocation {
|
|||
private final List<? extends ShardRouting> startedShards;
|
||||
|
||||
public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards, ClusterInfo clusterInfo) {
|
||||
super(deciders, routingNodes, nodes, clusterInfo);
|
||||
super(deciders, routingNodes, nodes, clusterInfo, System.nanoTime());
|
||||
this.startedShards = startedShards;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,8 +19,8 @@
|
|||
|
||||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
|
@ -74,6 +74,10 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
|
|||
return changed;
|
||||
}
|
||||
|
||||
protected long nanoTime() {
|
||||
return System.nanoTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rebalance(RoutingAllocation allocation) {
|
||||
if (allocation.hasPendingAsyncFetch() == false) {
|
||||
|
|
|
@ -229,7 +229,8 @@ public class AllocateAllocationCommand implements AllocationCommand {
|
|||
// it was index creation
|
||||
if (unassigned.primary() && unassigned.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) {
|
||||
unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
|
||||
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure()));
|
||||
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(),
|
||||
unassigned.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis()));
|
||||
}
|
||||
it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
break;
|
||||
|
|
|
@ -21,7 +21,10 @@ package org.elasticsearch.cluster.routing.allocation.command;
|
|||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.*;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
|
@ -34,7 +37,6 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
||||
|
||||
/**
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.carrotsearch.hppc.ObjectLongMap;
|
|||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
|
@ -101,7 +100,8 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
// we found a better match that has a full sync id match, the existing allocation is not fully synced
|
||||
// so we found a better one, cancel this one
|
||||
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
|
||||
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]"));
|
||||
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]",
|
||||
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
@ -111,7 +111,6 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
}
|
||||
|
||||
public boolean allocateUnassigned(RoutingAllocation allocation) {
|
||||
long nanoTimeNow = System.nanoTime();
|
||||
boolean changed = false;
|
||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
||||
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
|
||||
|
@ -171,7 +170,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
}
|
||||
} else if (matchingNodes.hasAnyData() == false) {
|
||||
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed
|
||||
changed |= ignoreUnassignedIfDelayed(nanoTimeNow, allocation, unassignedIterator, shard);
|
||||
changed |= ignoreUnassignedIfDelayed(unassignedIterator, shard);
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
|
@ -185,16 +184,13 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
|
|||
*
|
||||
* PUBLIC FOR TESTS!
|
||||
*
|
||||
* @param timeNowNanos Timestamp in nanoseconds representing "now"
|
||||
* @param allocation the routing allocation
|
||||
* @param unassignedIterator iterator over unassigned shards
|
||||
* @param shard the shard which might be delayed
|
||||
* @return true iff allocation is delayed for this shard
|
||||
*/
|
||||
public boolean ignoreUnassignedIfDelayed(long timeNowNanos, RoutingAllocation allocation, RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
|
||||
IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex());
|
||||
public boolean ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
|
||||
// calculate delay and store it in UnassignedInfo to be used by RoutingService
|
||||
long delay = shard.unassignedInfo().updateDelay(timeNowNanos, settings, indexMetaData.getSettings());
|
||||
long delay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
|
||||
if (delay > 0) {
|
||||
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueNanos(delay));
|
||||
/**
|
||||
|
|
|
@ -224,6 +224,7 @@ public class ClusterStateCreationUtils {
|
|||
|
||||
/**
|
||||
* Creates a cluster state where local node and master node can be specified
|
||||
*
|
||||
* @param localNode node in allNodes that is the local node
|
||||
* @param masterNode node in allNodes that is the master node. Can be null if no master exists
|
||||
* @param allNodes all nodes in the cluster
|
||||
|
|
|
@ -20,25 +20,13 @@
|
|||
package org.elasticsearch.cluster;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
|
||||
import org.elasticsearch.cluster.metadata.SnapshotId;
|
||||
import org.elasticsearch.cluster.metadata.*;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.*;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
|
@ -50,7 +38,6 @@ import org.elasticsearch.discovery.DiscoverySettings;
|
|||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
|
|
|
@ -21,10 +21,7 @@ package org.elasticsearch.cluster.routing;
|
|||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.cluster.routing;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -37,7 +36,6 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
/**
|
||||
*/
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
@LuceneTestCase.AwaitsFix(bugUrl = "http://build-us-00.elastic.co/job/es_core_master_windows-2012-r2/2074/testReport/ (boaz on it)")
|
||||
public class DelayedAllocationIT extends ESIntegTestCase {
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,9 +19,7 @@
|
|||
|
||||
package org.elasticsearch.cluster.routing;
|
||||
|
||||
import static org.elasticsearch.test.ESTestCase.randomAsciiOfLength;
|
||||
import static org.elasticsearch.test.ESTestCase.randomFrom;
|
||||
import static org.elasticsearch.test.ESTestCase.randomInt;
|
||||
import static org.elasticsearch.test.ESTestCase.*;
|
||||
|
||||
/**
|
||||
* Utility class the makes random modifications to ShardRouting
|
||||
|
|
|
@ -88,15 +88,14 @@ public class RoutingServiceTests extends ESAllocationTestCase {
|
|||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
|
||||
ClusterState newState = clusterState;
|
||||
|
||||
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
|
||||
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
|
||||
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
|
||||
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
|
||||
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
|
||||
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
|
||||
}
|
||||
|
||||
public void testDelayedUnassignedScheduleReroute() throws Exception {
|
||||
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
|
||||
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
|
||||
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
|
||||
.numberOfShards(1).numberOfReplicas(1))
|
||||
|
@ -126,7 +125,6 @@ public class RoutingServiceTests extends ESAllocationTestCase {
|
|||
ClusterState prevState = clusterState;
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId)).build();
|
||||
// make sure the replica is marked as delayed (i.e. not reallocated)
|
||||
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMillis(randomIntBetween(0, 99)).nanos());
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
|
||||
assertEquals(1, clusterState.getRoutingNodes().unassigned().size());
|
||||
|
||||
|
@ -134,7 +132,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
|
|||
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
|
||||
assertBusy(() -> assertTrue("routing service should have run a reroute", routingService.hasReroutedAndClear()));
|
||||
// verify the registration has been reset
|
||||
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
|
||||
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -144,8 +142,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
|
|||
final ThreadPool testThreadPool = new ThreadPool(getTestName());
|
||||
|
||||
try {
|
||||
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
|
||||
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
|
||||
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
|
||||
.numberOfShards(1).numberOfReplicas(1))
|
||||
|
@ -185,11 +182,13 @@ public class RoutingServiceTests extends ESAllocationTestCase {
|
|||
}
|
||||
assertNotNull(longDelayReplica);
|
||||
|
||||
final long baseTime = System.nanoTime();
|
||||
|
||||
// remove node of shortDelayReplica and node of longDelayReplica and reroute
|
||||
ClusterState prevState = clusterState;
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build();
|
||||
// make sure both replicas are marked as delayed (i.e. not reallocated)
|
||||
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
|
||||
allocation.setNanoTimeOverride(baseTime);
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
|
||||
|
||||
// check that shortDelayReplica and longDelayReplica have been marked unassigned
|
||||
|
@ -216,7 +215,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
|
|||
RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation);
|
||||
routingService.start(); // just so performReroute does not prematurely return
|
||||
// next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica, simulate that we are now 1 second after shards became unassigned
|
||||
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueSeconds(1).nanos());
|
||||
allocation.setNanoTimeOverride(baseTime + TimeValue.timeValueSeconds(1).nanos());
|
||||
// register listener on cluster state so we know when cluster state has been changed
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
clusterService.addLast(event -> latch.countDown());
|
||||
|
@ -225,50 +224,12 @@ public class RoutingServiceTests extends ESAllocationTestCase {
|
|||
// cluster service should have updated state and called routingService with clusterChanged
|
||||
latch.await();
|
||||
// verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica
|
||||
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(TimeValue.timeValueSeconds(10).millis()));
|
||||
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(TimeValue.timeValueSeconds(10).nanos()));
|
||||
} finally {
|
||||
terminate(testThreadPool);
|
||||
}
|
||||
}
|
||||
|
||||
public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception {
|
||||
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
|
||||
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
|
||||
.numberOfShards(1).numberOfReplicas(1))
|
||||
.build();
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(metaData)
|
||||
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
|
||||
// starting primaries
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
|
||||
// starting replicas
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
|
||||
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
|
||||
// remove node2 and reroute
|
||||
ClusterState prevState = clusterState;
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
|
||||
// Set it in the future so the delay will be negative
|
||||
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMinutes(1).nanos());
|
||||
|
||||
ClusterState newState = clusterState;
|
||||
|
||||
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
|
||||
|
||||
// verify the registration has been updated
|
||||
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(100L));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private class TestRoutingService extends RoutingService {
|
||||
|
||||
private AtomicBoolean rerouted = new AtomicBoolean();
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing;
|
|||
|
||||
import com.carrotsearch.hppc.IntHashSet;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -40,15 +39,8 @@ import org.elasticsearch.test.ESAllocationTestCase;
|
|||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -282,9 +274,32 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
|
|||
assertThat(delay, equalTo(0l));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that delayed allocation calculation are correct.
|
||||
*/
|
||||
public void testLeftDelayCalculation() throws Exception {
|
||||
final long baseTime = System.nanoTime();
|
||||
final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, baseTime, System.currentTimeMillis());
|
||||
final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos();
|
||||
final Settings settings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueNanos(totalDelayNanos)).build();
|
||||
long delay = unassignedInfo.updateDelay(baseTime, settings, Settings.EMPTY);
|
||||
assertThat(delay, equalTo(totalDelayNanos));
|
||||
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
|
||||
long delta1 = randomIntBetween(1, (int) (totalDelayNanos - 1));
|
||||
delay = unassignedInfo.updateDelay(baseTime + delta1, settings, Settings.EMPTY);
|
||||
assertThat(delay, equalTo(totalDelayNanos - delta1));
|
||||
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
|
||||
delay = unassignedInfo.updateDelay(baseTime + totalDelayNanos, settings, Settings.EMPTY);
|
||||
assertThat(delay, equalTo(0L));
|
||||
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
|
||||
delay = unassignedInfo.updateDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), settings, Settings.EMPTY);
|
||||
assertThat(delay, equalTo(0L));
|
||||
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
|
||||
}
|
||||
|
||||
|
||||
public void testNumberOfDelayedUnassigned() throws Exception {
|
||||
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
|
||||
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
|
||||
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
||||
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
||||
|
@ -303,17 +318,21 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
|
|||
// remove node2 and reroute
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
|
||||
// make sure both replicas are marked as delayed (i.e. not reallocated)
|
||||
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
|
||||
assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), equalTo(2));
|
||||
}
|
||||
|
||||
public void testFindNextDelayedAllocation() {
|
||||
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
|
||||
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
|
||||
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
|
||||
final long baseTime = System.nanoTime();
|
||||
allocation.setNanoTimeOverride(baseTime);
|
||||
final TimeValue delayTest1 = TimeValue.timeValueMillis(randomIntBetween(1, 200));
|
||||
final TimeValue delayTest2 = TimeValue.timeValueMillis(randomIntBetween(1, 200));
|
||||
final long expectMinDelaySettingsNanos = Math.min(delayTest1.nanos(), delayTest2.nanos());
|
||||
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1))
|
||||
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1))
|
||||
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, delayTest1)).numberOfShards(1).numberOfReplicas(1))
|
||||
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, delayTest2)).numberOfShards(1).numberOfReplicas(1))
|
||||
.build();
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(metaData)
|
||||
|
@ -328,15 +347,19 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
|
|||
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
|
||||
// remove node2 and reroute
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
|
||||
// make sure both replicas are marked as delayed (i.e. not reallocated)
|
||||
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
|
||||
|
||||
long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState);
|
||||
assertThat(nextDelaySetting, equalTo(TimeValue.timeValueHours(10).millis()));
|
||||
final long delta = randomBoolean() ? 0 : randomInt((int) expectMinDelaySettingsNanos);
|
||||
|
||||
if (delta > 0) {
|
||||
allocation.setNanoTimeOverride(baseTime + delta);
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "time moved")).build();
|
||||
}
|
||||
|
||||
long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(Settings.EMPTY, clusterState);
|
||||
assertThat(minDelaySetting, equalTo(expectMinDelaySettingsNanos));
|
||||
|
||||
long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(clusterState);
|
||||
assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).nanos()));
|
||||
assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).nanos()));
|
||||
assertThat(nextDelay, equalTo(expectMinDelaySettingsNanos - delta));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -847,7 +847,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
)
|
||||
);
|
||||
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
||||
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
|
||||
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime());
|
||||
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||
assertThat(decision.type(), equalTo(Decision.Type.NO));
|
||||
|
||||
|
@ -867,7 +867,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
)
|
||||
);
|
||||
clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
||||
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
|
||||
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime());
|
||||
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
||||
|
||||
|
@ -966,7 +966,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
)
|
||||
);
|
||||
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
|
||||
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
|
||||
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime());
|
||||
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||
|
||||
// Two shards should start happily
|
||||
|
@ -1023,7 +1023,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
);
|
||||
|
||||
clusterState = ClusterState.builder(updateClusterState).routingTable(builder.build()).build();
|
||||
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
|
||||
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime());
|
||||
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
||||
|
||||
|
|
|
@ -121,7 +121,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
|
|||
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
|
||||
shardSizes.put("[test][0][p]", 10L); // 10 bytes
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of());
|
||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo);
|
||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo, System.nanoTime());
|
||||
assertEquals(mostAvailableUsage.toString(), Decision.YES, decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation));
|
||||
assertEquals(mostAvailableUsage.toString(), Decision.NO, decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation));
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
|
|||
shardSizes.put("[test][2][p]", 10L);
|
||||
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), shardRoutingMap.build());
|
||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo);
|
||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo, System.nanoTime());
|
||||
assertEquals(Decision.YES, decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation));
|
||||
assertEquals(Decision.NO, decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation));
|
||||
try {
|
||||
|
|
|
@ -27,13 +27,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||
import org.elasticsearch.cluster.metadata.SnapshotId;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RestoreSource;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.*;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -49,9 +43,7 @@ import java.util.HashMap;
|
|||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -189,7 +181,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null);
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null, System.nanoTime());
|
||||
|
||||
testAllocator.addData(node1, -1).addData(node2, -1);
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
|
@ -213,7 +205,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -221,7 +213,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node1, 1);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -229,7 +221,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node2, 1);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
@ -252,7 +244,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
|
||||
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -260,7 +252,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node1, 1);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(false));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -268,7 +260,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
|
||||
|
||||
testAllocator.addData(node2, 2);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
|
||||
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
|
||||
|
@ -338,7 +330,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null);
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
|
||||
}
|
||||
|
||||
class TestAllocator extends PrimaryShardAllocator {
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.gateway;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterInfo;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -28,15 +27,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.*;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
|
@ -232,6 +224,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
// we sometime return empty list of files, make sure we test this as well
|
||||
testAllocator.addData(node2, false, null);
|
||||
}
|
||||
AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY);
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
|
||||
|
@ -240,6 +233,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(),
|
||||
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT);
|
||||
testAllocator.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
|
||||
AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY);
|
||||
changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(changed, equalTo(true));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||
|
@ -296,7 +290,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY);
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY, System.nanoTime());
|
||||
}
|
||||
|
||||
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
|
||||
|
@ -315,7 +309,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
.metaData(metaData)
|
||||
.routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY);
|
||||
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY, System.nanoTime());
|
||||
}
|
||||
|
||||
class TestAllocator extends ReplicaShardAllocator {
|
||||
|
|
|
@ -20,12 +20,13 @@
|
|||
package org.elasticsearch.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
|
@ -42,15 +43,9 @@ import static org.elasticsearch.client.Requests.clusterHealthRequest;
|
|||
import static org.elasticsearch.client.Requests.createIndexRequest;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
|
||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -62,6 +57,7 @@ public class IndexLifecycleActionIT extends ESIntegTestCase {
|
|||
Settings settings = settingsBuilder()
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 11)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "0s")
|
||||
.build();
|
||||
|
||||
// start one server
|
||||
|
|
|
@ -56,19 +56,12 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
|
|||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -99,7 +92,7 @@ public class RareClusterStateIT extends ESIntegTestCase {
|
|||
.nodes(DiscoveryNodes.EMPTY_NODES)
|
||||
.build(), false
|
||||
);
|
||||
RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), ClusterInfo.EMPTY);
|
||||
RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), ClusterInfo.EMPTY, System.nanoTime());
|
||||
allocator.allocateUnassigned(routingAllocation);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,25 +1,46 @@
|
|||
Mapper Attachments Type for Elasticsearch
|
||||
=========================================
|
||||
[[mapper-attachments]]
|
||||
=== Mapper Attachments Plugin
|
||||
|
||||
The mapper attachments plugin lets Elasticsearch index file attachments in common formats (such as PPT, XLS, PDF) using the Apache text extraction library [Tika](http://lucene.apache.org/tika/).
|
||||
The mapper attachments plugin lets Elasticsearch index file attachments in common formats (such as PPT, XLS, PDF)
|
||||
using the Apache text extraction library http://lucene.apache.org/tika/[Tika].
|
||||
|
||||
In practice, the plugin adds the `attachment` type when mapping properties so that documents can be populated with file attachment contents (encoded as `base64`).
|
||||
In practice, the plugin adds the `attachment` type when mapping properties so that documents can be populated with
|
||||
file attachment contents (encoded as `base64`).
|
||||
|
||||
Installation
|
||||
------------
|
||||
[[mapper-attachments-install]]
|
||||
[float]
|
||||
==== Installation
|
||||
|
||||
In order to install the plugin, run:
|
||||
This plugin can be installed using the plugin manager:
|
||||
|
||||
```sh
|
||||
bin/plugin install mapper-attachments
|
||||
```
|
||||
[source,sh]
|
||||
----------------------------------------------------------------
|
||||
sudo bin/plugin install mapper-attachments
|
||||
----------------------------------------------------------------
|
||||
|
||||
Hello, world
|
||||
------------
|
||||
The plugin must be installed on every node in the cluster, and each node must
|
||||
be restarted after installation.
|
||||
|
||||
[[mapper-attachments-remove]]
|
||||
[float]
|
||||
==== Removal
|
||||
|
||||
The plugin can be removed with the following command:
|
||||
|
||||
[source,sh]
|
||||
----------------------------------------------------------------
|
||||
sudo bin/plugin remove mapper-attachments
|
||||
----------------------------------------------------------------
|
||||
|
||||
The node must be stopped before removing the plugin.
|
||||
|
||||
[[mapper-attachments-helloworld]]
|
||||
==== Hello, world
|
||||
|
||||
Create a property mapping using the new type `attachment`:
|
||||
|
||||
```javascript
|
||||
[source,js]
|
||||
--------------------------
|
||||
POST /trying-out-mapper-attachments
|
||||
{
|
||||
"mappings": {
|
||||
|
@ -27,36 +48,42 @@ POST /trying-out-mapper-attachments
|
|||
"properties": {
|
||||
"cv": { "type": "attachment" }
|
||||
}}}}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
Index a new document populated with a `base64`-encoded attachment:
|
||||
|
||||
```javascript
|
||||
[source,js]
|
||||
--------------------------
|
||||
POST /trying-out-mapper-attachments/person/1
|
||||
{
|
||||
"cv": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0="
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
Search for the document using words in the attachment:
|
||||
|
||||
```javascript
|
||||
[source,js]
|
||||
--------------------------
|
||||
POST /trying-out-mapper-attachments/person/_search
|
||||
{
|
||||
"query": {
|
||||
"query_string": {
|
||||
"query": "ipsum"
|
||||
}}}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
If you get a hit for your indexed document, the plugin should be installed and working.
|
||||
|
||||
Usage
|
||||
------------------------
|
||||
[[mapper-attachments-usage]]
|
||||
==== Usage
|
||||
|
||||
Using the attachment type is simple, in your mapping JSON, simply set a certain JSON element as attachment, for example:
|
||||
|
||||
```javascript
|
||||
[source,js]
|
||||
--------------------------
|
||||
PUT /test
|
||||
PUT /test/person/_mapping
|
||||
{
|
||||
|
@ -66,20 +93,24 @@ PUT /test/person/_mapping
|
|||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
In this case, the JSON to index can be:
|
||||
|
||||
```javascript
|
||||
[source,js]
|
||||
--------------------------
|
||||
PUT /test/person/1
|
||||
{
|
||||
"my_attachment" : "... base64 encoded attachment ..."
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
Or it is possible to use more elaborated JSON if content type, resource name or language need to be set explicitly:
|
||||
|
||||
```
|
||||
[source,js]
|
||||
--------------------------
|
||||
PUT /test/person/1
|
||||
{
|
||||
"my_attachment" : {
|
||||
|
@ -89,7 +120,8 @@ PUT /test/person/1
|
|||
"_content" : "... base64 encoded attachment ..."
|
||||
}
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
The `attachment` type not only indexes the content of the doc in `content` sub field, but also automatically adds meta
|
||||
data on the attachment as well (when available).
|
||||
|
@ -107,10 +139,11 @@ The metadata supported are:
|
|||
|
||||
They can be queried using the "dot notation", for example: `my_attachment.author`.
|
||||
|
||||
Both the meta data and the actual content are simple core type mappers (string, date, ...), thus, they can be controlled
|
||||
Both the meta data and the actual content are simple core type mappers (string, date, …), thus, they can be controlled
|
||||
in the mappings. For example:
|
||||
|
||||
```javascript
|
||||
[source,js]
|
||||
--------------------------
|
||||
PUT /test/person/_mapping
|
||||
{
|
||||
"person" : {
|
||||
|
@ -131,19 +164,21 @@ PUT /test/person/_mapping
|
|||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
In the above example, the actual content indexed is mapped under `fields` name `content`, and we decide not to index it, so
|
||||
it will only be available in the `_all` field. The other fields map to their respective metadata names, but there is no
|
||||
need to specify the `type` (like `string` or `date`) since it is already known.
|
||||
|
||||
Copy To feature
|
||||
---------------
|
||||
[[mapper-attachments-copy-to]]
|
||||
==== Copy To feature
|
||||
|
||||
If you want to use [copy_to](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-core-types.html#copy-to)
|
||||
If you want to use http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-core-types.html#copy-to[copy_to]
|
||||
feature, you need to define it on each sub-field you want to copy to another field:
|
||||
|
||||
```javascript
|
||||
[source,js]
|
||||
--------------------------
|
||||
PUT /test/person/_mapping
|
||||
{
|
||||
"person": {
|
||||
|
@ -163,16 +198,18 @@ PUT /test/person/_mapping
|
|||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
In this example, the extracted content will be copy as well to `copy` field.
|
||||
|
||||
Querying or accessing metadata
|
||||
------------------------------
|
||||
[[mapper-attachments-querying-metadata]]
|
||||
==== Querying or accessing metadata
|
||||
|
||||
If you need to query on metadata fields, use the attachment field name dot the metadata field. For example:
|
||||
|
||||
```
|
||||
[source,js]
|
||||
--------------------------
|
||||
DELETE /test
|
||||
PUT /test
|
||||
PUT /test/person/_mapping
|
||||
|
@ -204,11 +241,13 @@ GET /test/person/_search
|
|||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
Will give you:
|
||||
|
||||
```
|
||||
[source,js]
|
||||
--------------------------
|
||||
{
|
||||
"took": 2,
|
||||
"timed_out": false,
|
||||
|
@ -235,17 +274,18 @@ Will give you:
|
|||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
|
||||
Indexed Characters
|
||||
------------------
|
||||
[[mapper-attachments-indexed-characters]]
|
||||
==== Indexed Characters
|
||||
|
||||
By default, `100000` characters are extracted when indexing the content. This default value can be changed by setting
|
||||
the `index.mapping.attachment.indexed_chars` setting. It can also be provided on a per document indexed using the
|
||||
`_indexed_chars` parameter. `-1` can be set to extract all text, but note that all the text needs to be allowed to be
|
||||
represented in memory:
|
||||
|
||||
```
|
||||
[source,js]
|
||||
--------------------------
|
||||
PUT /test/person/1
|
||||
{
|
||||
"my_attachment" : {
|
||||
|
@ -253,18 +293,19 @@ PUT /test/person/1
|
|||
"_content" : "... base64 encoded attachment ..."
|
||||
}
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
Metadata parsing error handling
|
||||
-------------------------------
|
||||
[[mapper-attachments-error-handling]]
|
||||
==== Metadata parsing error handling
|
||||
|
||||
While extracting metadata content, errors could happen for example when parsing dates.
|
||||
Parsing errors are ignored so your document is indexed.
|
||||
|
||||
You can disable this feature by setting the `index.mapping.attachment.ignore_errors` setting to `false`.
|
||||
|
||||
Language Detection
|
||||
------------------
|
||||
[[mapper-attachments-language-detection]]
|
||||
==== Language Detection
|
||||
|
||||
By default, language detection is disabled (`false`) as it could come with a cost.
|
||||
This default value can be changed by setting the `index.mapping.attachment.detect_language` setting.
|
||||
|
@ -272,22 +313,24 @@ It can also be provided on a per document indexed using the `_detect_language` p
|
|||
|
||||
Note that you can force language using `_language` field when sending your actual document:
|
||||
|
||||
```javascript
|
||||
[source,js]
|
||||
--------------------------
|
||||
{
|
||||
"my_attachment" : {
|
||||
"_language" : "en",
|
||||
"_content" : "... base64 encoded attachment ..."
|
||||
}
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
|
||||
Highlighting attachments
|
||||
------------------------
|
||||
[[mapper-attachments-highlighting]]
|
||||
==== Highlighting attachments
|
||||
|
||||
If you want to highlight your attachment content, you will need to set `"store": true` and `"term_vector":"with_positions_offsets"`
|
||||
for your attachment field. Here is a full script which does it:
|
||||
If you want to highlight your attachment content, you will need to set `"store": true` and
|
||||
`"term_vector":"with_positions_offsets"` for your attachment field. Here is a full script which does it:
|
||||
|
||||
```
|
||||
[source,js]
|
||||
--------------------------
|
||||
DELETE /test
|
||||
PUT /test
|
||||
PUT /test/person/_mapping
|
||||
|
@ -326,11 +369,13 @@ GET /test/person/_search
|
|||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
// AUTOSENSE
|
||||
|
||||
It gives back:
|
||||
|
||||
```js
|
||||
[source,js]
|
||||
--------------------------
|
||||
{
|
||||
"took": 9,
|
||||
"timed_out": false,
|
||||
|
@ -357,10 +402,10 @@ It gives back:
|
|||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
--------------------------
|
||||
|
||||
Stand alone runner
|
||||
------------------
|
||||
[[mapper-attachments-standalone]]
|
||||
==== Stand alone runner
|
||||
|
||||
If you want to run some tests within your IDE, you can use `StandaloneRunner` class.
|
||||
It accepts arguments:
|
||||
|
@ -371,15 +416,17 @@ It accepts arguments:
|
|||
|
||||
Example:
|
||||
|
||||
```sh
|
||||
[source,sh]
|
||||
--------------------------
|
||||
StandaloneRunner BASE64Text
|
||||
StandaloneRunner -u /tmp/mydoc.pdf
|
||||
StandaloneRunner -u /tmp/mydoc.pdf --size 1000000
|
||||
```
|
||||
--------------------------
|
||||
|
||||
It produces something like:
|
||||
|
||||
```
|
||||
[source,text]
|
||||
--------------------------
|
||||
## Extracted text
|
||||
--------------------- BEGIN -----------------------
|
||||
This is the extracted text
|
||||
|
@ -393,4 +440,4 @@ This is the extracted text
|
|||
- language: null
|
||||
- name: null
|
||||
- title: null
|
||||
```
|
||||
--------------------------
|
|
@ -8,11 +8,10 @@ Mapper plugins allow new field datatypes to be added to Elasticsearch.
|
|||
|
||||
The core mapper plugins are:
|
||||
|
||||
https://github.com/elasticsearch/elasticsearch-mapper-attachments[Mapper Attachments Type plugin]::
|
||||
<<mapper-attachments>>::
|
||||
|
||||
Integrates http://lucene.apache.org/tika/[Apache Tika] to provide a new field
|
||||
type `attachment` to allow indexing of documents such as PDFs and Microsoft
|
||||
Word.
|
||||
The mapper-attachments integrates http://lucene.apache.org/tika/[Apache Tika] to provide a new field
|
||||
type `attachment` to allow indexing of documents such as PDFs and Microsoft Word.
|
||||
|
||||
<<mapper-size>>::
|
||||
|
||||
|
@ -25,5 +24,6 @@ indexes the size in bytes of the original
|
|||
The mapper-murmur3 plugin allows hashes to be computed at index-time and stored
|
||||
in the index for later use with the `cardinality` aggregation.
|
||||
|
||||
include::mapper-attachments.asciidoc[]
|
||||
include::mapper-size.asciidoc[]
|
||||
include::mapper-murmur3.asciidoc[]
|
||||
|
|
|
@ -75,13 +75,12 @@ sudo bin/plugin install lmenezes/elasticsearch-kopf/2.x <2>
|
|||
|
||||
When installing from Maven Central/Sonatype, `[org]` should be replaced by
|
||||
the artifact `groupId`, and `[user|component]` by the `artifactId`. For
|
||||
instance, to install the
|
||||
https://github.com/elastic/elasticsearch-mapper-attachments[mapper attachment]
|
||||
instance, to install the {plugins}/mapper-attachments.html[`mapper-attachments`]
|
||||
plugin from Sonatype, run:
|
||||
|
||||
[source,shell]
|
||||
-----------------------------------
|
||||
sudo bin/plugin install org.elasticsearch/elasticsearch-mapper-attachments/2.6.0 <1>
|
||||
sudo bin/plugin install org.elasticsearch.plugin/mapper-attachments/3.0.0 <1>
|
||||
-----------------------------------
|
||||
<1> When installing from `download.elastic.co` or from Maven Central/Sonatype, the
|
||||
version is required.
|
||||
|
|
|
@ -117,7 +117,7 @@ PUT _snapshot/my_backup2
|
|||
{
|
||||
"type": "azure",
|
||||
"settings": {
|
||||
"container": "backup_container",
|
||||
"container": "backup-container",
|
||||
"base_path": "backups",
|
||||
"chunk_size": "32m",
|
||||
"compress": true
|
||||
|
@ -150,7 +150,7 @@ Example using Java:
|
|||
----
|
||||
client.admin().cluster().preparePutRepository("my_backup_java1")
|
||||
.setType("azure").setSettings(Settings.settingsBuilder()
|
||||
.put(Storage.CONTAINER, "backup_container")
|
||||
.put(Storage.CONTAINER, "backup-container")
|
||||
.put(Storage.CHUNK_SIZE, new ByteSizeValue(32, ByteSizeUnit.MB))
|
||||
).get();
|
||||
----
|
||||
|
|
|
@ -37,8 +37,8 @@ document:
|
|||
|
||||
Attachment datatype::
|
||||
|
||||
See the https://github.com/elastic/elasticsearch-mapper-attachments[mapper attachment plugin]
|
||||
which supports indexing ``attachments'' like Microsoft Office formats, Open
|
||||
See the {plugins}/mapper-attachments.html[`mapper-attachments`] plugin
|
||||
which supports indexing `attachments` like Microsoft Office formats, Open
|
||||
Document formats, ePub, HTML, etc. into an `attachment` datatype.
|
||||
|
||||
[float]
|
||||
|
|
|
@ -47,12 +47,7 @@ import org.elasticsearch.node.settings.NodeSettingsService;
|
|||
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.function.Function;
|
||||
import java.util.*;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
|
||||
|
@ -63,32 +58,32 @@ import static org.hamcrest.CoreMatchers.is;
|
|||
*/
|
||||
public abstract class ESAllocationTestCase extends ESTestCase {
|
||||
|
||||
public static AllocationService createAllocationService() {
|
||||
public static MockAllocationService createAllocationService() {
|
||||
return createAllocationService(Settings.Builder.EMPTY_SETTINGS);
|
||||
}
|
||||
|
||||
public static AllocationService createAllocationService(Settings settings) {
|
||||
public static MockAllocationService createAllocationService(Settings settings) {
|
||||
return createAllocationService(settings, getRandom());
|
||||
}
|
||||
|
||||
public static AllocationService createAllocationService(Settings settings, Random random) {
|
||||
public static MockAllocationService createAllocationService(Settings settings, Random random) {
|
||||
return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random);
|
||||
}
|
||||
|
||||
public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
|
||||
return new AllocationService(settings,
|
||||
public static MockAllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
|
||||
return new MockAllocationService(settings,
|
||||
randomAllocationDeciders(settings, nodeSettingsService, random),
|
||||
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
|
||||
}
|
||||
|
||||
public static AllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
|
||||
return new AllocationService(settings,
|
||||
public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
|
||||
return new MockAllocationService(settings,
|
||||
randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
|
||||
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService);
|
||||
}
|
||||
|
||||
public static AllocationService createAllocationService(Settings settings, GatewayAllocator allocator) {
|
||||
return new AllocationService(settings,
|
||||
public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator allocator) {
|
||||
return new MockAllocationService(settings,
|
||||
randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
|
||||
new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE);
|
||||
}
|
||||
|
@ -187,9 +182,27 @@ public abstract class ESAllocationTestCase extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/** A lock {@link AllocationService} allowing tests to override time */
|
||||
protected static class MockAllocationService extends AllocationService {
|
||||
|
||||
private Long nanoTimeOverride = null;
|
||||
|
||||
public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) {
|
||||
super(settings, allocationDeciders, shardsAllocators, clusterInfoService);
|
||||
}
|
||||
|
||||
public void setNanoTimeOverride(long nanoTime) {
|
||||
this.nanoTimeOverride = nanoTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long currentNanoTime() {
|
||||
return nanoTimeOverride == null ? super.currentNanoTime() : nanoTimeOverride;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
|
||||
* Also computes delay in UnassignedInfo based on customizable time source.
|
||||
*/
|
||||
protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
|
||||
private final ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator(Settings.EMPTY) {
|
||||
|
@ -199,16 +212,11 @@ public abstract class ESAllocationTestCase extends ESTestCase {
|
|||
}
|
||||
};
|
||||
|
||||
private volatile Function<ShardRouting, Long> timeSource;
|
||||
|
||||
public DelayedShardsMockGatewayAllocator() {
|
||||
super(Settings.EMPTY, null, null);
|
||||
}
|
||||
|
||||
public void setTimeSource(Function<ShardRouting, Long> timeSource) {
|
||||
this.timeSource = timeSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyStartedShards(StartedRerouteAllocation allocation) {}
|
||||
|
||||
|
@ -224,8 +232,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
|
|||
if (shard.primary() || shard.allocatedPostIndexCreate() == false) {
|
||||
continue;
|
||||
}
|
||||
changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(timeSource == null ? System.nanoTime() : timeSource.apply(shard),
|
||||
allocation, unassignedIterator, shard);
|
||||
changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard);
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue