Merge pull request #15372 from s1monw/trash_recovery_threads

Remove recovery threadpools and throttle outgoing recoveries on the master
This commit is contained in:
Simon Willnauer 2015-12-28 15:43:14 +01:00
commit ba755a554f
48 changed files with 518 additions and 321 deletions

View File

@ -74,7 +74,6 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -129,7 +128,6 @@ public class ClusterModule extends AbstractModule {
registerShardsAllocator(ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, BalancedShardsAllocator.class);
}
private void registerBuiltinIndexSettings() {
registerIndexDynamicSetting(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
registerIndexDynamicSetting(IndexStore.INDEX_STORE_THROTTLE_TYPE, Validator.EMPTY);

View File

@ -106,7 +106,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
}
this.allShardsStarted = allShardsStarted;
this.primary = primary;
if (primary != null) {
this.primaryAsList = Collections.singletonList(primary);

View File

@ -69,6 +69,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
private int relocatingShards = 0;
private final Map<String, ObjectIntHashMap<String>> nodesPerAttributeNames = new HashMap<>();
private final Map<String, Recoveries> recoveryiesPerNode = new HashMap<>();
public RoutingNodes(ClusterState clusterState) {
this(clusterState, true);
@ -91,6 +92,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
// also fill replicaSet information
for (ObjectCursor<IndexRoutingTable> indexRoutingTable : routingTable.indicesRouting().values()) {
for (IndexShardRoutingTable indexShard : indexRoutingTable.value) {
assert indexShard.primary != null;
for (ShardRouting shard : indexShard) {
// to get all the shards belonging to an index, including the replicas,
// we define a replica set and keep track of it. A replica set is identified
@ -107,16 +109,18 @@ public class RoutingNodes implements Iterable<RoutingNode> {
// add the counterpart shard with relocatingNodeId reflecting the source from which
// it's relocating from.
ShardRouting targetShardRouting = shard.buildTargetRelocatingShard();
addInitialRecovery(targetShardRouting);
if (readOnly) {
targetShardRouting.freeze();
}
entries.add(targetShardRouting);
assignedShardsAdd(targetShardRouting);
} else if (!shard.active()) { // shards that are initializing without being relocated
} else if (shard.active() == false) { // shards that are initializing without being relocated
if (shard.primary()) {
inactivePrimaryCount++;
}
inactiveShardCount++;
addInitialRecovery(shard);
}
} else {
final ShardRouting sr = getRouting(shard, readOnly);
@ -132,6 +136,79 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
}
private void addRecovery(ShardRouting routing) {
addRecovery(routing, true, false);
}
private void removeRecovery(ShardRouting routing) {
addRecovery(routing, false, false);
}
public void addInitialRecovery(ShardRouting routing) {
addRecovery(routing,true, true);
}
private void addRecovery(final ShardRouting routing, final boolean increment, final boolean initializing) {
final int howMany = increment ? 1 : -1;
assert routing.initializing() : "routing must be initializing: " + routing;
Recoveries.getOrAdd(recoveryiesPerNode, routing.currentNodeId()).addIncoming(howMany);
final String sourceNodeId;
if (routing.relocatingNodeId() != null) { // this is a relocation-target
sourceNodeId = routing.relocatingNodeId();
if (routing.primary() && increment == false) { // primary is done relocating
int numRecoveringReplicas = 0;
for (ShardRouting assigned : assignedShards(routing)) {
if (assigned.primary() == false && assigned.initializing() && assigned.relocatingNodeId() == null) {
numRecoveringReplicas++;
}
}
// we transfer the recoveries to the relocated primary
recoveryiesPerNode.get(sourceNodeId).addOutgoing(-numRecoveringReplicas);
recoveryiesPerNode.get(routing.currentNodeId()).addOutgoing(numRecoveringReplicas);
}
} else if (routing.primary() == false) { // primary without relocationID is initial recovery
ShardRouting primary = findPrimary(routing);
if (primary == null && initializing) {
primary = routingTable.index(routing.index()).shard(routing.shardId().id()).primary;
} else if (primary == null) {
throw new IllegalStateException("replica is initializing but primary is unassigned");
}
sourceNodeId = primary.currentNodeId();
} else {
sourceNodeId = null;
}
if (sourceNodeId != null) {
Recoveries.getOrAdd(recoveryiesPerNode, sourceNodeId).addOutgoing(howMany);
}
}
public int getIncomingRecoveries(String nodeId) {
return recoveryiesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming();
}
public int getOutgoingRecoveries(String nodeId) {
return recoveryiesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getOutgoing();
}
private ShardRouting findPrimary(ShardRouting routing) {
List<ShardRouting> shardRoutings = assignedShards.get(routing.shardId());
ShardRouting primary = null;
if (shardRoutings != null) {
for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.primary()) {
if (shardRouting.active()) {
return shardRouting;
} else if (primary == null) {
primary = shardRouting;
} else if (primary.relocatingNodeId() != null) {
primary = shardRouting;
}
}
}
}
return primary;
}
private static ShardRouting getRouting(ShardRouting src, boolean readOnly) {
if (readOnly) {
src.freeze(); // we just freeze and reuse this instance if we are read only
@ -352,6 +429,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
if (shard.primary()) {
inactivePrimaryCount++;
}
addRecovery(shard);
assignedShardsAdd(shard);
}
@ -367,6 +445,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
ShardRouting target = shard.buildTargetRelocatingShard();
node(target.currentNodeId()).add(target);
assignedShardsAdd(target);
addRecovery(target);
return target;
}
@ -383,9 +462,12 @@ public class RoutingNodes implements Iterable<RoutingNode> {
inactivePrimaryCount--;
}
}
removeRecovery(shard);
shard.moveToStarted();
}
/**
* Cancels a relocation of a shard that shard must relocating.
*/
@ -440,6 +522,9 @@ public class RoutingNodes implements Iterable<RoutingNode> {
cancelRelocation(shard);
}
assignedShardsRemove(shard);
if (shard.initializing()) {
removeRecovery(shard);
}
}
private void assignedShardsAdd(ShardRouting shard) {
@ -749,6 +834,34 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
}
for (Map.Entry<String, Recoveries> recoveries : routingNodes.recoveryiesPerNode.entrySet()) {
String node = recoveries.getKey();
final Recoveries value = recoveries.getValue();
int incoming = 0;
int outgoing = 0;
RoutingNode routingNode = routingNodes.nodesToShards.get(node);
if (routingNode != null) { // node might have dropped out of the cluster
for (ShardRouting routing : routingNode) {
if (routing.initializing()) {
incoming++;
} else if (routing.relocating()) {
outgoing++;
}
if (routing.primary() && (routing.initializing() && routing.relocatingNodeId() != null) == false) { // we don't count the initialization end of the primary relocation
List<ShardRouting> shardRoutings = routingNodes.assignedShards.get(routing.shardId());
for (ShardRouting assigned : shardRoutings) {
if (assigned.primary() == false && assigned.initializing() && assigned.relocatingNodeId() == null) {
outgoing++;
}
}
}
}
}
assert incoming == value.incoming : incoming + " != " + value.incoming;
assert outgoing == value.outgoing : outgoing + " != " + value.outgoing + " node: " + routingNode;
}
assert unassignedPrimaryCount == routingNodes.unassignedShards.getNumPrimaries() :
"Unassigned primaries is [" + unassignedPrimaryCount + "] but RoutingNodes returned unassigned primaries [" + routingNodes.unassigned().getNumPrimaries() + "]";
assert unassignedIgnoredPrimaryCount == routingNodes.unassignedShards.getNumIgnoredPrimaries() :
@ -856,4 +969,41 @@ public class RoutingNodes implements Iterable<RoutingNode> {
throw new IllegalStateException("can't modify RoutingNodes - readonly");
}
}
private static final class Recoveries {
private static final Recoveries EMPTY = new Recoveries();
private int incoming = 0;
private int outgoing = 0;
int getTotal() {
return incoming + outgoing;
}
void addOutgoing(int howMany) {
assert outgoing + howMany >= 0 : outgoing + howMany+ " must be >= 0";
outgoing += howMany;
}
void addIncoming(int howMany) {
assert incoming + howMany >= 0 : incoming + howMany+ " must be >= 0";
incoming += howMany;
}
int getOutgoing() {
return outgoing;
}
int getIncoming() {
return incoming;
}
public static Recoveries getOrAdd(Map<String, Recoveries> map, String key) {
Recoveries recoveries = map.get(key);
if (recoveries == null) {
recoveries = new Recoveries();
map.put(key, recoveries);
}
return recoveries;
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -364,35 +365,17 @@ public class AllocationService extends AbstractComponent {
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
RoutingNodes routingNodes = allocation.routingNodes();
final RoutingNodes routingNodes = allocation.routingNodes();
if (routingNodes.unassigned().getNumPrimaries() == 0) {
// move out if we don't have unassigned primaries
return changed;
}
// go over and remove dangling replicas that are initializing for primary shards
List<ShardRouting> shardsToFail = new ArrayList<>();
for (ShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
for (ShardRouting routing : routingNodes.assignedShards(shardEntry)) {
if (!routing.primary() && routing.initializing()) {
shardsToFail.add(routing);
}
}
}
}
for (ShardRouting shardToFail : shardsToFail) {
changed |= applyFailedShard(allocation, shardToFail, false,
new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}
// now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
// routingNodes.hasUnassignedPrimaries() will potentially be false
for (ShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
// remove dangling replicas that are initializing for primary shards
changed |= failReplicasForUnassignedPrimary(allocation, shardEntry);
ShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
if (candidate != null) {
IndexMetaData index = allocation.metaData().index(candidate.index());
@ -457,6 +440,22 @@ public class AllocationService extends AbstractComponent {
return changed;
}
private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, ShardRouting primary) {
List<ShardRouting> replicas = new ArrayList<>();
for (ShardRouting routing : allocation.routingNodes().assignedShards(primary)) {
if (!routing.primary() && routing.initializing()) {
replicas.add(routing);
}
}
boolean changed = false;
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}
return changed;
}
private boolean applyStartedShards(RoutingNodes routingNodes, Iterable<? extends ShardRouting> startedShardEntries) {
boolean dirty = false;
// apply shards might be called several times with the same shard, ignore it
@ -523,7 +522,6 @@ public class AllocationService extends AbstractComponent {
logger.debug("{} ignoring shard failure, unknown index in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
return false;
}
RoutingNodes routingNodes = allocation.routingNodes();
RoutingNodes.RoutingNodeIterator matchedNode = routingNodes.routingNodeIter(failedShard.currentNodeId());
@ -546,7 +544,10 @@ public class AllocationService extends AbstractComponent {
logger.debug("{} ignoring shard failure, unknown allocation id in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
return false;
}
if (failedShard.primary()) {
// fail replicas first otherwise we move RoutingNodes into an inconsistent state
failReplicasForUnassignedPrimary(allocation, failedShard);
}
// replace incoming instance to make sure we work on the latest one. Copy it to maintain information during modifications.
failedShard = new ShardRouting(matchedNode.current());

View File

@ -50,26 +50,36 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = 2;
public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = 4;
public static final String NAME = "throttling";
public static final String CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES = "cluster.routing.allocation.concurrent_recoveries";
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_recoveries", Integer.toString(DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES), (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_recoveries"), true, Setting.Scope.CLUSTER);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING = Setting.intSetting("cluster.routing.allocation.node_initial_primaries_recoveries", DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 0, true, Setting.Scope.CLUSTER);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_recoveries", (s) -> s.get(CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES,Integer.toString(DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES)), (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_recoveries"), true, Setting.Scope.CLUSTER);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_incoming_recoveries", (s) -> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getRaw(s), (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_incoming_recoveries"), true, Setting.Scope.CLUSTER);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_outgoing_recoveries", (s) -> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getRaw(s), (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_outgoing_recoveries"), true, Setting.Scope.CLUSTER);
private volatile int primariesInitialRecoveries;
private volatile int concurrentRecoveries;
private volatile int concurrentIncomingRecoveries;
private volatile int concurrentOutgoingRecoveries;
@Inject
public ThrottlingAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
super(settings);
this.primariesInitialRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.get(settings);
this.concurrentRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.get(settings);
logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries);
concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.get(settings);
concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, this::setPrimariesInitialRecoveries);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING, this::setConcurrentRecoveries);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, this::setConcurrentIncomingRecoverries);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, this::setConcurrentOutgoingRecoverries);
logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries);
}
private void setConcurrentRecoveries(int concurrentRecoveries) {
this.concurrentRecoveries = concurrentRecoveries;
private void setConcurrentIncomingRecoverries(int concurrentIncomingRecoveries) {
this.concurrentIncomingRecoveries = concurrentIncomingRecoveries;
}
private void setConcurrentOutgoingRecoverries(int concurrentOutgoingRecoveries) {
this.concurrentOutgoingRecoveries = concurrentOutgoingRecoveries;
}
private void setPrimariesInitialRecoveries(int primariesInitialRecoveries) {
@ -99,7 +109,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
}
}
}
// TODO should we allow shards not allocated post API to always allocate?
// either primary or replica doing recovery (from peer shard)
// count the number of recoveries on the node, its for both target (INITIALIZING) and source (RELOCATING)
@ -108,17 +118,16 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
int currentRecoveries = 0;
for (ShardRouting shard : node) {
if (shard.initializing()) {
currentRecoveries++;
}
}
if (currentRecoveries >= concurrentRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many shards currently recovering [%d], limit: [%d]",
currentRecoveries, concurrentRecoveries);
} else {
return allocation.decision(Decision.YES, NAME, "below shard recovery limit of [%d]", concurrentRecoveries);
int currentOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(node.nodeId());
int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
if (currentOutRecoveries >= concurrentOutgoingRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards currently recovering [%d], limit: [%d]",
currentOutRecoveries, concurrentOutgoingRecoveries);
} else if (currentInRecoveries >= concurrentIncomingRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards currently recovering [%d], limit: [%d]",
currentInRecoveries, concurrentIncomingRecoveries);
} else {
return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d] incoming: [%d]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries);
}
}
}

