Add `expectedShardSize` to ShardRouting and use it in path.data allocation

Today we only guess how big the shard will be that we are allocating on a node.
Yet, we have this information on the master but it's not available on the data nodes
when we pick a data path for the shard. We use some rather simple heuristic based on
existing shard sizes on this node which might be complete bogus. This change adds
the expected shard size to the ShardRouting for RELOCATING and INITIALIZING shards
to be used on the actual node to find the best data path for the shard.

Closes #11271
This commit is contained in:
Simon Willnauer 2015-08-13 15:14:56 +02:00
parent d0835715c2
commit d2507c4ac0
25 changed files with 412 additions and 74 deletions

View File

@ -30,7 +30,7 @@ import java.util.Map;
* <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code>
* for the key used in the shardSizes map
*/
public final class ClusterInfo {
public class ClusterInfo {
private final Map<String, DiskUsage> usages;
final Map<String, Long> shardSizes;
@ -54,6 +54,11 @@ public final class ClusterInfo {
return shardSizes.get(shardIdentifierFromRouting(shardRouting));
}
public long getShardSize(ShardRouting shardRouting, long defaultValue) {
Long shardSize = getShardSize(shardRouting);
return shardSize == null ? defaultValue : shardSize;
}
/**
* Method that incorporates the ShardId for the shard into a string that
* includes a 'p' or 'r' depending on whether the shard is a primary.

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -36,6 +37,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.node.settings.NodeSettingsService;
@ -45,6 +47,7 @@ import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* InternalClusterInfoService provides the ClusterInfoService interface,

View File

@ -345,10 +345,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/**
* Moves a shard from unassigned to initialize state
*/
public void initialize(ShardRouting shard, String nodeId) {
public void initialize(ShardRouting shard, String nodeId, long expectedSize) {
ensureMutable();
assert shard.unassigned() : shard;
shard.initialize(nodeId);
shard.initialize(nodeId, expectedSize);
node(nodeId).add(shard);
inactiveShardCount++;
if (shard.primary()) {
@ -362,10 +362,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
* shard as well as assigning it. And returning the target initializing
* shard.
*/
public ShardRouting relocate(ShardRouting shard, String nodeId) {
public ShardRouting relocate(ShardRouting shard, String nodeId, long expectedShardSize) {
ensureMutable();
relocatingShards++;
shard.relocate(nodeId);
shard.relocate(nodeId, expectedShardSize);
ShardRouting target = shard.buildTargetRelocatingShard();
node(target.currentNodeId()).add(target);
assignedShardsAdd(target);
@ -608,16 +608,9 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/**
* Initializes the current unassigned shard and moves it from the unassigned list.
*/
public void initialize(String nodeId) {
initialize(nodeId, current.version());
}
/**
* Initializes the current unassigned shard and moves it from the unassigned list.
*/
public void initialize(String nodeId, long version) {
public void initialize(String nodeId, long version, long expectedShardSize) {
innerRemove();
nodes.initialize(new ShardRouting(current, version), nodeId);
nodes.initialize(new ShardRouting(current, version), nodeId, expectedShardSize);
}
/**

View File

@ -37,6 +37,11 @@ import java.util.List;
*/
public final class ShardRouting implements Streamable, ToXContent {
/**
* Used if shard size is not available
*/
public static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1;
private String index;
private int shardId;
private String currentNodeId;
@ -50,6 +55,7 @@ public final class ShardRouting implements Streamable, ToXContent {
private final transient List<ShardRouting> asList;
private transient ShardId shardIdentifier;
private boolean frozen = false;
private long expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
private ShardRouting() {
this.asList = Collections.singletonList(this);
@ -60,7 +66,7 @@ public final class ShardRouting implements Streamable, ToXContent {
}
public ShardRouting(ShardRouting copy, long version) {
this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true);
this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true, copy.getExpectedShardSize());
}
/**
@ -69,7 +75,7 @@ public final class ShardRouting implements Streamable, ToXContent {
*/
ShardRouting(String index, int shardId, String currentNodeId,
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version,
UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal) {
UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal, long expectedShardSize) {
this.index = index;
this.shardId = shardId;
this.currentNodeId = currentNodeId;
@ -81,6 +87,9 @@ public final class ShardRouting implements Streamable, ToXContent {
this.restoreSource = restoreSource;
this.unassignedInfo = unassignedInfo;
this.allocationId = allocationId;
this.expectedShardSize = expectedShardSize;
assert expectedShardSize == UNAVAILABLE_EXPECTED_SHARD_SIZE || state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
assert expectedShardSize >= 0 || state != ShardRoutingState.INITIALIZING || state != ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
if (!internal) {
assert state == ShardRoutingState.UNASSIGNED;
@ -88,13 +97,14 @@ public final class ShardRouting implements Streamable, ToXContent {
assert relocatingNodeId == null;
assert allocationId == null;
}
}
/**
* Creates a new unassigned shard.
*/
public static ShardRouting newUnassigned(String index, int shardId, RestoreSource restoreSource, boolean primary, UnassignedInfo unassignedInfo) {
return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true);
return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true, UNAVAILABLE_EXPECTED_SHARD_SIZE);
}
/**
@ -205,7 +215,7 @@ public final class ShardRouting implements Streamable, ToXContent {
public ShardRouting buildTargetRelocatingShard() {
assert relocating();
return new ShardRouting(index, shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, version, unassignedInfo,
AllocationId.newTargetRelocation(allocationId), true);
AllocationId.newTargetRelocation(allocationId), true, expectedShardSize);
}
/**
@ -317,6 +327,11 @@ public final class ShardRouting implements Streamable, ToXContent {
if (in.readBoolean()) {
allocationId = new AllocationId(in);
}
if (relocating() || initializing()) {
expectedShardSize = in.readLong();
} else {
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
}
freeze();
}
@ -368,6 +383,10 @@ public final class ShardRouting implements Streamable, ToXContent {
} else {
out.writeBoolean(false);
}
if (relocating() || initializing()) {
out.writeLong(expectedShardSize);
}
}
@Override
@ -397,12 +416,13 @@ public final class ShardRouting implements Streamable, ToXContent {
relocatingNodeId = null;
this.unassignedInfo = unassignedInfo;
allocationId = null;
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
}
/**
* Initializes an unassigned shard on a node.
*/
void initialize(String nodeId) {
void initialize(String nodeId, long expectedShardSize) {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.UNASSIGNED : this;
@ -410,6 +430,7 @@ public final class ShardRouting implements Streamable, ToXContent {
state = ShardRoutingState.INITIALIZING;
currentNodeId = nodeId;
allocationId = AllocationId.newInitializing();
this.expectedShardSize = expectedShardSize;
}
/**
@ -417,13 +438,14 @@ public final class ShardRouting implements Streamable, ToXContent {
*
* @param relocatingNodeId id of the node to relocate the shard
*/
void relocate(String relocatingNodeId) {
void relocate(String relocatingNodeId, long expectedShardSize) {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.STARTED : "current shard has to be started in order to be relocated " + this;
state = ShardRoutingState.RELOCATING;
this.relocatingNodeId = relocatingNodeId;
this.allocationId = AllocationId.newRelocation(allocationId);
this.expectedShardSize = expectedShardSize;
}
/**
@ -436,7 +458,7 @@ public final class ShardRouting implements Streamable, ToXContent {
assert state == ShardRoutingState.RELOCATING : this;
assert assignedToNode() : this;
assert relocatingNodeId != null : this;
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
state = ShardRoutingState.STARTED;
relocatingNodeId = null;
allocationId = AllocationId.cancelRelocation(allocationId);
@ -470,6 +492,7 @@ public final class ShardRouting implements Streamable, ToXContent {
// relocation target
allocationId = AllocationId.finishRelocation(allocationId);
}
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
state = ShardRoutingState.STARTED;
}
@ -669,6 +692,9 @@ public final class ShardRouting implements Streamable, ToXContent {
if (this.unassignedInfo != null) {
sb.append(", ").append(unassignedInfo.toString());
}
if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) {
sb.append(", expected_shard_size[").append(expectedShardSize).append("]");
}
return sb.toString();
}
@ -682,7 +708,9 @@ public final class ShardRouting implements Streamable, ToXContent {
.field("shard", shardId().id())
.field("index", shardId().index().name())
.field("version", version);
if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE){
builder.field("expected_shard_size_in_bytes", expectedShardSize);
}
if (restoreSource() != null) {
builder.field("restore_source");
restoreSource().toXContent(builder, params);
@ -709,4 +737,12 @@ public final class ShardRouting implements Streamable, ToXContent {
boolean isFrozen() {
return frozen;
}
/**
* Returns the expected shard size for {@link ShardRoutingState#RELOCATING} and {@link ShardRoutingState#INITIALIZING}
* shards. If it's size is not available {@value #UNAVAILABLE_EXPECTED_SHARD_SIZE} will be returned.
*/
public long getExpectedShardSize() {
return expectedShardSize;
}
}

View File

@ -507,7 +507,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
Decision decision = allocation.deciders().canAllocate(shard, target, allocation);
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
sourceNode.removeShard(shard);
ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId());
ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
currentNode.addShard(targetRelocatingShard, decision);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
@ -687,7 +687,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId());
routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
changed = true;
continue; // don't add to ignoreUnassigned
} else {
@ -779,10 +779,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
/* now allocate on the cluster - if we are started we need to relocate the shard */
if (candidate.started()) {
RoutingNode lowRoutingNode = routingNodes.node(minNode.getNodeId());
routingNodes.relocate(candidate, lowRoutingNode.nodeId());
routingNodes.relocate(candidate, lowRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
} else {
routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId());
routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
return true;

View File

@ -231,7 +231,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure()));
}
it.initialize(routingNode.nodeId());
it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
break;
}
return new RerouteExplanation(this, decision);

View File

@ -178,7 +178,7 @@ public class MoveAllocationCommand implements AllocationCommand {
if (decision.type() == Decision.Type.THROTTLE) {
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
}
allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId());
allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
if (!found) {

View File

@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.Version;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Classes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.StreamInput;

View File

@ -94,12 +94,12 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true;
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion);
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) {
DiscoveryNode node = nodesToAllocate.noNodes.get(0);
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true;
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion);
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else {
// we are throttling this, but we have enough to allocate to this node, ignore it for now
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes);

View File

@ -169,7 +169,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
// we found a match
changed = true;
unassignedIterator.initialize(nodeWithHighestMatch.nodeId());
unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), shard.version(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
} else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation

View File

@ -26,6 +26,7 @@ import com.google.common.collect.Iterators;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
@ -270,7 +271,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
public synchronized IndexShard createShard(int sShardId, boolean primary) {
public synchronized IndexShard createShard(int sShardId, ShardRouting routing) {
final boolean primary = routing.primary();
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
@ -299,7 +301,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
if (path == null) {
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, getAvgShardSizeInBytes(), this);
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, routing.getExpectedShardSize() == -1 ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), this);
logger.debug("{} creating using a new path [{}]", shardId, path);
} else {
logger.debug("{} creating using an existing path [{}]", shardId, path);

View File

@ -638,7 +638,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] creating shard", shardRouting.index(), shardId);
}
IndexShard indexShard = indexService.createShard(shardId, shardRouting.primary());
IndexShard indexShard = indexService.createShard(shardId, shardRouting);
indexShard.updateRoutingEntry(shardRouting, state.blocks().disableStatePersistence() == false);
indexShard.addFailedEngineListener(failedEngineHandler);
} catch (IndexShardAlreadyExistsException e) {

View File

@ -18,12 +18,16 @@
*/
package org.elasticsearch.cluster.allocation;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

View File

@ -35,7 +35,7 @@ public class AllocationIdTests extends ESTestCase {
assertThat(shard.allocationId(), nullValue());
logger.info("-- initialize the shard");
shard.initialize("node1");
shard.initialize("node1", -1);
AllocationId allocationId = shard.allocationId();
assertThat(allocationId, notNullValue());
assertThat(allocationId.getId(), notNullValue());
@ -53,12 +53,12 @@ public class AllocationIdTests extends ESTestCase {
public void testSuccessfulRelocation() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1");
shard.initialize("node1", -1);
shard.moveToStarted();
AllocationId allocationId = shard.allocationId();
logger.info("-- relocate the shard");
shard.relocate("node2");
shard.relocate("node2", -1);
assertThat(shard.allocationId(), not(equalTo(allocationId)));
assertThat(shard.allocationId().getId(), equalTo(allocationId.getId()));
assertThat(shard.allocationId().getRelocationId(), notNullValue());
@ -77,12 +77,12 @@ public class AllocationIdTests extends ESTestCase {
public void testCancelRelocation() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1");
shard.initialize("node1", -1);
shard.moveToStarted();
AllocationId allocationId = shard.allocationId();
logger.info("-- relocate the shard");
shard.relocate("node2");
shard.relocate("node2", -1);
assertThat(shard.allocationId(), not(equalTo(allocationId)));
assertThat(shard.allocationId().getId(), equalTo(allocationId.getId()));
assertThat(shard.allocationId().getRelocationId(), notNullValue());
@ -98,7 +98,7 @@ public class AllocationIdTests extends ESTestCase {
public void testMoveToUnassigned() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1");
shard.initialize("node1", -1);
shard.moveToStarted();
logger.info("-- move to unassigned");
@ -110,7 +110,7 @@ public class AllocationIdTests extends ESTestCase {
public void testReinitializing() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1");
shard.initialize("node1", -1);
shard.moveToStarted();
AllocationId allocationId = shard.allocationId();

View File

@ -42,7 +42,7 @@ public final class RandomShardRoutingMutator {
break;
case 1:
if (shardRouting.unassigned()) {
shardRouting.initialize(randomFrom(nodes));
shardRouting.initialize(randomFrom(nodes), -1);
}
break;
case 2:

View File

@ -25,7 +25,11 @@ package org.elasticsearch.cluster.routing;
public class ShardRoutingHelper {
public static void relocate(ShardRouting routing, String nodeId) {
routing.relocate(nodeId);
relocate(routing, nodeId, -1);
}
public static void relocate(ShardRouting routing, String nodeId, long expectedByteSize) {
routing.relocate(nodeId, expectedByteSize);
}
public static void moveToStarted(ShardRouting routing) {
@ -33,6 +37,10 @@ public class ShardRoutingHelper {
}
public static void initialize(ShardRouting routing, String nodeId) {
routing.initialize(nodeId);
initialize(routing, nodeId, -1);
}
public static void initialize(ShardRouting routing, String nodeId, long expectedSize) {
routing.initialize(nodeId, expectedSize);
}
}

View File

@ -103,12 +103,12 @@ public class ShardRoutingTests extends ESTestCase {
ShardRouting startedShard1 = new ShardRouting(initializingShard1);
startedShard1.moveToStarted();
ShardRouting sourceShard0a = new ShardRouting(startedShard0);
sourceShard0a.relocate("node2");
sourceShard0a.relocate("node2", -1);
ShardRouting targetShard0a = sourceShard0a.buildTargetRelocatingShard();
ShardRouting sourceShard0b = new ShardRouting(startedShard0);
sourceShard0b.relocate("node2");
sourceShard0b.relocate("node2", -1);
ShardRouting sourceShard1 = new ShardRouting(startedShard1);
sourceShard1.relocate("node2");
sourceShard1.relocate("node2", -1);
// test true scenarios
assertTrue(targetShard0a.isRelocationTargetOf(sourceShard0a));
@ -254,7 +254,7 @@ public class ShardRoutingTests extends ESTestCase {
}
try {
routing.initialize("boom");
routing.initialize("boom", -1);
fail("must be frozen");
} catch (IllegalStateException ex) {
// expected
@ -273,7 +273,7 @@ public class ShardRoutingTests extends ESTestCase {
}
try {
routing.relocate("foobar");
routing.relocate("foobar", -1);
fail("must be frozen");
} catch (IllegalStateException ex) {
// expected
@ -287,4 +287,39 @@ public class ShardRoutingTests extends ESTestCase {
assertEquals(version, routing.version());
}
}
public void testExpectedSize() throws IOException {
final int iters = randomIntBetween(10, 100);
for (int i = 0; i < iters; i++) {
ShardRouting routing = randomShardRouting("test", 0);
long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
if (routing.unassigned()) {
ShardRoutingHelper.initialize(routing, "foo", byteSize);
} else if (routing.started()) {
ShardRoutingHelper.relocate(routing, "foo", byteSize);
} else {
byteSize = -1;
}
if (randomBoolean()) {
BytesStreamOutput out = new BytesStreamOutput();
routing.writeTo(out);
routing = ShardRouting.readShardRoutingEntry(StreamInput.wrap(out.bytes()));
}
if (routing.initializing() || routing.relocating()) {
assertEquals(routing.toString(), byteSize, routing.getExpectedShardSize());
if (byteSize >= 0) {
assertTrue(routing.toString(), routing.toString().contains("expected_shard_size[" + byteSize + "]"));
}
if (routing.initializing()) {
routing = new ShardRouting(routing);
routing.moveToStarted();
assertEquals(-1, routing.getExpectedShardSize());
assertFalse(routing.toString(), routing.toString().contains("expected_shard_size[" + byteSize + "]"));
}
} else {
assertFalse(routing.toString(), routing.toString().contains("expected_shard_size [" + byteSize + "]"));
assertEquals(byteSize, routing.getExpectedShardSize());
}
}
}
}

View File

@ -28,25 +28,25 @@ import org.elasticsearch.test.ESTestCase;
public class TestShardRouting {
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) {
return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true);
return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true);
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, AllocationId allocationId, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), allocationId, true);
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), allocationId, true, -1);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true);
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1);
}
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId,
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version,
UnassignedInfo unassignedInfo) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, buildAllocationId(state), true);
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, buildAllocationId(state), true, -1);
}
private static AllocationId buildAllocationId(ShardRoutingState state) {

View File

@ -192,7 +192,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
ShardRouting shard = TestShardRouting.newShardRouting("test", 1, null, null, null, true, ShardRoutingState.UNASSIGNED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting mutable = new ShardRouting(shard);
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.initialize("test_node");
mutable.initialize("test_node", -1);
assertThat(mutable.state(), equalTo(ShardRoutingState.INITIALIZING));
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.moveToStarted();

View File

@ -369,37 +369,37 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
switch (sr.id()) {
case 0:
if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node1");
allocation.routingNodes().initialize(sr, "node1", -1);
} else {
allocation.routingNodes().initialize(sr, "node0");
allocation.routingNodes().initialize(sr, "node0", -1);
}
break;
case 1:
if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node1");
allocation.routingNodes().initialize(sr, "node1", -1);
} else {
allocation.routingNodes().initialize(sr, "node2");
allocation.routingNodes().initialize(sr, "node2", -1);
}
break;
case 2:
if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node3");
allocation.routingNodes().initialize(sr, "node3", -1);
} else {
allocation.routingNodes().initialize(sr, "node2");
allocation.routingNodes().initialize(sr, "node2", -1);
}
break;
case 3:
if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node3");
allocation.routingNodes().initialize(sr, "node3", -1);
} else {
allocation.routingNodes().initialize(sr, "node1");
allocation.routingNodes().initialize(sr, "node1", -1);
}
break;
case 4:
if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node2");
allocation.routingNodes().initialize(sr, "node2", -1);
} else {
allocation.routingNodes().initialize(sr, "node0");
allocation.routingNodes().initialize(sr, "node0", -1);
}
break;
}

