Cleanup ShardRoutingState uses and hide implementation details of ClusterInfo

This commit is contained in:
Simon Willnauer 2015-07-08 14:36:26 +02:00
parent 097b132238
commit f9a45fd605
11 changed files with 47 additions and 49 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.routing.ShardRouting;
import java.util.Map;
@ -31,10 +32,10 @@ import java.util.Map;
*/
public class ClusterInfo {
private final ImmutableMap<String, DiskUsage> usages;
private final ImmutableMap<String, Long> shardSizes;
private final Map<String, DiskUsage> usages;
final Map<String, Long> shardSizes;
public ClusterInfo(ImmutableMap<String, DiskUsage> usages, ImmutableMap<String, Long> shardSizes) {
public ClusterInfo(Map<String, DiskUsage> usages, Map<String, Long> shardSizes) {
this.usages = usages;
this.shardSizes = shardSizes;
}
@ -43,7 +44,15 @@ public class ClusterInfo {
return this.usages;
}
public Map<String, Long> getShardSizes() {
return this.shardSizes;
public Long getShardSize(ShardRouting shardRouting) {
return shardSizes.get(shardIdentifierFromRouting(shardRouting));
}
/**
* Method that incorporates the ShardId for the shard into a string that
* includes a 'p' or 'r' depending on whether the shard is a primary.
*/
static String shardIdentifierFromRouting(ShardRouting shardRouting) {
return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]";
}
}

View File

@ -360,7 +360,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
HashMap<String, Long> newShardSizes = new HashMap<>();
for (ShardStats s : stats) {
long size = s.getStats().getStore().sizeInBytes();
String sid = shardIdentifierFromRouting(s.getShardRouting());
String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());
if (logger.isTraceEnabled()) {
logger.trace("shard: {} size: {}", sid, size);
}
@ -411,11 +411,5 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
}
}
/**
* Method that incorporates the ShardId for the shard into a string that
* includes a 'p' or 'r' depending on whether the shard is a primary.
*/
public static String shardIdentifierFromRouting(ShardRouting shardRouting) {
return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]";
}
}

View File

