Merge pull request #13882 from xuzha/single_datanode_early_terminate
Early terminate high disk watermark checks on single data node cluster
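This PR re-scopes the decider's early exit from the total node count to the data-node count, so a cluster with dedicated master nodes and a single data node also skips the high disk watermark checks. The sketch below is a minimal, self-contained model of that decision order, not the actual Elasticsearch DiskThresholdDecider; the class name and the parameters of this standalone canRemain are hypothetical, chosen only to illustrate the early exit.

// A minimal, self-contained sketch of the decision order this PR changes.
// NOT the Elasticsearch implementation; names here are illustrative only.
public final class SingleDataNodeWatermarkSketch {

    enum Decision { YES, NO }

    // Before this PR the early return keyed off the total node count, so a
    // cluster of dedicated masters plus one data node never took it, and the
    // high watermark could reject shards that had nowhere else to go.
    static Decision canRemain(int dataNodeCount, double usedDiskPercent, double highWatermarkPercent) {
        if (dataNodeCount <= 1) {
            // Only one data node: relocation is impossible, so allow the shard to stay.
            return Decision.YES;
        }
        return usedDiskPercent < highWatermarkPercent ? Decision.YES : Decision.NO;
    }

    public static void main(String[] args) {
        System.out.println(canRemain(1, 80.0, 70.0)); // YES: single data node, watermark skipped
        System.out.println(canRemain(2, 80.0, 70.0)); // NO: watermark applies again
    }
}

A quick sanity run: with one data node at 80% usage against a 70% high watermark the sketch returns YES, matching the shards-should-remain assertion in the test added below; with two data nodes the watermark applies again.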
This commit is contained in:
commit fcdd8a29a9
@@ -598,12 +598,12 @@ public class DiskThresholdDecider extends AllocationDecider {
             return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
         }

-        // Allow allocation regardless if only a single node is available
-        if (allocation.nodes().size() <= 1) {
+        // Allow allocation regardless if only a single data node is available
+        if (allocation.nodes().dataNodes().size() <= 1) {
             if (logger.isTraceEnabled()) {
-                logger.trace("only a single node is present, allowing allocation");
+                logger.trace("only a single data node is present, allowing allocation");
             }
-            return allocation.decision(Decision.YES, NAME, "only a single node is present");
+            return allocation.decision(Decision.YES, NAME, "only a single data node is present");
         }

         // Fail open there is no info available
@@ -50,13 +50,11 @@ import org.elasticsearch.test.ESAllocationTestCase;
 import org.elasticsearch.test.gateway.NoopGatewayAllocator;
 import org.junit.Test;

-import java.util.AbstractMap;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;

 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -912,6 +910,137 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().relocatingNodeId(), equalTo("node2"));
     }

+    public void testForSingleDataNode() {
+        Settings diskSettings = settingsBuilder()
+                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
+                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true)
+                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "60%")
+                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "70%").build();
+
+        Map<String, DiskUsage> usages = new HashMap<>();
+        usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 100)); // 0% used
+        usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 20)); // 80% used
+        usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 100)); // 0% used
+
+        // We have an index with two primary shards, each taking 40 bytes. Each node has 100 bytes available.
+        Map<String, Long> shardSizes = new HashMap<>();
+        shardSizes.put("[test][0][p]", 40L);
+        shardSizes.put("[test][1][p]", 40L);
+        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), MockInternalClusterInfoService.DEV_NULL_MAP);
+
+        DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings);
+        MetaData metaData = MetaData.builder()
+                .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
+                .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+                .addAsNew(metaData.index("test"))
+                .build();
+
+        logger.info("--> adding one master node, one data node");
+        Map<String, String> masterNodeAttributes = new HashMap<>();
+        masterNodeAttributes.put("master", "true");
+        masterNodeAttributes.put("data", "false");
+        Map<String, String> dataNodeAttributes = new HashMap<>();
+        dataNodeAttributes.put("master", "false");
+        dataNodeAttributes.put("data", "true");
+        DiscoveryNode discoveryNode1 = new DiscoveryNode("", "node1", new LocalTransportAddress("1"), masterNodeAttributes, Version.CURRENT);
+        DiscoveryNode discoveryNode2 = new DiscoveryNode("", "node2", new LocalTransportAddress("2"), dataNodeAttributes, Version.CURRENT);
+
+        DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(discoveryNode1).put(discoveryNode2).build();
+        ClusterState baseClusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
+                .metaData(metaData)
+                .routingTable(routingTable)
+                .nodes(discoveryNodes)
+                .build();
+
+        // The two shards consume 80% of the disk space on the data node, but since it is the only data node, the shards should remain.
+        ShardRouting firstRouting = TestShardRouting.newShardRouting("test", 0, "node2", null, null, true, ShardRoutingState.STARTED, 1);
+        ShardRouting secondRouting = TestShardRouting.newShardRouting("test", 1, "node2", null, null, true, ShardRoutingState.STARTED, 1);
+        RoutingNode firstRoutingNode = new RoutingNode("node2", discoveryNode2, Arrays.asList(firstRouting, secondRouting));
+
+        RoutingTable.Builder builder = RoutingTable.builder().add(
+                IndexRoutingTable.builder("test")
+                        .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0))
+                                .addShard(firstRouting)
+                                .build()
+                        )
+                        .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1))
+                                .addShard(secondRouting)
+                                .build()
+                        )
+        );
+        ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build();
+        RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
+        Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
+
+        // Both shards should start happily
+        assertThat(decision.type(), equalTo(Decision.Type.YES));
+        ClusterInfoService cis = new ClusterInfoService() {
+            @Override
+            public ClusterInfo getClusterInfo() {
+                logger.info("--> calling fake getClusterInfo");
+                return clusterInfo;
+            }
+
+            @Override
+            public void addListener(Listener listener) {
+            }
+        };
+
+        AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList(
+                new SameShardAllocationDecider(Settings.EMPTY), diskThresholdDecider
+        )));
+
+        AllocationService strategy = new AllocationService(settingsBuilder()
+                .put("cluster.routing.allocation.concurrent_recoveries", 10)
+                .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
+                .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
+                .build(), deciders, makeShardsAllocators(), cis);
+        RoutingAllocation.Result result = strategy.reroute(clusterState);
+
+        assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));
+        assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().currentNodeId(), equalTo("node2"));
+        assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().relocatingNodeId(), nullValue());
+        assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().state(), equalTo(STARTED));
+        assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().currentNodeId(), equalTo("node2"));
+        assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().relocatingNodeId(), nullValue());
+
+        // Add another data node; a shard should then relocate.
+        logger.info("--> adding node3");
+        DiscoveryNode discoveryNode3 = new DiscoveryNode("", "node3", new LocalTransportAddress("3"), dataNodeAttributes, Version.CURRENT);
+        ClusterState updateClusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
+                .put(discoveryNode3)).build();
+
+        firstRouting = TestShardRouting.newShardRouting("test", 0, "node2", null, null, true, ShardRoutingState.STARTED, 1);
+        secondRouting = TestShardRouting.newShardRouting("test", 1, "node2", "node3", null, true, ShardRoutingState.RELOCATING, 1);
+        firstRoutingNode = new RoutingNode("node2", discoveryNode2, Arrays.asList(firstRouting, secondRouting));
+        builder = RoutingTable.builder().add(
+                IndexRoutingTable.builder("test")
+                        .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0))
+                                .addShard(firstRouting)
+                                .build()
+                        )
+                        .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1))
+                                .addShard(secondRouting)
+                                .build()
+                        )
+        );
+
+        clusterState = ClusterState.builder(updateClusterState).routingTable(builder).build();
+        routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
+        decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
+        assertThat(decision.type(), equalTo(Decision.Type.YES));
+
+        result = strategy.reroute(clusterState);
+        assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));
+        assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().currentNodeId(), equalTo("node2"));
+        assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().relocatingNodeId(), nullValue());
+        assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().state(), equalTo(RELOCATING));
+        assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().currentNodeId(), equalTo("node2"));
+        assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().relocatingNodeId(), equalTo("node3"));
+    }
+
     public void logShardStates(ClusterState state) {
         RoutingNodes rn = state.getRoutingNodes();
         logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}",
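Taken together, the new testForSingleDataNode exercises both sides of the early exit: with one dedicated master ("data" = "false") and a single data node holding two shards at 80% disk usage against the 70% high watermark, canRemain returns YES and both shards stay on node2; after node3 joins, reroute relocates shard 1 to node3, showing the watermark checks resume as soon as a second data node exists.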