testCreateShrinkIndex should make sure to use the right source stats when testing shrunk target

This commit is contained in:
Boaz Leskes 2017-06-23 11:05:36 +02:00
parent 6a792d6d82
commit 0ebc49e8c6
1 changed files with 43 additions and 15 deletions

View File

@ -35,12 +35,15 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -60,6 +63,7 @@ import org.elasticsearch.test.VersionUtils;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -239,7 +243,6 @@ public class ShrinkIndexIT extends ESIntegTestCase {
client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
String mergeNode = discoveryNodes[0].getName();
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
// to the require._name below. // to the require._name below.
@ -247,33 +250,53 @@ public class ShrinkIndexIT extends ESIntegTestCase {
// relocate all shards to one node such that we can merge it. // relocate all shards to one node such that we can merge it.
client().admin().indices().prepareUpdateSettings("source") client().admin().indices().prepareUpdateSettings("source")
.setSettings(Settings.builder() .setSettings(Settings.builder()
.put("index.routing.allocation.require._name", mergeNode) .put("index.routing.allocation.require._name", discoveryNodes[0].getName())
.put("index.blocks.write", true)).get(); .put("index.blocks.write", true)).get();
ensureGreen(); ensureGreen();
final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").setSegments(true).get(); final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").setSegments(true).get();
final long maxSeqNo =
Arrays.stream(sourceStats.getShards()).map(ShardStats::getSeqNoStats).mapToLong(SeqNoStats::getMaxSeqNo).max().getAsLong();
final long maxUnsafeAutoIdTimestamp =
Arrays.stream(sourceStats.getShards())
.map(ShardStats::getStats)
.map(CommonStats::getSegments)
.mapToLong(SegmentsStats::getMaxUnsafeAutoIdTimestamp)
.max()
.getAsLong();
// now merge source into a single shard index
// disable rebalancing to be able to capture the right stats. balancing can move the target primary
// making it hard to pin point the source shards.
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"
)).get();
// now merge source into a single shard index
final boolean createWithReplicas = randomBoolean(); final boolean createWithReplicas = randomBoolean();
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target") assertAcked(client().admin().indices().prepareShrinkIndex("source", "target")
.setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get()); .setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get());
ensureGreen(); ensureGreen();
// resolve true merge node - this is not always the node we required as all shards may be on another node
final ClusterState state = client().admin().cluster().prepareState().get().getState();
DiscoveryNode mergeNode = state.nodes().get(state.getRoutingTable().index("target").shard(0).primaryShard().currentNodeId());
logger.info("merge node {}", mergeNode);
final long maxSeqNo = Arrays.stream(sourceStats.getShards())
.filter(shard -> shard.getShardRouting().currentNodeId().equals(mergeNode.getId()))
.map(ShardStats::getSeqNoStats).mapToLong(SeqNoStats::getMaxSeqNo).max().getAsLong();
final long maxUnsafeAutoIdTimestamp = Arrays.stream(sourceStats.getShards())
.filter(shard -> shard.getShardRouting().currentNodeId().equals(mergeNode.getId()))
.map(ShardStats::getStats)
.map(CommonStats::getSegments)
.mapToLong(SegmentsStats::getMaxUnsafeAutoIdTimestamp)
.max()
.getAsLong();
for (ShardStats shard: Arrays.stream(sourceStats.getShards()).filter(shard -> shard.getShardRouting().currentNodeId().equals(mergeNode.getId())).collect(Collectors.toList())) {
logger.info("used {}, timestamp: {}", shard.getShardRouting(), shard.getStats().getSegments().getMaxUnsafeAutoIdTimestamp());
}
final IndicesStatsResponse targetStats = client().admin().indices().prepareStats("target").get(); final IndicesStatsResponse targetStats = client().admin().indices().prepareStats("target").get();
for (final ShardStats shardStats : targetStats.getShards()) { for (final ShardStats shardStats : targetStats.getShards()) {
final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
assertThat(seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo)); final ShardRouting shardRouting = shardStats.getShardRouting();
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat("failed on " + shardRouting, seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo));
assertThat(shardStats.getStats().getSegments().getMaxUnsafeAutoIdTimestamp(), equalTo(maxUnsafeAutoIdTimestamp)); assertThat("failed on " + shardRouting, seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat("failed on " + shardRouting,
shardStats.getStats().getSegments().getMaxUnsafeAutoIdTimestamp(), equalTo(maxUnsafeAutoIdTimestamp));
} }
final int size = docs > 0 ? 2 * docs : 1; final int size = docs > 0 ? 2 * docs : 1;
@ -297,6 +320,11 @@ public class ShrinkIndexIT extends ESIntegTestCase {
assertHitCount(client().prepareSearch("source").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs); assertHitCount(client().prepareSearch("source").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);
GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get(); GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get();
assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null)); assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null));
// clean up
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String)null
)).get();
} }
/** /**
* Tests that we can manually recover from a failed allocation due to shards being moved away etc. * Tests that we can manually recover from a failed allocation due to shards being moved away etc.