View File

@ -109,8 +109,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
IndicesTTLService.INDICES_TTL_INTERVAL_SETTING,
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MetaData.SETTING_READ_ONLY_SETTING,
RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING,
RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
@ -119,6 +117,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
ThreadPool.THREADPOOL_GROUP_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING,
DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING,
DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,

View File

@ -29,19 +29,9 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class RecoverySettings extends AbstractComponent {
/**
*/
public class RecoverySettings extends AbstractComponent implements Closeable {
public static final Setting<Integer> INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING = Setting.intSetting("indices.recovery.concurrent_streams", 3, true, Setting.Scope.CLUSTER);
public static final Setting<Integer> INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING = Setting.intSetting("indices.recovery.concurrent_small_file_streams", 2, true, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting("indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), true, Setting.Scope.CLUSTER);
/**
@ -68,15 +58,8 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
*/
public static final Setting<TimeValue> INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING = Setting.timeSetting("indices.recovery.recovery_activity_timeout", (s) -> INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.getRaw(s) , TimeValue.timeValueSeconds(0), true, Setting.Scope.CLUSTER);
public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb", "SMALL_FILE_CUTOFF_BYTES").bytes();
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
private volatile int concurrentStreams;
private volatile int concurrentSmallFileStreams;
private final ThreadPoolExecutor concurrentStreamPool;
private final ThreadPoolExecutor concurrentSmallFileStreamPool;
private volatile ByteSizeValue maxBytesPerSec;
private volatile SimpleRateLimiter rateLimiter;
private volatile TimeValue retryDelayStateSync;
@ -101,14 +84,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.concurrentStreams = INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING.get(settings);
this.concurrentStreamPool = EsExecutors.newScaling("recovery_stream", 0, concurrentStreams, 60, TimeUnit.SECONDS,
EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
this.concurrentSmallFileStreams = INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.get(settings);
this.concurrentSmallFileStreamPool = EsExecutors.newScaling("small_file_recovery_stream", 0, concurrentSmallFileStreams, 60,
TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]"));
this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (maxBytesPerSec.bytes() <= 0) {
rateLimiter = null;
@ -116,11 +91,9 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.mbFrac());
}
logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}]",
maxBytesPerSec, concurrentStreams);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING, this::setConcurrentStreams);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING, this::setConcurrentSmallFileStreams);
logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork);
@ -129,20 +102,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
}
@Override
public void close() {
ThreadPool.terminate(concurrentStreamPool, 1, TimeUnit.SECONDS);
ThreadPool.terminate(concurrentSmallFileStreamPool, 1, TimeUnit.SECONDS);
}
public ThreadPoolExecutor concurrentStreamPool() {
return concurrentStreamPool;
}
public ThreadPoolExecutor concurrentSmallFileStreamPool() {
return concurrentSmallFileStreamPool;
}
public RateLimiter rateLimiter() {
return rateLimiter;
}
@ -176,10 +135,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
this.chunkSize = chunkSize;
}
private void setConcurrentStreams(int concurrentStreams) {
this.concurrentStreams = concurrentStreams;
concurrentStreamPool.setMaximumPoolSize(concurrentStreams);
}
public void setRetryDelayStateSync(TimeValue retryDelayStateSync) {
this.retryDelayStateSync = retryDelayStateSync;
@ -211,9 +166,4 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.mbFrac());
}
}
private void setConcurrentSmallFileStreams(int concurrentSmallFileStreams) {
this.concurrentSmallFileStreams = concurrentSmallFileStreams;
concurrentSmallFileStreamPool.setMaximumPoolSize(concurrentSmallFileStreams);
}
}

