Optional Delayed Allocation on Node leave

Allow to set delayed allocation timeout on unassigned shards when a node leaves the cluster. This allows to wait for the node to come back for a specific period in order to try and assign the shards back to it to reduce shards movements and unnecessary relocations.

The setting is an index level setting under `index.unassigned.node_left.delayed_timeout` and defaults to 0 (== no delayed allocation). We might want to change the default, but lets do it in a different change to come up with the best value for it. The setting can be updated dynamically.

When shards are delayed, a log message with "info" level will notify how many shards are being delayed.

An implementation note, we really only need to care about delaying allocation on unassigned replica shards. If the primary shard is unassigned, anyhow we are going to wait for a copy of it, so really the only case to delay allocation is for replicas.

close #11712
This commit is contained in:
Shay Banon 2015-06-17 04:22:05 +02:00
parent 30cc984565
commit 792a545633
10 changed files with 438 additions and 8 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.health;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -58,6 +59,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
int unassignedShards = 0; int unassignedShards = 0;
int numberOfPendingTasks = 0; int numberOfPendingTasks = 0;
int numberOfInFlightFetch = 0; int numberOfInFlightFetch = 0;
int delayedUnassignedShards = 0;
boolean timedOut = false; boolean timedOut = false;
ClusterHealthStatus status = ClusterHealthStatus.RED; ClusterHealthStatus status = ClusterHealthStatus.RED;
private List<String> validationFailures; private List<String> validationFailures;
@ -68,14 +70,15 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
/** needed for plugins BWC */ /** needed for plugins BWC */
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) { public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
this(clusterName, concreteIndices, clusterState, -1, -1); this(clusterName, concreteIndices, clusterState, -1, -1, -1);
} }
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks, public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks,
int numberOfInFlightFetch) { int numberOfInFlightFetch, int delayedUnassignedShards) {
this.clusterName = clusterName; this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks; this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch; this.numberOfInFlightFetch = numberOfInFlightFetch;
this.delayedUnassignedShards = delayedUnassignedShards;
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData()); RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
validationFailures = validation.failures(); validationFailures = validation.failures();
numberOfNodes = clusterState.nodes().size(); numberOfNodes = clusterState.nodes().size();
@ -173,6 +176,15 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
return this.numberOfInFlightFetch; return this.numberOfInFlightFetch;
} }
/**
* The number of unassigned shards that are currently being delayed (for example,
* due to node leaving the cluster and waiting for a timeout for the node to come
* back in order to allocate the shards back to it).
*/
public int getDelayedUnassignedShards() {
return this.delayedUnassignedShards;
}
/** /**
* <tt>true</tt> if the waitForXXX has timeout out and did not match. * <tt>true</tt> if the waitForXXX has timeout out and did not match.
*/ */
@ -229,6 +241,9 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
} }
numberOfInFlightFetch = in.readInt(); numberOfInFlightFetch = in.readInt();
if (in.getVersion().onOrAfter(Version.V_1_7_0)) {
delayedUnassignedShards= in.readInt();
}
} }
@Override @Override
@ -256,6 +271,9 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
} }
out.writeInt(numberOfInFlightFetch); out.writeInt(numberOfInFlightFetch);
if (out.getVersion().onOrAfter(Version.V_1_7_0)) {
out.writeInt(delayedUnassignedShards);
}
} }
@ -280,6 +298,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
static final XContentBuilderString NUMBER_OF_DATA_NODES = new XContentBuilderString("number_of_data_nodes"); static final XContentBuilderString NUMBER_OF_DATA_NODES = new XContentBuilderString("number_of_data_nodes");
static final XContentBuilderString NUMBER_OF_PENDING_TASKS = new XContentBuilderString("number_of_pending_tasks"); static final XContentBuilderString NUMBER_OF_PENDING_TASKS = new XContentBuilderString("number_of_pending_tasks");
static final XContentBuilderString NUMBER_OF_IN_FLIGHT_FETCH = new XContentBuilderString("number_of_in_flight_fetch"); static final XContentBuilderString NUMBER_OF_IN_FLIGHT_FETCH = new XContentBuilderString("number_of_in_flight_fetch");
static final XContentBuilderString DELAYED_UNASSIGNED_SHARDS = new XContentBuilderString("delayed_unassigned_shards");
static final XContentBuilderString ACTIVE_PRIMARY_SHARDS = new XContentBuilderString("active_primary_shards"); static final XContentBuilderString ACTIVE_PRIMARY_SHARDS = new XContentBuilderString("active_primary_shards");
static final XContentBuilderString ACTIVE_SHARDS = new XContentBuilderString("active_shards"); static final XContentBuilderString ACTIVE_SHARDS = new XContentBuilderString("active_shards");
static final XContentBuilderString RELOCATING_SHARDS = new XContentBuilderString("relocating_shards"); static final XContentBuilderString RELOCATING_SHARDS = new XContentBuilderString("relocating_shards");
@ -301,6 +320,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
builder.field(Fields.RELOCATING_SHARDS, getRelocatingShards()); builder.field(Fields.RELOCATING_SHARDS, getRelocatingShards());
builder.field(Fields.INITIALIZING_SHARDS, getInitializingShards()); builder.field(Fields.INITIALIZING_SHARDS, getInitializingShards());
builder.field(Fields.UNASSIGNED_SHARDS, getUnassignedShards()); builder.field(Fields.UNASSIGNED_SHARDS, getUnassignedShards());
builder.field(Fields.DELAYED_UNASSIGNED_SHARDS, getDelayedUnassignedShards());
builder.field(Fields.NUMBER_OF_PENDING_TASKS, getNumberOfPendingTasks()); builder.field(Fields.NUMBER_OF_PENDING_TASKS, getNumberOfPendingTasks());
builder.field(Fields.NUMBER_OF_IN_FLIGHT_FETCH, getNumberOfInFlightFetch()); builder.field(Fields.NUMBER_OF_IN_FLIGHT_FETCH, getNumberOfInFlightFetch());

