Remove NoopGatewayAllocator in favor of a more realistic mock (#20637)

Many of our unit tests instantiate an `AllocationService`, which requires having a `GatewayAllocator`. Today almost all of our test use a class called `NoopGatewayAllocator` which does nothing, effectively leaving all shard assignments to the balanced allocator. This is sad as it means we test a system that behaves differently than our production logic in very basic things. For example, a started primary that is lost will be assigned to a node that didn't use to have it.

This PR removes `NoopGatewayAllocator` in favor of a new `TestGatewayAllocator` that inherits the standard `GatewayAllocator` and overrides shard information fetching to return information based on historical assignments the allocator has done. The only exception is `BalanceConfigurationTests` which does test only the balancer and I opted to not have it work around the `GatewayAllocator` being in it's way.
This commit is contained in:
Boaz Leskes 2016-09-25 20:15:30 +02:00 committed by GitHub
parent 660d9d0de3
commit ee76c1a5c9
31 changed files with 519 additions and 206 deletions

View File

@ -290,7 +290,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) { for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
sb.append(TAB).append(TAB).append(shard).append(": "); sb.append(TAB).append(TAB).append(shard).append(": ");
sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], "); sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");
sb.append("a_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n"); sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n");
} }
} }
sb.append(blocks().prettyPrint()); sb.append(blocks().prettyPrint());

View File

@ -422,7 +422,7 @@ public class ShardStateAction extends AbstractComponent {
ClusterState maybeUpdatedState = currentState; ClusterState maybeUpdatedState = currentState;
try { try {
maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied, true); maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied);
builder.successes(tasksToBeApplied); builder.successes(tasksToBeApplied);
} catch (Exception e) { } catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e); logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e);

View File

@ -133,11 +133,22 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
throw new IllegalStateException("shard routing has an index [" + shardRouting.index() + "] that is different " + throw new IllegalStateException("shard routing has an index [" + shardRouting.index() + "] that is different " +
"from the routing table"); "from the routing table");
} }
final Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(shardRouting.id());
if (shardRouting.active() && if (shardRouting.active() &&
indexMetaData.inSyncAllocationIds(shardRouting.id()).contains(shardRouting.allocationId().getId()) == false) { inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) {
throw new IllegalStateException("active shard routing " + shardRouting + " has no corresponding entry in the in-sync " + throw new IllegalStateException("active shard routing " + shardRouting + " has no corresponding entry in the in-sync " +
"allocation set " + indexMetaData.inSyncAllocationIds(shardRouting.id())); "allocation set " + inSyncAllocationIds);
} }
if (indexMetaData.getCreationVersion().onOrAfter(Version.V_5_0_0_alpha1) &&
IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.getSettings()) == false && // see #20650
shardRouting.primary() && shardRouting.initializing() && shardRouting.relocating() == false &&
RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) == false &&
inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " +
"a known allocation id but has no corresponding entry in the in-sync " +
"allocation set " + inSyncAllocationIds);
} }
} }
return true; return true;

View File

@ -78,10 +78,6 @@ public class AllocationService extends AbstractComponent {
* If the same instance of the {@link ClusterState} is returned, then no change has been made.</p> * If the same instance of the {@link ClusterState} is returned, then no change has been made.</p>
*/ */
public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) { public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) {
return applyStartedShards(clusterState, startedShards, true);
}
public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards, boolean withReroute) {
if (startedShards.isEmpty()) { if (startedShards.isEmpty()) {
return clusterState; return clusterState;
} }
@ -92,9 +88,7 @@ public class AllocationService extends AbstractComponent {
clusterInfoService.getClusterInfo(), currentNanoTime(), false); clusterInfoService.getClusterInfo(), currentNanoTime(), false);
applyStartedShards(allocation, startedShards); applyStartedShards(allocation, startedShards);
gatewayAllocator.applyStartedShards(allocation, startedShards); gatewayAllocator.applyStartedShards(allocation, startedShards);
if (withReroute) { reroute(allocation);
reroute(allocation);
}
String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString()); String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString());
return buildResultAndLogHealthChange(clusterState, allocation, "shards started [" + startedShardsAsString + "] ..."); return buildResultAndLogHealthChange(clusterState, allocation, "shards started [" + startedShardsAsString + "] ...");
} }

View File