View File

@ -58,9 +58,6 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.StreamSupport;
@ -69,6 +66,12 @@ import java.util.stream.StreamSupport;
* RecoverySourceHandler handles the three phases of shard recovery, which is
* everything relating to copying the segment files as well as sending translog
* operations across the wire once the segments have been copied.
*
* Note: There is always one source handler per recovery that handles all the
* file and translog transfer. This handler is completely isolated from other recoveries
* while the {@link RateLimiter} passed via {@link RecoverySettings} is shared across recoveries
* originating from this nodes to throttle the number bytes send during file transfer. The transaction log
* phase bypasses the rate limiter entirely.
*/
public class RecoverySourceHandler {
@ -458,10 +461,6 @@ public class RecoverySourceHandler {
// index docs to replicas while the index files are recovered
// the lock can potentially be removed, in which case, it might
// make sense to re-enable throttling in this phase
// if (recoverySettings.rateLimiter() != null) {
// recoverySettings.rateLimiter().pause(size);
// }
cancellableThreads.execute(() -> {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
@ -554,6 +553,7 @@ public class RecoverySourceHandler {
cancellableThreads.execute(() -> {
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = recoverySettings.rateLimiter();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
@ -591,100 +591,38 @@ public class RecoverySourceHandler {
void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Throwable {
store.incRef();
try {
Future[] runners = asyncSendFiles(store, files, outputStreamFactory);
IOException corruptedEngine = null;
final List<Throwable> exceptions = new ArrayList<>();
for (int i = 0; i < runners.length; i++) {
StoreFileMetaData md = files[i];
try {
runners[i].get();
} catch (ExecutionException t) {
corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t.getCause());
} catch (InterruptedException t) {
corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t);
ArrayUtil.timSort(files, (a,b) -> Long.compare(a.length(), b.length())); // send smallest first
for (int i = 0; i < files.length; i++) {
final StoreFileMetaData md = files[i];
try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
// it's fine that we are only having the indexInput in the try/with block. The copy methods handles
// exceptions during close correctly and doesn't hide the original exception.
Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md));
} catch (Throwable t) {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(t)) != null) {
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
failEngine(corruptIndexException);
throw corruptIndexException;
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(t);
logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK",
corruptIndexException, shardId, request.targetNode(), md);
throw exception;
}
} else {
throw t;
}
}
}
if (corruptedEngine != null) {
failEngine(corruptedEngine);
throw corruptedEngine;
} else {
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
} finally {
store.decRef();
}
}
private IOException handleExecutionException(Store store, IOException corruptedEngine, List<Throwable> exceptions, StoreFileMetaData md, Throwable t) {
logger.debug("Failed to transfer file [" + md + "] on recovery");
final IOException corruptIndexException;
final boolean checkIntegrity = corruptedEngine == null;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(t)) != null) {
if (checkIntegrity && store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
corruptedEngine = corruptIndexException;
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(t);
if (checkIntegrity) {
logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK",
corruptIndexException, shardId, request.targetNode(), md);
} else {
logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum are skipped",
corruptIndexException, shardId, request.targetNode(), md);
}
exceptions.add(exception);
}
} else {
exceptions.add(t);
}
return corruptedEngine;
}
protected void failEngine(IOException cause) {
shard.failShard("recovery", cause);
}
Future<Void>[] asyncSendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) {
store.incRef();
try {
final Future<Void>[] futures = new Future[files.length];
for (int i = 0; i < files.length; i++) {
final StoreFileMetaData md = files[i];
long fileSize = md.length();
// Files are split into two categories, files that are "small"
// (under 5mb) and other files. Small files are transferred
// using a separate thread pool dedicated to small files.
//
// The idea behind this is that while we are transferring an
// older, large index, a user may create a new index, but that
// index will not be able to recover until the large index
// finishes, by using two different thread pools we can allow
// tiny files (like segments for a brand new index) to be
// recovered while ongoing large segment recoveries are
// happening. It also allows these pools to be configured
// separately.
ThreadPoolExecutor pool;
if (fileSize > RecoverySettings.SMALL_FILE_CUTOFF_BYTES) {
pool = recoverySettings.concurrentStreamPool();
} else {
pool = recoverySettings.concurrentSmallFileStreamPool();
}
Future<Void> future = pool.submit(() -> {
try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
// it's fine that we are only having the indexInput int he try/with block. The copy methods handles
// exceptions during close correctly and doesn't hide the original exception.
Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md));
}
return null;
});
futures[i] = future;
}
return futures;
} finally {
store.decRef();
}
}
}

