Add routing changes API to RoutingAllocation (#19992)

Adds a class that records changes made to RoutingAllocation, so that at the end of the allocation round other values can be more easily derived based on these changes. Most notably, it:

- replaces the explicit boolean flag that is passed around everywhere to denote changes to the routing table. The boolean flag is automatically updated now when changes actually occur, preventing issues where it got out of sync with actual changes to the routing table.
- records actual changes made to RoutingNodes so that primary term and in-sync allocation ids, which are part of index metadata, can be efficiently updated just by looking at the shards that were actually changed.
This commit is contained in:
Yannick Welsch 2016-08-17 10:46:59 +02:00 committed by GitHub
parent d894db1590
commit 27a760f9c1
23 changed files with 738 additions and 346 deletions

View File

@ -60,8 +60,8 @@ public final class Allocators {
}
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
return false;
public void allocateUnassigned(RoutingAllocation allocation) {
// noop
}
}

View File

@ -23,7 +23,6 @@ import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.Diff;
@ -32,8 +31,7 @@ import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.IndexMetaDataUpdater;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
@ -342,8 +340,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
* a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary.
*
* Note: since we increment the term every time a shard is assigned, the term for any operational shard (i.e., a shard
* that can be indexed into) is larger than 0.
* See {@link AllocationService#updateMetaDataWithRoutingTable(MetaData, RoutingTable, RoutingTable)}.
* that can be indexed into) is larger than 0. See {@link IndexMetaDataUpdater#applyChanges(MetaData)}.
**/
public long primaryTerm(int shardId) {
return this.primaryTerms[shardId];

View File

@ -0,0 +1,196 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
/**
* Records changes made to {@link RoutingNodes} during an allocation round.
*/
public interface RoutingChangesObserver {
/**
* Called when unassigned shard is initialized. Does not include initializing relocation target shards.
*/
void shardInitialized(ShardRouting unassignedShard);
/**
* Called when an initializing shard is started.
*/
void shardStarted(ShardRouting initializingShard, ShardRouting startedShard);
/**
* Called when relocation of a started shard is initiated.
*/
void relocationStarted(ShardRouting startedShard, ShardRouting targetRelocatingShard);
/**
* Called when an unassigned shard's unassigned information was updated
*/
void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo);
/**
* Called when a shard is failed or cancelled.
*/
void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo);
/**
* Called on relocation source when relocation completes after relocation target is started.
*/
void relocationCompleted(ShardRouting removedRelocationSource);
/**
* Called on replica relocation target when replica relocation source fails. Promotes the replica relocation target to ordinary
* initializing shard.
*/
void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource);
/**
* Called on started primary shard after it has been promoted from replica to primary and is reinitialized due to shadow replicas.
*/
void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard);
/**
* Called when started replica is promoted to primary.
*/
void replicaPromoted(ShardRouting replicaShard);
/**
* Abstract implementation of {@link RoutingChangesObserver} that does not take any action. Useful for subclasses that only override
* certain methods.
*/
class AbstractRoutingChangesObserver implements RoutingChangesObserver {
@Override
public void shardInitialized(ShardRouting unassignedShard) {
}
@Override
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
}
@Override
public void relocationStarted(ShardRouting startedShard, ShardRouting targetRelocatingShard) {
}
@Override
public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) {
}
@Override
public void shardFailed(ShardRouting activeShard, UnassignedInfo unassignedInfo) {
}
@Override
public void relocationCompleted(ShardRouting removedRelocationSource) {
}
@Override
public void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource) {
}
@Override
public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) {
}
@Override
public void replicaPromoted(ShardRouting replicaShard) {
}
}
class DelegatingRoutingChangesObserver implements RoutingChangesObserver {
private final RoutingChangesObserver[] routingChangesObservers;
public DelegatingRoutingChangesObserver(RoutingChangesObserver... routingChangesObservers) {
this.routingChangesObservers = routingChangesObservers;
}
@Override
public void shardInitialized(ShardRouting unassignedShard) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
routingChangesObserver.shardInitialized(unassignedShard);
}
}
@Override
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
routingChangesObserver.shardStarted(initializingShard, startedShard);
}
}
@Override
public void relocationStarted(ShardRouting startedShard, ShardRouting targetRelocatingShard) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
routingChangesObserver.relocationStarted(startedShard, targetRelocatingShard);
}
}
@Override
public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
routingChangesObserver.unassignedInfoUpdated(unassignedShard, newUnassignedInfo);
}
}
@Override
public void shardFailed(ShardRouting activeShard, UnassignedInfo unassignedInfo) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
routingChangesObserver.shardFailed(activeShard, unassignedInfo);
}
}
@Override
public void relocationCompleted(ShardRouting removedRelocationSource) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
routingChangesObserver.relocationCompleted(removedRelocationSource);
}
}
@Override
public void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
routingChangesObserver.relocationSourceRemoved(removedReplicaRelocationSource);
}
}
@Override
public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
routingChangesObserver.startedPrimaryReinitialized(startedPrimaryShard, initializedShard);
}
}
@Override
public void replicaPromoted(ShardRouting replicaShard) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
routingChangesObserver.replicaPromoted(replicaShard);
}
}
}
}

View File

