From c6a03bc5497dda8aeffe36e56e8ce45c4ad09f73 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 26 Jun 2017 14:09:15 -0400 Subject: [PATCH] Introduce primary context (#25122) * Introduce primary context The target of a primary relocation is not aware of the state of the replication group. In particular, it is not tracking in-sync and initializing shards and their checkpoints. This means that after the target shard is started, its knowledge of the replication group could differ from that of the relocation source. In particular, this differing view can lead to it computing a global checkpoint that moves backwards after it becomes aware of the state of the entire replication group. This commit addresses this issue by transferring a primary context during relocation handoff. * Fix test * Add assertion messages * Javadocs * Barrier between marking a shard in sync and relocating * Fix misplaced call * Paranoia * Better latch countdown * Catch any exception * Fix comment * Fix wait for cluster state relocation test * Update knowledge via upate local checkpoint API * toString * Visibility * Refactor permit * Push down * Imports * Docs * Fix compilation * Remove assertion * Fix compilation * Remove context wrapper * Move PrimaryContext to new package * Piping for cluster state version This commit adds piping for the cluster state version to the global checkpoint tracker. We do not use it yet. * Remove unused import * Implement versioning in tracker * Fix test * Unneeded public * Imports * Promote on our own * Add tests * Import * Newline * Update comment * Serialization * Assertion message * Update stale comment * Remove newline * Less verbose * Remove redundant assertion * Tracking -> in-sync * Assertions * Just say no Friends do not let friends block the cluster state update thread on network operations. 
* Extra newline * Add allocation ID to assertion * Rename method * Another rename * Introduce sealing * Sealing tests * One more assertion * Fix imports * Safer sealing * Remove check * Remove another sealed check --- .../common/collect/LongTuple.java | 66 +++++ .../index/seqno/GlobalCheckpointTracker.java | 195 ++++++++++++- .../index/seqno/SequenceNumbersService.java | 38 ++- .../elasticsearch/index/shard/IndexShard.java | 100 +++++-- .../index/shard/PrimaryContext.java | 105 +++++++ .../cluster/IndicesClusterStateService.java | 12 +- .../recovery/PeerRecoveryTargetService.java | 18 ++ .../RecoveryHandoffPrimaryContextRequest.java | 94 ++++++ .../recovery/RecoverySourceHandler.java | 23 +- .../indices/recovery/RecoveryTarget.java | 8 +- .../recovery/RecoveryTargetHandler.java | 8 + .../recovery/RemoteRecoveryTargetHandler.java | 13 +- .../index/engine/InternalEngineTests.java | 5 +- .../ESIndexLevelReplicationTestCase.java | 11 +- .../seqno/GlobalCheckpointTrackerTests.java | 270 ++++++++++++++++-- .../index/shard/IndexShardTests.java | 18 +- .../shard/PrimaryReplicaSyncerTests.java | 2 +- ...actIndicesClusterStateServiceTestCase.java | 5 +- .../recovery/RecoverySourceHandlerTests.java | 10 +- .../recovery/RecoveryWhileUnderLoadIT.java | 2 +- .../elasticsearch/recovery/RelocationIT.java | 8 - 21 files changed, 908 insertions(+), 103 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/common/collect/LongTuple.java create mode 100644 core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java create mode 100644 core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java diff --git a/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java b/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java new file mode 100644 index 00000000000..fab8850d162 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch 
under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.collect; + +public class LongTuple { + + public static LongTuple tuple(final T v1, final long v2) { + return new LongTuple<>(v1, v2); + } + + private final T v1; + private final long v2; + + private LongTuple(final T v1, final long v2) { + this.v1 = v1; + this.v2 = v2; + } + + public T v1() { + return v1; + } + + public long v2() { + return v2; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LongTuple tuple = (LongTuple) o; + + return (v1 == null ? tuple.v1 == null : v1.equals(tuple.v1)) && (v2 == tuple.v2); + } + + @Override + public int hashCode() { + int result = v1 != null ? 
v1.hashCode() : 0; + result = 31 * result + Long.hashCode(v2); + return result; + } + + @Override + public String toString() { + return "Tuple [v1=" + v1 + ", v2=" + v2 + "]"; + } + +} diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java index ea6edef7a12..aeafbc11108 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java @@ -23,13 +23,20 @@ import com.carrotsearch.hppc.ObjectLongHashMap; import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.cursors.ObjectLongCursor; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.LongTuple; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or @@ -42,6 +49,8 @@ import java.util.Set; */ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { + long appliedClusterStateVersion; + /* * This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed * through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. 
This map is keyed by @@ -68,6 +77,12 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { */ private long globalCheckpoint; + /* + * During relocation handoff, the state of the global checkpoint tracker is sampled. After sampling, there should be no additional + * mutations to this tracker until the handoff has completed. + */ + private boolean sealed = false; + /** * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. @@ -94,6 +109,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { * @param localCheckpoint the local checkpoint for the shard */ public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } final boolean updated; if (updateLocalCheckpoint(allocationId, localCheckpoint, inSyncLocalCheckpoints, "in-sync")) { updated = true; @@ -210,11 +228,18 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { /** * Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed. 
* - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation IDs of the currently active shard copies + * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies */ public synchronized void updateAllocationIdsFromMaster( - final Set activeAllocationIds, final Set initializingAllocationIds) { + final long applyingClusterStateVersion, final Set activeAllocationIds, final Set initializingAllocationIds) { + if (applyingClusterStateVersion < appliedClusterStateVersion) { + return; + } + + appliedClusterStateVersion = applyingClusterStateVersion; + // remove shards whose allocation ID no longer exists inSyncLocalCheckpoints.removeAll(a -> !activeAllocationIds.contains(a) && !initializingAllocationIds.contains(a)); @@ -248,6 +273,135 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { updateGlobalCheckpointOnPrimary(); } + /** + * Get the primary context for the shard. This includes the state of the global checkpoint tracker. + * + * @return the primary context + */ + synchronized PrimaryContext primaryContext() { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } + sealed = true; + final ObjectLongMap inSyncLocalCheckpoints = new ObjectLongHashMap<>(this.inSyncLocalCheckpoints); + final ObjectLongMap trackingLocalCheckpoints = new ObjectLongHashMap<>(this.trackingLocalCheckpoints); + return new PrimaryContext(appliedClusterStateVersion, inSyncLocalCheckpoints, trackingLocalCheckpoints); + } + + /** + * Releases a previously acquired primary context. 
+ */ + synchronized void releasePrimaryContext() { + assert sealed; + sealed = false; + } + + /** + * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. + * + * @param primaryContext the primary context + */ + synchronized void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } + /* + * We are gathered here today to witness the relocation handoff transferring knowledge from the relocation source to the relocation + * target. We need to consider the possibility that the version of the cluster state on the relocation source when the primary + * context was sampled is different than the version of the cluster state on the relocation target at this exact moment. We define + * the following values: + * - version(source) = the cluster state version on the relocation source used to ensure a minimum cluster state version on the + * relocation target + * - version(context) = the cluster state version on the relocation source when the primary context was sampled + * - version(target) = the current cluster state version on the relocation target + * + * We know that version(source) <= version(target) and version(context) < version(target), version(context) = version(target), and + * version(target) < version(context) are all possibilities. + * + * The case of version(context) = version(target) causes no issues as in this case the knowledge of the in-sync and initializing + * shards the target receives from the master will be equal to the knowledge of the in-sync and initializing shards the target + * receives from the relocation source via the primary context. + * + * Let us now consider the case that version(context) < version(target). 
In this case, the active allocation IDs in the primary + * context can be a superset of the active allocation IDs contained in the applied cluster state. This is because no new shards can + * have been started as marking a shard as in-sync is blocked during relocation handoff. Note however that the relocation target + * itself will have been marked in-sync during recovery and therefore is an active allocation ID from the perspective of the primary + * context. + * + * Finally, we consider the case that version(target) < version(context). In this case, the active allocation IDs in the primary + * context can be a subset of the active allocation IDs contained in the applied cluster state. This is again because no new shards can + * have been started. Moreover, existing active allocation IDs could have been removed from the cluster state. + * + * In each of these latter two cases, consider initializing shards that are contained in the primary context but not contained in + * the cluster state applied on the target. + * + * If version(context) < version(target) it means that the shard has been removed by a later cluster state update that is already + * applied on the target and we only need to ensure that we do not add it to the tracking map on the target. The call to + * GlobalCheckpointTracker#updateLocalCheckpoint(String, long) is a no-op for such shards and this is safe. + * + * If version(target) < version(context) it means that the shard has started initializing by a later cluster state update that has not + * yet arrived on the target. However, there is a delay on recoveries before we ensure that version(source) <= version(target). + * Therefore, such a shard can never initialize from the relocation source and will have to await the handoff completing. As such, + * these shards are not problematic. 
+ * + * Lastly, again in these two cases, what about initializing shards that are contained in cluster state applied on the target but + * not contained in the primary context. + * + * If version(context) < version(target) it means that a shard has started initializing by a later cluster state that is applied on + * the target but not yet known to what would be the relocation source. As recoveries are delayed at this time, these shards can not + * cause a problem and we do not remove these shards from the tracking map, so we are safe here. + * + * If version(target) < version(context) it means that a shard has started initializing but was removed by a later cluster state. In + * this case, as the cluster state version on the primary context exceeds the applied cluster state version, we replace the tracking + * map and are safe here too. + */ + + assert StreamSupport + .stream(inSyncLocalCheckpoints.spliterator(), false) + .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : inSyncLocalCheckpoints; + assert StreamSupport + .stream(trackingLocalCheckpoints.spliterator(), false) + .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : trackingLocalCheckpoints; + assert pendingInSync.isEmpty() : pendingInSync; + + if (primaryContext.clusterStateVersion() > appliedClusterStateVersion) { + final Set activeAllocationIds = + new HashSet<>(Arrays.asList(primaryContext.inSyncLocalCheckpoints().keys().toArray(String.class))); + final Set initializingAllocationIds = + new HashSet<>(Arrays.asList(primaryContext.trackingLocalCheckpoints().keys().toArray(String.class))); + updateAllocationIdsFromMaster(primaryContext.clusterStateVersion(), activeAllocationIds, initializingAllocationIds); + } + + /* + * As we are updating the local checkpoints for the in-sync allocation IDs, the global checkpoint will advance in place; this means + * that we have to sort the incoming local checkpoints from smallest to largest lest we 
violate that the global checkpoint does not + * regress. + */ + final List> inSync = + StreamSupport + .stream(primaryContext.inSyncLocalCheckpoints().spliterator(), false) + .map(e -> LongTuple.tuple(e.key, e.value)) + .collect(Collectors.toList()); + + inSync.sort(Comparator.comparingLong(LongTuple::v2)); + + for (final LongTuple cursor : inSync) { + assert cursor.v2() >= globalCheckpoint + : "local checkpoint [" + cursor.v2() + "] " + + "for allocation ID [" + cursor.v1() + "] " + + "violates being at least the global checkpoint [" + globalCheckpoint + "]"; + updateLocalCheckpoint(cursor.v1(), cursor.v2()); + if (trackingLocalCheckpoints.containsKey(cursor.v1())) { + moveAllocationIdFromTrackingToInSync(cursor.v1(), "relocation"); + updateGlobalCheckpointOnPrimary(); + } + } + + for (final ObjectLongCursor cursor : primaryContext.trackingLocalCheckpoints()) { + updateLocalCheckpoint(cursor.key, cursor.value); + } + } + /** * Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint * on the specified shard advances above the current global checkpoint. 
@@ -258,6 +412,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { * @throws InterruptedException if the thread is interrupted waiting for the local checkpoint on the shard to advance */ public synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } if (!trackingLocalCheckpoints.containsKey(allocationId)) { /* * This can happen if the recovery target has been failed and the cluster state update from the master has triggered removing @@ -295,15 +452,13 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { */ final long current = trackingLocalCheckpoints.getOrDefault(allocationId, Long.MIN_VALUE); if (current >= globalCheckpoint) { - logger.trace("marked [{}] as in-sync with local checkpoint [{}]", allocationId, current); - trackingLocalCheckpoints.remove(allocationId); /* * This is prematurely adding the allocation ID to the in-sync map as at this point recovery is not yet finished and could * still abort. At this point we will end up with a shard in the in-sync map holding back the global checkpoint because the * shard never recovered and we would have to wait until either the recovery retries and completes successfully, or the * master fails the shard and issues a cluster state update that removes the shard from the set of active allocation IDs. */ - inSyncLocalCheckpoints.put(allocationId, current); + moveAllocationIdFromTrackingToInSync(allocationId, "recovery"); break; } else { waitForLocalCheckpointToAdvance(); @@ -311,6 +466,21 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { } } + /** + * Moves a tracking allocation ID to be in-sync. 
This can occur when a shard is recovering from the primary and its local checkpoint has + * advanced past the global checkpoint, or during relocation hand-off when the relocation target learns of an in-sync shard from the + * relocation source. + * + * @param allocationId the allocation ID to move + * @param reason the reason for the transition + */ + private synchronized void moveAllocationIdFromTrackingToInSync(final String allocationId, final String reason) { + assert trackingLocalCheckpoints.containsKey(allocationId); + final long current = trackingLocalCheckpoints.remove(allocationId); + inSyncLocalCheckpoints.put(allocationId, current); + logger.trace("marked [{}] as in-sync with local checkpoint [{}] due to [{}]", allocationId, current, reason); + } + /** * Wait for the local checkpoint to advance to the global checkpoint. * @@ -324,12 +494,21 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { /** * Check if there are any recoveries pending in-sync. * - * @return {@code true} if there is at least one shard pending in-sync, otherwise false + * @return true if there is at least one shard pending in-sync, otherwise false */ - public boolean pendingInSync() { + boolean pendingInSync() { return !pendingInSync.isEmpty(); } + /** + * Check if the tracker is sealed. + * + * @return true if the tracker is sealed, otherwise false. + */ + boolean sealed() { + return sealed; + } + /** * Returns the local checkpoint for the shard with the specified allocation ID, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if * the shard is not in-sync. 
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 4180c7e0f7d..6d8b87599a1 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import java.util.Set; @@ -165,13 +166,24 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { /** * Notifies the service of the current allocation IDs in the cluster state. See - * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details. + * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details. 
* - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation IDs of the currently active shard copies + * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies */ - public void updateAllocationIdsFromMaster(final Set activeAllocationIds, final Set initializingAllocationIds) { - globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); + public void updateAllocationIdsFromMaster( + final long applyingClusterStateVersion, final Set activeAllocationIds, final Set initializingAllocationIds) { + globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds); + } + + /** + * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. + * + * @param primaryContext the sequence number context + */ + public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { + globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext); } /** @@ -183,4 +195,20 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { return globalCheckpointTracker.pendingInSync(); } + /** + * Get the primary context for the shard. This includes the state of the global checkpoint tracker. + * + * @return the primary context + */ + public PrimaryContext primaryContext() { + return globalCheckpointTracker.primaryContext(); + } + + /** + * Releases a previously acquired primary context. 
+ */ + public void releasePrimaryContext() { + globalCheckpointTracker.releasePrimaryContext(); + } + } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 71efacf7dcf..13ced02f6b8 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -515,31 +515,40 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean(); - public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { + /** + * Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided + * {@link Consumer} is executed with the sampled primary context after all operations are successfully blocked. + * If the consumer or the subsequent state change fails, the primary context is released and the failure is rethrown to the caller. + * + * @param reason the reason for the relocation + * @param consumer a {@link Consumer} of the primary context that is executed after operations are blocked + * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation + * @throws InterruptedException if blocking operations is interrupted + */ + public void relocated( + final String reason, final Consumer<PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == 0 : - "in-flight operations in progress while moving shard state to relocated"; - synchronized (mutex) { - if (state != IndexShardState.STARTED) { - throw new IndexShardNotStartedException(shardId, state); + "in-flight operations in progress while moving shard state to 
relocated"; + /* + * We should not invoke the consumer under the mutex as the expected implementation is to hand off the primary context via a + * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. + */ + verifyRelocatingState(); + final PrimaryContext primaryContext = getEngine().seqNoService().primaryContext(); + try { + consumer.accept(primaryContext); + synchronized (mutex) { + verifyRelocatingState(); + changeState(IndexShardState.RELOCATED, reason); } - // if the master cancelled the recovery, the target will be removed - // and the recovery will stopped. - // However, it is still possible that we concurrently end up here - // and therefore have to protect we don't mark the shard as relocated when - // its shard routing says otherwise. - if (shardRouting.relocating() == false) { - throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, - ": shard is no longer relocating " + shardRouting); - } - if (primaryReplicaResyncInProgress.get()) { - throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, - ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting); - } - changeState(IndexShardState.RELOCATED, reason); + } catch (final Exception e) { + // unseal the tracker, then rethrow so the caller of relocated() observes the failure instead of a silent return + getEngine().seqNoService().releasePrimaryContext(); + throw e; } }); } catch (TimeoutException e) { @@ -551,6 +560,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } + private void verifyRelocatingState() { + if (state != IndexShardState.STARTED) { + throw new IndexShardNotStartedException(shardId, state); + } + /* + * If the master cancelled recovery, the target will be removed and the recovery will be cancelled. However, it is still possible + * that we concurrently end up here and therefore have to protect that we do not mark the shard as relocated when its shard routing + * says otherwise. 
+ */ + + if (shardRouting.relocating() == false) { + throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, + ": shard is no longer relocating " + shardRouting); + } + + if (primaryReplicaResyncInProgress.get()) { + throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, + ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting); + } + } public IndexShardState state() { return state; @@ -1319,7 +1345,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private void verifyPrimary() { if (shardRouting.primary() == false) { - throw new IllegalStateException("shard is not a primary " + shardRouting); + throw new IllegalStateException("shard " + shardRouting + " is not a primary"); } } @@ -1327,8 +1353,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl final IndexShardState state = state(); if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) { // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException - throw new IllegalStateException("active primary shard cannot be a replication target before " + - " relocation hand off " + shardRouting + ", state is [" + state + "]"); + throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " + + "relocation hand off, state is [" + state + "]"); } } @@ -1603,8 +1629,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl verifyPrimary(); getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint); /* - * We could have blocked waiting for the replica to catch up that we fell idle and there will not be a background sync to the - * replica; mark our self as active to force a future background sync. 
+ * We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to + * the replica; mark our self as active to force a future background sync. */ active.compareAndSet(false, true); } @@ -1654,18 +1680,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl /** * Notifies the service of the current allocation IDs in the cluster state. See - * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} + * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} * for details. * - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation IDs of the currently active shard copies + * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies */ - public void updateAllocationIdsFromMaster(final Set activeAllocationIds, final Set initializingAllocationIds) { + public void updateAllocationIdsFromMaster( + final long applyingClusterStateVersion, final Set activeAllocationIds, final Set initializingAllocationIds) { verifyPrimary(); final Engine engine = getEngineOrNull(); // if the engine is not yet started, we are not ready yet and can just ignore this if (engine != null) { - engine.seqNoService().updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); + engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds); + } + } + + /** + * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. 
+ * + * @param primaryContext the sequence number context + */ + public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { + verifyPrimary(); + assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting; + final Engine engine = getEngineOrNull(); + if (engine != null) { + engine.seqNoService().updateAllocationIdsFromPrimaryContext(primaryContext); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java new file mode 100644 index 00000000000..8a067d37181 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.shard; + +import com.carrotsearch.hppc.ObjectLongHashMap; +import com.carrotsearch.hppc.ObjectLongMap; +import com.carrotsearch.hppc.cursors.ObjectLongCursor; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing + * shards and their local checkpoints. + */ +public class PrimaryContext implements Writeable { + + private long clusterStateVersion; + + public long clusterStateVersion() { + return clusterStateVersion; + } + + private ObjectLongMap inSyncLocalCheckpoints; + + public ObjectLongMap inSyncLocalCheckpoints() { + return inSyncLocalCheckpoints; + } + + private ObjectLongMap trackingLocalCheckpoints; + + public ObjectLongMap trackingLocalCheckpoints() { + return trackingLocalCheckpoints; + } + + public PrimaryContext( + final long clusterStateVersion, + final ObjectLongMap inSyncLocalCheckpoints, + final ObjectLongMap trackingLocalCheckpoints) { + this.clusterStateVersion = clusterStateVersion; + this.inSyncLocalCheckpoints = inSyncLocalCheckpoints; + this.trackingLocalCheckpoints = trackingLocalCheckpoints; + } + + public PrimaryContext(final StreamInput in) throws IOException { + clusterStateVersion = in.readVLong(); + inSyncLocalCheckpoints = readMap(in); + trackingLocalCheckpoints = readMap(in); + } + + private static ObjectLongMap readMap(final StreamInput in) throws IOException { + final int length = in.readVInt(); + final ObjectLongMap map = new ObjectLongHashMap<>(length); + for (int i = 0; i < length; i++) { + final String key = in.readString(); + final long value = in.readZLong(); + map.addTo(key, value); + } + return map; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + 
out.writeVLong(clusterStateVersion); + writeMap(out, inSyncLocalCheckpoints); + writeMap(out, trackingLocalCheckpoints); + } + + private static void writeMap(final StreamOutput out, final ObjectLongMap map) throws IOException { + out.writeVInt(map.size()); + for (ObjectLongCursor cursor : map) { + out.writeString(cursor.key); + out.writeZLong(cursor.value); + } + } + + @Override + public String toString() { + return "PrimaryContext{" + + "clusterStateVersion=" + clusterStateVersion + + ", inSyncLocalCheckpoints=" + inSyncLocalCheckpoints + + ", trackingLocalCheckpoints=" + trackingLocalCheckpoints + + '}'; + } + +} diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 385b342efbe..81c0f601e1c 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -571,7 +571,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()), primaryReplicaSyncer::resync); - shard.updateAllocationIdsFromMaster(activeIds, initializingIds); + shard.updateAllocationIdsFromMaster(clusterState.version(), activeIds, initializingIds); } } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); @@ -758,12 +758,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple /** * Notifies the service of the current allocation ids in the cluster state. - * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details. 
+ * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details. * - * @param activeAllocationIds the allocation ids of the currently active shard copies - * @param initializingAllocationIds the allocation ids of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation ids of the currently active shard copies + * @param initializingAllocationIds the allocation ids of the currently initializing shard copies */ - void updateAllocationIdsFromMaster(Set activeAllocationIds, Set initializingAllocationIds); + void updateAllocationIdsFromMaster( + long applyingClusterStateVersion, Set activeAllocationIds, Set initializingAllocationIds); } public interface AllocatedIndex extends Iterable, IndexComponent { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 4823edcc2f1..37ab2798b1f 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -82,6 +82,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog"; public static final String FINALIZE = "internal:index/shard/recovery/finalize"; public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate"; + public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/hand_off_primary_context"; } private final ThreadPool threadPool; @@ -116,6 +117,11 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde FinalizeRecoveryRequestHandler()); 
transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new, ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler()); + transportService.registerRequestHandler( + Actions.HANDOFF_PRIMARY_CONTEXT, + RecoveryHandoffPrimaryContextRequest::new, + ThreadPool.Names.GENERIC, + new HandoffPrimaryContextRequestHandler()); } @Override @@ -411,6 +417,18 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde } } + class HandoffPrimaryContextRequestHandler implements TransportRequestHandler { + + @Override + public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel) throws Exception { + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { + recoveryRef.target().handoffPrimaryContext(request.primaryContext()); + } + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + + } + class TranslogOperationsRequestHandler implements TransportRequestHandler { @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java new file mode 100644 index 00000000000..6646f6cea5d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.PrimaryContext; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; + +/** + * The request object to handoff the primary context to the relocation target. + */ +class RecoveryHandoffPrimaryContextRequest extends TransportRequest { + + private long recoveryId; + private ShardId shardId; + private PrimaryContext primaryContext; + + /** + * Initialize an empty request (used to serialize into when reading from a stream). + */ + RecoveryHandoffPrimaryContextRequest() { + } + + /** + * Initialize a request for the specified relocation. 
+ * + * @param recoveryId the recovery ID of the relocation + * @param shardId the shard ID of the relocation + * @param primaryContext the primary context + */ + RecoveryHandoffPrimaryContextRequest(final long recoveryId, final ShardId shardId, final PrimaryContext primaryContext) { + this.recoveryId = recoveryId; + this.shardId = shardId; + this.primaryContext = primaryContext; + } + + long recoveryId() { + return this.recoveryId; + } + + ShardId shardId() { + return shardId; + } + + PrimaryContext primaryContext() { + return primaryContext; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + recoveryId = in.readLong(); + shardId = ShardId.readShardId(in); + primaryContext = new PrimaryContext(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(recoveryId); + shardId.writeTo(out); + primaryContext.writeTo(out); + } + + @Override + public String toString() { + return "RecoveryHandoffPrimaryContextRequest{" + + "recoveryId=" + recoveryId + + ", shardId=" + shardId + + ", primaryContext=" + primaryContext + + '}'; + } +} diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 36f71899fa8..3097c8e668f 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -31,6 +31,7 @@ import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; @@ -41,6 +42,7 @@ import 
org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -52,6 +54,7 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteTransportException; import java.io.BufferedOutputStream; @@ -60,7 +63,9 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.StreamSupport; @@ -450,7 +455,21 @@ public class RecoverySourceHandler { StopWatch stopWatch = new StopWatch().start(); logger.trace("finalizing recovery"); cancellableThreads.execute(() -> { - shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint); + /* + * Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a + * shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done + * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire + * the permit then the state of the shard will be relocated and this recovery will fail. 
+ */ + final PlainActionFuture onAcquired = new PlainActionFuture<>(); + shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME); + try (Releasable ignored = onAcquired.actionGet()) { + if (shard.state() == IndexShardState.RELOCATED) { + throw new IndexShardRelocatedException(shard.shardId()); + } + shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint); + } + recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint()); }); @@ -465,7 +484,7 @@ public class RecoverySourceHandler { cancellableThreads.execute(() -> recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion)); logger.trace("performing relocation hand-off"); - cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode())); + cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext)); } /* * if the recovery process fails after setting the shard state to RELOCATED, both relocation source and diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 3b96ef1a02e..2837a85d1ae 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -41,10 +41,10 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotRecoveringException; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; 
import org.elasticsearch.index.store.StoreFileMetaData; @@ -63,7 +63,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongConsumer; -import java.util.stream.Collectors; /** * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of @@ -379,6 +378,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget ensureClusterStateVersionCallback.accept(clusterStateVersion); } + @Override + public void handoffPrimaryContext(final PrimaryContext primaryContext) { + indexShard.updateAllocationIdsFromPrimaryContext(primaryContext); + } + @Override public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final RecoveryState.Translog translog = state().getTranslog(); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 42cf1bc1ce1..34b0df2293f 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; @@ -49,6 +50,13 @@ public interface RecoveryTargetHandler { */ void ensureClusterStateVersion(long clusterStateVersion); + /** + * Handoff the primary context between the relocation source and the relocation target. 
+ * + * @param primaryContext the primary context from the relocation source + */ + void handoffPrimaryContext(PrimaryContext primaryContext); + /** * Index a set of translog operations on the target * @param operations operations to index diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 414cbd4ea49..14c8f762e6d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -23,6 +23,7 @@ import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -95,7 +96,17 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE, new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + + @Override + public void handoffPrimaryContext(final PrimaryContext primaryContext) { + transportService.submitRequest( + targetNode, + PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT, + new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } 
@Override diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8811083baa9..e9c89166348 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2022,7 +2022,10 @@ public class InternalEngineTests extends ESTestCase { initialEngine = engine; initialEngine .seqNoService() - .updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet()); + .updateAllocationIdsFromMaster( + randomNonNegativeLong(), + new HashSet<>(Arrays.asList("primary", "replica")), + Collections.emptySet()); for (int op = 0; op < opCount; op++) { final String id; // mostly index, sometimes delete diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index e7518bd5944..7962f23caf0 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -122,6 +122,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } protected class ReplicationGroup implements AutoCloseable, Iterable { + private long clusterStateVersion; private IndexShard primary; private IndexMetaData indexMetaData; private final List replicas; @@ -142,6 +143,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting)); replicas = new ArrayList<>(); this.indexMetaData = indexMetaData; + clusterStateVersion = 1; updateAllocationIDsOnPrimary(); for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) { addReplica(); @@ -222,6 +224,7 @@ 
public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null)); primary.recoverFromStore(); primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry())); + clusterStateVersion++; updateAllocationIDsOnPrimary(); for (final IndexShard replica : replicas) { recoverReplica(replica); @@ -241,6 +244,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; replicas.add(replica); + clusterStateVersion++; updateAllocationIDsOnPrimary(); } @@ -255,6 +259,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting)); replicas.add(newReplica); + clusterStateVersion++; updateAllocationIDsOnPrimary(); return newReplica; } @@ -274,6 +279,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase closeShards(primary); primary = replica; primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary()); + PlainActionFuture fut = new PlainActionFuture<>(); primary.updatePrimaryTerm(newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard, new ActionListener() { @@ -289,6 +295,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase fut.onFailure(e); } })); + clusterStateVersion++; updateAllocationIDsOnPrimary(); return fut; } @@ -296,6 +303,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase synchronized boolean removeReplica(IndexShard replica) { final boolean removed = replicas.remove(replica); if (removed) { + clusterStateVersion++; updateAllocationIDsOnPrimary(); } return removed; 
@@ -315,6 +323,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase BiFunction targetSupplier, boolean markAsRecovering) throws IOException { ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering); + clusterStateVersion++; updateAllocationIDsOnPrimary(); } @@ -402,7 +411,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase initializing.add(shard.allocationId().getId()); } } - primary.updateAllocationIdsFromMaster(active, initializing); + primary.updateAllocationIdsFromMaster(clusterStateVersion, active, initializing); } } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java index 61eb4581328..ae4aab107f2 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java @@ -19,9 +19,13 @@ package org.elasticsearch.index.seqno; +import com.carrotsearch.hppc.ObjectLongHashMap; +import com.carrotsearch.hppc.ObjectLongMap; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -29,7 +33,6 @@ import org.junit.Before; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,11 +46,15 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static 
org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.mock; public class GlobalCheckpointTrackerTests extends ESTestCase { @@ -79,6 +86,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { } public void testGlobalCheckpointUpdate() { + final long initialClusterStateVersion = randomNonNegativeLong(); Map allocations = new HashMap<>(); Map activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5); Set active = new HashSet<>(activeWithCheckpoints.keySet()); @@ -107,7 +115,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type); }); - tracker.updateAllocationIdsFromMaster(active, initializing); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, active, initializing); initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId, tracker.getGlobalCheckpoint())); allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId))); @@ -130,7 +138,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { Set newActive = new HashSet<>(active); newActive.add(extraId); - tracker.updateAllocationIdsFromMaster(newActive, initializing); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, newActive, initializing); // now notify for the new id tracker.updateLocalCheckpoint(extraId, minLocalCheckpointAfterUpdates + 1 + randomInt(4)); @@ -146,6 +154,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { assigned.putAll(active); assigned.putAll(initializing); tracker.updateAllocationIdsFromMaster( + randomNonNegativeLong(), active.keySet(), initializing.keySet()); 
randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); @@ -166,7 +175,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { public void testMissingInSyncIdsPreventAdvance() { final Map active = randomAllocationsWithLocalCheckpoints(0, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(1, 5); - tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet()); + tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet()); initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); randomSubsetOf(randomInt(initializing.size() - 1), initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId))); @@ -184,7 +193,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final Map active = randomAllocationsWithLocalCheckpoints(1, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(1, 5); final Map nonApproved = randomAllocationsWithLocalCheckpoints(1, 5); - tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet()); + tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet()); initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); nonApproved.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); @@ -196,6 +205,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { } public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { + final long initialClusterStateVersion = randomNonNegativeLong(); final Map activeToStay = randomAllocationsWithLocalCheckpoints(1, 5); final Map initializingToStay = randomAllocationsWithLocalCheckpoints(1, 5); final Map activeToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5); @@ -211,7 +221,7 @@ 
public class GlobalCheckpointTrackerTests extends ESTestCase { if (randomBoolean()) { allocations.putAll(initializingToBeRemoved); } - tracker.updateAllocationIdsFromMaster(active, initializing); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, active, initializing); if (randomBoolean()) { initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); } else { @@ -223,11 +233,11 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { // now remove shards if (randomBoolean()) { - tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet()); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, activeToStay.keySet(), initializingToStay.keySet()); allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L)); } else { allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L)); - tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet()); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 2, activeToStay.keySet(), initializingToStay.keySet()); } final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream()) @@ -243,7 +253,8 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final AtomicBoolean complete = new AtomicBoolean(); final String inSyncAllocationId =randomAlphaOfLength(16); final String trackingAllocationId = randomAlphaOfLength(16); - tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); + tracker.updateAllocationIdsFromMaster( + randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint); final Thread thread = new Thread(() -> { try { @@ -291,7 +302,8 @@ public class 
GlobalCheckpointTrackerTests extends ESTestCase { final AtomicBoolean interrupted = new AtomicBoolean(); final String inSyncAllocationId = randomAlphaOfLength(16); final String trackingAllocationId = randomAlphaOfLength(32); - tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); + tracker.updateAllocationIdsFromMaster( + randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint); final Thread thread = new Thread(() -> { try { @@ -329,21 +341,14 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { } public void testUpdateAllocationIdsFromMaster() throws Exception { + final long initialClusterStateVersion = randomNonNegativeLong(); final int numberOfActiveAllocationsIds = randomIntBetween(2, 16); - final Set activeAllocationIds = - IntStream.range(0, numberOfActiveAllocationsIds).mapToObj(i -> randomAlphaOfLength(16)).collect(Collectors.toSet()); final int numberOfInitializingIds = randomIntBetween(2, 16); - final Set initializingIds = - IntStream.range(0, numberOfInitializingIds).mapToObj(i -> { - do { - final String initializingId = randomAlphaOfLength(16); - // ensure we do not duplicate an allocation ID in active and initializing sets - if (!activeAllocationIds.contains(initializingId)) { - return initializingId; - } - } while (true); - }).collect(Collectors.toSet()); - tracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingIds); + final Tuple, Set> activeAndInitializingAllocationIds = + randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); + final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); + final Set initializingIds = activeAndInitializingAllocationIds.v2(); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingIds); // first 
we assert that the in-sync and tracking sets are set up correctly assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); @@ -364,7 +369,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final List removingInitializingAllocationIds = randomSubsetOf(initializingIds); final Set newInitializingAllocationIds = initializingIds.stream().filter(a -> !removingInitializingAllocationIds.contains(a)).collect(Collectors.toSet()); - tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, newActiveAllocationIds, newInitializingAllocationIds); assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); assertTrue(removingActiveAllocationIds.stream().noneMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); assertTrue(newInitializingAllocationIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a))); @@ -376,7 +381,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { */ newActiveAllocationIds.add(randomAlphaOfLength(32)); newInitializingAllocationIds.add(randomAlphaOfLength(64)); - tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 2, newActiveAllocationIds, newInitializingAllocationIds); assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); assertTrue( newActiveAllocationIds @@ -416,7 +421,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { // using a different length than we have been using above ensures that we can not collide with a previous allocation ID final String newSyncingAllocationId = randomAlphaOfLength(128); newInitializingAllocationIds.add(newSyncingAllocationId); - tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, 
newInitializingAllocationIds); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 3, newActiveAllocationIds, newInitializingAllocationIds); final CyclicBarrier barrier = new CyclicBarrier(2); final Thread thread = new Thread(() -> { try { @@ -450,7 +455,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { * the in-sync set even if we receive a cluster state update that does not reflect this. * */ - tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 4, newActiveAllocationIds, newInitializingAllocationIds); assertFalse(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId)); assertTrue(tracker.inSyncLocalCheckpoints.containsKey(newSyncingAllocationId)); } @@ -471,7 +476,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final String active = randomAlphaOfLength(16); final String initializing = randomAlphaOfLength(32); - tracker.updateAllocationIdsFromMaster(Collections.singleton(active), Collections.singleton(initializing)); + tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(active), Collections.singleton(initializing)); final CyclicBarrier barrier = new CyclicBarrier(4); @@ -516,7 +521,216 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { markingThread.join(); assertThat(tracker.getGlobalCheckpoint(), equalTo((long) nextActiveLocalCheckpoint)); + } + public void testPrimaryContextOlderThanAppliedClusterState() { + final long initialClusterStateVersion = randomIntBetween(0, Integer.MAX_VALUE - 1) + 1; + final int numberOfActiveAllocationsIds = randomIntBetween(0, 8); + final int numberOfInitializingIds = randomIntBetween(0, 8); + final Tuple, Set> activeAndInitializingAllocationIds = + randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); + final Set activeAllocationIds = 
activeAndInitializingAllocationIds.v1(); + final Set initializingAllocationIds = activeAndInitializingAllocationIds.v2(); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds); + + /* + * We are going to establish a primary context from a cluster state version older than the applied cluster state version on the + * tracker. Because of recovery barriers established during relocation handoff, we know that the set of active allocation IDs in the + * older cluster state is a superset of the allocation IDs in the applied cluster state with the caveat that an existing + * initializing allocation ID could have moved to an in-sync allocation ID within the tracker due to recovery finalization, and the + * set of initializing allocation IDs is otherwise arbitrary. + */ + final int numberOfAdditionalInitializingAllocationIds = randomIntBetween(0, 8); + final Set initializedAllocationIds = new HashSet<>(randomSubsetOf(initializingAllocationIds)); + final Set newInitializingAllocationIds = + randomAllocationIdsExcludingExistingIds( + Sets.union(activeAllocationIds, initializingAllocationIds), numberOfAdditionalInitializingAllocationIds); + final Set contextInitializingIds = Sets.union( + new HashSet<>(randomSubsetOf(Sets.difference(initializingAllocationIds, initializedAllocationIds))), + newInitializingAllocationIds); + + final int numberOfAdditionalActiveAllocationIds = randomIntBetween(0, 8); + final Set contextActiveAllocationIds = Sets.union( + Sets.union( + activeAllocationIds, + randomAllocationIdsExcludingExistingIds(activeAllocationIds, numberOfAdditionalActiveAllocationIds)), + initializedAllocationIds); + + final ObjectLongMap activeAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>(); + for (final String allocationId : contextActiveAllocationIds) { + activeAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); + } + final ObjectLongMap initializingAllocationIdsLocalCheckpoints =
new ObjectLongHashMap<>(); + for (final String allocationId : contextInitializingIds) { + initializingAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); + } + + final PrimaryContext primaryContext = new PrimaryContext( + initialClusterStateVersion - randomIntBetween(0, Math.toIntExact(initialClusterStateVersion) - 1), + activeAllocationIdsLocalCheckpoints, + initializingAllocationIdsLocalCheckpoints); + + tracker.updateAllocationIdsFromPrimaryContext(primaryContext); + + // the primary context carries an older cluster state version + assertThat(tracker.appliedClusterStateVersion, equalTo(initialClusterStateVersion)); + + // only existing active allocation IDs and initializing allocation IDs that moved to initialized should be in-sync + assertThat( + Sets.union(activeAllocationIds, initializedAllocationIds), + equalTo( + StreamSupport + .stream(tracker.inSyncLocalCheckpoints.keys().spliterator(), false) + .map(e -> e.value) + .collect(Collectors.toSet()))); + + // the local checkpoints known to the tracker for in-sync shards should match what is known in the primary context + for (final String allocationId : Sets.union(activeAllocationIds, initializedAllocationIds)) { + assertThat( + tracker.inSyncLocalCheckpoints.get(allocationId), equalTo(primaryContext.inSyncLocalCheckpoints().get(allocationId))); + } + + // only existing initializing allocation IDs that did not move to initialized should be tracked + assertThat( + Sets.difference(initializingAllocationIds, initializedAllocationIds), + equalTo( + StreamSupport + .stream(tracker.trackingLocalCheckpoints.keys().spliterator(), false) + .map(e -> e.value) + .collect(Collectors.toSet()))); + + // the local checkpoints known to the tracker for initializing shards should match what is known in the primary context + for (final String allocationId : Sets.difference(initializingAllocationIds, initializedAllocationIds)) { + if (primaryContext.trackingLocalCheckpoints().containsKey(allocationId)) { +
assertThat( + tracker.trackingLocalCheckpoints.get(allocationId), + equalTo(primaryContext.trackingLocalCheckpoints().get(allocationId))); + } else { + assertThat(tracker.trackingLocalCheckpoints.get(allocationId), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + } + } + + // the global checkpoint can only be computed from active allocation IDs and initializing allocation IDs that moved to initialized + final long globalCheckpoint = + StreamSupport + .stream(activeAllocationIdsLocalCheckpoints.spliterator(), false) + .filter(e -> tracker.inSyncLocalCheckpoints.containsKey(e.key) || initializedAllocationIds.contains(e.key)) + .mapToLong(e -> e.value) + .min() + .orElse(SequenceNumbersService.UNASSIGNED_SEQ_NO); + assertThat(tracker.getGlobalCheckpoint(), equalTo(globalCheckpoint)); + } + + public void testPrimaryContextNewerThanAppliedClusterState() { + final long initialClusterStateVersion = randomIntBetween(0, Integer.MAX_VALUE); + final int numberOfActiveAllocationsIds = randomIntBetween(0, 8); + final int numberOfInitializingIds = randomIntBetween(0, 8); + final Tuple, Set> activeAndInitializingAllocationIds = + randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); + final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); + final Set initializingAllocationIds = activeAndInitializingAllocationIds.v2(); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds); + + /* + * We are going to establish a primary context from a cluster state version newer than the applied cluster state version on the + * tracker.
Because of recovery barriers established during relocation handoff, we know that the set of active allocation IDs in the + * newer cluster state is a subset of the allocation IDs in the applied cluster state with the caveat that an existing initializing + * allocation ID could have moved to an in-sync allocation ID within the tracker due to recovery finalization, and the set of + * initializing allocation IDs is otherwise arbitrary. + */ + final int numberOfNewInitializingAllocationIds = randomIntBetween(0, 8); + final Set initializedAllocationIds = new HashSet<>(randomSubsetOf(initializingAllocationIds)); + final Set newInitializingAllocationIds = + randomAllocationIdsExcludingExistingIds( + Sets.union(activeAllocationIds, initializingAllocationIds), numberOfNewInitializingAllocationIds); + + final ObjectLongMap activeAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>(); + for (final String allocationId : Sets.union(new HashSet<>(randomSubsetOf(activeAllocationIds)), initializedAllocationIds)) { + activeAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); + } + final ObjectLongMap initializingIdsLocalCheckpoints = new ObjectLongHashMap<>(); + final Set contextInitializingAllocationIds = Sets.union( + new HashSet<>(randomSubsetOf(Sets.difference(initializingAllocationIds, initializedAllocationIds))), + newInitializingAllocationIds); + for (final String allocationId : contextInitializingAllocationIds) { + initializingIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); + } + + final PrimaryContext primaryContext = + new PrimaryContext( + initialClusterStateVersion + randomIntBetween(0, Integer.MAX_VALUE) + 1, + activeAllocationIdsLocalCheckpoints, + initializingIdsLocalCheckpoints); + + tracker.updateAllocationIdsFromPrimaryContext(primaryContext); + + final PrimaryContext trackerPrimaryContext = tracker.primaryContext(); + try { + assertTrue(tracker.sealed()); + final long globalCheckpoint = + StreamSupport + 
.stream(activeAllocationIdsLocalCheckpoints.values().spliterator(), false) + .mapToLong(e -> e.value) + .min() + .orElse(SequenceNumbersService.UNASSIGNED_SEQ_NO); + + // the primary context contains knowledge of the state of the entire universe + assertThat(primaryContext.clusterStateVersion(), equalTo(trackerPrimaryContext.clusterStateVersion())); + assertThat(primaryContext.inSyncLocalCheckpoints(), equalTo(trackerPrimaryContext.inSyncLocalCheckpoints())); + assertThat(primaryContext.trackingLocalCheckpoints(), equalTo(trackerPrimaryContext.trackingLocalCheckpoints())); + assertThat(tracker.getGlobalCheckpoint(), equalTo(globalCheckpoint)); + } finally { + tracker.releasePrimaryContext(); + assertFalse(tracker.sealed()); + } + } + + public void testPrimaryContextSealing() { + // the tracker should start in the state of not being sealed + assertFalse(tracker.sealed()); + + // sampling the primary context should seal the tracker + tracker.primaryContext(); + assertTrue(tracker.sealed()); + + // invoking any method that mutates the state of the tracker should fail + assertIllegalStateExceptionWhenSealed(() -> tracker.updateLocalCheckpoint(randomAlphaOfLength(16), randomNonNegativeLong())); + assertIllegalStateExceptionWhenSealed(() -> tracker.updateGlobalCheckpointOnReplica(randomNonNegativeLong())); + assertIllegalStateExceptionWhenSealed( + () -> tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.emptySet(), Collections.emptySet())); + assertIllegalStateExceptionWhenSealed(() -> tracker.updateAllocationIdsFromPrimaryContext(mock(PrimaryContext.class))); + assertIllegalStateExceptionWhenSealed(() -> tracker.primaryContext()); + assertIllegalStateExceptionWhenSealed(() -> tracker.markAllocationIdAsInSync(randomAlphaOfLength(16), randomNonNegativeLong())); + + // releasing the primary context should unseal the tracker + tracker.releasePrimaryContext(); + assertFalse(tracker.sealed()); + } + + private void assertIllegalStateExceptionWhenSealed(final
ThrowingRunnable runnable) { + final IllegalStateException e = expectThrows(IllegalStateException.class, runnable); + assertThat(e, hasToString(containsString("global checkpoint tracker is sealed"))); + } + + private Tuple, Set> randomActiveAndInitializingAllocationIds( + final int numberOfActiveAllocationsIds, + final int numberOfInitializingIds) { + final Set activeAllocationIds = + IntStream.range(0, numberOfActiveAllocationsIds).mapToObj(i -> randomAlphaOfLength(16) + i).collect(Collectors.toSet()); + final Set initializingIds = randomAllocationIdsExcludingExistingIds(activeAllocationIds, numberOfInitializingIds); + return Tuple.tuple(activeAllocationIds, initializingIds); + } + + private Set randomAllocationIdsExcludingExistingIds(final Set existingAllocationIds, final int numberOfAllocationIds) { + return IntStream.range(0, numberOfAllocationIds).mapToObj(i -> { + do { + final String newAllocationId = randomAlphaOfLength(16) + i; + // ensure we do not duplicate an allocation ID; the check must use the suffixed ID that is actually returned + if (!existingAllocationIds.contains(newAllocationId)) { + return newAllocationId; + } + } while (true); + }).collect(Collectors.toSet()); } private void markAllocationIdAsInSyncQuietly( diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index cc837a0afe1..a341c268987 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -536,7 +536,7 @@ public class IndexShardTests extends IndexShardTestCase { routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); indexShard.updateRoutingEntry(routing); - indexShard.relocated("test"); + indexShard.relocated("test", primaryContext -> {}); engineClosed = false; break; } @@ -551,7 +551,7 @@ public class
IndexShardTests extends IndexShardTestCase { if (shardRouting.primary() == false) { final IllegalStateException e = expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX)); - assertThat(e, hasToString(containsString("shard is not a primary"))); + assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary"))); } final long primaryTerm = indexShard.getPrimaryTerm(); @@ -1042,7 +1042,7 @@ public class IndexShardTests extends IndexShardTestCase { Thread recoveryThread = new Thread(() -> { latch.countDown(); try { - shard.relocated("simulated recovery"); + shard.relocated("simulated recovery", primaryContext -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1071,7 +1071,7 @@ public class IndexShardTests extends IndexShardTestCase { shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); Thread recoveryThread = new Thread(() -> { try { - shard.relocated("simulated recovery"); + shard.relocated("simulated recovery", primaryContext -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1124,7 +1124,7 @@ public class IndexShardTests extends IndexShardTestCase { AtomicBoolean relocated = new AtomicBoolean(); final Thread recoveryThread = new Thread(() -> { try { - shard.relocated("simulated recovery"); + shard.relocated("simulated recovery", primaryContext -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1158,7 +1158,7 @@ public class IndexShardTests extends IndexShardTestCase { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node")); - shard.relocated("test"); + shard.relocated("test", primaryContext -> {}); expectThrows(IllegalIndexShardStateException.class, () -> shard.updateRoutingEntry(originalRouting)); closeShards(shard); } @@ 
-1168,7 +1168,7 @@ public class IndexShardTests extends IndexShardTestCase { final ShardRouting originalRouting = shard.routingEntry(); shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node")); shard.updateRoutingEntry(originalRouting); - expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test")); + expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test", primaryContext -> {})); closeShards(shard); } @@ -1187,7 +1187,7 @@ public class IndexShardTests extends IndexShardTestCase { @Override protected void doRun() throws Exception { cyclicBarrier.await(); - shard.relocated("test"); + shard.relocated("test", primaryContext -> {}); } }); relocationThread.start(); @@ -1362,7 +1362,7 @@ public class IndexShardTests extends IndexShardTestCase { assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); shard.updateRoutingEntry(inRecoveryRouting); - shard.relocated("simulate mark as relocated"); + shard.relocated("simulate mark as relocated", primaryContext -> {}); assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); try { shard.updateRoutingEntry(origRouting); diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index d19a51e6271..bf0b7286740 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -62,7 +62,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1; String allocationId = shard.routingEntry().allocationId().getId(); - shard.updateAllocationIdsFromMaster(Collections.singleton(allocationId), Collections.emptySet()); + 
shard.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(allocationId), Collections.emptySet()); shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint); assertEquals(globalCheckPoint, shard.getGlobalCheckpoint()); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 7a53f8f9f59..73e2d7eb0bd 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -322,6 +322,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC * Mock for {@link IndexShard} */ protected class MockIndexShard implements IndicesClusterStateService.Shard { + private volatile long clusterStateVersion; private volatile ShardRouting shardRouting; private volatile RecoveryState recoveryState; private volatile Set activeAllocationIds; @@ -372,7 +373,9 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC } @Override - public void updateAllocationIdsFromMaster(Set activeAllocationIds, Set initializingAllocationIds) { + public void updateAllocationIdsFromMaster( + long applyingClusterStateVersion, Set activeAllocationIds, Set initializingAllocationIds) { + this.clusterStateVersion = applyingClusterStateVersion; this.activeAllocationIds = activeAllocationIds; this.initializingAllocationIds = initializingAllocationIds; } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index e73bd8a9497..5532ad040f2 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ 
b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -35,6 +35,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesArray; @@ -76,6 +77,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -453,8 +455,14 @@ public class RecoverySourceHandlerTests extends ESTestCase { relocated.set(true); assertTrue(recoveriesDelayed.get()); return null; - }).when(shard).relocated(any(String.class)); + }).when(shard).relocated(any(String.class), any(Consumer.class)); when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class)); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + final ActionListener listener = (ActionListener)invocationOnMock.getArguments()[0]; + listener.onResponse(() -> {}); + return null; + }).when(shard).acquirePrimaryOperationPermit(any(ActionListener.class), any(String.class)); // final Engine.IndexCommitRef indexCommitRef = mock(Engine.IndexCommitRef.class); // when(shard.acquireIndexCommit(anyBoolean())).thenReturn(indexCommitRef); diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java index e1a7a07448f..b0d25f43bd6 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java @@ -53,7 +53,7 @@ import static 
org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllS import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; -@TestLogging("_root:DEBUG,org.elasticsearch.index.shard:TRACE") +@TestLogging("_root:DEBUG,org.elasticsearch.index.shard:TRACE,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.index.seqno:TRACE,org.elasticsearch.indices.recovery:TRACE") public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { private final Logger logger = Loggers.getLogger(RecoveryWhileUnderLoadIT.class); diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index fe83847bff2..48f6fdeaedb 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -514,14 +514,6 @@ public class RelocationIT extends ESIntegTestCase { // refresh is a replication action so this forces a global checkpoint sync which is needed as these are asserted on in tear down client().admin().indices().prepareRefresh("test").get(); - /* - * We have to execute a second refresh as in the face of relocations, the relocation target is not aware of the in-sync set and so - * the first refresh would bring back the local checkpoint for any shards added to the in-sync set that the relocation target was - * not tracking. - */ - // TODO: remove this after a primary context is transferred during relocation handoff - client().admin().indices().prepareRefresh("test").get(); - } class RecoveryCorruption extends MockTransportService.DelegateTransport {