Merge pull request #12947 from s1monw/expected_shard_size

Add `expectedShardSize` to ShardRouting and use it in path.data allocation
This commit is contained in:
Simon Willnauer 2015-08-21 08:34:38 +02:00
commit 3fb2d8e448
25 changed files with 412 additions and 74 deletions

View File

@ -30,7 +30,7 @@ import java.util.Map;
* <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code> * <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code>
* for the key used in the shardSizes map * for the key used in the shardSizes map
*/ */
public final class ClusterInfo { public class ClusterInfo {
private final Map<String, DiskUsage> usages; private final Map<String, DiskUsage> usages;
final Map<String, Long> shardSizes; final Map<String, Long> shardSizes;
@ -54,6 +54,11 @@ public final class ClusterInfo {
return shardSizes.get(shardIdentifierFromRouting(shardRouting)); return shardSizes.get(shardIdentifierFromRouting(shardRouting));
} }
/**
 * Returns the shard size for the given shard routing, or <code>defaultValue</code>
 * if no size is recorded for that shard.
 */
public long getShardSize(ShardRouting shardRouting, long defaultValue) {
    final Long knownSize = getShardSize(shardRouting);
    if (knownSize != null) {
        return knownSize;
    }
    // no entry for this shard - fall back to the caller supplied default
    return defaultValue;
}
/** /**
* Method that incorporates the ShardId for the shard into a string that * Method that incorporates the ShardId for the shard into a string that
* includes a 'p' or 'r' depending on whether the shard is a primary. * includes a 'p' or 'r' depending on whether the shard is a primary.

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster; package org.elasticsearch.cluster;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -36,6 +37,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
@ -45,6 +47,7 @@ import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import java.util.*; import java.util.*;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* InternalClusterInfoService provides the ClusterInfoService interface, * InternalClusterInfoService provides the ClusterInfoService interface,

View File

@ -345,10 +345,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/** /**
* Moves a shard from unassigned to initialize state * Moves a shard from unassigned to initialize state
*/ */
public void initialize(ShardRouting shard, String nodeId) { public void initialize(ShardRouting shard, String nodeId, long expectedSize) {
ensureMutable(); ensureMutable();
assert shard.unassigned() : shard; assert shard.unassigned() : shard;
shard.initialize(nodeId); shard.initialize(nodeId, expectedSize);
node(nodeId).add(shard); node(nodeId).add(shard);
inactiveShardCount++; inactiveShardCount++;
if (shard.primary()) { if (shard.primary()) {
@ -362,10 +362,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
* shard as well as assigning it. And returning the target initializing * shard as well as assigning it. And returning the target initializing
* shard. * shard.
*/ */
public ShardRouting relocate(ShardRouting shard, String nodeId) { public ShardRouting relocate(ShardRouting shard, String nodeId, long expectedShardSize) {
ensureMutable(); ensureMutable();
relocatingShards++; relocatingShards++;
shard.relocate(nodeId); shard.relocate(nodeId, expectedShardSize);
ShardRouting target = shard.buildTargetRelocatingShard(); ShardRouting target = shard.buildTargetRelocatingShard();
node(target.currentNodeId()).add(target); node(target.currentNodeId()).add(target);
assignedShardsAdd(target); assignedShardsAdd(target);
@ -608,16 +608,9 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/** /**
* Initializes the current unassigned shard and moves it from the unassigned list. * Initializes the current unassigned shard and moves it from the unassigned list.
*/ */
public void initialize(String nodeId) { public void initialize(String nodeId, long version, long expectedShardSize) {
initialize(nodeId, current.version());
}
/**
* Initializes the current unassigned shard and moves it from the unassigned list.
*/
public void initialize(String nodeId, long version) {
innerRemove(); innerRemove();
nodes.initialize(new ShardRouting(current, version), nodeId); nodes.initialize(new ShardRouting(current, version), nodeId, expectedShardSize);
} }
/** /**

View File

@ -37,6 +37,11 @@ import java.util.List;
*/ */
public final class ShardRouting implements Streamable, ToXContent { public final class ShardRouting implements Streamable, ToXContent {
/**
 * Sentinel value for the expected shard size, used if the shard size is not available.
 */
public static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1;
private String index; private String index;
private int shardId; private int shardId;
private String currentNodeId; private String currentNodeId;
@ -50,6 +55,7 @@ public final class ShardRouting implements Streamable, ToXContent {
private final transient List<ShardRouting> asList; private final transient List<ShardRouting> asList;
private transient ShardId shardIdentifier; private transient ShardId shardIdentifier;
private boolean frozen = false; private boolean frozen = false;
private long expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
private ShardRouting() { private ShardRouting() {
this.asList = Collections.singletonList(this); this.asList = Collections.singletonList(this);
@ -60,7 +66,7 @@ public final class ShardRouting implements Streamable, ToXContent {
} }
public ShardRouting(ShardRouting copy, long version) { public ShardRouting(ShardRouting copy, long version) {
this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true); this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true, copy.getExpectedShardSize());
} }
/** /**
@ -69,7 +75,7 @@ public final class ShardRouting implements Streamable, ToXContent {
*/ */
ShardRouting(String index, int shardId, String currentNodeId, ShardRouting(String index, int shardId, String currentNodeId,
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version,
UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal) { UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal, long expectedShardSize) {
this.index = index; this.index = index;
this.shardId = shardId; this.shardId = shardId;
this.currentNodeId = currentNodeId; this.currentNodeId = currentNodeId;
@ -81,6 +87,9 @@ public final class ShardRouting implements Streamable, ToXContent {
this.restoreSource = restoreSource; this.restoreSource = restoreSource;
this.unassignedInfo = unassignedInfo; this.unassignedInfo = unassignedInfo;
this.allocationId = allocationId; this.allocationId = allocationId;
this.expectedShardSize = expectedShardSize;
// an expected shard size may only be set while the shard is INITIALIZING or RELOCATING
assert expectedShardSize == UNAVAILABLE_EXPECTED_SHARD_SIZE || state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
// NOTE(review): as written this condition is always true - state cannot be equal to both
// INITIALIZING and RELOCATING, so one of the two != checks always holds and the assertion
// never fires. Confirm the intended invariant before tightening it: callers pass
// UNAVAILABLE_EXPECTED_SHARD_SIZE for initializing shards, so requiring >= 0 may not be valid.
assert expectedShardSize >= 0 || state != ShardRoutingState.INITIALIZING || state != ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta"; assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
if (!internal) { if (!internal) {
assert state == ShardRoutingState.UNASSIGNED; assert state == ShardRoutingState.UNASSIGNED;
@ -88,13 +97,14 @@ public final class ShardRouting implements Streamable, ToXContent {
assert relocatingNodeId == null; assert relocatingNodeId == null;
assert allocationId == null; assert allocationId == null;
} }
} }
/** /**
* Creates a new unassigned shard. * Creates a new unassigned shard.
*/ */
public static ShardRouting newUnassigned(String index, int shardId, RestoreSource restoreSource, boolean primary, UnassignedInfo unassignedInfo) { public static ShardRouting newUnassigned(String index, int shardId, RestoreSource restoreSource, boolean primary, UnassignedInfo unassignedInfo) {
return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true); return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true, UNAVAILABLE_EXPECTED_SHARD_SIZE);
} }
/** /**
@ -205,7 +215,7 @@ public final class ShardRouting implements Streamable, ToXContent {
public ShardRouting buildTargetRelocatingShard() { public ShardRouting buildTargetRelocatingShard() {
assert relocating(); assert relocating();
return new ShardRouting(index, shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, version, unassignedInfo, return new ShardRouting(index, shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, version, unassignedInfo,
AllocationId.newTargetRelocation(allocationId), true); AllocationId.newTargetRelocation(allocationId), true, expectedShardSize);
} }
/** /**
@ -317,6 +327,11 @@ public final class ShardRouting implements Streamable, ToXContent {
if (in.readBoolean()) { if (in.readBoolean()) {
allocationId = new AllocationId(in); allocationId = new AllocationId(in);
} }
if (relocating() || initializing()) {
expectedShardSize = in.readLong();
} else {
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
}
freeze(); freeze();
} }
@ -368,6 +383,10 @@ public final class ShardRouting implements Streamable, ToXContent {
} else { } else {
out.writeBoolean(false); out.writeBoolean(false);
} }
if (relocating() || initializing()) {
out.writeLong(expectedShardSize);
}
} }
@Override @Override
@ -397,12 +416,13 @@ public final class ShardRouting implements Streamable, ToXContent {
relocatingNodeId = null; relocatingNodeId = null;
this.unassignedInfo = unassignedInfo; this.unassignedInfo = unassignedInfo;
allocationId = null; allocationId = null;
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
} }
/** /**
* Initializes an unassigned shard on a node. * Initializes an unassigned shard on a node.
*/ */
void initialize(String nodeId) { void initialize(String nodeId, long expectedShardSize) {
ensureNotFrozen(); ensureNotFrozen();
version++; version++;
assert state == ShardRoutingState.UNASSIGNED : this; assert state == ShardRoutingState.UNASSIGNED : this;
@ -410,6 +430,7 @@ public final class ShardRouting implements Streamable, ToXContent {
state = ShardRoutingState.INITIALIZING; state = ShardRoutingState.INITIALIZING;
currentNodeId = nodeId; currentNodeId = nodeId;
allocationId = AllocationId.newInitializing(); allocationId = AllocationId.newInitializing();
this.expectedShardSize = expectedShardSize;
} }
/** /**
@ -417,13 +438,14 @@ public final class ShardRouting implements Streamable, ToXContent {
* *
* @param relocatingNodeId id of the node to relocate the shard * @param relocatingNodeId id of the node to relocate the shard
*/ */
void relocate(String relocatingNodeId) { void relocate(String relocatingNodeId, long expectedShardSize) {
ensureNotFrozen(); ensureNotFrozen();
version++; version++;
assert state == ShardRoutingState.STARTED : "current shard has to be started in order to be relocated " + this; assert state == ShardRoutingState.STARTED : "current shard has to be started in order to be relocated " + this;
state = ShardRoutingState.RELOCATING; state = ShardRoutingState.RELOCATING;
this.relocatingNodeId = relocatingNodeId; this.relocatingNodeId = relocatingNodeId;
this.allocationId = AllocationId.newRelocation(allocationId); this.allocationId = AllocationId.newRelocation(allocationId);
this.expectedShardSize = expectedShardSize;
} }
/** /**
@ -436,7 +458,7 @@ public final class ShardRouting implements Streamable, ToXContent {
assert state == ShardRoutingState.RELOCATING : this; assert state == ShardRoutingState.RELOCATING : this;
assert assignedToNode() : this; assert assignedToNode() : this;
assert relocatingNodeId != null : this; assert relocatingNodeId != null : this;
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
state = ShardRoutingState.STARTED; state = ShardRoutingState.STARTED;
relocatingNodeId = null; relocatingNodeId = null;
allocationId = AllocationId.cancelRelocation(allocationId); allocationId = AllocationId.cancelRelocation(allocationId);
@ -470,6 +492,7 @@ public final class ShardRouting implements Streamable, ToXContent {
// relocation target // relocation target
allocationId = AllocationId.finishRelocation(allocationId); allocationId = AllocationId.finishRelocation(allocationId);
} }
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
state = ShardRoutingState.STARTED; state = ShardRoutingState.STARTED;
} }
@ -669,6 +692,9 @@ public final class ShardRouting implements Streamable, ToXContent {
if (this.unassignedInfo != null) { if (this.unassignedInfo != null) {
sb.append(", ").append(unassignedInfo.toString()); sb.append(", ").append(unassignedInfo.toString());
} }
if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) {
sb.append(", expected_shard_size[").append(expectedShardSize).append("]");
}
return sb.toString(); return sb.toString();
} }
@ -682,7 +708,9 @@ public final class ShardRouting implements Streamable, ToXContent {
.field("shard", shardId().id()) .field("shard", shardId().id())
.field("index", shardId().index().name()) .field("index", shardId().index().name())
.field("version", version); .field("version", version);
if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE){
builder.field("expected_shard_size_in_bytes", expectedShardSize);
}
if (restoreSource() != null) { if (restoreSource() != null) {
builder.field("restore_source"); builder.field("restore_source");
restoreSource().toXContent(builder, params); restoreSource().toXContent(builder, params);
@ -709,4 +737,12 @@ public final class ShardRouting implements Streamable, ToXContent {
boolean isFrozen() { boolean isFrozen() {
return frozen; return frozen;
} }
/**
 * Returns the expected size of this shard. The value is meaningful for shards in
 * {@link ShardRoutingState#RELOCATING} or {@link ShardRoutingState#INITIALIZING} state;
 * if its size is not available, {@value #UNAVAILABLE_EXPECTED_SHARD_SIZE} is returned.
 */
public long getExpectedShardSize() {
    return this.expectedShardSize;
}
} }

View File

@ -507,7 +507,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
Decision decision = allocation.deciders().canAllocate(shard, target, allocation); Decision decision = allocation.deciders().canAllocate(shard, target, allocation);
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too? if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
sourceNode.removeShard(shard); sourceNode.removeShard(shard);
ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId()); ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
currentNode.addShard(targetRelocatingShard, decision); currentNode.addShard(targetRelocatingShard, decision);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId()); logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
@ -687,7 +687,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
} }
routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId()); routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
changed = true; changed = true;
continue; // don't add to ignoreUnassigned continue; // don't add to ignoreUnassigned
} else { } else {
@ -779,10 +779,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
/* now allocate on the cluster - if we are started we need to relocate the shard */ /* now allocate on the cluster - if we are started we need to relocate the shard */
if (candidate.started()) { if (candidate.started()) {
RoutingNode lowRoutingNode = routingNodes.node(minNode.getNodeId()); RoutingNode lowRoutingNode = routingNodes.node(minNode.getNodeId());
routingNodes.relocate(candidate, lowRoutingNode.nodeId()); routingNodes.relocate(candidate, lowRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
} else { } else {
routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId()); routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
} }
return true; return true;

View File

@ -231,7 +231,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure())); "force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure()));
} }
it.initialize(routingNode.nodeId()); it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
break; break;
} }
return new RerouteExplanation(this, decision); return new RerouteExplanation(this, decision);