@ -130,6 +130,13 @@ public class GatewayAllocator extends AbstractComponent {
} }
public void allocateUnassigned(final RoutingAllocation allocation) { public void allocateUnassigned(final RoutingAllocation allocation) {
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator);
}
// allow for testing infra to change shard allocators implementation
protected static void innerAllocatedUnassigned(RoutingAllocation allocation,
PrimaryShardAllocator primaryShardAllocator,
ReplicaShardAllocator replicaShardAllocator) {
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering

View File

@ -138,7 +138,8 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
if (inSyncAllocationIds.isEmpty()) { if (inSyncAllocationIds.isEmpty()) {
assert Version.indexCreated(indexMetaData.getSettings()).before(Version.V_5_0_0_alpha1) : assert Version.indexCreated(indexMetaData.getSettings()).before(Version.V_5_0_0_alpha1) :
"trying to allocated a primary with an empty allocation id set, but index is new"; "trying to allocate a primary with an empty in sync allocation id set, but index is new. index: "
+ indexMetaData.getIndex();
// when we load an old index (after upgrading cluster) or restore a snapshot of an old index // when we load an old index (after upgrading cluster) or restore a snapshot of an old index
// fall back to old version-based allocation mode // fall back to old version-based allocation mode
// Note that once the shard has been active, lastActiveAllocationIds will be non-empty // Note that once the shard has been active, lastActiveAllocationIds will be non-empty
@ -257,11 +258,11 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
/** /**
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
* lastActiveAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
* entries with matching allocation id are always at the front of the list. * entries with matching allocation id are always at the front of the list.
*/ */
protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard, protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard,
Set<String> ignoreNodes, Set<String> lastActiveAllocationIds, Set<String> ignoreNodes, Set<String> inSyncAllocationIds,
FetchResult<NodeGatewayStartedShards> shardState, FetchResult<NodeGatewayStartedShards> shardState,
Logger logger) { Logger logger) {
LinkedList<NodeGatewayStartedShards> matchingNodeShardStates = new LinkedList<>(); LinkedList<NodeGatewayStartedShards> matchingNodeShardStates = new LinkedList<>();
@ -292,7 +293,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
if (allocationId != null) { if (allocationId != null) {
numberOfAllocationsFound++; numberOfAllocationsFound++;
if (lastActiveAllocationIds.contains(allocationId)) { if (inSyncAllocationIds.contains(allocationId)) {
if (nodeShardState.primary()) { if (nodeShardState.primary()) {
matchingNodeShardStates.addFirst(nodeShardState); matchingNodeShardStates.addFirst(nodeShardState);
} else { } else {

View File

@ -41,7 +41,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
@ -82,7 +82,7 @@ public class ClusterRerouteTests extends ESAllocationTestCase {
public void testClusterStateUpdateTask() { public void testClusterStateUpdateTask() {
AllocationService allocationService = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, AllocationService allocationService = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
ClusterState clusterState = createInitialClusterState(allocationService); ClusterState clusterState = createInitialClusterState(allocationService);
ClusterRerouteRequest req = new ClusterRerouteRequest(); ClusterRerouteRequest req = new ClusterRerouteRequest();
req.dryRun(true); req.dryRun(true);

View File

@ -42,7 +42,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -97,7 +97,7 @@ public class TransportShrinkActionTests extends ESTestCase {
.build(); .build();
AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -120,7 +120,7 @@ public class TransportShrinkActionTests extends ESTestCase {
.build(); .build();
AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

View File

@ -48,7 +48,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -139,7 +139,7 @@ public class ClusterStateHealthTests extends ESTestCase {
listenerCalled.await(); listenerCalled.await();
TransportClusterHealthAction action = new TransportClusterHealthAction(Settings.EMPTY, transportService, TransportClusterHealthAction action = new TransportClusterHealthAction(Settings.EMPTY, transportService,
clusterService, threadPool, new ActionFilters(new HashSet<>()), indexNameExpressionResolver, NoopGatewayAllocator.INSTANCE); clusterService, threadPool, new ActionFilters(new HashSet<>()), indexNameExpressionResolver, new TestGatewayAllocator());
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>(); PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
action.execute(new ClusterHealthRequest(), listener); action.execute(new ClusterHealthRequest(), listener);

View File

@ -39,7 +39,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -133,7 +133,7 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
.build(); .build();
AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -161,7 +161,7 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
.build(); .build();
AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

View File

@ -63,7 +63,7 @@ public class PrimaryTermsTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_initial_primaries_recoveries", Integer.MAX_VALUE) .put("cluster.routing.allocation.node_initial_primaries_recoveries", Integer.MAX_VALUE)
.build()); .build());
this.numberOfShards = randomIntBetween(1, 5); this.numberOfShards = randomIntBetween(1, 5);
this.numberOfReplicas = randomIntBetween(1, 5); this.numberOfReplicas = randomIntBetween(0, 5);
logger.info("Setup test with {} shards and {} replicas.", this.numberOfShards, this.numberOfReplicas); logger.info("Setup test with {} shards and {} replicas.", this.numberOfShards, this.numberOfReplicas);
this.primaryTermsPerIndex.clear(); this.primaryTermsPerIndex.clear();
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()

View File

@ -328,13 +328,20 @@ public class RoutingTableTests extends ESAllocationTestCase {
expectThrows(IllegalStateException.class, () -> indexRoutingTable.validate(metaData4)); expectThrows(IllegalStateException.class, () -> indexRoutingTable.validate(metaData4));
} }
/** reverse engineer the in sync aid based on the given indexRoutingTable **/
public static IndexMetaData updateActiveAllocations(IndexRoutingTable indexRoutingTable, IndexMetaData indexMetaData) { public static IndexMetaData updateActiveAllocations(IndexRoutingTable indexRoutingTable, IndexMetaData indexMetaData) {
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(indexMetaData); IndexMetaData.Builder imdBuilder = IndexMetaData.builder(indexMetaData);
for (IndexShardRoutingTable shardTable : indexRoutingTable) { for (IndexShardRoutingTable shardTable : indexRoutingTable) {
for (ShardRouting shardRouting : shardTable) { for (ShardRouting shardRouting : shardTable) {
Set<String> activeAllocations = shardTable.activeShards().stream().map( Set<String> insyncAids = shardTable.activeShards().stream().map(
shr -> shr.allocationId().getId()).collect(Collectors.toSet()); shr -> shr.allocationId().getId()).collect(Collectors.toSet());
imdBuilder.putInSyncAllocationIds(shardRouting.id(), activeAllocations); final ShardRouting primaryShard = shardTable.primaryShard();
if (primaryShard.initializing() && primaryShard.relocating() == false &&
RecoverySource.isInitialRecovery(primaryShard.recoverySource().getType()) == false ) {
// simulate a primary was initialized based on aid
insyncAids.add(primaryShard.allocationId().getId());
}
imdBuilder.putInSyncAllocationIds(shardRouting.id(), insyncAids);
} }
} }
return imdBuilder.build(); return imdBuilder.build();

View File

@ -41,10 +41,12 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllo
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@ -71,7 +73,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance);
settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold);
AllocationService strategy = createAllocationService(settings.build()); AllocationService strategy = createAllocationService(settings.build(), new NoopGatewayAllocator());
ClusterState clusterState = initCluster(strategy); ClusterState clusterState = initCluster(strategy);
assertIndexBalance(clusterState.getRoutingTable(), clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); assertIndexBalance(clusterState.getRoutingTable(), clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
@ -95,7 +97,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance);
settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold);
AllocationService strategy = createAllocationService(settings.build()); AllocationService strategy = createAllocationService(settings.build(), new NoopGatewayAllocator());
ClusterState clusterState = initCluster(strategy); ClusterState clusterState = initCluster(strategy);
assertReplicaBalance(logger, clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); assertReplicaBalance(logger, clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold);
@ -254,7 +256,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
Settings.Builder settings = Settings.builder(); Settings.Builder settings = Settings.builder();
AllocationService strategy = new AllocationService(settings.build(), randomAllocationDeciders(settings.build(), AllocationService strategy = new AllocationService(settings.build(), randomAllocationDeciders(settings.build(),
new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), random()), new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), random()),
NoopGatewayAllocator.INSTANCE, new ShardsAllocator() { new TestGatewayAllocator(), new ShardsAllocator() {
public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) { public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) {
return new HashMap<DiscoveryNode, Float>(); return new HashMap<DiscoveryNode, Float>();
@ -351,7 +353,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
assertThat(shardRouting.state(), Matchers.equalTo(ShardRoutingState.INITIALIZING)); assertThat(shardRouting.state(), Matchers.equalTo(ShardRoutingState.INITIALIZING));
} }
} }
strategy = createAllocationService(settings.build()); strategy = createAllocationService(settings.build(), new NoopGatewayAllocator());
logger.info("use the new allocator and check if it moves shards"); logger.info("use the new allocator and check if it moves shards");
routingNodes = clusterState.getRoutingNodes(); routingNodes = clusterState.getRoutingNodes();
@ -385,7 +387,26 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
assertThat(shardRouting.state(), Matchers.equalTo(ShardRoutingState.STARTED)); assertThat(shardRouting.state(), Matchers.equalTo(ShardRoutingState.STARTED));
} }
} }
} }
private class NoopGatewayAllocator extends GatewayAllocator {
public NoopGatewayAllocator() {
super(Settings.EMPTY, null, null);
}
@Override
public void applyStartedShards(RoutingAllocation allocation, List<ShardRouting> startedShards) {
// noop
}
@Override
public void applyFailedShards(RoutingAllocation allocation, List<FailedShard> failedShards) {
// noop
}
@Override
public void allocateUnassigned(RoutingAllocation allocation) {
// noop
}
}
} }

View File

@ -33,7 +33,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -577,7 +577,7 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
public void testRebalanceWithIgnoredUnassignedShards() { public void testRebalanceWithIgnoredUnassignedShards() {
final AtomicBoolean allocateTest1 = new AtomicBoolean(false); final AtomicBoolean allocateTest1 = new AtomicBoolean(false);
AllocationService strategy = createAllocationService(Settings.EMPTY, new NoopGatewayAllocator() { AllocationService strategy = createAllocationService(Settings.EMPTY, new TestGatewayAllocator() {
@Override @Override
public void allocateUnassigned(RoutingAllocation allocation) { public void allocateUnassigned(RoutingAllocation allocation) {
if (allocateTest1.get() == false) { if (allocateTest1.get() == false) {
@ -677,7 +677,7 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
public void testRebalanceWhileShardFetching() { public void testRebalanceWhileShardFetching() {
final AtomicBoolean hasFetches = new AtomicBoolean(true); final AtomicBoolean hasFetches = new AtomicBoolean(true);
AllocationService strategy = createAllocationService(Settings.builder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), AllocationService strategy = createAllocationService(Settings.builder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(),
ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new NoopGatewayAllocator() { ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new TestGatewayAllocator() {
@Override @Override
public void allocateUnassigned(RoutingAllocation allocation) { public void allocateUnassigned(RoutingAllocation allocation) {
if (hasFetches.get()) { if (hasFetches.get()) {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.health.ClusterStateHealth;
@ -38,8 +39,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
@ -161,7 +161,7 @@ public class DecisionsImpactOnClusterHealthTests extends ESAllocationTestCase {
private static AllocationService newAllocationService(Settings settings, Set<AllocationDecider> deciders) { private static AllocationService newAllocationService(Settings settings, Set<AllocationDecider> deciders) {
return new AllocationService(settings, return new AllocationService(settings,
new AllocationDeciders(settings, deciders), new AllocationDeciders(settings, deciders),
NoopGatewayAllocator.INSTANCE, new TestGatewayAllocator(),
new BalancedShardsAllocator(settings), new BalancedShardsAllocator(settings),
EmptyClusterInfoService.INSTANCE); EmptyClusterInfoService.INSTANCE);
} }

View File

@ -34,12 +34,14 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
@ -69,7 +71,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
RoutingTable routingTable = RoutingTable.builder() RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test")) .addAsNew(metaData.index("test"))
.build(); .build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding 2 nodes on same rack and do rerouting"); logger.info("--> adding 2 nodes on same rack and do rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
@ -154,7 +156,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
.addAsNew(metaData.index("test")) .addAsNew(metaData.index("test"))
.build(); .build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build();
logger.info("Adding two nodes and performing rerouting"); logger.info("Adding two nodes and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build();
@ -227,7 +229,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
.addAsNew(metaData.index("test")) .addAsNew(metaData.index("test"))
.build(); .build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build();
logger.info("Adding single node and performing rerouting"); logger.info("Adding single node and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build();
@ -278,7 +280,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
.addAsNew(metaData.index("test")) .addAsNew(metaData.index("test"))
.build(); .build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build();
logger.info("Adding {} nodes and performing rerouting", numberOfReplicas + 1); logger.info("Adding {} nodes and performing rerouting", numberOfReplicas + 1);
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(); DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder();
@ -341,7 +343,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
.addAsNew(metaData.index("test")) .addAsNew(metaData.index("test"))
.build(); .build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build();
logger.info("Adding two nodes and performing rerouting"); logger.info("Adding two nodes and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build();
@ -396,7 +398,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
.addAsNew(metaData.index("test")) .addAsNew(metaData.index("test"))
.build(); .build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build();
logger.info("Adding two nodes and performing rerouting"); logger.info("Adding two nodes and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build();
@ -480,7 +482,10 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
.addAsNew(metaData.index("test")) .addAsNew(metaData.index("test"))
.build(); .build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData).routingTable(routingTable).build();
ShardId shardId = new ShardId(metaData.index("test").getIndex(), 0);
// add 4 nodes // add 4 nodes
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).add(newNode("node4"))).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).add(newNode("node4"))).build();
@ -492,22 +497,26 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
// start one replica so it can take over.
clusterState = allocation.applyStartedShards(clusterState,
Collections.singletonList(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0)));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplica(shardId);
// fail the primary shard, check replicas get removed as well... // fail the primary shard, check replicas get removed as well...
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail); ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
assertThat(newState, not(equalTo(clusterState))); assertThat(newState, not(equalTo(clusterState)));
clusterState = newState; clusterState = newState;
// the primary gets allocated on another node, replicas are unassigned // the primary gets allocated on another node, replicas are initializing
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
// start the primary shard
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
} }
public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToElect() { public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToElect() {
@ -522,7 +531,7 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase {
.addAsNew(metaData.index("test")) .addAsNew(metaData.index("test"))
.build(); .build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();
// add 4 nodes // add 4 nodes
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).add(newNode("node4"))).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).add(newNode("node4"))).build();

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
@ -32,13 +33,15 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_NAME; import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_NAME;
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_UUID; import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_SHRINK_SOURCE_UUID;
@ -52,9 +55,10 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(Settings.EMPTY, FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY,
Arrays.asList(filterAllocationDecider, new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY))); Arrays.asList(filterAllocationDecider,
new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY)));
AllocationService service = new AllocationService(Settings.builder().build(), allocationDeciders, AllocationService service = new AllocationService(Settings.builder().build(), allocationDeciders,
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
ClusterState state = createInitialClusterState(service, Settings.builder().put("index.routing.allocation.initial_recovery._id", ClusterState state = createInitialClusterState(service, Settings.builder().put("index.routing.allocation.initial_recovery._id",
"node2").build()); "node2").build());
RoutingTable routingTable = state.routingTable(); RoutingTable routingTable = state.routingTable();
@ -72,10 +76,10 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
null, 0, false); null, 0, false);
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(), assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(),
state.getRoutingNodes().node("node2") state.getRoutingNodes().node("node2")
,allocation), Decision.YES); , allocation), Decision.YES);
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(), assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).primaryShard(),
state.getRoutingNodes().node("node1") state.getRoutingNodes().node("node1")
,allocation), Decision.NO); , allocation), Decision.NO);
state = service.reroute(state, "try allocate again"); state = service.reroute(state, "try allocate again");
routingTable = state.routingTable(); routingTable = state.routingTable();
@ -86,57 +90,80 @@ public class FilterAllocationDeciderTests extends ESAllocationTestCase {
routingTable = state.routingTable(); routingTable = state.routingTable();
// ok now we are started and can be allocated anywhere!! lets see... // ok now we are started and can be allocated anywhere!! lets see...
assertEquals(routingTable.index("idx").shard(0).primaryShard().state(), STARTED); // first create another copy
assertEquals(routingTable.index("idx").shard(0).primaryShard().currentNodeId(), "node2");
// replicas should be initializing
assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).state(), INITIALIZING); assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).state(), INITIALIZING);
assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).currentNodeId(), "node1"); assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).currentNodeId(), "node1");
state = service.applyStartedShards(state, routingTable.index("idx").shard(0).replicaShardsWithState(INITIALIZING));
// we fail it again to check if we are initializing immediately on the other node
state = service.applyFailedShard(state, routingTable.index("idx").shard(0).shards().get(0));
routingTable = state.routingTable(); routingTable = state.routingTable();
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING); assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).state(), STARTED);
assertEquals(routingTable.index("idx").shard(0).shards().get(0).currentNodeId(), "node1"); assertEquals(routingTable.index("idx").shard(0).replicaShards().get(0).currentNodeId(), "node1");
// now remove the node of the other copy and fail the current
DiscoveryNode node1 = state.nodes().resolveNode("node1");
state = service.deassociateDeadNodes(
ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).remove("node1")).build(),
true, "test");
state = service.applyFailedShard(state, routingTable.index("idx").shard(0).primaryShard());
// now bring back node1 and see it's assigned
state = service.reroute(
ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).add(node1)).build(), "test");
routingTable = state.routingTable();
assertEquals(routingTable.index("idx").shard(0).primaryShard().state(), INITIALIZING);
assertEquals(routingTable.index("idx").shard(0).primaryShard().currentNodeId(), "node1");
allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state,
null, 0, false); null, 0, false);
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0), assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
state.getRoutingNodes().node("node2") state.getRoutingNodes().node("node2")
,allocation), Decision.YES); , allocation), Decision.YES);
assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0), assertEquals(filterAllocationDecider.canAllocate(routingTable.index("idx").shard(0).shards().get(0),
state.getRoutingNodes().node("node1") state.getRoutingNodes().node("node1")
,allocation), Decision.YES); , allocation), Decision.YES);
} }
private ClusterState createInitialClusterState(AllocationService service, Settings settings) { private ClusterState createInitialClusterState(AllocationService service, Settings settings) {
boolean shrinkIndex = randomBoolean(); RecoverySource.Type recoveryType = randomFrom(RecoverySource.Type.EMPTY_STORE,
RecoverySource.Type.LOCAL_SHARDS, RecoverySource.Type.SNAPSHOT);
MetaData.Builder metaData = MetaData.builder(); MetaData.Builder metaData = MetaData.builder();
final Settings.Builder indexSettings = settings(Version.CURRENT).put(settings); final Settings.Builder indexSettings = settings(Version.CURRENT).put(settings);
final IndexMetaData sourceIndex; final IndexMetaData sourceIndex;
if (shrinkIndex) { if (recoveryType == RecoverySource.Type.LOCAL_SHARDS) {
//put a fake closed source index //put a fake closed source index
sourceIndex = IndexMetaData.builder("sourceIndex") sourceIndex = IndexMetaData.builder("sourceIndex")
.settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0).build(); .settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)
.putInSyncAllocationIds(0, Collections.singleton("aid0"))
.putInSyncAllocationIds(1, Collections.singleton("aid1"))
.build();
metaData.put(sourceIndex, false); metaData.put(sourceIndex, false);
indexSettings.put(INDEX_SHRINK_SOURCE_UUID.getKey(), sourceIndex.getIndexUUID()); indexSettings.put(INDEX_SHRINK_SOURCE_UUID.getKey(), sourceIndex.getIndexUUID());
indexSettings.put(INDEX_SHRINK_SOURCE_NAME.getKey(), sourceIndex.getIndex().getName()); indexSettings.put(INDEX_SHRINK_SOURCE_NAME.getKey(), sourceIndex.getIndex().getName());
} else { } else {
sourceIndex = null; sourceIndex = null;
} }
final IndexMetaData indexMetaData = IndexMetaData.builder("idx").settings(indexSettings) final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder("idx").settings(indexSettings)
.numberOfShards(1).numberOfReplicas(1).build(); .numberOfShards(1).numberOfReplicas(1);
if (recoveryType == RecoverySource.Type.SNAPSHOT) {
indexMetaDataBuilder.putInSyncAllocationIds(0, Collections.singleton("_snapshot_restore"));
}
final IndexMetaData indexMetaData = indexMetaDataBuilder.build();
metaData.put(indexMetaData, false); metaData.put(indexMetaData, false);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
if (shrinkIndex) { switch (recoveryType) {
routingTableBuilder.addAsFromCloseToOpen(sourceIndex); case EMPTY_STORE:
routingTableBuilder.addAsNew(indexMetaData); routingTableBuilder.addAsNew(indexMetaData);
} if (randomBoolean()) { break;
routingTableBuilder.addAsNew(indexMetaData); case SNAPSHOT:
} else { routingTableBuilder.addAsRestore(indexMetaData, new RecoverySource.SnapshotRecoverySource(
routingTableBuilder.addAsRestore(indexMetaData, new RecoverySource.SnapshotRecoverySource( new Snapshot("repository", new SnapshotId("snapshot_name", "snapshot_uuid")),
new Snapshot("repository", new SnapshotId("snapshot_name", "snapshot_uuid")), Version.CURRENT, indexMetaData.getIndex().getName()));
Version.CURRENT, indexMetaData.getIndex().getName())); break;
case LOCAL_SHARDS:
routingTableBuilder.addAsFromCloseToOpen(sourceIndex);
routingTableBuilder.addAsNew(indexMetaData);
break;
default:
throw new UnsupportedOperationException(recoveryType + " is not supported");
} }
RoutingTable routingTable = routingTableBuilder.build(); RoutingTable routingTable = routingTableBuilder.build();

