From 27a760f9c19fff61f7dfaf1a7d0beaae7ff326d8 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 17 Aug 2016 10:46:59 +0200 Subject: [PATCH] Add routing changes API to RoutingAllocation (#19992) Adds a class that records changes made to RoutingAllocation, so that at the end of the allocation round other values can be more easily derived based on these changes. Most notably, it: - replaces the explicit boolean flag that is passed around everywhere to denote changes to the routing table. The boolean flag is automatically updated now when changes actually occur, preventing issues where it got out of sync with actual changes to the routing table. - records actual changes made to RoutingNodes so that primary term and in-sync allocation ids, which are part of index metadata, can be efficiently updated just by looking at the shards that were actually changed. --- .../routing/allocation/Allocators.java | 4 +- .../cluster/metadata/IndexMetaData.java | 7 +- .../routing/RoutingChangesObserver.java | 196 ++++++++++++++++++ .../cluster/routing/RoutingNodes.java | 57 +++-- .../routing/allocation/AllocationService.java | 161 ++++---------- .../allocation/IndexMetaDataUpdater.java | 178 ++++++++++++++++ .../routing/allocation/RoutingAllocation.java | 46 +++- .../RoutingNodesChangedObserver.java | 104 ++++++++++ .../allocator/BalancedShardsAllocator.java | 71 +++---- .../allocation/allocator/ShardsAllocator.java | 3 +- .../AbstractAllocateAllocationCommand.java | 4 +- .../command/CancelAllocationCommand.java | 3 +- .../command/MoveAllocationCommand.java | 2 +- .../gateway/GatewayAllocator.java | 11 +- .../gateway/PrimaryShardAllocator.java | 23 +- .../gateway/ReplicaShardAllocator.java | 31 +-- .../cluster/ClusterModuleTests.java | 4 +- .../allocation/BalanceConfigurationTests.java | 24 +-- .../ClusterRebalanceRoutingTests.java | 10 +- .../gateway/PrimaryShardAllocatorTests.java | 116 +++++------ .../gateway/ReplicaShardAllocatorTests.java | 20 +- .../test/ESAllocationTestCase.java | 5 +- .../test/gateway/NoopGatewayAllocator.java | 4 +- 23 files changed, 738 insertions(+), 346 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/cluster/routing/RoutingChangesObserver.java create mode 100644 core/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java create mode 100644 core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesChangedObserver.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/Allocators.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/Allocators.java index 97fbda80dc6..9b1cfaabf93 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/Allocators.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/Allocators.java @@ -60,8 +60,8 @@ public final class Allocators { } @Override - public boolean allocateUnassigned(RoutingAllocation allocation) { - return false; + public void allocateUnassigned(RoutingAllocation allocation) { + // noop } } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 0af860953f0..4f51fa9818f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -23,7 +23,6 @@ import com.carrotsearch.hppc.LongArrayList; import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.elasticsearch.Version; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.Diff; @@ -32,8 +31,7 @@ import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.node.DiscoveryNodeFilters; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.IndexMetaDataUpdater; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.collect.ImmutableOpenIntMap; @@ -342,8 +340,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild * a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary. * * Note: since we increment the term every time a shard is assigned, the term for any operational shard (i.e., a shard - * that can be indexed into) is larger than 0. - * See {@link AllocationService#updateMetaDataWithRoutingTable(MetaData, RoutingTable, RoutingTable)}. + * that can be indexed into) is larger than 0. See {@link IndexMetaDataUpdater#applyChanges(MetaData)}. **/ public long primaryTerm(int shardId) { return this.primaryTerms[shardId]; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingChangesObserver.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingChangesObserver.java new file mode 100644 index 00000000000..d54df7e0801 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingChangesObserver.java @@ -0,0 +1,196 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing; + +/** + * Records changes made to {@link RoutingNodes} during an allocation round. + */ +public interface RoutingChangesObserver { + /** + * Called when unassigned shard is initialized. Does not include initializing relocation target shards. + */ + void shardInitialized(ShardRouting unassignedShard); + + /** + * Called when an initializing shard is started. + */ + void shardStarted(ShardRouting initializingShard, ShardRouting startedShard); + + /** + * Called when relocation of a started shard is initiated. + */ + void relocationStarted(ShardRouting startedShard, ShardRouting targetRelocatingShard); + + /** + * Called when an unassigned shard's unassigned information was updated + */ + void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo); + + /** + * Called when a shard is failed or cancelled. + */ + void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo); + + /** + * Called on relocation source when relocation completes after relocation target is started. + */ + void relocationCompleted(ShardRouting removedRelocationSource); + + /** + * Called on replica relocation target when replica relocation source fails. Promotes the replica relocation target to ordinary + * initializing shard. + */ + void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource); + + /** + * Called on started primary shard after it has been promoted from replica to primary and is reinitialized due to shadow replicas. + */ + void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard); + + /** + * Called when started replica is promoted to primary. + */ + void replicaPromoted(ShardRouting replicaShard); + + + /** + * Abstract implementation of {@link RoutingChangesObserver} that does not take any action. Useful for subclasses that only override + * certain methods. + */ + class AbstractRoutingChangesObserver implements RoutingChangesObserver { + + @Override + public void shardInitialized(ShardRouting unassignedShard) { + + } + + @Override + public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { + + } + + @Override + public void relocationStarted(ShardRouting startedShard, ShardRouting targetRelocatingShard) { + + } + + @Override + public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) { + + } + + @Override + public void shardFailed(ShardRouting activeShard, UnassignedInfo unassignedInfo) { + + } + + @Override + public void relocationCompleted(ShardRouting removedRelocationSource) { + + } + + @Override + public void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource) { + + } + + @Override + public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) { + + } + + @Override + public void replicaPromoted(ShardRouting replicaShard) { + + } + } + + class DelegatingRoutingChangesObserver implements RoutingChangesObserver { + + private final RoutingChangesObserver[] routingChangesObservers; + + public DelegatingRoutingChangesObserver(RoutingChangesObserver... routingChangesObservers) { + this.routingChangesObservers = routingChangesObservers; + } + + @Override + public void shardInitialized(ShardRouting unassignedShard) { + for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { + routingChangesObserver.shardInitialized(unassignedShard); + } + } + + @Override + public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { + for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { + routingChangesObserver.shardStarted(initializingShard, startedShard); + } + } + + @Override + public void relocationStarted(ShardRouting startedShard, ShardRouting targetRelocatingShard) { + for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { + routingChangesObserver.relocationStarted(startedShard, targetRelocatingShard); + } + } + + @Override + public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) { + for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { + routingChangesObserver.unassignedInfoUpdated(unassignedShard, newUnassignedInfo); + } + } + + @Override + public void shardFailed(ShardRouting activeShard, UnassignedInfo unassignedInfo) { + for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { + routingChangesObserver.shardFailed(activeShard, unassignedInfo); + } + } + + @Override + public void relocationCompleted(ShardRouting removedRelocationSource) { + for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { + routingChangesObserver.relocationCompleted(removedRelocationSource); + } + } + + @Override + public void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource) { + for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { + routingChangesObserver.relocationSourceRemoved(removedReplicaRelocationSource); + } + } + + @Override + public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) { + for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { + routingChangesObserver.startedPrimaryReinitialized(startedPrimaryShard, initializedShard); + } + } + + @Override + public void replicaPromoted(ShardRouting replicaShard) { + for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { + routingChangesObserver.replicaPromoted(replicaShard); + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index eb9a18228f3..2ac9fcc8dd0 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -409,7 +409,7 @@ public class RoutingNodes implements Iterable { * @return the initialized shard */ public ShardRouting initializeShard(ShardRouting unassignedShard, String nodeId, @Nullable String existingAllocationId, - long expectedSize) { + long expectedSize, RoutingChangesObserver routingChangesObserver) { ensureMutable(); assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard; ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize); @@ -420,6 +420,7 @@ public class RoutingNodes implements Iterable { } addRecovery(initializedShard); assignedShardsAdd(initializedShard); + routingChangesObserver.shardInitialized(unassignedShard); return initializedShard; } @@ -429,7 +430,8 @@ public class RoutingNodes implements Iterable { * * @return pair of source relocating and target initializing shards. */ - public Tuple relocateShard(ShardRouting startedShard, String nodeId, long expectedShardSize) { + public Tuple relocateShard(ShardRouting startedShard, String nodeId, long expectedShardSize, + RoutingChangesObserver changes) { ensureMutable(); relocatingShards++; ShardRouting source = startedShard.relocate(nodeId, expectedShardSize); @@ -438,6 +440,7 @@ public class RoutingNodes implements Iterable { node(target.currentNodeId()).add(target); assignedShardsAdd(target); addRecovery(target); + changes.relocationStarted(startedShard, target); return Tuple.tuple(source, target); } @@ -448,10 +451,11 @@ public class RoutingNodes implements Iterable { * * @return the started shard */ - public ShardRouting startShard(ESLogger logger, ShardRouting initializingShard) { + public ShardRouting startShard(ESLogger logger, ShardRouting initializingShard, RoutingChangesObserver routingChangesObserver) { ensureMutable(); ShardRouting startedShard = started(initializingShard); logger.trace("{} marked shard as started (routing: {})", initializingShard.shardId(), initializingShard); + routingChangesObserver.shardStarted(initializingShard, startedShard); if (initializingShard.relocatingNodeId() != null) { // relocation target has been started, remove relocation source @@ -461,6 +465,7 @@ public class RoutingNodes implements Iterable { assert relocationSourceShard.getTargetRelocatingShard() == initializingShard : "relocation target mismatch, expected: " + initializingShard + " but was: " + relocationSourceShard.getTargetRelocatingShard(); remove(relocationSourceShard); + routingChangesObserver.relocationCompleted(relocationSourceShard); } return startedShard; } @@ -478,7 +483,8 @@ public class RoutingNodes implements Iterable { * - If shard is a (primary or replica) relocation target, this also clears the relocation information on the source shard. * */ - public void failShard(ESLogger logger, ShardRouting failedShard, UnassignedInfo unassignedInfo, IndexMetaData indexMetaData) { + public void failShard(ESLogger logger, ShardRouting failedShard, UnassignedInfo unassignedInfo, IndexMetaData indexMetaData, + RoutingChangesObserver routingChangesObserver) { ensureMutable(); assert failedShard.assignedToNode() : "only assigned shards can be failed"; assert indexMetaData.getIndex().equals(failedShard.index()) : @@ -502,7 +508,7 @@ public class RoutingNodes implements Iterable { UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT); - failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData); + failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver); } } } @@ -516,11 +522,13 @@ public class RoutingNodes implements Iterable { logger.trace("{} is removed due to the failure/cancellation of the source shard", targetShard); // cancel and remove target shard remove(targetShard); + routingChangesObserver.shardFailed(targetShard, unassignedInfo); } else { logger.trace("{}, relocation source failed / cancelled, mark as initializing without relocation source", targetShard); // promote to initializing shard without relocation source and ensure that removed relocation source // is not added back as unassigned shard removeRelocationSource(targetShard); + routingChangesObserver.relocationSourceRemoved(targetShard); } } @@ -542,6 +550,7 @@ public class RoutingNodes implements Iterable { cancelRelocation(sourceShard); remove(failedShard); } + routingChangesObserver.shardFailed(failedShard, unassignedInfo); } else { assert failedShard.active(); if (failedShard.primary()) { @@ -555,8 +564,10 @@ public class RoutingNodes implements Iterable { assert activeReplica.started() : "replica relocation should have been cancelled: " + activeReplica; movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo); ShardRouting primarySwappedCandidate = promoteActiveReplicaShardToPrimary(activeReplica); + routingChangesObserver.replicaPromoted(activeReplica); if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.getSettings())) { - reinitShadowPrimary(primarySwappedCandidate); + ShardRouting initializedShard = reinitShadowPrimary(primarySwappedCandidate); + routingChangesObserver.startedPrimaryReinitialized(primarySwappedCandidate, initializedShard); } } } else { @@ -567,6 +578,7 @@ public class RoutingNodes implements Iterable { moveToUnassigned(failedShard, unassignedInfo); } } + routingChangesObserver.shardFailed(failedShard, unassignedInfo); } assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard + " was matched but wasn't removed"; @@ -806,13 +818,11 @@ public class RoutingNodes implements Iterable { * Should be used with caution, typically, * the correct usage is to removeAndIgnore from the iterator. * @see #ignored() - * @see UnassignedIterator#removeAndIgnore(AllocationStatus) + * @see UnassignedIterator#removeAndIgnore(AllocationStatus, RoutingChangesObserver) * @see #isIgnoredEmpty() - * @return true iff the decision caused a change to the unassigned info */ - public boolean ignoreShard(ShardRouting shard, AllocationStatus allocationStatus) { + public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, RoutingChangesObserver changes) { nodes.ensureMutable(); - boolean changed = false; if (shard.primary()) { ignoredPrimaries++; UnassignedInfo currInfo = shard.unassignedInfo(); @@ -822,12 +832,12 @@ public class RoutingNodes implements Iterable { currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(), currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(), allocationStatus); - shard = shard.updateUnassignedInfo(newInfo); - changed = true; + ShardRouting updatedShard = shard.updateUnassignedInfo(newInfo); + changes.unassignedInfoUpdated(shard, newInfo); + shard = updatedShard; } } ignored.add(shard); - return changed; } public class UnassignedIterator implements Iterator { @@ -854,10 +864,11 @@ public class RoutingNodes implements Iterable { * * @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated. */ - public ShardRouting initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize) { + public ShardRouting initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize, + RoutingChangesObserver routingChangesObserver) { nodes.ensureMutable(); innerRemove(); - return nodes.initializeShard(current, nodeId, existingAllocationId, expectedShardSize); + return nodes.initializeShard(current, nodeId, existingAllocationId, expectedShardSize, routingChangesObserver); } /** @@ -867,12 +878,11 @@ public class RoutingNodes implements Iterable { * that subsequent consumers of this API won't try to allocate this shard again. * * @param attempt the result of the allocation attempt - * @return true iff the decision caused an update to the unassigned info */ - public boolean removeAndIgnore(AllocationStatus attempt) { + public void removeAndIgnore(AllocationStatus attempt, RoutingChangesObserver changes) { nodes.ensureMutable(); innerRemove(); - return ignoreShard(current, attempt); + ignoreShard(current, attempt, changes); } private void updateShardRouting(ShardRouting shardRouting) { @@ -886,16 +896,17 @@ public class RoutingNodes implements Iterable { * @param unassignedInfo the new unassigned info to use * @return the shard with unassigned info updated */ - public ShardRouting updateUnassignedInfo(UnassignedInfo unassignedInfo) { + public ShardRouting updateUnassignedInfo(UnassignedInfo unassignedInfo, RoutingChangesObserver changes) { nodes.ensureMutable(); ShardRouting updatedShardRouting = current.updateUnassignedInfo(unassignedInfo); + changes.unassignedInfoUpdated(current, unassignedInfo); updateShardRouting(updatedShardRouting); return updatedShardRouting; } /** - * Unsupported operation, just there for the interface. Use {@link #removeAndIgnore(AllocationStatus)} or - * {@link #initialize(String, String, long)}. + * Unsupported operation, just there for the interface. Use {@link #removeAndIgnore(AllocationStatus, RoutingChangesObserver)} or + * {@link #initialize(String, String, long, RoutingChangesObserver)}. */ @Override public void remove() { @@ -919,8 +930,8 @@ public class RoutingNodes implements Iterable { /** * Returns true iff any unassigned shards are marked as temporarily ignored. - * @see UnassignedShards#ignoreShard(ShardRouting, AllocationStatus) - * @see UnassignedIterator#removeAndIgnore(AllocationStatus) + * @see UnassignedShards#ignoreShard(ShardRouting, AllocationStatus, RoutingChangesObserver) + * @see UnassignedIterator#removeAndIgnore(AllocationStatus, RoutingChangesObserver) */ public boolean isIgnoredEmpty() { return ignored.isEmpty(); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 8401b5f48e1..0b9e43c5c84 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -26,9 +26,6 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.AllocationId; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; @@ -43,13 +40,10 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayAllocator; -import org.elasticsearch.index.shard.ShardId; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Objects; -import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -93,7 +87,7 @@ public class AllocationService extends AbstractComponent { public Result applyStartedShards(ClusterState clusterState, List startedShards, boolean withReroute) { if (startedShards.isEmpty()) { - return new Result(false, clusterState.routingTable(), clusterState.metaData()); + return Result.unchanged(clusterState); } RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards @@ -111,103 +105,22 @@ public class AllocationService extends AbstractComponent { protected Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason) { return buildResultAndLogHealthChange(allocation, reason, new RoutingExplanations()); - } protected Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason, RoutingExplanations explanations) { - MetaData oldMetaData = allocation.metaData(); RoutingTable oldRoutingTable = allocation.routingTable(); RoutingNodes newRoutingNodes = allocation.routingNodes(); final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(oldRoutingTable.version(), newRoutingNodes).build(); - MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable); + MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(); assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata logClusterHealthStateChange( new ClusterStateHealth(ClusterState.builder(clusterName). - metaData(allocation.metaData()).routingTable(allocation.routingTable()).build()), + metaData(allocation.metaData()).routingTable(oldRoutingTable).build()), new ClusterStateHealth(ClusterState.builder(clusterName). metaData(newMetaData).routingTable(newRoutingTable).build()), reason ); - return new Result(true, newRoutingTable, newMetaData, explanations); - } - - /** - * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. Specifically - * we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on - * the changes made during this allocation. - * - * @param oldMetaData {@link MetaData} object from before the routing table was changed. - * @param oldRoutingTable {@link RoutingTable} from before the change. - * @param newRoutingTable new {@link RoutingTable} created by the allocation change - * @return adapted {@link MetaData}, potentially the original one if no change was needed. - */ - static MetaData updateMetaDataWithRoutingTable(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingTable newRoutingTable) { - MetaData.Builder metaDataBuilder = null; - for (IndexRoutingTable newIndexTable : newRoutingTable) { - final IndexMetaData oldIndexMetaData = oldMetaData.index(newIndexTable.getIndex()); - if (oldIndexMetaData == null) { - throw new IllegalStateException("no metadata found for index " + newIndexTable.getIndex().getName()); - } - IndexMetaData.Builder indexMetaDataBuilder = null; - for (IndexShardRoutingTable newShardTable : newIndexTable) { - final ShardId shardId = newShardTable.shardId(); - - // update activeAllocationIds - Set activeAllocationIds = newShardTable.activeShards().stream() - .map(ShardRouting::allocationId) - .filter(Objects::nonNull) - .map(AllocationId::getId) - .collect(Collectors.toSet()); - // only update active allocation ids if there is an active shard - if (activeAllocationIds.isEmpty() == false) { - // get currently stored allocation ids - Set storedAllocationIds = oldIndexMetaData.activeAllocationIds(shardId.id()); - if (activeAllocationIds.equals(storedAllocationIds) == false) { - if (indexMetaDataBuilder == null) { - indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData); - } - indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds); - } - } - - // update primary terms - final ShardRouting newPrimary = newShardTable.primaryShard(); - if (newPrimary == null) { - throw new IllegalStateException("missing primary shard for " + newShardTable.shardId()); - } - final ShardRouting oldPrimary = oldRoutingTable.shardRoutingTable(shardId).primaryShard(); - if (oldPrimary == null) { - throw new IllegalStateException("missing primary shard for " + newShardTable.shardId()); - } - // we update the primary term on initial assignment or when a replica is promoted. Most notably we do *not* - // update them when a primary relocates - if (newPrimary.unassigned() || - newPrimary.isSameAllocation(oldPrimary) || - // we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to - // be initializing. However, when the target shard is activated, we still want the primary term to staty - // the same - (oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.getTargetRelocatingShard()))) { - // do nothing - } else { - // incrementing the primary term - if (indexMetaDataBuilder == null) { - indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData); - } - indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1); - } - } - if (indexMetaDataBuilder != null) { - if (metaDataBuilder == null) { - metaDataBuilder = MetaData.builder(oldMetaData); - } - metaDataBuilder.put(indexMetaDataBuilder); - } - } - if (metaDataBuilder != null) { - return metaDataBuilder.build(); - } else { - return oldMetaData; - } + return Result.changed(newRoutingTable, newMetaData, explanations); } public Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { @@ -223,7 +136,7 @@ public class AllocationService extends AbstractComponent { */ public Result applyFailedShards(ClusterState clusterState, List failedShards) { if (failedShards.isEmpty()) { - return new Result(false, clusterState.routingTable(), clusterState.metaData()); + return Result.unchanged(clusterState); } RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards @@ -247,7 +160,7 @@ public class AllocationService extends AbstractComponent { UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShardEntry.message, failedShardEntry.failure, failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT); - routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData); + routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData, allocation.changes()); } else { logger.trace("{} shard routing failed in an earlier iteration (routing: {})", shardToFail.shardId(), shardToFail); } @@ -271,25 +184,24 @@ public class AllocationService extends AbstractComponent { clusterInfoService.getClusterInfo(), currentNanoTime(), false); // first, clear from the shards any node id they used to belong to that is now dead - boolean changed = deassociateDeadNodes(allocation); + deassociateDeadNodes(allocation); if (reroute) { - changed |= reroute(allocation); + reroute(allocation); } - if (!changed) { - return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); + if (allocation.routingNodesChanged() == false) { + return Result.unchanged(clusterState); } return buildResultAndLogHealthChange(allocation, reason); } /** - * Removes delay markers from unassigned shards based on current time stamp. Returns true if markers were removed. + * Removes delay markers from unassigned shards based on current time stamp. */ - private boolean removeDelayMarkers(RoutingAllocation allocation) { + private void removeDelayMarkers(RoutingAllocation allocation) { final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator(); final MetaData metaData = allocation.metaData(); - boolean changed = false; while (unassignedIterator.hasNext()) { ShardRouting shardRouting = unassignedIterator.next(); UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); @@ -297,14 +209,12 @@ public class AllocationService extends AbstractComponent { final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay(allocation.getCurrentNanoTime(), metaData.getIndexSafe(shardRouting.index()).getSettings()); if (newComputedLeftDelayNanos == 0) { - changed = true; unassignedIterator.updateUnassignedInfo(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus())); + unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus()), allocation.changes()); } } } - return changed; } /** @@ -366,8 +276,9 @@ public class AllocationService extends AbstractComponent { RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, clusterInfoService.getClusterInfo(), currentNanoTime(), false); allocation.debugDecision(debug); - if (!reroute(allocation)) { - return new Result(false, clusterState.routingTable(), clusterState.metaData()); + reroute(allocation); + if (allocation.routingNodesChanged() == false) { + return Result.unchanged(clusterState); } return buildResultAndLogHealthChange(allocation, reason); } @@ -380,43 +291,47 @@ public class AllocationService extends AbstractComponent { } } - private boolean reroute(RoutingAllocation allocation) { - assert deassociateDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes"; - - boolean changed = false; - // now allocate all the unassigned to available nodes - if (allocation.routingNodes().unassigned().size() > 0) { - changed |= removeDelayMarkers(allocation); - changed |= gatewayAllocator.allocateUnassigned(allocation); + private boolean hasDeadNodes(RoutingAllocation allocation) { + for (RoutingNode routingNode : allocation.routingNodes()) { + if (allocation.nodes().getDataNodes().containsKey(routingNode.nodeId()) == false) { + return true; + } } - - changed |= shardsAllocator.allocate(allocation); - assert RoutingNodes.assertShardStats(allocation.routingNodes()); - return changed; + return false; } - private boolean deassociateDeadNodes(RoutingAllocation allocation) { - boolean changed = false; + private void reroute(RoutingAllocation allocation) { + assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes"; + + // now allocate all the unassigned to available nodes + if (allocation.routingNodes().unassigned().size() > 0) { + removeDelayMarkers(allocation); + gatewayAllocator.allocateUnassigned(allocation); + } + + shardsAllocator.allocate(allocation); + assert RoutingNodes.assertShardStats(allocation.routingNodes()); + } + + private void deassociateDeadNodes(RoutingAllocation allocation) { for (Iterator it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) { RoutingNode node = it.next(); if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) { // its a live node, continue continue; } - changed = true; // now, go over all the shards routing on the node, and fail them for (ShardRouting shardRouting : node.copyShards()) { final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0; UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT); - allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData); + allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes()); } // its a dead node, remove it, note, its important to remove it *after* we apply failed shard // since it relies on the fact that the RoutingNode exists in the list of nodes it.remove(); } - return changed; } private void applyStartedShards(RoutingAllocation routingAllocation, List startedShardEntries) { @@ -430,7 +345,7 @@ public class AllocationService extends AbstractComponent { "shard routing to start does not exist in routing table, expected: " + startedShard + " but was: " + routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId()); - routingNodes.startShard(logger, startedShard); + routingNodes.startShard(logger, startedShard, routingAllocation.changes()); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java new file mode 100644 index 00000000000..1f232cd8ac5 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -0,0 +1,178 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.RoutingChangesObserver; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Observer that tracks changes made to RoutingNodes in order to update the primary terms and in-sync allocation ids in + * {@link IndexMetaData} once the allocation round has completed. + * + * Primary terms are updated on primary initialization or primary promotion. + * + * Allocation ids are added for shards that become active and removed for shards that stop being active. + */ +public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver { + private final Map shardChanges = new HashMap<>(); + + @Override + public void shardInitialized(ShardRouting unassignedShard) { + if (unassignedShard.primary()) { + increasePrimaryTerm(unassignedShard); + } + } + + @Override + public void replicaPromoted(ShardRouting replicaShard) { + increasePrimaryTerm(replicaShard); + } + + @Override + public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { + addAllocationId(startedShard); + } + + @Override + public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) { + if (failedShard.active()) { + removeAllocationId(failedShard); + } + } + + @Override + public void relocationCompleted(ShardRouting removedRelocationSource) { + removeAllocationId(removedRelocationSource); + } + + @Override + public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) { + removeAllocationId(startedPrimaryShard); + } + + /** + * Updates the current {@link MetaData} based on the changes of this RoutingChangesObserver. Specifically + * we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on + * the changes made during this allocation. + * + * @param oldMetaData {@link MetaData} object from before the routing nodes was changed. + * @return adapted {@link MetaData}, potentially the original one if no change was needed. + */ + public MetaData applyChanges(MetaData oldMetaData) { + Map>> changesGroupedByIndex = + shardChanges.entrySet().stream().collect(Collectors.groupingBy(e -> e.getKey().getIndex())); + + MetaData.Builder metaDataBuilder = null; + for (Map.Entry>> indexChanges : changesGroupedByIndex.entrySet()) { + Index index = indexChanges.getKey(); + final IndexMetaData oldIndexMetaData = oldMetaData.index(index); + if (oldIndexMetaData == null) { + throw new IllegalStateException("no metadata found for index " + index); + } + IndexMetaData.Builder indexMetaDataBuilder = null; + for (Map.Entry shardEntry : indexChanges.getValue()) { + ShardId shardId = shardEntry.getKey(); + Updates updates = shardEntry.getValue(); + + assert Sets.haveEmptyIntersection(updates.addedAllocationIds, updates.removedAllocationIds) : + "Allocation ids cannot be both added and removed in the same allocation round, added ids: " + + updates.addedAllocationIds + ", removed ids: " + updates.removedAllocationIds; + + Set activeAllocationIds = new HashSet<>(oldIndexMetaData.activeAllocationIds(shardId.id())); + activeAllocationIds.addAll(updates.addedAllocationIds); + activeAllocationIds.removeAll(updates.removedAllocationIds); + // only update active allocation ids if there is an active shard + if (activeAllocationIds.isEmpty() == false) { + if (indexMetaDataBuilder == null) { + indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData); + } + indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds); + } + + if (updates.increaseTerm) { + if (indexMetaDataBuilder == null) { + indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData); + } + indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1); + } + } + + if (indexMetaDataBuilder != null) { + if (metaDataBuilder == null) { + metaDataBuilder = MetaData.builder(oldMetaData); + } + metaDataBuilder.put(indexMetaDataBuilder); + } + } + + if (metaDataBuilder != null) { + return metaDataBuilder.build(); + } else { + return oldMetaData; + } + } + + /** + * Helper method that creates update entry for the given shard id if such an entry does not exist yet. + */ + private Updates changes(ShardId shardId) { + return shardChanges.computeIfAbsent(shardId, k -> new Updates()); + } + + /** + * Remove allocation id of this shard from the set of in-sync shard copies + */ + private void removeAllocationId(ShardRouting shardRouting) { + changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId()); + } + + /** + * Add allocation id of this shard to the set of in-sync shard copies + */ + private void addAllocationId(ShardRouting shardRouting) { + changes(shardRouting.shardId()).addedAllocationIds.add(shardRouting.allocationId().getId()); + } + + /** + * Increase primary term for this shard id + */ + private void increasePrimaryTerm(ShardRouting shardRouting) { + changes(shardRouting.shardId()).increaseTerm = true; + } + + private static class Updates { + private boolean increaseTerm; // whether primary term should be increased + private Set addedAllocationIds = new HashSet<>(); // allocation ids that should be added to the in-sync set + private Set removedAllocationIds = new HashSet<>(); // allocation ids that should be removed from the in-sync set + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index e3b16836d01..0794c6d828e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingChangesObserver; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -59,13 +60,21 @@ public class RoutingAllocation { private final RoutingExplanations explanations; /** - * Creates a new {@link RoutingAllocation.Result} - * @param changed a flag to determine whether the actual {@link RoutingTable} has been changed + * Creates a new {@link RoutingAllocation.Result} where no change to the routing table was made. + * @param clusterState the unchanged {@link ClusterState} + */ + public static Result unchanged(ClusterState clusterState) { + return new Result(false, clusterState.routingTable(), clusterState.metaData(), new RoutingExplanations()); + } + + /** + * Creates a new {@link RoutingAllocation.Result} where changes were made to the routing table. * @param routingTable the {@link RoutingTable} this Result references * @param metaData the {@link MetaData} this Result references + * @param explanations Explanation for the reroute actions */ - public Result(boolean changed, RoutingTable routingTable, MetaData metaData) { - this(changed, routingTable, metaData, new RoutingExplanations()); + public static Result changed(RoutingTable routingTable, MetaData metaData, RoutingExplanations explanations) { + return new Result(true, routingTable, metaData, explanations); } /** @@ -75,7 +84,7 @@ public class RoutingAllocation { * @param metaData the {@link MetaData} this Result references * @param explanations Explanation for the reroute actions */ - public Result(boolean changed, RoutingTable routingTable, MetaData metaData, RoutingExplanations explanations) { + private Result(boolean changed, RoutingTable routingTable, MetaData metaData, RoutingExplanations explanations) { this.changed = changed; this.routingTable = routingTable; this.metaData = metaData; @@ -142,6 +151,12 @@ public class RoutingAllocation { private final long currentNanoTime; + private final IndexMetaDataUpdater indexMetaDataUpdater = new IndexMetaDataUpdater(); + private final RoutingNodesChangedObserver nodesChangedObserver = new RoutingNodesChangedObserver(); + private final RoutingChangesObserver routingChangesObserver = new RoutingChangesObserver.DelegatingRoutingChangesObserver( + nodesChangedObserver, indexMetaDataUpdater + ); + /** * Creates a new {@link RoutingAllocation} @@ -282,6 +297,27 @@ public class RoutingAllocation { return unmodifiableSet(new HashSet<>(ignore)); } + /** + * Returns observer to use for changes made to the routing nodes + */ + public RoutingChangesObserver changes() { + return routingChangesObserver; + } + + /** + * Returns updated {@link MetaData} based on the changes that were made to the routing nodes + */ + public MetaData updateMetaDataWithRoutingChanges() { + return indexMetaDataUpdater.applyChanges(metaData); + } + + /** + * Returns true iff changes were made to the routing nodes + */ + public boolean routingNodesChanged() { + return nodesChangedObserver.isChanged(); + } + /** * Create a routing decision, including the reason if the debug flag is * turned on diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesChangedObserver.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesChangedObserver.java new file mode 100644 index 00000000000..c971687234f --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesChangedObserver.java @@ -0,0 +1,104 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.routing.RoutingChangesObserver; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; + +/** + * Records if changes were made to {@link RoutingNodes} during an allocation round. + */ +public class RoutingNodesChangedObserver implements RoutingChangesObserver { + private boolean changed; + + /** + * Returns whether changes were made + */ + public boolean isChanged() { + return changed; + } + + @Override + public void shardInitialized(ShardRouting unassignedShard) { + assert unassignedShard.unassigned() : "expected unassigned shard " + unassignedShard; + setChanged(); + } + + @Override + public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { + assert initializingShard.initializing() : "expected initializing shard " + initializingShard; + assert startedShard.started() : "expected started shard " + startedShard; + setChanged(); + } + + @Override + public void relocationStarted(ShardRouting startedShard, ShardRouting targetRelocatingShard) { + assert startedShard.started() : "expected started shard " + startedShard; + assert targetRelocatingShard.isRelocationTarget() : "expected relocation target shard " + targetRelocatingShard; + setChanged(); + } + + @Override + public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) { + assert unassignedShard.unassigned() : "expected unassigned shard " + unassignedShard; + setChanged(); + } + + @Override + public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) { + assert failedShard.assignedToNode() : "expected assigned shard " + failedShard; + setChanged(); + } + + @Override + public void relocationCompleted(ShardRouting removedRelocationSource) { + assert removedRelocationSource.relocating() : "expected relocating shard " + removedRelocationSource; + setChanged(); + } + + @Override + public void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource) { + assert removedReplicaRelocationSource.primary() == false && removedReplicaRelocationSource.isRelocationTarget() : + "expected replica relocation target shard " + removedReplicaRelocationSource; + setChanged(); + } + + @Override + public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) { + assert startedPrimaryShard.primary() && startedPrimaryShard.started() : "expected started primary shard " + startedPrimaryShard; + assert initializedShard.primary() && initializedShard.initializing(): "expected initializing primary shard " + initializedShard; + setChanged(); + } + + @Override + public void replicaPromoted(ShardRouting replicaShard) { + assert replicaShard.started() && replicaShard.primary() == false : "expected started replica shard " + replicaShard; + setChanged(); + } + + /** + * Marks the allocation as changed. + */ + private void setChanged() { + changed = true; + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index ed856f44e68..c86e256bd6f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -111,16 +111,15 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } @Override - public boolean allocate(RoutingAllocation allocation) { + public void allocate(RoutingAllocation allocation) { if (allocation.routingNodes().size() == 0) { /* with no nodes this is pointless */ - return false; + return; } final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); - boolean changed = balancer.allocateUnassigned(); - changed |= balancer.moveShards(); - changed |= balancer.balance(); - return changed; + balancer.allocateUnassigned(); + balancer.moveShards(); + balancer.balance(); } /** @@ -277,11 +276,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards /** * Balances the nodes on the cluster model according to the weight function. * The actual balancing is delegated to {@link #balanceByWeights()} - * - * @return true if the current configuration has been - * changed, otherwise false */ - private boolean balance() { + private void balance() { if (logger.isTraceEnabled()) { logger.trace("Start balancing cluster"); } @@ -294,17 +290,17 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * Therefore we only do a rebalance if we have fetched all information. */ logger.debug("skipping rebalance due to in-flight shard/store fetches"); - return false; + return; } if (allocation.deciders().canRebalance(allocation).type() != Type.YES) { logger.trace("skipping rebalance as it is disabled"); - return false; + return; } if (nodes.size() < 2) { /* skip if we only have one node */ logger.trace("skipping rebalance as single node only"); - return false; + return; } - return balanceByWeights(); + balanceByWeights(); } public Map weighShard(ShardRouting shard) { @@ -340,12 +336,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * the threshold is set to 1.0 to enforce gaining relocation * only, or in other words relocations that move the weight delta closer * to 0.0 - * - * @return true if the current configuration has been - * changed, otherwise false */ - private boolean balanceByWeights() { - boolean changed = false; + private void balanceByWeights() { final NodeSorter sorter = newNodeSorter(); final AllocationDeciders deciders = allocation.deciders(); final ModelNode[] modelNodes = sorter.modelNodes; @@ -419,7 +411,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards sorter.sort(0, relevantNodes); lowIdx = 0; highIdx = relevantNodes - 1; - changed = true; continue; } } @@ -441,7 +432,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } } } - return changed; } /** @@ -506,14 +496,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * {@link ShardRoutingState#RELOCATING} and a shadow instance of this * shard is created with an incremented version in the state * {@link ShardRoutingState#INITIALIZING}. - * - * @return true if the allocation has changed, otherwise false */ - public boolean moveShards() { + public void moveShards() { // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are // offloading the shards. - boolean changed = false; final NodeSorter sorter = newNodeSorter(); for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) { ShardRouting shardRouting = it.next(); @@ -524,20 +511,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards RoutingNode routingNode = sourceNode.getRoutingNode(); Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); if (decision.type() == Decision.Type.NO) { - changed |= moveShard(sorter, shardRouting, sourceNode, routingNode); + moveShard(sorter, shardRouting, sourceNode, routingNode); } } } - - return changed; } /** * Move started shard to the minimal eligible node with respect to the weight function - * - * @return true if the shard was moved successfully, otherwise false */ - private boolean moveShard(NodeSorter sorter, ShardRouting shardRouting, ModelNode sourceNode, RoutingNode routingNode) { + private void moveShard(NodeSorter sorter, ShardRouting shardRouting, ModelNode sourceNode, RoutingNode routingNode) { logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node()); sorter.reset(shardRouting.getIndexName()); /* @@ -553,17 +536,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); if (allocationDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too? sourceNode.removeShard(shardRouting); - Tuple relocatingShards = routingNodes.relocateShard(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + Tuple relocatingShards = routingNodes.relocateShard(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes()); currentNode.addShard(relocatingShards.v2()); if (logger.isTraceEnabled()) { logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node()); } - return true; + return; } } } logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); - return false; } /** @@ -595,19 +577,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards /** * Allocates all given shards on the minimal eligible node for the shards index * with respect to the weight function. All given shards must be unassigned. - * @return true if the current configuration has been - * changed, otherwise false */ - private boolean allocateUnassigned() { + private void allocateUnassigned() { RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); assert !nodes.isEmpty(); if (logger.isTraceEnabled()) { logger.trace("Start allocating unassigned shards"); } if (unassigned.isEmpty()) { - return false; + return; } - boolean changed = false; /* * TODO: We could be smarter here and group the shards by index and then @@ -651,9 +630,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards final Decision decision = deciders.canAllocate(shard, allocation); if (decision.type() == Type.NO) { UnassignedInfo.AllocationStatus allocationStatus = UnassignedInfo.AllocationStatus.fromDecision(decision); - changed |= unassigned.ignoreShard(shard, allocationStatus); + unassigned.ignoreShard(shard, allocationStatus, allocation.changes()); while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) { - changed |= unassigned.ignoreShard(primary[++i], allocationStatus); + unassigned.ignoreShard(primary[++i], allocationStatus, allocation.changes()); } continue; } else { @@ -728,9 +707,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); } - shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize); + shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes()); minNode.addShard(shard); - changed = true; continue; // don't add to ignoreUnassigned } else { minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize)); @@ -754,10 +732,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards UnassignedInfo.AllocationStatus allocationStatus = decision == null ? UnassignedInfo.AllocationStatus.DECIDERS_NO : UnassignedInfo.AllocationStatus.fromDecision(decision); - changed |= unassigned.ignoreShard(shard, allocationStatus); + unassigned.ignoreShard(shard, allocationStatus, allocation.changes()); if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas while(secondaryLength > 0 && comparator.compare(shard, secondary[secondaryLength-1]) == 0) { - changed |= unassigned.ignoreShard(secondary[--secondaryLength], allocationStatus); + unassigned.ignoreShard(secondary[--secondaryLength], allocationStatus, allocation.changes()); } } } @@ -768,7 +746,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards secondaryLength = 0; } while (primaryLength > 0); // clear everything we have either added it or moved to ignoreUnassigned - return changed; } /** @@ -820,7 +797,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards minNode.getNodeId()); } /* now allocate on the cluster */ - minNode.addShard(routingNodes.relocateShard(candidate, minNode.getNodeId(), shardSize).v1()); + minNode.addShard(routingNodes.relocateShard(candidate, minNode.getNodeId(), shardSize, allocation.changes()).v1()); return true; } else { assert decision.type() == Type.THROTTLE; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java index aa59e7788f3..35f3b265418 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -40,9 +40,8 @@ public interface ShardsAllocator { * - relocate shards to find a good shard balance in the cluster * * @param allocation current node allocation - * @return true if the allocation has changed, otherwise false */ - boolean allocate(RoutingAllocation allocation); + void allocate(RoutingAllocation allocation); /** * Returns a map of node to a float "weight" of where the allocator would like to place the shard. diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AbstractAllocateAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AbstractAllocateAllocationCommand.java index 2c5aa670f4f..f9b5a398ba7 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AbstractAllocateAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AbstractAllocateAllocationCommand.java @@ -207,9 +207,9 @@ public abstract class AbstractAllocateAllocationCommand implements AllocationCom continue; } if (unassignedInfo != null) { - unassigned = it.updateUnassignedInfo(unassignedInfo); + unassigned = it.updateUnassignedInfo(unassignedInfo, allocation.changes()); } - it.initialize(routingNode.nodeId(), null, allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + it.initialize(routingNode.nodeId(), null, allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes()); return; } assert false : "shard to initialize not found in list of unassigned shards"; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java index 349df2d7af7..eae4739c127 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RerouteExplanation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -156,7 +155,7 @@ public class CancelAllocationCommand implements AllocationCommand { } } routingNodes.failShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting, - new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData); + new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData, allocation.changes()); return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command", "shard " + shardId + " on node " + discoNode + " can be cancelled")); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java index dbd345a81a0..6e302d458ba 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/MoveAllocationCommand.java @@ -132,7 +132,7 @@ public class MoveAllocationCommand implements AllocationCommand { if (decision.type() == Decision.Type.THROTTLE) { // its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it... } - allocation.routingNodes().relocateShard(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + allocation.routingNodes().relocateShard(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes()); } if (!found) { diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index ab6f6ae3ed2..a9fbe0ac82e 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -129,16 +129,13 @@ public class GatewayAllocator extends AbstractComponent { } } - public boolean allocateUnassigned(final RoutingAllocation allocation) { - boolean changed = false; - + public void allocateUnassigned(final RoutingAllocation allocation) { RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering - changed |= primaryShardAllocator.allocateUnassigned(allocation); - changed |= replicaShardAllocator.processExistingRecoveries(allocation); - changed |= replicaShardAllocator.allocateUnassigned(allocation); - return changed; + primaryShardAllocator.allocateUnassigned(allocation); + replicaShardAllocator.processExistingRecoveries(allocation); + replicaShardAllocator.allocateUnassigned(allocation); } class InternalAsyncFetch extends AsyncShardFetch { diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java index dce50aa0a1a..600e2c5e404 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -59,8 +59,7 @@ import java.util.stream.Collectors; * Note that the PrimaryShardAllocator does *not* allocate primaries on index creation * (see {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator}), * nor does it allocate primaries when a primary shard failed and there is a valid replica - * copy that can immediately be promoted to primary, as this takes place in - * {@link RoutingNodes#failShard(ESLogger, ShardRouting, UnassignedInfo, IndexMetaData)}. + * copy that can immediately be promoted to primary, as this takes place in {@link RoutingNodes#failShard}. */ public abstract class PrimaryShardAllocator extends AbstractComponent { @@ -94,8 +93,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { logger.debug("using initial_shards [{}]", NODE_INITIAL_SHARDS_SETTING.get(settings)); } - public boolean allocateUnassigned(RoutingAllocation allocation) { - boolean changed = false; + public void allocateUnassigned(RoutingAllocation allocation) { final RoutingNodes routingNodes = allocation.routingNodes(); final MetaData metaData = allocation.metaData(); @@ -119,7 +117,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { if (shardState.hasData() == false) { logger.trace("{}: ignoring allocation, still fetching shard started state", shard); allocation.setHasPendingAsyncFetch(); - changed |= unassignedIterator.removeAndIgnore(AllocationStatus.FETCHING_SHARD_DATA); + unassignedIterator.removeAndIgnore(AllocationStatus.FETCHING_SHARD_DATA, allocation.changes()); continue; } @@ -160,7 +158,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { logger.debug("[{}][{}]: missing local data, recover from any node", shard.index(), shard.id()); } else { // we can't really allocate, so ignore it and continue - changed |= unassignedIterator.removeAndIgnore(AllocationStatus.NO_VALID_SHARD_COPY); + unassignedIterator.removeAndIgnore(AllocationStatus.NO_VALID_SHARD_COPY, allocation.changes()); logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodeShardsResult.allocationsFound); } continue; @@ -172,8 +170,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { if (nodesToAllocate.yesNodeShards.isEmpty() == false) { NodeGatewayStartedShards nodeShardState = nodesToAllocate.yesNodeShards.get(0); logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode()); - changed = true; - unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation.changes()); } else if (nodesToAllocate.throttleNodeShards.isEmpty() == true && nodesToAllocate.noNodeShards.isEmpty() == false) { // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard // can be force-allocated to one of the nodes. @@ -184,24 +181,22 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { NodeGatewayStartedShards nodeShardState = nodesToForceAllocate.yesNodeShards.get(0); logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode()); - changed = true; unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(), - ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation.changes()); } else if (nodesToForceAllocate.throttleNodeShards.isEmpty() == false) { logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation", shard.index(), shard.id(), shard, nodesToForceAllocate.throttleNodeShards); - changed |= unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED); + unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED, allocation.changes()); } else { logger.debug("[{}][{}]: forced primary allocation denied [{}]", shard.index(), shard.id(), shard); - changed |= unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_NO); + unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_NO, allocation.changes()); } } else { // we are throttling this, but we have enough to allocate to this node, ignore it for now logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodeShards); - changed |= unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED); + unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED, allocation.changes()); } } - return changed; } /** diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 1cb555347cc..75a8a43fabd 100644 --- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -31,11 +31,10 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; -import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.RoutingChangesObserver; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -60,8 +59,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { * match. Today, a better match is one that has full sync id match compared to not having one in * the previous recovery. */ - public boolean processExistingRecoveries(RoutingAllocation allocation) { - boolean changed = false; + public void processExistingRecoveries(RoutingAllocation allocation) { MetaData metaData = allocation.metaData(); RoutingNodes routingNodes = allocation.routingNodes(); List shardCancellationActions = new ArrayList<>(); @@ -121,8 +119,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT); // don't cancel shard in the loop as it will cause a ConcurrentModificationException - shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, indexMetaData)); - changed = true; + shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, indexMetaData, allocation.changes())); } } } @@ -130,11 +127,9 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { for (Runnable action : shardCancellationActions) { action.run(); } - return changed; } - public boolean allocateUnassigned(RoutingAllocation allocation) { - boolean changed = false; + public void allocateUnassigned(RoutingAllocation allocation) { final RoutingNodes routingNodes = allocation.routingNodes(); final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); MetaData metaData = allocation.metaData(); @@ -154,7 +149,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { Decision decision = canBeAllocatedToAtLeastOneNode(shard, allocation); if (decision.type() != Decision.Type.YES) { logger.trace("{}: ignoring allocation, can't be allocated on any node", shard); - changed |= unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.fromDecision(decision)); + unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.fromDecision(decision), allocation.changes()); continue; } @@ -162,7 +157,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { if (shardStores.hasData() == false) { logger.trace("{}: ignoring allocation, still fetching shard stores", shard); allocation.setHasPendingAsyncFetch(); - changed |= unassignedIterator.removeAndIgnore(AllocationStatus.FETCHING_SHARD_DATA); + unassignedIterator.removeAndIgnore(AllocationStatus.FETCHING_SHARD_DATA, allocation.changes()); continue; // still fetching } @@ -187,19 +182,17 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { if (decision.type() == Decision.Type.THROTTLE) { logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node()); // we are throttling this, but we have enough to allocate to this node, ignore it for now - changed |= unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.fromDecision(decision)); + unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.fromDecision(decision), allocation.changes()); } else { logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node()); // we found a match - changed = true; - unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), null, allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), null, allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes()); } } else if (matchingNodes.hasAnyData() == false) { // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed - changed |= ignoreUnassignedIfDelayed(unassignedIterator, shard); + ignoreUnassignedIfDelayed(unassignedIterator, shard, allocation.changes()); } } - return changed; } /** @@ -212,14 +205,12 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { * * @param unassignedIterator iterator over unassigned shards * @param shard the shard which might be delayed - * @return true iff there was a change to the unassigned info */ - public boolean ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) { + public void ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard, RoutingChangesObserver changes) { if (shard.unassignedInfo().isDelayed()) { logger.debug("{}: allocation of [{}] is delayed", shard.shardId(), shard); - return unassignedIterator.removeAndIgnore(AllocationStatus.DELAYED_ALLOCATION); + unassignedIterator.removeAndIgnore(AllocationStatus.DELAYED_ALLOCATION, changes); } - return false; } /** diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 514968d1643..98404a22754 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -51,8 +51,8 @@ public class ClusterModuleTests extends ModuleTestCase { static class FakeShardsAllocator implements ShardsAllocator { @Override - public boolean allocate(RoutingAllocation allocation) { - return false; + public void allocate(RoutingAllocation allocation) { + // noop } @Override diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index 46376ac3afa..12b37a32156 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -326,52 +326,50 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { --------[test][3], node[3], [P], s[STARTED] ---- unassigned */ - public boolean allocate(RoutingAllocation allocation) { + public void allocate(RoutingAllocation allocation) { RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); - boolean changed = !unassigned.isEmpty(); ShardRouting[] drain = unassigned.drain(); ArrayUtil.timSort(drain, (a, b) -> { return a.primary() ? -1 : 1; }); // we have to allocate primaries first for (ShardRouting sr : drain) { switch (sr.id()) { case 0: if (sr.primary()) { - allocation.routingNodes().initializeShard(sr, "node1", null, -1); + allocation.routingNodes().initializeShard(sr, "node1", null, -1, allocation.changes()); } else { - allocation.routingNodes().initializeShard(sr, "node0", null, -1); + allocation.routingNodes().initializeShard(sr, "node0", null, -1, allocation.changes()); } break; case 1: if (sr.primary()) { - allocation.routingNodes().initializeShard(sr, "node1", null, -1); + allocation.routingNodes().initializeShard(sr, "node1", null, -1, allocation.changes()); } else { - allocation.routingNodes().initializeShard(sr, "node2", null, -1); + allocation.routingNodes().initializeShard(sr, "node2", null, -1, allocation.changes()); } break; case 2: if (sr.primary()) { - allocation.routingNodes().initializeShard(sr, "node3", null, -1); + allocation.routingNodes().initializeShard(sr, "node3", null, -1, allocation.changes()); } else { - allocation.routingNodes().initializeShard(sr, "node2", null, -1); + allocation.routingNodes().initializeShard(sr, "node2", null, -1, allocation.changes()); } break; case 3: if (sr.primary()) { - allocation.routingNodes().initializeShard(sr, "node3", null, -1); + allocation.routingNodes().initializeShard(sr, "node3", null, -1, allocation.changes()); } else { - allocation.routingNodes().initializeShard(sr, "node1", null, -1); + allocation.routingNodes().initializeShard(sr, "node1", null, -1, allocation.changes()); } break; case 4: if (sr.primary()) { - allocation.routingNodes().initializeShard(sr, "node2", null, -1); + allocation.routingNodes().initializeShard(sr, "node2", null, -1, allocation.changes()); } else { - allocation.routingNodes().initializeShard(sr, "node0", null, -1); + allocation.routingNodes().initializeShard(sr, "node0", null, -1, allocation.changes()); } break; } } - return changed; } }, EmptyClusterInfoService.INSTANCE); MetaData.Builder metaDataBuilder = MetaData.builder(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java index e52738fcc4d..a4227ea7e4b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java @@ -636,19 +636,19 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase { AllocationService strategy = createAllocationService(Settings.EMPTY, new NoopGatewayAllocator() { @Override - public boolean allocateUnassigned(RoutingAllocation allocation) { + public void allocateUnassigned(RoutingAllocation allocation) { if (allocateTest1.get() == false) { RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassigned.iterator(); while (iterator.hasNext()) { ShardRouting next = iterator.next(); if ("test1".equals(next.index().getName())) { - iterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT); + iterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes()); } } } - return super.allocateUnassigned(allocation); + super.allocateUnassigned(allocation); } }); @@ -742,11 +742,11 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase { AllocationService strategy = createAllocationService(Settings.builder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new NoopGatewayAllocator() { @Override - public boolean allocateUnassigned(RoutingAllocation allocation) { + public void allocateUnassigned(RoutingAllocation allocation) { if (hasFetches.get()) { allocation.setHasPendingAsyncFetch(); } - return super.allocateUnassigned(allocation); + super.allocateUnassigned(allocation); } }); diff --git a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index 3899158c68e..aeb4ff7b697 100644 --- a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -82,8 +82,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { } else { allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), true, Version.V_2_1_0); } - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(false)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().iterator().next().shardId(), equalTo(shardId)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); @@ -99,8 +99,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { } else { allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0); } - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); @@ -118,8 +118,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0); } testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); @@ -131,8 +131,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testNoMatchingAllocationIdFound() { RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2"); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "id1", randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); @@ -145,8 +145,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testNoActiveAllocationIds() { RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); testAllocator.addData(node1, 1, null, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId())); @@ -165,8 +165,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1); testAllocator.addData(node1, 3, null, randomBoolean(), new CorruptIndexException("test", "test")); } - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); @@ -185,8 +185,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0); testAllocator.addData(node1, 3, null, randomBoolean()); } - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId())); @@ -210,8 +210,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { new TestAllocateDecision(randomBoolean() ? Decision.YES : Decision.NO), getNoDeciderThatAllowsForceAllocate() }); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1"); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertTrue(changed); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertTrue(allocation.routingNodes().unassigned().ignored().isEmpty()); assertEquals(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), 1); assertEquals(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), node1.getId()); @@ -233,8 +233,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { getNoDeciderThatThrottlesForceAllocate() }); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1"); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertTrue(changed); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); List ignored = allocation.routingNodes().unassigned().ignored(); assertEquals(ignored.size(), 1); assertEquals(ignored.get(0).unassignedInfo().getLastAllocationStatus(), @@ -257,8 +257,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { new TestAllocateDecision(Decision.THROTTLE), getNoDeciderThatAllowsForceAllocate() }); RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1"); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertTrue(changed); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); List ignored = allocation.routingNodes().unassigned().ignored(); assertEquals(ignored.size(), 1); assertEquals(ignored.get(0).unassignedInfo().getLastAllocationStatus(), AllocationStatus.DECIDERS_THROTTLED); @@ -275,8 +275,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { boolean node1HasPrimaryShard = randomBoolean(); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard); testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); DiscoveryNode allocatedNode = node1HasPrimaryShard ? node1 : node2; @@ -297,8 +297,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0); testAllocator.addData(node1, 3, null, randomBoolean()); } - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); @@ -317,8 +317,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0); testAllocator.addData(node1, 3, null, randomBoolean()); } - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId())); @@ -331,8 +331,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testAllocateToTheHighestVersionOnLegacyIndex() { RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0); testAllocator.addData(node1, 10, null, randomBoolean()).addData(node2, 12, null, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId())); @@ -350,8 +350,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { testAllocator.addData(node1, 10, null, randomBoolean()); testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some allocId", randomBoolean()); testAllocator.addData(node3, 12, null, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId())); @@ -370,8 +370,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false; RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), clusterHasActiveAllocationIds); testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); @@ -388,8 +388,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false; RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders(), clusterHasActiveAllocationIds); testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } @@ -405,8 +405,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false; RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders(), clusterHasActiveAllocationIds); testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); @@ -419,8 +419,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testRestoreDoesNotAssignIfNoShardAvailable() { RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), randomBoolean()); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, false); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(false)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); @@ -454,8 +454,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1; RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), hasActiveAllocation); testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertClusterHealthStatus(allocation, ClusterHealthStatus.RED); @@ -471,8 +471,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1; RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(throttleAllocationDeciders(), hasActiveAllocation); testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } @@ -487,8 +487,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1; RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders(), hasActiveAllocation); testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertClusterHealthStatus(allocation, ClusterHealthStatus.RED); @@ -501,8 +501,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() { RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), randomBoolean()); testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean()); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(false)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); @@ -544,8 +544,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)).build(); RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas @@ -553,8 +553,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { testAllocator.addData(node1, 1, null, randomBoolean()); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false); - changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas @@ -562,8 +562,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { testAllocator.addData(node2, 1, null, randomBoolean()); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false); - changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); @@ -588,8 +588,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)).build(); RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false); - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas @@ -597,8 +597,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { testAllocator.addData(node1, 1, null, randomBoolean()); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false); - changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas @@ -606,8 +606,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { testAllocator.addData(node2, 2, null, randomBoolean()); allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false); - changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); diff --git a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index 549b4e786cb..2570df3a561 100644 --- a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -234,16 +234,16 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { // we sometime return empty list of files, make sure we test this as well testAllocator.addData(node2, null); } - boolean changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(false)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT); testAllocator.addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); - changed = testAllocator.allocateUnassigned(allocation); - assertThat(changed, equalTo(true)); + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId())); } @@ -253,8 +253,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { testAllocator.addData(node1, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) .addData(node2, "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) .addData(node3, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); - boolean changed = testAllocator.processExistingRecoveries(allocation); - assertThat(changed, equalTo(true)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId)); } @@ -264,8 +264,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { testAllocator.addData(node1, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) .addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) .addData(node3, randomBoolean() ? "MATCH" : "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); - boolean changed = testAllocator.processExistingRecoveries(allocation); - assertThat(changed, equalTo(false)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); } @@ -273,8 +273,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); testAllocator.addData(node1, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) .addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); - boolean changed = testAllocator.processExistingRecoveries(allocation); - assertThat(changed, equalTo(false)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java index 1aa0428454e..482d7c22c81 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java @@ -239,7 +239,7 @@ public abstract class ESAllocationTestCase extends ESTestCase { public void applyFailedShards(FailedRerouteAllocation allocation) {} @Override - public boolean allocateUnassigned(RoutingAllocation allocation) { + public void allocateUnassigned(RoutingAllocation allocation) { final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator(); while (unassignedIterator.hasNext()) { ShardRouting shard = unassignedIterator.next(); @@ -247,9 +247,8 @@ public abstract class ESAllocationTestCase extends ESTestCase { if (shard.primary() || shard.allocatedPostIndexCreate(indexMetaData) == false) { continue; } - replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard); + replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard, allocation.changes()); } - return false; } } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/gateway/NoopGatewayAllocator.java b/test/framework/src/main/java/org/elasticsearch/test/gateway/NoopGatewayAllocator.java index 825b203022d..e321a98f371 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/gateway/NoopGatewayAllocator.java +++ b/test/framework/src/main/java/org/elasticsearch/test/gateway/NoopGatewayAllocator.java @@ -47,7 +47,7 @@ public class NoopGatewayAllocator extends GatewayAllocator { } @Override - public boolean allocateUnassigned(RoutingAllocation allocation) { - return false; + public void allocateUnassigned(RoutingAllocation allocation) { + // noop } }