From d2507c4ac0477f28b441f83eeac07786fb8f42b4 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 13 Aug 2015 15:14:56 +0200 Subject: [PATCH 1/2] Add `expectedShardSize` to ShardRouting and use it in path.data allocation Today we only guess how big the shard will be that we are allocating on a node. Yet, we have this information on the master but it's not available on the data nodes when we pick a data path for the shard. We use some rather simple heuristic based on existing shard sizes on this node which might be complete bogus. This change adds the expected shard size to the ShardRouting for RELOCATING and INITIALIZING shards to be used on the actual node to find the best data path for the shard. Closes #11271 --- .../elasticsearch/cluster/ClusterInfo.java | 7 +- .../cluster/InternalClusterInfoService.java | 3 + .../cluster/routing/RoutingNodes.java | 19 +- .../cluster/routing/ShardRouting.java | 52 ++++- .../allocator/BalancedShardsAllocator.java | 8 +- .../command/AllocateAllocationCommand.java | 2 +- .../command/MoveAllocationCommand.java | 2 +- .../common/settings/Settings.java | 1 - .../gateway/PrimaryShardAllocator.java | 4 +- .../gateway/ReplicaShardAllocator.java | 2 +- .../org/elasticsearch/index/IndexService.java | 6 +- .../cluster/IndicesClusterStateService.java | 2 +- .../allocation/SimpleAllocationIT.java | 4 + .../cluster/routing/AllocationIdTests.java | 14 +- .../routing/RandomShardRoutingMutator.java | 2 +- .../cluster/routing/ShardRoutingHelper.java | 12 +- .../cluster/routing/ShardRoutingTests.java | 45 ++++- .../cluster/routing/TestShardRouting.java | 10 +- .../cluster/routing/UnassignedInfoTests.java | 2 +- .../allocation/BalanceConfigurationTests.java | 20 +- .../ExpectedShardSizeAllocationTests.java | 179 ++++++++++++++++++ .../allocation/RebalanceAfterActiveTests.java | 58 +++++- .../gateway/ReplicaShardAllocatorTests.java | 5 +- .../index/shard/IndexShardTests.java | 17 ++ .../test/ESAllocationTestCase.java | 10 +- 25 files changed, 412 insertions(+), 74 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 5e2d35a9818..f21bc9f052e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -30,7 +30,7 @@ import java.util.Map; * InternalClusterInfoService.shardIdentifierFromRouting(String) * for the key used in the shardSizes map */ -public final class ClusterInfo { +public class ClusterInfo { private final Map usages; final Map shardSizes; @@ -54,6 +54,11 @@ public final class ClusterInfo { return shardSizes.get(shardIdentifierFromRouting(shardRouting)); } + public long getShardSize(ShardRouting shardRouting, long defaultValue) { + Long shardSize = getShardSize(shardRouting); + return shardSize == null ? defaultValue : shardSize; + } + /** * Method that incorporates the ShardId for the shard into a string that * includes a 'p' or 'r' depending on whether the shard is a primary. diff --git a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 567812e7d58..8be9465f7b2 100644 --- a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; @@ -36,6 +37,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.node.settings.NodeSettingsService; @@ -45,6 +47,7 @@ import org.elasticsearch.transport.ReceiveTimeoutTransportException; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * InternalClusterInfoService provides the ClusterInfoService interface, diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 7acedd2b98b..8c07429776a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -345,10 +345,10 @@ public class RoutingNodes implements Iterable { /** * Moves a shard from unassigned to initialize state */ - public void initialize(ShardRouting shard, String nodeId) { + public void initialize(ShardRouting shard, String nodeId, long expectedSize) { ensureMutable(); assert shard.unassigned() : shard; - shard.initialize(nodeId); + shard.initialize(nodeId, expectedSize); node(nodeId).add(shard); inactiveShardCount++; if (shard.primary()) { @@ -362,10 +362,10 @@ public class RoutingNodes implements Iterable { * shard as well as assigning it. And returning the target initializing * shard. */ - public ShardRouting relocate(ShardRouting shard, String nodeId) { + public ShardRouting relocate(ShardRouting shard, String nodeId, long expectedShardSize) { ensureMutable(); relocatingShards++; - shard.relocate(nodeId); + shard.relocate(nodeId, expectedShardSize); ShardRouting target = shard.buildTargetRelocatingShard(); node(target.currentNodeId()).add(target); assignedShardsAdd(target); @@ -608,16 +608,9 @@ public class RoutingNodes implements Iterable { /** * Initializes the current unassigned shard and moves it from the unassigned list. */ - public void initialize(String nodeId) { - initialize(nodeId, current.version()); - } - - /** - * Initializes the current unassigned shard and moves it from the unassigned list. - */ - public void initialize(String nodeId, long version) { + public void initialize(String nodeId, long version, long expectedShardSize) { innerRemove(); - nodes.initialize(new ShardRouting(current, version), nodeId); + nodes.initialize(new ShardRouting(current, version), nodeId, expectedShardSize); } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 9c0af60a16f..60764ab6d3b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -37,6 +37,11 @@ import java.util.List; */ public final class ShardRouting implements Streamable, ToXContent { + /** + * Used if shard size is not available + */ + public static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1; + private String index; private int shardId; private String currentNodeId; @@ -50,6 +55,7 @@ public final class ShardRouting implements Streamable, ToXContent { private final transient List asList; private transient ShardId shardIdentifier; private boolean frozen = false; + private long expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE; private ShardRouting() { this.asList = Collections.singletonList(this); @@ -60,7 +66,7 @@ public final class ShardRouting implements Streamable, ToXContent { } public ShardRouting(ShardRouting copy, long version) { - this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true); + this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true, copy.getExpectedShardSize()); } /** @@ -69,7 +75,7 @@ public final class ShardRouting implements Streamable, ToXContent { */ ShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version, - UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal) { + UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal, long expectedShardSize) { this.index = index; this.shardId = shardId; this.currentNodeId = currentNodeId; @@ -81,6 +87,9 @@ public final class ShardRouting implements Streamable, ToXContent { this.restoreSource = restoreSource; this.unassignedInfo = unassignedInfo; this.allocationId = allocationId; + this.expectedShardSize = expectedShardSize; + assert expectedShardSize == UNAVAILABLE_EXPECTED_SHARD_SIZE || state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state; + assert expectedShardSize >= 0 || state != ShardRoutingState.INITIALIZING || state != ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state; assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta"; if (!internal) { assert state == ShardRoutingState.UNASSIGNED; @@ -88,13 +97,14 @@ public final class ShardRouting implements Streamable, ToXContent { assert relocatingNodeId == null; assert allocationId == null; } + } /** * Creates a new unassigned shard. */ public static ShardRouting newUnassigned(String index, int shardId, RestoreSource restoreSource, boolean primary, UnassignedInfo unassignedInfo) { - return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true); + return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true, UNAVAILABLE_EXPECTED_SHARD_SIZE); } /** @@ -205,7 +215,7 @@ public final class ShardRouting implements Streamable, ToXContent { public ShardRouting buildTargetRelocatingShard() { assert relocating(); return new ShardRouting(index, shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, version, unassignedInfo, - AllocationId.newTargetRelocation(allocationId), true); + AllocationId.newTargetRelocation(allocationId), true, expectedShardSize); } /** @@ -317,6 +327,11 @@ public final class ShardRouting implements Streamable, ToXContent { if (in.readBoolean()) { allocationId = new AllocationId(in); } + if (relocating() || initializing()) { + expectedShardSize = in.readLong(); + } else { + expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE; + } freeze(); } @@ -368,6 +383,10 @@ public final class ShardRouting implements Streamable, ToXContent { } else { out.writeBoolean(false); } + if (relocating() || initializing()) { + out.writeLong(expectedShardSize); + } + } @Override @@ -397,12 +416,13 @@ public final class ShardRouting implements Streamable, ToXContent { relocatingNodeId = null; this.unassignedInfo = unassignedInfo; allocationId = null; + expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE; } /** * Initializes an unassigned shard on a node. */ - void initialize(String nodeId) { + void initialize(String nodeId, long expectedShardSize) { ensureNotFrozen(); version++; assert state == ShardRoutingState.UNASSIGNED : this; @@ -410,6 +430,7 @@ public final class ShardRouting implements Streamable, ToXContent { state = ShardRoutingState.INITIALIZING; currentNodeId = nodeId; allocationId = AllocationId.newInitializing(); + this.expectedShardSize = expectedShardSize; } /** @@ -417,13 +438,14 @@ public final class ShardRouting implements Streamable, ToXContent { * * @param relocatingNodeId id of the node to relocate the shard */ - void relocate(String relocatingNodeId) { + void relocate(String relocatingNodeId, long expectedShardSize) { ensureNotFrozen(); version++; assert state == ShardRoutingState.STARTED : "current shard has to be started in order to be relocated " + this; state = ShardRoutingState.RELOCATING; this.relocatingNodeId = relocatingNodeId; this.allocationId = AllocationId.newRelocation(allocationId); + this.expectedShardSize = expectedShardSize; } /** @@ -436,7 +458,7 @@ public final class ShardRouting implements Streamable, ToXContent { assert state == ShardRoutingState.RELOCATING : this; assert assignedToNode() : this; assert relocatingNodeId != null : this; - + expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE; state = ShardRoutingState.STARTED; relocatingNodeId = null; allocationId = AllocationId.cancelRelocation(allocationId); @@ -470,6 +492,7 @@ public final class ShardRouting implements Streamable, ToXContent { // relocation target allocationId = AllocationId.finishRelocation(allocationId); } + expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE; state = ShardRoutingState.STARTED; } @@ -669,6 +692,9 @@ public final class ShardRouting implements Streamable, ToXContent { if (this.unassignedInfo != null) { sb.append(", ").append(unassignedInfo.toString()); } + if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) { + sb.append(", expected_shard_size[").append(expectedShardSize).append("]"); + } return sb.toString(); } @@ -682,7 +708,9 @@ public final class ShardRouting implements Streamable, ToXContent { .field("shard", shardId().id()) .field("index", shardId().index().name()) .field("version", version); - + if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE){ + builder.field("expected_shard_size_in_bytes", expectedShardSize); + } if (restoreSource() != null) { builder.field("restore_source"); restoreSource().toXContent(builder, params); @@ -709,4 +737,12 @@ public final class ShardRouting implements Streamable, ToXContent { boolean isFrozen() { return frozen; } + + /** + * Returns the expected shard size for {@link ShardRoutingState#RELOCATING} and {@link ShardRoutingState#INITIALIZING} + * shards. If it's size is not available {@value #UNAVAILABLE_EXPECTED_SHARD_SIZE} will be returned. + */ + public long getExpectedShardSize() { + return expectedShardSize; + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 9c42256de95..4d56659c276 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -507,7 +507,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards Decision decision = allocation.deciders().canAllocate(shard, target, allocation); if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too? sourceNode.removeShard(shard); - ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId()); + ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); currentNode.addShard(targetRelocatingShard, decision); if (logger.isTraceEnabled()) { logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId()); @@ -687,7 +687,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if (logger.isTraceEnabled()) { logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); } - routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId()); + routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); changed = true; continue; // don't add to ignoreUnassigned } else { @@ -779,10 +779,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards /* now allocate on the cluster - if we are started we need to relocate the shard */ if (candidate.started()) { RoutingNode lowRoutingNode = routingNodes.node(minNode.getNodeId()); - routingNodes.relocate(candidate, lowRoutingNode.nodeId()); + routingNodes.relocate(candidate, lowRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); } else { - routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId()); + routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); } return true; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java index 5e9f44b92b0..b210557b687 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java @@ -231,7 +231,7 @@ public class AllocateAllocationCommand implements AllocationCommand { unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure())); } - it.initialize(routingNode.nodeId()); + it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); break; } return new RerouteExplanation(this, decision); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java index f945da3aa35..614397aeadc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java @@ -178,7 +178,7 @@ public class MoveAllocationCommand implements AllocationCommand { if (decision.type() == Decision.Type.THROTTLE) { // its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it... } - allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId()); + allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); } if (!found) { diff --git a/core/src/main/java/org/elasticsearch/common/settings/Settings.java b/core/src/main/java/org/elasticsearch/common/settings/Settings.java index 9309e1c9f78..13383d68468 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/Settings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/Settings.java @@ -28,7 +28,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.elasticsearch.Version; import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.Classes; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.StreamInput; diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java index 0dc7de4441c..b4bf6367f13 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -94,12 +94,12 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { DiscoveryNode node = nodesToAllocate.yesNodes.get(0); logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node); changed = true; - unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion); + unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); } else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) { DiscoveryNode node = nodesToAllocate.noNodes.get(0); logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node); changed = true; - unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion); + unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); } else { // we are throttling this, but we have enough to allocate to this node, ignore it for now logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes); diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 9af9a4e8d5f..03772f74630 100644 --- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -169,7 +169,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node()); // we found a match changed = true; - unassignedIterator.initialize(nodeWithHighestMatch.nodeId()); + unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), shard.version(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); } } else if (matchingNodes.hasAnyData() == false) { // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index be7b8e5d0d3..e2e4c994a19 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -26,6 +26,7 @@ import com.google.common.collect.Iterators; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; @@ -270,7 +271,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } } - public synchronized IndexShard createShard(int sShardId, boolean primary) { + public synchronized IndexShard createShard(int sShardId, ShardRouting routing) { + final boolean primary = routing.primary(); /* * TODO: we execute this in parallel but it's a synced method. Yet, we might * be able to serialize the execution via the cluster state in the future. for now we just @@ -299,7 +301,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } } if (path == null) { - path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, getAvgShardSizeInBytes(), this); + path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, routing.getExpectedShardSize() == -1 ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), this); logger.debug("{} creating using a new path [{}]", shardId, path); } else { logger.debug("{} creating using an existing path [{}]", shardId, path); diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index d8e81395a92..c669a9d6d59 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -638,7 +638,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent= 0) { + assertTrue(routing.toString(), routing.toString().contains("expected_shard_size[" + byteSize + "]")); + } + if (routing.initializing()) { + routing = new ShardRouting(routing); + routing.moveToStarted(); + assertEquals(-1, routing.getExpectedShardSize()); + assertFalse(routing.toString(), routing.toString().contains("expected_shard_size[" + byteSize + "]")); + } + } else { + assertFalse(routing.toString(), routing.toString().contains("expected_shard_size [" + byteSize + "]")); + assertEquals(byteSize, routing.getExpectedShardSize()); + } + } + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java index 0d60b6b5718..df9e1f8af24 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -28,25 +28,25 @@ import org.elasticsearch.test.ESTestCase; public class TestShardRouting { public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) { - return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true); + return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, long version) { - return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true); + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, AllocationId allocationId, long version) { - return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), allocationId, true); + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), allocationId, true, -1); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) { - return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true); + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version, UnassignedInfo unassignedInfo) { - return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, buildAllocationId(state), true); + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, buildAllocationId(state), true, -1); } private static AllocationId buildAllocationId(ShardRoutingState state) { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 657d33f1c85..a0b610c2bb2 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -192,7 +192,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase { ShardRouting shard = TestShardRouting.newShardRouting("test", 1, null, null, null, true, ShardRoutingState.UNASSIGNED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); ShardRouting mutable = new ShardRouting(shard); assertThat(mutable.unassignedInfo(), notNullValue()); - mutable.initialize("test_node"); + mutable.initialize("test_node", -1); assertThat(mutable.state(), equalTo(ShardRoutingState.INITIALIZING)); assertThat(mutable.unassignedInfo(), notNullValue()); mutable.moveToStarted(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index f9306ad734f..e17fe4789f9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -369,37 +369,37 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { switch (sr.id()) { case 0: if (sr.primary()) { - allocation.routingNodes().initialize(sr, "node1"); + allocation.routingNodes().initialize(sr, "node1", -1); } else { - allocation.routingNodes().initialize(sr, "node0"); + allocation.routingNodes().initialize(sr, "node0", -1); } break; case 1: if (sr.primary()) { - allocation.routingNodes().initialize(sr, "node1"); + allocation.routingNodes().initialize(sr, "node1", -1); } else { - allocation.routingNodes().initialize(sr, "node2"); + allocation.routingNodes().initialize(sr, "node2", -1); } break; case 2: if (sr.primary()) { - allocation.routingNodes().initialize(sr, "node3"); + allocation.routingNodes().initialize(sr, "node3", -1); } else { - allocation.routingNodes().initialize(sr, "node2"); + allocation.routingNodes().initialize(sr, "node2", -1); } break; case 3: if (sr.primary()) { - allocation.routingNodes().initialize(sr, "node3"); + allocation.routingNodes().initialize(sr, "node3", -1); } else { - allocation.routingNodes().initialize(sr, "node1"); + allocation.routingNodes().initialize(sr, "node1", -1); } break; case 4: if (sr.primary()) { - allocation.routingNodes().initialize(sr, "node2"); + allocation.routingNodes().initialize(sr, "node2", -1); } else { - allocation.routingNodes().initialize(sr, "node0"); + allocation.routingNodes().initialize(sr, "node0", -1); } break; } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java new file mode 100644 index 00000000000..e512fcdfbd3 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java @@ -0,0 +1,179 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESAllocationTestCase; +import org.junit.Test; + +import java.util.Collections; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.cluster.routing.allocation.RoutingNodesUtils.numberOfShardsOfType; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static org.hamcrest.Matchers.equalTo; + +/** + */ +public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase { + + private final ESLogger logger = Loggers.getLogger(ExpectedShardSizeAllocationTests.class); + + @Test + public void testInitializingHasExpectedSize() { + final long byteSize = randomIntBetween(0, Integer.MAX_VALUE); + AllocationService strategy = createAllocationService(Settings.EMPTY, new ClusterInfoService() { + @Override + public ClusterInfo getClusterInfo() { + return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) { + @Override + public Long getShardSize(ShardRouting shardRouting) { + if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) { + return byteSize; + } + return null; + } + }; + } + + @Override + public void addListener(Listener listener) { + } + }); + + logger.info("Building initial routing table"); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + logger.info("Adding one node and performing rerouting"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + assertEquals(1, clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.INITIALIZING)); + assertEquals(byteSize, clusterState.getRoutingNodes().getRoutingTable().shardsWithState(ShardRoutingState.INITIALIZING).get(0).getExpectedShardSize()); + logger.info("Start the primary shard"); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + assertEquals(1, clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.STARTED)); + assertEquals(1, clusterState.getRoutingNodes().unassigned().size()); + + logger.info("Add another one node and reroute"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2"))).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + assertEquals(1, clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING)); + assertEquals(byteSize, clusterState.getRoutingNodes().getRoutingTable().shardsWithState(ShardRoutingState.INITIALIZING).get(0).getExpectedShardSize()); + } + + @Test + public void testExpectedSizeOnMove() { + final long byteSize = randomIntBetween(0, Integer.MAX_VALUE); + final AllocationService allocation = createAllocationService(Settings.EMPTY, new ClusterInfoService() { + @Override + public ClusterInfo getClusterInfo() { + return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) { + @Override + public Long getShardSize(ShardRouting shardRouting) { + if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) { + return byteSize; + } + return null; + } + }; + } + + @Override + public void addListener(Listener listener) { + } + }); + logger.info("creating an index with 1 shard, no replica"); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + logger.info("adding two nodes and performing rerouting"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); + RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState); + clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + + logger.info("start primary shard"); + rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + + logger.info("move the shard"); + String existingNodeId = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(); + String toNodeId; + if ("node1".equals(existingNodeId)) { + toNodeId = "node2"; + } else { + toNodeId = "node1"; + } + rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(new ShardId("test", 0), existingNodeId, toNodeId))); + assertThat(rerouteResult.changed(), equalTo(true)); + clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + assertEquals(clusterState.getRoutingNodes().node(existingNodeId).get(0).state(), ShardRoutingState.RELOCATING); + assertEquals(clusterState.getRoutingNodes().node(toNodeId).get(0).state(),ShardRoutingState.INITIALIZING); + + assertEquals(clusterState.getRoutingNodes().node(existingNodeId).get(0).getExpectedShardSize(), byteSize); + assertEquals(clusterState.getRoutingNodes().node(toNodeId).get(0).getExpectedShardSize(), byteSize); + + logger.info("finish moving the shard"); + rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + + assertThat(clusterState.getRoutingNodes().node(existingNodeId).isEmpty(), equalTo(true)); + assertThat(clusterState.getRoutingNodes().node(toNodeId).get(0).state(), equalTo(ShardRoutingState.STARTED)); + assertEquals(clusterState.getRoutingNodes().node(toNodeId).get(0).getExpectedShardSize(), -1); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java index 4b1b08e338d..56510384246 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -27,12 +29,16 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESAllocationTestCase; import org.junit.Test; +import java.util.Collections; + import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.hamcrest.Matchers.equalTo; @@ -47,12 +53,33 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase { @Test public void testRebalanceOnlyAfterAllShardsAreActive() { - AllocationService strategy = createAllocationService(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always") - .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build()); + final long[] sizes = new long[5]; + for (int i =0; i < sizes.length; i++) { + sizes[i] = randomIntBetween(0, Integer.MAX_VALUE); + } + AllocationService strategy = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .build(), + new ClusterInfoService() { + @Override + public ClusterInfo getClusterInfo() { + return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) { + @Override + public Long getShardSize(ShardRouting shardRouting) { + if (shardRouting.index().equals("test")) { + return sizes[shardRouting.getId()]; + } + return null; } + }; + } + + @Override + public void addListener(Listener listener) { + } + }); logger.info("Building initial routing table"); MetaData metaData = MetaData.builder() @@ -97,6 +124,7 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase { assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + assertEquals(routingTable.index("test").shard(i).replicaShards().get(0).getExpectedShardSize(), sizes[i]); } logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); @@ -112,6 +140,8 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase { assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + assertEquals(routingTable.index("test").shard(i).replicaShards().get(0).getExpectedShardSize(), sizes[i]); + } logger.info("start the replica shards, rebalancing should start"); @@ -124,6 +154,16 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase { // we only allow one relocation at a time assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + int num = 0; + for (ShardRouting routing : routingTable.index("test").shard(i).shards()) { + if (routing.state() == RELOCATING || routing.state() == INITIALIZING) { + assertEquals(routing.getExpectedShardSize(), sizes[i]); + num++; + } + } + assertTrue(num > 0); + } logger.info("complete relocation, other half of relocation should happen"); routingNodes = clusterState.getRoutingNodes(); @@ -135,6 +175,14 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase { // we now only relocate 3, since 2 remain where they are! assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7)); assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + for (ShardRouting routing : routingTable.index("test").shard(i).shards()) { + if (routing.state() == RELOCATING || routing.state() == INITIALIZING) { + assertEquals(routing.getExpectedShardSize(), sizes[i]); + } + } + } + logger.info("complete relocation, thats it!"); routingNodes = clusterState.getRoutingNodes(); diff --git a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index b19a552e513..2b4ccf7d80f 100644 --- a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.gateway; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -302,7 +303,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { .metaData(metaData) .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null); + return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY); } private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { @@ -321,7 +322,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { .metaData(metaData) .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null); + return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY); } class TestAllocator extends ReplicaShardAllocator { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 3aadd0d88d7..1efe8d7f776 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -454,6 +455,22 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertPathHasBeenCleared(idxPath); } + public void testExpectedShardSizeIsPresent() throws InterruptedException { + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); + for (int i = 0; i < 50; i++) { + client().prepareIndex("test", "test").setSource("{}").get(); + } + ensureGreen("test"); + InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); + InternalClusterInfoService.ClusterInfoUpdateJob job = clusterInfoService.new ClusterInfoUpdateJob(false); + job.run(); + ClusterState state = getInstanceFromNode(ClusterService.class).state(); + Long test = clusterInfoService.getClusterInfo().getShardSize(state.getRoutingTable().index("test").getShards().get(0).primaryShard()); + assertNotNull(test); + assertTrue(test > 0); + } + public void testIndexCanChangeCustomDataPath() throws Exception { Environment env = getInstanceFromNode(Environment.class); Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10)); diff --git a/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java b/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java index a4f8f3b2fa8..991aea74345 100644 --- a/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java @@ -19,6 +19,7 @@ package org.elasticsearch.test; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.EmptyClusterInfoService; @@ -63,7 +64,7 @@ public abstract class ESAllocationTestCase extends ESTestCase { } public static AllocationService createAllocationService(Settings settings, Random random) { - return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random); + return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random); } public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) { @@ -72,6 +73,13 @@ public abstract class ESAllocationTestCase extends ESTestCase { new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); } + public static AllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) { + return new AllocationService(settings, + randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()), + new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService); + } + + public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) { final List> defaultAllocationDeciders = ClusterModule.DEFAULT_ALLOCATION_DECIDERS; From 3dd6c4ab8087e4c21cc3ca69458ebc39989cc871 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 17 Aug 2015 22:22:05 +0200 Subject: [PATCH 2/2] Use constant to determin if expected size is available --- core/src/main/java/org/elasticsearch/index/IndexService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index e2e4c994a19..83ad78fc32b 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -301,7 +301,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } } if (path == null) { - path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, routing.getExpectedShardSize() == -1 ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), this); + path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), this); logger.debug("{} creating using a new path [{}]", shardId, path); } else { logger.debug("{} creating using an existing path [{}]", shardId, path);