From b19be2c34aa6415cb744b2b7338e33e5d85c2165 Mon Sep 17 00:00:00 2001 From: xuzha Date: Wed, 30 Sep 2015 10:43:27 -0700 Subject: [PATCH] DiskThresholdDecider: check data node count Right now, we allow allocation if there is only a single node in the cluster. It would be nice to fail open only when there is a single data node (instead of a single node total). Closes #9391 --- .../decider/DiskThresholdDecider.java | 8 +- .../decider/DiskThresholdDeciderTests.java | 133 +++++++++++++++++- 2 files changed, 135 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 9a6353a46f8..e1a0b777d66 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -598,12 +598,12 @@ public class DiskThresholdDecider extends AllocationDecider { return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled"); } - // Allow allocation regardless if only a single node is available - if (allocation.nodes().size() <= 1) { + // Allow allocation regardless if only a single data node is available + if (allocation.nodes().dataNodes().size() <= 1) { if (logger.isTraceEnabled()) { - logger.trace("only a single node is present, allowing allocation"); + logger.trace("only a single data node is present, allowing allocation"); } - return allocation.decision(Decision.YES, NAME, "only a single node is present"); + return allocation.decision(Decision.YES, NAME, "only a single data node is present"); } // Fail open there is no info available diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java 
b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index dfdd9ba5948..5852faf908e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -50,13 +50,11 @@ import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.junit.Test; -import java.util.AbstractMap; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Set; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -912,6 +910,137 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().relocatingNodeId(), equalTo("node2")); } + public void testForSingleDataNode() { + Settings diskSettings = settingsBuilder() + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true) + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true) + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "60%") + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "70%").build(); + + Map usages = new HashMap<>(); + usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 100)); // 0% used + usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 20)); // 80% used + usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 100)); // 0% used + + // We have an index with 2 primary shards, each taking 40 bytes. 
Each node has 100 bytes available + Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][p]", 40L); + shardSizes.put("[test][1][p]", 40L); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), MockInternalClusterInfoService.DEV_NULL_MAP); + + DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + logger.info("--> adding one master node, one data node"); + Map masterNodeAttributes = new HashMap<>(); + masterNodeAttributes.put("master", "true"); + masterNodeAttributes.put("data", "false"); + Map dataNodeAttributes = new HashMap<>(); + dataNodeAttributes.put("master", "false"); + dataNodeAttributes.put("data", "true"); + DiscoveryNode discoveryNode1 = new DiscoveryNode("", "node1", new LocalTransportAddress("1"), masterNodeAttributes, Version.CURRENT); + DiscoveryNode discoveryNode2 = new DiscoveryNode("", "node2", new LocalTransportAddress("2"), dataNodeAttributes, Version.CURRENT); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(discoveryNode1).put(discoveryNode2).build(); + ClusterState baseClusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(discoveryNodes) + .build(); + + // Two shards consumes 80% of disk space in data node, but we have only one data node, shards should remain. 
+ ShardRouting firstRouting = TestShardRouting.newShardRouting("test", 0, "node2", null, null, true, ShardRoutingState.STARTED, 1); + ShardRouting secondRouting = TestShardRouting.newShardRouting("test", 1, "node2", null, null, true, ShardRoutingState.STARTED, 1); + RoutingNode firstRoutingNode = new RoutingNode("node2", discoveryNode2, Arrays.asList(firstRouting, secondRouting)); + + RoutingTable.Builder builder = RoutingTable.builder().add( + IndexRoutingTable.builder("test") + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0)) + .addShard(firstRouting) + .build() + ) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1)) + .addShard(secondRouting) + .build() + ) + ); + ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build(); + RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); + + // Two shards should start happily + assertThat(decision.type(), equalTo(Decision.Type.YES)); + ClusterInfoService cis = new ClusterInfoService() { + @Override + public ClusterInfo getClusterInfo() { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; + } + + @Override + public void addListener(Listener listener) { + } + }; + + AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( + new SameShardAllocationDecider(Settings.EMPTY), diskThresholdDecider + ))); + + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .build(), deciders, makeShardsAllocators(), cis); + RoutingAllocation.Result result = 
strategy.reroute(clusterState); + + assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED)); + assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().currentNodeId(), equalTo("node2")); + assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().relocatingNodeId(), nullValue()); + assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().state(), equalTo(STARTED)); + assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().currentNodeId(), equalTo("node2")); + assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().relocatingNodeId(), nullValue()); + + // Add another datanode, it should relocate. + logger.info("--> adding node3"); + DiscoveryNode discoveryNode3 = new DiscoveryNode("", "node3", new LocalTransportAddress("3"), dataNodeAttributes, Version.CURRENT); + ClusterState updateClusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) + .put(discoveryNode3)).build(); + + firstRouting = TestShardRouting.newShardRouting("test", 0, "node2", null, null, true, ShardRoutingState.STARTED, 1); + secondRouting = TestShardRouting.newShardRouting("test", 1, "node2", "node3", null, true, ShardRoutingState.RELOCATING, 1); + firstRoutingNode = new RoutingNode("node2", discoveryNode2, Arrays.asList(firstRouting, secondRouting)); + builder = RoutingTable.builder().add( + IndexRoutingTable.builder("test") + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0)) + .addShard(firstRouting) + .build() + ) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1)) + .addShard(secondRouting) + .build() + ) + ); + + clusterState = ClusterState.builder(updateClusterState).routingTable(builder).build(); + routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + decision = 
diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + + result = strategy.reroute(clusterState); + assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED)); + assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().currentNodeId(), equalTo("node2")); + assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().relocatingNodeId(), nullValue()); + assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().state(), equalTo(RELOCATING)); + assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().currentNodeId(), equalTo("node2")); + assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().relocatingNodeId(), equalTo("node3")); + } + public void logShardStates(ClusterState state) { RoutingNodes rn = state.getRoutingNodes(); logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}",