diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 25e86d9887f..c0370248ad7 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -74,7 +74,6 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.MergePolicyConfig; import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.store.IndexStore; -import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.indices.ttl.IndicesTTLService; @@ -129,7 +128,6 @@ public class ClusterModule extends AbstractModule { registerShardsAllocator(ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, BalancedShardsAllocator.class); } - private void registerBuiltinIndexSettings() { registerIndexDynamicSetting(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE); registerIndexDynamicSetting(IndexStore.INDEX_STORE_THROTTLE_TYPE, Validator.EMPTY); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 6512ee5cef7..d425b63b34c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -106,7 +106,6 @@ public class IndexShardRoutingTable implements Iterable { } } this.allShardsStarted = allShardsStarted; - this.primary = primary; if (primary != null) { this.primaryAsList = Collections.singletonList(primary); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 8dd980c8bb3..e98eb9d4a47 
100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -69,6 +69,7 @@ public class RoutingNodes implements Iterable { private int relocatingShards = 0; private final Map> nodesPerAttributeNames = new HashMap<>(); + private final Map recoveryiesPerNode = new HashMap<>(); public RoutingNodes(ClusterState clusterState) { this(clusterState, true); @@ -91,6 +92,7 @@ public class RoutingNodes implements Iterable { // also fill replicaSet information for (ObjectCursor indexRoutingTable : routingTable.indicesRouting().values()) { for (IndexShardRoutingTable indexShard : indexRoutingTable.value) { + assert indexShard.primary != null; for (ShardRouting shard : indexShard) { // to get all the shards belonging to an index, including the replicas, // we define a replica set and keep track of it. A replica set is identified @@ -107,16 +109,18 @@ public class RoutingNodes implements Iterable { // add the counterpart shard with relocatingNodeId reflecting the source from which // it's relocating from. 
ShardRouting targetShardRouting = shard.buildTargetRelocatingShard(); + addInitialRecovery(targetShardRouting); if (readOnly) { targetShardRouting.freeze(); } entries.add(targetShardRouting); assignedShardsAdd(targetShardRouting); - } else if (!shard.active()) { // shards that are initializing without being relocated + } else if (shard.active() == false) { // shards that are initializing without being relocated if (shard.primary()) { inactivePrimaryCount++; } inactiveShardCount++; + addInitialRecovery(shard); } } else { final ShardRouting sr = getRouting(shard, readOnly); @@ -132,6 +136,79 @@ public class RoutingNodes implements Iterable { } } + private void addRecovery(ShardRouting routing) { + addRecovery(routing, true, false); + } + + private void removeRecovery(ShardRouting routing) { + addRecovery(routing, false, false); + } + + public void addInitialRecovery(ShardRouting routing) { + addRecovery(routing,true, true); + } + + private void addRecovery(final ShardRouting routing, final boolean increment, final boolean initializing) { + final int howMany = increment ? 
1 : -1; + assert routing.initializing() : "routing must be initializing: " + routing; + Recoveries.getOrAdd(recoveryiesPerNode, routing.currentNodeId()).addIncoming(howMany); + final String sourceNodeId; + if (routing.relocatingNodeId() != null) { // this is a relocation-target + sourceNodeId = routing.relocatingNodeId(); + if (routing.primary() && increment == false) { // primary is done relocating + int numRecoveringReplicas = 0; + for (ShardRouting assigned : assignedShards(routing)) { + if (assigned.primary() == false && assigned.initializing() && assigned.relocatingNodeId() == null) { + numRecoveringReplicas++; + } + } + // we transfer the recoveries to the relocated primary + recoveryiesPerNode.get(sourceNodeId).addOutgoing(-numRecoveringReplicas); + recoveryiesPerNode.get(routing.currentNodeId()).addOutgoing(numRecoveringReplicas); + } + } else if (routing.primary() == false) { // replica: its recovery source is the primary's node; a primary without a relocating node is an initial recovery and has no source node + ShardRouting primary = findPrimary(routing); + if (primary == null && initializing) { + primary = routingTable.index(routing.index()).shard(routing.shardId().id()).primary; + } else if (primary == null) { + throw new IllegalStateException("replica is initializing but primary is unassigned"); + } + sourceNodeId = primary.currentNodeId(); + } else { + sourceNodeId = null; + } + if (sourceNodeId != null) { + Recoveries.getOrAdd(recoveryiesPerNode, sourceNodeId).addOutgoing(howMany); + } + } + + public int getIncomingRecoveries(String nodeId) { + return recoveryiesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming(); + } + + public int getOutgoingRecoveries(String nodeId) { + return recoveryiesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getOutgoing(); + } + + private ShardRouting findPrimary(ShardRouting routing) { + List shardRoutings = assignedShards.get(routing.shardId()); + ShardRouting primary = null; + if (shardRoutings != null) { + for (ShardRouting shardRouting : shardRoutings) { + if (shardRouting.primary()) { + if 
(shardRouting.active()) { + return shardRouting; + } else if (primary == null) { + primary = shardRouting; + } else if (primary.relocatingNodeId() != null) { + primary = shardRouting; + } + } + } + } + return primary; + } + private static ShardRouting getRouting(ShardRouting src, boolean readOnly) { if (readOnly) { src.freeze(); // we just freeze and reuse this instance if we are read only @@ -352,6 +429,7 @@ public class RoutingNodes implements Iterable { if (shard.primary()) { inactivePrimaryCount++; } + addRecovery(shard); assignedShardsAdd(shard); } @@ -367,6 +445,7 @@ public class RoutingNodes implements Iterable { ShardRouting target = shard.buildTargetRelocatingShard(); node(target.currentNodeId()).add(target); assignedShardsAdd(target); + addRecovery(target); return target; } @@ -383,9 +462,12 @@ public class RoutingNodes implements Iterable { inactivePrimaryCount--; } } + removeRecovery(shard); shard.moveToStarted(); } + + /** * Cancels a relocation of a shard that shard must relocating. 
*/ @@ -440,6 +522,9 @@ public class RoutingNodes implements Iterable { cancelRelocation(shard); } assignedShardsRemove(shard); + if (shard.initializing()) { + removeRecovery(shard); + } } private void assignedShardsAdd(ShardRouting shard) { @@ -749,6 +834,54 @@ public class RoutingNodes implements Iterable { } } + for (Map.Entry recoveries : routingNodes.recoveryiesPerNode.entrySet()) { + String node = recoveries.getKey(); + final Recoveries value = recoveries.getValue(); + int incoming = 0; + int outgoing = 0; + RoutingNode routingNode = routingNodes.nodesToShards.get(node); + if (routingNode != null) { // node might have dropped out of the cluster + for (ShardRouting routing : routingNode) { + if (routing.initializing()) { + incoming++; + } else if (routing.relocating()) { + outgoing++; + } + if (routing.primary() && (routing.initializing() && routing.relocatingNodeId() != null) == false) { // we don't count the initialization end of the primary relocation + List shardRoutings = routingNodes.assignedShards.get(routing.shardId()); + for (ShardRouting assigned : shardRoutings) { + if (assigned.primary() == false && assigned.initializing() && assigned.relocatingNodeId() == null) { + outgoing++; + } + } + } + } + } +// if (outgoing != value.outgoing) { +// incoming = 0; +// outgoing = 0; +// for (ShardRouting routing : routingNode) { +// if (routing.initializing()) { +// incoming++; +// } else if (routing.relocating()) { +// outgoing++; +// } +// if (routing.primary() && (routing.initializing() && routing.relocatingNodeId() != null) == false) { // we don't count the initialization end of the primary relocation +// List shardRoutings = routingNodes.assignedShards.get(routing.shardId()); +// for (ShardRouting assigned : shardRoutings) { +// if (assigned.primary() == false && assigned.initializing() && assigned.relocatingNodeId() == null) { +// outgoing++; +// } +// } +// } +// } +// } + assert incoming == value.incoming : incoming + " != " + value.incoming; + assert 
outgoing == value.outgoing : outgoing + " != " + value.outgoing + " node: " + routingNode; + + } + + assert unassignedPrimaryCount == routingNodes.unassignedShards.getNumPrimaries() : "Unassigned primaries is [" + unassignedPrimaryCount + "] but RoutingNodes returned unassigned primaries [" + routingNodes.unassigned().getNumPrimaries() + "]"; assert unassignedIgnoredPrimaryCount == routingNodes.unassignedShards.getNumIgnoredPrimaries() : @@ -856,4 +989,41 @@ public class RoutingNodes implements Iterable { throw new IllegalStateException("can't modify RoutingNodes - readonly"); } } + + private static final class Recoveries { + private static final Recoveries EMPTY = new Recoveries(); + private int incoming = 0; + private int outgoing = 0; + + int getTotal() { + return incoming + outgoing; + } + + void addOutgoing(int howMany) { + assert outgoing + howMany >= 0 : outgoing + howMany+ " must be >= 0"; + outgoing += howMany; + } + + void addIncoming(int howMany) { + assert incoming + howMany >= 0 : incoming + howMany+ " must be >= 0"; + incoming += howMany; + } + + int getOutgoing() { + return outgoing; + } + + int getIncoming() { + return incoming; + } + + public static Recoveries getOrAdd(Map map, String key) { + Recoveries recoveries = map.get(key); + if (recoveries == null) { + recoveries = new Recoveries(); + map.put(key, recoveries); + } + return recoveries; + } + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index feafb76a5f2..774bcb078f5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.apache.lucene.util.ArrayUtil; import 
org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -364,35 +365,17 @@ public class AllocationService extends AbstractComponent { private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) { boolean changed = false; - RoutingNodes routingNodes = allocation.routingNodes(); + final RoutingNodes routingNodes = allocation.routingNodes(); if (routingNodes.unassigned().getNumPrimaries() == 0) { // move out if we don't have unassigned primaries return changed; } - - // go over and remove dangling replicas that are initializing for primary shards - List shardsToFail = new ArrayList<>(); - for (ShardRouting shardEntry : routingNodes.unassigned()) { - if (shardEntry.primary()) { - for (ShardRouting routing : routingNodes.assignedShards(shardEntry)) { - if (!routing.primary() && routing.initializing()) { - shardsToFail.add(routing); - } - } - - } - } - for (ShardRouting shardToFail : shardsToFail) { - changed |= applyFailedShard(allocation, shardToFail, false, - new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing", - null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); - } - // now, go over and elect a new primary if possible, not, from this code block on, if one is elected, // routingNodes.hasUnassignedPrimaries() will potentially be false - for (ShardRouting shardEntry : routingNodes.unassigned()) { if (shardEntry.primary()) { + // remove dangling replicas that are initializing for primary shards + changed |= failReplicasForUnassignedPrimary(allocation, shardEntry); ShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry); if (candidate != null) { IndexMetaData index = allocation.metaData().index(candidate.index()); @@ -457,6 +440,22 @@ public class AllocationService extends AbstractComponent { return changed; } + private boolean 
failReplicasForUnassignedPrimary(RoutingAllocation allocation, ShardRouting primary) { + List replicas = new ArrayList<>(); + for (ShardRouting routing : allocation.routingNodes().assignedShards(primary)) { + if (!routing.primary() && routing.initializing()) { + replicas.add(routing); + } + } + boolean changed = false; + for (ShardRouting routing : replicas) { + changed |= applyFailedShard(allocation, routing, false, + new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing", + null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); + } + return changed; + } + private boolean applyStartedShards(RoutingNodes routingNodes, Iterable startedShardEntries) { boolean dirty = false; // apply shards might be called several times with the same shard, ignore it @@ -523,7 +522,6 @@ public class AllocationService extends AbstractComponent { logger.debug("{} ignoring shard failure, unknown index in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); return false; } - RoutingNodes routingNodes = allocation.routingNodes(); RoutingNodes.RoutingNodeIterator matchedNode = routingNodes.routingNodeIter(failedShard.currentNodeId()); @@ -546,7 +544,10 @@ public class AllocationService extends AbstractComponent { logger.debug("{} ignoring shard failure, unknown allocation id in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); return false; } - + if (failedShard.primary()) { + // fail replicas first otherwise we move RoutingNodes into an inconsistent state + failReplicasForUnassignedPrimary(allocation, failedShard); + } // replace incoming instance to make sure we work on the latest one. Copy it to maintain information during modifications. 
failedShard = new ShardRouting(matchedNode.current()); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index bbd28104baf..25f43f57610 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -50,26 +50,36 @@ public class ThrottlingAllocationDecider extends AllocationDecider { public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = 2; public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = 4; public static final String NAME = "throttling"; - public static final String CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES = "cluster.routing.allocation.concurrent_recoveries"; - + public static final Setting CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_recoveries", Integer.toString(DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES), (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_recoveries"), true, Setting.Scope.CLUSTER); public static final Setting CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING = Setting.intSetting("cluster.routing.allocation.node_initial_primaries_recoveries", DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 0, true, Setting.Scope.CLUSTER); - public static final Setting CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_recoveries", (s) -> s.get(CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES,Integer.toString(DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES)), (s) -> Setting.parseInt(s, 0, 
"cluster.routing.allocation.node_concurrent_recoveries"), true, Setting.Scope.CLUSTER); + public static final Setting CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_incoming_recoveries", (s) -> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getRaw(s), (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_incoming_recoveries"), true, Setting.Scope.CLUSTER); + public static final Setting CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_outgoing_recoveries", (s) -> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getRaw(s), (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_outgoing_recoveries"), true, Setting.Scope.CLUSTER); + private volatile int primariesInitialRecoveries; - private volatile int concurrentRecoveries; + private volatile int concurrentIncomingRecoveries; + private volatile int concurrentOutgoingRecoveries; + @Inject public ThrottlingAllocationDecider(Settings settings, ClusterSettings clusterSettings) { super(settings); this.primariesInitialRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.get(settings); - this.concurrentRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.get(settings); - logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries); + concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.get(settings); + concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, this::setPrimariesInitialRecoveries); - 
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING, this::setConcurrentRecoveries); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, this::setConcurrentIncomingRecoverries); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, this::setConcurrentOutgoingRecoverries); + + logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries); } - private void setConcurrentRecoveries(int concurrentRecoveries) { - this.concurrentRecoveries = concurrentRecoveries; + private void setConcurrentIncomingRecoverries(int concurrentIncomingRecoveries) { + this.concurrentIncomingRecoveries = concurrentIncomingRecoveries; + } + private void setConcurrentOutgoingRecoverries(int concurrentOutgoingRecoveries) { + this.concurrentOutgoingRecoveries = concurrentOutgoingRecoveries; } private void setPrimariesInitialRecoveries(int primariesInitialRecoveries) { @@ -99,7 +109,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider { } } } - + // TODO should we allow shards not allocated post API to always allocate? 
// either primary or replica doing recovery (from peer shard) // count the number of recoveries on the node, its for both target (INITIALIZING) and source (RELOCATING) @@ -108,17 +118,16 @@ public class ThrottlingAllocationDecider extends AllocationDecider { @Override public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) { - int currentRecoveries = 0; - for (ShardRouting shard : node) { - if (shard.initializing()) { - currentRecoveries++; - } - } - if (currentRecoveries >= concurrentRecoveries) { - return allocation.decision(Decision.THROTTLE, NAME, "too many shards currently recovering [%d], limit: [%d]", - currentRecoveries, concurrentRecoveries); - } else { - return allocation.decision(Decision.YES, NAME, "below shard recovery limit of [%d]", concurrentRecoveries); + int currentOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(node.nodeId()); + int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId()); + if (currentOutRecoveries >= concurrentOutgoingRecoveries) { + return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards currently recovering [%d], limit: [%d]", + currentOutRecoveries, concurrentOutgoingRecoveries); + } else if (currentInRecoveries >= concurrentIncomingRecoveries) { + return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards currently recovering [%d], limit: [%d]", + currentInRecoveries, concurrentIncomingRecoveries); + } else { + return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d] incoming: [%d]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries); } } } diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 0e1dcf5a605..10c602688a4 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ 
b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -109,8 +109,6 @@ public final class ClusterSettings extends AbstractScopedSettings { IndicesTTLService.INDICES_TTL_INTERVAL_SETTING, MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, MetaData.SETTING_READ_ONLY_SETTING, - RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING, - RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, @@ -119,6 +117,8 @@ public final class ClusterSettings extends AbstractScopedSettings { RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING, ThreadPool.THREADPOOL_GROUP_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, + ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, + ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING, DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 682b66e084e..c86309db136 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -29,19 +29,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import 
org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.threadpool.ThreadPool; -import java.io.Closeable; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +public class RecoverySettings extends AbstractComponent { -/** - */ -public class RecoverySettings extends AbstractComponent implements Closeable { - - public static final Setting INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING = Setting.intSetting("indices.recovery.concurrent_streams", 3, true, Setting.Scope.CLUSTER); - public static final Setting INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING = Setting.intSetting("indices.recovery.concurrent_small_file_streams", 2, true, Setting.Scope.CLUSTER); public static final Setting INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting("indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), true, Setting.Scope.CLUSTER); /** @@ -68,15 +58,8 @@ public class RecoverySettings extends AbstractComponent implements Closeable { */ public static final Setting INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING = Setting.timeSetting("indices.recovery.recovery_activity_timeout", (s) -> INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.getRaw(s) , TimeValue.timeValueSeconds(0), true, Setting.Scope.CLUSTER); - public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb", "SMALL_FILE_CUTOFF_BYTES").bytes(); - public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); - private volatile int concurrentStreams; - private volatile int concurrentSmallFileStreams; - private final ThreadPoolExecutor concurrentStreamPool; - private final ThreadPoolExecutor concurrentSmallFileStreamPool; - private volatile ByteSizeValue maxBytesPerSec; private volatile SimpleRateLimiter rateLimiter; private volatile TimeValue retryDelayStateSync; @@ -101,14 +84,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable { 
this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); - - this.concurrentStreams = INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING.get(settings); - this.concurrentStreamPool = EsExecutors.newScaling("recovery_stream", 0, concurrentStreams, 60, TimeUnit.SECONDS, - EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); - this.concurrentSmallFileStreams = INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.get(settings); - this.concurrentSmallFileStreamPool = EsExecutors.newScaling("small_file_recovery_stream", 0, concurrentSmallFileStreams, 60, - TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]")); - this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings); if (maxBytesPerSec.bytes() <= 0) { rateLimiter = null; @@ -116,11 +91,9 @@ public class RecoverySettings extends AbstractComponent implements Closeable { rateLimiter = new SimpleRateLimiter(maxBytesPerSec.mbFrac()); } - logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}]", - maxBytesPerSec, concurrentStreams); - clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING, this::setConcurrentStreams); - clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING, this::setConcurrentSmallFileStreams); + logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork); @@ -129,20 +102,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable { clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout); } - @Override - public 
void close() { - ThreadPool.terminate(concurrentStreamPool, 1, TimeUnit.SECONDS); - ThreadPool.terminate(concurrentSmallFileStreamPool, 1, TimeUnit.SECONDS); - } - - public ThreadPoolExecutor concurrentStreamPool() { - return concurrentStreamPool; - } - - public ThreadPoolExecutor concurrentSmallFileStreamPool() { - return concurrentSmallFileStreamPool; - } - public RateLimiter rateLimiter() { return rateLimiter; } @@ -176,10 +135,6 @@ public class RecoverySettings extends AbstractComponent implements Closeable { this.chunkSize = chunkSize; } - private void setConcurrentStreams(int concurrentStreams) { - this.concurrentStreams = concurrentStreams; - concurrentStreamPool.setMaximumPoolSize(concurrentStreams); - } public void setRetryDelayStateSync(TimeValue retryDelayStateSync) { this.retryDelayStateSync = retryDelayStateSync; @@ -211,9 +166,4 @@ public class RecoverySettings extends AbstractComponent implements Closeable { rateLimiter = new SimpleRateLimiter(maxBytesPerSec.mbFrac()); } } - - private void setConcurrentSmallFileStreams(int concurrentSmallFileStreams) { - this.concurrentSmallFileStreams = concurrentSmallFileStreams; - concurrentSmallFileStreamPool.setMaximumPoolSize(concurrentSmallFileStreams); - } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 1410f499078..10ab9956df4 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -58,9 +58,6 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import 
java.util.stream.StreamSupport; @@ -591,100 +588,38 @@ public class RecoverySourceHandler { void sendFiles(Store store, StoreFileMetaData[] files, Function outputStreamFactory) throws Throwable { store.incRef(); try { - Future[] runners = asyncSendFiles(store, files, outputStreamFactory); - IOException corruptedEngine = null; - final List exceptions = new ArrayList<>(); - for (int i = 0; i < runners.length; i++) { - StoreFileMetaData md = files[i]; - try { - runners[i].get(); - } catch (ExecutionException t) { - corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t.getCause()); - } catch (InterruptedException t) { - corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t); + ArrayUtil.timSort(files, (a,b) -> Long.compare(a.length(), b.length())); // send smallest first + for (int i = 0; i < files.length; i++) { + final StoreFileMetaData md = files[i]; + try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) { + // it's fine that we are only having the indexInput int he try/with block. The copy methods handles + // exceptions during close correctly and doesn't hide the original exception. + Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md)); + } catch (Throwable t) { + final IOException corruptIndexException; + if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(t)) != null) { + if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! + logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md); + failEngine(corruptIndexException); + throw corruptIndexException; + } else { // corruption has happened on the way to replica + RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null); + exception.addSuppressed(t); + logger.warn("{} Remote file corruption on node {}, recovering {}. 
local checksum OK", + corruptIndexException, shardId, request.targetNode(), md); + throw exception; + } + } else { + throw t; + } } } - if (corruptedEngine != null) { - failEngine(corruptedEngine); - throw corruptedEngine; - } else { - ExceptionsHelper.rethrowAndSuppress(exceptions); - } } finally { store.decRef(); } } - private IOException handleExecutionException(Store store, IOException corruptedEngine, List exceptions, StoreFileMetaData md, Throwable t) { - logger.debug("Failed to transfer file [" + md + "] on recovery"); - final IOException corruptIndexException; - final boolean checkIntegrity = corruptedEngine == null; - if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(t)) != null) { - if (checkIntegrity && store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! - logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md); - corruptedEngine = corruptIndexException; - } else { // corruption has happened on the way to replica - RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null); - exception.addSuppressed(t); - if (checkIntegrity) { - logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK", - corruptIndexException, shardId, request.targetNode(), md); - } else { - logger.warn("{} Remote file corruption on node {}, recovering {}. 
local checksum are skipped", - corruptIndexException, shardId, request.targetNode(), md); - } - exceptions.add(exception); - - } - } else { - exceptions.add(t); - } - return corruptedEngine; - } - protected void failEngine(IOException cause) { shard.failShard("recovery", cause); } - - Future[] asyncSendFiles(Store store, StoreFileMetaData[] files, Function outputStreamFactory) { - store.incRef(); - try { - final Future[] futures = new Future[files.length]; - for (int i = 0; i < files.length; i++) { - final StoreFileMetaData md = files[i]; - long fileSize = md.length(); - - // Files are split into two categories, files that are "small" - // (under 5mb) and other files. Small files are transferred - // using a separate thread pool dedicated to small files. - // - // The idea behind this is that while we are transferring an - // older, large index, a user may create a new index, but that - // index will not be able to recover until the large index - // finishes, by using two different thread pools we can allow - // tiny files (like segments for a brand new index) to be - // recovered while ongoing large segment recoveries are - // happening. It also allows these pools to be configured - // separately. - ThreadPoolExecutor pool; - if (fileSize > RecoverySettings.SMALL_FILE_CUTOFF_BYTES) { - pool = recoverySettings.concurrentStreamPool(); - } else { - pool = recoverySettings.concurrentSmallFileStreamPool(); - } - Future future = pool.submit(() -> { - try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) { - // it's fine that we are only having the indexInput int he try/with block. The copy methods handles - // exceptions during close correctly and doesn't hide the original exception. 
- Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md)); - } - return null; - }); - futures[i] = future; - } - return futures; - } finally { - store.decRef(); - } - } } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index c964e79587e..a9651eace33 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -322,7 +322,6 @@ public class Node implements Releasable { for (Class plugin : pluginsService.nodeServices()) { injector.getInstance(plugin).stop(); } - injector.getInstance(RecoverySettings.class).close(); // we should stop this last since it waits for resources to get released // if we had scroll searchers etc or recovery going on we wait for to finish. injector.getInstance(IndicesService.class).stop(); diff --git a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java index b0b5e9fd517..7011b4092e4 100644 --- a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java +++ b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.admin.indices.upgrade.UpgradeIT; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; @@ -117,7 +118,8 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase { public Settings nodeSettings(int ord) { return Settings.builder() .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) // 
disable merging so no segments will be upgraded - .put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.getKey(), 30) // increase recovery speed for small files + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 30) // speed up recoveries + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 30) .build(); } diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java index c5e48a97dfd..03cfbf2b307 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java @@ -50,7 +50,8 @@ public class AckClusterUpdateSettingsIT extends ESIntegTestCase { .put(super.nodeSettings(nodeOrdinal)) //make sure that enough concurrent reroutes can happen at the same time //we have a minimum of 2 nodes, and a maximum of 10 shards, thus 5 should be enough - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 5) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 5) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 5) .put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), 10) .build(); } diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java index 4298b27fa3d..6b406a3bfdf 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java @@ -162,7 +162,8 
@@ public class ClusterRerouteIT extends ESIntegTestCase { public void testDelayWithALargeAmountOfShards() throws Exception { Settings commonSettings = settingsBuilder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, 1) .build(); logger.info("--> starting 4 nodes"); String node_1 = internalCluster().startNode(commonSettings); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java b/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java index b451183826b..72ecc171eed 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java @@ -32,7 +32,7 @@ public final class RandomShardRoutingMutator { } public static void randomChange(ShardRouting shardRouting, String[] nodes) { - switch (randomInt(3)) { + switch (randomInt(2)) { case 0: if (shardRouting.unassigned() == false) { shardRouting.moveToUnassigned(new UnassignedInfo(randomReason(), randomAsciiOfLength(10))); @@ -46,13 +46,6 @@ public final class RandomShardRoutingMutator { } break; case 2: - if (shardRouting.primary()) { - shardRouting.moveFromPrimary(); - } else { - shardRouting.moveToPrimary(); - } - break; - case 3: if (shardRouting.initializing()) { shardRouting.moveToStarted(); } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java index d69264a1e3a..fcdef700b98 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java @@ -50,7 +50,7 @@ 
public class RoutingTableTests extends ESAllocationTestCase { private int totalNumberOfShards; private final static Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); private final AllocationService ALLOCATION_SERVICE = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .build()); private ClusterState clusterState; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java index 1cf5ba0083d..4c4fa72a6ec 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java @@ -54,7 +54,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase { private final ESLogger logger = Loggers.getLogger(AllocationCommandsTests.class); public void testMoveShardCommand() { - AllocationService allocation = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService allocation = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); logger.info("creating an index with 1 shard, no replica"); MetaData metaData = MetaData.builder() diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java index 8d510e7f0c5..52aad66776e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java +++ 
b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java @@ -38,9 +38,10 @@ public class AllocationPriorityTests extends ESAllocationTestCase { */ public void testPrioritizedIndicesAllocatedFirst() { AllocationService allocation = createAllocationService(settingsBuilder(). - put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1) + put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1).build()); + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1).build()); final String highPriorityName; final String lowPriorityName; final int priorityFirst; @@ -84,7 +85,7 @@ public class AllocationPriorityTests extends ESAllocationTestCase { routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).toString(),2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java 
b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java index e9d0f75b1c1..eb94b6de109 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java @@ -54,7 +54,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase { public void testMoveShardOnceNewNodeWithAttributeAdded1() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); @@ -386,7 +386,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase { public void testMoveShardOnceNewNodeWithAttributeAdded5() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); @@ -464,7 +464,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase { public void testMoveShardOnceNewNodeWithAttributeAdded6() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); @@ -544,7 +544,7 @@ public class 
AwarenessAllocationTests extends ESAllocationTestCase { public void testFullAwareness1() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2") .put("cluster.routing.allocation.awareness.attributes", "rack_id") @@ -611,7 +611,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase { public void testFullAwareness2() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2") .put("cluster.routing.allocation.awareness.attributes", "rack_id") @@ -827,7 +827,7 @@ public class AwarenessAllocationTests extends ESAllocationTestCase { public void testUnassignedShardsWithUnbalancedZones() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "zone") .build()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index 627febdbc11..08cbdc09fe0 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ 
b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.EmptyClusterInfoService; @@ -358,7 +359,9 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { public boolean allocateUnassigned(RoutingAllocation allocation) { RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); boolean changed = !unassigned.isEmpty(); - for (ShardRouting sr : unassigned.drain()) { + ShardRouting[] drain = unassigned.drain(); + ArrayUtil.timSort(drain, (a, b) -> { return a.primary() ? -1 : 1; }); // we have to allocate primaries first + for (ShardRouting sr : drain) { switch (sr.id()) { case 0: if (sr.primary()) { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java index 34d78ae3099..886462610ca 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -43,7 +43,7 @@ public class ConcurrentRebalanceRoutingTests extends ESAllocationTestCase { public void testClusterConcurrentRebalance() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) .build()); @@ -145,4 +145,4 @@ public class ConcurrentRebalanceRoutingTests extends ESAllocationTestCase { 
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10)); assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(0)); } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DeadNodesAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DeadNodesAllocationTests.java index d807dc1b5ca..cb09fb93b60 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DeadNodesAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DeadNodesAllocationTests.java @@ -45,7 +45,7 @@ public class DeadNodesAllocationTests extends ESAllocationTestCase { public void testSimpleDeadNodeOnStartedPrimaryShard() { AllocationService allocation = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .build()); @@ -96,7 +96,7 @@ public class DeadNodesAllocationTests extends ESAllocationTestCase { public void testDeadNodeWhileRelocatingOnToNode() { AllocationService allocation = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .build()); @@ -170,7 +170,7 @@ public class DeadNodesAllocationTests extends ESAllocationTestCase { public void testDeadNodeWhileRelocatingOnFromNode() { AllocationService allocation = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), 
"always") .build()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java index e7c956c4ccd..fc686f0bb5a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java @@ -43,7 +43,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests extends ESAllocationTest private final ESLogger logger = Loggers.getLogger(ElectReplicaAsPrimaryDuringRelocationTests.class); public void testElectReplicaAsPrimaryDuringRelocation() { - AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 8dffacaa379..b8ab9c13590 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -56,7 +56,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { public void testFailedShardPrimaryRelocatingToAndFrom() { AllocationService allocation = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) 
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .build()); @@ -144,7 +144,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { public void testFailPrimaryStartedCheckReplicaElected() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .build()); @@ -225,7 +225,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { public void testFirstAllocationFailureSingleNode() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .build()); @@ -281,7 +281,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { public void testSingleShardMultipleAllocationFailures() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .build()); @@ -337,7 +337,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { public void testFirstAllocationFailureTwoNodes() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .build()); @@ -397,7 
+397,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { public void testRebalanceFailure() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .build()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java index a8d015a0d49..2b0c7ef6bda 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java @@ -57,7 +57,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase { public void testDoNotAllocateFromPrimary() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); @@ -171,7 +171,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase { public void testRandom() { AllocationService service = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); @@ -220,7 +220,7 @@ public class 
NodeVersionAllocationDeciderTests extends ESAllocationTestCase { public void testRollingRestart() { AllocationService service = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferPrimaryAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferPrimaryAllocationTests.java index 0ac98d4f92b..d4beb7190e3 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferPrimaryAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferPrimaryAllocationTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.test.ESAllocationTestCase; @@ -42,6 +43,7 @@ public class PreferPrimaryAllocationTests extends ESAllocationTestCase { logger.info("create an allocation with 1 initial recoveries"); AllocationService strategy = createAllocationService(settingsBuilder() .put("cluster.routing.allocation.node_concurrent_recoveries", 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 1) .build()); diff --git 
a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java index e994c885629..7e59ab8a6b4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java @@ -43,7 +43,7 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { private final ESLogger logger = Loggers.getLogger(PrimaryElectionRoutingTests.class); public void testBackupElectionToPrimaryWhenPrimaryCanBeAllocatedToAnotherNode() { - AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); @@ -93,7 +93,7 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { } public void testRemovingInitializingReplicasIfPrimariesFails() { - AllocationService allocation = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService allocation = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java index 12ff9fd3f7d..371624484ff 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java +++ 
b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java @@ -44,6 +44,7 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests extends ESAllocationTes public void testPrimaryNotRelocatedWhileBeingRecoveredFrom() { AllocationService strategy = createAllocationService(settingsBuilder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.concurrent_source_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .build()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java index 4d5f4d07ea1..abc561a0916 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESAllocationTestCase; @@ -57,7 +58,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase { public void testRandomDecisions() { RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(getRandom()); AllocationService strategy = new AllocationService(settingsBuilder().build(), new AllocationDeciders(Settings.EMPTY, - new 
HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), + new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY), randomAllocationDecider))), new ShardsAllocators(NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); int indices = scaledRandomIntBetween(1, 20); Builder metaBuilder = MetaData.builder(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java index 18725a0de78..4672f339c70 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java @@ -56,7 +56,7 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase { } AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java index 0d33b5ecd46..1b8bea26dbe 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java @@ -45,7 +45,7 @@ public class ReplicaAllocatedAfterPrimaryTests extends ESAllocationTestCase { private final ESLogger logger = 
Loggers.getLogger(ReplicaAllocatedAfterPrimaryTests.class); public void testBackupIsAllocatedAfterPrimary() { - AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java index eec1b48be97..9a4e56a26b2 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.test.ESAllocationTestCase; @@ -211,6 +212,7 @@ public class RoutingNodesIntegrityTests extends ESAllocationTestCase { AllocationService strategy = createAllocationService(settingsBuilder() .put("cluster.routing.allocation.node_concurrent_recoveries", 1) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") 
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsLimitAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsLimitAllocationTests.java index c0f0c0c2252..dd3f3f373ff 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsLimitAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsLimitAllocationTests.java @@ -46,7 +46,7 @@ public class ShardsLimitAllocationTests extends ESAllocationTestCase { private final ESLogger logger = Loggers.getLogger(ShardsLimitAllocationTests.class); public void testIndexLevelShardsLimitAllocate() { - AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); @@ -89,7 +89,7 @@ public class ShardsLimitAllocationTests extends ESAllocationTestCase { public void testClusterLevelShardsLimitAllocate() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1) .build()); @@ -125,7 +125,7 @@ public class ShardsLimitAllocationTests extends ESAllocationTestCase { // Bump the cluster total shards to 2 strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 2) .build()); @@ -147,7 +147,7 @@ public class ShardsLimitAllocationTests 
extends ESAllocationTestCase { public void testIndexLevelShardsLimitRemain() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.balance.index", 0.0f) diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java index 29ef451324d..bf41ad8a053 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java @@ -59,7 +59,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase { private final ESLogger logger = Loggers.getLogger(SingleShardNoReplicasRoutingTests.class); public void testSingleIndexStartedShard() { - AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); @@ -160,7 +160,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase { } public void testSingleIndexShardFailed() { - AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); logger.info("Building initial routing 
table"); @@ -210,7 +210,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase { public void testMultiIndexEvenDistribution() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); @@ -322,7 +322,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase { public void testMultiIndexUnevenNodes() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); @@ -413,4 +413,4 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase { assertThat(routingNode.numberOfShardsWithState(STARTED), equalTo(2)); } } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java index ff442852017..f7033ec2596 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java @@ -44,7 +44,7 @@ public class SingleShardOneReplicaRoutingTests extends ESAllocationTestCase { private final ESLogger logger = Loggers.getLogger(SingleShardOneReplicaRoutingTests.class); public void 
testSingleIndexFirstStartPrimaryThenBackups() { - AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java index 28033915abe..0712e9cd02a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java @@ -51,9 +51,9 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase { .nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))) .metaData(MetaData.builder().put(indexMetaData, false)); - final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); - final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1); - final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1); + final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", true, ShardRoutingState.INITIALIZING, 1); + final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", true, ShardRoutingState.STARTED, 1); + final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", true, ShardRoutingState.RELOCATING, 1); stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test") .addIndexShard(new 
IndexShardRoutingTable.Builder(initShard.shardId()).addShard(initShard).build()) .addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId()).addShard(startedShard).build()) diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java index 223da88192b..1d60436d3c7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java @@ -25,11 +25,16 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESAllocationTestCase; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -103,7 +108,8 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { public void testReplicaAndPrimaryRecoveryThrottling() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 3) + 
.put("cluster.routing.allocation.node_concurrent_recoveries", 3) + .put("cluster.routing.allocation.concurrent_source_recoveries", 3) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) .build()); @@ -169,4 +175,157 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(0)); assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0)); } + + public void testThrottleIncomingAndOutgoing() { + Settings settings = settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 5) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 5) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 5) + .build(); + AllocationService strategy = createAllocationService(settings); + logger.info("Building initial routing table"); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(9).numberOfReplicas(0)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + logger.info("start one node, do reroute, only 5 should initialize"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build(); + routingTable = strategy.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(5)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(4)); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 5); + + logger.info("start 
initializing, all primaries should be started"); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(4)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0)); + + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + logger.info("start another 2 nodes, 5 shards should be relocating - at most 5 are allowed per node"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2")).put(newNode("node3"))).build(); + routingTable = strategy.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(4)); + assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(5)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0)); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 3); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 2); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 5); + + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + logger.info("start the relocating 
shards, one more shard should relocate away from node1"); + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(8)); + assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(1)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0)); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 0); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 1); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1); + } + + public void testOutgoingThrottlesAllocaiton() { + Settings settings = settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 1) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 1) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 1) + .build(); + AllocationService strategy = createAllocationService(settings); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(3).numberOfReplicas(0)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build(); + routingTable = strategy.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3)); + 
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0)); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 1); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 1); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0); + + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 0); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0); + + RoutingAllocation.Result reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(clusterState.getRoutingNodes().node("node1").get(0).shardId(), "node1", "node2"))); + assertEquals(reroute.explanations().explanations().size(), 1); + assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.YES); + routingTable = reroute.routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1); + 
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0); + + // outgoing throttles + reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(clusterState.getRoutingNodes().node("node3").get(0).shardId(), "node3", "node1")), true); + assertEquals(reroute.explanations().explanations().size(), 1); + assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.THROTTLE); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0); + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(2)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(1)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0)); + + // incoming throttles + reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(clusterState.getRoutingNodes().node("node3").get(0).shardId(), "node3", "node2")), true); + assertEquals(reroute.explanations().explanations().size(), 1); + assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.THROTTLE); + + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0); + 
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0); + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(2)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(1)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0)); + + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java index 7fa27e7050c..5ff5af4e4cd 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java @@ -46,7 +46,7 @@ public class UpdateNumberOfReplicasTests extends ESAllocationTestCase { private final ESLogger logger = Loggers.getLogger(UpdateNumberOfReplicasTests.class); public void testUpdateNumberOfReplicas() { - AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = createAllocationService(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index e319d4156ac..fa52503eac5 100644 --- 
a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -107,7 +107,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { } }; AllocationService strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -192,7 +192,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { new DiskThresholdDecider(diskSettings)))); strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -223,7 +223,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { new DiskThresholdDecider(diskSettings)))); strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -303,7 +303,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { }; AllocationService strategy = new AllocationService(settingsBuilder() - 
.put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -360,7 +360,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { } }; strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -427,7 +427,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { new DiskThresholdDecider(diskSettings)))); strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -458,7 +458,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { new DiskThresholdDecider(diskSettings)))); strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -567,7 +567,7 @@ public class 
DiskThresholdDeciderTests extends ESAllocationTestCase { }; AllocationService strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -635,7 +635,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { }; AllocationService strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -738,7 +738,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { }; AllocationService strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -900,7 +900,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { new SameShardAllocationDecider(Settings.EMPTY), diskThresholdDecider ))); AllocationService strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), 
"always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); @@ -1000,8 +1000,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ))); AllocationService strategy = new AllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), deciders, makeShardsAllocators(), cis); RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute"); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationTests.java index f8be6a8c4da..b2559c29ed2 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationTests.java @@ -159,6 +159,7 @@ public class EnableAllocationTests extends ESAllocationTestCase { Settings build = settingsBuilder() .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), useClusterSetting ? 
Rebalance.NONE: RandomPicks.randomFrom(getRandom(), Rebalance.values())) // index settings override cluster settings .put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), 3) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10) .build(); ClusterSettings clusterSettings = new ClusterSettings(build, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); AllocationService strategy = createAllocationService(build, clusterSettings, getRandom()); diff --git a/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java b/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java index 6094d49234c..ced1e0097a1 100644 --- a/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java @@ -224,7 +224,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase { public void testAttributePreferenceRouting() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "rack_id,zone") .build()); @@ -279,7 +279,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase { public void testNodeSelectorRouting(){ AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .build()); @@ -336,7 +336,7 @@ public class RoutingIteratorTests extends 
ESAllocationTestCase { public void testShardsAndPreferNodeRouting() { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .build()); MetaData metaData = MetaData.builder() @@ -397,7 +397,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase { public void testReplicaShardPreferenceIters() throws Exception { AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .build()); OperationRouting operationRouting = new OperationRouting(Settings.Builder.EMPTY_SETTINGS, new AwarenessAllocationDecider()); @@ -479,4 +479,4 @@ public class RoutingIteratorTests extends ESAllocationTestCase { assertTrue(routing.primary()); } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java b/core/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java index 2c6a55da242..0de220a8fa3 100644 --- a/core/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java @@ -56,7 +56,7 @@ public class GatewayMetaStateTests extends ESAllocationTestCase { ClusterChangedEvent generateEvent(boolean initializing, boolean versionChanged, boolean masterEligible) { //ridiculous settings to make sure we don't run into uninitialized because fo default AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 100) + .put("cluster.routing.allocation.node_concurrent_recoveries", 100) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100) 
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 100) @@ -110,7 +110,7 @@ public class GatewayMetaStateTests extends ESAllocationTestCase { ClusterChangedEvent generateCloseEvent(boolean masterEligible) { //ridiculous settings to make sure we don't run into uninitialized because fo default AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 100) + .put("cluster.routing.allocation.node_concurrent_recoveries", 100) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100) diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 1dfab4f62d3..f0650a1cbda 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -319,14 +319,13 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { assertThat(state.metaData().index("test").getAliases().get("test_alias").filter(), notNullValue()); } - @TestLogging("gateway:TRACE,indices.recovery:TRACE,index.engine:TRACE") public void testReusePeerRecovery() throws Exception { final Settings settings = settingsBuilder() .put("action.admin.cluster.node.shutdown.delay", "10ms") .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) .put("gateway.recover_after_nodes", 4) - - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 4) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, 4) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, 4) .put(MockFSDirectoryService.CRASH_INDEX, false).build(); 
internalCluster().startNodesAsync(4, settings).get(); diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index 496bdc21b15..1dfe8514502 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -114,9 +114,8 @@ public class CorruptedFileIT extends ESIntegTestCase { // and we need to make sure primaries are not just trashed if we don't have replicas .put(super.nodeSettings(nodeOrdinal)) // speed up recoveries - .put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING.getKey(), 10) - .put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.getKey(), 10) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 5) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 5) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 5) .build(); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 707fbe0e02e..a64b8606aea 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -108,7 +108,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { assertEquals(0, recoveryDiff.missing.size()); IndexReader reader = DirectoryReader.open(targetStore.directory()); assertEquals(numDocs, reader.maxDoc()); - IOUtils.close(reader, writer, store, targetStore, recoverySettings); + IOUtils.close(reader, writer, store, targetStore); } public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { 
@@ -170,7 +170,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { assertNotNull(ExceptionsHelper.unwrapCorruption(ex)); } assertTrue(failedEngine.get()); - IOUtils.close(store, targetStore, recoverySettings); + IOUtils.close(store, targetStore); } @@ -231,7 +231,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { fail("not expected here"); } assertFalse(failedEngine.get()); - IOUtils.close(store, targetStore, recoverySettings); + IOUtils.close(store, targetStore); } private Store newStore(Path path) throws IOException { diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java b/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java index d7e13be312f..26c22fc3bb0 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java @@ -32,18 +32,6 @@ public class RecoverySettingsTests extends ESSingleNodeTestCase { } public void testAllSettingsAreDynamicallyUpdatable() { - innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING.getKey(), randomIntBetween(1, 200), new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.concurrentStreamPool().getMaximumPoolSize()); - } - }); - innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.getKey(), randomIntBetween(1, 200), new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.concurrentSmallFileStreamPool().getMaximumPoolSize()); - } - }); innerTestSettings(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0, new Validator() { @Override public void validate(RecoverySettings recoverySettings, int expectedValue) { diff --git a/docs/reference/migration/migrate_3_0.asciidoc 
b/docs/reference/migration/migrate_3_0.asciidoc index d53740c198b..234fd693498 100644 --- a/docs/reference/migration/migrate_3_0.asciidoc +++ b/docs/reference/migration/migrate_3_0.asciidoc @@ -200,6 +200,11 @@ If you are using any of these settings please take the time and review their pur _expert settings_ and should only be used if absolutely necessary. If you have set any of the above setting as persistent cluster settings please use the settings update API and set their superseded keys accordingly. +The following settings have been removed without replacement: + + * `indices.recovery.concurrent_small_file_streams` - recoveries are now single threaded. The number of concurrent outgoing recoveries is throttled via allocation deciders. + * `indices.recovery.concurrent_streams` - recoveries are now single threaded. The number of concurrent outgoing recoveries is throttled via allocation deciders. + ==== Translog settings The `index.translog.flush_threshold_ops` setting is not supported anymore. In order to control flushes based on the transaction log @@ -211,6 +216,14 @@ anymore, the `buffered` implementation is now the only available option and uses The deprecated settings `index.cache.query.enable` and `indices.cache.query.size` have been removed and are replaced with `index.requests.cache.enable` and `indices.requests.cache.size` respectively. +==== Allocation settings + +Allocation settings deprecated in 1.x have been removed: + + * `cluster.routing.allocation.concurrent_recoveries` is superseded by `cluster.routing.allocation.node_concurrent_recoveries` + +Please change the setting in your configuration files or in the cluster state to use the new setting instead. 
+ [[breaking_30_mapping_changes]] === Mapping changes diff --git a/docs/reference/modules/cluster/shards_allocation.asciidoc b/docs/reference/modules/cluster/shards_allocation.asciidoc index b8073927a0f..b650e237629 100644 --- a/docs/reference/modules/cluster/shards_allocation.asciidoc +++ b/docs/reference/modules/cluster/shards_allocation.asciidoc @@ -27,10 +27,15 @@ one of the active allocation ids in the cluster state. -- -`cluster.routing.allocation.node_concurrent_recoveries`:: +`cluster.routing.allocation.node_concurrent_incoming_recoveries`:: - How many concurrent shard recoveries are allowed to happen on a node. - Defaults to `2`. + How many concurrent incoming shard recoveries are allowed to happen on a node. Incoming recoveries are the recoveries + where the target shard (most likely the replica unless a shard is relocating) is allocated on the node. Defaults to `2`. + +`cluster.routing.allocation.node_concurrent_outgoing_recoveries`:: + + How many concurrent outgoing shard recoveries are allowed to happen on a node. Outgoing recoveries are the recoveries + where the source shard (most likely the primary unless a shard is relocating) is allocated on the node. Defaults to `2`. `cluster.routing.allocation.node_initial_primaries_recoveries`:: @@ -47,17 +52,6 @@ one of the active allocation ids in the cluster state. Defaults to `false`, meaning that no check is performed by default. This setting only applies if multiple nodes are started on the same machine. -`indices.recovery.concurrent_streams`:: - - The number of network streams to open per node to recover a shard from - a peer shard. Defaults to `3`. - -`indices.recovery.concurrent_small_file_streams`:: - - The number of streams to open per node for small files (under 5mb) to - recover a shard from a peer shard. Defaults to `2`. 
- - [float] === Shard Rebalancing Settings diff --git a/docs/reference/modules/indices/recovery.asciidoc b/docs/reference/modules/indices/recovery.asciidoc index cd21f135e38..8de3309347c 100644 --- a/docs/reference/modules/indices/recovery.asciidoc +++ b/docs/reference/modules/indices/recovery.asciidoc @@ -3,12 +3,6 @@ The following _expert_ settings can be set to manage the recovery policy. -`indices.recovery.concurrent_streams`:: - Defaults to `3`. - -`indices.recovery.concurrent_small_file_streams`:: - Defaults to `2`. - `indices.recovery.file_chunk_size`:: Defaults to `512kb`. diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 5ab862e3e4d..ea2796aad84 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -306,13 +306,11 @@ public final class InternalTestCluster extends TestCluster { builder.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b"); builder.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b"); if (TEST_NIGHTLY) { - builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING.getKey(), RandomInts.randomIntBetween(random, 10, 15)); - builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.getKey(), RandomInts.randomIntBetween(random, 10, 15)); - builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 5, 10)); + builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 5, 10)); + builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 
RandomInts.randomIntBetween(random, 5, 10)); } else if (random.nextInt(100) <= 90) { - builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS_SETTING.getKey(), RandomInts.randomIntBetween(random, 3, 6)); - builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS_SETTING.getKey(), RandomInts.randomIntBetween(random, 3, 6)); - builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5)); + builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5)); + builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5)); } // always reduce this - it can make tests really slow builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50)));