`index.routing.allocation.initial_recovery` limits replica allocation (#20589)

`index.routing.allocation.initial_recovery` is used with index shrinking to make sure the new index's primary is assigned to the node that holds a copy of each of the source index shards. Sadly with the introduction of `RecoverySource` a regression was introduced that limits the allocation of replicas of the new index.
This commit is contained in:
Boaz Leskes 2016-09-21 12:40:37 +02:00 committed by GitHub
parent c97f4f38e7
commit b3e5e6a0ba
4 changed files with 75 additions and 28 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -28,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.snapshots.Snapshot;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Objects;
/**
@ -247,4 +249,14 @@ public abstract class RecoverySource implements Writeable, ToXContent {
return "peer recovery";
}
}
private static EnumSet<RecoverySource.Type> INITIAL_RECOVERY_TYPES = EnumSet.of(Type.EMPTY_STORE, Type.LOCAL_SHARDS, Type.SNAPSHOT);
/**
* returns true for recovery types that indicate that a primary is being allocated for the very first time.
* This recoveries can be controlled by {@link IndexMetaData#INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING}
*/
public static boolean isInitialRecovery(RecoverySource.Type type) {
return INITIAL_RECOVERY_TYPES.contains(type);
}
}

View File

@ -93,8 +93,8 @@ public class FilterAllocationDecider extends AllocationDecider {
// this is a setting that can only be set within the system!
IndexMetaData indexMd = allocation.metaData().getIndexSafe(shardRouting.index());
DiscoveryNodeFilters initialRecoveryFilters = indexMd.getInitialRecoveryFilters();
if (shardRouting.recoverySource().getType() != RecoverySource.Type.EXISTING_STORE &&
initialRecoveryFilters != null &&
if (initialRecoveryFilters != null &&
RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) &&
initialRecoveryFilters.match(node.node()) == false) {
return allocation.decision(Decision.NO, NAME, "node does not match index initial recovery filters [%s]",
indexMd.includeFilters());

View File

@ -388,17 +388,22 @@ public class CreateIndexIT extends ESIntegTestCase {
.put("index.blocks.write", true)).get();
ensureGreen();
// now merge source into a single shard index
final boolean createWithReplicas = randomBoolean();
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target")
.setSettings(Settings.builder().put("index.number_of_replicas", 0).build()).get());
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
// bump replicas
client().admin().indices().prepareUpdateSettings("target")
.setSettings(Settings.builder()
.put("index.number_of_replicas", 1)).get();
.setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get());
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
if (createWithReplicas == false) {
// bump replicas
client().admin().indices().prepareUpdateSettings("target")
.setSettings(Settings.builder()
.put("index.number_of_replicas", 1)).get();
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
}
for (int i = 20; i < 40; i++) {
client().prepareIndex("target", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
}

View File

@ -25,28 +25,34 @@ import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import java.util.Collections;
import java.util.Arrays;
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_NAME;
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_UUID;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
public class FilterAllocationDeciderTests extends ESAllocationTestCase {
public void testFilterInitialAllocation() {
public void testFilterInitialRecovery() {
FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY,
Collections.singleton(filterAllocationDecider));
Arrays.asList(filterAllocationDecider, new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY)));
AllocationService service = new AllocationService(Settings.builder().build(), allocationDeciders,
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
ClusterState state = createInitialClusterState(service, Settings.builder().put("index.routing.allocation.initial_recovery._id",
@ -61,27 +67,31 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
assertNull(routingTable.index("idx").shard(0).shards().get(0).currentNodeId());
// after failing the shard we are unassigned since the node is blacklisted and we can't initialize on the other node
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
null, 0, false);
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(),
state.getRoutingNodes().node("node2")
,allocation), Decision.YES);
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(),
state.getRoutingNodes().node("node1")
,allocation), Decision.NO);
// after failing the shard we are unassigned since the node is blacklisted and we can't initialize on the other node
state = service.reroute(state, "try allocate again");
routingTable = state.routingTable();
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node2");
assertEquals(routingTable.index("idx").shard(0).primaryShard().state(), INITIALIZING);
assertEquals(routingTable.index("idx").shard(0).primaryShard().currentNodeId(), "node2");
state = service.applyStartedShards(state, routingTable.index("idx").shard(0).shards());
state = service.applyStartedShards(state, routingTable.index("idx").shard(0).shardsWithState(INITIALIZING));
routingTable = state.routingTable();
// ok now we are started and can be allocated anywhere!! lets see...
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), STARTED);
assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node2");
assertEquals(routingTable.index("idx").shard(0).primaryShard().state(), STARTED);
assertEquals(routingTable.index("idx").shard(0).primaryShard().currentNodeId(), "node2");
// replicas should be initializing
assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).state(), INITIALIZING);
assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).currentNodeId(), "node1");
// we fail it again to check if we are initializing immediately on the other node
state = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0));
@ -100,20 +110,40 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
}
private ClusterState createInitialClusterState(AllocationService service, Settings settings) {
MetaData.Builder metaBuilder = MetaData.builder();
metaBuilder.put(IndexMetaData.builder("idx").settings(settings(Version.CURRENT).put(settings))
.numberOfShards(1).numberOfReplicas(0));
MetaData metaData = metaBuilder.build();
boolean shrinkIndex = randomBoolean();
MetaData.Builder metaData = MetaData.builder();
final Settings.Builder indexSettings = settings(Version.CURRENT).put(settings);
final IndexMetaData sourceIndex;
if (shrinkIndex) {
//put a fake closed source index
sourceIndex = IndexMetaData.builder("sourceIndex")
.settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0).build();
metaData.put(sourceIndex, false);
indexSettings.put(INDEX_SHRINK_SOURCE_UUID.getKey(), sourceIndex.getIndexUUID());
indexSettings.put(INDEX_SHRINK_SOURCE_NAME.getKey(), sourceIndex.getIndex().getName());
} else {
sourceIndex = null;
}
final IndexMetaData indexMetaData = IndexMetaData.builder("idx").settings(indexSettings)
.numberOfShards(1).numberOfReplicas(1).build();
metaData.put(indexMetaData, false);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
routingTableBuilder.addAsNew(metaData.index("idx"));
if (shrinkIndex) {
routingTableBuilder.addAsFromCloseToOpen(sourceIndex);
routingTableBuilder.addAsNew(indexMetaData);
} if (randomBoolean()) {
routingTableBuilder.addAsNew(indexMetaData);
} else {
routingTableBuilder.addAsRestore(indexMetaData, new RecoverySource.SnapshotRecoverySource(
new Snapshot("repository", new SnapshotId("snapshot_name", "snapshot_uuid")),
Version.CURRENT, indexMetaData.getIndex().getName()));
}
RoutingTable routingTable = routingTableBuilder.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
.build();
routingTable = service.reroute(clusterState, "reroute", false).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
return clusterState;
return service.reroute(clusterState, "reroute", false);
}
}