View File

@ -322,7 +322,6 @@ public class Node implements Releasable {
for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) {
injector.getInstance(plugin).stop();
}
injector.getInstance(RecoverySettings.class).close();
// we should stop this last since it waits for resources to get released
// if we had scroll searchers etc or recovery going on we wait for to finish.
injector.getInstance(IndicesService.class).stop();

View File

@ -32,6 +32,7 @@ import org.elasticsearch.action.admin.indices.upgrade.UpgradeIT;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
@ -117,7 +118,8 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase {
public Settings nodeSettings(int ord) {
return Settings.builder()
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) // disable merging so no segments will be upgraded
.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.getKey(), 30) // increase recovery speed for small files
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 30) // speed up recoveries
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 30)
.build();
}

View File

@ -50,7 +50,8 @@ public class AckClusterUpdateSettingsIT extends ESIntegTestCase {
.put(super.nodeSettings(nodeOrdinal))
//make sure that enough concurrent reroutes can happen at the same time
//we have a minimum of 2 nodes, and a maximum of 10 shards, thus 5 should be enough
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 5)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 5)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 5)
.put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), 10)
.build();
}

View File

@ -162,7 +162,8 @@ public class ClusterRerouteIT extends ESIntegTestCase {
public void testDelayWithALargeAmountOfShards() throws Exception {
Settings commonSettings = settingsBuilder()
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, 1)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, 1)
.build();
logger.info("--> starting 4 nodes");
String node_1 = internalCluster().startNode(commonSettings);