View File

@ -35,7 +35,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -55,7 +55,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
super.setUp(); super.setUp();
strategy = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, strategy = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))), Collections.singleton(new MaxRetryAllocationDecider(Settings.EMPTY))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
} }
private ClusterState createInitialClusterState() { private ClusterState createInitialClusterState() {
@ -204,9 +204,9 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
routingTable.index("idx").shard(0).shards().get(0), null, new RoutingAllocation(null, null, clusterState, null, 0, false))); routingTable.index("idx").shard(0).shards().get(0), null, new RoutingAllocation(null, null, clusterState, null, 0, false)));
// now we start the shard // now we start the shard
routingTable = strategy.applyStartedShards(clusterState, Collections.singletonList( clusterState = strategy.applyStartedShards(clusterState, Collections.singletonList(
routingTable.index("idx").shard(0).shards().get(0))).routingTable(); routingTable.index("idx").shard(0).shards().get(0)));
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); routingTable = clusterState.routingTable();
// all counters have been reset to 0 ie. no unassigned info // all counters have been reset to 0 ie. no unassigned info
assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(routingTable.index("idx").shards().size(), 1);
@ -224,7 +224,7 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
assertEquals(routingTable.index("idx").shards().size(), 1); assertEquals(routingTable.index("idx").shards().size(), 1);
unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0); unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0);
assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), 1); assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), 1);
assertEquals(unassignedPrimary.state(), INITIALIZING); assertEquals(unassignedPrimary.state(), UNASSIGNED);
assertEquals(unassignedPrimary.unassignedInfo().getMessage(), "ZOOOMG"); assertEquals(unassignedPrimary.unassignedInfo().getMessage(), "ZOOOMG");
// Counter reset, so MaxRetryAllocationDecider#canForceAllocatePrimary should return a YES decision // Counter reset, so MaxRetryAllocationDecider#canForceAllocatePrimary should return a YES decision
assertEquals(Decision.YES, new MaxRetryAllocationDecider(Settings.EMPTY).canForceAllocatePrimary( assertEquals(Decision.YES, new MaxRetryAllocationDecider(Settings.EMPTY).canForceAllocatePrimary(

View File

@ -53,7 +53,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -326,7 +326,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, Collections.singleton(new NodeVersionAllocationDecider(Settings.EMPTY))); AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, Collections.singleton(new NodeVersionAllocationDecider(Settings.EMPTY)));
AllocationService strategy = new MockAllocationService(Settings.EMPTY, AllocationService strategy = new MockAllocationService(Settings.EMPTY,
allocationDeciders, allocationDeciders,
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState(); state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState();
// the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match // the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match
assertThat(state.routingTable().index(shard2.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0)); assertThat(state.routingTable().index(shard2.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));
@ -342,10 +342,12 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
MASTER_DATA_ROLES, VersionUtils.getPreviousVersion()); MASTER_DATA_ROLES, VersionUtils.getPreviousVersion());
int numberOfShards = randomIntBetween(1, 3); int numberOfShards = randomIntBetween(1, 3);
MetaData metaData = MetaData.builder() final IndexMetaData.Builder indexMetaData = IndexMetaData.builder("test").settings(settings(Version.CURRENT))
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numberOfShards).numberOfReplicas .numberOfShards(numberOfShards).numberOfReplicas(randomIntBetween(0, 3));
(randomIntBetween(0, 3))) for (int i = 0; i < numberOfShards; i++) {
.build(); indexMetaData.putInSyncAllocationIds(i, Collections.singleton("_test_"));
}
MetaData metaData = MetaData.builder().put(indexMetaData).build();
ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData) .metaData(metaData)
@ -358,7 +360,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
new NodeVersionAllocationDecider(Settings.EMPTY))); new NodeVersionAllocationDecider(Settings.EMPTY)));
AllocationService strategy = new MockAllocationService(Settings.EMPTY, AllocationService strategy = new MockAllocationService(Settings.EMPTY,
allocationDeciders, allocationDeciders,
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState(); state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState();
// Make sure that primary shards are only allocated on the new node // Make sure that primary shards are only allocated on the new node

View File

@ -38,6 +38,7 @@ import static org.hamcrest.Matchers.equalTo;
/** /**
*/ */
public class PreferLocalPrimariesToRelocatingPrimariesTests extends ESAllocationTestCase { public class PreferLocalPrimariesToRelocatingPrimariesTests extends ESAllocationTestCase {
public void testPreferLocalPrimaryAllocationOverFiltered() { public void testPreferLocalPrimaryAllocationOverFiltered() {
int concurrentRecoveries = randomIntBetween(1, 10); int concurrentRecoveries = randomIntBetween(1, 10);
int primaryRecoveries = randomIntBetween(1, 10); int primaryRecoveries = randomIntBetween(1, 10);
@ -66,8 +67,7 @@ public class PreferLocalPrimariesToRelocatingPrimariesTests extends ESAllocation
logger.info("adding two nodes and performing rerouting till all are allocated"); logger.info("adding two nodes and performing rerouting till all are allocated");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.add(newNode("node1", singletonMap("tag1", "value1"))) .add(newNode("node1")).add(newNode("node2"))).build();
.add(newNode("node2", singletonMap("tag1", "value2")))).build();
clusterState = strategy.reroute(clusterState, "reroute"); clusterState = strategy.reroute(clusterState, "reroute");
@ -82,12 +82,12 @@ public class PreferLocalPrimariesToRelocatingPrimariesTests extends ESAllocation
.put(IndexMetaData.builder(clusterState.metaData().index("test1")).settings(settings(Version.CURRENT) .put(IndexMetaData.builder(clusterState.metaData().index("test1")).settings(settings(Version.CURRENT)
.put("index.number_of_shards", numberOfShards) .put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0) .put("index.number_of_replicas", 0)
.put("index.routing.allocation.exclude.tag1", "value2") .put("index.routing.allocation.exclude._name", "node2")
.build())) .build()))
.put(IndexMetaData.builder(clusterState.metaData().index("test2")).settings(settings(Version.CURRENT) .put(IndexMetaData.builder(clusterState.metaData().index("test2")).settings(settings(Version.CURRENT)
.put("index.number_of_shards", numberOfShards) .put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0) .put("index.number_of_replicas", 0)
.put("index.routing.allocation.exclude.tag1", "value2") .put("index.routing.allocation.exclude._name", "node2")
.build())) .build()))
.build(); .build();
clusterState = ClusterState.builder(clusterState).metaData(metaData).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node1")).build(); clusterState = ClusterState.builder(clusterState).metaData(metaData).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node1")).build();

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@ -128,8 +129,9 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
routingNodes = clusterState.getRoutingNodes(); routingNodes = clusterState.getRoutingNodes();
assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(1)); assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(1));
assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(1)); assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(0));
assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(INITIALIZING).get(0).primary(), equalTo(true)); assertThat(routingNodes.shardsWithState(UNASSIGNED).size(), equalTo(3)); // 2 replicas and one primary
assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(STARTED).get(0).primary(), equalTo(true));
assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L)); assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L));
} }

