diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java index 085cc513f24..e592b5092b7 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -290,7 +290,7 @@ public class ClusterState implements ToXContent, Diffable { for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) { sb.append(TAB).append(TAB).append(shard).append(": "); sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], "); - sb.append("a_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n"); + sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n"); } } sb.append(blocks().prettyPrint()); diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index cccd3ebb72e..ce6473ecb42 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -422,7 +422,7 @@ public class ShardStateAction extends AbstractComponent { ClusterState maybeUpdatedState = currentState; try { - maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied, true); + maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied); builder.successes(tasksToBeApplied); } catch (Exception e) { logger.warn((Supplier) () -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 7f6d9a703bd..58ee6d70f2c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -133,11 +133,22 @@ public class IndexRoutingTable extends AbstractDiffable imple throw new IllegalStateException("shard routing has an index [" + shardRouting.index() + "] that is different " + "from the routing table"); } + final Set inSyncAllocationIds = indexMetaData.inSyncAllocationIds(shardRouting.id()); if (shardRouting.active() && - indexMetaData.inSyncAllocationIds(shardRouting.id()).contains(shardRouting.allocationId().getId()) == false) { + inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) { throw new IllegalStateException("active shard routing " + shardRouting + " has no corresponding entry in the in-sync " + - "allocation set " + indexMetaData.inSyncAllocationIds(shardRouting.id())); + "allocation set " + inSyncAllocationIds); } + + if (indexMetaData.getCreationVersion().onOrAfter(Version.V_5_0_0_alpha1) && + IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.getSettings()) == false && // see #20650 + shardRouting.primary() && shardRouting.initializing() && shardRouting.relocating() == false && + RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) == false && + inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) + throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " + + "a known allocation id but has no corresponding entry in the in-sync " + + "allocation set " + inSyncAllocationIds); + } } return true; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index e710836685e..323adf78046 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -78,10 +78,6 @@ public class AllocationService extends AbstractComponent { * If the same instance of the {@link ClusterState} is returned, then no change has been made.