@ -409,7 +409,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
* @return the initialized shard
*/
public ShardRouting initializeShard(ShardRouting unassignedShard, String nodeId, @Nullable String existingAllocationId,
long expectedSize) {
long expectedSize, RoutingChangesObserver routingChangesObserver) {
ensureMutable();
assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
@ -420,6 +420,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
addRecovery(initializedShard);
assignedShardsAdd(initializedShard);
routingChangesObserver.shardInitialized(unassignedShard);
return initializedShard;
}
@ -429,7 +430,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
*
* @return pair of source relocating and target initializing shards.
*/
public Tuple<ShardRouting,ShardRouting> relocateShard(ShardRouting startedShard, String nodeId, long expectedShardSize) {
public Tuple<ShardRouting,ShardRouting> relocateShard(ShardRouting startedShard, String nodeId, long expectedShardSize,
RoutingChangesObserver changes) {
ensureMutable();
relocatingShards++;
ShardRouting source = startedShard.relocate(nodeId, expectedShardSize);
@ -438,6 +440,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
node(target.currentNodeId()).add(target);
assignedShardsAdd(target);
addRecovery(target);
changes.relocationStarted(startedShard, target);
return Tuple.tuple(source, target);
}
@ -448,10 +451,11 @@ public class RoutingNodes implements Iterable<RoutingNode> {
*
* @return the started shard
*/
public ShardRouting startShard(ESLogger logger, ShardRouting initializingShard) {
public ShardRouting startShard(ESLogger logger, ShardRouting initializingShard, RoutingChangesObserver routingChangesObserver) {
ensureMutable();
ShardRouting startedShard = started(initializingShard);
logger.trace("{} marked shard as started (routing: {})", initializingShard.shardId(), initializingShard);
routingChangesObserver.shardStarted(initializingShard, startedShard);
if (initializingShard.relocatingNodeId() != null) {
// relocation target has been started, remove relocation source
@ -461,6 +465,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
assert relocationSourceShard.getTargetRelocatingShard() == initializingShard : "relocation target mismatch, expected: "
+ initializingShard + " but was: " + relocationSourceShard.getTargetRelocatingShard();
remove(relocationSourceShard);
routingChangesObserver.relocationCompleted(relocationSourceShard);
}
return startedShard;
}
@ -478,7 +483,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
* - If shard is a (primary or replica) relocation target, this also clears the relocation information on the source shard.
*
*/
public void failShard(ESLogger logger, ShardRouting failedShard, UnassignedInfo unassignedInfo, IndexMetaData indexMetaData) {
public void failShard(ESLogger logger, ShardRouting failedShard, UnassignedInfo unassignedInfo, IndexMetaData indexMetaData,
RoutingChangesObserver routingChangesObserver) {
ensureMutable();
assert failedShard.assignedToNode() : "only assigned shards can be failed";
assert indexMetaData.getIndex().equals(failedShard.index()) :
@ -502,7 +508,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
"primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT);
failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData);
failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver);
}
}
}
@ -516,11 +522,13 @@ public class RoutingNodes implements Iterable<RoutingNode> {
logger.trace("{} is removed due to the failure/cancellation of the source shard", targetShard);
// cancel and remove target shard
remove(targetShard);
routingChangesObserver.shardFailed(targetShard, unassignedInfo);
} else {
logger.trace("{}, relocation source failed / cancelled, mark as initializing without relocation source", targetShard);
// promote to initializing shard without relocation source and ensure that removed relocation source
// is not added back as unassigned shard
removeRelocationSource(targetShard);
routingChangesObserver.relocationSourceRemoved(targetShard);
}
}
@ -542,6 +550,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
cancelRelocation(sourceShard);
remove(failedShard);
}
routingChangesObserver.shardFailed(failedShard, unassignedInfo);
} else {
assert failedShard.active();
if (failedShard.primary()) {
@ -555,8 +564,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
assert activeReplica.started() : "replica relocation should have been cancelled: " + activeReplica;
movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
ShardRouting primarySwappedCandidate = promoteActiveReplicaShardToPrimary(activeReplica);
routingChangesObserver.replicaPromoted(activeReplica);
if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.getSettings())) {
reinitShadowPrimary(primarySwappedCandidate);
ShardRouting initializedShard = reinitShadowPrimary(primarySwappedCandidate);
routingChangesObserver.startedPrimaryReinitialized(primarySwappedCandidate, initializedShard);
}
}
} else {
@ -567,6 +578,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
moveToUnassigned(failedShard, unassignedInfo);
}
}
routingChangesObserver.shardFailed(failedShard, unassignedInfo);
}
assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard +
" was matched but wasn't removed";
@ -806,13 +818,11 @@ public class RoutingNodes implements Iterable<RoutingNode> {
* Should be used with caution, typically,
* the correct usage is to removeAndIgnore from the iterator.
* @see #ignored()
* @see UnassignedIterator#removeAndIgnore(AllocationStatus)
* @see UnassignedIterator#removeAndIgnore(AllocationStatus, RoutingChangesObserver)
* @see #isIgnoredEmpty()
* @return true iff the decision caused a change to the unassigned info
*/
public boolean ignoreShard(ShardRouting shard, AllocationStatus allocationStatus) {
public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, RoutingChangesObserver changes) {
nodes.ensureMutable();
boolean changed = false;
if (shard.primary()) {
ignoredPrimaries++;
UnassignedInfo currInfo = shard.unassignedInfo();
@ -822,12 +832,12 @@ public class RoutingNodes implements Iterable<RoutingNode> {
currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(),
currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(),
allocationStatus);
shard = shard.updateUnassignedInfo(newInfo);
changed = true;
ShardRouting updatedShard = shard.updateUnassignedInfo(newInfo);
changes.unassignedInfoUpdated(shard, newInfo);
shard = updatedShard;
}
}
ignored.add(shard);
return changed;
}
public class UnassignedIterator implements Iterator<ShardRouting> {
@ -854,10 +864,11 @@ public class RoutingNodes implements Iterable<RoutingNode> {
*
* @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated.
*/
public ShardRouting initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize) {
public ShardRouting initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize,
RoutingChangesObserver routingChangesObserver) {
nodes.ensureMutable();
innerRemove();
return nodes.initializeShard(current, nodeId, existingAllocationId, expectedShardSize);
return nodes.initializeShard(current, nodeId, existingAllocationId, expectedShardSize, routingChangesObserver);
}
/**
@ -867,12 +878,11 @@ public class RoutingNodes implements Iterable<RoutingNode> {
* that subsequent consumers of this API won't try to allocate this shard again.
*
* @param attempt the result of the allocation attempt
* @return true iff the decision caused an update to the unassigned info
*/
public boolean removeAndIgnore(AllocationStatus attempt) {
public void removeAndIgnore(AllocationStatus attempt, RoutingChangesObserver changes) {
nodes.ensureMutable();
innerRemove();
return ignoreShard(current, attempt);
ignoreShard(current, attempt, changes);
}
private void updateShardRouting(ShardRouting shardRouting) {
@ -886,16 +896,17 @@ public class RoutingNodes implements Iterable<RoutingNode> {
* @param unassignedInfo the new unassigned info to use
* @return the shard with unassigned info updated
*/
public ShardRouting updateUnassignedInfo(UnassignedInfo unassignedInfo) {
public ShardRouting updateUnassignedInfo(UnassignedInfo unassignedInfo, RoutingChangesObserver changes) {
nodes.ensureMutable();
ShardRouting updatedShardRouting = current.updateUnassignedInfo(unassignedInfo);
changes.unassignedInfoUpdated(current, unassignedInfo);
updateShardRouting(updatedShardRouting);
return updatedShardRouting;
}
/**
* Unsupported operation, just there for the interface. Use {@link #removeAndIgnore(AllocationStatus)} or
* {@link #initialize(String, String, long)}.
* Unsupported operation, just there for the interface. Use {@link #removeAndIgnore(AllocationStatus, RoutingChangesObserver)} or
* {@link #initialize(String, String, long, RoutingChangesObserver)}.
*/
@Override
public void remove() {
@ -919,8 +930,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/**
* Returns <code>true</code> iff any unassigned shards are marked as temporarily ignored.
* @see UnassignedShards#ignoreShard(ShardRouting, AllocationStatus)
* @see UnassignedIterator#removeAndIgnore(AllocationStatus)
* @see UnassignedShards#ignoreShard(ShardRouting, AllocationStatus, RoutingChangesObserver)
* @see UnassignedIterator#removeAndIgnore(AllocationStatus, RoutingChangesObserver)
*/
public boolean isIgnoredEmpty() {
return ignored.isEmpty();

View File

@ -26,9 +26,6 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
@ -43,13 +40,10 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.shard.ShardId;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -93,7 +87,7 @@ public class AllocationService extends AbstractComponent {
public Result applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards, boolean withReroute) {
if (startedShards.isEmpty()) {
return new Result(false, clusterState.routingTable(), clusterState.metaData());
return Result.unchanged(clusterState);
}
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
@ -111,103 +105,22 @@ public class AllocationService extends AbstractComponent {
protected Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason) {
return buildResultAndLogHealthChange(allocation, reason, new RoutingExplanations());
}
protected Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason, RoutingExplanations explanations) {
MetaData oldMetaData = allocation.metaData();
RoutingTable oldRoutingTable = allocation.routingTable();
RoutingNodes newRoutingNodes = allocation.routingNodes();
final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(oldRoutingTable.version(), newRoutingNodes).build();
MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable);
MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges();
assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata
logClusterHealthStateChange(
new ClusterStateHealth(ClusterState.builder(clusterName).
metaData(allocation.metaData()).routingTable(allocation.routingTable()).build()),
metaData(allocation.metaData()).routingTable(oldRoutingTable).build()),
new ClusterStateHealth(ClusterState.builder(clusterName).
metaData(newMetaData).routingTable(newRoutingTable).build()),
reason
);
return new Result(true, newRoutingTable, newMetaData, explanations);
}
/**
* Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. Specifically
* we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on
* the changes made during this allocation.
*
* @param oldMetaData {@link MetaData} object from before the routing table was changed.
* @param oldRoutingTable {@link RoutingTable} from before the change.
* @param newRoutingTable new {@link RoutingTable} created by the allocation change
* @return adapted {@link MetaData}, potentially the original one if no change was needed.
*/
static MetaData updateMetaDataWithRoutingTable(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingTable newRoutingTable) {
MetaData.Builder metaDataBuilder = null;
for (IndexRoutingTable newIndexTable : newRoutingTable) {
final IndexMetaData oldIndexMetaData = oldMetaData.index(newIndexTable.getIndex());
if (oldIndexMetaData == null) {
throw new IllegalStateException("no metadata found for index " + newIndexTable.getIndex().getName());
}
IndexMetaData.Builder indexMetaDataBuilder = null;
for (IndexShardRoutingTable newShardTable : newIndexTable) {
final ShardId shardId = newShardTable.shardId();
// update activeAllocationIds
Set<String> activeAllocationIds = newShardTable.activeShards().stream()
.map(ShardRouting::allocationId)
.filter(Objects::nonNull)
.map(AllocationId::getId)
.collect(Collectors.toSet());
// only update active allocation ids if there is an active shard
if (activeAllocationIds.isEmpty() == false) {
// get currently stored allocation ids
Set<String> storedAllocationIds = oldIndexMetaData.activeAllocationIds(shardId.id());
if (activeAllocationIds.equals(storedAllocationIds) == false) {
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds);
}
}
// update primary terms
final ShardRouting newPrimary = newShardTable.primaryShard();
if (newPrimary == null) {
throw new IllegalStateException("missing primary shard for " + newShardTable.shardId());
}
final ShardRouting oldPrimary = oldRoutingTable.shardRoutingTable(shardId).primaryShard();
if (oldPrimary == null) {
throw new IllegalStateException("missing primary shard for " + newShardTable.shardId());
}
// we update the primary term on initial assignment or when a replica is promoted. Most notably we do *not*
// update them when a primary relocates
if (newPrimary.unassigned() ||
newPrimary.isSameAllocation(oldPrimary) ||
// we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to
// be initializing. However, when the target shard is activated, we still want the primary term to staty
// the same
(oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.getTargetRelocatingShard()))) {
// do nothing
} else {
// incrementing the primary term
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1);
}
}
if (indexMetaDataBuilder != null) {
if (metaDataBuilder == null) {
metaDataBuilder = MetaData.builder(oldMetaData);
}
metaDataBuilder.put(indexMetaDataBuilder);
}
}
if (metaDataBuilder != null) {
return metaDataBuilder.build();
} else {
return oldMetaData;
}
return Result.changed(newRoutingTable, newMetaData, explanations);
}
public Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
@ -223,7 +136,7 @@ public class AllocationService extends AbstractComponent {
*/
public Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
if (failedShards.isEmpty()) {
return new Result(false, clusterState.routingTable(), clusterState.metaData());
return Result.unchanged(clusterState);
}
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
@ -247,7 +160,7 @@ public class AllocationService extends AbstractComponent {
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShardEntry.message,
failedShardEntry.failure, failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData);
routingNodes.failShard(logger, failedShard, unassignedInfo, indexMetaData, allocation.changes());
} else {
logger.trace("{} shard routing failed in an earlier iteration (routing: {})", shardToFail.shardId(), shardToFail);
}
@ -271,25 +184,24 @@ public class AllocationService extends AbstractComponent {
clusterInfoService.getClusterInfo(), currentNanoTime(), false);
// first, clear from the shards any node id they used to belong to that is now dead
boolean changed = deassociateDeadNodes(allocation);
deassociateDeadNodes(allocation);
if (reroute) {
changed |= reroute(allocation);
reroute(allocation);
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
if (allocation.routingNodesChanged() == false) {
return Result.unchanged(clusterState);
}
return buildResultAndLogHealthChange(allocation, reason);
}
/**
* Removes delay markers from unassigned shards based on current time stamp. Returns true if markers were removed.
* Removes delay markers from unassigned shards based on current time stamp.
*/
private boolean removeDelayMarkers(RoutingAllocation allocation) {
private void removeDelayMarkers(RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
final MetaData metaData = allocation.metaData();
boolean changed = false;
while (unassignedIterator.hasNext()) {
ShardRouting shardRouting = unassignedIterator.next();
UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
@ -297,14 +209,12 @@ public class AllocationService extends AbstractComponent {
final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay(allocation.getCurrentNanoTime(),
metaData.getIndexSafe(shardRouting.index()).getSettings());
if (newComputedLeftDelayNanos == 0) {
changed = true;
unassignedIterator.updateUnassignedInfo(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus()));
unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus()), allocation.changes());
}
}
}
return changed;
}
/**
@ -366,8 +276,9 @@ public class AllocationService extends AbstractComponent {
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
clusterInfoService.getClusterInfo(), currentNanoTime(), false);
allocation.debugDecision(debug);
if (!reroute(allocation)) {
return new Result(false, clusterState.routingTable(), clusterState.metaData());
reroute(allocation);
if (allocation.routingNodesChanged() == false) {
return Result.unchanged(clusterState);
}
return buildResultAndLogHealthChange(allocation, reason);
}
@ -380,43 +291,47 @@ public class AllocationService extends AbstractComponent {
}
}
private boolean reroute(RoutingAllocation allocation) {
assert deassociateDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";
boolean changed = false;
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().unassigned().size() > 0) {
changed |= removeDelayMarkers(allocation);
changed |= gatewayAllocator.allocateUnassigned(allocation);
private boolean hasDeadNodes(RoutingAllocation allocation) {
for (RoutingNode routingNode : allocation.routingNodes()) {
if (allocation.nodes().getDataNodes().containsKey(routingNode.nodeId()) == false) {
return true;
}
}
changed |= shardsAllocator.allocate(allocation);
assert RoutingNodes.assertShardStats(allocation.routingNodes());
return changed;
return false;
}
private boolean deassociateDeadNodes(RoutingAllocation allocation) {
boolean changed = false;
private void reroute(RoutingAllocation allocation) {
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().unassigned().size() > 0) {
removeDelayMarkers(allocation);
gatewayAllocator.allocateUnassigned(allocation);
}
shardsAllocator.allocate(allocation);
assert RoutingNodes.assertShardStats(allocation.routingNodes());
}
private void deassociateDeadNodes(RoutingAllocation allocation) {
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) {
RoutingNode node = it.next();
if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) {
// its a live node, continue
continue;
}
changed = true;
// now, go over all the shards routing on the node, and fail them
for (ShardRouting shardRouting : node.copyShards()) {
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData);
allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
// since it relies on the fact that the RoutingNode exists in the list of nodes
it.remove();
}
return changed;
}
private void applyStartedShards(RoutingAllocation routingAllocation, List<ShardRouting> startedShardEntries) {
@ -430,7 +345,7 @@ public class AllocationService extends AbstractComponent {
"shard routing to start does not exist in routing table, expected: " + startedShard + " but was: " +
routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId());
routingNodes.startShard(logger, startedShard);
routingNodes.startShard(logger, startedShard, routingAllocation.changes());
}
}

View File

@ -0,0 +1,178 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Observer that tracks changes made to RoutingNodes in order to update the primary terms and in-sync allocation ids in
* {@link IndexMetaData} once the allocation round has completed.
*
* Primary terms are updated on primary initialization or primary promotion.
*
* Allocation ids are added for shards that become active and removed for shards that stop being active.
*/
public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver {
private final Map<ShardId, Updates> shardChanges = new HashMap<>();
@Override
public void shardInitialized(ShardRouting unassignedShard) {
if (unassignedShard.primary()) {
increasePrimaryTerm(unassignedShard);
}
}
@Override
public void replicaPromoted(ShardRouting replicaShard) {
increasePrimaryTerm(replicaShard);
}
@Override
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
addAllocationId(startedShard);
}
@Override
public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
if (failedShard.active()) {
removeAllocationId(failedShard);
}
}
@Override
public void relocationCompleted(ShardRouting removedRelocationSource) {
removeAllocationId(removedRelocationSource);
}
@Override
public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) {
removeAllocationId(startedPrimaryShard);
}
/**
* Updates the current {@link MetaData} based on the changes of this RoutingChangesObserver. Specifically
* we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on
* the changes made during this allocation.
*
* @param oldMetaData {@link MetaData} object from before the routing nodes was changed.
* @return adapted {@link MetaData}, potentially the original one if no change was needed.
*/
public MetaData applyChanges(MetaData oldMetaData) {
Map<Index, List<Map.Entry<ShardId, Updates>>> changesGroupedByIndex =
shardChanges.entrySet().stream().collect(Collectors.groupingBy(e -> e.getKey().getIndex()));
MetaData.Builder metaDataBuilder = null;
for (Map.Entry<Index, List<Map.Entry<ShardId, Updates>>> indexChanges : changesGroupedByIndex.entrySet()) {
Index index = indexChanges.getKey();
final IndexMetaData oldIndexMetaData = oldMetaData.index(index);
if (oldIndexMetaData == null) {
throw new IllegalStateException("no metadata found for index " + index);
}
IndexMetaData.Builder indexMetaDataBuilder = null;
for (Map.Entry<ShardId, Updates> shardEntry : indexChanges.getValue()) {
ShardId shardId = shardEntry.getKey();
Updates updates = shardEntry.getValue();
assert Sets.haveEmptyIntersection(updates.addedAllocationIds, updates.removedAllocationIds) :
"Allocation ids cannot be both added and removed in the same allocation round, added ids: " +
updates.addedAllocationIds + ", removed ids: " + updates.removedAllocationIds;
Set<String> activeAllocationIds = new HashSet<>(oldIndexMetaData.activeAllocationIds(shardId.id()));
activeAllocationIds.addAll(updates.addedAllocationIds);
activeAllocationIds.removeAll(updates.removedAllocationIds);
// only update active allocation ids if there is an active shard
if (activeAllocationIds.isEmpty() == false) {
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds);
}
if (updates.increaseTerm) {
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1);
}
}
if (indexMetaDataBuilder != null) {
if (metaDataBuilder == null) {
metaDataBuilder = MetaData.builder(oldMetaData);
}
metaDataBuilder.put(indexMetaDataBuilder);
}
}
if (metaDataBuilder != null) {
return metaDataBuilder.build();
} else {
return oldMetaData;
}
}
/**
* Helper method that creates update entry for the given shard id if such an entry does not exist yet.
*/
private Updates changes(ShardId shardId) {
return shardChanges.computeIfAbsent(shardId, k -> new Updates());
}
/**
* Remove allocation id of this shard from the set of in-sync shard copies
*/
private void removeAllocationId(ShardRouting shardRouting) {
changes(shardRouting.shardId()).removedAllocationIds.add(shardRouting.allocationId().getId());
}
/**
* Add allocation id of this shard to the set of in-sync shard copies
*/
private void addAllocationId(ShardRouting shardRouting) {
changes(shardRouting.shardId()).addedAllocationIds.add(shardRouting.allocationId().getId());
}
/**
* Increase primary term for this shard id
*/
private void increasePrimaryTerm(ShardRouting shardRouting) {
changes(shardRouting.shardId()).increaseTerm = true;
}
private static class Updates {
private boolean increaseTerm; // whether primary term should be increased
private Set<String> addedAllocationIds = new HashSet<>(); // allocation ids that should be added to the in-sync set
private Set<String> removedAllocationIds = new HashSet<>(); // allocation ids that should be removed from the in-sync set
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@ -59,13 +60,21 @@ public class RoutingAllocation {
private final RoutingExplanations explanations;
/**
* Creates a new {@link RoutingAllocation.Result}
* @param changed a flag to determine whether the actual {@link RoutingTable} has been changed
* Creates a new {@link RoutingAllocation.Result} where no change to the routing table was made.
* @param clusterState the unchanged {@link ClusterState}
*/
public static Result unchanged(ClusterState clusterState) {
return new Result(false, clusterState.routingTable(), clusterState.metaData(), new RoutingExplanations());
}
/**
* Creates a new {@link RoutingAllocation.Result} where changes were made to the routing table.
* @param routingTable the {@link RoutingTable} this Result references
* @param metaData the {@link MetaData} this Result references
* @param explanations Explanation for the reroute actions
*/
public Result(boolean changed, RoutingTable routingTable, MetaData metaData) {
this(changed, routingTable, metaData, new RoutingExplanations());
public static Result changed(RoutingTable routingTable, MetaData metaData, RoutingExplanations explanations) {
return new Result(true, routingTable, metaData, explanations);
}
/**
@ -75,7 +84,7 @@ public class RoutingAllocation {
* @param metaData the {@link MetaData} this Result references
* @param explanations Explanation for the reroute actions
*/
public Result(boolean changed, RoutingTable routingTable, MetaData metaData, RoutingExplanations explanations) {
private Result(boolean changed, RoutingTable routingTable, MetaData metaData, RoutingExplanations explanations) {
this.changed = changed;
this.routingTable = routingTable;
this.metaData = metaData;
@ -142,6 +151,12 @@ public class RoutingAllocation {
private final long currentNanoTime;
private final IndexMetaDataUpdater indexMetaDataUpdater = new IndexMetaDataUpdater();
private final RoutingNodesChangedObserver nodesChangedObserver = new RoutingNodesChangedObserver();
private final RoutingChangesObserver routingChangesObserver = new RoutingChangesObserver.DelegatingRoutingChangesObserver(
nodesChangedObserver, indexMetaDataUpdater
);
/**
* Creates a new {@link RoutingAllocation}
@ -282,6 +297,27 @@ public class RoutingAllocation {
return unmodifiableSet(new HashSet<>(ignore));
}
/**
* Returns observer to use for changes made to the routing nodes
*/
public RoutingChangesObserver changes() {
return routingChangesObserver;
}
/**
* Returns updated {@link MetaData} based on the changes that were made to the routing nodes
*/
public MetaData updateMetaDataWithRoutingChanges() {
return indexMetaDataUpdater.applyChanges(metaData);
}
/**
* Returns true iff changes were made to the routing nodes
*/
public boolean routingNodesChanged() {
return nodesChangedObserver.isChanged();
}
/**
* Create a routing decision, including the reason if the debug flag is
* turned on

View File

@ -0,0 +1,104 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
/**
* Records if changes were made to {@link RoutingNodes} during an allocation round.
*/
public class RoutingNodesChangedObserver implements RoutingChangesObserver {
private boolean changed;
/**
* Returns whether changes were made
*/
public boolean isChanged() {
return changed;
}
@Override
public void shardInitialized(ShardRouting unassignedShard) {
assert unassignedShard.unassigned() : "expected unassigned shard " + unassignedShard;
setChanged();
}
@Override
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
assert initializingShard.initializing() : "expected initializing shard " + initializingShard;
assert startedShard.started() : "expected started shard " + startedShard;
setChanged();
}
@Override
public void relocationStarted(ShardRouting startedShard, ShardRouting targetRelocatingShard) {
assert startedShard.started() : "expected started shard " + startedShard;
assert targetRelocatingShard.isRelocationTarget() : "expected relocation target shard " + targetRelocatingShard;
setChanged();
}
@Override
public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) {
assert unassignedShard.unassigned() : "expected unassigned shard " + unassignedShard;
setChanged();
}
@Override
public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
assert failedShard.assignedToNode() : "expected assigned shard " + failedShard;
setChanged();
}
@Override
public void relocationCompleted(ShardRouting removedRelocationSource) {
assert removedRelocationSource.relocating() : "expected relocating shard " + removedRelocationSource;
setChanged();
}
@Override
public void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource) {
assert removedReplicaRelocationSource.primary() == false && removedReplicaRelocationSource.isRelocationTarget() :
"expected replica relocation target shard " + removedReplicaRelocationSource;
setChanged();
}
@Override
public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) {
assert startedPrimaryShard.primary() && startedPrimaryShard.started() : "expected started primary shard " + startedPrimaryShard;
assert initializedShard.primary() && initializedShard.initializing(): "expected initializing primary shard " + initializedShard;
setChanged();
}
@Override
public void replicaPromoted(ShardRouting replicaShard) {
assert replicaShard.started() && replicaShard.primary() == false : "expected started replica shard " + replicaShard;
setChanged();
}
/**
* Marks the allocation as changed.
*/
private void setChanged() {
changed = true;
}
}

View File

@ -111,16 +111,15 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
@Override
public boolean allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
/* with no nodes this is pointless */
return false;
return;
}
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
boolean changed = balancer.allocateUnassigned();
changed |= balancer.moveShards();
changed |= balancer.balance();
return changed;
balancer.allocateUnassigned();
balancer.moveShards();
balancer.balance();
}
/**
@ -277,11 +276,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
/**
* Balances the nodes on the cluster model according to the weight function.
* The actual balancing is delegated to {@link #balanceByWeights()}
*
* @return <code>true</code> if the current configuration has been
* changed, otherwise <code>false</code>
*/
private boolean balance() {
private void balance() {
if (logger.isTraceEnabled()) {
logger.trace("Start balancing cluster");
}
@ -294,17 +290,17 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* Therefore we only do a rebalance if we have fetched all information.
*/
logger.debug("skipping rebalance due to in-flight shard/store fetches");
return false;
return;
}
if (allocation.deciders().canRebalance(allocation).type() != Type.YES) {
logger.trace("skipping rebalance as it is disabled");
return false;
return;
}
if (nodes.size() < 2) { /* skip if we only have one node */
logger.trace("skipping rebalance as single node only");
return false;
return;
}
return balanceByWeights();
balanceByWeights();
}
public Map<DiscoveryNode, Float> weighShard(ShardRouting shard) {
@ -340,12 +336,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* the threshold is set to <tt>1.0</tt> to enforce gaining relocation
* only, or in other words relocations that move the weight delta closer
* to <tt>0.0</tt>
*
* @return <code>true</code> if the current configuration has been
* changed, otherwise <code>false</code>
*/
private boolean balanceByWeights() {
boolean changed = false;
private void balanceByWeights() {
final NodeSorter sorter = newNodeSorter();
final AllocationDeciders deciders = allocation.deciders();
final ModelNode[] modelNodes = sorter.modelNodes;
@ -419,7 +411,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
sorter.sort(0, relevantNodes);
lowIdx = 0;
highIdx = relevantNodes - 1;
changed = true;
continue;
}
}
@ -441,7 +432,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
}
}
return changed;
}
/**
@ -506,14 +496,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* {@link ShardRoutingState#RELOCATING} and a shadow instance of this
* shard is created with an incremented version in the state
* {@link ShardRoutingState#INITIALIZING}.
*
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
public boolean moveShards() {
public void moveShards() {
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.
boolean changed = false;
final NodeSorter sorter = newNodeSorter();
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) {
ShardRouting shardRouting = it.next();
@ -524,20 +511,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
RoutingNode routingNode = sourceNode.getRoutingNode();
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
changed |= moveShard(sorter, shardRouting, sourceNode, routingNode);
moveShard(sorter, shardRouting, sourceNode, routingNode);
}
}
}
return changed;
}
/**
* Move started shard to the minimal eligible node with respect to the weight function
*
* @return <code>true</code> if the shard was moved successfully, otherwise <code>false</code>
*/
private boolean moveShard(NodeSorter sorter, ShardRouting shardRouting, ModelNode sourceNode, RoutingNode routingNode) {
private void moveShard(NodeSorter sorter, ShardRouting shardRouting, ModelNode sourceNode, RoutingNode routingNode) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
sorter.reset(shardRouting.getIndexName());
/*
@ -553,17 +536,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
if (allocationDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
sourceNode.removeShard(shardRouting);
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes());
currentNode.addShard(relocatingShards.v2());
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
}
return true;
return;
}
}
}
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
return false;
}
/**
@ -595,19 +577,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
/**
* Allocates all given shards on the minimal eligible node for the shards index
* with respect to the weight function. All given shards must be unassigned.
* @return <code>true</code> if the current configuration has been
* changed, otherwise <code>false</code>
*/
private boolean allocateUnassigned() {
private void allocateUnassigned() {
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
assert !nodes.isEmpty();
if (logger.isTraceEnabled()) {
logger.trace("Start allocating unassigned shards");
}
if (unassigned.isEmpty()) {
return false;
return;
}
boolean changed = false;
/*
* TODO: We could be smarter here and group the shards by index and then
@ -651,9 +630,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
final Decision decision = deciders.canAllocate(shard, allocation);
if (decision.type() == Type.NO) {
UnassignedInfo.AllocationStatus allocationStatus = UnassignedInfo.AllocationStatus.fromDecision(decision);
changed |= unassigned.ignoreShard(shard, allocationStatus);
unassigned.ignoreShard(shard, allocationStatus, allocation.changes());
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
changed |= unassigned.ignoreShard(primary[++i], allocationStatus);
unassigned.ignoreShard(primary[++i], allocationStatus, allocation.changes());
}
continue;
} else {
@ -728,9 +707,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize);
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
minNode.addShard(shard);
changed = true;
continue; // don't add to ignoreUnassigned
} else {
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
@ -754,10 +732,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
UnassignedInfo.AllocationStatus allocationStatus =
decision == null ? UnassignedInfo.AllocationStatus.DECIDERS_NO :
UnassignedInfo.AllocationStatus.fromDecision(decision);
changed |= unassigned.ignoreShard(shard, allocationStatus);
unassigned.ignoreShard(shard, allocationStatus, allocation.changes());
if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas
while(secondaryLength > 0 && comparator.compare(shard, secondary[secondaryLength-1]) == 0) {
changed |= unassigned.ignoreShard(secondary[--secondaryLength], allocationStatus);
unassigned.ignoreShard(secondary[--secondaryLength], allocationStatus, allocation.changes());
}
}
}
@ -768,7 +746,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
secondaryLength = 0;
} while (primaryLength > 0);
// clear everything we have either added it or moved to ignoreUnassigned
return changed;
}
/**
@ -820,7 +797,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
minNode.getNodeId());
}
/* now allocate on the cluster */
minNode.addShard(routingNodes.relocateShard(candidate, minNode.getNodeId(), shardSize).v1());
minNode.addShard(routingNodes.relocateShard(candidate, minNode.getNodeId(), shardSize, allocation.changes()).v1());
return true;
} else {
assert decision.type() == Type.THROTTLE;

View File

@ -40,9 +40,8 @@ public interface ShardsAllocator {
* - relocate shards to find a good shard balance in the cluster
*
* @param allocation current node allocation
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
boolean allocate(RoutingAllocation allocation);
void allocate(RoutingAllocation allocation);
/**
* Returns a map of node to a float "weight" of where the allocator would like to place the shard.

View File

@ -207,9 +207,9 @@ public abstract class AbstractAllocateAllocationCommand implements AllocationCom
continue;
}
if (unassignedInfo != null) {
unassigned = it.updateUnassignedInfo(unassignedInfo);
unassigned = it.updateUnassignedInfo(unassignedInfo, allocation.changes());
}
it.initialize(routingNode.nodeId(), null, allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
it.initialize(routingNode.nodeId(), null, allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes());
return;
}
assert false : "shard to initialize not found in list of unassigned shards";

View File

@ -26,7 +26,6 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
@ -156,7 +155,7 @@ public class CancelAllocationCommand implements AllocationCommand {
}
}
routingNodes.failShard(Loggers.getLogger(CancelAllocationCommand.class), shardRouting,
new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData);
new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null), indexMetaData, allocation.changes());
return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command",
"shard " + shardId + " on node " + discoNode + " can be cancelled"));
}

View File

@ -132,7 +132,7 @@ public class MoveAllocationCommand implements AllocationCommand {
if (decision.type() == Decision.Type.THROTTLE) {
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
}
allocation.routingNodes().relocateShard(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
allocation.routingNodes().relocateShard(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes());
}
if (!found) {

View File

@ -129,16 +129,13 @@ public class GatewayAllocator extends AbstractComponent {
}
}
public boolean allocateUnassigned(final RoutingAllocation allocation) {
boolean changed = false;
public void allocateUnassigned(final RoutingAllocation allocation) {
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
changed |= primaryShardAllocator.allocateUnassigned(allocation);
changed |= replicaShardAllocator.processExistingRecoveries(allocation);
changed |= replicaShardAllocator.allocateUnassigned(allocation);
return changed;
primaryShardAllocator.allocateUnassigned(allocation);
replicaShardAllocator.processExistingRecoveries(allocation);
replicaShardAllocator.allocateUnassigned(allocation);
}
class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {

View File

@ -59,8 +59,7 @@ import java.util.stream.Collectors;
* Note that the PrimaryShardAllocator does *not* allocate primaries on index creation
* (see {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator}),
* nor does it allocate primaries when a primary shard failed and there is a valid replica
* copy that can immediately be promoted to primary, as this takes place in
* {@link RoutingNodes#failShard(ESLogger, ShardRouting, UnassignedInfo, IndexMetaData)}.
* copy that can immediately be promoted to primary, as this takes place in {@link RoutingNodes#failShard}.
*/
public abstract class PrimaryShardAllocator extends AbstractComponent {
@ -94,8 +93,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
logger.debug("using initial_shards [{}]", NODE_INITIAL_SHARDS_SETTING.get(settings));
}
public boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
public void allocateUnassigned(RoutingAllocation allocation) {
final RoutingNodes routingNodes = allocation.routingNodes();
final MetaData metaData = allocation.metaData();
@ -119,7 +117,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
if (shardState.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
allocation.setHasPendingAsyncFetch();
changed |= unassignedIterator.removeAndIgnore(AllocationStatus.FETCHING_SHARD_DATA);
unassignedIterator.removeAndIgnore(AllocationStatus.FETCHING_SHARD_DATA, allocation.changes());
continue;
}
@ -160,7 +158,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
logger.debug("[{}][{}]: missing local data, recover from any node", shard.index(), shard.id());
} else {
// we can't really allocate, so ignore it and continue
changed |= unassignedIterator.removeAndIgnore(AllocationStatus.NO_VALID_SHARD_COPY);
unassignedIterator.removeAndIgnore(AllocationStatus.NO_VALID_SHARD_COPY, allocation.changes());
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodeShardsResult.allocationsFound);
}
continue;
@ -172,8 +170,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
NodeGatewayStartedShards nodeShardState = nodesToAllocate.yesNodeShards.get(0);
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode());
changed = true;
unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation.changes());
} else if (nodesToAllocate.throttleNodeShards.isEmpty() == true && nodesToAllocate.noNodeShards.isEmpty() == false) {
// The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard
// can be force-allocated to one of the nodes.
@ -184,24 +181,22 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
NodeGatewayStartedShards nodeShardState = nodesToForceAllocate.yesNodeShards.get(0);
logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
shard.index(), shard.id(), shard, nodeShardState.getNode());
changed = true;
unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(),
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation.changes());
} else if (nodesToForceAllocate.throttleNodeShards.isEmpty() == false) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation",
shard.index(), shard.id(), shard, nodesToForceAllocate.throttleNodeShards);
changed |= unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED);
unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED, allocation.changes());
} else {
logger.debug("[{}][{}]: forced primary allocation denied [{}]", shard.index(), shard.id(), shard);
changed |= unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_NO);
unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_NO, allocation.changes());
}
} else {
// we are throttling this, but we have enough to allocate to this node, ignore it for now
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodeShards);
changed |= unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED);
unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED, allocation.changes());
}
}
return changed;
}
/**

View File

@ -31,11 +31,10 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -60,8 +59,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
* match. Today, a better match is one that has full sync id match compared to not having one in
* the previous recovery.
*/
public boolean processExistingRecoveries(RoutingAllocation allocation) {
boolean changed = false;
public void processExistingRecoveries(RoutingAllocation allocation) {
MetaData metaData = allocation.metaData();
RoutingNodes routingNodes = allocation.routingNodes();
List<Runnable> shardCancellationActions = new ArrayList<>();
@ -121,8 +119,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ nodeWithHighestMatch + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT);
// don't cancel shard in the loop as it will cause a ConcurrentModificationException
shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, indexMetaData));
changed = true;
shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, indexMetaData, allocation.changes()));
}
}
}
@ -130,11 +127,9 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
for (Runnable action : shardCancellationActions) {
action.run();
}
return changed;
}
public boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
public void allocateUnassigned(RoutingAllocation allocation) {
final RoutingNodes routingNodes = allocation.routingNodes();
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
MetaData metaData = allocation.metaData();
@ -154,7 +149,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
Decision decision = canBeAllocatedToAtLeastOneNode(shard, allocation);
if (decision.type() != Decision.Type.YES) {
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
changed |= unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.fromDecision(decision));
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.fromDecision(decision), allocation.changes());
continue;
}
@ -162,7 +157,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
if (shardStores.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard stores", shard);
allocation.setHasPendingAsyncFetch();
changed |= unassignedIterator.removeAndIgnore(AllocationStatus.FETCHING_SHARD_DATA);
unassignedIterator.removeAndIgnore(AllocationStatus.FETCHING_SHARD_DATA, allocation.changes());
continue; // still fetching
}
@ -187,19 +182,17 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
if (decision.type() == Decision.Type.THROTTLE) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
// we are throttling this, but we have enough to allocate to this node, ignore it for now
changed |= unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.fromDecision(decision));
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.fromDecision(decision), allocation.changes());
} else {
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
// we found a match
changed = true;
unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), null, allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), null, allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes());
}
} else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed
changed |= ignoreUnassignedIfDelayed(unassignedIterator, shard);
ignoreUnassignedIfDelayed(unassignedIterator, shard, allocation.changes());
}
}
return changed;
}
/**
@ -212,14 +205,12 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
*
* @param unassignedIterator iterator over unassigned shards
* @param shard the shard which might be delayed
* @return true iff there was a change to the unassigned info
*/
public boolean ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
public void ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard, RoutingChangesObserver changes) {
if (shard.unassignedInfo().isDelayed()) {
logger.debug("{}: allocation of [{}] is delayed", shard.shardId(), shard);
return unassignedIterator.removeAndIgnore(AllocationStatus.DELAYED_ALLOCATION);
unassignedIterator.removeAndIgnore(AllocationStatus.DELAYED_ALLOCATION, changes);
}
return false;
}
/**

View File

@ -51,8 +51,8 @@ public class ClusterModuleTests extends ModuleTestCase {
static class FakeShardsAllocator implements ShardsAllocator {
@Override
public boolean allocate(RoutingAllocation allocation) {
return false;
public void allocate(RoutingAllocation allocation) {
// noop
}
@Override

View File

@ -326,52 +326,50 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
--------[test][3], node[3], [P], s[STARTED]
---- unassigned
*/
public boolean allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation) {
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
boolean changed = !unassigned.isEmpty();
ShardRouting[] drain = unassigned.drain();
ArrayUtil.timSort(drain, (a, b) -> { return a.primary() ? -1 : 1; }); // we have to allocate primaries first
for (ShardRouting sr : drain) {
switch (sr.id()) {
case 0:
if (sr.primary()) {
allocation.routingNodes().initializeShard(sr, "node1", null, -1);
allocation.routingNodes().initializeShard(sr, "node1", null, -1, allocation.changes());
} else {
allocation.routingNodes().initializeShard(sr, "node0", null, -1);
allocation.routingNodes().initializeShard(sr, "node0", null, -1, allocation.changes());
}
break;
case 1:
if (sr.primary()) {
allocation.routingNodes().initializeShard(sr, "node1", null, -1);
allocation.routingNodes().initializeShard(sr, "node1", null, -1, allocation.changes());
} else {
allocation.routingNodes().initializeShard(sr, "node2", null, -1);
allocation.routingNodes().initializeShard(sr, "node2", null, -1, allocation.changes());
}
break;
case 2:
if (sr.primary()) {
allocation.routingNodes().initializeShard(sr, "node3", null, -1);
allocation.routingNodes().initializeShard(sr, "node3", null, -1, allocation.changes());
} else {
allocation.routingNodes().initializeShard(sr, "node2", null, -1);
allocation.routingNodes().initializeShard(sr, "node2", null, -1, allocation.changes());
}
break;
case 3:
if (sr.primary()) {
allocation.routingNodes().initializeShard(sr, "node3", null, -1);
allocation.routingNodes().initializeShard(sr, "node3", null, -1, allocation.changes());
} else {
allocation.routingNodes().initializeShard(sr, "node1", null, -1);
allocation.routingNodes().initializeShard(sr, "node1", null, -1, allocation.changes());
}
break;
case 4:
if (sr.primary()) {
allocation.routingNodes().initializeShard(sr, "node2", null, -1);
allocation.routingNodes().initializeShard(sr, "node2", null, -1, allocation.changes());
} else {
allocation.routingNodes().initializeShard(sr, "node0", null, -1);
allocation.routingNodes().initializeShard(sr, "node0", null, -1, allocation.changes());
}
break;
}
}
return changed;
}
}, EmptyClusterInfoService.INSTANCE);
MetaData.Builder metaDataBuilder = MetaData.builder();