View File

@ -25,6 +25,10 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -264,11 +268,11 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices()); concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
} catch (IndexMissingException e) { } catch (IndexMissingException e) {
// one of the specified indices is not there - treat it as RED. // one of the specified indices is not there - treat it as RED.
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks, numberOfInFlightFetch); ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState));
response.status = ClusterHealthStatus.RED; response.status = ClusterHealthStatus.RED;
return response; return response;
} }
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks, numberOfInFlightFetch); return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState));
} }
} }

View File

@ -22,6 +22,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.DjbHashFunction; import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.cluster.routing.HashFunction; import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.SimpleHashFunction; import org.elasticsearch.cluster.routing.SimpleHashFunction;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
@ -179,7 +180,8 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
"index.store.stats_refresh_interval", "index.store.stats_refresh_interval",
"index.translog.flush_threshold_period", "index.translog.flush_threshold_period",
"index.translog.interval", "index.translog.interval",
"index.translog.sync_interval"); "index.translog.sync_interval",
UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT);
/** /**
* Elasticsearch 2.0 requires units on byte/memory and time settings; this method adds the default unit to any such settings that are * Elasticsearch 2.0 requires units on byte/memory and time settings; this method adds the default unit to any such settings that are

View File

@ -28,10 +28,13 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
@ -62,6 +65,10 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
private volatile boolean routingTableDirty = false; private volatile boolean routingTableDirty = false;
private volatile Future scheduledRoutingTableFuture; private volatile Future scheduledRoutingTableFuture;
private AtomicBoolean rerouting = new AtomicBoolean();
private volatile long registeredNextDelaySetting = Long.MAX_VALUE;
private volatile ScheduledFuture registeredNextDelayFuture;
@Inject @Inject
public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) { public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) {
@ -125,6 +132,31 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
} }
} }
} }
// figure out when the next unassigned allocation need to happen from now. If this is larger or equal
// then the last time we checked and scheduled, we are guaranteed to have a reroute until then, so no need
// to schedule again
long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state());
if (nextDelaySetting > 0 && nextDelaySetting < registeredNextDelaySetting) {
FutureUtils.cancel(registeredNextDelayFuture);
registeredNextDelaySetting = nextDelaySetting;
TimeValue nextDelay = TimeValue.timeValueMillis(UnassignedInfo.findNextDelayedAllocationIn(settings, event.state()));
logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]", UnassignedInfo.getNumberOfDelayedUnassigned(settings, event.state()), nextDelay);
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
routingTableDirty = true;
reroute();
}
@Override
public void onFailure(Throwable t) {
logger.warn("failed to schedule/execute reroute post unassigned shard", t);
}
});
} else {
logger.trace("no need to schedule reroute due to delayed unassigned, next_delay_setting [{}], registered [{}]", nextDelaySetting, registeredNextDelaySetting);
}
} else { } else {
FutureUtils.cancel(scheduledRoutingTableFuture); FutureUtils.cancel(scheduledRoutingTableFuture);
scheduledRoutingTableFuture = null; scheduledRoutingTableFuture = null;
@ -139,9 +171,14 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
if (lifecycle.stopped()) { if (lifecycle.stopped()) {
return; return;
} }
if (rerouting.compareAndSet(false, true) == false) {
logger.trace("already has pending reroute, ignoring");
return;
}
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, Priority.HIGH, new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, Priority.HIGH, new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
rerouting.set(false);
RoutingAllocation.Result routingResult = allocationService.reroute(currentState); RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
if (!routingResult.changed()) { if (!routingResult.changed()) {
// no state changed // no state changed
@ -152,11 +189,13 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
@Override @Override
public void onNoLongerMaster(String source) { public void onNoLongerMaster(String source) {
rerouting.set(false);
// no biggie // no biggie
} }
@Override @Override
public void onFailure(String source, Throwable t) { public void onFailure(String source, Throwable t) {
rerouting.set(false);
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint()); logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
@ -166,7 +205,8 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
} }
}); });
routingTableDirty = false; routingTableDirty = false;
} catch (Exception e) { } catch (Throwable e) {
rerouting.set(false);
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
logger.warn("Failed to reroute routing table, current state:\n{}", e, state.prettyPrint()); logger.warn("Failed to reroute routing table, current state:\n{}", e, state.prettyPrint());
} }

View File

@ -19,12 +19,16 @@
package org.elasticsearch.cluster.routing; package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.joda.FormatDateTimeFormatter; import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda; import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -37,6 +41,9 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern("dateOptionalTime"); public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern("dateOptionalTime");
public static final String DELAYED_NODE_LEFT_TIMEOUT = "index.unassigned.node_left.delayed_timeout";
public static final TimeValue DEFAULT_DELAYED_NODE_LEFT_TIMEOUT = TimeValue.timeValueMillis(0);
/** /**
* Reason why the shard is in unassigned state. * Reason why the shard is in unassigned state.
* <p/> * <p/>
@ -141,6 +148,86 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
return this.details; return this.details;
} }
/**
* The allocation delay value associated with the index (defaulting to node settings if not set).
*/
public long getAllocationDelayTimeoutSetting(Settings settings, Settings indexSettings) {
if (reason != Reason.NODE_LEFT) {
return 0;
}
TimeValue delayTimeout = indexSettings.getAsTime(DELAYED_NODE_LEFT_TIMEOUT, settings.getAsTime(DELAYED_NODE_LEFT_TIMEOUT, DEFAULT_DELAYED_NODE_LEFT_TIMEOUT));
return Math.max(0l, delayTimeout.millis());
}
/**
* The time in millisecond until this unassigned shard can be reassigned.
*/
public long getDelayAllocationExpirationIn(Settings settings, Settings indexSettings) {
long delayTimeout = getAllocationDelayTimeoutSetting(settings, indexSettings);
if (delayTimeout == 0) {
return 0;
}
long delta = System.currentTimeMillis() - timestamp;
// account for time drift, treat it as no timeout
if (delta < 0) {
return 0;
}
return delayTimeout - delta;
}
/**
* Returns the number of shards that are unassigned and currently being delayed.
*/
public static int getNumberOfDelayedUnassigned(Settings settings, ClusterState state) {
int count = 0;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
if (delay > 0) {
count++;
}
}
}
return count;
}
/**
* Finds the smallest delay expiration setting of an unassigned shard. Returns 0 if there are none.
*/
public static long findSmallestDelayedAllocationSetting(Settings settings, ClusterState state) {
long nextDelaySetting = Long.MAX_VALUE;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSetting(settings, indexMetaData.getSettings());
if (delayTimeoutSetting > 0 && delayTimeoutSetting < nextDelaySetting) {
nextDelaySetting = delayTimeoutSetting;
}
}
}
return nextDelaySetting == Long.MAX_VALUE ? 0l : nextDelaySetting;
}
/**
* Finds the next (closest) delay expiration of an unassigned shard. Returns 0 if there are none.
*/
public static long findNextDelayedAllocationIn(Settings settings, ClusterState state) {
long nextDelay = Long.MAX_VALUE;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long nextShardDelay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
if (nextShardDelay > 0 && nextShardDelay < nextDelay) {
nextDelay = nextShardDelay;
}
}
}
return nextDelay == Long.MAX_VALUE ? 0l : nextDelay;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();