*/ public ClusterState applyStartedShards(ClusterState clusterState, List startedShards) { - return applyStartedShards(clusterState, startedShards, true); - } - - public ClusterState applyStartedShards(ClusterState clusterState, List startedShards, boolean withReroute) { if (startedShards.isEmpty()) { return clusterState; } @@ -92,9 +88,7 @@ public class AllocationService extends AbstractComponent { clusterInfoService.getClusterInfo(), currentNanoTime(), false); applyStartedShards(allocation, startedShards); gatewayAllocator.applyStartedShards(allocation, startedShards); - if (withReroute) { - reroute(allocation); - } + reroute(allocation); String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString()); return buildResultAndLogHealthChange(clusterState, allocation, "shards started [" + startedShardsAsString + "] ..."); } diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 812d55d45ea..450255575d9 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -130,6 +130,13 @@ public class GatewayAllocator extends AbstractComponent { } public void allocateUnassigned(final RoutingAllocation allocation) { + innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator); + } + + // allow for testing infra to change shard allocators implementation + protected static void innerAllocatedUnassigned(RoutingAllocation allocation, + PrimaryShardAllocator primaryShardAllocator, + ReplicaShardAllocator replicaShardAllocator) { RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java index 38afa11f5bd..25ae3b7cce9 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -138,7 +138,8 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { if (inSyncAllocationIds.isEmpty()) { assert Version.indexCreated(indexMetaData.getSettings()).before(Version.V_5_0_0_alpha1) : - "trying to allocated a primary with an empty allocation id set, but index is new"; + "trying to allocate a primary with an empty in sync allocation id set, but index is new. index: " + + indexMetaData.getIndex(); // when we load an old index (after upgrading cluster) or restore a snapshot of an old index // fall back to old version-based allocation mode // Note that once the shard has been active, lastActiveAllocationIds will be non-empty @@ -257,11 +258,11 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { /** * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching - * lastActiveAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but + * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but * entries with matching allocation id are always at the front of the list. */ protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard, - Set ignoreNodes, Set lastActiveAllocationIds, + Set ignoreNodes, Set inSyncAllocationIds, FetchResult shardState, Logger logger) { LinkedList matchingNodeShardStates = new LinkedList<>(); @@ -292,7 +293,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { if (allocationId != null) { numberOfAllocationsFound++; - if (lastActiveAllocationIds.contains(allocationId)) { + if (inSyncAllocationIds.contains(allocationId)) { if (nodeShardState.primary()) { matchingNodeShardStates.addFirst(nodeShardState); } else { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java index bbc2287deed..a9054879941 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteTests.java @@ -41,7 +41,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.io.IOException; import java.util.Collections; @@ -82,7 +82,7 @@ public class ClusterRerouteTests extends ESAllocationTestCase { public void testClusterStateUpdateTask() { AllocationService allocationService = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); ClusterState clusterState = createInitialClusterState(allocationService); ClusterRerouteRequest req = new ClusterRerouteRequest(); req.dryRun(true); diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java index 49d0ce447ba..0c5164aec5b 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java @@ -42,7 +42,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.Arrays; import java.util.Collections; @@ -97,7 +97,7 @@ public class TransportShrinkActionTests extends ESTestCase { .build(); AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); @@ -120,7 +120,7 @@ public class TransportShrinkActionTests extends ESTestCase { .build(); AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); diff --git a/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java b/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java index d7439ae2e48..d80e16397ac 100644 --- a/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java @@ -48,7 +48,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -139,7 +139,7 @@ public class ClusterStateHealthTests extends ESTestCase { listenerCalled.await(); TransportClusterHealthAction action = new TransportClusterHealthAction(Settings.EMPTY, transportService, - clusterService, threadPool, new ActionFilters(new HashSet<>()), indexNameExpressionResolver, NoopGatewayAllocator.INSTANCE); + clusterService, threadPool, new ActionFilters(new HashSet<>()), indexNameExpressionResolver, new TestGatewayAllocator()); PlainActionFuture listener = new PlainActionFuture<>(); action.execute(new ClusterHealthRequest(), listener); diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java index de82ce8d073..d74b450f5bf 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java @@ -39,7 +39,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.Arrays; import java.util.Collections; @@ -133,7 +133,7 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase { .build(); AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); @@ -161,7 +161,7 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase { .build(); AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java index c07f6935298..863a33b1327 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java @@ -63,7 +63,7 @@ public class PrimaryTermsTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_initial_primaries_recoveries", Integer.MAX_VALUE) .build()); this.numberOfShards = randomIntBetween(1, 5); - this.numberOfReplicas = randomIntBetween(1, 5); + this.numberOfReplicas = randomIntBetween(0, 5); logger.info("Setup test with {} shards and {} replicas.", this.numberOfShards, this.numberOfReplicas); this.primaryTermsPerIndex.clear(); MetaData metaData = MetaData.builder() diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java index f0514794a5e..6ed42ee45aa 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java @@ -328,13 +328,20 @@ public class RoutingTableTests extends ESAllocationTestCase { expectThrows(IllegalStateException.class, () -> indexRoutingTable.validate(metaData4)); } + /** reverse engineer the in sync aid based on the given indexRoutingTable **/ public static IndexMetaData updateActiveAllocations(IndexRoutingTable indexRoutingTable, IndexMetaData indexMetaData) { IndexMetaData.Builder imdBuilder = IndexMetaData.builder(indexMetaData); for (IndexShardRoutingTable shardTable : indexRoutingTable) { for (ShardRouting shardRouting : shardTable) { - Set activeAllocations = shardTable.activeShards().stream().map( + Set insyncAids = shardTable.activeShards().stream().map( shr -> shr.allocationId().getId()).collect(Collectors.toSet()); - imdBuilder.putInSyncAllocationIds(shardRouting.id(), activeAllocations); + final ShardRouting primaryShard = shardTable.primaryShard(); + if (primaryShard.initializing() && primaryShard.relocating() == false && + RecoverySource.isInitialRecovery(primaryShard.recoverySource().getType()) == false ) { + // simulate a primary was initialized based on aid + insyncAids.add(primaryShard.allocationId().getId()); + } + imdBuilder.putInSyncAllocationIds(shardRouting.id(), insyncAids); } } return imdBuilder.build(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index b2dd5ae0d26..e2a360b93fe 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -41,10 +41,12 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllo import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import org.hamcrest.Matchers; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -71,7 +73,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold); - AllocationService strategy = createAllocationService(settings.build()); + AllocationService strategy = createAllocationService(settings.build(), new NoopGatewayAllocator()); ClusterState clusterState = initCluster(strategy); assertIndexBalance(clusterState.getRoutingTable(), clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); @@ -95,7 +97,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold); - AllocationService strategy = createAllocationService(settings.build()); + AllocationService strategy = createAllocationService(settings.build(), new NoopGatewayAllocator()); ClusterState clusterState = initCluster(strategy); assertReplicaBalance(logger, clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); @@ -254,7 +256,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { Settings.Builder settings = Settings.builder(); AllocationService strategy = new AllocationService(settings.build(), randomAllocationDeciders(settings.build(), new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), random()), - NoopGatewayAllocator.INSTANCE, new ShardsAllocator() { + new TestGatewayAllocator(), new ShardsAllocator() { public Map weighShard(RoutingAllocation allocation, ShardRouting shard) { return new HashMap(); @@ -351,7 +353,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { assertThat(shardRouting.state(), Matchers.equalTo(ShardRoutingState.INITIALIZING)); } } - strategy = createAllocationService(settings.build()); + strategy = createAllocationService(settings.build(), new NoopGatewayAllocator()); logger.info("use the new allocator and check if it moves shards"); routingNodes = clusterState.getRoutingNodes(); @@ -385,7 +387,26 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { assertThat(shardRouting.state(), Matchers.equalTo(ShardRoutingState.STARTED)); } } - } + private class NoopGatewayAllocator extends GatewayAllocator { + + public NoopGatewayAllocator() { + super(Settings.EMPTY, null, null); + } + + @Override + public void applyStartedShards(RoutingAllocation allocation, List startedShards) { + // noop + } + + @Override + public void applyFailedShards(RoutingAllocation allocation, List failedShards) { + // noop + } + @Override + public void allocateUnassigned(RoutingAllocation allocation) { + // noop + } + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java index 953ad1535e8..8cccdb08fb5 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java @@ -33,7 +33,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.concurrent.atomic.AtomicBoolean; @@ -577,7 +577,7 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase { public void testRebalanceWithIgnoredUnassignedShards() { final AtomicBoolean allocateTest1 = new AtomicBoolean(false); - AllocationService strategy = createAllocationService(Settings.EMPTY, new NoopGatewayAllocator() { + AllocationService strategy = createAllocationService(Settings.EMPTY, new TestGatewayAllocator() { @Override public void allocateUnassigned(RoutingAllocation allocation) { if (allocateTest1.get() == false) { @@ -677,7 +677,7 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase { public void testRebalanceWhileShardFetching() { final AtomicBoolean hasFetches = new AtomicBoolean(true); AllocationService strategy = createAllocationService(Settings.builder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), - ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new NoopGatewayAllocator() { + ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new TestGatewayAllocator() { @Override public void allocateUnassigned(RoutingAllocation allocation) { if (hasFetches.get()) { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java index a6897d972c4..bee2275743b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DecisionsImpactOnClusterHealthTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterStateHealth; @@ -38,8 +39,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; -import org.elasticsearch.cluster.ESAllocationTestCase; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.io.IOException; import java.util.Collections; @@ -161,7 +161,7 @@ public class DecisionsImpactOnClusterHealthTests extends ESAllocationTestCase { private static AllocationService newAllocationService(Settings settings, Set deciders) { return new AllocationService(settings, new AllocationDeciders(settings, deciders), - NoopGatewayAllocator.INSTANCE, + new TestGatewayAllocator(), new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE); } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 508fe1afa7f..667ae850bfa 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -34,12 +34,14 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; @@ -69,7 +71,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { RoutingTable routingTable = RoutingTable.builder() .addAsNew(metaData.index("test")) .build(); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); logger.info("--> adding 2 nodes on same rack and do rerouting"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() @@ -154,7 +156,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { .addAsNew(metaData.index("test")) .build(); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); logger.info("Adding two nodes and performing rerouting"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); @@ -227,7 +229,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { .addAsNew(metaData.index("test")) .build(); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); logger.info("Adding single node and performing rerouting"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); @@ -278,7 +280,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { .addAsNew(metaData.index("test")) .build(); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); logger.info("Adding {} nodes and performing rerouting", numberOfReplicas + 1); DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(); @@ -341,7 +343,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { .addAsNew(metaData.index("test")) .build(); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); logger.info("Adding two nodes and performing rerouting"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); @@ -396,7 +398,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { .addAsNew(metaData.index("test")) .build(); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); logger.info("Adding two nodes and performing rerouting"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); @@ -480,7 +482,10 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { .addAsNew(metaData.index("test")) .build(); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData).routingTable(routingTable).build(); + + ShardId shardId = new ShardId(metaData.index("test").getIndex(), 0); // add 4 nodes clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).add(newNode("node4"))).build(); @@ -492,22 +497,26 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + // start one replica so it can take over. + clusterState = allocation.applyStartedShards(clusterState, + Collections.singletonList(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0))); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplica(shardId); + + // fail the primary shard, check replicas get removed as well... ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; - // the primary gets allocated on another node, replicas are unassigned - assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); - assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2)); + // the primary gets allocated on another node, replicas are initializing + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); - - // start the primary shard - clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); - assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); - assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId())); } public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToElect() { @@ -522,7 +531,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { .addAsNew(metaData.index("test")) .build(); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); // add 4 nodes clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).add(newNode("node4"))).build(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java index 88ac089c7d8..3a792ae991c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterAllocationDeciderTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; @@ -32,13 +33,15 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.Arrays; +import java.util.Collections; import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_NAME; import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_UUID; @@ -52,9 +55,10 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase { FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, - Arrays.asList(filterAllocationDecider, new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY))); + Arrays.asList(filterAllocationDecider, + new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY))); AllocationService service = new AllocationService(Settings.builder().build(), allocationDeciders, - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); ClusterState state = createInitialClusterState(service, Settings.builder().put("index.routing.allocation.initial_recovery._id", "node2").build()); RoutingTable routingTable = state.routingTable(); @@ -72,10 +76,10 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase { null, 0, false); assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(), state.getRoutingNodes().node("node2") - ,allocation), Decision.YES); + , allocation), Decision.YES); assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(), state.getRoutingNodes().node("node1") - ,allocation), Decision.NO); + , allocation), Decision.NO); state = service.reroute(state, "try allocate again"); routingTable = state.routingTable(); @@ -86,57 +90,80 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase { routingTable = state.routingTable(); // ok now we are started and can be allocated anywhere!! lets see... - assertEquals(routingTable.index("idx").shard(0).primaryShard().state(), STARTED); - assertEquals(routingTable.index("idx").shard(0).primaryShard().currentNodeId(), "node2"); - - // replicas should be initializing + // first create another copy assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).state(), INITIALIZING); assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).currentNodeId(), "node1"); - - // we fail it again to check if we are initializing immediately on the other node - state = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0)); + state = service.applyStartedShards(state, routingTable.index("idx").shard(0).replicaShardsWithState(INITIALIZING)); routingTable = state.routingTable(); - assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING); - assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node1"); + assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).state(), STARTED); + assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).currentNodeId(), "node1"); + + // now remove the node of the other copy and fail the current + DiscoveryNode node1 = state.nodes().resolveNode("node1"); + state = service.deassociateDeadNodes( + ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).remove("node1")).build(), + true, "test"); + state = service.applyFailedShard(state, routingTable.index("idx").shard(0).primaryShard()); + + // now bring back node1 and see it's assigned + state = service.reroute( + ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).add(node1)).build(), "test"); + routingTable = state.routingTable(); + assertEquals(routingTable.index("idx").shard(0).primaryShard().state(), INITIALIZING); + assertEquals(routingTable.index("idx").shard(0).primaryShard().currentNodeId(), "node1"); allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, 0, false); assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0), state.getRoutingNodes().node("node2") - ,allocation), Decision.YES); + , allocation), Decision.YES); assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0), state.getRoutingNodes().node("node1") - ,allocation), Decision.YES); + , allocation), Decision.YES); } private ClusterState createInitialClusterState(AllocationService service, Settings settings) { - boolean shrinkIndex = randomBoolean(); + RecoverySource.Type recoveryType = randomFrom(RecoverySource.Type.EMPTY_STORE, + RecoverySource.Type.LOCAL_SHARDS, RecoverySource.Type.SNAPSHOT); MetaData.Builder metaData = MetaData.builder(); final Settings.Builder indexSettings = settings(Version.CURRENT).put(settings); final IndexMetaData sourceIndex; - if (shrinkIndex) { + if (recoveryType == RecoverySource.Type.LOCAL_SHARDS) { //put a fake closed source index sourceIndex = IndexMetaData.builder("sourceIndex") - .settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0).build(); + .settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0) + .putInSyncAllocationIds(0, Collections.singleton("aid0")) + .putInSyncAllocationIds(1, Collections.singleton("aid1")) + .build(); metaData.put(sourceIndex, false); indexSettings.put(INDEX_SHRINK_SOURCE_UUID.getKey(), sourceIndex.getIndexUUID()); indexSettings.put(INDEX_SHRINK_SOURCE_NAME.getKey(), sourceIndex.getIndex().getName()); } else { sourceIndex = null; } - final IndexMetaData indexMetaData = IndexMetaData.builder("idx").settings(indexSettings) - .numberOfShards(1).numberOfReplicas(1).build(); + final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder("idx").settings(indexSettings) + .numberOfShards(1).numberOfReplicas(1); + if (recoveryType == RecoverySource.Type.SNAPSHOT) { + indexMetaDataBuilder.putInSyncAllocationIds(0, Collections.singleton("_snapshot_restore")); + } + final IndexMetaData indexMetaData = indexMetaDataBuilder.build(); metaData.put(indexMetaData, false); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - if (shrinkIndex) { - routingTableBuilder.addAsFromCloseToOpen(sourceIndex); - routingTableBuilder.addAsNew(indexMetaData); - } if (randomBoolean()) { - routingTableBuilder.addAsNew(indexMetaData); - } else { - routingTableBuilder.addAsRestore(indexMetaData, new RecoverySource.SnapshotRecoverySource( - new Snapshot("repository", new SnapshotId("snapshot_name", "snapshot_uuid")), - Version.CURRENT, indexMetaData.getIndex().getName())); + switch (recoveryType) { + case EMPTY_STORE: + routingTableBuilder.addAsNew(indexMetaData); + break; + case SNAPSHOT: + routingTableBuilder.addAsRestore(indexMetaData, new RecoverySource.SnapshotRecoverySource( + new Snapshot("repository", new SnapshotId("snapshot_name", "snapshot_uuid")), + Version.CURRENT, indexMetaData.getIndex().getName())); + break; + case LOCAL_SHARDS: + routingTableBuilder.addAsFromCloseToOpen(sourceIndex); + routingTableBuilder.addAsNew(indexMetaData); + break; + default: + throw new UnsupportedOperationException(recoveryType + " is not supported"); } RoutingTable routingTable = routingTableBuilder.build(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java index 3978b527df8..31e2330a600 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/MaxRetryAllocationDeciderTests.java @@ -35,7 +35,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.Collections; import java.util.List; @@ -55,7 +55,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase { super.setUp(); strategy = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); } private ClusterState createInitialClusterState() { @@ -204,9 +204,9 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase { routingTable.index("idx").shard(0).shards().get(0), null, new RoutingAllocation(null, null, clusterState, null, 0, false))); // now we start the shard - routingTable = strategy.applyStartedShards(clusterState, Collections.singletonList( - routingTable.index("idx").shard(0).shards().get(0))).routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + clusterState = strategy.applyStartedShards(clusterState, Collections.singletonList( + routingTable.index("idx").shard(0).shards().get(0))); + routingTable = clusterState.routingTable(); // all counters have been reset to 0 ie. no unassigned info assertEquals(routingTable.index("idx").shards().size(), 1); @@ -224,7 +224,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase { assertEquals(routingTable.index("idx").shards().size(), 1); unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0); assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), 1); - assertEquals(unassignedPrimary.state(), INITIALIZING); + assertEquals(unassignedPrimary.state(), UNASSIGNED); assertEquals(unassignedPrimary.unassignedInfo().getMessage(), "ZOOOMG"); // Counter reset, so MaxRetryAllocationDecider#canForceAllocatePrimary should return a YES decision assertEquals(Decision.YES, new MaxRetryAllocationDecider(Settings.EMPTY).canForceAllocatePrimary( diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java index ab7e31ba431..b472cc5e0d2 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java @@ -53,7 +53,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.VersionUtils; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.ArrayList; import java.util.Arrays; @@ -326,7 +326,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase { AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, Collections.singleton(new NodeVersionAllocationDecider(Settings.EMPTY))); AllocationService strategy = new MockAllocationService(Settings.EMPTY, allocationDeciders, - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState(); // the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match assertThat(state.routingTable().index(shard2.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0)); @@ -342,10 +342,12 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase { MASTER_DATA_ROLES, VersionUtils.getPreviousVersion()); int numberOfShards = randomIntBetween(1, 3); - MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numberOfShards).numberOfReplicas - (randomIntBetween(0, 3))) - .build(); + final IndexMetaData.Builder indexMetaData = IndexMetaData.builder("test").settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards).numberOfReplicas(randomIntBetween(0, 3)); + for (int i = 0; i < numberOfShards; i++) { + indexMetaData.putInSyncAllocationIds(i, Collections.singleton("_test_")); + } + MetaData metaData = MetaData.builder().put(indexMetaData).build(); ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metaData(metaData) @@ -358,7 +360,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase { new NodeVersionAllocationDecider(Settings.EMPTY))); AllocationService strategy = new MockAllocationService(Settings.EMPTY, allocationDeciders, - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState(); // Make sure that primary shards are only allocated on the new node diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferLocalPrimariesToRelocatingPrimariesTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferLocalPrimariesToRelocatingPrimariesTests.java index 10f1cf69355..7e528e601d3 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferLocalPrimariesToRelocatingPrimariesTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferLocalPrimariesToRelocatingPrimariesTests.java @@ -38,6 +38,7 @@ import static org.hamcrest.Matchers.equalTo; /** */ public class PreferLocalPrimariesToRelocatingPrimariesTests extends ESAllocationTestCase { + public void testPreferLocalPrimaryAllocationOverFiltered() { int concurrentRecoveries = randomIntBetween(1, 10); int primaryRecoveries = randomIntBetween(1, 10); @@ -66,8 +67,7 @@ public class PreferLocalPrimariesToRelocatingPrimariesTests extends ESAllocation logger.info("adding two nodes and performing rerouting till all are allocated"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("tag1", "value1"))) - .add(newNode("node2", singletonMap("tag1", "value2")))).build(); + .add(newNode("node1")).add(newNode("node2"))).build(); clusterState = strategy.reroute(clusterState, "reroute"); @@ -82,12 +82,12 @@ public class PreferLocalPrimariesToRelocatingPrimariesTests extends ESAllocation .put(IndexMetaData.builder(clusterState.metaData().index("test1")).settings(settings(Version.CURRENT) .put("index.number_of_shards", numberOfShards) .put("index.number_of_replicas", 0) - .put("index.routing.allocation.exclude.tag1", "value2") + .put("index.routing.allocation.exclude._name", "node2") .build())) .put(IndexMetaData.builder(clusterState.metaData().index("test2")).settings(settings(Version.CURRENT) .put("index.number_of_shards", numberOfShards) .put("index.number_of_replicas", 0) - .put("index.routing.allocation.exclude.tag1", "value2") + .put("index.routing.allocation.exclude._name", "node2") .build())) .build(); clusterState = ClusterState.builder(clusterState).metaData(metaData).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node1")).build(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java index ca9d61997b0..d789e6c4ec6 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -128,8 +129,9 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { routingNodes = clusterState.getRoutingNodes(); assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(1)); - assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(1)); - assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(INITIALIZING).get(0).primary(), equalTo(true)); + assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(0)); + assertThat(routingNodes.shardsWithState(UNASSIGNED).size(), equalTo(3)); // 2 replicas and one primary + assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(STARTED).get(0).primary(), equalTo(true)); assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L)); } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java index 53d54c1835d..6722e048030 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java @@ -38,10 +38,11 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import org.hamcrest.Matchers; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Random; @@ -58,7 +59,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase { RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(random()); AllocationService strategy = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY), - randomAllocationDecider))), NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + randomAllocationDecider))), new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); int indices = scaledRandomIntBetween(1, 20); Builder metaBuilder = MetaData.builder(); int maxNumReplicas = 1; @@ -101,9 +102,25 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase { boolean nodesRemoved = false; if (nodeIdCounter > 1 && rarely()) { int nodeId = scaledRandomIntBetween(0, nodeIdCounter - 2); - logger.info("removing node [{}]", nodeId); - newNodesBuilder.remove("NODE_" + nodeId); - nodesRemoved = true; + final String node = "NODE_" + nodeId; + boolean safeToRemove = true; + RoutingNode routingNode = clusterState.getRoutingNodes().node(node); + for (ShardRouting shard: routingNode != null ? routingNode : Collections.emptyList()) { + if (shard.active() && shard.primary()) { + // make sure there is an active replica to prevent from going red + if (clusterState.routingTable().shardRoutingTable(shard.shardId()).activeShards().size() <= 1) { + safeToRemove = false; + break; + } + } + } + if (safeToRemove) { + logger.info("removing node [{}]", nodeId); + newNodesBuilder.remove(node); + nodesRemoved = true; + } else { + logger.debug("not removing node [{}] as it holds a primary with no replacement", nodeId); + } } stateBuilder.nodes(newNodesBuilder.build()); @@ -142,7 +159,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase { } while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() != 0 || clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size() != 0 && iterations < 200); - logger.info("Done Balancing after [{}] iterations", iterations); + logger.info("Done Balancing after [{}] iterations. State:\n{}", iterations, clusterState.prettyPrint()); // we stop after 200 iterations if it didn't stabelize by then something is likely to be wrong assertThat("max num iteration exceeded", iterations, Matchers.lessThan(200)); assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(0)); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java index 33088bcce99..dd89d6b6a52 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java @@ -117,27 +117,41 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase { assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(STARTED)); assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node1")); - logger.info("Killing node1 where the shard is, checking the shard is relocated"); + logger.info("Killing node1 where the shard is, checking the shard is unassigned"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node1")).build(); newState = strategy.deassociateDeadNodes(clusterState, true, "reroute"); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; + assertThat(clusterState.routingTable().index("test").shards().size(), equalTo(1)); + assertThat(clusterState.routingTable().index("test").shard(0).size(), equalTo(1)); + assertThat(clusterState.routingTable().index("test").shard(0).shards().size(), equalTo(1)); + assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), nullValue()); + + logger.info("Bring node1 back, and see it's assinged"); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node1"))).build(); + newState = strategy.reroute(clusterState, "reroute"); + assertThat(newState, not(equalTo(clusterState))); + clusterState = newState; + assertThat(clusterState.routingTable().index("test").shards().size(), equalTo(1)); assertThat(clusterState.routingTable().index("test").shard(0).size(), equalTo(1)); assertThat(clusterState.routingTable().index("test").shard(0).shards().size(), equalTo(1)); assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(INITIALIZING)); - assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node2")); + assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node1")); + logger.info("Start another node, make sure that things remain the same (shard is in node2 and initializing)"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build(); newState = strategy.reroute(clusterState, "reroute"); assertThat(newState, equalTo(clusterState)); - logger.info("Start the shard on node 2"); + logger.info("Start the shard on node 1"); routingNodes = clusterState.getRoutingNodes(); - newState = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); + newState = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; @@ -145,7 +159,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase { assertThat(clusterState.routingTable().index("test").shard(0).size(), equalTo(1)); assertThat(clusterState.routingTable().index("test").shard(0).shards().size(), equalTo(1)); assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(STARTED)); - assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node2")); + assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node1")); } public void testSingleIndexShardFailed() { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java index d8d00e3915f..454e8410484 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java @@ -69,14 +69,15 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase { logger.info("--> test starting of shard"); - ClusterState newState = allocation.applyStartedShards(state, Arrays.asList(initShard), false); + ClusterState newState = allocation.applyStartedShards(state, Arrays.asList(initShard)); assertThat("failed to start " + initShard + "\ncurrent routing table:" + newState.routingTable().prettyPrint(), newState, not(equalTo(state))); assertTrue(initShard + "isn't started \ncurrent routing table:" + newState.routingTable().prettyPrint(), newState.routingTable().index("test").shard(initShard.id()).allShardsStarted()); + state = newState; logger.info("--> testing starting of relocating shards"); - newState = allocation.applyStartedShards(state, Arrays.asList(relocatingShard.getTargetRelocatingShard()), false); + newState = allocation.applyStartedShards(state, Arrays.asList(relocatingShard.getTargetRelocatingShard())); assertThat("failed to start " + relocatingShard + "\ncurrent routing table:" + newState.routingTable().prettyPrint(), newState, not(equalTo(state))); ShardRouting shardRouting = newState.routingTable().index("test").shard(relocatingShard.id()).getShards().get(0); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java index d6aafbda34a..894b5b42f0c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java @@ -20,23 +20,35 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.IntHashSet; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.gateway.TestGatewayAllocator; +import java.util.Collections; + +import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; @@ -50,10 +62,12 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { private final Logger logger = Loggers.getLogger(ThrottlingAllocationTests.class); public void testPrimaryRecoveryThrottling() { + + TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 3) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) - .build()); + .build(), gatewayAllocator); logger.info("Building initial routing table"); @@ -61,9 +75,7 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(10).numberOfReplicas(1)) .build(); - RoutingTable initialRoutingTable = createRecoveryRoutingTable(metaData.index("test")); - - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); + ClusterState clusterState = createRecoveryStateAndInitalizeAllocations(metaData, gatewayAllocator); logger.info("start one node, do reroute, only 3 should initialize"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); @@ -103,11 +115,13 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { } public void testReplicaAndPrimaryRecoveryThrottling() { + TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 3) .put("cluster.routing.allocation.concurrent_source_recoveries", 3) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) - .build()); + .build(), + gatewayAllocator); logger.info("Building initial routing table"); @@ -115,12 +129,9 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1)) .build(); - RoutingTable initialRoutingTable = createRecoveryRoutingTable(metaData.index("test")); + ClusterState clusterState = createRecoveryStateAndInitalizeAllocations(metaData, gatewayAllocator); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); - - logger.info("start one node, do reroute, only 3 should initialize"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); + logger.info("with one node, do reroute, only 3 should initialize"); clusterState = strategy.reroute(clusterState, "reroute"); assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0)); @@ -165,24 +176,22 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { } public void testThrottleIncomingAndOutgoing() { + TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); Settings settings = Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 5) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 5) .put("cluster.routing.allocation.cluster_concurrent_rebalance", 5) .build(); - AllocationService strategy = createAllocationService(settings); + AllocationService strategy = createAllocationService(settings, gatewayAllocator); logger.info("Building initial routing table"); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(9).numberOfReplicas(0)) .build(); - RoutingTable initialRoutingTable = createRecoveryRoutingTable(metaData.index("test")); + ClusterState clusterState = createRecoveryStateAndInitalizeAllocations(metaData, gatewayAllocator); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); - - logger.info("start one node, do reroute, only 5 should initialize"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); + logger.info("with one node, do reroute, only 5 should initialize"); clusterState = strategy.reroute(clusterState, "reroute"); assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0)); assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(5)); @@ -225,9 +234,10 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { } public void testOutgoingThrottlesAllocation() { + TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_outgoing_recoveries", 1) - .build()); + .build(), gatewayAllocator); logger.info("Building initial routing table"); @@ -235,12 +245,9 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) .build(); - RoutingTable initialRoutingTable = createRecoveryRoutingTable(metaData.index("test")); + ClusterState clusterState = createRecoveryStateAndInitalizeAllocations(metaData, gatewayAllocator); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); - - logger.info("start one node, do reroute, only 1 should initialize"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); + logger.info("with one node, do reroute, only 1 should initialize"); clusterState = strategy.reroute(clusterState, "reroute"); assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0)); @@ -301,23 +308,66 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase { assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0); } - private RoutingTable createRecoveryRoutingTable(IndexMetaData indexMetaData) { + private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaData, TestGatewayAllocator gatewayAllocator) { + DiscoveryNode node1 = newNode("node1"); + MetaData.Builder metaDataBuilder = new MetaData.Builder(metaData); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - switch (randomInt(5)) { - case 0: routingTableBuilder.addAsRecovery(indexMetaData); break; - case 1: routingTableBuilder.addAsFromCloseToOpen(indexMetaData); break; - case 2: routingTableBuilder.addAsFromDangling(indexMetaData); break; - case 3: routingTableBuilder.addAsNewRestore(indexMetaData, - new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT, - indexMetaData.getIndex().getName()), new IntHashSet()); break; - case 4: routingTableBuilder.addAsRestore(indexMetaData, - new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT, - indexMetaData.getIndex().getName())); break; - case 5: routingTableBuilder.addAsNew(indexMetaData); break; - default: throw new IndexOutOfBoundsException(); + for (ObjectCursor cursor: metaData.indices().values()) { + Index index = cursor.value.getIndex(); + IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(cursor.value); + final int recoveryType = randomInt(5); + if (recoveryType <= 4) { + addInSyncAllocationIds(index, indexMetaDataBuilder, gatewayAllocator, node1); + } + IndexMetaData indexMetaData = indexMetaDataBuilder.build(); + metaDataBuilder.put(indexMetaData, false); + switch (recoveryType) { + case 0: + routingTableBuilder.addAsRecovery(indexMetaData); + break; + case 1: + routingTableBuilder.addAsFromCloseToOpen(indexMetaData); + break; + case 2: + routingTableBuilder.addAsFromDangling(indexMetaData); + break; + case 3: + routingTableBuilder.addAsNewRestore(indexMetaData, + new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT, + indexMetaData.getIndex().getName()), new IntHashSet()); + break; + case 4: + routingTableBuilder.addAsRestore(indexMetaData, + new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT, + indexMetaData.getIndex().getName())); + break; + case 5: + routingTableBuilder.addAsNew(indexMetaData); + break; + default: + throw new IndexOutOfBoundsException(); + } } - - return routingTableBuilder.build(); + return ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder().add(node1)) + .metaData(metaDataBuilder.build()) + .routingTable(routingTableBuilder.build()).build(); } + private void addInSyncAllocationIds(Index index, IndexMetaData.Builder indexMetaData, + TestGatewayAllocator gatewayAllocator, DiscoveryNode node1) { + for (int shard = 0; shard < indexMetaData.numberOfShards(); shard++) { + + final boolean primary = randomBoolean(); + final ShardRouting unassigned = ShardRouting.newUnassigned(new ShardId(index, shard), primary, + primary ? + RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test") + ); + ShardRouting started = ShardRoutingHelper.moveToStarted(ShardRoutingHelper.initialize(unassigned, node1.getId())); + indexMetaData.putInSyncAllocationIds(shard, Collections.singleton(started.allocationId().getId())); + gatewayAllocator.addKnownAllocation(started); + } + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 65fb1ac0020..062a018a82d 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -50,7 +50,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.Arrays; import java.util.HashMap; @@ -113,7 +113,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) @@ -194,7 +194,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); clusterState = strategy.reroute(clusterState, "reroute"); logShardStates(clusterState); @@ -224,7 +224,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); clusterState = strategy.reroute(clusterState, "reroute"); @@ -301,7 +301,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) @@ -358,7 +358,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); clusterState = strategy.reroute(clusterState, "reroute"); logShardStates(clusterState); @@ -421,7 +421,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); clusterState = strategy.reroute(clusterState, "reroute"); logShardStates(clusterState); @@ -451,7 +451,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); clusterState = strategy.reroute(clusterState, "reroute"); @@ -555,7 +555,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) @@ -625,7 +625,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) @@ -729,7 +729,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) @@ -900,7 +900,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); // Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away // and therefor we will have sufficient disk space on node1. ClusterState result = strategy.reroute(clusterState, "reroute"); @@ -998,7 +998,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); + .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); ClusterState result = strategy.reroute(clusterState, "reroute"); assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED)); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index ca75ea960ad..907d3786992 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -90,7 +90,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -@TestLogging("org.elasticsearch.discovery.zen:TRACE") +@TestLogging("org.elasticsearch.discovery.zen:TRACE,org.elasticsearch.cluster.service:TRACE") public class NodeJoinControllerTests extends ESTestCase { private static ThreadPool threadPool; diff --git a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index 080b1f0a00e..0fd89ec8898 100644 --- a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -23,18 +23,20 @@ import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; @@ -48,7 +50,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardStateMetaData; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.cluster.ESAllocationTestCase; import org.junit.Before; import java.util.Arrays; @@ -57,6 +58,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED; +import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.INDEX_CREATED; +import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.INDEX_REOPENED; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -78,11 +82,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testNoProcessPrimaryNotAllocatedBefore() { final RoutingAllocation allocation; - if (randomBoolean()) { - allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomBoolean(), Version.CURRENT); - } else { - allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), true, Version.V_2_1_0); - } + // with old version, we can't know if a shard was allocated before or not + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), + randomFrom(INDEX_CREATED, CLUSTER_RECOVERED, INDEX_REOPENED), Version.CURRENT); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); @@ -96,9 +98,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testNoAsyncFetchData() { final RoutingAllocation allocation; if (randomBoolean()) { - allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "allocId"); + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.CURRENT, "allocId"); } else { - allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0); + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_0); } testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); @@ -114,9 +116,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testNoAllocationFound() { final RoutingAllocation allocation; if (randomBoolean()) { - allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "allocId"); + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.CURRENT, "allocId"); } else { - allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0); + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_0); } testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean()); testAllocator.allocateUnassigned(allocation); @@ -130,7 +132,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore unassigned. */ public void testNoMatchingAllocationIdFound() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2"); + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.CURRENT, "id2"); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "id1", randomBoolean()); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); @@ -144,7 +146,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * This is the case when we have old shards from pre-3.0 days. */ public void testNoActiveAllocationIds() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1); testAllocator.addData(node1, 1, null, randomBoolean()); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); @@ -160,10 +162,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testStoreException() { final RoutingAllocation allocation; if (randomBoolean()) { - allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, + randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(), new CorruptIndexException("test", "test")); } else { - allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1); testAllocator.addData(node1, 3, null, randomBoolean(), new CorruptIndexException("test", "test")); } testAllocator.allocateUnassigned(allocation); @@ -180,10 +183,12 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { final RoutingAllocation allocation; boolean useAllocationIds = randomBoolean(); if (useAllocationIds) { - allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), + randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean()); } else { - allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0); + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), + Version.V_2_2_0); testAllocator.addData(node1, 3, null, randomBoolean()); } testAllocator.allocateUnassigned(allocation); @@ -210,7 +215,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { // the allocator will see if it can force assign the primary, where the decision will be YES new TestAllocateDecision(randomBoolean() ? Decision.YES : Decision.NO), getNoDeciderThatAllowsForceAllocate() )); - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1"); + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, Version.CURRENT, "allocId1"); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); assertTrue(allocation.routingNodes().unassigned().ignored().isEmpty()); @@ -233,7 +238,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { new TestAllocateDecision(Decision.NO), forceDecisionNo ? getNoDeciderThatDeniesForceAllocate() : getNoDeciderThatThrottlesForceAllocate() )); - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1"); + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, Version.CURRENT, "allocId1"); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); List ignored = allocation.routingNodes().unassigned().ignored(); @@ -257,7 +262,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { // force allocating the shard, we still THROTTLE due to the decision from TestAllocateDecision new TestAllocateDecision(Decision.THROTTLE), getNoDeciderThatAllowsForceAllocate() )); - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1"); + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, Version.CURRENT, "allocId1"); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); List ignored = allocation.routingNodes().unassigned().ignored(); @@ -272,7 +277,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testPreferAllocatingPreviousPrimary() { String primaryAllocId = UUIDs.randomBase64UUID(); String replicaAllocId = UUIDs.randomBase64UUID(); - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), primaryAllocId, replicaAllocId); + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), + randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), randomFrom(Version.V_2_0_0, Version.CURRENT), primaryAllocId, replicaAllocId); boolean node1HasPrimaryShard = randomBoolean(); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard); testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard); @@ -292,10 +298,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testFoundAllocationButThrottlingDecider() { final RoutingAllocation allocation; if (randomBoolean()) { - allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); + allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), CLUSTER_RECOVERED, + randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean()); } else { - allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0); + allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_2_0); testAllocator.addData(node1, 3, null, randomBoolean()); } testAllocator.allocateUnassigned(allocation); @@ -312,10 +319,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testFoundAllocationButNoDecider() { final RoutingAllocation allocation; if (randomBoolean()) { - allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); + allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), CLUSTER_RECOVERED, + randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean()); } else { - allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0); + allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_0_0); testAllocator.addData(node1, 3, null, randomBoolean()); } testAllocator.allocateUnassigned(allocation); @@ -330,7 +338,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * Tests that the highest version node is chosen for allocation. */ public void testAllocateToTheHighestVersionOnLegacyIndex() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0); + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_0_0); testAllocator.addData(node1, 10, null, randomBoolean()).addData(node2, 12, null, randomBoolean()); testAllocator.allocateUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); @@ -347,7 +355,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { * allocation mode would be chosen). */ public void testVersionBasedAllocationPrefersShardWithAllocationId() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0); + RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_0_0); testAllocator.addData(node1, 10, null, randomBoolean()); testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some allocId", randomBoolean()); testAllocator.addData(node3, 12, null, randomBoolean()); @@ -616,17 +624,27 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertClusterHealthStatus(allocation, ClusterHealthStatus.RED); } - private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version, + private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, + UnassignedInfo.Reason reason, Version version, String... activeAllocationIds) { MetaData metaData = MetaData.builder() .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version)) .numberOfShards(1).numberOfReplicas(0).putInSyncAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds))) .build(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - if (asNew) { - routingTableBuilder.addAsNew(metaData.index(shardId.getIndex())); - } else { - routingTableBuilder.addAsRecovery(metaData.index(shardId.getIndex())); + switch (reason) { + + case INDEX_CREATED: + routingTableBuilder.addAsNew(metaData.index(shardId.getIndex())); + break; + case CLUSTER_RECOVERED: + routingTableBuilder.addAsRecovery(metaData.index(shardId.getIndex())); + break; + case INDEX_REOPENED: + routingTableBuilder.addAsFromCloseToOpen(metaData.index(shardId.getIndex())); + break; + default: + throw new IllegalArgumentException("can't do " + reason + " for you. teach me"); } ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metaData(metaData) diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 9ae9f620912..dcadd60cc77 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -70,7 +70,7 @@ import org.elasticsearch.index.NodeServicesProvider; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; @@ -116,7 +116,7 @@ public class ClusterStateChanges extends AbstractComponent { new HashSet<>(Arrays.asList(new SameShardAllocationDecider(settings), new ReplicaAfterPrimaryActiveAllocationDecider(settings), new RandomAllocationDeciderTests.RandomAllocationDecider(getRandom())))), - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), + new TestGatewayAllocator(), new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE); shardFailedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger); shardStartedClusterStateTaskExecutor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index d1b73d874e7..89fafc74c86 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -39,7 +39,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.ArrayList; import java.util.Arrays; @@ -75,13 +75,13 @@ public abstract class ESAllocationTestCase extends ESTestCase { public static MockAllocationService createAllocationService(Settings settings, ClusterSettings clusterSettings, Random random) { return new MockAllocationService(settings, randomAllocationDeciders(settings, clusterSettings, random), - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE); + new TestGatewayAllocator(), new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE); } public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) { return new MockAllocationService(settings, randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()), - NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), clusterInfoService); + new TestGatewayAllocator(), new BalancedShardsAllocator(settings), clusterInfoService); } public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator gatewayAllocator) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/elasticsearch/test/gateway/TestGatewayAllocator.java new file mode 100644 index 00000000000..5caf4571272 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/gateway/TestGatewayAllocator.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.gateway; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.FailedShard; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.AsyncShardFetch; +import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.gateway.PrimaryShardAllocator; +import org.elasticsearch.gateway.ReplicaShardAllocator; +import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A gateway allocator implementation that keeps an in memory list of started shard allocation + * that are used as replies to the, normally async, fetch data requests. The in memory list + * is adapted when shards are started and failed. + * + * Nodes leaving and joining the cluster do not change the list of shards the class tracks but + * rather serves as a filter to what is returned by fetch data. Concretely - fetch data will + * only return shards that were started on nodes that are currently part of the cluster. + * + * For now only primary shard related data is fetched. Replica request always get an empty response. + * + * + * This class is useful to use in unit tests that require the functionality of {@link GatewayAllocator} but do + * not have all the infrastructure required to use it. + */ +public class TestGatewayAllocator extends GatewayAllocator { + + Map> knownAllocations = new HashMap<>(); + DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES; + + PrimaryShardAllocator primaryShardAllocator = new PrimaryShardAllocator(Settings.EMPTY) { + @Override + protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + // for now always return immediately what we know + final ShardId shardId = shard.shardId(); + final Set ignoreNodes = allocation.getIgnoreNodes(shardId); + Map foundShards = knownAllocations.values().stream() + .flatMap(shardMap -> shardMap.values().stream()) + .filter(ks -> ks.shardId().equals(shardId)) + .filter(ks -> ignoreNodes.contains(ks.currentNodeId()) == false) + .filter(ks -> currentNodes.nodeExists(ks.currentNodeId())) + .collect(Collectors.toMap( + routing -> currentNodes.get(routing.currentNodeId()), + routing -> + new NodeGatewayStartedShards( + currentNodes.get(routing.currentNodeId()), -1, routing.allocationId().getId(), routing.primary()))); + + return new AsyncShardFetch.FetchResult<>(shardId, foundShards, Collections.emptySet(), ignoreNodes); + } + }; + + ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator(Settings.EMPTY) { + @Override + protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + // for now, just pretend no node has data + final ShardId shardId = shard.shardId(); + return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), Collections.emptySet(), + allocation.getIgnoreNodes(shardId)); + } + }; + + public TestGatewayAllocator() { + super(Settings.EMPTY, null, null); + } + + @Override + public void applyStartedShards(RoutingAllocation allocation, List startedShards) { + currentNodes = allocation.nodes(); + allocation.routingNodes().shards(ShardRouting::active).forEach(this::addKnownAllocation); + } + + @Override + public void applyFailedShards(RoutingAllocation allocation, List failedShards) { + currentNodes = allocation.nodes(); + for (FailedShard failedShard : failedShards) { + final ShardRouting failedRouting = failedShard.getRoutingEntry(); + Map nodeAllocations = knownAllocations.get(failedRouting.currentNodeId()); + if (nodeAllocations != null) { + nodeAllocations.remove(failedRouting.shardId()); + if (nodeAllocations.isEmpty()) { + knownAllocations.remove(failedRouting.currentNodeId()); + } + } + } + } + + @Override + public void allocateUnassigned(RoutingAllocation allocation) { + currentNodes = allocation.nodes(); + innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator); + } + + /** + * manually add a specific shard to the allocations the gateway keeps track of + */ + public void addKnownAllocation(ShardRouting shard) { + knownAllocations.computeIfAbsent(shard.currentNodeId(), id -> new HashMap<>()) + .put(shard.shardId(), shard); + } +}