From f9a45fd605f22b43ca1445d3046c1990ea25a7b8 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 8 Jul 2015 14:36:26 +0200 Subject: [PATCH] Cleanup ShardRoutingState uses and hide implementation details of ClusterInfo --- .../elasticsearch/cluster/ClusterInfo.java | 19 ++++++++++++++----- .../cluster/InternalClusterInfoService.java | 10 ++-------- .../cluster/routing/RoutingTable.java | 2 +- .../cluster/routing/ShardRouting.java | 1 - .../routing/allocation/AllocationService.java | 10 ++++------ .../command/AllocateAllocationCommand.java | 14 ++++++++------ .../command/CancelAllocationCommand.java | 2 +- .../decider/DiskThresholdDecider.java | 18 +++++++----------- .../decider/ThrottlingAllocationDecider.java | 4 ++-- .../gateway/GatewayAllocator.java | 6 +++--- .../cluster/ClusterInfoServiceTests.java | 10 +++++----- 11 files changed, 47 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 5c010c6fd01..ae1228cc119 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster; import com.google.common.collect.ImmutableMap; +import org.elasticsearch.cluster.routing.ShardRouting; import java.util.Map; @@ -31,10 +32,10 @@ import java.util.Map; */ public class ClusterInfo { - private final ImmutableMap usages; - private final ImmutableMap shardSizes; + private final Map usages; + final Map shardSizes; - public ClusterInfo(ImmutableMap usages, ImmutableMap shardSizes) { + public ClusterInfo(Map usages, Map shardSizes) { this.usages = usages; this.shardSizes = shardSizes; } @@ -43,7 +44,15 @@ public class ClusterInfo { return this.usages; } - public Map getShardSizes() { - return this.shardSizes; + public Long getShardSize(ShardRouting shardRouting) { + return shardSizes.get(shardIdentifierFromRouting(shardRouting)); + } + + /** + * Method that incorporates the ShardId for the shard into a string that + * includes a 'p' or 'r' depending on whether the shard is a primary. + */ + static String shardIdentifierFromRouting(ShardRouting shardRouting) { + return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]"; } } diff --git a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 87f6e57616f..e77818e66a1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -360,7 +360,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu HashMap newShardSizes = new HashMap<>(); for (ShardStats s : stats) { long size = s.getStats().getStore().sizeInBytes(); - String sid = shardIdentifierFromRouting(s.getShardRouting()); + String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting()); if (logger.isTraceEnabled()) { logger.trace("shard: {} size: {}", sid, size); } @@ -411,11 +411,5 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu } } - /** - * Method that incorporates the ShardId for the shard into a string that - * includes a 'p' or 'r' depending on whether the shard is a primary. - */ - public static String shardIdentifierFromRouting(ShardRouting shardRouting) { - return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]"; - } + } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index b8c79f4f514..1327b556208 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -349,7 +349,7 @@ public class RoutingTable implements Iterable, Diffable it = allocation.routingNodes().unassigned().iterator(); it.hasNext(); ) { + for (Iterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) { if (it.next() != shardRouting) { continue; } it.remove(); - allocation.routingNodes().assign(shardRouting, routingNode.nodeId()); + routingNodes.assign(shardRouting, routingNode.nodeId()); if (shardRouting.primary()) { // we need to clear the post allocation flag, since its an explicit allocation of the primary shard // and we want to force allocate it (and create a new index for it) - allocation.routingNodes().addClearPostAllocationFlag(shardRouting.shardId()); + routingNodes.addClearPostAllocationFlag(shardRouting.shardId()); } break; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java index 766559862a7..dafe7723526 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java @@ -201,7 +201,7 @@ public class CancelAllocationCommand implements AllocationCommand { if (initializingNode != null) { while (initializingNode.hasNext()) { ShardRouting initializingShardRouting = initializingNode.next(); - if (initializingShardRouting.shardId().equals(shardRouting.shardId()) && initializingShardRouting.state() == INITIALIZING) { + if (initializingShardRouting.shardId().equals(shardRouting.shardId()) && initializingShardRouting.initializing()) { initializingNode.remove(); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 54d2d9422c9..8519360375d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -38,8 +38,6 @@ import org.elasticsearch.node.settings.NodeSettingsService; import java.util.Map; -import static org.elasticsearch.cluster.InternalClusterInfoService.shardIdentifierFromRouting; - /** * The {@link DiskThresholdDecider} checks that the node a shard is potentially * being allocated to has enough disk space. @@ -276,20 +274,20 @@ public class DiskThresholdDecider extends AllocationDecider { * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size * of all shards */ - public long sizeOfRelocatingShards(RoutingNode node, Map shardSizes, boolean subtractShardsMovingAway) { + public long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway) { long totalSize = 0; for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) { if (routing.initializing() && routing.relocatingNodeId() != null) { - totalSize += getShardSize(routing, shardSizes); + totalSize += getShardSize(routing, clusterInfo); } else if (subtractShardsMovingAway && routing.relocating()) { - totalSize -= getShardSize(routing, shardSizes); + totalSize -= getShardSize(routing, clusterInfo); } } return totalSize; } - private long getShardSize(ShardRouting routing, Map shardSizes) { - Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing)); + private long getShardSize(ShardRouting routing, ClusterInfo clusterInfo) { + Long shardSize = clusterInfo.getShardSize(routing); return shardSize == null ? 0 : shardSize; } @@ -383,8 +381,7 @@ public class DiskThresholdDecider extends AllocationDecider { } // Secondly, check that allocating the shard to this node doesn't put it above the high watermark - Map shardSizes = allocation.clusterInfo().getShardSizes(); - Long shardSize = shardSizes.get(shardIdentifierFromRouting(shardRouting)); + Long shardSize = allocation.clusterInfo().getShardSize(shardRouting); shardSize = shardSize == null ? 0 : shardSize; double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize); long freeBytesAfterShard = freeBytes - shardSize; @@ -452,8 +449,7 @@ public class DiskThresholdDecider extends AllocationDecider { } if (includeRelocations) { - Map shardSizes = clusterInfo.getShardSizes(); - long relocatingShardsSize = sizeOfRelocatingShards(node, shardSizes, true); + long relocatingShardsSize = sizeOfRelocatingShards(node, clusterInfo, true); DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); if (logger.isTraceEnabled()) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index d8d67cc1a2d..2e7d5dd3cc6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -83,7 +83,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider { for (ShardRouting shard : node) { // when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node* // we only count initial recoveries here, so we need to make sure that relocating node is null - if (shard.state() == ShardRoutingState.INITIALIZING && shard.primary() && shard.relocatingNodeId() == null) { + if (shard.initializing() && shard.primary() && shard.relocatingNodeId() == null) { primariesInRecovery++; } } @@ -106,7 +106,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider { public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) { int currentRecoveries = 0; for (ShardRouting shard : node) { - if (shard.state() == ShardRoutingState.INITIALIZING || shard.state() == ShardRoutingState.RELOCATING) { + if (shard.initializing() || shard.relocating()) { currentRecoveries++; } } diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index e71ebc88e9f..3afd4bb926d 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -339,7 +339,7 @@ public class GatewayAllocator extends AbstractComponent { // we found a match changed = true; // make sure we create one with the version from the recovered state - allocation.routingNodes().assign(new ShardRouting(shard, highestVersion), node.nodeId()); + routingNodes.assign(new ShardRouting(shard, highestVersion), node.nodeId()); unassignedIterator.remove(); // found a node, so no throttling, no "no", and break out of the loop @@ -359,7 +359,7 @@ public class GatewayAllocator extends AbstractComponent { // we found a match changed = true; // make sure we create one with the version from the recovered state - allocation.routingNodes().assign(new ShardRouting(shard, highestVersion), node.nodeId()); + routingNodes.assign(new ShardRouting(shard, highestVersion), node.nodeId()); unassignedIterator.remove(); } } else { @@ -514,7 +514,7 @@ public class GatewayAllocator extends AbstractComponent { } // we found a match changed = true; - allocation.routingNodes().assign(shard, lastNodeMatched.nodeId()); + routingNodes.assign(shard, lastNodeMatched.nodeId()); unassignedIterator.remove(); } } else if (hasReplicaData == false) { diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceTests.java index 87cadc4331a..699e8ed9d11 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceTests.java @@ -163,7 +163,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest { ClusterInfo info = listener.get(); assertNotNull("info should not be null", info); Map usages = info.getNodeDiskUsages(); - Map shardSizes = info.getShardSizes(); + Map shardSizes = info.shardSizes; assertNotNull(usages); assertNotNull(shardSizes); assertThat("some usages are populated", usages.values().size(), Matchers.equalTo(2)); @@ -196,7 +196,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest { ClusterInfo info = listener.get(); assertNotNull("failed to collect info", info); assertThat("some usages are populated", info.getNodeDiskUsages().size(), Matchers.equalTo(2)); - assertThat("some shard sizes are populated", info.getShardSizes().size(), greaterThan(0)); + assertThat("some shard sizes are populated", info.shardSizes.size(), greaterThan(0)); MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, internalTestCluster.getMasterName()); @@ -231,7 +231,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest { // node. assertThat(info.getNodeDiskUsages().size(), greaterThanOrEqualTo(1)); // indices is guaranteed to time out on the latch, not updating anything. - assertThat(info.getShardSizes().size(), greaterThan(1)); + assertThat(info.shardSizes.size(), greaterThan(1)); // now we cause an exception timeout.set(false); @@ -251,7 +251,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest { info = listener.get(); assertNotNull("info should not be null", info); assertThat(info.getNodeDiskUsages().size(), equalTo(0)); - assertThat(info.getShardSizes().size(), equalTo(0)); + assertThat(info.shardSizes.size(), equalTo(0)); // check we recover blockingActionFilter.blockActions(); @@ -260,7 +260,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest { info = listener.get(); assertNotNull("info should not be null", info); assertThat(info.getNodeDiskUsages().size(), equalTo(2)); - assertThat(info.getShardSizes().size(), greaterThan(0)); + assertThat(info.shardSizes.size(), greaterThan(0)); } }