Estimate shard size for shrinked indices (#18659)

When we shrink an index we can estimate the shards size for the primary
from the source index. This is important for allocation decisions since we
should try out best to ensure we have enough space on the node we shrink the
index.
This commit is contained in:
Simon Willnauer 2016-05-31 16:33:38 +02:00
parent 740bdc8d99
commit 82645563bf
4 changed files with 135 additions and 27 deletions

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -721,11 +722,13 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
assert decision != null && minNode != null || decision == null && minNode == null;
if (minNode != null) {
long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
if (decision.type() == Type.YES) {
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
shard = routingNodes.initialize(shard, minNode.getNodeId(), null, shardSize);
minNode.addShard(shard);
changed = true;

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -44,6 +45,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.RatioValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import java.util.Set;
@ -303,26 +305,23 @@ public class DiskThresholdDecider extends AllocationDecider {
* If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
* of all shards
*/
public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo,
public static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation,
boolean subtractShardsMovingAway, String dataPath) {
ClusterInfo clusterInfo = allocation.clusterInfo();
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
String actualPath = clusterInfo.getDataPath(routing);
if (dataPath.equals(actualPath)) {
if (routing.initializing() && routing.relocatingNodeId() != null) {
totalSize += getShardSize(routing, clusterInfo);
totalSize += getExpectedShardSize(routing, allocation, 0);
} else if (subtractShardsMovingAway && routing.relocating()) {
totalSize -= getShardSize(routing, clusterInfo);
totalSize -= getExpectedShardSize(routing, allocation, 0);
}
}
}
return totalSize;
}
static long getShardSize(ShardRouting routing, ClusterInfo clusterInfo) {
Long shardSize = clusterInfo.getShardSize(routing);
return shardSize == null ? 0 : shardSize;
}
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
@ -426,7 +425,7 @@ public class DiskThresholdDecider extends AllocationDecider {
}
// Secondly, check that allocating the shard to this node doesn't put it above the high watermark
final long shardSize = getShardSize(shardRouting, allocation.clusterInfo());
final long shardSize = getExpectedShardSize(shardRouting, allocation, 0);
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize;
if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) {
@ -518,7 +517,7 @@ public class DiskThresholdDecider extends AllocationDecider {
}
if (includeRelocations) {
long relocatingShardsSize = sizeOfRelocatingShards(node, clusterInfo, true, usage.getPath());
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, true, usage.getPath());
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
@ -643,4 +642,26 @@ public class DiskThresholdDecider extends AllocationDecider {
}
return null;
}
/**
* Returns the expected shard size for the given shard or the default value provided if not enough information are available
* to estimate the shards size.
*/
public static final long getExpectedShardSize(ShardRouting shard, RoutingAllocation allocation, long defaultValue) {
final IndexMetaData metaData = allocation.metaData().getIndexSafe(shard.index());
final ClusterInfo info = allocation.clusterInfo();
if (metaData.getMergeSourceIndex() != null && shard.allocatedPostIndexCreate(metaData) == false) {
// in the shrink index case we sum up the source index shards since we basically make a copy of the shard in
// the worst case
Index mergeSourceIndex = metaData.getMergeSourceIndex();
long targetShardSize = 0;
for (IndexShardRoutingTable shardRoutingTable : allocation.routingTable().index(mergeSourceIndex.getName())) {
targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0);
}
return targetShardSize == 0 ? defaultValue : targetShardSize;
} else {
return info.getShardSize(shard, defaultValue);
}
}
}

View File