View File

@ -48,6 +48,7 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -419,6 +420,8 @@ public class GatewayAllocator extends AbstractComponent {
long lastSizeMatched = 0; long lastSizeMatched = 0;
DiscoveryNode lastDiscoNodeMatched = null; DiscoveryNode lastDiscoNodeMatched = null;
RoutingNode lastNodeMatched = null; RoutingNode lastNodeMatched = null;
boolean hasReplicaData = false;
IndexMetaData indexMetaData = metaData.index(shard.getIndex());
for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> nodeStoreEntry : shardStores.getData().entrySet()) { for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> nodeStoreEntry : shardStores.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey(); DiscoveryNode discoNode = nodeStoreEntry.getKey();
@ -449,6 +452,7 @@ public class GatewayAllocator extends AbstractComponent {
} }
if (!shard.primary()) { if (!shard.primary()) {
hasReplicaData |= storeFilesMetaData.iterator().hasNext();
MutableShardRouting primaryShard = routingNodes.activePrimary(shard); MutableShardRouting primaryShard = routingNodes.activePrimary(shard);
if (primaryShard != null) { if (primaryShard != null) {
assert primaryShard.active(); assert primaryShard.active();
@ -509,6 +513,23 @@ public class GatewayAllocator extends AbstractComponent {
allocation.routingNodes().assign(shard, lastNodeMatched.nodeId()); allocation.routingNodes().assign(shard, lastNodeMatched.nodeId());
unassignedIterator.remove(); unassignedIterator.remove();
} }
} else if (hasReplicaData == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation
// of the replica shard needs to be delayed, and if so, add it to the ignore unassigned list
// note: we only care about replica in delayed allocation, since if we have an unassigned primary it
// will anyhow wait to find an existing copy of the shard to be allocated
// note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
if (delay > 0) {
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay));
/**
* mark it as changed, since we want to kick a publishing to schedule future allocation,
* see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
*/
changed = true;
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
}
} }
} }
return changed; return changed;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.settings; package org.elasticsearch.index.settings;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
@ -110,6 +111,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(TranslogConfig.INDEX_TRANSLOG_DURABILITY); indexDynamicSettings.addDynamicSetting(TranslogConfig.INDEX_TRANSLOG_DURABILITY);
indexDynamicSettings.addDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED); indexDynamicSettings.addDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED);
indexDynamicSettings.addDynamicSetting(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, Validator.BOOLEAN); indexDynamicSettings.addDynamicSetting(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, Validator.TIME);
} }
public void addDynamicSettings(String... settings) { public void addDynamicSettings(String... settings) {

View File

@ -192,12 +192,14 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
int pendingTasks = randomIntBetween(0, 200); int pendingTasks = randomIntBetween(0, 200);
int inFlight = randomIntBetween(0, 200); int inFlight = randomIntBetween(0, 200);
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, pendingTasks, inFlight); int delayedUnassigned = randomIntBetween(0, 200);
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, pendingTasks, inFlight, delayedUnassigned);
logger.info("cluster status: {}, expected {}", clusterHealth.getStatus(), counter.status()); logger.info("cluster status: {}, expected {}", clusterHealth.getStatus(), counter.status());
clusterHealth = maybeSerialize(clusterHealth); clusterHealth = maybeSerialize(clusterHealth);
assertClusterHealth(clusterHealth, counter); assertClusterHealth(clusterHealth, counter);
assertThat(clusterHealth.getNumberOfPendingTasks(), Matchers.equalTo(pendingTasks)); assertThat(clusterHealth.getNumberOfPendingTasks(), Matchers.equalTo(pendingTasks));
assertThat(clusterHealth.getNumberOfInFlightFetch(), Matchers.equalTo(inFlight)); assertThat(clusterHealth.getNumberOfInFlightFetch(), Matchers.equalTo(inFlight));
assertThat(clusterHealth.getDelayedUnassignedShards(), Matchers.equalTo(delayedUnassigned));
} }
ClusterHealthResponse maybeSerialize(ClusterHealthResponse clusterHealth) throws IOException { ClusterHealthResponse maybeSerialize(ClusterHealthResponse clusterHealth) throws IOException {
@ -225,7 +227,7 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
metaData.put(indexMetaData, true); metaData.put(indexMetaData, true);
routingTable.add(indexRoutingTable); routingTable.add(indexRoutingTable);
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, 0, 0); ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, 0, 0, 0);
clusterHealth = maybeSerialize(clusterHealth); clusterHealth = maybeSerialize(clusterHealth);
// currently we have no cluster level validation failures as index validation issues are reported per index. // currently we have no cluster level validation failures as index validation issues are reported per index.
assertThat(clusterHealth.getValidationFailures(), Matchers.hasSize(0)); assertThat(clusterHealth.getValidationFailures(), Matchers.hasSize(0));