View File

@ -32,7 +32,7 @@ public final class RandomShardRoutingMutator {
}
public static void randomChange(ShardRouting shardRouting, String[] nodes) {
switch (randomInt(3)) {
switch (randomInt(2)) {
case 0:
if (shardRouting.unassigned() == false) {
shardRouting.moveToUnassigned(new UnassignedInfo(randomReason(), randomAsciiOfLength(10)));
@ -46,13 +46,6 @@ public final class RandomShardRoutingMutator {
}
break;
case 2:
if (shardRouting.primary()) {
shardRouting.moveFromPrimary();
} else {
shardRouting.moveToPrimary();
}
break;
case 3:
if (shardRouting.initializing()) {
shardRouting.moveToStarted();
}

View File

@ -50,7 +50,7 @@ public class RoutingTableTests extends ESAllocationTestCase {
private int totalNumberOfShards;
private final static Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
private final AllocationService ALLOCATION_SERVICE = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.build());
private ClusterState clusterState;

View File

@ -54,7 +54,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
private final ESLogger logger = Loggers.getLogger(AllocationCommandsTests.class);
public void testMoveShardCommand() {
AllocationService allocation = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService allocation = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
logger.info("creating an index with 1 shard, no replica");
MetaData metaData = MetaData.builder()

View File

@ -38,9 +38,10 @@ public class AllocationPriorityTests extends ESAllocationTestCase {
*/
public void testPrioritizedIndicesAllocatedFirst() {
AllocationService allocation = createAllocationService(settingsBuilder().
put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1)
put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1).build());
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1).build());
final String highPriorityName;
final String lowPriorityName;
final int priorityFirst;
@ -84,7 +85,7 @@ public class AllocationPriorityTests extends ESAllocationTestCase {
routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
assertEquals(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).toString(),2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index());
assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index());

View File

@ -54,7 +54,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase {
public void testMoveShardOnceNewNodeWithAttributeAdded1() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
@ -386,7 +386,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase {
public void testMoveShardOnceNewNodeWithAttributeAdded5() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
@ -464,7 +464,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase {
public void testMoveShardOnceNewNodeWithAttributeAdded6() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
@ -544,7 +544,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase {
public void testFullAwareness1() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
@ -611,7 +611,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase {
public void testFullAwareness2() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
@ -827,7 +827,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase {
public void testUnassignedShardsWithUnbalancedZones() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.attributes", "zone")
.build());

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
@ -358,7 +359,9 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
public boolean allocateUnassigned(RoutingAllocation allocation) {
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
boolean changed = !unassigned.isEmpty();
for (ShardRouting sr : unassigned.drain()) {
ShardRouting[] drain = unassigned.drain();
ArrayUtil.timSort(drain, (a, b) -> { return a.primary() ? -1 : 1; }); // we have to allocate primaries first
for (ShardRouting sr : drain) {
switch (sr.id()) {
case 0:
if (sr.primary()) {

View File

@ -43,7 +43,7 @@ public class ConcurrentRebalanceRoutingTests extends ESAllocationTestCase {
public void testClusterConcurrentRebalance() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 3)
.build());
@ -145,4 +145,4 @@ public class ConcurrentRebalanceRoutingTests extends ESAllocationTestCase {
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(0));
}
}
}

View File

@ -45,7 +45,7 @@ public class DeadNodesAllocationTests extends ESAllocationTestCase {
public void testSimpleDeadNodeOnStartedPrimaryShard() {
AllocationService allocation = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());
@ -96,7 +96,7 @@ public class DeadNodesAllocationTests extends ESAllocationTestCase {
public void testDeadNodeWhileRelocatingOnToNode() {
AllocationService allocation = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());
@ -170,7 +170,7 @@ public class DeadNodesAllocationTests extends ESAllocationTestCase {
public void testDeadNodeWhileRelocatingOnFromNode() {
AllocationService allocation = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());

View File

@ -43,7 +43,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests extends ESAllocationTest
private final ESLogger logger = Loggers.getLogger(ElectReplicaAsPrimaryDuringRelocationTests.class);
public void testElectReplicaAsPrimaryDuringRelocation() {
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@ -56,7 +56,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
public void testFailedShardPrimaryRelocatingToAndFrom() {
AllocationService allocation = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());
@ -144,7 +144,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
public void testFailPrimaryStartedCheckReplicaElected() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());
@ -225,7 +225,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
public void testFirstAllocationFailureSingleNode() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());
@ -281,7 +281,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
public void testSingleShardMultipleAllocationFailures() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());
@ -337,7 +337,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
public void testFirstAllocationFailureTwoNodes() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());
@ -397,7 +397,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
public void testRebalanceFailure() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());

View File

@ -57,7 +57,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
public void testDoNotAllocateFromPrimary() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build());
@ -171,7 +171,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
public void testRandom() {
AllocationService service = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build());
@ -220,7 +220,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
public void testRollingRestart() {
AllocationService service = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build());

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESAllocationTestCase;
@ -42,6 +43,7 @@ public class PreferPrimaryAllocationTests extends ESAllocationTestCase {
logger.info("create an allocation with 1 initial recoveries");
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 1)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 1)
.build());

View File

@ -43,7 +43,7 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
private final ESLogger logger = Loggers.getLogger(PrimaryElectionRoutingTests.class);
public void testBackupElectionToPrimaryWhenPrimaryCanBeAllocatedToAnotherNode() {
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
logger.info("Building initial routing table");
@ -93,7 +93,7 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
}
public void testRemovingInitializingReplicasIfPrimariesFails() {
AllocationService allocation = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService allocation = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@ -44,6 +44,7 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests extends ESAllocationTes
public void testPrimaryNotRelocatedWhileBeingRecoveredFrom() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.concurrent_source_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.build());

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESAllocationTestCase;
@ -57,7 +58,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase {
public void testRandomDecisions() {
RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(getRandom());
AllocationService strategy = new AllocationService(settingsBuilder().build(), new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY),
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY),
randomAllocationDecider))), new ShardsAllocators(NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
int indices = scaledRandomIntBetween(1, 20);
Builder metaBuilder = MetaData.builder();

View File

@ -56,7 +56,7 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
}
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(),

View File

@ -45,7 +45,7 @@ public class ReplicaAllocatedAfterPrimaryTests extends ESAllocationTestCase {
private final ESLogger logger = Loggers.getLogger(ReplicaAllocatedAfterPrimaryTests.class);
public void testBackupIsAllocatedAfterPrimary() {
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESAllocationTestCase;
@ -211,6 +212,7 @@ public class RoutingNodesIntegrityTests extends ESAllocationTestCase {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 1)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 3)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build());

View File

@ -46,7 +46,7 @@ public class ShardsLimitAllocationTests extends ESAllocationTestCase {
private final ESLogger logger = Loggers.getLogger(ShardsLimitAllocationTests.class);
public void testIndexLevelShardsLimitAllocate() {
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
logger.info("Building initial routing table");
@ -89,7 +89,7 @@ public class ShardsLimitAllocationTests extends ESAllocationTestCase {
public void testClusterLevelShardsLimitAllocate() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1)
.build());
@ -125,7 +125,7 @@ public class ShardsLimitAllocationTests extends ESAllocationTestCase {
// Bump the cluster total shards to 2
strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 2)
.build());
@ -147,7 +147,7 @@ public class ShardsLimitAllocationTests extends ESAllocationTestCase {
public void testIndexLevelShardsLimitRemain() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put("cluster.routing.allocation.balance.index", 0.0f)

View File

@ -59,7 +59,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase {
private final ESLogger logger = Loggers.getLogger(SingleShardNoReplicasRoutingTests.class);
public void testSingleIndexStartedShard() {
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
logger.info("Building initial routing table");
@ -160,7 +160,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase {
}
public void testSingleIndexShardFailed() {
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
logger.info("Building initial routing table");
@ -210,7 +210,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase {
public void testMultiIndexEvenDistribution() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build());
@ -322,7 +322,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase {
public void testMultiIndexUnevenNodes() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build());
@ -413,4 +413,4 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase {
assertThat(routingNode.numberOfShardsWithState(STARTED), equalTo(2));
}
}
}
}

View File

@ -44,7 +44,7 @@ public class SingleShardOneReplicaRoutingTests extends ESAllocationTestCase {
private final ESLogger logger = Loggers.getLogger(SingleShardOneReplicaRoutingTests.class);
public void testSingleIndexFirstStartPrimaryThenBackups() {
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@ -51,9 +51,9 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase {
.nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")))
.metaData(MetaData.builder().put(indexMetaData, false));
final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1);
final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1);
final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", true, ShardRoutingState.INITIALIZING, 1);
final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", true, ShardRoutingState.STARTED, 1);
final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", true, ShardRoutingState.RELOCATING, 1);
stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test")
.addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId()).addShard(initShard).build())
.addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId()).addShard(startedShard).build())