View File

@ -636,19 +636,19 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
AllocationService strategy = createAllocationService(Settings.EMPTY, new NoopGatewayAllocator() {
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
public void allocateUnassigned(RoutingAllocation allocation) {
if (allocateTest1.get() == false) {
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassigned.iterator();
while (iterator.hasNext()) {
ShardRouting next = iterator.next();
if ("test1".equals(next.index().getName())) {
iterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT);
iterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
}
}
}
return super.allocateUnassigned(allocation);
super.allocateUnassigned(allocation);
}
});
@ -742,11 +742,11 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
AllocationService strategy = createAllocationService(Settings.builder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(),
ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new NoopGatewayAllocator() {
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
public void allocateUnassigned(RoutingAllocation allocation) {
if (hasFetches.get()) {
allocation.setHasPendingAsyncFetch();
}
return super.allocateUnassigned(allocation);
super.allocateUnassigned(allocation);
}
});

View File

@ -82,8 +82,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), true, Version.V_2_1_0);
}
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().iterator().next().shardId(), equalTo(shardId));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
@ -99,8 +99,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0);
}
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
@ -118,8 +118,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0);
}
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
@ -131,8 +131,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testNoMatchingAllocationIdFound() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "id1", randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
@ -145,8 +145,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testNoActiveAllocationIds() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
testAllocator.addData(node1, 1, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
@ -165,8 +165,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
testAllocator.addData(node1, 3, null, randomBoolean(), new CorruptIndexException("test", "test"));
}
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
@ -185,8 +185,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0);
testAllocator.addData(node1, 3, null, randomBoolean());
}
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
@ -210,8 +210,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
new TestAllocateDecision(randomBoolean() ? Decision.YES : Decision.NO), getNoDeciderThatAllowsForceAllocate()
});
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1");
boolean changed = testAllocator.allocateUnassigned(allocation);
assertTrue(changed);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertTrue(allocation.routingNodes().unassigned().ignored().isEmpty());
assertEquals(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), 1);
assertEquals(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), node1.getId());
@ -233,8 +233,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
getNoDeciderThatThrottlesForceAllocate()
});
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1");
boolean changed = testAllocator.allocateUnassigned(allocation);
assertTrue(changed);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
List<ShardRouting> ignored = allocation.routingNodes().unassigned().ignored();
assertEquals(ignored.size(), 1);
assertEquals(ignored.get(0).unassignedInfo().getLastAllocationStatus(),
@ -257,8 +257,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
new TestAllocateDecision(Decision.THROTTLE), getNoDeciderThatAllowsForceAllocate()
});
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1");
boolean changed = testAllocator.allocateUnassigned(allocation);
assertTrue(changed);
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
List<ShardRouting> ignored = allocation.routingNodes().unassigned().ignored();
assertEquals(ignored.size(), 1);
assertEquals(ignored.get(0).unassignedInfo().getLastAllocationStatus(), AllocationStatus.DECIDERS_THROTTLED);
@ -275,8 +275,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
boolean node1HasPrimaryShard = randomBoolean();
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard);
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
DiscoveryNode allocatedNode = node1HasPrimaryShard ? node1 : node2;
@ -297,8 +297,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0);
testAllocator.addData(node1, 3, null, randomBoolean());
}
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
@ -317,8 +317,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0);
testAllocator.addData(node1, 3, null, randomBoolean());
}
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
@ -331,8 +331,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testAllocateToTheHighestVersionOnLegacyIndex() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0);
testAllocator.addData(node1, 10, null, randomBoolean()).addData(node2, 12, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
@ -350,8 +350,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
testAllocator.addData(node1, 10, null, randomBoolean());
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some allocId", randomBoolean());
testAllocator.addData(node3, 12, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
@ -370,8 +370,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false;
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), clusterHasActiveAllocationIds);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
@ -388,8 +388,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false;
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders(), clusterHasActiveAllocationIds);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
@ -405,8 +405,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false;
RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders(), clusterHasActiveAllocationIds);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
@ -419,8 +419,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testRestoreDoesNotAssignIfNoShardAvailable() {
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), randomBoolean());
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, false);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
@ -454,8 +454,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1;
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), hasActiveAllocation);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
@ -471,8 +471,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1;
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(throttleAllocationDeciders(), hasActiveAllocation);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
@ -487,8 +487,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1;
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders(), hasActiveAllocation);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
@ -501,8 +501,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), randomBoolean());
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
@ -544,8 +544,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)).build();
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
@ -553,8 +553,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
testAllocator.addData(node1, 1, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
@ -562,8 +562,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
testAllocator.addData(node2, 1, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
@ -588,8 +588,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)).build();
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
@ -597,8 +597,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
testAllocator.addData(node1, 1, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
@ -606,8 +606,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
testAllocator.addData(node2, 2, null, randomBoolean());
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state, null, System.nanoTime(), false);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));

