From b3e5e6a0bada1a90ee80edcbe52cc22d82c640e3 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 21 Sep 2016 12:40:37 +0200 Subject: [PATCH] `index.routing.allocation.initial_recovery` limits replica allocation (#20589) `index.routing.allocation.initial_recovery` is used with index shrinking to make sure the new index's primary is assigned to the node that holds a copy of each of the source index shards. Sadly with the introduction of `RecoverySource` a regression was introduced that limits the allocation of replicas of the new index. --- .../cluster/routing/RecoverySource.java | 12 ++++ .../decider/FilterAllocationDecider.java | 4 +- .../admin/indices/create/CreateIndexIT.java | 19 ++++-- .../FilterAllocationDeciderTests.java | 68 +++++++++++++------ 4 files changed, 75 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/core/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java index 32afad99f27..f613cdbbada 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -28,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; +import java.util.EnumSet; import java.util.Objects; /** @@ -247,4 +249,14 @@ public abstract class RecoverySource implements Writeable, ToXContent { return "peer recovery"; } } + + private static EnumSet INITIAL_RECOVERY_TYPES = EnumSet.of(Type.EMPTY_STORE, Type.LOCAL_SHARDS, Type.SNAPSHOT); + + /** + * returns true for recovery types that indicate that a primary is being allocated for the very first time. + * This recoveries can be controlled by {@link IndexMetaData#INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING} + */ + public static boolean isInitialRecovery(RecoverySource.Type type) { + return INITIAL_RECOVERY_TYPES.contains(type); + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index 4071007c791..a42db129da9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -93,8 +93,8 @@ public class FilterAllocationDecider extends AllocationDecider { // this is a setting that can only be set within the system! IndexMetaData indexMd = allocation.metaData().getIndexSafe(shardRouting.index()); DiscoveryNodeFilters initialRecoveryFilters = indexMd.getInitialRecoveryFilters(); - if (shardRouting.recoverySource().getType() != RecoverySource.Type.EXISTING_STORE && - initialRecoveryFilters != null && + if (initialRecoveryFilters != null && + RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) && initialRecoveryFilters.match(node.node()) == false) { return allocation.decision(Decision.NO, NAME, "node does not match index initial recovery filters [%s]", indexMd.includeFilters()); diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index ba497ffca33..9d2e56f25bb 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -388,17 +388,22 @@ public class CreateIndexIT extends ESIntegTestCase { .put("index.blocks.write", true)).get(); ensureGreen(); // now merge source into a single shard index + + final boolean createWithReplicas = randomBoolean(); assertAcked(client().admin().indices().prepareShrinkIndex("source", "target") - .setSettings(Settings.builder().put("index.number_of_replicas", 0).build()).get()); - ensureGreen(); - assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); - // bump replicas - client().admin().indices().prepareUpdateSettings("target") - .setSettings(Settings.builder() - .put("index.number_of_replicas", 1)).get(); + .setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get()); ensureGreen(); assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + if (createWithReplicas == false) { + // bump replicas + client().admin().indices().prepareUpdateSettings("target") + .setSettings(Settings.builder() + .put("index.number_of_replicas", 1)).get(); + ensureGreen(); + assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + } + for (int i = 20; i < 40; i++) { client().prepareIndex("target", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get(); } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java index cfd4a25e0d1..88ac089c7d8 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java @@ -25,28 +25,34 @@ import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.gateway.NoopGatewayAllocator; -import java.util.Collections; +import java.util.Arrays; +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_NAME; +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_UUID; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; public class FilterAllocationDeciderTests extends ESAllocationTestCase { - public void testFilterInitialAllocation() { + public void testFilterInitialRecovery() { FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, - Collections.singleton(filterAllocationDecider)); + Arrays.asList(filterAllocationDecider, new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY))); AllocationService service = new AllocationService(Settings.builder().build(), allocationDeciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); ClusterState state = createInitialClusterState(service, Settings.builder().put("index.routing.allocation.initial_recovery._id", @@ -61,27 +67,31 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase { assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED); assertNull(routingTable.index("idx").shard(0).shards().get(0).currentNodeId()); + // after failing the shard we are unassigned since the node is blacklisted and we can't initialize on the other node RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, 0, false); - assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0), + assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(), state.getRoutingNodes().node("node2") ,allocation), Decision.YES); - assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0), + assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(), state.getRoutingNodes().node("node1") ,allocation), Decision.NO); - // after failing the shard we are unassigned since the node is blacklisted and we can't initialize on the other node state = service.reroute(state, "try allocate again"); routingTable = state.routingTable(); - assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING); - assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node2"); + assertEquals(routingTable.index("idx").shard(0).primaryShard().state(), INITIALIZING); + assertEquals(routingTable.index("idx").shard(0).primaryShard().currentNodeId(), "node2"); - state = service.applyStartedShards(state, routingTable.index("idx").shard(0).shards()); + state = service.applyStartedShards(state, routingTable.index("idx").shard(0).shardsWithState(INITIALIZING)); routingTable = state.routingTable(); // ok now we are started and can be allocated anywhere!! lets see... - assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), STARTED); - assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node2"); + assertEquals(routingTable.index("idx").shard(0).primaryShard().state(), STARTED); + assertEquals(routingTable.index("idx").shard(0).primaryShard().currentNodeId(), "node2"); + + // replicas should be initializing + assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).state(), INITIALIZING); + assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).currentNodeId(), "node1"); // we fail it again to check if we are initializing immediately on the other node state = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0)); @@ -100,20 +110,40 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase { } private ClusterState createInitialClusterState(AllocationService service, Settings settings) { - MetaData.Builder metaBuilder = MetaData.builder(); - metaBuilder.put(IndexMetaData.builder("idx").settings(settings(Version.CURRENT).put(settings)) - .numberOfShards(1).numberOfReplicas(0)); - MetaData metaData = metaBuilder.build(); + boolean shrinkIndex = randomBoolean(); + MetaData.Builder metaData = MetaData.builder(); + final Settings.Builder indexSettings = settings(Version.CURRENT).put(settings); + final IndexMetaData sourceIndex; + if (shrinkIndex) { + //put a fake closed source index + sourceIndex = IndexMetaData.builder("sourceIndex") + .settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0).build(); + metaData.put(sourceIndex, false); + indexSettings.put(INDEX_SHRINK_SOURCE_UUID.getKey(), sourceIndex.getIndexUUID()); + indexSettings.put(INDEX_SHRINK_SOURCE_NAME.getKey(), sourceIndex.getIndex().getName()); + } else { + sourceIndex = null; + } + final IndexMetaData indexMetaData = IndexMetaData.builder("idx").settings(indexSettings) + .numberOfShards(1).numberOfReplicas(1).build(); + metaData.put(indexMetaData, false); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - routingTableBuilder.addAsNew(metaData.index("idx")); + if (shrinkIndex) { + routingTableBuilder.addAsFromCloseToOpen(sourceIndex); + routingTableBuilder.addAsNew(indexMetaData); + } if (randomBoolean()) { + routingTableBuilder.addAsNew(indexMetaData); + } else { + routingTableBuilder.addAsRestore(indexMetaData, new RecoverySource.SnapshotRecoverySource( + new Snapshot("repository", new SnapshotId("snapshot_name", "snapshot_uuid")), + Version.CURRENT, indexMetaData.getIndex().getName())); + } RoutingTable routingTable = routingTableBuilder.build(); ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING .getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) .build(); - routingTable = service.reroute(clusterState, "reroute", false).routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - return clusterState; + return service.reroute(clusterState, "reroute", false); } }