View File

@ -25,11 +25,16 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESAllocationTestCase;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -103,7 +108,8 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
public void testReplicaAndPrimaryRecoveryThrottling() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 3)
.put("cluster.routing.allocation.node_concurrent_recoveries", 3)
.put("cluster.routing.allocation.concurrent_source_recoveries", 3)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 3)
.build());
@ -169,4 +175,157 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(0));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
}
public void testThrottleIncomingAndOutgoing() {
Settings settings = settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 5)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 5)
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 5)
.build();
AllocationService strategy = createAllocationService(settings);
logger.info("Building initial routing table");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(9).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
logger.info("start one node, do reroute, only 5 should initialize");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(5));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(4));
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 5);
logger.info("start initializing, all primaries should be started");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(4));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("start another 2 nodes, 5 shards should be relocating - at most 5 are allowed per node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2")).put(newNode("node3"))).build();
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(4));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(5));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 3);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 2);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 5);
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("start the relocating shards, one more shard should relocate away from node1");
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(8));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1);
}
public void testOutgoingThrottlesAllocaiton() {
Settings settings = settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 1)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 1)
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 1)
.build();
AllocationService strategy = createAllocationService(settings);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(3).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build();
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 1);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
RoutingAllocation.Result reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(clusterState.getRoutingNodes().node("node1").get(0).shardId(), "node1", "node2")));
assertEquals(reroute.explanations().explanations().size(), 1);
assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.YES);
routingTable = reroute.routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
// outgoing throttles
reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(clusterState.getRoutingNodes().node("node3").get(0).shardId(), "node3", "node1")), true);
assertEquals(reroute.explanations().explanations().size(), 1);
assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.THROTTLE);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(2));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
// incoming throttles
reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(clusterState.getRoutingNodes().node("node3").get(0).shardId(), "node3", "node2")), true);
assertEquals(reroute.explanations().explanations().size(), 1);
assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.THROTTLE);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(2));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
}
}