View File

@ -0,0 +1,179 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Test;
import java.util.Collections;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.cluster.routing.allocation.RoutingNodesUtils.numberOfShardsOfType;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {
private final ESLogger logger = Loggers.getLogger(ExpectedShardSizeAllocationTests.class);
@Test
public void testInitializingHasExpectedSize() {
final long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
AllocationService strategy = createAllocationService(Settings.EMPTY, new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
@Override
public Long getShardSize(ShardRouting shardRouting) {
if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) {
return byteSize;
}
return null;
}
};
}
@Override
public void addListener(Listener listener) {
}
});
logger.info("Building initial routing table");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
logger.info("Adding one node and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(1, clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.INITIALIZING));
assertEquals(byteSize, clusterState.getRoutingNodes().getRoutingTable().shardsWithState(ShardRoutingState.INITIALIZING).get(0).getExpectedShardSize());
logger.info("Start the primary shard");
RoutingNodes routingNodes = clusterState.getRoutingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(1, clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.STARTED));
assertEquals(1, clusterState.getRoutingNodes().unassigned().size());
logger.info("Add another one node and reroute");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2"))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(1, clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING));
assertEquals(byteSize, clusterState.getRoutingNodes().getRoutingTable().shardsWithState(ShardRoutingState.INITIALIZING).get(0).getExpectedShardSize());
}
@Test
public void testExpectedSizeOnMove() {
final long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
final AllocationService allocation = createAllocationService(Settings.EMPTY, new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
@Override
public Long getShardSize(ShardRouting shardRouting) {
if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) {
return byteSize;
}
return null;
}
};
}
@Override
public void addListener(Listener listener) {
}
});
logger.info("creating an index with 1 shard, no replica");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
logger.info("adding two nodes and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
logger.info("start primary shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
logger.info("move the shard");
String existingNodeId = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
String toNodeId;
if ("node1".equals(existingNodeId)) {
toNodeId = "node2";
} else {
toNodeId = "node1";
}
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(new ShardId("test", 0), existingNodeId, toNodeId)));
assertThat(rerouteResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertEquals(clusterState.getRoutingNodes().node(existingNodeId).get(0).state(), ShardRoutingState.RELOCATING);
assertEquals(clusterState.getRoutingNodes().node(toNodeId).get(0).state(),ShardRoutingState.INITIALIZING);
assertEquals(clusterState.getRoutingNodes().node(existingNodeId).get(0).getExpectedShardSize(), byteSize);
assertEquals(clusterState.getRoutingNodes().node(toNodeId).get(0).getExpectedShardSize(), byteSize);
logger.info("finish moving the shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().node(existingNodeId).isEmpty(), equalTo(true));
assertThat(clusterState.getRoutingNodes().node(toNodeId).get(0).state(), equalTo(ShardRoutingState.STARTED));
assertEquals(clusterState.getRoutingNodes().node(toNodeId).get(0).getExpectedShardSize(), -1);
}
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -27,12 +29,16 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Test;
import java.util.Collections;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
@ -47,12 +53,33 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
@Test
public void testRebalanceOnlyAfterAllShardsAreActive() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build());
final long[] sizes = new long[5];
for (int i =0; i < sizes.length; i++) {
sizes[i] = randomIntBetween(0, Integer.MAX_VALUE);
}
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(),
new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
@Override
public Long getShardSize(ShardRouting shardRouting) {
if (shardRouting.index().equals("test")) {
return sizes[shardRouting.getId()];
}
return null; }
};
}
@Override
public void addListener(Listener listener) {
}
});
logger.info("Building initial routing table");
MetaData metaData = MetaData.builder()
@ -97,6 +124,7 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertEquals(routingTable.index("test").shard(i).replicaShards().get(0).getExpectedShardSize(), sizes[i]);
}
logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened");
@ -112,6 +140,8 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertEquals(routingTable.index("test").shard(i).replicaShards().get(0).getExpectedShardSize(), sizes[i]);
}
logger.info("start the replica shards, rebalancing should start");
@ -124,6 +154,16 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
// we only allow one relocation at a time
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
int num = 0;
for (ShardRouting routing : routingTable.index("test").shard(i).shards()) {
if (routing.state() == RELOCATING || routing.state() == INITIALIZING) {
assertEquals(routing.getExpectedShardSize(), sizes[i]);
num++;
}
}
assertTrue(num > 0);
}
logger.info("complete relocation, other half of relocation should happen");
routingNodes = clusterState.getRoutingNodes();
@ -135,6 +175,14 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
// we now only relocate 3, since 2 remain where they are!
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
for (ShardRouting routing : routingTable.index("test").shard(i).shards()) {
if (routing.state() == RELOCATING || routing.state() == INITIALIZING) {
assertEquals(routing.getExpectedShardSize(), sizes[i]);
}
}
}
logger.info("complete relocation, thats it!");
routingNodes = clusterState.getRoutingNodes();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.gateway;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -302,7 +303,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null);
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY);
}
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
@ -321,7 +322,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null);
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY);
}
class TestAllocator extends ReplicaShardAllocator {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -454,6 +455,22 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertPathHasBeenCleared(idxPath);
}
public void testExpectedShardSizeIsPresent() throws InterruptedException {
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
for (int i = 0; i < 50; i++) {
client().prepareIndex("test", "test").setSource("{}").get();
}
ensureGreen("test");
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
InternalClusterInfoService.ClusterInfoUpdateJob job = clusterInfoService.new ClusterInfoUpdateJob(false);
job.run();
ClusterState state = getInstanceFromNode(ClusterService.class).state();
Long test = clusterInfoService.getClusterInfo().getShardSize(state.getRoutingTable().index("test").getShards().get(0).primaryShard());
assertNotNull(test);
assertTrue(test > 0);
}
public void testIndexCanChangeCustomDataPath() throws Exception {
Environment env = getInstanceFromNode(Environment.class);
Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.test;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
@ -63,7 +64,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
}
public static AllocationService createAllocationService(Settings settings, Random random) {
return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random);
return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random);
}
public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
@ -72,6 +73,13 @@ public abstract class ESAllocationTestCase extends ESTestCase {
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
}
public static AllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
return new AllocationService(settings,
randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService);
}
public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
final List<Class<? extends AllocationDecider>> defaultAllocationDeciders = ClusterModule.DEFAULT_ALLOCATION_DECIDERS;