Allow primaries that have never been allocated to be allocated if under the low watermark

Fixes #6196
This commit is contained in:
Lee Hinman 2014-05-16 15:17:45 +02:00
parent 80321d89d9
commit 7023caa1a1
2 changed files with 173 additions and 42 deletions

View File

@ -131,14 +131,21 @@ public class DiskThresholdDecider extends AllocationDecider {
} }
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// Always allow allocation if the decider is disabled
if (!enabled) { if (!enabled) {
return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled"); return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
} }
// Allow allocation regardless if only a single node is available // Allow allocation regardless if only a single node is available
if (allocation.nodes().size() <= 1) { if (allocation.nodes().size() <= 1) {
if (logger.isTraceEnabled()) {
logger.trace("Only a single node is present, allowing allocation");
}
return allocation.decision(Decision.YES, NAME, "only a single node is present"); return allocation.decision(Decision.YES, NAME, "only a single node is present");
} }
// Fail open there is no info available
ClusterInfo clusterInfo = allocation.clusterInfo(); ClusterInfo clusterInfo = allocation.clusterInfo();
if (clusterInfo == null) { if (clusterInfo == null) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
@ -147,6 +154,7 @@ public class DiskThresholdDecider extends AllocationDecider {
return allocation.decision(Decision.YES, NAME, "cluster info unavailable"); return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
} }
// Fail open if there are no disk usages available
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages(); Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
Map<String, Long> shardSizes = clusterInfo.getShardSizes(); Map<String, Long> shardSizes = clusterInfo.getShardSizes();
if (usages.isEmpty()) { if (usages.isEmpty()) {
@ -173,21 +181,72 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Node [{}] has {}% free disk", node.nodeId(), freeDiskPercentage); logger.debug("Node [{}] has {}% free disk", node.nodeId(), freeDiskPercentage);
} }
// a flag for whether the primary shard has been previously allocated
boolean primaryHasBeenAllocated = allocation.routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi();
// checks for exact byte comparisons
if (freeBytes < freeBytesThresholdLow.bytes()) { if (freeBytes < freeBytesThresholdLow.bytes()) {
if (logger.isDebugEnabled()) { // If the shard is a replica or has a primary that has already been allocated before, check the low threshold
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation", if (!shardRouting.primary() || (shardRouting.primary() && primaryHasBeenAllocated)) {
freeBytesThresholdLow, freeBytes, node.nodeId()); if (logger.isDebugEnabled()) {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytes));
} else if (freeBytes > freeBytesThresholdHigh.bytes()) {
// Allow the shard to be allocated because it is primary that
// has never been allocated if it's under the high watermark
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, " +
"but allowing allocation because primary has never been allocated",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
return allocation.decision(Decision.YES, NAME, "primary has never been allocated before");
} else {
// Even though the primary has never been allocated, the node is
// above the high watermark, so don't allow allocating the shard
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, " +
"preventing allocation even though primary has never been allocated",
freeBytesThresholdHigh, freeBytes, node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]",
freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
} }
return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytes));
} }
// checks for percentage comparisons
if (freeDiskPercentage < freeDiskThresholdLow) { if (freeDiskPercentage < freeDiskThresholdLow) {
if (logger.isDebugEnabled()) { // If the shard is a replica or has a primary that has already been allocated before, check the low threshold
logger.debug("Less than the required {}% free disk threshold ({}% free) on node [{}], preventing allocation", if (!shardRouting.primary() || (shardRouting.primary() && primaryHasBeenAllocated)) {
freeDiskThresholdLow, freeDiskPercentage, node.nodeId()); if (logger.isDebugEnabled()) {
logger.debug("Less than the required {}% free disk threshold ({}% free) on node [{}], preventing allocation",
freeDiskThresholdLow, freeDiskPercentage, node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdLow, freeDiskThresholdLow);
} else if (freeDiskPercentage > freeDiskThresholdHigh) {
// Allow the shard to be allocated because it is primary that
// has never been allocated if it's under the high watermark
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {}% free disk threshold ({}% free) on node [{}], " +
"but allowing allocation because primary has never been allocated",
freeDiskThresholdLow, freeDiskPercentage, node.nodeId());
}
return allocation.decision(Decision.YES, NAME, "primary has never been allocated before");
} else {
// Even though the primary has never been allocated, the node is
// above the high watermark, so don't allow allocating the shard
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, " +
"preventing allocation even though primary has never been allocated",
freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdLow, freeDiskThresholdLow);
} }
return allocation.decision(Decision.NO, NAME, "less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdLow, freeDiskThresholdLow);
} }
// Secondly, check that allocating the shard to this node doesn't put it above the high watermark // Secondly, check that allocating the shard to this node doesn't put it above the high watermark

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchAllocationTestCase; import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
@ -237,17 +238,19 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
} }
@Test @Test
@TestLogging("cluster.routing.allocation.decider:TRACE")
public void diskThresholdWithAbsoluteSizesTest() { public void diskThresholdWithAbsoluteSizesTest() {
Settings diskSettings = settingsBuilder() Settings diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true) .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "30b") .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "30b")
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "20b").build(); .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "9b").build();
Map<String, DiskUsage> usages = new HashMap<>(); Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node1", new DiskUsage("node1", 100, 10)); // 90% used usages.put("node1", new DiskUsage("node1", 100, 10)); // 90% used
usages.put("node2", new DiskUsage("node2", 100, 35)); // 65% used usages.put("node2", new DiskUsage("node2", 100, 10)); // 90% used
usages.put("node3", new DiskUsage("node3", 100, 60)); // 40% used usages.put("node3", new DiskUsage("node3", 100, 60)); // 40% used
usages.put("node4", new DiskUsage("node4", 100, 80)); // 20% used usages.put("node4", new DiskUsage("node4", 100, 80)); // 20% used
usages.put("node5", new DiskUsage("node5", 100, 85)); // 15% used
Map<String, Long> shardSizes = new HashMap<>(); Map<String, Long> shardSizes = new HashMap<>();
shardSizes.put("[test][0][p]", 10L); // 10 bytes shardSizes.put("[test][0][p]", 10L); // 10 bytes
@ -274,7 +277,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.build(), deciders, new ShardsAllocators(), cis); .build(), deciders, new ShardsAllocators(), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(1)) .put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(2))
.build(); .build();
RoutingTable routingTable = RoutingTable.builder() RoutingTable routingTable = RoutingTable.builder()
@ -283,35 +286,65 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding two nodes"); logger.info("--> adding node1 and node2 node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("node1")) .put(newNode("node1"))
.put(newNode("node2")) .put(newNode("node2"))
).build(); ).build();
routingTable = strategy.reroute(clusterState).routingTable(); routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState); logShardStates(clusterState);
// Primary shard should be initializing, replica should not // Primary should initialize, even though both nodes are over the limit initialize
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
String nodeWithPrimary, nodeWithoutPrimary;
if (clusterState.getRoutingNodes().node("node1").size() == 1) {
nodeWithPrimary = "node1";
nodeWithoutPrimary = "node2";
} else {
nodeWithPrimary = "node2";
nodeWithoutPrimary = "node1";
}
// Make node without the primary now habitable to replicas
usages.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, 100, 35)); // 65% used
final ClusterInfo clusterInfo2 = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes));
cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo2;
}
};
strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Now the replica should be able to initialize
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
logger.info("--> start the shards (primaries)"); logger.info("--> start the shards (primaries)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState); logShardStates(clusterState);
// Assert that we're able to start the primary // Assert that we're able to start the primary and replica, since they were both initializing
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2));
// Assert that node1 didn't get any shards because its disk usage is too high // Assert that node1 got a single shard (the primary), even though its disk usage is too high
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
// Assert that node2 got a single shard (a replica)
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1));
logger.info("--> start the shards (replicas)"); // Assert that one replica is still unassigned
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); //assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1));
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Assert that the replica couldn't be started since node1 doesn't have enough space
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1));
logger.info("--> adding node3"); logger.info("--> adding node3");
@ -323,7 +356,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
logShardStates(clusterState); logShardStates(clusterState);
// Assert that the replica is initialized now that node3 is available with enough space // Assert that the replica is initialized now that node3 is available with enough space
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2));
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
logger.info("--> start the shards (replicas)"); logger.info("--> start the shards (replicas)");
@ -331,9 +364,9 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState); logShardStates(clusterState);
// Assert that the replica couldn't be started since node1 doesn't have enough space // Assert that all replicas could be started
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3));
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1));
@ -363,8 +396,8 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
logShardStates(clusterState); logShardStates(clusterState);
// Shards remain started // Shards remain started
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2)); assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(3));
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1));
@ -394,8 +427,8 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
logShardStates(clusterState); logShardStates(clusterState);
// Shards remain started // Shards remain started
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2)); assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(3));
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
// Shard hasn't been moved off of node2 yet because there's nowhere for it to go // Shard hasn't been moved off of node2 yet because there's nowhere for it to go
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1));
@ -410,7 +443,9 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
logShardStates(clusterState); logShardStates(clusterState);
// Shards remain started // Shards remain started
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1)); assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2));
// One shard is relocating off of node1
assertThat(clusterState.routingNodes().shardsWithState(RELOCATING).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> apply INITIALIZING shards"); logger.info("--> apply INITIALIZING shards");
@ -418,11 +453,42 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState); logShardStates(clusterState);
// primary shard already has been relocated away
assertThat(clusterState.getRoutingNodes().node(nodeWithPrimary).size(), equalTo(0));
// node with increased space still has its shard
assertThat(clusterState.getRoutingNodes().node(nodeWithoutPrimary).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node4").size(), equalTo(1));
logger.info("--> adding node5");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node5"))
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Shards remain started on node3 and node4
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2));
// One shard is relocating off of node2 now
assertThat(clusterState.routingNodes().shardsWithState(RELOCATING).size(), equalTo(1));
// Initializing on node5
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> apply INITIALIZING shards");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("--> final cluster state:");
logShardStates(clusterState);
// Node1 still has no shards because it has no space for them
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0));
// Node4 is available now, so the shard is moved off of node2 // Node5 is available now, so the shard is moved off of node2
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node4").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node4").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node5").size(), equalTo(1));
} }
@Test @Test
@ -537,15 +603,21 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
).build(); ).build();
routingTable = strategy.reroute(clusterState).routingTable(); routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// Shard can be allocated to node1, even though it only has 25% free,
// because it's a primary that's never been allocated before
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> start the shards (primaries)"); logger.info("--> start the shards (primaries)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState); logShardStates(clusterState);
// Shard can't be allocated to node1 (or node2) because the average usage is 75% > 70% // A single shard is started on node1, even though it normally would not
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); // be allowed, because it's a primary that hasn't been allocated, and node1
// No shards are started, node1 doesn't have enough disk usage // is still below the high watermark (unlike node3)
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(0)); assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
} }
@Test @Test