View File

@ -178,7 +178,7 @@ public class MoveAllocationCommand implements AllocationCommand {
if (decision.type() == Decision.Type.THROTTLE) { if (decision.type() == Decision.Type.THROTTLE) {
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it... // its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
} }
allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId()); allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
} }
if (!found) { if (!found) {

View File

@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Classes;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;

View File

@ -94,12 +94,12 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
DiscoveryNode node = nodesToAllocate.yesNodes.get(0); DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node); logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true; changed = true;
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion); unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) { } else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) {
DiscoveryNode node = nodesToAllocate.noNodes.get(0); DiscoveryNode node = nodesToAllocate.noNodes.get(0);
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node); logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true; changed = true;
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion); unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else { } else {
// we are throttling this, but we have enough to allocate to this node, ignore it for now // we are throttling this, but we have enough to allocate to this node, ignore it for now
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes); logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes);

View File

@ -169,7 +169,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node()); logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
// we found a match // we found a match
changed = true; changed = true;
unassignedIterator.initialize(nodeWithHighestMatch.nodeId()); unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), shard.version(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
} }
} else if (matchingNodes.hasAnyData() == false) { } else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation

View File

@ -26,6 +26,7 @@ import com.google.common.collect.Iterators;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
@ -270,7 +271,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
} }
} }
public synchronized IndexShard createShard(int sShardId, boolean primary) { public synchronized IndexShard createShard(int sShardId, ShardRouting routing) {
final boolean primary = routing.primary();
/* /*
* TODO: we execute this in parallel but it's a synced method. Yet, we might * TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just * be able to serialize the execution via the cluster state in the future. for now we just
@ -299,7 +301,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
} }
} }
if (path == null) { if (path == null) {
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, getAvgShardSizeInBytes(), this); path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), this);
logger.debug("{} creating using a new path [{}]", shardId, path); logger.debug("{} creating using a new path [{}]", shardId, path);
} else { } else {
logger.debug("{} creating using an existing path [{}]", shardId, path); logger.debug("{} creating using an existing path [{}]", shardId, path);

View File

@ -638,7 +638,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] creating shard", shardRouting.index(), shardId); logger.debug("[{}][{}] creating shard", shardRouting.index(), shardId);
} }
IndexShard indexShard = indexService.createShard(shardId, shardRouting.primary()); IndexShard indexShard = indexService.createShard(shardId, shardRouting);
indexShard.updateRoutingEntry(shardRouting, state.blocks().disableStatePersistence() == false); indexShard.updateRoutingEntry(shardRouting, state.blocks().disableStatePersistence() == false);
indexShard.addFailedEngineListener(failedEngineHandler); indexShard.addFailedEngineListener(failedEngineHandler);
} catch (IndexShardAlreadyExistsException e) { } catch (IndexShardAlreadyExistsException e) {

View File

@ -18,12 +18,16 @@
*/ */
package org.elasticsearch.cluster.allocation; package org.elasticsearch.cluster.allocation;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test; import org.junit.Test;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;

View File

@ -35,7 +35,7 @@ public class AllocationIdTests extends ESTestCase {
assertThat(shard.allocationId(), nullValue()); assertThat(shard.allocationId(), nullValue());
logger.info("-- initialize the shard"); logger.info("-- initialize the shard");
shard.initialize("node1"); shard.initialize("node1", -1);
AllocationId allocationId = shard.allocationId(); AllocationId allocationId = shard.allocationId();
assertThat(allocationId, notNullValue()); assertThat(allocationId, notNullValue());
assertThat(allocationId.getId(), notNullValue()); assertThat(allocationId.getId(), notNullValue());
@ -53,12 +53,12 @@ public class AllocationIdTests extends ESTestCase {
public void testSuccessfulRelocation() { public void testSuccessfulRelocation() {
logger.info("-- build started shard"); logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1"); shard.initialize("node1", -1);
shard.moveToStarted(); shard.moveToStarted();
AllocationId allocationId = shard.allocationId(); AllocationId allocationId = shard.allocationId();
logger.info("-- relocate the shard"); logger.info("-- relocate the shard");
shard.relocate("node2"); shard.relocate("node2", -1);
assertThat(shard.allocationId(), not(equalTo(allocationId))); assertThat(shard.allocationId(), not(equalTo(allocationId)));
assertThat(shard.allocationId().getId(), equalTo(allocationId.getId())); assertThat(shard.allocationId().getId(), equalTo(allocationId.getId()));
assertThat(shard.allocationId().getRelocationId(), notNullValue()); assertThat(shard.allocationId().getRelocationId(), notNullValue());
@ -77,12 +77,12 @@ public class AllocationIdTests extends ESTestCase {
public void testCancelRelocation() { public void testCancelRelocation() {
logger.info("-- build started shard"); logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1"); shard.initialize("node1", -1);
shard.moveToStarted(); shard.moveToStarted();
AllocationId allocationId = shard.allocationId(); AllocationId allocationId = shard.allocationId();
logger.info("-- relocate the shard"); logger.info("-- relocate the shard");
shard.relocate("node2"); shard.relocate("node2", -1);
assertThat(shard.allocationId(), not(equalTo(allocationId))); assertThat(shard.allocationId(), not(equalTo(allocationId)));
assertThat(shard.allocationId().getId(), equalTo(allocationId.getId())); assertThat(shard.allocationId().getId(), equalTo(allocationId.getId()));
assertThat(shard.allocationId().getRelocationId(), notNullValue()); assertThat(shard.allocationId().getRelocationId(), notNullValue());
@ -98,7 +98,7 @@ public class AllocationIdTests extends ESTestCase {
public void testMoveToUnassigned() { public void testMoveToUnassigned() {
logger.info("-- build started shard"); logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1"); shard.initialize("node1", -1);
shard.moveToStarted(); shard.moveToStarted();
logger.info("-- move to unassigned"); logger.info("-- move to unassigned");
@ -110,7 +110,7 @@ public class AllocationIdTests extends ESTestCase {
public void testReinitializing() { public void testReinitializing() {
logger.info("-- build started shard"); logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); ShardRouting shard = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard.initialize("node1"); shard.initialize("node1", -1);
shard.moveToStarted(); shard.moveToStarted();
AllocationId allocationId = shard.allocationId(); AllocationId allocationId = shard.allocationId();

View File

@ -42,7 +42,7 @@ public final class RandomShardRoutingMutator {
break; break;
case 1: case 1:
if (shardRouting.unassigned()) { if (shardRouting.unassigned()) {
shardRouting.initialize(randomFrom(nodes)); shardRouting.initialize(randomFrom(nodes), -1);
} }
break; break;
case 2: case 2:

View File

@ -25,7 +25,11 @@ package org.elasticsearch.cluster.routing;
public class ShardRoutingHelper { public class ShardRoutingHelper {
public static void relocate(ShardRouting routing, String nodeId) { public static void relocate(ShardRouting routing, String nodeId) {
routing.relocate(nodeId); relocate(routing, nodeId, -1);
}
public static void relocate(ShardRouting routing, String nodeId, long expectedByteSize) {
routing.relocate(nodeId, expectedByteSize);
} }
public static void moveToStarted(ShardRouting routing) { public static void moveToStarted(ShardRouting routing) {
@ -33,6 +37,10 @@ public class ShardRoutingHelper {
} }
public static void initialize(ShardRouting routing, String nodeId) { public static void initialize(ShardRouting routing, String nodeId) {
routing.initialize(nodeId); initialize(routing, nodeId, -1);
}
public static void initialize(ShardRouting routing, String nodeId, long expectedSize) {
routing.initialize(nodeId, expectedSize);
} }
} }

View File

@ -103,12 +103,12 @@ public class ShardRoutingTests extends ESTestCase {
ShardRouting startedShard1 = new ShardRouting(initializingShard1); ShardRouting startedShard1 = new ShardRouting(initializingShard1);
startedShard1.moveToStarted(); startedShard1.moveToStarted();
ShardRouting sourceShard0a = new ShardRouting(startedShard0); ShardRouting sourceShard0a = new ShardRouting(startedShard0);
sourceShard0a.relocate("node2"); sourceShard0a.relocate("node2", -1);
ShardRouting targetShard0a = sourceShard0a.buildTargetRelocatingShard(); ShardRouting targetShard0a = sourceShard0a.buildTargetRelocatingShard();
ShardRouting sourceShard0b = new ShardRouting(startedShard0); ShardRouting sourceShard0b = new ShardRouting(startedShard0);
sourceShard0b.relocate("node2"); sourceShard0b.relocate("node2", -1);
ShardRouting sourceShard1 = new ShardRouting(startedShard1); ShardRouting sourceShard1 = new ShardRouting(startedShard1);
sourceShard1.relocate("node2"); sourceShard1.relocate("node2", -1);
// test true scenarios // test true scenarios
assertTrue(targetShard0a.isRelocationTargetOf(sourceShard0a)); assertTrue(targetShard0a.isRelocationTargetOf(sourceShard0a));
@ -254,7 +254,7 @@ public class ShardRoutingTests extends ESTestCase {
} }
try { try {
routing.initialize("boom"); routing.initialize("boom", -1);
fail("must be frozen"); fail("must be frozen");
} catch (IllegalStateException ex) { } catch (IllegalStateException ex) {
// expected // expected
@ -273,7 +273,7 @@ public class ShardRoutingTests extends ESTestCase {
} }
try { try {
routing.relocate("foobar"); routing.relocate("foobar", -1);
fail("must be frozen"); fail("must be frozen");
} catch (IllegalStateException ex) { } catch (IllegalStateException ex) {
// expected // expected
@ -287,4 +287,39 @@ public class ShardRoutingTests extends ESTestCase {
assertEquals(version, routing.version()); assertEquals(version, routing.version());
} }
} }
/**
 * Checks that the size handed to {@code initialize}/{@code relocate} is returned by
 * {@link ShardRouting#getExpectedShardSize()}, appears in {@code toString()} as
 * {@code expected_shard_size[N]}, still holds after an optional stream round trip,
 * and reads -1 (and is no longer printed) once the shard has been moved to started.
 */
public void testExpectedSize() throws IOException {
    final int iters = randomIntBetween(10, 100);
    for (int i = 0; i < iters; i++) {
        ShardRouting routing = randomShardRouting("test", 0);
        long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
        if (routing.unassigned()) {
            ShardRoutingHelper.initialize(routing, "foo", byteSize);
        } else if (routing.started()) {
            ShardRoutingHelper.relocate(routing, "foo", byteSize);
        } else {
            // this test assigned no size to the routing, so the checks below expect -1
            byteSize = -1;
        }
        if (randomBoolean()) {
            // exercise serialization: the expected size must match after a write/read cycle
            BytesStreamOutput out = new BytesStreamOutput();
            routing.writeTo(out);
            routing = ShardRouting.readShardRoutingEntry(StreamInput.wrap(out.bytes()));
        }
        if (routing.initializing() || routing.relocating()) {
            assertEquals(routing.toString(), byteSize, routing.getExpectedShardSize());
            if (byteSize >= 0) {
                assertTrue(routing.toString(), routing.toString().contains("expected_shard_size[" + byteSize + "]"));
            }
            if (routing.initializing()) {
                // work on a copy so the started-state checks do not depend on the original instance
                routing = new ShardRouting(routing);
                routing.moveToStarted();
                assertEquals(-1, routing.getExpectedShardSize());
                assertFalse(routing.toString(), routing.toString().contains("expected_shard_size[" + byteSize + "]"));
            }
        } else {
            // same token format as the positive checks above; a differently spelled token
            // (e.g. with an extra space) would make this assertion pass unconditionally
            assertFalse(routing.toString(), routing.toString().contains("expected_shard_size[" + byteSize + "]"));
            assertEquals(byteSize, routing.getExpectedShardSize());
        }
    }
}
} }

View File

@ -28,25 +28,25 @@ import org.elasticsearch.test.ESTestCase;
public class TestShardRouting { public class TestShardRouting {
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) { public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) {
return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true); return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1);
} }
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, long version) { public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true); return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1);
} }
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, AllocationId allocationId, long version) { public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, AllocationId allocationId, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), allocationId, true); return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), allocationId, true, -1);
} }
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) { public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true); return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true, -1);
} }
public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId,
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version,
UnassignedInfo unassignedInfo) { UnassignedInfo unassignedInfo) {
return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, buildAllocationId(state), true); return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, unassignedInfo, buildAllocationId(state), true, -1);
} }
private static AllocationId buildAllocationId(ShardRoutingState state) { private static AllocationId buildAllocationId(ShardRoutingState state) {

View File

@ -192,7 +192,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
ShardRouting shard = TestShardRouting.newShardRouting("test", 1, null, null, null, true, ShardRoutingState.UNASSIGNED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)); ShardRouting shard = TestShardRouting.newShardRouting("test", 1, null, null, null, true, ShardRoutingState.UNASSIGNED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting mutable = new ShardRouting(shard); ShardRouting mutable = new ShardRouting(shard);
assertThat(mutable.unassignedInfo(), notNullValue()); assertThat(mutable.unassignedInfo(), notNullValue());
mutable.initialize("test_node"); mutable.initialize("test_node", -1);
assertThat(mutable.state(), equalTo(ShardRoutingState.INITIALIZING)); assertThat(mutable.state(), equalTo(ShardRoutingState.INITIALIZING));
assertThat(mutable.unassignedInfo(), notNullValue()); assertThat(mutable.unassignedInfo(), notNullValue());
mutable.moveToStarted(); mutable.moveToStarted();

View File

@ -369,37 +369,37 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
switch (sr.id()) { switch (sr.id()) {
case 0: case 0:
if (sr.primary()) { if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node1"); allocation.routingNodes().initialize(sr, "node1", -1);
} else { } else {
allocation.routingNodes().initialize(sr, "node0"); allocation.routingNodes().initialize(sr, "node0", -1);
} }
break; break;
case 1: case 1:
if (sr.primary()) { if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node1"); allocation.routingNodes().initialize(sr, "node1", -1);
} else { } else {
allocation.routingNodes().initialize(sr, "node2"); allocation.routingNodes().initialize(sr, "node2", -1);
} }
break; break;
case 2: case 2:
if (sr.primary()) { if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node3"); allocation.routingNodes().initialize(sr, "node3", -1);
} else { } else {
allocation.routingNodes().initialize(sr, "node2"); allocation.routingNodes().initialize(sr, "node2", -1);
} }
break; break;
case 3: case 3:
if (sr.primary()) { if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node3"); allocation.routingNodes().initialize(sr, "node3", -1);
} else { } else {
allocation.routingNodes().initialize(sr, "node1"); allocation.routingNodes().initialize(sr, "node1", -1);
} }
break; break;
case 4: case 4:
if (sr.primary()) { if (sr.primary()) {
allocation.routingNodes().initialize(sr, "node2"); allocation.routingNodes().initialize(sr, "node2", -1);
} else { } else {
allocation.routingNodes().initialize(sr, "node0"); allocation.routingNodes().initialize(sr, "node0", -1);
} }
break; break;
} }