View File

@ -46,7 +46,7 @@ public class UpdateNumberOfReplicasTests extends ESAllocationTestCase {
private final ESLogger logger = Loggers.getLogger(UpdateNumberOfReplicasTests.class);
public void testUpdateNumberOfReplicas() {
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@ -107,7 +107,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
}
};
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -192,7 +192,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
new DiskThresholdDecider(diskSettings))));
strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -223,7 +223,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
new DiskThresholdDecider(diskSettings))));
strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -303,7 +303,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
};
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -360,7 +360,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
}
};
strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -427,7 +427,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
new DiskThresholdDecider(diskSettings))));
strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -458,7 +458,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
new DiskThresholdDecider(diskSettings))));
strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -567,7 +567,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
};
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -635,7 +635,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
};
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -738,7 +738,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
};
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -900,7 +900,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
new SameShardAllocationDecider(Settings.EMPTY), diskThresholdDecider
)));
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
@ -1000,8 +1000,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
)));
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis);
RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute");

View File

@ -159,6 +159,7 @@ public class EnableAllocationTests extends ESAllocationTestCase {
Settings build = settingsBuilder()
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), useClusterSetting ? Rebalance.NONE: RandomPicks.randomFrom(getRandom(), Rebalance.values())) // index settings override cluster settings
.put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), 3)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10)
.build();
ClusterSettings clusterSettings = new ClusterSettings(build, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationService strategy = createAllocationService(build, clusterSettings, getRandom());

View File

@ -224,7 +224,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
public void testAttributePreferenceRouting() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id,zone")
.build());
@ -279,7 +279,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
public void testNodeSelectorRouting(){
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());
@ -336,7 +336,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
public void testShardsAndPreferNodeRouting() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.build());
MetaData metaData = MetaData.builder()
@ -397,7 +397,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
public void testReplicaShardPreferenceIters() throws Exception {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.build());
OperationRouting operationRouting = new OperationRouting(Settings.Builder.EMPTY_SETTINGS, new AwarenessAllocationDecider());
@ -479,4 +479,4 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
assertTrue(routing.primary());
}
}
}

View File

@ -56,7 +56,7 @@ public class GatewayMetaStateTests extends ESAllocationTestCase {
ClusterChangedEvent generateEvent(boolean initializing, boolean versionChanged, boolean masterEligible) {
//ridiculous settings to make sure we don't run into uninitialized because fo default
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 100)
.put("cluster.routing.allocation.node_concurrent_recoveries", 100)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 100)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 100)
@ -110,7 +110,7 @@ public class GatewayMetaStateTests extends ESAllocationTestCase {
ClusterChangedEvent generateCloseEvent(boolean masterEligible) {
//ridiculous settings to make sure we don't run into uninitialized because fo default
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 100)
.put("cluster.routing.allocation.node_concurrent_recoveries", 100)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 100)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 100)

View File

@ -319,14 +319,13 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
assertThat(state.metaData().index("test").getAliases().get("test_alias").filter(), notNullValue());
}
@TestLogging("gateway:TRACE,indices.recovery:TRACE,index.engine:TRACE")
public void testReusePeerRecovery() throws Exception {
final Settings settings = settingsBuilder()
.put("action.admin.cluster.node.shutdown.delay", "10ms")
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false)
.put("gateway.recover_after_nodes", 4)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 4)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, 4)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, 4)
.put(MockFSDirectoryService.CRASH_INDEX, false).build();
internalCluster().startNodesAsync(4, settings).get();