View File

@ -38,10 +38,11 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Random; import java.util.Random;
@ -58,7 +59,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase {
RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(random()); RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(random());
AllocationService strategy = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, AllocationService strategy = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY), new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY),
randomAllocationDecider))), NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); randomAllocationDecider))), new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
int indices = scaledRandomIntBetween(1, 20); int indices = scaledRandomIntBetween(1, 20);
Builder metaBuilder = MetaData.builder(); Builder metaBuilder = MetaData.builder();
int maxNumReplicas = 1; int maxNumReplicas = 1;
@ -101,9 +102,25 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase {
boolean nodesRemoved = false; boolean nodesRemoved = false;
if (nodeIdCounter > 1 && rarely()) { if (nodeIdCounter > 1 && rarely()) {
int nodeId = scaledRandomIntBetween(0, nodeIdCounter - 2); int nodeId = scaledRandomIntBetween(0, nodeIdCounter - 2);
logger.info("removing node [{}]", nodeId); final String node = "NODE_" + nodeId;
newNodesBuilder.remove("NODE_" + nodeId); boolean safeToRemove = true;
nodesRemoved = true; RoutingNode routingNode = clusterState.getRoutingNodes().node(node);
for (ShardRouting shard: routingNode != null ? routingNode : Collections.<ShardRouting>emptyList()) {
if (shard.active() && shard.primary()) {
// make sure there is an active replica to prevent from going red
if (clusterState.routingTable().shardRoutingTable(shard.shardId()).activeShards().size() <= 1) {
safeToRemove = false;
break;
}
}
}
if (safeToRemove) {
logger.info("removing node [{}]", nodeId);
newNodesBuilder.remove(node);
nodesRemoved = true;
} else {
logger.debug("not removing node [{}] as it holds a primary with no replacement", nodeId);
}
} }
stateBuilder.nodes(newNodesBuilder.build()); stateBuilder.nodes(newNodesBuilder.build());
@ -142,7 +159,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase {
} while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() != 0 || } while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() != 0 ||
clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size() != 0 && iterations < 200); clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size() != 0 && iterations < 200);
logger.info("Done Balancing after [{}] iterations", iterations); logger.info("Done Balancing after [{}] iterations. State:\n{}", iterations, clusterState.prettyPrint());
// we stop after 200 iterations if it didn't stabelize by then something is likely to be wrong // we stop after 200 iterations if it didn't stabelize by then something is likely to be wrong
assertThat("max num iteration exceeded", iterations, Matchers.lessThan(200)); assertThat("max num iteration exceeded", iterations, Matchers.lessThan(200));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(0));