@ -349,7 +349,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
for (RoutingNode routingNode : routingNodes) {
for (ShardRouting shardRoutingEntry : routingNode) {
// every relocating shard has a double entry, ignore the target one.
if (shardRoutingEntry.state() == ShardRoutingState.INITIALIZING && shardRoutingEntry.relocatingNodeId() != null)
if (shardRoutingEntry.initializing() && shardRoutingEntry.relocatingNodeId() != null)
continue;
String index = shardRoutingEntry.index();

View File

@ -367,7 +367,6 @@ public final class ShardRouting implements Streamable, ToXContent {
version++;
if (currentNodeId == null) {
assert state == ShardRoutingState.UNASSIGNED;
state = ShardRoutingState.INITIALIZING;
currentNodeId = nodeId;
relocatingNodeId = null;

View File

@ -39,8 +39,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
/**
* This service manages the node allocation of a cluster. For this reason the
@ -326,7 +324,7 @@ public class AllocationService extends AbstractComponent {
boolean dirty = false;
// apply shards might be called several times with the same shard, ignore it
for (ShardRouting startedShard : startedShardEntries) {
assert startedShard.state() == INITIALIZING;
assert startedShard.initializing();
// retrieve the relocating node id before calling startedShard().
String relocatingNodeId = null;
@ -388,7 +386,7 @@ public class AllocationService extends AbstractComponent {
boolean dirty = false;
if (failedShard.relocatingNodeId() != null) {
// the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
if (failedShard.state() == INITIALIZING) {
if (failedShard.initializing()) {
// the shard is initializing and recovering from another node
// first, we need to cancel the current node that is being initialized
RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.currentNodeId());
@ -423,7 +421,7 @@ public class AllocationService extends AbstractComponent {
logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard);
}
return dirty;
} else if (failedShard.state() == RELOCATING) {
} else if (failedShard.relocating()) {
// the shard is relocating, meaning its the source the shard is relocating from
// first, we need to cancel the current relocation from the current node
// now, find the node that we are recovering from, cancel the relocation, remove it from the node
@ -449,7 +447,7 @@ public class AllocationService extends AbstractComponent {
if (initializingNode != null) {
while (initializingNode.hasNext()) {
ShardRouting shardRouting = initializingNode.next();
if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.state() == INITIALIZING) {
if (shardRouting.shardId().equals(failedShard.shardId()) && shardRouting.initializing()) {
dirty = true;
initializingNode.remove();
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
@ -165,10 +166,11 @@ public class AllocateAllocationCommand implements AllocationCommand {
@Override
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
DiscoveryNode discoNode = allocation.nodes().resolveNode(node);
final DiscoveryNode discoNode = allocation.nodes().resolveNode(node);
final RoutingNodes routingNodes = allocation.routingNodes();
ShardRouting shardRouting = null;
for (ShardRouting routing : allocation.routingNodes().unassigned()) {
for (ShardRouting routing : routingNodes.unassigned()) {
if (routing.shardId().equals(shardId)) {
// prefer primaries first to allocate
if (shardRouting == null || routing.primary()) {
@ -193,7 +195,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
throw new IllegalArgumentException("[allocate] trying to allocate a primary shard " + shardId + ", which is disabled");
}
RoutingNode routingNode = allocation.routingNodes().node(discoNode.id());
RoutingNode routingNode = routingNodes.node(discoNode.id());
if (routingNode == null) {
if (!discoNode.dataNode()) {
if (explain) {
@ -218,16 +220,16 @@ public class AllocateAllocationCommand implements AllocationCommand {
throw new IllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
}
// go over and remove it from the unassigned
for (Iterator<ShardRouting> it = allocation.routingNodes().unassigned().iterator(); it.hasNext(); ) {
for (Iterator<ShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
if (it.next() != shardRouting) {
continue;
}
it.remove();
allocation.routingNodes().assign(shardRouting, routingNode.nodeId());
routingNodes.assign(shardRouting, routingNode.nodeId());
if (shardRouting.primary()) {
// we need to clear the post allocation flag, since its an explicit allocation of the primary shard
// and we want to force allocate it (and create a new index for it)
allocation.routingNodes().addClearPostAllocationFlag(shardRouting.shardId());
routingNodes.addClearPostAllocationFlag(shardRouting.shardId());
}
break;
}

View File

@ -201,7 +201,7 @@ public class CancelAllocationCommand implements AllocationCommand {
if (initializingNode != null) {
while (initializingNode.hasNext()) {
ShardRouting initializingShardRouting = initializingNode.next();
if (initializingShardRouting.shardId().equals(shardRouting.shardId()) && initializingShardRouting.state() == INITIALIZING) {
if (initializingShardRouting.shardId().equals(shardRouting.shardId()) && initializingShardRouting.initializing()) {
initializingNode.remove();
}
}

View File

@ -38,8 +38,6 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.Map;
import static org.elasticsearch.cluster.InternalClusterInfoService.shardIdentifierFromRouting;
/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
* being allocated to has enough disk space.
@ -276,20 +274,20 @@ public class DiskThresholdDecider extends AllocationDecider {
* If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
* of all shards
*/
public long sizeOfRelocatingShards(RoutingNode node, Map<String, Long> shardSizes, boolean subtractShardsMovingAway) {
public long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway) {
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
if (routing.initializing() && routing.relocatingNodeId() != null) {
totalSize += getShardSize(routing, shardSizes);
totalSize += getShardSize(routing, clusterInfo);
} else if (subtractShardsMovingAway && routing.relocating()) {
totalSize -= getShardSize(routing, shardSizes);
totalSize -= getShardSize(routing, clusterInfo);
}
}
return totalSize;
}
private long getShardSize(ShardRouting routing, Map<String, Long> shardSizes) {
Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing));
private long getShardSize(ShardRouting routing, ClusterInfo clusterInfo) {
Long shardSize = clusterInfo.getShardSize(routing);
return shardSize == null ? 0 : shardSize;
}
@ -383,8 +381,7 @@ public class DiskThresholdDecider extends AllocationDecider {
}
// Secondly, check that allocating the shard to this node doesn't put it above the high watermark
Map<String, Long> shardSizes = allocation.clusterInfo().getShardSizes();
Long shardSize = shardSizes.get(shardIdentifierFromRouting(shardRouting));
Long shardSize = allocation.clusterInfo().getShardSize(shardRouting);
shardSize = shardSize == null ? 0 : shardSize;
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize;
@ -452,8 +449,7 @@ public class DiskThresholdDecider extends AllocationDecider {
}
if (includeRelocations) {
Map<String, Long> shardSizes = clusterInfo.getShardSizes();
long relocatingShardsSize = sizeOfRelocatingShards(node, shardSizes, true);
long relocatingShardsSize = sizeOfRelocatingShards(node, clusterInfo, true);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {

View File

@ -83,7 +83,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
for (ShardRouting shard : node) {
// when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
// we only count initial recoveries here, so we need to make sure that relocating node is null
if (shard.state() == ShardRoutingState.INITIALIZING && shard.primary() && shard.relocatingNodeId() == null) {
if (shard.initializing() && shard.primary() && shard.relocatingNodeId() == null) {
primariesInRecovery++;
}
}
@ -106,7 +106,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
int currentRecoveries = 0;
for (ShardRouting shard : node) {
if (shard.state() == ShardRoutingState.INITIALIZING || shard.state() == ShardRoutingState.RELOCATING) {
if (shard.initializing() || shard.relocating()) {
currentRecoveries++;
}
}

View File

@ -339,7 +339,7 @@ public class GatewayAllocator extends AbstractComponent {
// we found a match
changed = true;
// make sure we create one with the version from the recovered state
allocation.routingNodes().assign(new ShardRouting(shard, highestVersion), node.nodeId());
routingNodes.assign(new ShardRouting(shard, highestVersion), node.nodeId());
unassignedIterator.remove();
// found a node, so no throttling, no "no", and break out of the loop
@ -359,7 +359,7 @@ public class GatewayAllocator extends AbstractComponent {
// we found a match
changed = true;
// make sure we create one with the version from the recovered state
allocation.routingNodes().assign(new ShardRouting(shard, highestVersion), node.nodeId());
routingNodes.assign(new ShardRouting(shard, highestVersion), node.nodeId());
unassignedIterator.remove();
}
} else {
@ -514,7 +514,7 @@ public class GatewayAllocator extends AbstractComponent {
}
// we found a match
changed = true;
allocation.routingNodes().assign(shard, lastNodeMatched.nodeId());
routingNodes.assign(shard, lastNodeMatched.nodeId());
unassignedIterator.remove();
}
} else if (hasReplicaData == false) {

View File

@ -163,7 +163,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest {
ClusterInfo info = listener.get();
assertNotNull("info should not be null", info);
Map<String, DiskUsage> usages = info.getNodeDiskUsages();
Map<String, Long> shardSizes = info.getShardSizes();
Map<String, Long> shardSizes = info.shardSizes;
assertNotNull(usages);
assertNotNull(shardSizes);
assertThat("some usages are populated", usages.values().size(), Matchers.equalTo(2));
@ -196,7 +196,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest {
ClusterInfo info = listener.get();
assertNotNull("failed to collect info", info);
assertThat("some usages are populated", info.getNodeDiskUsages().size(), Matchers.equalTo(2));
assertThat("some shard sizes are populated", info.getShardSizes().size(), greaterThan(0));
assertThat("some shard sizes are populated", info.shardSizes.size(), greaterThan(0));
MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, internalTestCluster.getMasterName());
@ -231,7 +231,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest {
// node.
assertThat(info.getNodeDiskUsages().size(), greaterThanOrEqualTo(1));
// indices is guaranteed to time out on the latch, not updating anything.
assertThat(info.getShardSizes().size(), greaterThan(1));
assertThat(info.shardSizes.size(), greaterThan(1));
// now we cause an exception
timeout.set(false);
@ -251,7 +251,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest {
info = listener.get();
assertNotNull("info should not be null", info);
assertThat(info.getNodeDiskUsages().size(), equalTo(0));
assertThat(info.getShardSizes().size(), equalTo(0));
assertThat(info.shardSizes.size(), equalTo(0));
// check we recover
blockingActionFilter.blockActions();
@ -260,7 +260,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest {
info = listener.get();
assertNotNull("info should not be null", info);
assertThat(info.getNodeDiskUsages().size(), equalTo(2));
assertThat(info.getShardSizes().size(), greaterThan(0));
assertThat(info.shardSizes.size(), greaterThan(0));
}
}