@ -21,11 +21,14 @@ package org.elasticsearch.action.admin.indices.create;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -346,6 +349,7 @@ public class CreateIndexIT extends ESIntegTestCase {
.setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode)
.put("index.blocks.write", true)).get();
ensureGreen();
// now merge source into a single shard index
client().admin().indices().prepareShrinkIndex("source", "target")
.setSettings(Settings.builder()
@ -375,7 +379,16 @@ public class CreateIndexIT extends ESIntegTestCase {
.setSettings(Settings.builder()
.put("index.routing.allocation.require._name", mergeNode)).get();
ensureGreen("source");
client().admin().cluster().prepareReroute().setRetryFailed(true).get(); // kick off a retry and wait until it's done!
final InternalClusterInfoService infoService = (InternalClusterInfoService) internalCluster().getInstance(ClusterInfoService.class,
internalCluster().getMasterName());
infoService.refresh();
// kick off a retry and wait until it's done!
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
long expectedShardSize = clusterRerouteResponse.getState().routingTable().index("target")
.shard(0).getShards().get(0).getExpectedShardSize();
// we support the expected shard size in the allocator to sum up over the source index shards
assertTrue("expected shard size must be set but wasn't: " + expectedShardSize, expectedShardSize > 0);
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
}

View File

@ -34,7 +34,9 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings;
@ -44,7 +46,7 @@ import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ESAllocationTestCase;
import java.util.Arrays;
import java.util.Collections;
@ -57,7 +59,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
/**
* Unit tests for the DiskThresholdDecider
*/
public class DiskThresholdDeciderUnitTests extends ESTestCase {
public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
public void testDynamicSettings() {
ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
@ -233,7 +235,18 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
shardSizes.put("[test][2][r]", 1000L);
shardSizes.put("[other][0][p]", 10000L);
ClusterInfo info = new DevNullClusterInfo(ImmutableOpenMap.of(), ImmutableOpenMap.of(), shardSizes.build());
final Index index = new Index("test", "_na_");
MetaData.Builder metaBuilder = MetaData.builder();
metaBuilder.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put("index.uuid", "1234")).numberOfShards(3).numberOfReplicas(1));
metaBuilder.put(IndexMetaData.builder("other").settings(settings(Version.CURRENT).put("index.uuid", "5678")).numberOfShards(1).numberOfReplicas(1));
MetaData metaData = metaBuilder.build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
routingTableBuilder.addAsNew(metaData.index("test"));
routingTableBuilder.addAsNew(metaData.index("other"));
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData).routingTable(routingTableBuilder.build()).build();
RoutingAllocation allocation = new RoutingAllocation(null, null, clusterState, info, 0, false);
final Index index = new Index("test", "1234");
ShardRouting test_0 = ShardRouting.newUnassigned(new ShardId(index, 0), null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
test_0 = ShardRoutingHelper.initialize(test_0, "node1");
test_0 = ShardRoutingHelper.moveToStarted(test_0);
@ -248,24 +261,24 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
test_2 = ShardRoutingHelper.initialize(test_2, "node1");
test_2 = ShardRoutingHelper.moveToStarted(test_2);
assertEquals(1000L, DiskThresholdDecider.getShardSize(test_2, info));
assertEquals(100L, DiskThresholdDecider.getShardSize(test_1, info));
assertEquals(10L, DiskThresholdDecider.getShardSize(test_0, info));
assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0));
assertEquals(100L, DiskThresholdDecider.getExpectedShardSize(test_1, allocation, 0));
assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0));
RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", new LocalTransportAddress("test"),
emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.buildTargetRelocatingShard(), test_2);
assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false, "/dev/null"));
assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/null"));
assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/some/other/dev"));
assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/some/other/dev"));
assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null"));
assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null"));
assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/some/other/dev"));
assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/some/other/dev"));
ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
test_3 = ShardRoutingHelper.initialize(test_3, "node1");
test_3 = ShardRoutingHelper.moveToStarted(test_3);
assertEquals(0L, DiskThresholdDecider.getShardSize(test_3, info));
assertEquals(0L, DiskThresholdDecider.getExpectedShardSize(test_3, allocation, 0));
ShardRouting other_0 = ShardRouting.newUnassigned(new ShardId("other", "_NA_", 0), null, randomBoolean(), new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
ShardRouting other_0 = ShardRouting.newUnassigned(new ShardId("other", "5678", 0), null, randomBoolean(), new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
other_0 = ShardRoutingHelper.initialize(other_0, "node2");
other_0 = ShardRoutingHelper.moveToStarted(other_0);
other_0 = ShardRoutingHelper.relocate(other_0, "node1");
@ -273,12 +286,70 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
node = new RoutingNode("node1", new DiscoveryNode("node1", new LocalTransportAddress("test"),
emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.buildTargetRelocatingShard(), test_2, other_0.buildTargetRelocatingShard());
if (other_0.primary()) {
assertEquals(10100L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false, "/dev/null"));
assertEquals(10090L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/null"));
assertEquals(10100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null"));
assertEquals(10090L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null"));
} else {
assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false, "/dev/null"));
assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/null"));
assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null"));
assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null"));
}
}
public void testSizeShrinkIndex() {
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
shardSizes.put("[test][0][p]", 10L);
shardSizes.put("[test][1][p]", 100L);
shardSizes.put("[test][2][p]", 1000L);
ClusterInfo info = new DevNullClusterInfo(ImmutableOpenMap.of(), ImmutableOpenMap.of(), shardSizes.build());
MetaData.Builder metaBuilder = MetaData.builder();
metaBuilder.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put("index.uuid", "1234"))
.numberOfShards(3).numberOfReplicas(0));
metaBuilder.put(IndexMetaData.builder("target").settings(settings(Version.CURRENT).put("index.uuid", "5678")
.put("index.shrink.source.name", "test").put("index.shrink.source.uuid", "1234")).numberOfShards(1).numberOfReplicas(0));
MetaData metaData = metaBuilder.build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
routingTableBuilder.addAsNew(metaData.index("test"));
routingTableBuilder.addAsNew(metaData.index("target"));
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData).routingTable(routingTableBuilder.build()).build();
AllocationService allocationService = createAllocationService();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")))
.build();
RoutingAllocation.Result result = allocationService.reroute(clusterState, "foo");
clusterState = ClusterState.builder(clusterState).routingTable(result.routingTable()).build();
result = allocationService.applyStartedShards(clusterState,
clusterState.getRoutingTable().index("test").shardsWithState(ShardRoutingState.UNASSIGNED));
clusterState = ClusterState.builder(clusterState).routingTable(result.routingTable()).build();
RoutingAllocation allocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, info, 0, false);
final Index index = new Index("test", "1234");
ShardRouting test_0 = ShardRouting.newUnassigned(new ShardId(index, 0), null, true,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
test_0 = ShardRoutingHelper.initialize(test_0, "node1");
test_0 = ShardRoutingHelper.moveToStarted(test_0);
ShardRouting test_1 = ShardRouting.newUnassigned(new ShardId(index, 1), null, true,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
test_1 = ShardRoutingHelper.initialize(test_1, "node2");
test_1 = ShardRoutingHelper.moveToStarted(test_1);
ShardRouting test_2 = ShardRouting.newUnassigned(new ShardId(index, 2), null, true,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
test_2 = ShardRoutingHelper.initialize(test_2, "node1");
assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0));
assertEquals(100L, DiskThresholdDecider.getExpectedShardSize(test_1, allocation, 0));
assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0));
ShardRouting target = ShardRouting.newUnassigned(new ShardId(new Index("target", "5678"), 0),
null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
assertEquals(1110L, DiskThresholdDecider.getExpectedShardSize(target, allocation, 0));
}