View File

@ -114,9 +114,8 @@ public class CorruptedFileIT extends ESIntegTestCase {
// and we need to make sure primaries are not just trashed if we don't have replicas
.put(super.nodeSettings(nodeOrdinal))
// speed up recoveries
.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING.getKey(), 10)
.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.getKey(), 10)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 5)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 5)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 5)
.build();
}

View File

@ -108,7 +108,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
assertEquals(0, recoveryDiff.missing.size());
IndexReader reader = DirectoryReader.open(targetStore.directory());
assertEquals(numDocs, reader.maxDoc());
IOUtils.close(reader, writer, store, targetStore, recoverySettings);
IOUtils.close(reader, writer, store, targetStore);
}
public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
@ -170,7 +170,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
assertNotNull(ExceptionsHelper.unwrapCorruption(ex));
}
assertTrue(failedEngine.get());
IOUtils.close(store, targetStore, recoverySettings);
IOUtils.close(store, targetStore);
}
@ -231,7 +231,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
fail("not expected here");
}
assertFalse(failedEngine.get());
IOUtils.close(store, targetStore, recoverySettings);
IOUtils.close(store, targetStore);
}
private Store newStore(Path path) throws IOException {

View File

@ -32,18 +32,6 @@ public class RecoverySettingsTests extends ESSingleNodeTestCase {
}
public void testAllSettingsAreDynamicallyUpdatable() {
innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING.getKey(), randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.concurrentStreamPool().getMaximumPoolSize());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.getKey(), randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.concurrentSmallFileStreamPool().getMaximumPoolSize());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0, new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {

View File

@ -200,6 +200,11 @@ If you are using any of these settings please take the time and review their pur
_expert settings_ and should only be used if absolutely necessary. If you have set any of the above setting as persistent
cluster settings please use the settings update API and set their superseded keys accordingly.
The following settings have been removed without replacement
* `indices.recovery.concurrent_small_file_streams` - recoveries are now single threaded. The number of concurrent outgoing recoveries are throttled via allocation deciders
* `indices.recovery.concurrent_file_streams` - recoveries are now single threaded. The number of concurrent outgoing recoveries are throttled via allocation deciders
==== Translog settings
The `index.translog.flush_threshold_ops` setting is not supported anymore. In order to control flushes based on the transaction log
@ -211,6 +216,14 @@ anymore, the `buffered` implementation is now the only available option and uses
The deprecated settings `index.cache.query.enable` and `indices.cache.query.size` have been removed and are replaced with
`index.requests.cache.enable` and `indices.requests.cache.size` respectively.
==== Allocation settings
Allocation settings deprecated in 1.x have been removed:
* `cluster.routing.allocation.concurrent_recoveries` is superseded by `cluster.routing.allocation.node_concurrent_recoveries`
Please change the setting in your configuration files or in the clusterstate to use the new settings instead.
[[breaking_30_mapping_changes]]
=== Mapping changes

View File

@ -27,10 +27,15 @@ one of the active allocation ids in the cluster state.
--
`cluster.routing.allocation.node_concurrent_recoveries`::
`cluster.routing.allocation.node_concurrent_incoming_recoveries`::
How many concurrent shard recoveries are allowed to happen on a node.
Defaults to `2`.
How many concurrent incoming shard recoveries are allowed to happen on a node. Incoming recoveries are the recoveries
where the target shard (most likely the replica unless a shard is relocating) is allocated on the node. Defaults to `2`.
`cluster.routing.allocation.node_concurrent_outgoing_recoveries`::
How many concurrent outgoing shard recoveries are allowed to happen on a node. Outgoing recoveries are the recoveries
where the source shard (most likely the primary unless a shard is relocating) is allocated on the node. Defaults to `2`.
`cluster.routing.allocation.node_initial_primaries_recoveries`::
@ -47,17 +52,6 @@ one of the active allocation ids in the cluster state.
Defaults to `false`, meaning that no check is performed by default. This
setting only applies if multiple nodes are started on the same machine.
`indices.recovery.concurrent_streams`::
The number of network streams to open per node to recover a shard from
a peer shard. Defaults to `3`.
`indices.recovery.concurrent_small_file_streams`::
The number of streams to open per node for small files (under 5mb) to
recover a shard from a peer shard. Defaults to `2`.
[float]
=== Shard Rebalancing Settings

View File

@ -3,12 +3,6 @@
The following _expert_ settings can be set to manage the recovery policy.
`indices.recovery.concurrent_streams`::
Defaults to `3`.
`indices.recovery.concurrent_small_file_streams`::
Defaults to `2`.
`indices.recovery.file_chunk_size`::
Defaults to `512kb`.

View File

@ -306,13 +306,11 @@ public final class InternalTestCluster extends TestCluster {
builder.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b");
builder.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b");
if (TEST_NIGHTLY) {
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING.getKey(), RandomInts.randomIntBetween(random, 10, 15));
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.getKey(), RandomInts.randomIntBetween(random, 10, 15));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 5, 10));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 5, 10));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 5, 10));
} else if (random.nextInt(100) <= 90) {
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING.getKey(), RandomInts.randomIntBetween(random, 3, 6));
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.getKey(), RandomInts.randomIntBetween(random, 3, 6));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5));
}
// always reduce this - it can make tests really slow
builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50)));