View File

@ -0,0 +1,179 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Test;
import java.util.Collections;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.cluster.routing.allocation.RoutingNodesUtils.numberOfShardsOfType;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
 * Allocation tests for the expected shard size carried by {@link ShardRouting}.
 * Each test plugs in a stub {@link ClusterInfoService} that reports a random size for
 * shard 0 of index "test" and asserts that {@link ShardRouting#getExpectedShardSize()}
 * equals that size while the shard is INITIALIZING or RELOCATING, and is -1 once the
 * moved shard is STARTED.
 */
public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {

    private final ESLogger logger = Loggers.getLogger(ExpectedShardSizeAllocationTests.class);

    /**
     * Builds a stub {@link ClusterInfoService} whose {@link ClusterInfo} reports
     * {@code byteSize} for shard 0 of index "test" and {@code null} for any other shard.
     * A new {@link ClusterInfo} is created on every {@code getClusterInfo()} call.
     *
     * @param byteSize the size to report for {@code [test][0]}
     * @return the stub service; its {@code addListener} is a no-op
     */
    private static ClusterInfoService fixedSizeClusterInfoService(final long byteSize) {
        return new ClusterInfoService() {
            @Override
            public ClusterInfo getClusterInfo() {
                return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
                    @Override
                    public Long getShardSize(ShardRouting shardRouting) {
                        if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) {
                            return byteSize;
                        }
                        return null;
                    }
                };
            }

            @Override
            public void addListener(Listener listener) {
                // intentionally empty: the stub never publishes updates
            }
        };
    }

    /**
     * Asserts that both the primary and, after a second node joins, the replica of
     * {@code [test][0]} carry the stubbed size while they are INITIALIZING.
     */
    @Test
    public void testInitializingHasExpectedSize() {
        final long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
        AllocationService strategy = createAllocationService(Settings.EMPTY, fixedSizeClusterInfoService(byteSize));

        logger.info("Building initial routing table");

        MetaData metaData = MetaData.builder()
                .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)))
                .build();

        RoutingTable routingTable = RoutingTable.builder()
                .addAsNew(metaData.index("test"))
                .build();

        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
        logger.info("Adding one node and performing rerouting");
        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
        routingTable = strategy.reroute(clusterState).routingTable();
        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

        assertEquals(1, clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.INITIALIZING));
        assertEquals(byteSize, clusterState.getRoutingNodes().getRoutingTable().shardsWithState(ShardRoutingState.INITIALIZING).get(0).getExpectedShardSize());
        logger.info("Start the primary shard");
        RoutingNodes routingNodes = clusterState.getRoutingNodes();
        routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

        assertEquals(1, clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.STARTED));
        assertEquals(1, clusterState.getRoutingNodes().unassigned().size());

        logger.info("Add another one node and reroute");
        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2"))).build();
        routingTable = strategy.reroute(clusterState).routingTable();
        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

        assertEquals(1, clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING));
        assertEquals(byteSize, clusterState.getRoutingNodes().getRoutingTable().shardsWithState(ShardRoutingState.INITIALIZING).get(0).getExpectedShardSize());
    }

    /**
     * Moves the only shard of "test" to the other node via a {@link MoveAllocationCommand}
     * and asserts that the relocating source and the initializing target both carry the
     * stubbed size, and that the size is -1 once the target has started.
     */
    @Test
    public void testExpectedSizeOnMove() {
        final long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
        final AllocationService allocation = createAllocationService(Settings.EMPTY, fixedSizeClusterInfoService(byteSize));

        logger.info("creating an index with 1 shard, no replica");
        MetaData metaData = MetaData.builder()
                .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
                .build();
        RoutingTable routingTable = RoutingTable.builder()
                .addAsNew(metaData.index("test"))
                .build();
        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();

        logger.info("adding two nodes and performing rerouting");
        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
        RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
        clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();

        logger.info("start primary shard");
        rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
        clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();

        logger.info("move the shard");
        String existingNodeId = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
        // the target is whichever of the two nodes does not currently hold the primary
        String toNodeId = "node1".equals(existingNodeId) ? "node2" : "node1";
        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand(new ShardId("test", 0), existingNodeId, toNodeId)));
        assertThat(rerouteResult.changed(), equalTo(true));
        clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
        // expected value first, so a failure message reports expected/actual the right way round
        assertEquals(ShardRoutingState.RELOCATING, clusterState.getRoutingNodes().node(existingNodeId).get(0).state());
        assertEquals(ShardRoutingState.INITIALIZING, clusterState.getRoutingNodes().node(toNodeId).get(0).state());

        assertEquals(byteSize, clusterState.getRoutingNodes().node(existingNodeId).get(0).getExpectedShardSize());
        assertEquals(byteSize, clusterState.getRoutingNodes().node(toNodeId).get(0).getExpectedShardSize());

        logger.info("finish moving the shard");
        rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
        clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();

        assertThat(clusterState.getRoutingNodes().node(existingNodeId).isEmpty(), equalTo(true));
        assertThat(clusterState.getRoutingNodes().node(toNodeId).get(0).state(), equalTo(ShardRoutingState.STARTED));
        assertEquals(-1, clusterState.getRoutingNodes().node(toNodeId).get(0).getExpectedShardSize());
    }
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.cluster.routing.allocation; package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
@ -27,12 +29,16 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Test; import org.junit.Test;
import java.util.Collections;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -47,12 +53,33 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
@Test @Test
public void testRebalanceOnlyAfterAllShardsAreActive() { public void testRebalanceOnlyAfterAllShardsAreActive() {
AllocationService strategy = createAllocationService(settingsBuilder() final long[] sizes = new long[5];
.put("cluster.routing.allocation.concurrent_recoveries", 10) for (int i =0; i < sizes.length; i++) {
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always") sizes[i] = randomIntBetween(0, Integer.MAX_VALUE);
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) }
.build());
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(),
new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
@Override
public Long getShardSize(ShardRouting shardRouting) {
if (shardRouting.index().equals("test")) {
return sizes[shardRouting.getId()];
}
return null; }
};
}
@Override
public void addListener(Listener listener) {
}
});
logger.info("Building initial routing table"); logger.info("Building initial routing table");
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
@ -97,6 +124,7 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertEquals(routingTable.index("test").shard(i).replicaShards().get(0).getExpectedShardSize(), sizes[i]);
} }
logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened");
@ -112,6 +140,8 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertEquals(routingTable.index("test").shard(i).replicaShards().get(0).getExpectedShardSize(), sizes[i]);
} }
logger.info("start the replica shards, rebalancing should start"); logger.info("start the replica shards, rebalancing should start");
@ -124,6 +154,16 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
// we only allow one relocation at a time // we only allow one relocation at a time
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5)); assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
int num = 0;
for (ShardRouting routing : routingTable.index("test").shard(i).shards()) {
if (routing.state() == RELOCATING || routing.state() == INITIALIZING) {
assertEquals(routing.getExpectedShardSize(), sizes[i]);
num++;
}
}
assertTrue(num > 0);
}
logger.info("complete relocation, other half of relocation should happen"); logger.info("complete relocation, other half of relocation should happen");
routingNodes = clusterState.getRoutingNodes(); routingNodes = clusterState.getRoutingNodes();
@ -135,6 +175,14 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
// we now only relocate 3, since 2 remain where they are! // we now only relocate 3, since 2 remain where they are!
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7)); assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3)); assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
for (ShardRouting routing : routingTable.index("test").shard(i).shards()) {
if (routing.state() == RELOCATING || routing.state() == INITIALIZING) {
assertEquals(routing.getExpectedShardSize(), sizes[i]);
}
}
}
logger.info("complete relocation, thats it!"); logger.info("complete relocation, thats it!");
routingNodes = clusterState.getRoutingNodes(); routingNodes = clusterState.getRoutingNodes();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.gateway;
import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
@ -302,7 +303,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.metaData(metaData) .metaData(metaData)
.routingTable(routingTable) .routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null); return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY);
} }
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
@ -321,7 +322,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.metaData(metaData) .metaData(metaData)
.routingTable(routingTable) .routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null); return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY);
} }
class TestAllocator extends ReplicaShardAllocator { class TestAllocator extends ReplicaShardAllocator {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -454,6 +455,22 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertPathHasBeenCleared(idxPath); assertPathHasBeenCleared(idxPath);
} }
/**
 * Creates a one-shard, zero-replica index "test", indexes 50 empty documents, invokes
 * {@code run()} on a freshly created cluster-info update job, and asserts that the
 * cluster info then reports a non-null, positive size for the primary shard.
 */