View File

@ -0,0 +1,162 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
public class DelayedAllocationTests extends ElasticsearchIntegrationTest {
/**
* Verifies that when there is no delay timeout, a 1/1 index shard will immediately
* get allocated to a free node when the node hosting it leaves the cluster.
*/
@Test
public void testNoDelayedTimeout() throws Exception {
internalCluster().startNodesAsync(3).get();
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.DEFAULT_DELAYED_NODE_LEFT_TIMEOUT, 0)).get();
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(0));
ensureGreen("test");
}
/**
* When we do have delayed allocation set, verifies that even though there is a node
* free to allocate the unassigned shard when the node hosting it leaves, it doesn't
* get allocated. Once we bring the node back, it gets allocated since it existed
* on it before.
*/
@Test
public void testDelayedAllocationNodeLeavesAndComesBack() throws Exception {
internalCluster().startNodesAsync(3).get();
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueHours(1))).get();
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertThat(client().admin().cluster().prepareState().all().get().getState().routingNodes().hasUnassigned(), equalTo(true));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
internalCluster().startNode(); // this will use the same data location as the stopped node
ensureGreen("test");
}
/**
* With a very small delay timeout, verify that it expires and we get to green even
* though the node hosting the shard is not coming back.
*/
@Test
public void testDelayedAllocationTimesOut() throws Exception {
internalCluster().startNodesAsync(3).get();
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueMillis(100))).get();
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
ensureGreen("test");
}
/**
* Verify that when explicitly changing the value of the index setting for the delayed
* allocation to a very small value, it kicks the allocation of the unassigned shard
* even though the node it was hosted on will not come back.
*/
@Test
public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception {
internalCluster().startNodesAsync(3).get();
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueHours(1))).get();
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertThat(client().admin().cluster().prepareState().all().get().getState().routingNodes().hasUnassigned(), equalTo(true));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueMillis(100))).get());
ensureGreen("test");
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(0));
}
/**
* Verify that when explicitly changing the value of the index setting for the delayed
* allocation to 0, it kicks the allocation of the unassigned shard
* even though the node it was hosted on will not come back.
*/
@Test
public void testDelayedAllocationChangeWithSettingTo0() throws Exception {
internalCluster().startNodesAsync(3).get();
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueHours(1))).get();
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertThat(client().admin().cluster().prepareState().all().get().getState().routingNodes().hasUnassigned(), equalTo(true));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueMillis(0))).get());
ensureGreen("test");
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(0));
}
private void indexRandomData() throws Exception {
int numDocs = scaledRandomIntBetween(100, 1000);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test", "type").setSource("field", "value");
}
// we want to test both full divergent copies of the shard in terms of segments, and
// a case where they are the same (using sync flush), index Random does all this goodness
// already
indexRandom(true, builders);
}
private String findNodeWithShard() {
ClusterState state = client().admin().cluster().prepareState().get().getState();
List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
Collections.shuffle(startedShards, getRandom());
return state.nodes().get(startedShards.get(0).currentNodeId()).getName();
}
}

