Mark shard as stale on non-replicated write, not on node shutdown (#20023)

Non-stale shard copies are currently tracked using their allocation ids in the cluster state. When a node leaves the cluster, the shard copies on that node are marked as stale by removing their allocation ids from the active set in the cluster state. For full cluster restarts, this can have the unwanted effect that only the last node holding a copy of the shard is seen as non-stale. The other shard copies are not actually stale, however, as long as no writes have happened on the shard while they were unavailable. Shard copies should therefore only be marked as stale (by the master in the cluster state) if other active shards have received writes.
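
To make the new rule concrete, here is an editor's sketch in plain Java (not code from this commit; the parameter names mirror the in-sync set and the allocation ids present in the routing table):

import java.util.HashSet;
import java.util.Set;

public class StaleCopySketch {

    // A copy is marked stale only once a write has been acknowledged without it:
    // its allocation id is still in the in-sync set, but it has no routing entry
    // and therefore cannot have received the write.
    static Set<String> copiesToMarkStale(Set<String> inSyncAllocationIds,
                                         Set<String> availableAllocationIds) {
        Set<String> stale = new HashSet<>(inSyncAllocationIds);
        stale.removeAll(availableAllocationIds);
        return stale;
    }

    public static void main(String[] args) {
        // After a full cluster restart both copies are still in-sync...
        Set<String> inSync = Set.of("alloc-1", "alloc-2");
        // ...but only one of them has been reassigned so far.
        Set<String> available = Set.of("alloc-1");
        // The restart alone marks nothing as stale; only when a write goes through
        // does the primary ask the master to remove "alloc-2" from the in-sync set.
        System.out.println(copiesToMarkStale(inSync, available)); // prints [alloc-2]
    }
}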

This commit implements the above logic. It also renames the persisted structure used to track non-stale shard copies from "active_allocations" to "in_sync_allocations", as we now also support tracking non-stale shard copies that have no active routing entries in the cluster state.
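
As a rough illustration of the renamed structure (an editor's sketch; the shard numbers and allocation ids below are made up), the cluster state now carries one set of in-sync allocation ids per shard:

import java.util.Map;
import java.util.Set;

public class InSyncAllocationsShape {
    public static void main(String[] args) {
        // Under the old name, "active_allocations", membership implied an active
        // routing entry. Under "in_sync_allocations", an id may belong to a copy
        // that currently has no routing entry at all (e.g. its node is offline)
        // but that is still safe to promote, because no write was acknowledged
        // without it.
        Map<Integer, Set<String>> inSyncAllocations = Map.of(
                0, Set.of("P7qgbTh2TGy1Z3pdHq0a5A", "wLfY5PxmQ7eZDuD6DeLlbw"),
                1, Set.of("k2m9Qx0XQYSzG4F1nQ2b7g"));
        inSyncAllocations.forEach((shard, ids) ->
                System.out.println("shard " + shard + " -> " + ids));
    }
}
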
Yannick Welsch 2016-08-26 10:09:57 +02:00 committed by GitHub
parent c5f8e1b64d
commit 6fe9ae29ea
32 changed files with 855 additions and 286 deletions

View File

@ -258,7 +258,7 @@ public class TransportClusterAllocationExplainAction
Float weight = weights.get(node);
IndicesShardStoresResponse.StoreStatus storeStatus = nodeToStatus.get(node);
NodeExplanation nodeExplanation = calculateNodeExplanation(shard, indexMetaData, node, decision, weight,
storeStatus, shard.currentNodeId(), indexMetaData.activeAllocationIds(shard.getId()),
storeStatus, shard.currentNodeId(), indexMetaData.inSyncAllocationIds(shard.getId()),
allocation.hasPendingAsyncFetch());
explanations.put(node, nodeExplanation);
}

View File

@ -25,11 +25,14 @@ import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
@ -40,10 +43,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class ReplicationOperation<
Request extends ReplicationRequest<Request>,
@ -65,7 +71,7 @@ public class ReplicationOperation<
* operations and the primary finishes.</li>
* </ul>
*/
private final AtomicInteger pendingShards = new AtomicInteger();
private final AtomicInteger pendingActions = new AtomicInteger();
private final AtomicInteger successfulShards = new AtomicInteger();
private final boolean executeOnReplicas;
private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
@ -102,7 +108,7 @@ public class ReplicationOperation<
}
totalShards.incrementAndGet();
pendingShards.incrementAndGet();
pendingActions.incrementAndGet();
primaryResult = primary.perform(request);
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
@ -110,19 +116,45 @@ public class ReplicationOperation<
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}
performOnReplicas(primaryId, replicaRequest);
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
ClusterState clusterState = clusterStateSupplier.get();
final List<ShardRouting> shards = getShards(primaryId, clusterState);
Set<String> inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState);
markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);
performOnReplicas(replicaRequest, shards);
successfulShards.incrementAndGet();
decPendingAndFinishIfNeeded();
}
private void performOnReplicas(ShardId primaryId, ReplicaRequest replicaRequest) {
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
// If the index gets deleted after primary operation, we skip replication
final List<ShardRouting> shards = getShards(primaryId, clusterStateSupplier.get());
private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<String> inSyncAllocationIds, List<ShardRouting> shards) {
if (inSyncAllocationIds.isEmpty() == false && shards.isEmpty() == false) {
Set<String> availableAllocationIds = shards.stream()
.map(ShardRouting::allocationId)
.filter(Objects::nonNull)
.map(AllocationId::getId)
.collect(Collectors.toSet());
// if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
for (String allocationId : Sets.difference(inSyncAllocationIds, availableAllocationIds)) {
// mark copy as stale
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStale(replicaRequest.shardId(), allocationId, replicaRequest.primaryTerm(),
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
);
}
}
}
private void performOnReplicas(ReplicaRequest replicaRequest, List<ShardRouting> shards) {
final String localNodeId = primary.routingEntry().currentNodeId();
// If the index gets deleted after primary operation, we skip replication
for (final ShardRouting shard : shards) {
if (executeOnReplicas == false || shard.unassigned()) {
if (shard.primary() == false) {
@ -147,7 +179,7 @@ public class ReplicationOperation<
}
totalShards.incrementAndGet();
pendingShards.incrementAndGet();
pendingActions.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, new ActionListener<TransportResponse.Empty>() {
@Override
public void onResponse(TransportResponse.Empty empty) {
@ -222,6 +254,14 @@ public class ReplicationOperation<
}
}
protected Set<String> getInSyncAllocationIds(ShardId shardId, ClusterState clusterState) {
IndexMetaData indexMetaData = clusterState.metaData().index(shardId.getIndex());
if (indexMetaData != null) {
return indexMetaData.inSyncAllocationIds(shardId.id());
}
return Collections.emptySet();
}
protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
// can be null if the index is deleted / closed on us...
final IndexShardRoutingTable shardRoutingTable = state.getRoutingTable().shardRoutingTableOrNull(shardId);
@ -230,8 +270,8 @@ public class ReplicationOperation<
}
private void decPendingAndFinishIfNeeded() {
assert pendingShards.get() > 0;
if (pendingShards.decrementAndGet() == 0) {
assert pendingActions.get() > 0;
if (pendingActions.decrementAndGet() == 0) {
finish();
}
}
@ -337,6 +377,20 @@ public class ReplicationOperation<
*/
void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
/**
* Marks shard copy as stale, removing its allocation id from the set of in-sync allocation ids.
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param primaryTerm the primary term of the primary shard when requesting the failure
* @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set.
* @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted
* by the master.
* @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
*/
void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
}
public static class RetryOnPrimaryException extends ElasticsearchException {

View File

@ -867,29 +867,39 @@ public abstract class TransportReplicationAction<
@Override
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onFailure, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(
replica, primaryTerm, message, exception,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onSuccess.run();
}
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
@Override
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onFailure.accept(shardFailedError);
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onIgnoredFailure.accept(shardFailedError);
}
@Override
public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onSuccess.run();
}
@Override
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onPrimaryDemoted.accept(shardFailedError);
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onIgnoredFailure.accept(shardFailedError);
}
}
);
};
}
}

View File

@ -292,7 +292,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
sb.append(TAB).append(TAB).append(shard).append(": ");
sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");
sb.append("a_ids ").append(indexMetaData.activeAllocationIds(shard)).append("\n");
sb.append("a_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n");
}
}
sb.append(blocks().prettyPrint());
@ -501,8 +501,8 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
}
builder.endObject();
builder.startObject(IndexMetaData.KEY_ACTIVE_ALLOCATIONS);
for (IntObjectCursor<Set<String>> cursor : indexMetaData.getActiveAllocationIds()) {
builder.startObject(IndexMetaData.KEY_IN_SYNC_ALLOCATIONS);
for (IntObjectCursor<Set<String>> cursor : indexMetaData.getInSyncAllocationIds()) {
builder.startArray(String.valueOf(cursor.key));
for (String allocationId : cursor.value) {
builder.value(allocationId);

View File

@ -127,29 +127,32 @@ public class ShardStateAction extends AbstractComponent {
}
/**
* Send a shard failed request to the master node to update the cluster state with the failure of a shard on another node.
* Send a shard failed request to the master node to update the cluster state with the failure of a shard on another node. This means
* that the shard should be failed because a write made it into the primary but was not replicated to this shard copy. If the shard
* does not exist anymore but still has an entry in the in-sync set, remove its allocation id from the in-sync set.
*
* @param shardRouting the shard to fail
* @param primaryTerm the primary term associated with the primary shard that is failing the shard.
* @param shardId shard id of the shard to fail
* @param allocationId allocation id of the shard to fail
* @param primaryTerm the primary term associated with the primary shard that is failing the shard. Must be strictly positive.
* @param message the reason for the failure
* @param failure the underlying cause of the failure
* @param listener callback upon completion of the request
*/
public void remoteShardFailed(final ShardRouting shardRouting, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) {
public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) {
assert primaryTerm > 0L : "primary term should be strictly positive";
shardFailed(shardRouting, primaryTerm, message, failure, listener);
shardFailed(shardId, allocationId, primaryTerm, message, failure, listener);
}
/**
* Send a shard failed request to the master node to update the cluster state when a shard on the local node failed.
*/
public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure, Listener listener) {
shardFailed(shardRouting, 0L, message, failure, listener);
shardFailed(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, failure, listener);
}
private void shardFailed(final ShardRouting shardRouting, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) {
private void shardFailed(final ShardId shardId, String allocationId, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
ShardEntry shardEntry = new ShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message, failure);
ShardEntry shardEntry = new ShardEntry(shardId, allocationId, primaryTerm, message, failure);
sendShardAction(SHARD_FAILED_ACTION_NAME, observer, shardEntry, listener);
}
@ -248,16 +251,23 @@ public class ShardStateAction extends AbstractComponent {
BatchResult.Builder<ShardEntry> batchResultBuilder = BatchResult.builder();
List<ShardEntry> tasksToBeApplied = new ArrayList<>();
List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>();
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
List<FailedRerouteAllocation.StaleShard> staleShardsToBeApplied = new ArrayList<>();
for (ShardEntry task : tasks) {
IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex());
if (indexMetaData == null) {
// tasks that correspond to non-existent shards are marked as successful
// tasks that correspond to non-existent indices are marked as successful
logger.debug("{} ignoring shard failed task [{}] (unknown index {})", task.shardId, task, task.shardId.getIndex());
batchResultBuilder.success(task);
} else {
// non-local requests
// The primary term is 0 if the shard failed itself. It is > 0 if a write was done on a primary but failed to be
// replicated to the shard copy with the provided allocation id. In the case where the shard failed itself, it's ok to
// just remove the corresponding routing entry from the routing table. In the case where a write could not be
// replicated, however, it is important to ensure that the shard copy with the missing write is considered stale from
// that point on, which is implemented by removing the allocation id of the shard copy from the in-sync allocations set.
// We check here that the primary to which the write happened was not already failed in an earlier cluster state update.
// This prevents situations where a new primary has already been selected and replication failures from an old, stale
// primary would unnecessarily fail currently active shards.
if (task.primaryTerm > 0) {
long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id());
if (currentPrimaryTerm != task.primaryTerm) {
@ -274,28 +284,32 @@ public class ShardStateAction extends AbstractComponent {
ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
if (matched == null) {
// tasks that correspond to non-existent shards are marked as successful
logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", task.shardId, task);
batchResultBuilder.success(task);
} else {
// remove duplicate actions as allocation service expects a clean list without duplicates
if (seenShardRoutings.contains(matched)) {
logger.trace("{} ignoring shard failed task [{}] (already scheduled to fail {})", task.shardId, task, matched);
Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(task.shardId.id());
// only mark shard copies without routing entries that are in the in-sync allocations set as stale if the reason
// they were failed is that a write made it into the primary but not to this copy (which corresponds to the
// check "primaryTerm > 0").
if (task.primaryTerm > 0 && inSyncAllocationIds.contains(task.allocationId)) {
logger.debug("{} marking shard {} as stale (shard failed task: [{}])", task.shardId, task.allocationId, task);
tasksToBeApplied.add(task);
staleShardsToBeApplied.add(new FailedRerouteAllocation.StaleShard(task.shardId, task.allocationId));
} else {
logger.debug("{} failing shard {} (shard failed task: [{}])", task.shardId, matched, task);
tasksToBeApplied.add(task);
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(matched, task.message, task.failure));
seenShardRoutings.add(matched);
// tasks that correspond to non-existent shards are marked as successful
logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", task.shardId, task);
batchResultBuilder.success(task);
}
} else {
// failing a shard also possibly marks it as stale (see IndexMetaDataUpdater)
logger.debug("{} failing shard {} (shard failed task: [{}])", task.shardId, matched, task);
tasksToBeApplied.add(task);
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(matched, task.message, task.failure));
}
}
}
assert tasksToBeApplied.size() >= shardRoutingsToBeApplied.size();
assert tasksToBeApplied.size() == shardRoutingsToBeApplied.size() + staleShardsToBeApplied.size();
ClusterState maybeUpdatedState = currentState;
try {
RoutingAllocation.Result result = applyFailedShards(currentState, shardRoutingsToBeApplied);
RoutingAllocation.Result result = applyFailedShards(currentState, shardRoutingsToBeApplied, staleShardsToBeApplied);
if (result.changed()) {
maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
}
@ -311,8 +325,9 @@ public class ShardStateAction extends AbstractComponent {
}
// visible for testing
RoutingAllocation.Result applyFailedShards(ClusterState currentState, List<FailedRerouteAllocation.FailedShard> failedShards) {
return allocationService.applyFailedShards(currentState, failedShards);
RoutingAllocation.Result applyFailedShards(ClusterState currentState, List<FailedRerouteAllocation.FailedShard> failedShards,
List<FailedRerouteAllocation.StaleShard> staleShards) {
return allocationService.applyFailedShards(currentState, failedShards, staleShards);
}
@Override

View File

@ -232,7 +232,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
public static final String KEY_ACTIVE_ALLOCATIONS = "active_allocations";
public static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
static final String KEY_VERSION = "version";
static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
static final String KEY_SETTINGS = "settings";
@ -262,7 +262,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
private final ImmutableOpenMap<String, Custom> customs;
private final ImmutableOpenIntMap<Set<String>> activeAllocationIds;
private final ImmutableOpenIntMap<Set<String>> inSyncAllocationIds;
private final transient int totalNumberOfShards;
@ -279,7 +279,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> activeAllocationIds,
ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> inSyncAllocationIds,
DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion,
int routingNumShards, ActiveShardCount waitForActiveShards) {
@ -296,7 +296,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
this.mappings = mappings;
this.customs = customs;
this.aliases = aliases;
this.activeAllocationIds = activeAllocationIds;
this.inSyncAllocationIds = inSyncAllocationIds;
this.requireFilters = requireFilters;
this.includeFilters = includeFilters;
this.excludeFilters = excludeFilters;
@ -340,7 +340,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
* a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary.
*
* Note: since we increment the term every time a shard is assigned, the term for any operational shard (i.e., a shard
* that can be indexed into) is larger than 0. See {@link IndexMetaDataUpdater#applyChanges(MetaData)}.
* that can be indexed into) is larger than 0. See {@link IndexMetaDataUpdater#applyChanges}.
**/
public long primaryTerm(int shardId) {
return this.primaryTerms[shardId];
@ -447,13 +447,13 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
return (T) customs.get(type);
}
public ImmutableOpenIntMap<Set<String>> getActiveAllocationIds() {
return activeAllocationIds;
public ImmutableOpenIntMap<Set<String>> getInSyncAllocationIds() {
return inSyncAllocationIds;
}
public Set<String> activeAllocationIds(int shardId) {
public Set<String> inSyncAllocationIds(int shardId) {
assert shardId >= 0 && shardId < numberOfShards;
return activeAllocationIds.get(shardId);
return inSyncAllocationIds.get(shardId);
}
@Nullable
@ -518,7 +518,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
if (Arrays.equals(primaryTerms, that.primaryTerms) == false) {
return false;
}
if (!activeAllocationIds.equals(that.activeAllocationIds)) {
if (!inSyncAllocationIds.equals(that.inSyncAllocationIds)) {
return false;
}
return true;
@ -536,7 +536,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
result = 31 * result + Long.hashCode(routingFactor);
result = 31 * result + Long.hashCode(routingNumShards);
result = 31 * result + Arrays.hashCode(primaryTerms);
result = 31 * result + activeAllocationIds.hashCode();
result = 31 * result + inSyncAllocationIds.hashCode();
return result;
}
@ -573,7 +573,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
private final Diff<ImmutableOpenMap<String, MappingMetaData>> mappings;
private final Diff<ImmutableOpenMap<String, AliasMetaData>> aliases;
private final Diff<ImmutableOpenMap<String, Custom>> customs;
private final Diff<ImmutableOpenIntMap<Set<String>>> activeAllocationIds;
private final Diff<ImmutableOpenIntMap<Set<String>>> inSyncAllocationIds;
public IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) {
index = after.index.getName();
@ -585,7 +585,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
mappings = DiffableUtils.diff(before.mappings, after.mappings, DiffableUtils.getStringKeySerializer());
aliases = DiffableUtils.diff(before.aliases, after.aliases, DiffableUtils.getStringKeySerializer());
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer());
activeAllocationIds = DiffableUtils.diff(before.activeAllocationIds, after.activeAllocationIds,
inSyncAllocationIds = DiffableUtils.diff(before.inSyncAllocationIds, after.inSyncAllocationIds,
DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance());
}
@ -610,7 +610,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
return lookupPrototypeSafe(key).readDiffFrom(in);
}
});
activeAllocationIds = DiffableUtils.readImmutableOpenIntMapDiff(in, DiffableUtils.getVIntKeySerializer(),
inSyncAllocationIds = DiffableUtils.readImmutableOpenIntMapDiff(in, DiffableUtils.getVIntKeySerializer(),
DiffableUtils.StringSetValueSerializer.getInstance());
}
@ -625,7 +625,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
mappings.writeTo(out);
aliases.writeTo(out);
customs.writeTo(out);
activeAllocationIds.writeTo(out);
inSyncAllocationIds.writeTo(out);
}
@Override
@ -639,7 +639,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
builder.mappings.putAll(mappings.apply(part.mappings));
builder.aliases.putAll(aliases.apply(part.aliases));
builder.customs.putAll(customs.apply(part.customs));
builder.activeAllocationIds.putAll(activeAllocationIds.apply(part.activeAllocationIds));
builder.inSyncAllocationIds.putAll(inSyncAllocationIds.apply(part.inSyncAllocationIds));
return builder.build();
}
}
@ -668,11 +668,11 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
Custom customIndexMetaData = lookupPrototypeSafe(type).readFrom(in);
builder.putCustom(type, customIndexMetaData);
}
int activeAllocationIdsSize = in.readVInt();
for (int i = 0; i < activeAllocationIdsSize; i++) {
int inSyncAllocationIdsSize = in.readVInt();
for (int i = 0; i < inSyncAllocationIdsSize; i++) {
int key = in.readVInt();
Set<String> allocationIds = DiffableUtils.StringSetValueSerializer.getInstance().read(in, key);
builder.putActiveAllocationIds(key, allocationIds);
builder.putInSyncAllocationIds(key, allocationIds);
}
return builder.build();
}
@ -698,8 +698,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
out.writeString(cursor.key);
cursor.value.writeTo(out);
}
out.writeVInt(activeAllocationIds.size());
for (IntObjectCursor<Set<String>> cursor : activeAllocationIds) {
out.writeVInt(inSyncAllocationIds.size());
for (IntObjectCursor<Set<String>> cursor : inSyncAllocationIds) {
out.writeVInt(cursor.key);
DiffableUtils.StringSetValueSerializer.getInstance().write(cursor.value, out);
}
@ -723,7 +723,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
private final ImmutableOpenMap.Builder<String, MappingMetaData> mappings;
private final ImmutableOpenMap.Builder<String, AliasMetaData> aliases;
private final ImmutableOpenMap.Builder<String, Custom> customs;
private final ImmutableOpenIntMap.Builder<Set<String>> activeAllocationIds;
private final ImmutableOpenIntMap.Builder<Set<String>> inSyncAllocationIds;
private Integer routingNumShards;
public Builder(String index) {
@ -731,7 +731,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
this.mappings = ImmutableOpenMap.builder();
this.aliases = ImmutableOpenMap.builder();
this.customs = ImmutableOpenMap.builder();
this.activeAllocationIds = ImmutableOpenIntMap.builder();
this.inSyncAllocationIds = ImmutableOpenIntMap.builder();
}
public Builder(IndexMetaData indexMetaData) {
@ -744,7 +744,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
this.aliases = ImmutableOpenMap.builder(indexMetaData.aliases);
this.customs = ImmutableOpenMap.builder(indexMetaData.customs);
this.routingNumShards = indexMetaData.routingNumShards;
this.activeAllocationIds = ImmutableOpenIntMap.builder(indexMetaData.activeAllocationIds);
this.inSyncAllocationIds = ImmutableOpenIntMap.builder(indexMetaData.inSyncAllocationIds);
}
public String index() {
@ -854,13 +854,13 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
return this;
}
public Builder putActiveAllocationIds(int shardId, Set<String> allocationIds) {
activeAllocationIds.put(shardId, new HashSet(allocationIds));
return this;
public Set<String> getInSyncAllocationIds(int shardId) {
return inSyncAllocationIds.get(shardId);
}
public Set<String> getActiveAllocationIds(int shardId) {
return activeAllocationIds.get(shardId);
public Builder putInSyncAllocationIds(int shardId, Set<String> allocationIds) {
inSyncAllocationIds.put(shardId, new HashSet(allocationIds));
return this;
}
public long version() {
@ -938,13 +938,13 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
throw new IllegalArgumentException("must specify non-negative number of shards for index [" + index + "]");
}
// fill missing slots in activeAllocationIds with empty set if needed and make all entries immutable
ImmutableOpenIntMap.Builder<Set<String>> filledActiveAllocationIds = ImmutableOpenIntMap.builder();
// fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable
ImmutableOpenIntMap.Builder<Set<String>> filledInSyncAllocationIds = ImmutableOpenIntMap.builder();
for (int i = 0; i < numberOfShards; i++) {
if (activeAllocationIds.containsKey(i)) {
filledActiveAllocationIds.put(i, Collections.unmodifiableSet(new HashSet<>(activeAllocationIds.get(i))));
if (inSyncAllocationIds.containsKey(i)) {
filledInSyncAllocationIds.put(i, Collections.unmodifiableSet(new HashSet<>(inSyncAllocationIds.get(i))));
} else {
filledActiveAllocationIds.put(i, Collections.emptySet());
filledInSyncAllocationIds.put(i, Collections.emptySet());
}
}
final Map<String, String> requireMap = INDEX_ROUTING_REQUIRE_GROUP_SETTING.get(settings).getAsMap();
@ -1005,7 +1005,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
tmpAliases.build(), customs.build(), filledInSyncAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), waitForActiveShards);
}
@ -1056,8 +1056,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
}
builder.endArray();
builder.startObject(KEY_ACTIVE_ALLOCATIONS);
for (IntObjectCursor<Set<String>> cursor : indexMetaData.activeAllocationIds) {
builder.startObject(KEY_IN_SYNC_ALLOCATIONS);
for (IntObjectCursor<Set<String>> cursor : indexMetaData.inSyncAllocationIds) {
builder.startArray(String.valueOf(cursor.key));
for (String allocationId : cursor.value) {
builder.value(allocationId);
@ -1108,7 +1108,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
builder.putAlias(AliasMetaData.Builder.fromXContent(parser));
}
} else if (KEY_ACTIVE_ALLOCATIONS.equals(currentFieldName)) {
} else if (KEY_IN_SYNC_ALLOCATIONS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@ -1120,7 +1120,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
allocationIds.add(parser.text());
}
}
builder.putActiveAllocationIds(Integer.valueOf(shardId), allocationIds);
builder.putInSyncAllocationIds(Integer.valueOf(shardId), allocationIds);
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}

View File

@ -129,10 +129,10 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
"from the routing table");
}
if (shardRouting.active() &&
indexMetaData.activeAllocationIds(shardRouting.id()).contains(shardRouting.allocationId().getId()) == false) {
throw new IllegalStateException("active shard routing " + shardRouting + " has no corresponding entry in the " +
"in-sync allocation set " + indexMetaData.activeAllocationIds(shardRouting.id()));
}
indexMetaData.inSyncAllocationIds(shardRouting.id()).contains(shardRouting.allocationId().getId()) == false) {
throw new IllegalStateException("active shard routing " + shardRouting + " has no corresponding entry in the in-sync " +
"allocation set " + indexMetaData.inSyncAllocationIds(shardRouting.id()));
}
}
}
return true;

View File

@ -26,7 +26,7 @@ public interface RoutingChangesObserver {
/**
* Called when unassigned shard is initialized. Does not include initializing relocation target shards.
*/
void shardInitialized(ShardRouting unassignedShard);
void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard);
/**
* Called when an initializing shard is started.
@ -77,7 +77,7 @@ public interface RoutingChangesObserver {
class AbstractRoutingChangesObserver implements RoutingChangesObserver {
@Override
public void shardInitialized(ShardRouting unassignedShard) {
public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) {
}
@ -131,9 +131,9 @@ public interface RoutingChangesObserver {
}
@Override
public void shardInitialized(ShardRouting unassignedShard) {
public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
routingChangesObserver.shardInitialized(unassignedShard);
routingChangesObserver.shardInitialized(unassignedShard, initializedShard);
}
}

View File

@ -420,7 +420,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
addRecovery(initializedShard);
assignedShardsAdd(initializedShard);
routingChangesObserver.shardInitialized(unassignedShard);
routingChangesObserver.shardInitialized(unassignedShard, initializedShard);
return initializedShard;
}

View File

@ -255,7 +255,7 @@ public final class ShardRouting implements Writeable, ToXContent {
return false;
}
if (indexMetaData.activeAllocationIds(id()).isEmpty() && indexMetaData.getCreationVersion().onOrAfter(Version.V_5_0_0_alpha1)) {
if (indexMetaData.inSyncAllocationIds(id()).isEmpty() && indexMetaData.getCreationVersion().onOrAfter(Version.V_5_0_0_alpha1)) {
// when no shards with this id have ever been active for this index
return false;
}

View File

@ -111,7 +111,7 @@ public class AllocationService extends AbstractComponent {
RoutingTable oldRoutingTable = allocation.routingTable();
RoutingNodes newRoutingNodes = allocation.routingNodes();
final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(oldRoutingTable.version(), newRoutingNodes).build();
MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges();
MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(newRoutingTable);
assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata
logClusterHealthStateChange(
new ClusterStateHealth(ClusterState.builder(clusterName).
@ -124,20 +124,29 @@ public class AllocationService extends AbstractComponent {
}
public Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
return applyFailedShards(clusterState, Collections.singletonList(new FailedRerouteAllocation.FailedShard(failedShard, null, null)));
return applyFailedShards(clusterState, Collections.singletonList(new FailedRerouteAllocation.FailedShard(failedShard, null, null)),
Collections.emptyList());
}
public Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
return applyFailedShards(clusterState, failedShards, Collections.emptyList());
}
/**
* Applies the failed shards. Note, only assigned ShardRouting instances that exist in the routing table should be
* provided as parameter and no duplicates should be contained.
* provided as parameter. Also applies a list of allocation ids to remove from the in-sync set for shard copies for which there
* are no routing entries in the routing table.
*
* <p>
* If the same instance of the routing table is returned, then no change has been made.</p>
*/
public Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
if (failedShards.isEmpty()) {
public Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards,
List<FailedRerouteAllocation.StaleShard> staleShards) {
if (staleShards.isEmpty() && failedShards.isEmpty()) {
return Result.unchanged(clusterState);
}
clusterState = IndexMetaDataUpdater.removeStaleIdsWithoutRoutings(clusterState, staleShards);
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();

View File

@ -58,6 +58,21 @@ public class FailedRerouteAllocation extends RoutingAllocation {
}
}
public static class StaleShard {
public final ShardId shardId;
public final String allocationId;
public StaleShard(ShardId shardId, String allocationId) {
this.shardId = shardId;
this.allocationId = allocationId;
}
@Override
public String toString() {
return "stale shard, shard " + shardId + ", alloc. id [" + allocationId + "]";
}
}
private final List<FailedShard> failedShards;
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState,

View File

@ -19,15 +19,20 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation.StaleShard;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -39,7 +44,7 @@ import java.util.stream.Collectors;
* Observer that tracks changes made to RoutingNodes in order to update the primary terms and in-sync allocation ids in
* {@link IndexMetaData} once the allocation round has completed.
*
* Primary terms are updated on primary initialization or primary promotion.
* Primary terms are updated on primary initialization or when an active primary fails.
*
* Allocation ids are added for shards that become active and removed for shards that stop being active.
*/
@ -47,15 +52,16 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
private final Map<ShardId, Updates> shardChanges = new HashMap<>();
@Override
public void shardInitialized(ShardRouting unassignedShard) {
if (unassignedShard.primary()) {
increasePrimaryTerm(unassignedShard);
}
}
public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) {
assert initializedShard.isRelocationTarget() == false : "shardInitialized is not called on relocation target: " + initializedShard;
if (initializedShard.primary()) {
increasePrimaryTerm(initializedShard.shardId());
@Override
public void replicaPromoted(ShardRouting replicaShard) {
increasePrimaryTerm(replicaShard);
Updates updates = changes(initializedShard.shardId());
assert updates.initializedPrimary == null : "Primary cannot be initialized more than once in same allocation round: " +
"(previous: " + updates.initializedPrimary + ", next: " + initializedShard + ")";
updates.initializedPrimary = initializedShard;
}
}
@Override
@ -65,8 +71,20 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
@Override
public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
if (failedShard.active()) {
if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) {
removeAllocationId(failedShard);
if (failedShard.primary()) {
Updates updates = changes(failedShard.shardId());
if (updates.firstFailedPrimary == null) {
// more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
updates.firstFailedPrimary = failedShard;
}
}
}
if (failedShard.active() && failedShard.primary()) {
increasePrimaryTerm(failedShard.shardId());
}
}
@ -82,49 +100,27 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
/**
* Updates the current {@link MetaData} based on the changes of this RoutingChangesObserver. Specifically
* we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on
* we update {@link IndexMetaData#getInSyncAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on
* the changes made during this allocation.
*
* @param oldMetaData {@link MetaData} object from before the routing nodes was changed.
* @param newRoutingTable {@link RoutingTable} object after routing changes were applied.
* @return adapted {@link MetaData}, potentially the original one if no change was needed.
*/
public MetaData applyChanges(MetaData oldMetaData) {
public MetaData applyChanges(MetaData oldMetaData, RoutingTable newRoutingTable) {
Map<Index, List<Map.Entry<ShardId, Updates>>> changesGroupedByIndex =
shardChanges.entrySet().stream().collect(Collectors.groupingBy(e -> e.getKey().getIndex()));
MetaData.Builder metaDataBuilder = null;
for (Map.Entry<Index, List<Map.Entry<ShardId, Updates>>> indexChanges : changesGroupedByIndex.entrySet()) {
Index index = indexChanges.getKey();
final IndexMetaData oldIndexMetaData = oldMetaData.index(index);
if (oldIndexMetaData == null) {
throw new IllegalStateException("no metadata found for index " + index);
}
final IndexMetaData oldIndexMetaData = oldMetaData.getIndexSafe(index);
IndexMetaData.Builder indexMetaDataBuilder = null;
for (Map.Entry<ShardId, Updates> shardEntry : indexChanges.getValue()) {
ShardId shardId = shardEntry.getKey();
Updates updates = shardEntry.getValue();
assert Sets.haveEmptyIntersection(updates.addedAllocationIds, updates.removedAllocationIds) :
"Allocation ids cannot be both added and removed in the same allocation round, added ids: " +
updates.addedAllocationIds + ", removed ids: " + updates.removedAllocationIds;
Set<String> activeAllocationIds = new HashSet<>(oldIndexMetaData.activeAllocationIds(shardId.id()));
activeAllocationIds.addAll(updates.addedAllocationIds);
activeAllocationIds.removeAll(updates.removedAllocationIds);
// only update active allocation ids if there is an active shard
if (activeAllocationIds.isEmpty() == false) {
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds);
}
if (updates.increaseTerm) {
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1);
}
indexMetaDataBuilder = updateInSyncAllocations(newRoutingTable, oldIndexMetaData, indexMetaDataBuilder, shardId, updates);
indexMetaDataBuilder = updatePrimaryTerm(oldIndexMetaData, indexMetaDataBuilder, shardId, updates);
}
if (indexMetaDataBuilder != null) {
@ -142,6 +138,148 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
}
}
/**
* Updates in-sync allocations with routing changes that were made to the routing table.
*/
private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTable, IndexMetaData oldIndexMetaData,
IndexMetaData.Builder indexMetaDataBuilder, ShardId shardId, Updates updates) {
assert Sets.haveEmptyIntersection(updates.addedAllocationIds, updates.removedAllocationIds) :
"allocation ids cannot be both added and removed in the same allocation round, added ids: " +
updates.addedAllocationIds + ", removed ids: " + updates.removedAllocationIds;
Set<String> oldInSyncAllocationIds = oldIndexMetaData.inSyncAllocationIds(shardId.id());
// check if we have been force-initializing an empty primary or a stale primary
if (updates.initializedPrimary != null && oldInSyncAllocationIds.isEmpty() == false &&
oldInSyncAllocationIds.contains(updates.initializedPrimary.allocationId().getId()) == false) {
// we're not reusing an existing in-sync allocation id to initialize a primary, which means that we're either force-allocating
// an empty or a stale primary (see AllocateEmptyPrimaryAllocationCommand or AllocateStalePrimaryAllocationCommand).
boolean emptyPrimary = updates.initializedPrimary.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED;
assert updates.addedAllocationIds.isEmpty() : (emptyPrimary ? "empty" : "stale") +
" primary is not force-initialized in same allocation round where shards are started";
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
if (emptyPrimary) {
// forcing an empty primary resets the in-sync allocations to the empty set (ShardRouting.allocatedPostIndexCreate)
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.emptySet());
} else {
// forcing a stale primary resets the in-sync allocations to the singleton set with the stale id
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(),
Collections.singleton(updates.initializedPrimary.allocationId().getId()));
}
} else {
// standard path for updating in-sync ids
Set<String> inSyncAllocationIds = new HashSet<>(oldInSyncAllocationIds);
inSyncAllocationIds.addAll(updates.addedAllocationIds);
inSyncAllocationIds.removeAll(updates.removedAllocationIds);
// Prevent the set of inSyncAllocationIds from growing unboundedly. This can happen, for example, if we don't write
// to a primary but repeatedly shut down nodes that have active replicas.
// We use number_of_replicas + 1 (= possible active shard copies) to bound the inSyncAllocationIds set
int maxActiveShards = oldIndexMetaData.getNumberOfReplicas() + 1; // +1 for the primary
if (inSyncAllocationIds.size() > maxActiveShards) {
// trim entries that have no corresponding shard routing in the cluster state (i.e. trim unavailable copies)
List<ShardRouting> assignedShards = newRoutingTable.shardRoutingTable(shardId).assignedShards();
assert assignedShards.size() <= maxActiveShards :
"cannot have more assigned shards " + assignedShards + " than maximum possible active shards " + maxActiveShards;
Set<String> assignedAllocations = assignedShards.stream().map(s -> s.allocationId().getId()).collect(Collectors.toSet());
inSyncAllocationIds = inSyncAllocationIds.stream()
.sorted(Comparator.comparing(assignedAllocations::contains).reversed()) // values with routing entries first
.limit(maxActiveShards)
.collect(Collectors.toSet());
}
// only update the in-sync allocation ids if there is at least one entry remaining. Assume, for example, that only a
// primary was ever active and it has now failed. If we were to remove its allocation id from the in-sync set, this
// would create an empty primary on the next allocation (see ShardRouting#allocatedPostIndexCreate)
if (inSyncAllocationIds.isEmpty() && oldInSyncAllocationIds.isEmpty() == false) {
assert updates.firstFailedPrimary != null :
"in-sync set became empty but active primary wasn't failed: " + oldInSyncAllocationIds;
if (updates.firstFailedPrimary != null) {
// add back allocation id of failed primary
inSyncAllocationIds.add(updates.firstFailedPrimary.allocationId().getId());
}
}
assert inSyncAllocationIds.isEmpty() == false || oldInSyncAllocationIds.isEmpty() :
"in-sync allocations cannot become empty after they have been non-empty: " + oldInSyncAllocationIds;
// be extra safe here and only update in-sync set if it is non-empty
if (inSyncAllocationIds.isEmpty() == false) {
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), inSyncAllocationIds);
}
}
return indexMetaDataBuilder;
}
/**
* Removes allocation ids from the in-sync set for shard copies for which there are no routing entries in the routing table.
* This method is called in AllocationService before any changes to the routing table are made.
*/
public static ClusterState removeStaleIdsWithoutRoutings(ClusterState clusterState, List<StaleShard> staleShards) {
MetaData oldMetaData = clusterState.metaData();
RoutingTable oldRoutingTable = clusterState.routingTable();
MetaData.Builder metaDataBuilder = null;
// group staleShards entries by index
for (Map.Entry<Index, List<StaleShard>> indexEntry : staleShards.stream().collect(
Collectors.groupingBy(fs -> fs.shardId.getIndex())).entrySet()) {
final IndexMetaData oldIndexMetaData = oldMetaData.getIndexSafe(indexEntry.getKey());
IndexMetaData.Builder indexMetaDataBuilder = null;
// group staleShards entries by shard id
for (Map.Entry<ShardId, List<StaleShard>> shardEntry : indexEntry.getValue().stream().collect(
Collectors.groupingBy(staleShard -> staleShard.shardId)).entrySet()) {
int shardNumber = shardEntry.getKey().getId();
Set<String> oldInSyncAllocations = oldIndexMetaData.inSyncAllocationIds(shardNumber);
Set<String> idsToRemove = shardEntry.getValue().stream().map(e -> e.allocationId).collect(Collectors.toSet());
assert idsToRemove.stream().allMatch(id -> oldRoutingTable.getByAllocationId(shardEntry.getKey(), id) == null) :
"removing stale ids: " + idsToRemove + ", some of which still have a routing entry: " + oldRoutingTable.prettyPrint();
Set<String> remainingInSyncAllocations = Sets.difference(oldInSyncAllocations, idsToRemove);
assert remainingInSyncAllocations.isEmpty() == false : "Set of in-sync ids cannot become empty for shard " +
shardEntry.getKey() + " (before: " + oldInSyncAllocations + ", ids to remove: " + idsToRemove + ")";
// be extra safe here: if the in-sync set were to become empty, this would create an empty primary on the next allocation
// (see ShardRouting#allocatedPostIndexCreate)
if (remainingInSyncAllocations.isEmpty() == false) {
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
indexMetaDataBuilder.putInSyncAllocationIds(shardNumber, remainingInSyncAllocations);
}
}
if (indexMetaDataBuilder != null) {
if (metaDataBuilder == null) {
metaDataBuilder = MetaData.builder(oldMetaData);
}
metaDataBuilder.put(indexMetaDataBuilder);
}
}
if (metaDataBuilder != null) {
return ClusterState.builder(clusterState).metaData(metaDataBuilder).build();
} else {
return clusterState;
}
}
/**
* Increases the primary term if {@link #increasePrimaryTerm} was called for this shard id.
*/
private IndexMetaData.Builder updatePrimaryTerm(IndexMetaData oldIndexMetaData, IndexMetaData.Builder indexMetaDataBuilder,
ShardId shardId, Updates updates) {
if (updates.increaseTerm) {
if (indexMetaDataBuilder == null) {
indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1);
}
return indexMetaDataBuilder;
}
/**
* Helper method that creates an update entry for the given shard id if such an entry does not exist yet.
*/
@ -166,13 +304,15 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
/**
* Increase primary term for this shard id
*/
private void increasePrimaryTerm(ShardRouting shardRouting) {
changes(shardRouting.shardId()).increaseTerm = true;
private void increasePrimaryTerm(ShardId shardId) {
changes(shardId).increaseTerm = true;
}
private static class Updates {
private boolean increaseTerm; // whether primary term should be increased
private Set<String> addedAllocationIds = new HashSet<>(); // allocation ids that should be added to the in-sync set
private Set<String> removedAllocationIds = new HashSet<>(); // allocation ids that should be removed from the in-sync set
private ShardRouting initializedPrimary = null; // primary that was initialized from unassigned
private ShardRouting firstFailedPrimary = null; // first active primary that was failed
}
}
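
A note on the bounding logic in updateInSyncAllocations above: the following is an editor's minimal re-implementation of the trimming step (plain Java, illustrative names), showing how ids that still have routing entries are preferred over unavailable ones once the set exceeds number_of_replicas + 1:

import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class InSyncTrimSketch {

    // Keep at most maxActiveShards ids, preferring ids that still have a routing entry.
    static Set<String> trim(Set<String> inSync, Set<String> assigned, int maxActiveShards) {
        if (inSync.size() <= maxActiveShards) {
            return inSync;
        }
        return inSync.stream()
                // false sorts before true, so reverse the comparator to put assigned ids first
                .sorted(Comparator.comparing(assigned::contains).reversed())
                .limit(maxActiveShards)
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args) {
        Set<String> inSync = Set.of("a", "b", "c", "d"); // grew past the bound
        Set<String> assigned = Set.of("c", "d");         // copies that still have routing entries
        // with number_of_replicas = 1, at most 2 active copies are possible
        System.out.println(trim(inSync, assigned, 2));   // keeps c and d (order unspecified)
    }
}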

View File

@ -307,8 +307,8 @@ public class RoutingAllocation {
/**
* Returns updated {@link MetaData} based on the changes that were made to the routing nodes
*/
public MetaData updateMetaDataWithRoutingChanges() {
return indexMetaDataUpdater.applyChanges(metaData);
public MetaData updateMetaDataWithRoutingChanges(RoutingTable newRoutingTable) {
return indexMetaDataUpdater.applyChanges(metaData, newRoutingTable);
}
/**

View File

@ -38,8 +38,9 @@ public class RoutingNodesChangedObserver implements RoutingChangesObserver {
}
@Override
public void shardInitialized(ShardRouting unassignedShard) {
public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) {
assert unassignedShard.unassigned() : "expected unassigned shard " + unassignedShard;
assert initializedShard.initializing() : "expected initializing shard " + initializedShard;
setChanged();
}

View File

@ -121,7 +121,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
continue;
}
final Set<String> lastActiveAllocationIds = indexMetaData.activeAllocationIds(shard.id());
final Set<String> lastActiveAllocationIds = indexMetaData.inSyncAllocationIds(shard.id());
final boolean snapshotRestore = shard.restoreSource() != null;
final boolean recoverOnAnyNode = recoverOnAnyNode(indexMetaData);

View File

@ -61,7 +61,7 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, "uuid"))
.putActiveAllocationIds(0, Sets.newHashSet("aid1", "aid2"))
.putInSyncAllocationIds(0, Sets.newHashSet("aid1", "aid2"))
.numberOfShards(1)
.numberOfReplicas(1)
.build();

View File

@ -26,6 +26,8 @@ import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -34,6 +36,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
@ -69,7 +72,8 @@ public class ReplicationOperationTests extends ESTestCase {
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
final long primaryTerm = state.getMetaData().index(index).primaryTerm(0);
IndexMetaData indexMetaData = state.getMetaData().index(index);
final long primaryTerm = indexMetaData.primaryTerm(0);
final IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().shardRoutingTable(shardId);
ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
if (primaryShard.relocating() && randomBoolean()) {
@ -78,6 +82,10 @@ public class ReplicationOperationTests extends ESTestCase {
.nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
primaryShard = primaryShard.getTargetRelocatingShard();
}
// add a few in-sync allocation ids that don't have corresponding routing entries
Set<String> staleAllocationIds = Sets.newHashSet(generateRandomStringArray(4, 10, false));
state = ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).put(IndexMetaData.builder(indexMetaData)
.putInSyncAllocationIds(0, Sets.union(indexMetaData.inSyncAllocationIds(0), staleAllocationIds)))).build();
final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, state);
@ -112,6 +120,7 @@ public class ReplicationOperationTests extends ESTestCase {
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
assertThat(replicasProxy.failedReplicas, equalTo(expectedFailedShards));
assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds));
assertTrue("listener is not marked as done", listener.isDone());
ShardInfo shardInfo = listener.actionGet().getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(expectedFailedShards.size()));
@ -155,7 +164,8 @@ public class ReplicationOperationTests extends ESTestCase {
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(2), randomInt(2));
final long primaryTerm = state.getMetaData().index(index).primaryTerm(0);
IndexMetaData indexMetaData = state.getMetaData().index(index);
final long primaryTerm = indexMetaData.primaryTerm(0);
ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
if (primaryShard.relocating() && randomBoolean()) {
// simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated
@ -163,6 +173,10 @@ public class ReplicationOperationTests extends ESTestCase {
.nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
primaryShard = primaryShard.getTargetRelocatingShard();
}
// add in-sync allocation id that doesn't have a corresponding routing entry
state = ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).put(IndexMetaData.builder(indexMetaData)
.putInSyncAllocationIds(0, Sets.union(indexMetaData.inSyncAllocationIds(0), Sets.newHashSet(randomAsciiOfLength(10))))))
.build();
final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, state);
@ -173,13 +187,28 @@ public class ReplicationOperationTests extends ESTestCase {
Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final ClusterState finalState = state;
final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean();
final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) {
@Override
public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
Consumer<Exception> onIgnoredFailure) {
assertThat(replica, equalTo(failedReplica));
onPrimaryDemoted.accept(new ElasticsearchException("the king is dead"));
if (testPrimaryDemotedOnStaleShardCopies) {
super.failShard(replica, primaryTerm, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure);
} else {
assertThat(replica, equalTo(failedReplica));
onPrimaryDemoted.accept(new ElasticsearchException("the king is dead"));
}
}
@Override
public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
if (testPrimaryDemotedOnStaleShardCopies) {
onPrimaryDemoted.accept(new ElasticsearchException("the king is dead"));
} else {
super.markShardCopyAsStale(shardId, allocationId, primaryTerm, onSuccess, onPrimaryDemoted, onIgnoredFailure);
}
}
};
AtomicBoolean primaryFailed = new AtomicBoolean();
@ -403,6 +432,8 @@ public class ReplicationOperationTests extends ESTestCase {
final Set<ShardRouting> failedReplicas = ConcurrentCollections.newConcurrentSet();
final Set<String> markedAsStaleCopies = ConcurrentCollections.newConcurrentSet();
TestReplicaProxy() {
this(Collections.emptyMap());
}
@ -437,6 +468,19 @@ public class ReplicationOperationTests extends ESTestCase {
fail("replica [" + replica + "] was failed");
}
}
@Override
public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
if (markedAsStaleCopies.add(allocationId) == false) {
fail("replica [" + allocationId + "] was marked as stale twice");
}
if (randomBoolean()) {
onSuccess.run();
} else {
onIgnoredFailure.accept(new ElasticsearchException("simulated"));
}
}
}
class TestReplicationOperation extends ReplicationOperation<Request, Request, TestPrimary.Result> {
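The TestReplicaProxy above pins down the callback contract of markShardCopyAsStale: exactly one of onSuccess, onPrimaryDemoted, or onIgnoredFailure fires per call. Here is a small self-contained sketch of that contract, using a hypothetical MasterClient stand-in rather than the production ReplicasProxy:

import java.util.function.Consumer;

// Self-contained sketch of the three-callback contract (hypothetical MasterClient,
// not the production ReplicasProxy): exactly one callback fires per invocation.
public class StaleMarkerSketch {

    interface MasterClient {
        String markStale(String allocationId, long primaryTerm); // "ok", "demoted" or "ignored" in this toy model
    }

    static void markShardCopyAsStale(MasterClient master, String allocationId, long primaryTerm,
                                     Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
                                     Consumer<Exception> onIgnoredFailure) {
        String outcome = master.markStale(allocationId, primaryTerm);
        if ("ok".equals(outcome)) {
            onSuccess.run(); // master removed the id from the in-sync set; the write may be acked
        } else if ("demoted".equals(outcome)) {
            // a newer primary term exists; this primary has to fail itself
            onPrimaryDemoted.accept(new Exception("primary term " + primaryTerm + " is stale"));
        } else {
            // a failure the primary may safely ignore, e.g. the node is shutting down anyway
            onIgnoredFailure.accept(new Exception("could not mark copy as stale"));
        }
    }

    public static void main(String[] args) {
        markShardCopyAsStale((id, term) -> "ok", "aid-1", 1L,
                () -> System.out.println("acked"),
                e -> System.out.println("demoted: " + e.getMessage()),
                e -> System.out.println("ignored: " + e.getMessage()));
    }
}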

View File

@ -117,7 +117,8 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
List<ShardStateAction.ShardEntry> nonExistentTasks = createNonExistentShards(currentState, reason);
ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) {
@Override
RoutingAllocation.Result applyFailedShards(ClusterState currentState, List<FailedRerouteAllocation.FailedShard> failedShards) {
RoutingAllocation.Result applyFailedShards(ClusterState currentState, List<FailedRerouteAllocation.FailedShard> failedShards,
List<FailedRerouteAllocation.StaleShard> staleShards) {
throw new RuntimeException("simulated applyFailedShards failure");
}
};

View File

@ -339,7 +339,8 @@ public class ShardStateActionTests extends ESTestCase {
long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id());
assertThat(primaryTerm, greaterThanOrEqualTo(1L));
shardStateAction.remoteShardFailed(failedShard, primaryTerm + 1, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), primaryTerm + 1, "test",
getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
failure.set(null);

View File

@ -330,7 +330,7 @@ public class ClusterStateHealthTests extends ESTestCase {
boolean atLeastOne = false;
for (int i = 0; i < numberOfShards; i++) {
if (atLeastOne == false || randomBoolean()) {
idxMetaWithAllocationIds.putActiveAllocationIds(i, Sets.newHashSet(UUIDs.randomBase64UUID()));
idxMetaWithAllocationIds.putInSyncAllocationIds(i, Sets.newHashSet(UUIDs.randomBase64UUID()));
atLeastOne = true;
}
}
@ -400,7 +400,7 @@ public class ClusterStateHealthTests extends ESTestCase {
routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build();
IndexMetaData.Builder idxMetaBuilder = IndexMetaData.builder(clusterState.metaData().index(indexName));
for (final IntObjectCursor<Set<String>> entry : allocationIds.build()) {
idxMetaBuilder.putActiveAllocationIds(entry.key, entry.value);
idxMetaBuilder.putInSyncAllocationIds(entry.key, entry.value);
}
MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData()).put(idxMetaBuilder);
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).metaData(metaDataBuilder).build();
@ -447,7 +447,7 @@ public class ClusterStateHealthTests extends ESTestCase {
routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build();
idxMetaBuilder = IndexMetaData.builder(clusterState.metaData().index(indexName));
for (final IntObjectCursor<Set<String>> entry : allocationIds.build()) {
idxMetaBuilder.putActiveAllocationIds(entry.key, entry.value);
idxMetaBuilder.putInSyncAllocationIds(entry.key, entry.value);
}
metaDataBuilder = MetaData.builder(clusterState.metaData()).put(idxMetaBuilder);
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).metaData(metaDataBuilder).build();
@ -523,7 +523,7 @@ public class ClusterStateHealthTests extends ESTestCase {
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : clusterState.routingTable().index(indexName).shards()) {
final ShardRouting primaryShard = shardRouting.value.primaryShard();
if (primaryShard.active() == false) {
if (clusterState.metaData().index(indexName).activeAllocationIds(shardRouting.key).isEmpty() == false) {
if (clusterState.metaData().index(indexName).inSyncAllocationIds(shardRouting.key).isEmpty() == false) {
return false;
}
if (primaryShard.unassignedInfo() != null &&

View File

@ -37,6 +37,7 @@ import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import java.util.Arrays;
@ -153,7 +154,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand("test", 0, dataNodeWithNoShardCopy, true)).get();
logger.info("--> wait until shard is failed and becomes unassigned again");
assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned()));
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertTrue(state.prettyPrint(), state.getRoutingTable().index("test").allPrimaryShardsUnassigned());
});
assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
}
@ -190,6 +191,11 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
}
assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L);
// allocation id of old primary was cleaned from the in-sync set
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertEquals(Collections.singleton(state.routingTable().index(idxName).shard(0).primaryShard().allocationId().getId()),
state.metaData().index(idxName).inSyncAllocationIds(0));
}
public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException {
@ -204,6 +210,59 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
ensureGreen("test");
}
public void testDoNotRemoveAllocationIdOnNodeLeave() throws Exception {
internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNode(Settings.EMPTY);
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1).put("index.unassigned.node_left.delayed_timeout", "0ms")).get());
String replicaNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
ensureGreen("test");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
ensureYellow("test");
assertEquals(2, client().admin().cluster().prepareState().get().getState().metaData().index("test").inSyncAllocationIds(0).size());
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public boolean clearData(String nodeName) {
return true;
}
});
logger.info("--> wait until shard is failed and becomes unassigned again");
assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned()));
assertEquals(2, client().admin().cluster().prepareState().get().getState().metaData().index("test").inSyncAllocationIds(0).size());
logger.info("--> starting node that reuses data folder with the up-to-date shard");
internalCluster().startDataOnlyNode(Settings.EMPTY);
ensureGreen("test");
}
public void testRemoveAllocationIdOnWriteAfterNodeLeave() throws Exception {
internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNode(Settings.EMPTY);
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1).put("index.unassigned.node_left.delayed_timeout", "0ms")).get());
String replicaNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
ensureGreen("test");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
ensureYellow("test");
assertEquals(2, client().admin().cluster().prepareState().get().getState().metaData().index("test").inSyncAllocationIds(0).size());
logger.info("--> indexing...");
client().prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
assertEquals(1, client().admin().cluster().prepareState().get().getState().metaData().index("test").inSyncAllocationIds(0).size());
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public boolean clearData(String nodeName) {
return true;
}
});
logger.info("--> wait until shard is failed and becomes unassigned again");
assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned()));
assertEquals(1, client().admin().cluster().prepareState().get().getState().metaData().index("test").inSyncAllocationIds(0).size());
logger.info("--> starting node that reuses data folder with the up-to-date shard");
internalCluster().startDataOnlyNode(Settings.EMPTY);
assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned()));
}
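Taken together, testDoNotRemoveAllocationIdOnNodeLeave and testRemoveAllocationIdOnWriteAfterNodeLeave pin down the commit's core invariant. A toy standalone simulation of that invariant (plain Java, not Elasticsearch code) is shown below:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy simulation of the invariant: a node leaving does not shrink the
// in-sync set, while a write that a copy misses does.
public class NodeLeaveVsWrite {

    static final Set<String> inSync = new HashSet<>(Arrays.asList("primary", "replica"));
    static final Set<String> reachable = new HashSet<>(Arrays.asList("primary", "replica"));

    static void nodeLeaves(String copy) {
        reachable.remove(copy); // the routing entry goes away; the in-sync set is untouched
    }

    static void write() {
        // the primary marks every in-sync copy it could not replicate to as stale
        inSync.removeIf(copy -> reachable.contains(copy) == false);
    }

    public static void main(String[] args) {
        nodeLeaves("replica");
        System.out.println(inSync.size()); // 2 -- both copies still hold all acknowledged writes
        write();
        System.out.println(inSync.size()); // 1 -- the replica missed a write and is now stale
    }
}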
public void testNotWaitForQuorumCopies() throws Exception {
logger.info("--> starting 3 nodes");
internalCluster().startNodesAsync(3).get();

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.cluster.ESAllocationTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -152,7 +153,8 @@ public class PrimaryTermsTests extends ESAllocationTestCase {
failedShards.add(new FailedRerouteAllocation.FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null));
incrementPrimaryTerm(index, shard); // the primary failure should increment the primary term;
}
RoutingAllocation.Result rerouteResult = allocationService.applyFailedShards(this.clusterState, failedShards);
RoutingAllocation.Result rerouteResult = allocationService.applyFailedShards(this.clusterState, failedShards,
Collections.emptyList());
applyRerouteResult(rerouteResult);
}

View File

@ -338,7 +338,7 @@ public class RoutingTableTests extends ESAllocationTestCase {
for (ShardRouting shardRouting : shardTable) {
Set<String> activeAllocations = shardTable.activeShards().stream().map(
shr -> shr.allocationId().getId()).collect(Collectors.toSet());
imdBuilder.putActiveAllocationIds(shardRouting.id(), activeAllocations);
imdBuilder.putInSyncAllocationIds(shardRouting.id(), activeAllocations);
}
}
return imdBuilder.build();

View File

@ -1,102 +0,0 @@
package org.elasticsearch.cluster.routing.allocation;
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.cluster.ESAllocationTestCase;
import java.util.Arrays;
import java.util.HashSet;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.equalTo;
public class ActiveAllocationIdTests extends ESAllocationTestCase {
public void testActiveAllocationIdsUpdated() {
AllocationService allocation = createAllocationService();
logger.info("creating an index with 1 shard, 2 replicas");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
// add index metadata where we have no routing nodes to check that allocation ids are not removed
.put(IndexMetaData.builder("test-old").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)
.putActiveAllocationIds(0, new HashSet<>(Arrays.asList("x", "y"))))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();
logger.info("adding three nodes and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(
newNode("node1")).add(newNode("node2")).add(newNode("node3"))).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(0));
assertThat(clusterState.metaData().index("test-old").activeAllocationIds(0), equalTo(new HashSet<>(Arrays.asList("x", "y"))));
logger.info("start primary shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(1));
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).get(0).allocationId().getId(),
equalTo(clusterState.metaData().index("test").activeAllocationIds(0).iterator().next()));
assertThat(clusterState.metaData().index("test-old").activeAllocationIds(0), equalTo(new HashSet<>(Arrays.asList("x", "y"))));
logger.info("start replica shards");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(3));
logger.info("remove a node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.remove("node1"))
.build();
rerouteResult = allocation.deassociateDeadNodes(clusterState, true, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(2));
logger.info("remove all remaining nodes");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.remove("node2").remove("node3"))
.build();
rerouteResult = allocation.deassociateDeadNodes(clusterState, true, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
// active allocation ids should not be updated
assertThat(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).size(), equalTo(3));
assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(2));
}
}

View File

@ -100,14 +100,14 @@ public abstract class CatAllocationTestCase extends ESAllocationTestCase {
.numberOfShards(idx.numShards()).numberOfReplicas(idx.numReplicas());
for (ShardRouting shardRouting : idx.routing) {
if (shardRouting.active()) {
Set<String> allocationIds = idxMetaBuilder.getActiveAllocationIds(shardRouting.id());
Set<String> allocationIds = idxMetaBuilder.getInSyncAllocationIds(shardRouting.id());
if (allocationIds == null) {
allocationIds = new HashSet<>();
} else {
allocationIds = new HashSet<>(allocationIds);
}
allocationIds.add(shardRouting.allocationId().getId());
idxMetaBuilder.putActiveAllocationIds(shardRouting.id(), allocationIds);
idxMetaBuilder.putInSyncAllocationIds(shardRouting.id(), allocationIds);
}
}
IndexMetaData idxMeta = idxMetaBuilder.build();

View File

@ -0,0 +1,308 @@
package org.elasticsearch.cluster.routing.allocation;
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction.ShardEntry;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
public class InSyncAllocationIdTests extends ESAllocationTestCase {
private AllocationService allocation;
private ShardStateAction.ShardFailedClusterStateTaskExecutor failedClusterStateTaskExecutor;
@Before
public void setupAllocationService() {
allocation = createAllocationService();
failedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, logger);
}
public void testInSyncAllocationIdsUpdated() {
logger.info("creating an index with 1 shard, 2 replicas");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
// add index metadata where we have no routing nodes to check that allocation ids are not removed
.put(IndexMetaData.builder("test-old").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)
.putInSyncAllocationIds(0, new HashSet<>(Arrays.asList("x", "y"))))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();
logger.info("adding three nodes and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(
newNode("node1")).add(newNode("node2")).add(newNode("node3"))).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(0));
assertThat(clusterState.metaData().index("test-old").inSyncAllocationIds(0), equalTo(new HashSet<>(Arrays.asList("x", "y"))));
logger.info("start primary shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(1));
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).get(0).allocationId().getId(),
equalTo(clusterState.metaData().index("test").inSyncAllocationIds(0).iterator().next()));
assertThat(clusterState.metaData().index("test-old").inSyncAllocationIds(0), equalTo(new HashSet<>(Arrays.asList("x", "y"))));
logger.info("start replica shards");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(3));
logger.info("remove a node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.remove("node1"))
.build();
rerouteResult = allocation.deassociateDeadNodes(clusterState, true, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
// in-sync allocation ids should not be updated
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(3));
logger.info("remove all remaining nodes");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.remove("node2").remove("node3"))
.build();
rerouteResult = allocation.deassociateDeadNodes(clusterState, true, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
// in-sync allocation ids should not be updated
assertThat(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).size(), equalTo(3));
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(3));
// force empty primary
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.add(newNode("node1")))
.build();
rerouteResult = allocation.reroute(clusterState,
new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand("test", 0, "node1", true)), false, false);
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
// check that in-sync allocation ids are reset by forcing an empty primary
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(0));
logger.info("start primary shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(1));
logger.info("fail primary shard");
ShardRouting startedPrimary = clusterState.getRoutingNodes().shardsWithState(STARTED).get(0);
rerouteResult = allocation.applyFailedShard(clusterState, startedPrimary);
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).size(), equalTo(0));
assertEquals(Collections.singleton(startedPrimary.allocationId().getId()),
clusterState.metaData().index("test").inSyncAllocationIds(0));
}
/**
* Assume the following scenario: an indexing request is written to the primary, but fails to be replicated to an active replica.
* The primary instructs the master to fail the replica before acknowledging the write to the client. Meanwhile, the node holding
* the replica was removed from the cluster (deassociateDeadNodes). This means that the ShardRouting of the replica was failed, but
* its allocation id is still part of the in-sync set. We have to make sure that the failShard request from the primary removes the
* allocation id from the in-sync set.
*/
public void testDeadNodesBeforeReplicaFailed() throws Exception {
ClusterState clusterState = createOnePrimaryOneReplicaClusterState(allocation);
logger.info("remove replica node");
IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index("test").shard(0);
ShardRouting replicaShard = shardRoutingTable.replicaShards().get(0);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.remove(replicaShard.currentNodeId()))
.build();
RoutingAllocation.Result rerouteResult = allocation.deassociateDeadNodes(clusterState, true, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(2));
logger.info("fail replica (for which there is no shard routing in the CS anymore)");
assertNull(clusterState.getRoutingNodes().getByAllocationId(replicaShard.shardId(), replicaShard.allocationId().getId()));
long primaryTerm = clusterState.metaData().index("test").primaryTerm(0);
clusterState = failedClusterStateTaskExecutor.execute(clusterState, Arrays.asList(
new ShardEntry(shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null))
).resultingState;
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(1));
}
/**
* Assume the following scenario: an indexing request is written to the primary, but fails to be replicated to an active replica.
* The primary instructs the master to fail the replica before acknowledging the write to the client. Meanwhile, the primary fails
* for an unrelated reason. The master now batches both requests, failing primary and replica together. We have to make sure that
* only the allocation id of the primary is kept in the in-sync allocation set before we acknowledge the request to the client.
* Otherwise we would acknowledge a write that made it into the primary but not into the replica, while the replica is still
* considered non-stale.
*/
public void testPrimaryFailureBatchedWithReplicaFailure() throws Exception {
ClusterState clusterState = createOnePrimaryOneReplicaClusterState(allocation);
IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index("test").shard(0);
ShardRouting primaryShard = shardRoutingTable.primaryShard();
ShardRouting replicaShard = shardRoutingTable.replicaShards().get(0);
long primaryTerm = clusterState.metaData().index("test").primaryTerm(0);
List<ShardEntry> failureEntries = new ArrayList<>();
failureEntries.add(new ShardEntry(
shardRoutingTable.shardId(), primaryShard.allocationId().getId(), 0L, "dummy", null));
failureEntries.add(new ShardEntry(
shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null));
Collections.shuffle(failureEntries, random());
logger.info("Failing {}", failureEntries);
clusterState = failedClusterStateTaskExecutor.execute(clusterState, failureEntries).resultingState;
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0),
equalTo(Collections.singleton(primaryShard.allocationId().getId())));
// resend shard failures to check if they are ignored
clusterState = failedClusterStateTaskExecutor.execute(clusterState, failureEntries).resultingState;
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0),
equalTo(Collections.singleton(primaryShard.allocationId().getId())));
}
/**
* Prevent the set of inSyncAllocationIds from growing unboundedly. This can happen, for example, if we don't write to a primary
* but repeatedly shut down nodes that have active replicas.
* We use number_of_replicas + 1 (= the number of possible active shard copies) to bound the size of the inSyncAllocationIds set.
*/
public void testInSyncIdsNotGrowingWithoutBounds() throws Exception {
ClusterState clusterState = createOnePrimaryOneReplicaClusterState(allocation);
Set<String> inSyncSet = clusterState.metaData().index("test").inSyncAllocationIds(0);
assertThat(inSyncSet.size(), equalTo(2));
IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index("test").shard(0);
ShardRouting primaryShard = shardRoutingTable.primaryShard();
ShardRouting replicaShard = shardRoutingTable.replicaShards().get(0);
logger.info("remove a node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.remove(replicaShard.currentNodeId()))
.build();
RoutingAllocation.Result rerouteResult = allocation.deassociateDeadNodes(clusterState, true, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
// in-sync allocation ids should not be updated
assertEquals(inSyncSet, clusterState.metaData().index("test").inSyncAllocationIds(0));
// check that inSyncAllocationIds can not grow without bounds
for (int i = 0; i < 5; i++) {
logger.info("add back node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.add(newNode(replicaShard.currentNodeId())))
.build();
rerouteResult = allocation.reroute(clusterState, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
logger.info("start replica shards");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
logger.info("remove the node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.remove(replicaShard.currentNodeId()))
.build();
rerouteResult = allocation.deassociateDeadNodes(clusterState, true, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
}
// in-sync allocation set is bounded
Set<String> newInSyncSet = clusterState.metaData().index("test").inSyncAllocationIds(0);
assertThat(newInSyncSet.size(), equalTo(2));
// the primary's allocation id is retained; only the replica's id was replaced
assertFalse(Sets.haveEmptyIntersection(inSyncSet, newInSyncSet));
assertThat(newInSyncSet, hasItem(primaryShard.allocationId().getId()));
}
private ClusterState createOnePrimaryOneReplicaClusterState(AllocationService allocation) {
logger.info("creating an index with 1 shard, 1 replica");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();
logger.info("adding two nodes and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(
newNode("node1")).add(newNode("node2"))).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(0));
logger.info("start primary shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(1));
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).get(0).allocationId().getId(),
equalTo(clusterState.metaData().index("test").inSyncAllocationIds(0).iterator().next()));
logger.info("start replica shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
assertThat(clusterState.metaData().index("test").inSyncAllocationIds(0).size(), equalTo(2));
return clusterState;
}
}
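The batched-failure case above is the subtle one. Below is a compact standalone model of the rule it asserts (an assumption-level sketch, not the real ShardFailedClusterStateTaskExecutor): replicas reported stale by the primary leave the in-sync set, the failed primary's own id stays, and re-applying the same batch is a no-op:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Toy model of batched shard failures: the primary's copy holds every
// acknowledged write, so its id survives even when it fails in the same batch.
public class BatchedFailureModel {

    static Set<String> applyBatch(Set<String> inSync, Set<String> replicaIdsFailedByPrimary) {
        Set<String> result = new HashSet<>(inSync);
        result.removeAll(replicaIdsFailedByPrimary); // reported as stale by the primary
        // the failed primary's own id is deliberately kept in the set
        return result;
    }

    public static void main(String[] args) {
        Set<String> inSync = new HashSet<>(Arrays.asList("p", "r"));
        Set<String> failedByPrimary = Collections.singleton("r");
        Set<String> after = applyBatch(inSync, failedByPrimary);
        System.out.println(after);                               // [p]
        System.out.println(applyBatch(after, failedByPrimary));  // [p] -- resending is a no-op
    }
}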

View File

@ -313,8 +313,8 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
AllocationId allocationId2P = AllocationId.newInitializing();
AllocationId allocationId2R = AllocationId.newInitializing();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shard1.getIndexName()).settings(settings(Version.CURRENT).put(Settings.EMPTY)).numberOfShards(1).numberOfReplicas(1).putActiveAllocationIds(0, Sets.newHashSet(allocationId1P.getId(), allocationId1R.getId())))
.put(IndexMetaData.builder(shard2.getIndexName()).settings(settings(Version.CURRENT).put(Settings.EMPTY)).numberOfShards(1).numberOfReplicas(1).putActiveAllocationIds(0, Sets.newHashSet(allocationId2P.getId(), allocationId2R.getId())))
.put(IndexMetaData.builder(shard1.getIndexName()).settings(settings(Version.CURRENT).put(Settings.EMPTY)).numberOfShards(1).numberOfReplicas(1).putInSyncAllocationIds(0, Sets.newHashSet(allocationId1P.getId(), allocationId1R.getId())))
.put(IndexMetaData.builder(shard2.getIndexName()).settings(settings(Version.CURRENT).put(Settings.EMPTY)).numberOfShards(1).numberOfReplicas(1).putInSyncAllocationIds(0, Sets.newHashSet(allocationId2P.getId(), allocationId2R.getId())))
.build();
RoutingTable routingTable = RoutingTable.builder()
.add(IndexRoutingTable.builder(shard1.getIndex())

View File

@ -51,7 +51,7 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase {
final IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(2).numberOfReplicas(0)
.putActiveAllocationIds(1, Collections.singleton(allocationId.getId()))
.putInSyncAllocationIds(1, Collections.singleton(allocationId.getId()))
.build();
final Index index = indexMetaData.getIndex();
ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))

View File

@ -431,7 +431,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
Version version = hasActiveAllocation ? Version.CURRENT : Version.V_2_0_0;
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0)
.putActiveAllocationIds(0, hasActiveAllocation ? Sets.newHashSet("allocId") : Collections.emptySet()))
.putInSyncAllocationIds(0, hasActiveAllocation ? Sets.newHashSet("allocId") : Collections.emptySet()))
.build();
final Snapshot snapshot = new Snapshot("test", new SnapshotId("test", UUIDs.randomBase64UUID()));
@ -515,7 +515,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true))
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, hasActiveAllocation ? Sets.newHashSet("allocId") : Collections.emptySet()))
.numberOfShards(1).numberOfReplicas(0).putInSyncAllocationIds(0, hasActiveAllocation ? Sets.newHashSet("allocId") : Collections.emptySet()))
.build();
RoutingTable routingTable = RoutingTable.builder()
@ -620,7 +620,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
String... activeAllocationIds) {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version))
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds)))
.numberOfShards(1).numberOfReplicas(0).putInSyncAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds)))
.build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
if (asNew) {

View File

@ -288,7 +288,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT).put(settings))
.numberOfShards(1).numberOfReplicas(1)
.putActiveAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
.putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
.build();
// mark shard as delayed if reason is NODE_LEFT
boolean delayed = reason == UnassignedInfo.Reason.NODE_LEFT &&
@ -316,7 +316,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT))
.numberOfShards(1).numberOfReplicas(1)
.putActiveAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
.putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
.build();
RoutingTable routingTable = RoutingTable.builder()
.add(IndexRoutingTable.builder(shardId.getIndex())

View File

@ -424,6 +424,12 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
return replicationGroup.shardRoutings();
}
@Override
protected Set<String> getInSyncAllocationIds(ShardId shardId, ClusterState clusterState) {
return replicationGroup.shardRoutings().stream().filter(ShardRouting::active)
.map(shr -> shr.allocationId().getId()).collect(Collectors.toSet());
}
@Override
protected String checkActiveShardCount() {
return null;
@ -481,6 +487,12 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
throw new UnsupportedOperationException();
}
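// note: getInSyncAllocationIds above derives the in-sync set from active shard
// routings only, so every in-sync copy has a routing entry and this callback
// should never fire in these tests; throwing makes any violation visible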
@Override
public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
throw new UnsupportedOperationException();
}
}