View File

@ -117,27 +117,41 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase {
assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(STARTED)); assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node1")); assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node1"));
logger.info("Killing node1 where the shard is, checking the shard is relocated"); logger.info("Killing node1 where the shard is, checking the shard is unassigned");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node1")).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node1")).build();
newState = strategy.deassociateDeadNodes(clusterState, true, "reroute"); newState = strategy.deassociateDeadNodes(clusterState, true, "reroute");
assertThat(newState, not(equalTo(clusterState))); assertThat(newState, not(equalTo(clusterState)));
clusterState = newState; clusterState = newState;
assertThat(clusterState.routingTable().index("test").shards().size(), equalTo(1));
assertThat(clusterState.routingTable().index("test").shard(0).size(), equalTo(1));
assertThat(clusterState.routingTable().index("test").shard(0).shards().size(), equalTo(1));
assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), nullValue());
logger.info("Bring node1 back, and see it's assinged");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node1"))).build();
newState = strategy.reroute(clusterState, "reroute");
assertThat(newState, not(equalTo(clusterState)));
clusterState = newState;
assertThat(clusterState.routingTable().index("test").shards().size(), equalTo(1)); assertThat(clusterState.routingTable().index("test").shards().size(), equalTo(1));
assertThat(clusterState.routingTable().index("test").shard(0).size(), equalTo(1)); assertThat(clusterState.routingTable().index("test").shard(0).size(), equalTo(1));
assertThat(clusterState.routingTable().index("test").shard(0).shards().size(), equalTo(1)); assertThat(clusterState.routingTable().index("test").shard(0).shards().size(), equalTo(1));
assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(INITIALIZING)); assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(INITIALIZING));
assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node2")); assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node1"));
logger.info("Start another node, make sure that things remain the same (shard is in node2 and initializing)"); logger.info("Start another node, make sure that things remain the same (shard is in node2 and initializing)");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build();
newState = strategy.reroute(clusterState, "reroute"); newState = strategy.reroute(clusterState, "reroute");
assertThat(newState, equalTo(clusterState)); assertThat(newState, equalTo(clusterState));
logger.info("Start the shard on node 2"); logger.info("Start the shard on node 1");
routingNodes = clusterState.getRoutingNodes(); routingNodes = clusterState.getRoutingNodes();
newState = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); newState = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
assertThat(newState, not(equalTo(clusterState))); assertThat(newState, not(equalTo(clusterState)));
clusterState = newState; clusterState = newState;
@ -145,7 +159,7 @@ public class SingleShardNoReplicasRoutingTests extends ESAllocationTestCase {
assertThat(clusterState.routingTable().index("test").shard(0).size(), equalTo(1)); assertThat(clusterState.routingTable().index("test").shard(0).size(), equalTo(1));
assertThat(clusterState.routingTable().index("test").shard(0).shards().size(), equalTo(1)); assertThat(clusterState.routingTable().index("test").shard(0).shards().size(), equalTo(1));
assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(STARTED)); assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).state(), equalTo(STARTED));
assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node2")); assertThat(clusterState.routingTable().index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node1"));
} }
public void testSingleIndexShardFailed() { public void testSingleIndexShardFailed() {

View File

@ -69,14 +69,15 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase {
logger.info("--> test starting of shard"); logger.info("--> test starting of shard");
ClusterState newState = allocation.applyStartedShards(state, Arrays.asList(initShard), false); ClusterState newState = allocation.applyStartedShards(state, Arrays.asList(initShard));
assertThat("failed to start " + initShard + "\ncurrent routing table:" + newState.routingTable().prettyPrint(), assertThat("failed to start " + initShard + "\ncurrent routing table:" + newState.routingTable().prettyPrint(),
newState, not(equalTo(state))); newState, not(equalTo(state)));
assertTrue(initShard + "isn't started \ncurrent routing table:" + newState.routingTable().prettyPrint(), assertTrue(initShard + "isn't started \ncurrent routing table:" + newState.routingTable().prettyPrint(),
newState.routingTable().index("test").shard(initShard.id()).allShardsStarted()); newState.routingTable().index("test").shard(initShard.id()).allShardsStarted());
state = newState;
logger.info("--> testing starting of relocating shards"); logger.info("--> testing starting of relocating shards");
newState = allocation.applyStartedShards(state, Arrays.asList(relocatingShard.getTargetRelocatingShard()), false); newState = allocation.applyStartedShards(state, Arrays.asList(relocatingShard.getTargetRelocatingShard()));
assertThat("failed to start " + relocatingShard + "\ncurrent routing table:" + newState.routingTable().prettyPrint(), assertThat("failed to start " + relocatingShard + "\ncurrent routing table:" + newState.routingTable().prettyPrint(),
newState, not(equalTo(state))); newState, not(equalTo(state)));
ShardRouting shardRouting = newState.routingTable().index("test").shard(relocatingShard.id()).getShards().get(0); ShardRouting shardRouting = newState.routingTable().index("test").shard(relocatingShard.id()).getShards().get(0);

View File

@ -20,23 +20,35 @@
package org.elasticsearch.cluster.routing.allocation; package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.Collections;
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
@ -50,10 +62,12 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
private final Logger logger = Loggers.getLogger(ThrottlingAllocationTests.class); private final Logger logger = Loggers.getLogger(ThrottlingAllocationTests.class);
public void testPrimaryRecoveryThrottling() { public void testPrimaryRecoveryThrottling() {
TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator();
AllocationService strategy = createAllocationService(Settings.builder() AllocationService strategy = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 3) .put("cluster.routing.allocation.node_concurrent_recoveries", 3)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3)
.build()); .build(), gatewayAllocator);
logger.info("Building initial routing table"); logger.info("Building initial routing table");
@ -61,9 +75,7 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(10).numberOfReplicas(1)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(10).numberOfReplicas(1))
.build(); .build();
RoutingTable initialRoutingTable = createRecoveryRoutingTable(metaData.index("test")); ClusterState clusterState = createRecoveryStateAndInitalizeAllocations(metaData, gatewayAllocator);
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build();
logger.info("start one node, do reroute, only 3 should initialize"); logger.info("start one node, do reroute, only 3 should initialize");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build();
@ -103,11 +115,13 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
} }
public void testReplicaAndPrimaryRecoveryThrottling() { public void testReplicaAndPrimaryRecoveryThrottling() {
TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator();
AllocationService strategy = createAllocationService(Settings.builder() AllocationService strategy = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 3) .put("cluster.routing.allocation.node_concurrent_recoveries", 3)
.put("cluster.routing.allocation.concurrent_source_recoveries", 3) .put("cluster.routing.allocation.concurrent_source_recoveries", 3)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3)
.build()); .build(),
gatewayAllocator);
logger.info("Building initial routing table"); logger.info("Building initial routing table");
@ -115,12 +129,9 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1))
.build(); .build();
RoutingTable initialRoutingTable = createRecoveryRoutingTable(metaData.index("test")); ClusterState clusterState = createRecoveryStateAndInitalizeAllocations(metaData, gatewayAllocator);
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); logger.info("with one node, do reroute, only 3 should initialize");
logger.info("start one node, do reroute, only 3 should initialize");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build();
clusterState = strategy.reroute(clusterState, "reroute"); clusterState = strategy.reroute(clusterState, "reroute");
assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0)); assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0));
@ -165,24 +176,22 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
} }
public void testThrottleIncomingAndOutgoing() { public void testThrottleIncomingAndOutgoing() {
TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator();
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 5) .put("cluster.routing.allocation.node_concurrent_recoveries", 5)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 5) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 5)
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 5) .put("cluster.routing.allocation.cluster_concurrent_rebalance", 5)
.build(); .build();
AllocationService strategy = createAllocationService(settings); AllocationService strategy = createAllocationService(settings, gatewayAllocator);
logger.info("Building initial routing table"); logger.info("Building initial routing table");
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(9).numberOfReplicas(0)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(9).numberOfReplicas(0))
.build(); .build();
RoutingTable initialRoutingTable = createRecoveryRoutingTable(metaData.index("test")); ClusterState clusterState = createRecoveryStateAndInitalizeAllocations(metaData, gatewayAllocator);
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); logger.info("with one node, do reroute, only 5 should initialize");
logger.info("start one node, do reroute, only 5 should initialize");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build();
clusterState = strategy.reroute(clusterState, "reroute"); clusterState = strategy.reroute(clusterState, "reroute");
assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0)); assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0));
assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(5)); assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(5));
@ -225,9 +234,10 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
} }
public void testOutgoingThrottlesAllocation() { public void testOutgoingThrottlesAllocation() {
TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator();
AllocationService strategy = createAllocationService(Settings.builder() AllocationService strategy = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_outgoing_recoveries", 1) .put("cluster.routing.allocation.node_concurrent_outgoing_recoveries", 1)
.build()); .build(), gatewayAllocator);
logger.info("Building initial routing table"); logger.info("Building initial routing table");
@ -235,12 +245,9 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
.build(); .build();
RoutingTable initialRoutingTable = createRecoveryRoutingTable(metaData.index("test")); ClusterState clusterState = createRecoveryStateAndInitalizeAllocations(metaData, gatewayAllocator);
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(initialRoutingTable).build(); logger.info("with one node, do reroute, only 1 should initialize");
logger.info("start one node, do reroute, only 1 should initialize");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build();
clusterState = strategy.reroute(clusterState, "reroute"); clusterState = strategy.reroute(clusterState, "reroute");
assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0)); assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0));
@ -301,23 +308,66 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0); assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
} }
private RoutingTable createRecoveryRoutingTable(IndexMetaData indexMetaData) { private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaData, TestGatewayAllocator gatewayAllocator) {
DiscoveryNode node1 = newNode("node1");
MetaData.Builder metaDataBuilder = new MetaData.Builder(metaData);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
switch (randomInt(5)) { for (ObjectCursor<IndexMetaData> cursor: metaData.indices().values()) {
case 0: routingTableBuilder.addAsRecovery(indexMetaData); break; Index index = cursor.value.getIndex();
case 1: routingTableBuilder.addAsFromCloseToOpen(indexMetaData); break; IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(cursor.value);
case 2: routingTableBuilder.addAsFromDangling(indexMetaData); break; final int recoveryType = randomInt(5);
case 3: routingTableBuilder.addAsNewRestore(indexMetaData, if (recoveryType <= 4) {
new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT, addInSyncAllocationIds(index, indexMetaDataBuilder, gatewayAllocator, node1);
indexMetaData.getIndex().getName()), new IntHashSet()); break; }
case 4: routingTableBuilder.addAsRestore(indexMetaData, IndexMetaData indexMetaData = indexMetaDataBuilder.build();
new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT, metaDataBuilder.put(indexMetaData, false);
indexMetaData.getIndex().getName())); break; switch (recoveryType) {
case 5: routingTableBuilder.addAsNew(indexMetaData); break; case 0:
default: throw new IndexOutOfBoundsException(); routingTableBuilder.addAsRecovery(indexMetaData);
break;
case 1:
routingTableBuilder.addAsFromCloseToOpen(indexMetaData);
break;
case 2:
routingTableBuilder.addAsFromDangling(indexMetaData);
break;
case 3:
routingTableBuilder.addAsNewRestore(indexMetaData,
new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT,
indexMetaData.getIndex().getName()), new IntHashSet());
break;
case 4:
routingTableBuilder.addAsRestore(indexMetaData,
new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT,
indexMetaData.getIndex().getName()));
break;
case 5:
routingTableBuilder.addAsNew(indexMetaData);
break;
default:
throw new IndexOutOfBoundsException();
}
} }
return ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
return routingTableBuilder.build(); .nodes(DiscoveryNodes.builder().add(node1))
.metaData(metaDataBuilder.build())
.routingTable(routingTableBuilder.build()).build();
} }
private void addInSyncAllocationIds(Index index, IndexMetaData.Builder indexMetaData,
TestGatewayAllocator gatewayAllocator, DiscoveryNode node1) {
for (int shard = 0; shard < indexMetaData.numberOfShards(); shard++) {
final boolean primary = randomBoolean();
final ShardRouting unassigned = ShardRouting.newUnassigned(new ShardId(index, shard), primary,
primary ?
RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE :
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test")
);
ShardRouting started = ShardRoutingHelper.moveToStarted(ShardRoutingHelper.initialize(unassigned, node1.getId()));
indexMetaData.putInSyncAllocationIds(shard, Collections.singleton(started.allocationId().getId()));
gatewayAllocator.addKnownAllocation(started);
}
}
} }