View File

@ -33,9 +33,13 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchAllocationTestCase; import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test; import org.junit.Test;
import java.util.EnumSet;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@ -253,4 +257,90 @@ public class UnassignedInfoTests extends ElasticsearchAllocationTestCase {
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getDetails(), equalTo("test fail")); assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getDetails(), equalTo("test fail"));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getTimestampInMillis(), greaterThan(0l)); assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getTimestampInMillis(), greaterThan(0l));
} }
/**
* Verifies that delayed allocation calculation are correct.
*/
@Test
public void testUnassignedDelayedOnlyOnNodeLeft() throws Exception {
final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, null);
long delay = unassignedInfo.getAllocationDelayTimeoutSetting(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), Settings.EMPTY);
assertThat(delay, equalTo(TimeValue.timeValueHours(10).millis()));
assertBusy(new Runnable() {
@Override
public void run() {
long delay = unassignedInfo.getDelayAllocationExpirationIn(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), Settings.EMPTY);
assertThat(delay, greaterThan(0l));
assertThat(delay, lessThan(TimeValue.timeValueHours(10).millis()));
}
});
}
/**
* Verifies that delayed allocation is only computed when the reason is NODE_LEFT.
*/
@Test
public void testUnassignedDelayOnlyNodeLeftNonNodeLeftReason() throws Exception {
EnumSet<UnassignedInfo.Reason> reasons = EnumSet.allOf(UnassignedInfo.Reason.class);
reasons.remove(UnassignedInfo.Reason.NODE_LEFT);
UnassignedInfo unassignedInfo = new UnassignedInfo(RandomPicks.randomFrom(getRandom(), reasons), null);
long delay = unassignedInfo.getAllocationDelayTimeoutSetting(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), Settings.EMPTY);
assertThat(delay, equalTo(0l));
delay = unassignedInfo.getDelayAllocationExpirationIn(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), Settings.EMPTY);
assertThat(delay, equalTo(0l));
}
@Test
public void testNumberOfDelayedUnassigned() throws Exception {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), clusterState), equalTo(0));
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false));
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), clusterState), equalTo(2));
}
@Test
public void testFindNextDelayedAllocation() {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), clusterState), equalTo(0));
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false));
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), clusterState);
assertThat(nextDelaySetting, equalTo(TimeValue.timeValueHours(10).millis()));
long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), clusterState);
assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).millis()));
assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).millis()));
}
} }