diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
index 5e2d35a9818..f21bc9f052e 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
@@ -30,7 +30,7 @@ import java.util.Map;
* InternalClusterInfoService.shardIdentifierFromRouting(String)
* for the key used in the shardSizes map
*/
-public final class ClusterInfo {
+public class ClusterInfo {
private final Map usages;
final Map shardSizes;
@@ -54,6 +54,11 @@ public final class ClusterInfo {
return shardSizes.get(shardIdentifierFromRouting(shardRouting));
}
+ public long getShardSize(ShardRouting shardRouting, long defaultValue) {
+ Long shardSize = getShardSize(shardRouting);
+ return shardSize == null ? defaultValue : shardSize;
+ }
+
/**
* Method that incorporates the ShardId for the shard into a string that
* includes a 'p' or 'r' depending on whether the shard is a primary.
diff --git a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
index 567812e7d58..8be9465f7b2 100644
--- a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
@@ -19,6 +19,7 @@
package org.elasticsearch.cluster;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -36,6 +37,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.node.settings.NodeSettingsService;
@@ -45,6 +47,7 @@ import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
/**
* InternalClusterInfoService provides the ClusterInfoService interface,
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
index 7acedd2b98b..8c07429776a 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
@@ -345,10 +345,10 @@ public class RoutingNodes implements Iterable {
/**
* Moves a shard from unassigned to initialize state
*/
- public void initialize(ShardRouting shard, String nodeId) {
+ public void initialize(ShardRouting shard, String nodeId, long expectedSize) {
ensureMutable();
assert shard.unassigned() : shard;
- shard.initialize(nodeId);
+ shard.initialize(nodeId, expectedSize);
node(nodeId).add(shard);
inactiveShardCount++;
if (shard.primary()) {
@@ -362,10 +362,10 @@ public class RoutingNodes implements Iterable {
* shard as well as assigning it. And returning the target initializing
* shard.
*/
- public ShardRouting relocate(ShardRouting shard, String nodeId) {
+ public ShardRouting relocate(ShardRouting shard, String nodeId, long expectedShardSize) {
ensureMutable();
relocatingShards++;
- shard.relocate(nodeId);
+ shard.relocate(nodeId, expectedShardSize);
ShardRouting target = shard.buildTargetRelocatingShard();
node(target.currentNodeId()).add(target);
assignedShardsAdd(target);
@@ -608,16 +608,9 @@ public class RoutingNodes implements Iterable {
/**
* Initializes the current unassigned shard and moves it from the unassigned list.
*/
- public void initialize(String nodeId) {
- initialize(nodeId, current.version());
- }
-
- /**
- * Initializes the current unassigned shard and moves it from the unassigned list.
- */
- public void initialize(String nodeId, long version) {
+ public void initialize(String nodeId, long version, long expectedShardSize) {
innerRemove();
- nodes.initialize(new ShardRouting(current, version), nodeId);
+ nodes.initialize(new ShardRouting(current, version), nodeId, expectedShardSize);
}
/**
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java
index 9c0af60a16f..60764ab6d3b 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java
@@ -37,6 +37,11 @@ import java.util.List;
*/
public final class ShardRouting implements Streamable, ToXContent {
+ /**
+ * Used if shard size is not available
+ */
+ public static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1;
+
private String index;
private int shardId;
private String currentNodeId;
@@ -50,6 +55,7 @@ public final class ShardRouting implements Streamable, ToXContent {
private final transient List asList;
private transient ShardId shardIdentifier;
private boolean frozen = false;
+ private long expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
private ShardRouting() {
this.asList = Collections.singletonList(this);
@@ -60,7 +66,7 @@ public final class ShardRouting implements Streamable, ToXContent {
}
public ShardRouting(ShardRouting copy, long version) {
- this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true);
+ this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true, copy.getExpectedShardSize());
}
/**
@@ -69,7 +75,7 @@ public final class ShardRouting implements Streamable, ToXContent {
*/
ShardRouting(String index, int shardId, String currentNodeId,
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version,
- UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal) {
+ UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal, long expectedShardSize) {
this.index = index;
this.shardId = shardId;
this.currentNodeId = currentNodeId;
@@ -81,6 +87,9 @@ public final class ShardRouting implements Streamable, ToXContent {
this.restoreSource = restoreSource;
this.unassignedInfo = unassignedInfo;
this.allocationId = allocationId;
+ this.expectedShardSize = expectedShardSize;
+ assert expectedShardSize == UNAVAILABLE_EXPECTED_SHARD_SIZE || state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
+ assert expectedShardSize >= 0 || state != ShardRoutingState.INITIALIZING || state != ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
if (!internal) {
assert state == ShardRoutingState.UNASSIGNED;
@@ -88,13 +97,14 @@ public final class ShardRouting implements Streamable, ToXContent {
assert relocatingNodeId == null;
assert allocationId == null;
}
+
}
/**
* Creates a new unassigned shard.
*/
public static ShardRouting newUnassigned(String index, int shardId, RestoreSource restoreSource, boolean primary, UnassignedInfo unassignedInfo) {
- return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true);
+ return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true, UNAVAILABLE_EXPECTED_SHARD_SIZE);
}
/**
@@ -205,7 +215,7 @@ public final class ShardRouting implements Streamable, ToXContent {
public ShardRouting buildTargetRelocatingShard() {
assert relocating();
return new ShardRouting(index, shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, version, unassignedInfo,
- AllocationId.newTargetRelocation(allocationId), true);
+ AllocationId.newTargetRelocation(allocationId), true, expectedShardSize);
}
/**
@@ -317,6 +327,11 @@ public final class ShardRouting implements Streamable, ToXContent {
if (in.readBoolean()) {
allocationId = new AllocationId(in);
}
+ if (relocating() || initializing()) {
+ expectedShardSize = in.readLong();
+ } else {
+ expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
+ }
freeze();
}
@@ -368,6 +383,10 @@ public final class ShardRouting implements Streamable, ToXContent {
} else {
out.writeBoolean(false);
}
+ if (relocating() || initializing()) {
+ out.writeLong(expectedShardSize);
+ }
+
}
@Override
@@ -397,12 +416,13 @@ public final class ShardRouting implements Streamable, ToXContent {
relocatingNodeId = null;
this.unassignedInfo = unassignedInfo;
allocationId = null;
+ expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
}
/**
* Initializes an unassigned shard on a node.
*/
- void initialize(String nodeId) {
+ void initialize(String nodeId, long expectedShardSize) {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.UNASSIGNED : this;
@@ -410,6 +430,7 @@ public final class ShardRouting implements Streamable, ToXContent {
state = ShardRoutingState.INITIALIZING;
currentNodeId = nodeId;
allocationId = AllocationId.newInitializing();
+ this.expectedShardSize = expectedShardSize;
}
/**
@@ -417,13 +438,14 @@ public final class ShardRouting implements Streamable, ToXContent {
*
* @param relocatingNodeId id of the node to relocate the shard
*/
- void relocate(String relocatingNodeId) {
+ void relocate(String relocatingNodeId, long expectedShardSize) {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.STARTED : "current shard has to be started in order to be relocated " + this;
state = ShardRoutingState.RELOCATING;
this.relocatingNodeId = relocatingNodeId;
this.allocationId = AllocationId.newRelocation(allocationId);
+ this.expectedShardSize = expectedShardSize;
}
/**
@@ -436,7 +458,7 @@ public final class ShardRouting implements Streamable, ToXContent {
assert state == ShardRoutingState.RELOCATING : this;
assert assignedToNode() : this;
assert relocatingNodeId != null : this;
-
+ expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
state = ShardRoutingState.STARTED;
relocatingNodeId = null;
allocationId = AllocationId.cancelRelocation(allocationId);
@@ -470,6 +492,7 @@ public final class ShardRouting implements Streamable, ToXContent {
// relocation target
allocationId = AllocationId.finishRelocation(allocationId);
}
+ expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
state = ShardRoutingState.STARTED;
}
@@ -669,6 +692,9 @@ public final class ShardRouting implements Streamable, ToXContent {
if (this.unassignedInfo != null) {
sb.append(", ").append(unassignedInfo.toString());
}
+ if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) {
+ sb.append(", expected_shard_size[").append(expectedShardSize).append("]");
+ }
return sb.toString();
}
@@ -682,7 +708,9 @@ public final class ShardRouting implements Streamable, ToXContent {
.field("shard", shardId().id())
.field("index", shardId().index().name())
.field("version", version);
-
+ if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE){
+ builder.field("expected_shard_size_in_bytes", expectedShardSize);
+ }
if (restoreSource() != null) {
builder.field("restore_source");
restoreSource().toXContent(builder, params);
@@ -709,4 +737,12 @@ public final class ShardRouting implements Streamable, ToXContent {
boolean isFrozen() {
return frozen;
}
+
+ /**
+ * Returns the expected shard size for {@link ShardRoutingState#RELOCATING} and {@link ShardRoutingState#INITIALIZING}
+ * shards. If it's size is not available {@value #UNAVAILABLE_EXPECTED_SHARD_SIZE} will be returned.
+ */
+ public long getExpectedShardSize() {
+ return expectedShardSize;
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
index 9c42256de95..4d56659c276 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -507,7 +507,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
Decision decision = allocation.deciders().canAllocate(shard, target, allocation);
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
sourceNode.removeShard(shard);
- ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId());
+ ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
currentNode.addShard(targetRelocatingShard, decision);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
@@ -687,7 +687,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
- routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId());
+ routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
changed = true;
continue; // don't add to ignoreUnassigned
} else {
@@ -779,10 +779,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
/* now allocate on the cluster - if we are started we need to relocate the shard */
if (candidate.started()) {
RoutingNode lowRoutingNode = routingNodes.node(minNode.getNodeId());
- routingNodes.relocate(candidate, lowRoutingNode.nodeId());
+ routingNodes.relocate(candidate, lowRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
} else {
- routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId());
+ routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
return true;
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java
index 5e9f44b92b0..b210557b687 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java
@@ -231,7 +231,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure()));
}
- it.initialize(routingNode.nodeId());
+ it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
break;
}
return new RerouteExplanation(this, decision);
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java
index f945da3aa35..614397aeadc 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java
@@ -178,7 +178,7 @@ public class MoveAllocationCommand implements AllocationCommand {
if (decision.type() == Decision.Type.THROTTLE) {
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
}
- allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId());
+ allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
if (!found) {
diff --git a/core/src/main/java/org/elasticsearch/common/settings/Settings.java b/core/src/main/java/org/elasticsearch/common/settings/Settings.java
index 9309e1c9f78..13383d68468 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/Settings.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/Settings.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.Version;
import org.elasticsearch.common.Booleans;
-import org.elasticsearch.common.Classes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.StreamInput;
diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java
index 0dc7de4441c..b4bf6367f13 100644
--- a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java
+++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java
@@ -94,12 +94,12 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true;
- unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion);
+ unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) {
DiscoveryNode node = nodesToAllocate.noNodes.get(0);
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true;
- unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion);
+ unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else {
// we are throttling this, but we have enough to allocate to this node, ignore it for now
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes);
diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
index 9af9a4e8d5f..03772f74630 100644
--- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
+++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
@@ -169,7 +169,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
// we found a match
changed = true;
- unassignedIterator.initialize(nodeWithHighestMatch.nodeId());
+ unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), shard.version(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
} else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation
diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java
index be7b8e5d0d3..e2e4c994a19 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexService.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Iterators;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
@@ -270,7 +271,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
- public synchronized IndexShard createShard(int sShardId, boolean primary) {
+ public synchronized IndexShard createShard(int sShardId, ShardRouting routing) {
+ final boolean primary = routing.primary();
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
@@ -299,7 +301,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
if (path == null) {
- path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, getAvgShardSizeInBytes(), this);
+ path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, routing.getExpectedShardSize() == -1 ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), this);
logger.debug("{} creating using a new path [{}]", shardId, path);
} else {
logger.debug("{} creating using an existing path [{}]", shardId, path);
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index d8e81395a92..c669a9d6d59 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -638,7 +638,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent= 0) {
+ assertTrue(routing.toString(), routing.toString().contains("expected_shard_size[" + byteSize + "]"));
+ }
+ if (routing.initializing()) {
+ routing = new ShardRouting(routing);
+ routing.moveToStarted();
+ assertEquals(-1, routing.getExpectedShardSize());
+ assertFalse(routing.toString(), routing.toString().contains("expected_shard_size[" + byteSize + "]"));
+ }
+ } else {
+ assertFalse(routing.toString(), routing.toString().contains("expected_shard_size [" + byteSize + "]"));
+ assertEquals(byteSize, routing.getExpectedShardSize());
+ }
+ }
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java
index 0d60b6b5718..df9e1f8af24 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java
@@ -28,25 +28,25 @@ import org.elasticsearch.test.ESTestCase;
public class TestShardRouting {
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) {
- return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true);
+ return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, long version) {
- return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true);
+ return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, AllocationId allocationId, long version) {
- return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), allocationId, true);
+ return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), allocationId, true, -1);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) {
- return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true);
+ return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId,
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version,
UnassignedInfo unassignedInfo) {
- return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, buildAllocationId(state), true);
+ return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, buildAllocationId(state), true, -1);
}
private static AllocationId buildAllocationId(ShardRoutingState state) {
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java
index 657d33f1c85..a0b610c2bb2 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java
@@ -192,7 +192,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
ShardRouting shard = TestShardRouting.newShardRouting("test", 1, null, null, null, true, ShardRoutingState.UNASSIGNED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting mutable = new ShardRouting(shard);
assertThat(mutable.unassignedInfo(), notNullValue());
- mutable.initialize("test_node");
+ mutable.initialize("test_node", -1);
assertThat(mutable.state(), equalTo(ShardRoutingState.INITIALIZING));
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.moveToStarted();
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
index f9306ad734f..e17fe4789f9 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
@@ -369,37 +369,37 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
switch (sr.id()) {
case 0:
if (sr.primary()) {
- allocation.routingNodes().initialize(sr, "node1");
+ allocation.routingNodes().initialize(sr, "node1", -1);
} else {
- allocation.routingNodes().initialize(sr, "node0");
+ allocation.routingNodes().initialize(sr, "node0", -1);
}
break;
case 1:
if (sr.primary()) {
- allocation.routingNodes().initialize(sr, "node1");
+ allocation.routingNodes().initialize(sr, "node1", -1);
} else {
- allocation.routingNodes().initialize(sr, "node2");
+ allocation.routingNodes().initialize(sr, "node2", -1);
}
break;
case 2:
if (sr.primary()) {
- allocation.routingNodes().initialize(sr, "node3");
+ allocation.routingNodes().initialize(sr, "node3", -1);
} else {
- allocation.routingNodes().initialize(sr, "node2");
+ allocation.routingNodes().initialize(sr, "node2", -1);
}
break;
case 3:
if (sr.primary()) {
- allocation.routingNodes().initialize(sr, "node3");
+ allocation.routingNodes().initialize(sr, "node3", -1);
} else {
- allocation.routingNodes().initialize(sr, "node1");
+ allocation.routingNodes().initialize(sr, "node1", -1);
}
break;
case 4:
if (sr.primary()) {
- allocation.routingNodes().initialize(sr, "node2");
+ allocation.routingNodes().initialize(sr, "node2", -1);
} else {
- allocation.routingNodes().initialize(sr, "node0");
+ allocation.routingNodes().initialize(sr, "node0", -1);
}
break;
}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java
new file mode 100644
index 00000000000..e512fcdfbd3
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterInfo;
+import org.elasticsearch.cluster.ClusterInfoService;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RoutingNodes;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
+import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.ESAllocationTestCase;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
+import static org.elasticsearch.cluster.routing.allocation.RoutingNodesUtils.numberOfShardsOfType;
+import static org.elasticsearch.common.settings.Settings.settingsBuilder;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ */
+public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {
+
+ private final ESLogger logger = Loggers.getLogger(ExpectedShardSizeAllocationTests.class);
+
+ @Test
+ public void testInitializingHasExpectedSize() {
+ final long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
+ AllocationService strategy = createAllocationService(Settings.EMPTY, new ClusterInfoService() {
+ @Override
+ public ClusterInfo getClusterInfo() {
+ return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
+ @Override
+ public Long getShardSize(ShardRouting shardRouting) {
+ if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) {
+ return byteSize;
+ }
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public void addListener(Listener listener) {
+ }
+ });
+
+ logger.info("Building initial routing table");
+
+ MetaData metaData = MetaData.builder()
+ .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)))
+ .build();
+
+ RoutingTable routingTable = RoutingTable.builder()
+ .addAsNew(metaData.index("test"))
+ .build();
+
+ ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+ logger.info("Adding one node and performing rerouting");
+ clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
+ routingTable = strategy.reroute(clusterState).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+ assertEquals(1, clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.INITIALIZING));
+ assertEquals(byteSize, clusterState.getRoutingNodes().getRoutingTable().shardsWithState(ShardRoutingState.INITIALIZING).get(0).getExpectedShardSize());
+ logger.info("Start the primary shard");
+ RoutingNodes routingNodes = clusterState.getRoutingNodes();
+ routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+ assertEquals(1, clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.STARTED));
+ assertEquals(1, clusterState.getRoutingNodes().unassigned().size());
+
+ logger.info("Add another one node and reroute");
+ clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2"))).build();
+ routingTable = strategy.reroute(clusterState).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+ assertEquals(1, clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING));
+ assertEquals(byteSize, clusterState.getRoutingNodes().getRoutingTable().shardsWithState(ShardRoutingState.INITIALIZING).get(0).getExpectedShardSize());
+ }
+
+ @Test
+ public void testExpectedSizeOnMove() {
+ final long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
+ final AllocationService allocation = createAllocationService(Settings.EMPTY, new ClusterInfoService() {
+ @Override
+ public ClusterInfo getClusterInfo() {
+ return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
+ @Override
+ public Long getShardSize(ShardRouting shardRouting) {
+ if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) {
+ return byteSize;
+ }
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public void addListener(Listener listener) {
+ }
+ });
+ logger.info("creating an index with 1 shard, no replica");
+ MetaData metaData = MetaData.builder()
+ .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
+ .build();
+ RoutingTable routingTable = RoutingTable.builder()
+ .addAsNew(metaData.index("test"))
+ .build();
+ ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+
+ logger.info("adding two nodes and performing rerouting");
+ clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
+ RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
+ clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+
+ logger.info("start primary shard");
+ rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+ clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+
+ logger.info("move the shard");
+ String existingNodeId = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
+ String toNodeId;
+ if ("node1".equals(existingNodeId)) {
+ toNodeId = "node2";
+ } else {
+ toNodeId = "node1";
+ }
+ rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(new ShardId("test", 0), existingNodeId, toNodeId)));
+ assertThat(rerouteResult.changed(), equalTo(true));
+ clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+ assertEquals(clusterState.getRoutingNodes().node(existingNodeId).get(0).state(), ShardRoutingState.RELOCATING);
+ assertEquals(clusterState.getRoutingNodes().node(toNodeId).get(0).state(),ShardRoutingState.INITIALIZING);
+
+ assertEquals(clusterState.getRoutingNodes().node(existingNodeId).get(0).getExpectedShardSize(), byteSize);
+ assertEquals(clusterState.getRoutingNodes().node(toNodeId).get(0).getExpectedShardSize(), byteSize);
+
+ logger.info("finish moving the shard");
+ rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+ clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+
+ assertThat(clusterState.getRoutingNodes().node(existingNodeId).isEmpty(), equalTo(true));
+ assertThat(clusterState.getRoutingNodes().node(toNodeId).get(0).state(), equalTo(ShardRoutingState.STARTED));
+ assertEquals(clusterState.getRoutingNodes().node(toNodeId).get(0).getExpectedShardSize(), -1);
+ }
+}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java
index 4b1b08e338d..56510384246 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java
@@ -20,6 +20,8 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterInfo;
+import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@@ -27,12 +29,16 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Test;
+import java.util.Collections;
+
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
@@ -47,12 +53,33 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
@Test
public void testRebalanceOnlyAfterAllShardsAreActive() {
- AllocationService strategy = createAllocationService(settingsBuilder()
- .put("cluster.routing.allocation.concurrent_recoveries", 10)
- .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
- .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
- .build());
+ final long[] sizes = new long[5];
+ for (int i =0; i < sizes.length; i++) {
+ sizes[i] = randomIntBetween(0, Integer.MAX_VALUE);
+ }
+ AllocationService strategy = createAllocationService(settingsBuilder()
+ .put("cluster.routing.allocation.concurrent_recoveries", 10)
+ .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
+ .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
+ .build(),
+ new ClusterInfoService() {
+ @Override
+ public ClusterInfo getClusterInfo() {
+ return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
+ @Override
+ public Long getShardSize(ShardRouting shardRouting) {
+ if (shardRouting.index().equals("test")) {
+ return sizes[shardRouting.getId()];
+ }
+ return null; }
+ };
+ }
+
+ @Override
+ public void addListener(Listener listener) {
+ }
+ });
logger.info("Building initial routing table");
MetaData metaData = MetaData.builder()
@@ -97,6 +124,7 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
+ assertEquals(routingTable.index("test").shard(i).replicaShards().get(0).getExpectedShardSize(), sizes[i]);
}
logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened");
@@ -112,6 +140,8 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
+ assertEquals(routingTable.index("test").shard(i).replicaShards().get(0).getExpectedShardSize(), sizes[i]);
+
}
logger.info("start the replica shards, rebalancing should start");
@@ -124,6 +154,16 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
// we only allow one relocation at a time
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5));
+ for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
+ int num = 0;
+ for (ShardRouting routing : routingTable.index("test").shard(i).shards()) {
+ if (routing.state() == RELOCATING || routing.state() == INITIALIZING) {
+ assertEquals(routing.getExpectedShardSize(), sizes[i]);
+ num++;
+ }
+ }
+ assertTrue(num > 0);
+ }
logger.info("complete relocation, other half of relocation should happen");
routingNodes = clusterState.getRoutingNodes();
@@ -135,6 +175,14 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
// we now only relocate 3, since 2 remain where they are!
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3));
+ for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
+ for (ShardRouting routing : routingTable.index("test").shard(i).shards()) {
+ if (routing.state() == RELOCATING || routing.state() == INITIALIZING) {
+ assertEquals(routing.getExpectedShardSize(), sizes[i]);
+ }
+ }
+ }
+
logger.info("complete relocation, thats it!");
routingNodes = clusterState.getRoutingNodes();
diff --git a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
index b19a552e513..2b4ccf7d80f 100644
--- a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
+++ b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
@@ -21,6 +21,7 @@ package org.elasticsearch.gateway;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@@ -302,7 +303,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
- return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null);
+ return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY);
}
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
@@ -321,7 +322,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
- return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null);
+ return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY);
}
class TestAllocator extends ReplicaShardAllocator {
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 3aadd0d88d7..1efe8d7f776 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -25,6 +25,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@@ -454,6 +455,22 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertPathHasBeenCleared(idxPath);
}
+ public void testExpectedShardSizeIsPresent() throws InterruptedException {
+ assertAcked(client().admin().indices().prepareCreate("test")
+ .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
+ for (int i = 0; i < 50; i++) {
+ client().prepareIndex("test", "test").setSource("{}").get();
+ }
+ ensureGreen("test");
+ InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
+ InternalClusterInfoService.ClusterInfoUpdateJob job = clusterInfoService.new ClusterInfoUpdateJob(false);
+ job.run();
+ ClusterState state = getInstanceFromNode(ClusterService.class).state();
+ Long test = clusterInfoService.getClusterInfo().getShardSize(state.getRoutingTable().index("test").getShards().get(0).primaryShard());
+ assertNotNull(test);
+ assertTrue(test > 0);
+ }
+
public void testIndexCanChangeCustomDataPath() throws Exception {
Environment env = getInstanceFromNode(Environment.class);
Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10));
diff --git a/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java b/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java
index a4f8f3b2fa8..991aea74345 100644
--- a/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/test/ESAllocationTestCase.java
@@ -19,6 +19,7 @@
package org.elasticsearch.test;
import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
@@ -63,7 +64,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
}
public static AllocationService createAllocationService(Settings settings, Random random) {
- return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random);
+ return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random);
}
public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
@@ -72,6 +73,13 @@ public abstract class ESAllocationTestCase extends ESTestCase {
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
}
+ public static AllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
+ return new AllocationService(settings,
+ randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
+ new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService);
+ }
+
+
public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
final List> defaultAllocationDeciders = ClusterModule.DEFAULT_ALLOCATION_DECIDERS;