View File

@ -50,7 +50,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -113,7 +113,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
@ -194,7 +194,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
clusterState = strategy.reroute(clusterState, "reroute"); clusterState = strategy.reroute(clusterState, "reroute");
logShardStates(clusterState); logShardStates(clusterState);
@ -224,7 +224,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
clusterState = strategy.reroute(clusterState, "reroute"); clusterState = strategy.reroute(clusterState, "reroute");
@ -301,7 +301,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
@ -358,7 +358,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
clusterState = strategy.reroute(clusterState, "reroute"); clusterState = strategy.reroute(clusterState, "reroute");
logShardStates(clusterState); logShardStates(clusterState);
@ -421,7 +421,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
clusterState = strategy.reroute(clusterState, "reroute"); clusterState = strategy.reroute(clusterState, "reroute");
logShardStates(clusterState); logShardStates(clusterState);
@ -451,7 +451,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
clusterState = strategy.reroute(clusterState, "reroute"); clusterState = strategy.reroute(clusterState, "reroute");
@ -555,7 +555,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
@ -625,7 +625,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
@ -729,7 +729,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
@ -900,7 +900,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
// Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away // Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away
// and therefor we will have sufficient disk space on node1. // and therefor we will have sufficient disk space on node1.
ClusterState result = strategy.reroute(clusterState, "reroute"); ClusterState result = strategy.reroute(clusterState, "reroute");
@ -998,7 +998,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); .build(), deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
ClusterState result = strategy.reroute(clusterState, "reroute"); ClusterState result = strategy.reroute(clusterState, "reroute");
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED)); assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));