View File

@ -234,16 +234,16 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
// we sometime return empty list of files, make sure we test this as well
testAllocator.addData(node2, null);
}
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT);
testAllocator.addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
}
@ -253,8 +253,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
testAllocator.addData(node1, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node2, "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node3, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
boolean changed = testAllocator.processExistingRecoveries(allocation);
assertThat(changed, equalTo(true));
testAllocator.processExistingRecoveries(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId));
}
@ -264,8 +264,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
testAllocator.addData(node1, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node3, randomBoolean() ? "MATCH" : "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
boolean changed = testAllocator.processExistingRecoveries(allocation);
assertThat(changed, equalTo(false));
testAllocator.processExistingRecoveries(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
}
@ -273,8 +273,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
testAllocator.addData(node1, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"))
.addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
boolean changed = testAllocator.processExistingRecoveries(allocation);
assertThat(changed, equalTo(false));
testAllocator.processExistingRecoveries(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0));
}

View File

@ -239,7 +239,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
public void applyFailedShards(FailedRerouteAllocation allocation) {}
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
public void allocateUnassigned(RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
@ -247,9 +247,8 @@ public abstract class ESAllocationTestCase extends ESTestCase {
if (shard.primary() || shard.allocatedPostIndexCreate(indexMetaData) == false) {
continue;
}
replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard);
replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard, allocation.changes());
}
return false;
}
}
}

View File

@ -47,7 +47,7 @@ public class NoopGatewayAllocator extends GatewayAllocator {
}
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
return false;
public void allocateUnassigned(RoutingAllocation allocation) {
// noop
}
}