From 0ebc49e8c62c29a7d202d64531588c9c9ad9ef40 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 23 Jun 2017 11:05:36 +0200 Subject: [PATCH] testCreateShrinkIndex should make sure to use the right source stats when testing shrunk target --- .../admin/indices/create/ShrinkIndexIT.java | 58 ++++++++++++++----- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java index a5c4cbb8a28..2098ead2811 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java @@ -35,12 +35,15 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; @@ -60,6 +63,7 @@ import org.elasticsearch.test.VersionUtils; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -239,7 +243,6 @@ public class ShrinkIndexIT extends ESIntegTestCase { client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); - String mergeNode = discoveryNodes[0].getName(); // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due // to the require._name below. @@ -247,33 +250,53 @@ public class ShrinkIndexIT extends ESIntegTestCase { // relocate all shards to one node such that we can merge it. client().admin().indices().prepareUpdateSettings("source") .setSettings(Settings.builder() - .put("index.routing.allocation.require._name", mergeNode) + .put("index.routing.allocation.require._name", discoveryNodes[0].getName()) .put("index.blocks.write", true)).get(); ensureGreen(); final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").setSegments(true).get(); - final long maxSeqNo = - Arrays.stream(sourceStats.getShards()).map(ShardStats::getSeqNoStats).mapToLong(SeqNoStats::getMaxSeqNo).max().getAsLong(); - final long maxUnsafeAutoIdTimestamp = - Arrays.stream(sourceStats.getShards()) - .map(ShardStats::getStats) - .map(CommonStats::getSegments) - .mapToLong(SegmentsStats::getMaxUnsafeAutoIdTimestamp) - .max() - .getAsLong(); - // now merge source into a single shard index + // disable rebalancing to be able to capture the right stats. balancing can move the target primary + // making it hard to pin point the source shards. + client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put( + EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none" + )).get(); + + + // now merge source into a single shard index final boolean createWithReplicas = randomBoolean(); assertAcked(client().admin().indices().prepareShrinkIndex("source", "target") .setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get()); ensureGreen(); + // resolve true merge node - this is not always the node we required as all shards may be on another node + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + DiscoveryNode mergeNode = state.nodes().get(state.getRoutingTable().index("target").shard(0).primaryShard().currentNodeId()); + logger.info("merge node {}", mergeNode); + + final long maxSeqNo = Arrays.stream(sourceStats.getShards()) + .filter(shard -> shard.getShardRouting().currentNodeId().equals(mergeNode.getId())) + .map(ShardStats::getSeqNoStats).mapToLong(SeqNoStats::getMaxSeqNo).max().getAsLong(); + final long maxUnsafeAutoIdTimestamp = Arrays.stream(sourceStats.getShards()) + .filter(shard -> shard.getShardRouting().currentNodeId().equals(mergeNode.getId())) + .map(ShardStats::getStats) + .map(CommonStats::getSegments) + .mapToLong(SegmentsStats::getMaxUnsafeAutoIdTimestamp) + .max() + .getAsLong(); + + for (ShardStats shard: Arrays.stream(sourceStats.getShards()).filter(shard -> shard.getShardRouting().currentNodeId().equals(mergeNode.getId())).collect(Collectors.toList())) { + logger.info("used {}, timestamp: {}", shard.getShardRouting(), shard.getStats().getSegments().getMaxUnsafeAutoIdTimestamp()); + } + final IndicesStatsResponse targetStats = client().admin().indices().prepareStats("target").get(); for (final ShardStats shardStats : targetStats.getShards()) { final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); - assertThat(seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo)); - assertThat(seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo)); - assertThat(shardStats.getStats().getSegments().getMaxUnsafeAutoIdTimestamp(), equalTo(maxUnsafeAutoIdTimestamp)); + final ShardRouting shardRouting = shardStats.getShardRouting(); + assertThat("failed on " + shardRouting, seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo)); + assertThat("failed on " + shardRouting, seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo)); + assertThat("failed on " + shardRouting, + shardStats.getStats().getSegments().getMaxUnsafeAutoIdTimestamp(), equalTo(maxUnsafeAutoIdTimestamp)); } final int size = docs > 0 ? 2 * docs : 1; @@ -297,6 +320,11 @@ public class ShrinkIndexIT extends ESIntegTestCase { assertHitCount(client().prepareSearch("source").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs); GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get(); assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null)); + + // clean up + client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put( + EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String)null + )).get(); } /** * Tests that we can manually recover from a failed allocation due to shards being moved away etc.