View File

@ -90,7 +90,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
@TestLogging("org.elasticsearch.discovery.zen:TRACE") @TestLogging("org.elasticsearch.discovery.zen:TRACE,org.elasticsearch.cluster.service:TRACE")
public class NodeJoinControllerTests extends ESTestCase { public class NodeJoinControllerTests extends ESTestCase {
private static ThreadPool threadPool; private static ThreadPool threadPool;

View File

@ -23,18 +23,20 @@ import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
@ -48,7 +50,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardStateMetaData; import org.elasticsearch.index.shard.ShardStateMetaData;
import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.junit.Before; import org.junit.Before;
import java.util.Arrays; import java.util.Arrays;
@ -57,6 +58,9 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED;
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.INDEX_CREATED;
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.INDEX_REOPENED;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -78,11 +82,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testNoProcessPrimaryNotAllocatedBefore() { public void testNoProcessPrimaryNotAllocatedBefore() {
final RoutingAllocation allocation; final RoutingAllocation allocation;
if (randomBoolean()) { // with old version, we can't know if a shard was allocated before or not
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomBoolean(), Version.CURRENT); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(),
} else { randomFrom(INDEX_CREATED, CLUSTER_RECOVERED, INDEX_REOPENED), Version.CURRENT);
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), true, Version.V_2_1_0);
}
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
@ -96,9 +98,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testNoAsyncFetchData() { public void testNoAsyncFetchData() {
final RoutingAllocation allocation; final RoutingAllocation allocation;
if (randomBoolean()) { if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "allocId"); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.CURRENT, "allocId");
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_0);
} }
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodesChanged(), equalTo(true));
@ -114,9 +116,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testNoAllocationFound() { public void testNoAllocationFound() {
final RoutingAllocation allocation; final RoutingAllocation allocation;
if (randomBoolean()) { if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "allocId"); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.CURRENT, "allocId");
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_0);
} }
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean()); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean());
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
@ -130,7 +132,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore unassigned. * Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore unassigned.
*/ */
public void testNoMatchingAllocationIdFound() { public void testNoMatchingAllocationIdFound() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2"); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.CURRENT, "id2");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "id1", randomBoolean()); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "id1", randomBoolean());
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodesChanged(), equalTo(true));
@ -144,7 +146,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* This is the case when we have old shards from pre-3.0 days. * This is the case when we have old shards from pre-3.0 days.
*/ */
public void testNoActiveAllocationIds() { public void testNoActiveAllocationIds() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
testAllocator.addData(node1, 1, null, randomBoolean()); testAllocator.addData(node1, 1, null, randomBoolean());
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodesChanged(), equalTo(true));
@ -160,10 +162,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testStoreException() { public void testStoreException() {
final RoutingAllocation allocation; final RoutingAllocation allocation;
if (randomBoolean()) { if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(), new CorruptIndexException("test", "test")); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(), new CorruptIndexException("test", "test"));
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
testAllocator.addData(node1, 3, null, randomBoolean(), new CorruptIndexException("test", "test")); testAllocator.addData(node1, 3, null, randomBoolean(), new CorruptIndexException("test", "test"));
} }
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
@ -180,10 +183,12 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
final RoutingAllocation allocation; final RoutingAllocation allocation;
boolean useAllocationIds = randomBoolean(); boolean useAllocationIds = randomBoolean();
if (useAllocationIds) { if (useAllocationIds) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED),
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean()); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0); allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED),
Version.V_2_2_0);
testAllocator.addData(node1, 3, null, randomBoolean()); testAllocator.addData(node1, 3, null, randomBoolean());
} }
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
@ -210,7 +215,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
// the allocator will see if it can force assign the primary, where the decision will be YES // the allocator will see if it can force assign the primary, where the decision will be YES
new TestAllocateDecision(randomBoolean() ? Decision.YES : Decision.NO), getNoDeciderThatAllowsForceAllocate() new TestAllocateDecision(randomBoolean() ? Decision.YES : Decision.NO), getNoDeciderThatAllowsForceAllocate()
)); ));
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1"); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, Version.CURRENT, "allocId1");
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodesChanged(), equalTo(true));
assertTrue(allocation.routingNodes().unassigned().ignored().isEmpty()); assertTrue(allocation.routingNodes().unassigned().ignored().isEmpty());
@ -233,7 +238,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
new TestAllocateDecision(Decision.NO), forceDecisionNo ? getNoDeciderThatDeniesForceAllocate() : new TestAllocateDecision(Decision.NO), forceDecisionNo ? getNoDeciderThatDeniesForceAllocate() :
getNoDeciderThatThrottlesForceAllocate() getNoDeciderThatThrottlesForceAllocate()
)); ));
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1"); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, Version.CURRENT, "allocId1");
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodesChanged(), equalTo(true));
List<ShardRouting> ignored = allocation.routingNodes().unassigned().ignored(); List<ShardRouting> ignored = allocation.routingNodes().unassigned().ignored();
@ -257,7 +262,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
// force allocating the shard, we still THROTTLE due to the decision from TestAllocateDecision // force allocating the shard, we still THROTTLE due to the decision from TestAllocateDecision
new TestAllocateDecision(Decision.THROTTLE), getNoDeciderThatAllowsForceAllocate() new TestAllocateDecision(Decision.THROTTLE), getNoDeciderThatAllowsForceAllocate()
)); ));
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1"); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, Version.CURRENT, "allocId1");
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodesChanged(), equalTo(true));
List<ShardRouting> ignored = allocation.routingNodes().unassigned().ignored(); List<ShardRouting> ignored = allocation.routingNodes().unassigned().ignored();
@ -272,7 +277,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testPreferAllocatingPreviousPrimary() { public void testPreferAllocatingPreviousPrimary() {
String primaryAllocId = UUIDs.randomBase64UUID(); String primaryAllocId = UUIDs.randomBase64UUID();
String replicaAllocId = UUIDs.randomBase64UUID(); String replicaAllocId = UUIDs.randomBase64UUID();
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), primaryAllocId, replicaAllocId); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(),
randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), randomFrom(Version.V_2_0_0, Version.CURRENT), primaryAllocId, replicaAllocId);
boolean node1HasPrimaryShard = randomBoolean(); boolean node1HasPrimaryShard = randomBoolean();
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard);
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard); testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard);
@ -292,10 +298,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testFoundAllocationButThrottlingDecider() { public void testFoundAllocationButThrottlingDecider() {
final RoutingAllocation allocation; final RoutingAllocation allocation;
if (randomBoolean()) { if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), CLUSTER_RECOVERED,
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean()); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0); allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_2_0);
testAllocator.addData(node1, 3, null, randomBoolean()); testAllocator.addData(node1, 3, null, randomBoolean());
} }
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
@ -312,10 +319,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testFoundAllocationButNoDecider() { public void testFoundAllocationButNoDecider() {
final RoutingAllocation allocation; final RoutingAllocation allocation;
if (randomBoolean()) { if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), CLUSTER_RECOVERED,
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean()); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
} else { } else {
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0); allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_0_0);
testAllocator.addData(node1, 3, null, randomBoolean()); testAllocator.addData(node1, 3, null, randomBoolean());
} }
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
@ -330,7 +338,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* Tests that the highest version node is chosen for allocation. * Tests that the highest version node is chosen for allocation.
*/ */
public void testAllocateToTheHighestVersionOnLegacyIndex() { public void testAllocateToTheHighestVersionOnLegacyIndex() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_0_0);
testAllocator.addData(node1, 10, null, randomBoolean()).addData(node2, 12, null, randomBoolean()); testAllocator.addData(node1, 10, null, randomBoolean()).addData(node2, 12, null, randomBoolean());
testAllocator.allocateUnassigned(allocation); testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodesChanged(), equalTo(true));
@ -347,7 +355,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* allocation mode would be chosen). * allocation mode would be chosen).
*/ */
public void testVersionBasedAllocationPrefersShardWithAllocationId() { public void testVersionBasedAllocationPrefersShardWithAllocationId() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_0_0);
testAllocator.addData(node1, 10, null, randomBoolean()); testAllocator.addData(node1, 10, null, randomBoolean());
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some allocId", randomBoolean()); testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some allocId", randomBoolean());
testAllocator.addData(node3, 12, null, randomBoolean()); testAllocator.addData(node3, 12, null, randomBoolean());
@ -616,17 +624,27 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED); assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
} }
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version, private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders,
UnassignedInfo.Reason reason, Version version,
String... activeAllocationIds) { String... activeAllocationIds) {
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version)) .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version))
.numberOfShards(1).numberOfReplicas(0).putInSyncAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds))) .numberOfShards(1).numberOfReplicas(0).putInSyncAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds)))
.build(); .build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
if (asNew) { switch (reason) {
routingTableBuilder.addAsNew(metaData.index(shardId.getIndex()));
} else { case INDEX_CREATED:
routingTableBuilder.addAsRecovery(metaData.index(shardId.getIndex())); routingTableBuilder.addAsNew(metaData.index(shardId.getIndex()));
break;
case CLUSTER_RECOVERED:
routingTableBuilder.addAsRecovery(metaData.index(shardId.getIndex()));
break;
case INDEX_REOPENED:
routingTableBuilder.addAsFromCloseToOpen(metaData.index(shardId.getIndex()));
break;
default:
throw new IllegalArgumentException("can't do " + reason + " for you. teach me");
} }
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData) .metaData(metaData)