public void testExpectedShardSizeIsPresent() throws InterruptedException {
    assertAcked(client().admin().indices().prepareCreate("test")
            .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
    for (int doc = 0; doc < 50; doc++) {
        client().prepareIndex("test", "test").setSource("{}").get();
    }
    ensureGreen("test");

    // the node's service is cast to the internal implementation to reach its update job
    InternalClusterInfoService infoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
    InternalClusterInfoService.ClusterInfoUpdateJob updateJob = infoService.new ClusterInfoUpdateJob(false);
    updateJob.run();

    ClusterState currentState = getInstanceFromNode(ClusterService.class).state();
    ClusterInfo info = infoService.getClusterInfo();
    ShardRouting primary = currentState.getRoutingTable().index("test").getShards().get(0).primaryShard();
    Long primarySize = info.getShardSize(primary);
    assertNotNull(primarySize);
    assertTrue(primarySize > 0);
}
public void testIndexCanChangeCustomDataPath() throws Exception { public void testIndexCanChangeCustomDataPath() throws Exception {
Environment env = getInstanceFromNode(Environment.class); Environment env = getInstanceFromNode(Environment.class);
Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10)); Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.test; package org.elasticsearch.test;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.EmptyClusterInfoService;
@ -63,7 +64,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
} }
/**
 * Creates an {@link AllocationService} for the given settings and random source by
 * delegating to the three-argument overload with a {@link NodeSettingsService} built
 * from {@code Settings.Builder.EMPTY_SETTINGS}.
 */
public static AllocationService createAllocationService(Settings settings, Random random) { public static AllocationService createAllocationService(Settings settings, Random random) {
return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random); return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random);
} }
public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) { public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
@ -72,6 +73,13 @@ public abstract class ESAllocationTestCase extends ESTestCase {
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
} }
/**
 * Creates an {@link AllocationService} that uses the supplied {@link ClusterInfoService}.
 * The allocation deciders are obtained from {@code randomAllocationDeciders} using a
 * {@link NodeSettingsService} built from empty settings and the random source returned by
 * {@code getRandom()}; the shard allocators use {@code NoopGatewayAllocator.INSTANCE}.
 */
public static AllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
    NodeSettingsService emptyNodeSettings = new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS);
    AllocationDeciders deciders = randomAllocationDeciders(settings, emptyNodeSettings, getRandom());
    ShardsAllocators allocators = new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE);
    return new AllocationService(settings, deciders, allocators, clusterInfoService);
}
public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) { public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
final List<Class<? extends AllocationDecider>> defaultAllocationDeciders = ClusterModule.DEFAULT_ALLOCATION_DECIDERS; final List<Class<? extends AllocationDecider>> defaultAllocationDeciders = ClusterModule.DEFAULT_ALLOCATION_DECIDERS;