View File

@ -70,7 +70,7 @@ import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -116,7 +116,7 @@ public class ClusterStateChanges extends AbstractComponent {
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(settings), new HashSet<>(Arrays.asList(new SameShardAllocationDecider(settings),
new ReplicaAfterPrimaryActiveAllocationDecider(settings), new ReplicaAfterPrimaryActiveAllocationDecider(settings),
new RandomAllocationDeciderTests.RandomAllocationDecider(getRandom())))), new RandomAllocationDeciderTests.RandomAllocationDecider(getRandom())))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), new TestGatewayAllocator(), new BalancedShardsAllocator(settings),
EmptyClusterInfoService.INSTANCE); EmptyClusterInfoService.INSTANCE);
shardFailedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger); shardFailedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger);
shardStartedClusterStateTaskExecutor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger); shardStartedClusterStateTaskExecutor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger);

View File

@ -39,7 +39,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -75,13 +75,13 @@ public abstract class ESAllocationTestCase extends ESTestCase {
public static MockAllocationService createAllocationService(Settings settings, ClusterSettings clusterSettings, Random random) { public static MockAllocationService createAllocationService(Settings settings, ClusterSettings clusterSettings, Random random) {
return new MockAllocationService(settings, return new MockAllocationService(settings,
randomAllocationDeciders(settings, clusterSettings, random), randomAllocationDeciders(settings, clusterSettings, random),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE); new TestGatewayAllocator(), new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE);
} }
public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) { public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
return new MockAllocationService(settings, return new MockAllocationService(settings,
randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()), randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), clusterInfoService); new TestGatewayAllocator(), new BalancedShardsAllocator(settings), clusterInfoService);
} }
public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator gatewayAllocator) { public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator gatewayAllocator) {

View File

@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.gateway;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.AsyncShardFetch;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.gateway.ReplicaShardAllocator;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* A gateway allocator implementation that keeps an in memory list of started shard allocation
* that are used as replies to the, normally async, fetch data requests. The in memory list
* is adapted when shards are started and failed.
*
* Nodes leaving and joining the cluster do not change the list of shards the class tracks but
* rather serves as a filter to what is returned by fetch data. Concretely - fetch data will
* only return shards that were started on nodes that are currently part of the cluster.
*
* For now only primary shard related data is fetched. Replica request always get an empty response.
*
*
* This class is useful to use in unit tests that require the functionality of {@link GatewayAllocator} but do
* not have all the infrastructure required to use it.
*/
public class TestGatewayAllocator extends GatewayAllocator {
Map<String /* node id */, Map<ShardId, ShardRouting>> knownAllocations = new HashMap<>();
DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES;
PrimaryShardAllocator primaryShardAllocator = new PrimaryShardAllocator(Settings.EMPTY) {
@Override
protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
// for now always return immediately what we know
final ShardId shardId = shard.shardId();
final Set<String> ignoreNodes = allocation.getIgnoreNodes(shardId);
Map<DiscoveryNode, NodeGatewayStartedShards> foundShards = knownAllocations.values().stream()
.flatMap(shardMap -> shardMap.values().stream())
.filter(ks -> ks.shardId().equals(shardId))
.filter(ks -> ignoreNodes.contains(ks.currentNodeId()) == false)
.filter(ks -> currentNodes.nodeExists(ks.currentNodeId()))
.collect(Collectors.toMap(
routing -> currentNodes.get(routing.currentNodeId()),
routing ->
new NodeGatewayStartedShards(
currentNodes.get(routing.currentNodeId()), -1, routing.allocationId().getId(), routing.primary())));
return new AsyncShardFetch.FetchResult<>(shardId, foundShards, Collections.emptySet(), ignoreNodes);
}
};
ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator(Settings.EMPTY) {
@Override
protected AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation) {
// for now, just pretend no node has data
final ShardId shardId = shard.shardId();
return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), Collections.emptySet(),
allocation.getIgnoreNodes(shardId));
}
};
public TestGatewayAllocator() {
super(Settings.EMPTY, null, null);
}
@Override
public void applyStartedShards(RoutingAllocation allocation, List<ShardRouting> startedShards) {
currentNodes = allocation.nodes();
allocation.routingNodes().shards(ShardRouting::active).forEach(this::addKnownAllocation);
}
@Override
public void applyFailedShards(RoutingAllocation allocation, List<FailedShard> failedShards) {
currentNodes = allocation.nodes();
for (FailedShard failedShard : failedShards) {
final ShardRouting failedRouting = failedShard.getRoutingEntry();
Map<ShardId, ShardRouting> nodeAllocations = knownAllocations.get(failedRouting.currentNodeId());
if (nodeAllocations != null) {
nodeAllocations.remove(failedRouting.shardId());
if (nodeAllocations.isEmpty()) {
knownAllocations.remove(failedRouting.currentNodeId());
}
}
}
}
@Override
public void allocateUnassigned(RoutingAllocation allocation) {
currentNodes = allocation.nodes();
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator);
}
/**
* manually add a specific shard to the allocations the gateway keeps track of
*/
public void addKnownAllocation(ShardRouting shard) {
knownAllocations.computeIfAbsent(shard.currentNodeId(), id -> new HashMap<>())
.put(shard.shardId(), shard);
}
}