diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 71594dc1ec6..da90091bff6 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1027,7 +1027,7 @@ public abstract class TransportReplicationAction< localCheckpoint = in.readZLong(); } else { // 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing. - localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + localCheckpoint = SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT; } } @@ -1202,6 +1202,8 @@ public abstract class TransportReplicationAction< super.readFrom(in); if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { globalCheckpoint = in.readZLong(); + } else { + globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; } } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java index a669065d32b..c8a77889242 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java @@ -19,23 +19,20 @@ package org.elasticsearch.index.seqno; -import com.carrotsearch.hppc.ObjectLongHashMap; -import com.carrotsearch.hppc.ObjectLongMap; -import com.carrotsearch.hppc.cursors.ObjectLongCursor; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; -import java.util.Arrays; -import java.util.Comparator; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; -import java.util.List; -import java.util.Locale; +import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; /** * This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or @@ -48,39 +45,172 @@ import java.util.stream.StreamSupport; */ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { + /** + * The global checkpoint tracker can operate in two modes: + * - primary: this shard is in charge of collecting local checkpoint information from all shard copies and computing the global + * checkpoint based on the local checkpoints of all in-sync shard copies. + * - replica: this shard receives global checkpoint information from the primary (see {@link #updateGlobalCheckpointOnReplica}). + * + * When a shard is initialized (be it a primary or replica), it initially operates in replica mode. The global checkpoint tracker is + * then switched to primary mode in the following three scenarios: + * + * - An initializing primary shard that is not a relocation target is moved to primary mode (using {@link #activatePrimaryMode}) once + * the shard becomes active. 
+ * - An active replica shard is moved to primary mode (using {@link #activatePrimaryMode}) once it is promoted to primary. + * - A primary relocation target is moved to primary mode (using {@link #activateWithPrimaryContext}) during the primary relocation + * handoff. If the target shard is successfully initialized in primary mode, the source shard of a primary relocation is then moved + * to replica mode (using {@link #completeRelocationHandoff}), as the relocation target will be in charge of the global checkpoint + * computation from that point on. + */ + boolean primaryMode; + /** + * Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff} + * and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the + * handoff was successful or not. During the handoff, whose main objective is to transfer the internal state of the global + * checkpoint tracker from the relocation source to the target, the list of in-sync shard copies cannot grow, otherwise the relocation + * target might miss this information and increase the global checkpoint too eagerly. As a consequence, some of the methods in this class + * are not allowed to be called while a handoff is in progress, in particular {@link #markAllocationIdAsInSync}. + * + * A notable exception to this is the method {@link #updateFromMaster}, which is still allowed to be called during a relocation handoff. + * The reason for this is that the handoff might fail and can be aborted (using {@link #abortRelocationHandoff}), in which case + * it is important that the global checkpoint tracker does not miss any state updates that might have happened during the handoff attempt. + * This means, however, that the global checkpoint can still advance after the primary relocation handoff has been initiated, but only + * because the master could have failed some of the in-sync shard copies and marked them as stale. That is ok though, as this + * information is conveyed through cluster state updates, and the new primary relocation target will also eventually learn about those. + */ + boolean handoffInProgress; + + /** + * The global checkpoint tracker relies on the property that cluster state updates are applied in-order. After transferring a primary + * context from the primary relocation source to the target and initializing the target, it is possible for the target to apply a + * cluster state that is older than the one upon which the primary context was based. If we allowed this old cluster state + * to influence the list of in-sync shard copies here, this could possibly remove such an in-sync copy from the internal structures + * until the newer cluster state were to be applied, which would unsafely advance the global checkpoint. This field thus captures + * the version of the last applied cluster state to ensure in-order updates. + */ long appliedClusterStateVersion; - /* - * This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed - * through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. This map is keyed by - * allocation IDs. All accesses to this set are guarded by a lock on this. + /** + * Local checkpoint information for all shard copies that are tracked.
Has an entry for all shard copies that are either initializing + * and / or in-sync, possibly also containing information about unassigned in-sync shard copies. The information that is tracked for + * each shard copy is explained in the docs for the {@link LocalCheckpointState} class. */ - final ObjectLongMap inSyncLocalCheckpoints; + final Map localCheckpoints; - /* - * This map holds the last known local checkpoint for initializing shards that are undergoing recovery. Such shards do not participate - * in determining the global checkpoint. We must track these local checkpoints so that when a shard is activated we use the highest - * known checkpoint. - */ - final ObjectLongMap trackingLocalCheckpoints; - - /* + /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the * current global checkpoint. */ final Set pendingInSync; - /* - * The current global checkpoint for this shard. Note that this field is guarded by a lock on this and thus this field does not need to - * be volatile. + /** + * The global checkpoint: + * - computed based on local checkpoints, if the tracker is in primary mode + * - received from the primary, if the tracker is in replica mode */ - private long globalCheckpoint; + long globalCheckpoint; - /* - * During relocation handoff, the state of the global checkpoint tracker is sampled. After sampling, there should be no additional - * mutations to this tracker until the handoff has completed. + public static class LocalCheckpointState implements Writeable { + + /** + * the last local checkpoint information that we have for this shard + */ + long localCheckpoint; + /** + * whether this shard is treated as in-sync and thus contributes to the global checkpoint calculation + */ + boolean inSync; + + public LocalCheckpointState(long localCheckpoint, boolean inSync) { + this.localCheckpoint = localCheckpoint; + this.inSync = inSync; + } + + public LocalCheckpointState(StreamInput in) throws IOException { + this.localCheckpoint = in.readZLong(); + this.inSync = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeZLong(localCheckpoint); + out.writeBoolean(inSync); + } + + /** + * Returns a full copy of this object + */ + public LocalCheckpointState copy() { + return new LocalCheckpointState(localCheckpoint, inSync); + } + + public long getLocalCheckpoint() { + return localCheckpoint; + } + + @Override + public String toString() { + return "LocalCheckpointState{" + + "localCheckpoint=" + localCheckpoint + + ", inSync=" + inSync + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LocalCheckpointState that = (LocalCheckpointState) o; + + if (localCheckpoint != that.localCheckpoint) return false; + return inSync == that.inSync; + } + + @Override + public int hashCode() { + int result = (int) (localCheckpoint ^ (localCheckpoint >>> 32)); + result = 31 * result + (inSync ? 1 : 0); + return result; + } + } + + /** + * Class invariant that should hold before and after every invocation of public methods on this class. As Java lacks implication + * as a logical operator, many of the invariants are written under the form (!A || B), they should be read as (A implies B) however. 
*/ - private boolean sealed = false; + private boolean invariant() { + // local checkpoints only set during primary mode + assert primaryMode || localCheckpoints.values().stream() + .allMatch(lcps -> lcps.localCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO || + lcps.localCheckpoint == SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT); + + // relocation handoff can only occur in primary mode + assert !handoffInProgress || primaryMode; + + // there is at least one in-sync shard copy when the global checkpoint tracker operates in primary mode (i.e. the shard itself) + assert !primaryMode || localCheckpoints.values().stream().anyMatch(lcps -> lcps.inSync); + + // during relocation handoff there are no entries blocking global checkpoint advancement + assert !handoffInProgress || pendingInSync.isEmpty() : + "entries blocking global checkpoint advancement during relocation handoff: " + pendingInSync; + + // entries blocking global checkpoint advancement can only exist in primary mode and when not having a relocation handoff + assert pendingInSync.isEmpty() || (primaryMode && !handoffInProgress); + + // the computed global checkpoint is always up-to-date + assert !primaryMode || globalCheckpoint == computeGlobalCheckpoint(pendingInSync, localCheckpoints.values(), globalCheckpoint) : + "global checkpoint is not up-to-date, expected: " + + computeGlobalCheckpoint(pendingInSync, localCheckpoints.values(), globalCheckpoint) + " but was: " + globalCheckpoint; + + for (Map.Entry entry : localCheckpoints.entrySet()) { + // blocking global checkpoint advancement only happens for shards that are not in-sync + assert !pendingInSync.contains(entry.getKey()) || !entry.getValue().inSync : + "shard copy " + entry.getKey() + " blocks global checkpoint advancement but is in-sync"; + } + + return true; + } /** * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or @@ -93,109 +223,13 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbersService.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; - this.inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas()); - this.trackingLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); + this.primaryMode = false; + this.handoffInProgress = false; + this.appliedClusterStateVersion = -1L; this.globalCheckpoint = globalCheckpoint; + this.localCheckpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas()); this.pendingInSync = new HashSet<>(); - } - - /** - * Notifies the service to update the local checkpoint for the shard with the provided allocation ID. If the checkpoint is lower than - * the currently known one, this is a no-op. If the allocation ID is not tracked, it is ignored. This is to prevent late arrivals from - * shards that are removed to be re-added. 
- * - * @param allocationId the allocation ID of the shard to update the local checkpoint for - * @param localCheckpoint the local checkpoint for the shard - */ - public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) { - if (sealed) { - throw new IllegalStateException("global checkpoint tracker is sealed"); - } - final boolean updated; - if (updateLocalCheckpoint(allocationId, localCheckpoint, inSyncLocalCheckpoints, "in-sync")) { - updated = true; - updateGlobalCheckpointOnPrimary(); - } else if (updateLocalCheckpoint(allocationId, localCheckpoint, trackingLocalCheckpoints, "tracking")) { - updated = true; - } else { - logger.trace("ignored local checkpoint [{}] of [{}], allocation ID is not tracked", localCheckpoint, allocationId); - updated = false; - } - if (updated) { - notifyAllWaiters(); - } - } - - /** - * Notify all threads waiting on the monitor on this tracker. These threads should be waiting for the local checkpoint on a specific - * allocation ID to catch up to the global checkpoint. - */ - @SuppressForbidden(reason = "Object#notifyAll waiters for local checkpoint advancement") - private synchronized void notifyAllWaiters() { - this.notifyAll(); - } - - /** - * Update the local checkpoint for the specified allocation ID in the specified tracking map. If the checkpoint is lower than the - * currently known one, this is a no-op. If the allocation ID is not tracked, it is ignored. - * - * @param allocationId the allocation ID of the shard to update the local checkpoint for - * @param localCheckpoint the local checkpoint for the shard - * @param map the tracking map - * @param reason the reason for the update (used for logging) - * @return {@code true} if the local checkpoint was updated, otherwise {@code false} if this was a no-op - */ - private boolean updateLocalCheckpoint( - final String allocationId, final long localCheckpoint, ObjectLongMap map, final String reason) { - final int index = map.indexOf(allocationId); - if (index >= 0) { - final long current = map.indexGet(index); - if (current < localCheckpoint) { - map.indexReplace(index, localCheckpoint); - logger.trace("updated local checkpoint of [{}] in [{}] from [{}] to [{}]", allocationId, reason, current, localCheckpoint); - } else { - logger.trace( - "skipped updating local checkpoint of [{}] in [{}] from [{}] to [{}], current checkpoint is higher", - allocationId, - reason, - current, - localCheckpoint); - } - return true; - } else { - return false; - } - } - - /** - * Scans through the currently known local checkpoint and updates the global checkpoint accordingly. 
- */ - private synchronized void updateGlobalCheckpointOnPrimary() { - long minLocalCheckpoint = Long.MAX_VALUE; - if (inSyncLocalCheckpoints.isEmpty() || !pendingInSync.isEmpty()) { - return; - } - for (final ObjectLongCursor localCheckpoint : inSyncLocalCheckpoints) { - if (localCheckpoint.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) { - logger.trace("unknown local checkpoint for active allocation ID [{}], requesting a sync", localCheckpoint.key); - return; - } - minLocalCheckpoint = Math.min(localCheckpoint.value, minLocalCheckpoint); - } - assert minLocalCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO : "new global checkpoint must be assigned"; - if (minLocalCheckpoint < globalCheckpoint) { - final String message = - String.format( - Locale.ROOT, - "new global checkpoint [%d] is lower than previous one [%d]", - minLocalCheckpoint, - globalCheckpoint); - throw new IllegalStateException(message); - } - if (globalCheckpoint != minLocalCheckpoint) { - logger.trace("global checkpoint updated to [{}]", minLocalCheckpoint); - globalCheckpoint = minLocalCheckpoint; - } + assert invariant(); } /** @@ -212,7 +246,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { * * @param globalCheckpoint the global checkpoint */ - synchronized void updateGlobalCheckpointOnReplica(final long globalCheckpoint) { + public synchronized void updateGlobalCheckpointOnReplica(final long globalCheckpoint) { + assert invariant(); + assert primaryMode == false; /* * The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary * information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other @@ -222,204 +258,93 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { this.globalCheckpoint = globalCheckpoint; logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint); } + assert invariant(); } /** - * Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed. + * Initializes the global checkpoint tracker in primary mode (see {@link #primaryMode}. Called on primary activation or promotion. + */ + public synchronized void activatePrimaryMode(final String allocationId, final long localCheckpoint) { + assert invariant(); + assert primaryMode == false; + assert localCheckpoints.get(allocationId) != null && localCheckpoints.get(allocationId).inSync && + localCheckpoints.get(allocationId).localCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO : + "expected " + allocationId + " to have initialized entry in " + localCheckpoints + " when activating primary"; + assert localCheckpoint >= SequenceNumbersService.NO_OPS_PERFORMED; + primaryMode = true; + updateLocalCheckpoint(allocationId, localCheckpoints.get(allocationId), localCheckpoint); + updateGlobalCheckpointOnPrimary(); + assert invariant(); + } + + /** + * Notifies the tracker of the current allocation IDs in the cluster state. 
* * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master - * @param activeAllocationIds the allocation IDs of the currently active shard copies + * @param inSyncAllocationIds the allocation IDs of the currently in-sync shard copies * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param pre60AllocationIds the allocation IDs of shards that are allocated to pre-6.0 nodes */ - public synchronized void updateAllocationIdsFromMaster( - final long applyingClusterStateVersion, final Set activeAllocationIds, final Set initializingAllocationIds) { - if (applyingClusterStateVersion < appliedClusterStateVersion) { - return; - } + public synchronized void updateFromMaster(final long applyingClusterStateVersion, final Set inSyncAllocationIds, + final Set initializingAllocationIds, final Set pre60AllocationIds) { + assert invariant(); + if (applyingClusterStateVersion > appliedClusterStateVersion) { + // check that the master does not fabricate new in-sync entries out of thin air once we are in primary mode + assert !primaryMode || inSyncAllocationIds.stream().allMatch( + inSyncId -> localCheckpoints.containsKey(inSyncId) && localCheckpoints.get(inSyncId).inSync) : + "update from master in primary mode contains in-sync ids " + inSyncAllocationIds + + " that have no matching entries in " + localCheckpoints; + // remove entries which don't exist on master + boolean removedEntries = localCheckpoints.keySet().removeIf( + aid -> !inSyncAllocationIds.contains(aid) && !initializingAllocationIds.contains(aid)); - appliedClusterStateVersion = applyingClusterStateVersion; - - // remove shards whose allocation ID no longer exists - inSyncLocalCheckpoints.removeAll(a -> !activeAllocationIds.contains(a) && !initializingAllocationIds.contains(a)); - - // add any new active allocation IDs - for (final String a : activeAllocationIds) { - if (!inSyncLocalCheckpoints.containsKey(a)) { - final long localCheckpoint = trackingLocalCheckpoints.getOrDefault(a, SequenceNumbersService.UNASSIGNED_SEQ_NO); - inSyncLocalCheckpoints.put(a, localCheckpoint); - logger.trace("marked [{}] as in-sync with local checkpoint [{}] via cluster state update from master", a, localCheckpoint); + if (primaryMode) { + // add new initializingIds that are missing locally. These are fresh shard copies - and not in-sync + for (String initializingId : initializingAllocationIds) { + if (localCheckpoints.containsKey(initializingId) == false) { + final boolean inSync = inSyncAllocationIds.contains(initializingId); + assert inSync == false : "update from master in primary mode has " + initializingId + + " as in-sync but it does not exist locally"; + final long localCheckpoint = pre60AllocationIds.contains(initializingId) ? + SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT : SequenceNumbersService.UNASSIGNED_SEQ_NO; + localCheckpoints.put(initializingId, new LocalCheckpointState(localCheckpoint, inSync)); + } + } + } else { + for (String initializingId : initializingAllocationIds) { + final long localCheckpoint = pre60AllocationIds.contains(initializingId) ? + SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT : SequenceNumbersService.UNASSIGNED_SEQ_NO; + localCheckpoints.put(initializingId, new LocalCheckpointState(localCheckpoint, false)); + } + for (String inSyncId : inSyncAllocationIds) { + final long localCheckpoint = pre60AllocationIds.contains(inSyncId) ? 
+ SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT : SequenceNumbersService.UNASSIGNED_SEQ_NO; + localCheckpoints.put(inSyncId, new LocalCheckpointState(localCheckpoint, true)); + } } - } - - trackingLocalCheckpoints.removeAll(a -> !initializingAllocationIds.contains(a)); - for (final String a : initializingAllocationIds) { - if (inSyncLocalCheckpoints.containsKey(a)) { - /* - * This can happen if we mark the allocation ID as in sync at the end of recovery before seeing a cluster state update from - * marking the shard as active. - */ - continue; - } - if (trackingLocalCheckpoints.containsKey(a)) { - // we are already tracking this allocation ID - continue; - } - // this is a new allocation ID - trackingLocalCheckpoints.put(a, SequenceNumbersService.UNASSIGNED_SEQ_NO); - logger.trace("tracking [{}] via cluster state update from master", a); - } - - updateGlobalCheckpointOnPrimary(); - } - - /** - * Get the primary context for the shard. This includes the state of the global checkpoint tracker. - * - * @return the primary context - */ - synchronized PrimaryContext primaryContext() { - if (sealed) { - throw new IllegalStateException("global checkpoint tracker is sealed"); - } - sealed = true; - final ObjectLongMap inSyncLocalCheckpoints = new ObjectLongHashMap<>(this.inSyncLocalCheckpoints); - final ObjectLongMap trackingLocalCheckpoints = new ObjectLongHashMap<>(this.trackingLocalCheckpoints); - return new PrimaryContext(appliedClusterStateVersion, inSyncLocalCheckpoints, trackingLocalCheckpoints); - } - - /** - * Releases a previously acquired primary context. - */ - synchronized void releasePrimaryContext() { - assert sealed; - sealed = false; - } - - /** - * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. - * - * @param primaryContext the primary context - */ - synchronized void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { - if (sealed) { - throw new IllegalStateException("global checkpoint tracker is sealed"); - } - /* - * We are gathered here today to witness the relocation handoff transferring knowledge from the relocation source to the relocation - * target. We need to consider the possibility that the version of the cluster state on the relocation source when the primary - * context was sampled is different than the version of the cluster state on the relocation target at this exact moment. We define - * the following values: - * - version(source) = the cluster state version on the relocation source used to ensure a minimum cluster state version on the - * relocation target - * - version(context) = the cluster state version on the relocation source when the primary context was sampled - * - version(target) = the current cluster state version on the relocation target - * - * We know that version(source) <= version(target) and version(context) < version(target), version(context) = version(target), and - * version(target) < version(context) are all possibilities. - * - * The case of version(context) = version(target) causes no issues as in this case the knowledge of the in-sync and initializing - * shards the target receives from the master will be equal to the knowledge of the in-sync and initializing shards the target - * receives from the relocation source via the primary context. - * - * Let us now consider the case that version(context) < version(target). 
In this case, the active allocation IDs in the primary - * context can be a superset of the active allocation IDs contained in the applied cluster state. This is because no new shards can - * have been started as marking a shard as in-sync is blocked during relocation handoff. Note however that the relocation target - * itself will have been marked in-sync during recovery and therefore is an active allocation ID from the perspective of the primary - * context. - * - * Finally, we consider the case that version(target) < version(context). In this case, the active allocation IDs in the primary - * context can be a subset of the active allocation IDs contained the applied cluster state. This is again because no new shards can - * have been started. Moreover, existing active allocation IDs could have been removed from the cluster state. - * - * In each of these latter two cases, consider initializing shards that are contained in the primary context but not contained in - * the cluster state applied on the target. - * - * If version(context) < version(target) it means that the shard has been removed by a later cluster state update that is already - * applied on the target and we only need to ensure that we do not add it to the tracking map on the target. The call to - * GlobalCheckpointTracker#updateLocalCheckpoint(String, long) is a no-op for such shards and this is safe. - * - * If version(target) < version(context) it means that the shard has started initializing by a later cluster state update has not - * yet arrived on the target. However, there is a delay on recoveries before we ensure that version(source) <= version(target). - * Therefore, such a shard can never initialize from the relocation source and will have to await the handoff completing. As such, - * these shards are not problematic. - * - * Lastly, again in these two cases, what about initializing shards that are contained in cluster state applied on the target but - * not contained in the cluster state applied on the target. - * - * If version(context) < version(target) it means that a shard has started initializing by a later cluster state that is applied on - * the target but not yet known to what would be the relocation source. As recoveries are delayed at this time, these shards can not - * cause a problem and we do not mutate remove these shards from the tracking map, so we are safe here. - * - * If version(target) < version(context) it means that a shard has started initializing but was removed by a later cluster state. In - * this case, as the cluster state version on the primary context exceeds the applied cluster state version, we replace the tracking - * map and are safe here too. 
- */ - - assert StreamSupport - .stream(inSyncLocalCheckpoints.spliterator(), false) - .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : inSyncLocalCheckpoints; - assert StreamSupport - .stream(trackingLocalCheckpoints.spliterator(), false) - .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : trackingLocalCheckpoints; - assert pendingInSync.isEmpty() : pendingInSync; - - if (primaryContext.clusterStateVersion() > appliedClusterStateVersion) { - final Set activeAllocationIds = - new HashSet<>(Arrays.asList(primaryContext.inSyncLocalCheckpoints().keys().toArray(String.class))); - final Set initializingAllocationIds = - new HashSet<>(Arrays.asList(primaryContext.trackingLocalCheckpoints().keys().toArray(String.class))); - updateAllocationIdsFromMaster(primaryContext.clusterStateVersion(), activeAllocationIds, initializingAllocationIds); - } - - /* - * As we are updating the local checkpoints for the in-sync allocation IDs, the global checkpoint will advance in place; this means - * that we have to sort the incoming local checkpoints from smallest to largest lest we violate that the global checkpoint does not - * regress. - */ - - class AllocationIdLocalCheckpointPair { - - private final String allocationId; - - public String allocationId() { - return allocationId; - } - - private final long localCheckpoint; - - public long localCheckpoint() { - return localCheckpoint; - } - - private AllocationIdLocalCheckpointPair(final String allocationId, final long localCheckpoint) { - this.allocationId = allocationId; - this.localCheckpoint = localCheckpoint; - } - - } - - final List inSync = - StreamSupport - .stream(primaryContext.inSyncLocalCheckpoints().spliterator(), false) - .map(e -> new AllocationIdLocalCheckpointPair(e.key, e.value)) - .collect(Collectors.toList()); - inSync.sort(Comparator.comparingLong(AllocationIdLocalCheckpointPair::localCheckpoint)); - - for (final AllocationIdLocalCheckpointPair cursor : inSync) { - assert cursor.localCheckpoint() >= globalCheckpoint - : "local checkpoint [" + cursor.localCheckpoint() + "] " - + "for allocation ID [" + cursor.allocationId() + "] " - + "violates being at least the global checkpoint [" + globalCheckpoint + "]"; - updateLocalCheckpoint(cursor.allocationId(), cursor.localCheckpoint()); - if (trackingLocalCheckpoints.containsKey(cursor.allocationId())) { - moveAllocationIdFromTrackingToInSync(cursor.allocationId(), "relocation"); + appliedClusterStateVersion = applyingClusterStateVersion; + if (primaryMode && removedEntries) { updateGlobalCheckpointOnPrimary(); } } + assert invariant(); + } - for (final ObjectLongCursor cursor : primaryContext.trackingLocalCheckpoints()) { - updateLocalCheckpoint(cursor.key, cursor.value); + /** + * Called when the recovery process for a shard is ready to open the engine on the target shard. Ensures that the right data structures + * have been set up locally to track local checkpoint information for the shard. 
+ * + * @param allocationId the allocation ID of the shard for which recovery was initiated + */ + public synchronized void initiateTracking(final String allocationId) { + assert invariant(); + assert primaryMode; + LocalCheckpointState lcps = localCheckpoints.get(allocationId); + if (lcps == null) { + // can happen if replica was removed from cluster but recovery process is unaware of it yet + throw new IllegalStateException("no local checkpoint tracking information available"); } + assert invariant(); } /** @@ -428,77 +353,253 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { * * @param allocationId the allocation ID of the shard to mark as in-sync * @param localCheckpoint the current local checkpoint on the shard - * - * @throws InterruptedException if the thread is interrupted waiting for the local checkpoint on the shard to advance */ public synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { - if (sealed) { - throw new IllegalStateException("global checkpoint tracker is sealed"); + assert invariant(); + assert primaryMode; + assert handoffInProgress == false; + LocalCheckpointState lcps = localCheckpoints.get(allocationId); + if (lcps == null) { + // can happen if replica was removed from cluster but recovery process is unaware of it yet + throw new IllegalStateException("no local checkpoint tracking information available for " + allocationId); } - if (!trackingLocalCheckpoints.containsKey(allocationId)) { - /* - * This can happen if the recovery target has been failed and the cluster state update from the master has triggered removing - * this allocation ID from the tracking map but this recovery thread has not yet been made aware that the recovery is - * cancelled. - */ - return; - } - - updateLocalCheckpoint(allocationId, localCheckpoint, trackingLocalCheckpoints, "tracking"); - if (!pendingInSync.add(allocationId)) { - throw new IllegalStateException("there is already a pending sync in progress for allocation ID [" + allocationId + "]"); - } - try { - waitForAllocationIdToBeInSync(allocationId); - } finally { - pendingInSync.remove(allocationId); + assert localCheckpoint >= SequenceNumbersService.NO_OPS_PERFORMED : + "expected known local checkpoint for " + allocationId + " but was " + localCheckpoint; + assert pendingInSync.contains(allocationId) == false : "shard copy " + allocationId + " is already marked as pending in-sync"; + updateLocalCheckpoint(allocationId, lcps, localCheckpoint); + // if it was already in-sync (because of a previously failed recovery attempt), global checkpoint must have been + // stuck from advancing + assert !lcps.inSync || (lcps.localCheckpoint >= globalCheckpoint) : + "shard copy " + allocationId + " that's already in-sync should have a local checkpoint " + lcps.localCheckpoint + + " that's above the global checkpoint " + globalCheckpoint; + if (lcps.localCheckpoint < globalCheckpoint) { + pendingInSync.add(allocationId); + try { + while (true) { + if (pendingInSync.contains(allocationId)) { + waitForLocalCheckpointToAdvance(); + } else { + break; + } + } + } finally { + pendingInSync.remove(allocationId); + } + } else { + lcps.inSync = true; + logger.trace("marked [{}] as in-sync", allocationId); updateGlobalCheckpointOnPrimary(); } + + assert invariant(); } - /** - * Wait for knowledge of the local checkpoint for the specified allocation ID to advance to the global checkpoint. 
Global checkpoint - * advancement is blocked while there are any allocation IDs waiting to catch up to the global checkpoint. - * - * @param allocationId the allocation ID - * @throws InterruptedException if this thread was interrupted before of during waiting - */ - private synchronized void waitForAllocationIdToBeInSync(final String allocationId) throws InterruptedException { - while (true) { - /* - * If the allocation has been cancelled and so removed from the tracking map from a cluster state update from the master it - * means that this recovery will be cancelled; we are here on a cancellable recovery thread and so this thread will throw an - * interrupted exception as soon as it tries to wait on the monitor. - */ - final long current = trackingLocalCheckpoints.getOrDefault(allocationId, Long.MIN_VALUE); - if (current >= globalCheckpoint) { - /* - * This is prematurely adding the allocation ID to the in-sync map as at this point recovery is not yet finished and could - * still abort. At this point we will end up with a shard in the in-sync map holding back the global checkpoint because the - * shard never recovered and we would have to wait until either the recovery retries and completes successfully, or the - * master fails the shard and issues a cluster state update that removes the shard from the set of active allocation IDs. - */ - moveAllocationIdFromTrackingToInSync(allocationId, "recovery"); - break; - } else { - waitForLocalCheckpointToAdvance(); - } + private boolean updateLocalCheckpoint(String allocationId, LocalCheckpointState lcps, long localCheckpoint) { + // a local checkpoint of PRE_60_NODE_LOCAL_CHECKPOINT cannot be overridden + assert lcps.localCheckpoint != SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT || + localCheckpoint == SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT : + "pre-6.0 shard copy " + allocationId + " unexpected to send valid local checkpoint " + localCheckpoint; + if (localCheckpoint > lcps.localCheckpoint) { + logger.trace("updated local checkpoint of [{}] from [{}] to [{}]", allocationId, lcps.localCheckpoint, localCheckpoint); + lcps.localCheckpoint = localCheckpoint; + return true; + } else { + logger.trace("skipped updating local checkpoint of [{}] from [{}] to [{}], current checkpoint is higher", allocationId, + lcps.localCheckpoint, localCheckpoint); + return false; } } /** - * Moves a tracking allocation ID to be in-sync. This can occur when a shard is recovering from the primary and its local checkpoint has - * advanced past the global checkpoint, or during relocation hand-off when the relocation target learns of an in-sync shard from the - * relocation source. + * Notifies the service to update the local checkpoint for the shard with the provided allocation ID. If the checkpoint is lower than + * the currently known one, this is a no-op. If the allocation ID is not tracked, it is ignored. 
* - * @param allocationId the allocation ID to move - * @param reason the reason for the transition + * @param allocationId the allocation ID of the shard to update the local checkpoint for + * @param localCheckpoint the local checkpoint for the shard */ - private synchronized void moveAllocationIdFromTrackingToInSync(final String allocationId, final String reason) { - assert trackingLocalCheckpoints.containsKey(allocationId); - final long current = trackingLocalCheckpoints.remove(allocationId); - inSyncLocalCheckpoints.put(allocationId, current); - logger.trace("marked [{}] as in-sync with local checkpoint [{}] due to [{}]", allocationId, current, reason); + public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) { + assert invariant(); + assert primaryMode; + assert handoffInProgress == false; + LocalCheckpointState lcps = localCheckpoints.get(allocationId); + if (lcps == null) { + // can happen if replica was removed from cluster but replication process is unaware of it yet + return; + } + boolean increasedLocalCheckpoint = updateLocalCheckpoint(allocationId, lcps, localCheckpoint); + boolean pending = pendingInSync.contains(allocationId); + if (pending && lcps.localCheckpoint >= globalCheckpoint) { + pendingInSync.remove(allocationId); + pending = false; + lcps.inSync = true; + logger.trace("marked [{}] as in-sync", allocationId); + notifyAllWaiters(); + } + if (increasedLocalCheckpoint && pending == false) { + updateGlobalCheckpointOnPrimary(); + } + assert invariant(); + } + + /** + * Computes the global checkpoint based on the given local checkpoints. In case where there are entries preventing the + * computation to happen (for example due to blocking), it returns the fallback value. + */ + private static long computeGlobalCheckpoint(final Set pendingInSync, final Collection localCheckpoints, + final long fallback) { + long minLocalCheckpoint = Long.MAX_VALUE; + if (pendingInSync.isEmpty() == false) { + return fallback; + } + for (final LocalCheckpointState lcps : localCheckpoints) { + if (lcps.inSync) { + if (lcps.localCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + // unassigned in-sync replica + return fallback; + } else if (lcps.localCheckpoint == SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT) { + // 5.x replica, ignore for global checkpoint calculation + } else { + minLocalCheckpoint = Math.min(lcps.localCheckpoint, minLocalCheckpoint); + } + } + } + assert minLocalCheckpoint != Long.MAX_VALUE; + return minLocalCheckpoint; + } + + /** + * Scans through the currently known local checkpoint and updates the global checkpoint accordingly. + */ + private synchronized void updateGlobalCheckpointOnPrimary() { + assert primaryMode; + final long computedGlobalCheckpoint = computeGlobalCheckpoint(pendingInSync, localCheckpoints.values(), globalCheckpoint); + assert computedGlobalCheckpoint >= globalCheckpoint : "new global checkpoint [" + computedGlobalCheckpoint + + "] is lower than previous one [" + globalCheckpoint + "]"; + if (globalCheckpoint != computedGlobalCheckpoint) { + logger.trace("global checkpoint updated to [{}]", computedGlobalCheckpoint); + globalCheckpoint = computedGlobalCheckpoint; + } + } + + /** + * Initiates a relocation handoff and returns the corresponding primary context. 
+ */ + public synchronized PrimaryContext startRelocationHandoff() { + assert invariant(); + assert primaryMode; + assert handoffInProgress == false; + assert pendingInSync.isEmpty() : "relocation handoff started while there are still shard copies pending in-sync: " + pendingInSync; + handoffInProgress = true; + // copy clusterStateVersion and localCheckpoints and return + // all the entries from localCheckpoints that are inSync: the reason we don't need to care about initializing non-insync entries + // is that they will have to undergo a recovery attempt on the relocation target, and will hence be supplied by the cluster state + // update on the relocation target once relocation completes). We could alternatively also copy the map as-is (it’s safe), and it + // would be cleaned up on the target by cluster state updates. + Map localCheckpointsCopy = new HashMap<>(); + for (Map.Entry entry : localCheckpoints.entrySet()) { + localCheckpointsCopy.put(entry.getKey(), entry.getValue().copy()); + } + assert invariant(); + return new PrimaryContext(appliedClusterStateVersion, localCheckpointsCopy); + } + + /** + * Fails a relocation handoff attempt. + */ + public synchronized void abortRelocationHandoff() { + assert invariant(); + assert primaryMode; + assert handoffInProgress; + handoffInProgress = false; + assert invariant(); + } + + /** + * Marks a relocation handoff attempt as successful. Moves the tracker into replica mode. + */ + public synchronized void completeRelocationHandoff() { + assert invariant(); + assert primaryMode; + assert handoffInProgress; + primaryMode = false; + handoffInProgress = false; + // forget all checkpoint information + localCheckpoints.values().stream().forEach(lcps -> { + if (lcps.localCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO && + lcps.localCheckpoint != SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT) { + lcps.localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } + }); + assert invariant(); + } + + /** + * Activates the global checkpoint tracker in primary mode (see {@link #primaryMode}. Called on primary relocation target during + * primary relocation handoff. + * + * @param primaryContext the primary context used to initialize the state + */ + public synchronized void activateWithPrimaryContext(PrimaryContext primaryContext) { + assert invariant(); + assert primaryMode == false; + final Runnable runAfter = getMasterUpdateOperationFromCurrentState(); + primaryMode = true; + // capture current state to possibly replay missed cluster state update + appliedClusterStateVersion = primaryContext.clusterStateVersion(); + localCheckpoints.clear(); + for (Map.Entry entry : primaryContext.localCheckpoints.entrySet()) { + localCheckpoints.put(entry.getKey(), entry.getValue().copy()); + } + updateGlobalCheckpointOnPrimary(); + // reapply missed cluster state update + // note that if there was no cluster state update between start of the engine of this shard and the call to + // initializeWithPrimaryContext, we might still have missed a cluster state update. This is best effort. 
+ runAfter.run(); + assert invariant(); + } + + private Runnable getMasterUpdateOperationFromCurrentState() { + assert primaryMode == false; + final long lastAppliedClusterStateVersion = appliedClusterStateVersion; + final Set inSyncAllocationIds = new HashSet<>(); + final Set initializingAllocationIds = new HashSet<>(); + final Set pre60AllocationIds = new HashSet<>(); + localCheckpoints.entrySet().forEach(entry -> { + if (entry.getValue().inSync) { + inSyncAllocationIds.add(entry.getKey()); + } else { + initializingAllocationIds.add(entry.getKey()); + } + if (entry.getValue().getLocalCheckpoint() == SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT) { + pre60AllocationIds.add(entry.getKey()); + } + }); + return () -> updateFromMaster(lastAppliedClusterStateVersion, inSyncAllocationIds, initializingAllocationIds, pre60AllocationIds); + } + + /** + * Whether there are shards blocking global checkpoint advancement. Used by tests. + */ + public synchronized boolean pendingInSync() { + assert primaryMode; + return pendingInSync.isEmpty() == false; + } + + /** + * Returns the local checkpoint information tracked for a specific shard. Used by tests. + */ + public synchronized LocalCheckpointState getTrackedLocalCheckpointForShard(String allocationId) { + assert primaryMode; + return localCheckpoints.get(allocationId); + } + + /** + * Notify all threads waiting on the monitor on this tracker. These threads should be waiting for the local checkpoint on a specific + * allocation ID to catch up to the global checkpoint. + */ + @SuppressForbidden(reason = "Object#notifyAll waiters for local checkpoint advancement") + private synchronized void notifyAllWaiters() { + this.notifyAll(); } /** @@ -512,35 +613,62 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { } /** - * Check if there are any recoveries pending in-sync. - * - * @return true if there is at least one shard pending in-sync, otherwise false + * Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing + * shards and their local checkpoints. */ - boolean pendingInSync() { - return !pendingInSync.isEmpty(); - } + public static class PrimaryContext implements Writeable { - /** - * Check if the tracker is sealed. - * - * @return true if the tracker is sealed, otherwise false. - */ - boolean sealed() { - return sealed; - } + private final long clusterStateVersion; + private final Map localCheckpoints; - /** - * Returns the local checkpoint for the shard with the specified allocation ID, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if - * the shard is not in-sync.
- * - * @param allocationId the allocation ID of the shard to obtain the local checkpoint for - * @return the local checkpoint, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} - */ - synchronized long getLocalCheckpointForAllocationId(final String allocationId) { - if (inSyncLocalCheckpoints.containsKey(allocationId)) { - return inSyncLocalCheckpoints.get(allocationId); + public PrimaryContext(long clusterStateVersion, Map localCheckpoints) { + this.clusterStateVersion = clusterStateVersion; + this.localCheckpoints = localCheckpoints; } - return SequenceNumbersService.UNASSIGNED_SEQ_NO; - } + public PrimaryContext(StreamInput in) throws IOException { + clusterStateVersion = in.readVLong(); + localCheckpoints = in.readMap(StreamInput::readString, LocalCheckpointState::new); + } + + public long clusterStateVersion() { + return clusterStateVersion; + } + + public Map getLocalCheckpoints() { + return localCheckpoints; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(clusterStateVersion); + out.writeMap(localCheckpoints, (streamOutput, s) -> out.writeString(s), (streamOutput, lcps) -> lcps.writeTo(out)); + } + + @Override + public String toString() { + return "PrimaryContext{" + + "clusterStateVersion=" + clusterStateVersion + + ", localCheckpoints=" + localCheckpoints + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + PrimaryContext that = (PrimaryContext) o; + + if (clusterStateVersion != that.clusterStateVersion) return false; + return localCheckpoints.equals(that.localCheckpoints); + } + + @Override + public int hashCode() { + int result = (int) (clusterStateVersion ^ (clusterStateVersion >>> 32)); + result = 31 * result + localCheckpoints.hashCode(); + return result; + } + } } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 05f5fea2dc2..4cdcd7fbd84 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import java.util.Set; @@ -41,6 +40,11 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { */ public static final long NO_OPS_PERFORMED = -1L; + /** + * Represents a local checkpoint coming from a pre-6.0 node + */ + public static final long PRE_60_NODE_LOCAL_CHECKPOINT = -3L; + private final LocalCheckpointTracker localCheckpointTracker; private final GlobalCheckpointTracker globalCheckpointTracker; @@ -135,6 +139,16 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { globalCheckpointTracker.updateLocalCheckpoint(allocationId, checkpoint); } + /** + * Called when the recovery process for a shard is ready to open the engine on the target shard. + * See {@link GlobalCheckpointTracker#initiateTracking(String)} for details. 
+ * + * @param allocationId the allocation ID of the shard for which recovery was initiated + */ + public void initiateTracking(final String allocationId) { + globalCheckpointTracker.initiateTracking(allocationId); + } + /** * Marks the shard with the provided allocation ID as in-sync with the primary shard. See * {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} for additional details. @@ -174,25 +188,44 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { } /** - * Notifies the service of the current allocation IDs in the cluster state. See - * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details. - * - * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * Returns the local checkpoint information tracked for a specific shard. Used by tests. */ - public void updateAllocationIdsFromMaster( - final long applyingClusterStateVersion, final Set activeAllocationIds, final Set initializingAllocationIds) { - globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds); + public synchronized long getTrackedLocalCheckpointForShard(final String allocationId) { + return globalCheckpointTracker.getTrackedLocalCheckpointForShard(allocationId).getLocalCheckpoint(); } /** - * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. - * - * @param primaryContext the sequence number context + * Activates the global checkpoint tracker in primary mode (see {@link GlobalCheckpointTracker#primaryMode}. + * Called on primary activation or promotion. */ - public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { - globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext); + public void activatePrimaryMode(final String allocationId, final long localCheckpoint) { + globalCheckpointTracker.activatePrimaryMode(allocationId, localCheckpoint); + } + + /** + * Notifies the service of the current allocation IDs in the cluster state. See + * {@link GlobalCheckpointTracker#updateFromMaster(long, Set, Set, Set)} for details. + * + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param inSyncAllocationIds the allocation IDs of the currently in-sync shard copies + * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param pre60AllocationIds the allocation IDs of shards that are allocated to pre-6.0 nodes + */ + public void updateAllocationIdsFromMaster( + final long applyingClusterStateVersion, final Set inSyncAllocationIds, final Set initializingAllocationIds, + final Set pre60AllocationIds) { + globalCheckpointTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, initializingAllocationIds, + pre60AllocationIds); + } + + /** + * Activates the global checkpoint tracker in primary mode (see {@link GlobalCheckpointTracker#primaryMode}. + * Called on primary relocation target during primary relocation handoff. 
+ * + * @param primaryContext the primary context used to initialize the state + */ + public void activateWithPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) { + globalCheckpointTracker.activateWithPrimaryContext(primaryContext); } /** @@ -209,15 +242,22 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { * * @return the primary context */ - public PrimaryContext primaryContext() { - return globalCheckpointTracker.primaryContext(); + public GlobalCheckpointTracker.PrimaryContext startRelocationHandoff() { + return globalCheckpointTracker.startRelocationHandoff(); } /** - * Releases a previously acquired primary context. + * Marks a relocation handoff attempt as successful. Moves the tracker into replica mode. */ - public void releasePrimaryContext() { - globalCheckpointTracker.releasePrimaryContext(); + public void completeRelocationHandoff() { + globalCheckpointTracker.completeRelocationHandoff(); + } + + /** + * Fails a relocation handoff attempt. + */ + public void abortRelocationHandoff() { + globalCheckpointTracker.abortRelocationHandoff(); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 839f8f53ca7..296dae0331e 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -43,6 +43,7 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; @@ -102,6 +103,7 @@ import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; +import org.elasticsearch.index.seqno.GlobalCheckpointTracker; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; @@ -362,21 +364,133 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl final long newPrimaryTerm, final CheckedBiConsumer, IOException> primaryReplicaSyncer, final long applyingClusterStateVersion, - final Set activeAllocationIds, - final Set initializingAllocationIds) throws IOException { + final Set inSyncAllocationIds, + final Set initializingAllocationIds, + final Set pre60AllocationIds) throws IOException { final ShardRouting currentRouting; synchronized (mutex) { currentRouting = this.shardRouting; - updateRoutingEntry(newRouting); + + if (!newRouting.shardId().equals(shardId())) { + throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId()); + } + if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) { + throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. 
Current " + currentRouting + ", new " + newRouting); + } + if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) { + throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current " + + currentRouting + ", new " + newRouting); + } + + if (newRouting.primary()) { + final Engine engine = getEngineOrNull(); + if (engine != null) { + engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, inSyncAllocationIds, initializingAllocationIds, pre60AllocationIds); + } + } + + if (state == IndexShardState.POST_RECOVERY && newRouting.active()) { + assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting; + // we want to refresh *before* we move to internal STARTED state + try { + getEngine().refresh("cluster_state_started"); + } catch (Exception e) { + logger.debug("failed to refresh due to move to cluster wide started", e); + } + + if (newRouting.primary()) { + final DiscoveryNode recoverySourceNode = recoveryState.getSourceNode(); + if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) { + // there was no primary context hand-off in < 6.0.0, need to manually activate the shard + getEngine().seqNoService().activatePrimaryMode(currentRouting.allocationId().getId(), getEngine().seqNoService().getLocalCheckpoint()); + } + } + + changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); + } else if (state == IndexShardState.RELOCATED && + (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) { + // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery + // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two + // active primaries. + throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state()); + } + assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || + state == IndexShardState.CLOSED : + "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state; + this.shardRouting = newRouting; + persistMetadata(path, indexSettings, newRouting, currentRouting, logger); if (shardRouting.primary()) { - updatePrimaryTerm(newPrimaryTerm, primaryReplicaSyncer); + if (newPrimaryTerm != primaryTerm) { + assert currentRouting.primary() == false : "term is only increased as part of primary promotion"; + /* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned + * in one state causing it's term to be incremented. Note that if both current shard state and new + * shard state are initializing, we could replace the current shard and reinitialize it. It is however + * possible that this shard is being started. This can happen if: + * 1) Shard is post recovery and sends shard started to the master + * 2) Node gets disconnected and rejoins + * 3) Master assigns the shard back to the node + * 4) Master processes the shard started and starts the shard + * 5) The node process the cluster state where the shard is both started and primary term is incremented. 
+ * + * We could fail the shard in that case, but this will cause it to be removed from the insync allocations list + * potentially preventing re-allocation. + */ + assert shardRouting.initializing() == false : + "a started primary shard should never update its term; " + + "shard " + shardRouting + ", " + + "current term [" + primaryTerm + "], " + + "new term [" + newPrimaryTerm + "]"; + assert newPrimaryTerm > primaryTerm : + "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]"; + /* + * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we + * increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is + * incremented. + */ + final CountDownLatch latch = new CountDownLatch(1); + // to prevent primary relocation handoff while resync is not completed + boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true); + if (resyncStarted == false) { + throw new IllegalStateException("cannot start resync while it's already in progress"); + } + indexShardOperationPermits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + latch.await(); + try { + getEngine().fillSeqNoGaps(newPrimaryTerm); + updateLocalCheckpointForShard(currentRouting.allocationId().getId(), + getEngine().seqNoService().getLocalCheckpoint()); + primaryReplicaSyncer.accept(this, new ActionListener() { + @Override + public void onResponse(ResyncTask resyncTask) { + logger.info("primary-replica resync completed with {} operations", + resyncTask.getResyncedOperations()); + boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); + assert resyncCompleted : "primary-replica resync finished but was not started"; + } - final Engine engine = getEngineOrNull(); - // if the engine is not yet started, we are not ready yet and can just ignore this - if (engine != null) { - engine.seqNoService().updateAllocationIdsFromMaster( - applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds); + @Override + public void onFailure(Exception e) { + boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); + assert resyncCompleted : "primary-replica resync finished but was not started"; + if (state == IndexShardState.CLOSED) { + // ignore, shutting down + } else { + failShard("exception during primary-replica resync", e); + } + } + }); + } catch (final AlreadyClosedException e) { + // okay, the index was deleted + } + }, + e -> failShard("exception during primary term transition", e)); + getEngine().seqNoService().activatePrimaryMode(currentRouting.allocationId().getId(), getEngine().seqNoService().getLocalCheckpoint()); + primaryTerm = newPrimaryTerm; + latch.countDown(); } } } @@ -388,117 +502,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } - private void updateRoutingEntry(ShardRouting newRouting) throws IOException { - assert Thread.holdsLock(mutex); - final ShardRouting currentRouting = this.shardRouting; - - if (!newRouting.shardId().equals(shardId())) { - throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId()); - } - if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) { - throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. 
Current " + currentRouting + ", new " + newRouting); - } - if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) { - throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current " - + currentRouting + ", new " + newRouting); - } - - if (state == IndexShardState.POST_RECOVERY && newRouting.active()) { - assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting; - // we want to refresh *before* we move to internal STARTED state - try { - getEngine().refresh("cluster_state_started"); - } catch (Exception e) { - logger.debug("failed to refresh due to move to cluster wide started", e); - } - changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); - } else if (state == IndexShardState.RELOCATED && - (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) { - // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery - // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two - // active primaries. - throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state()); - } - assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || - state == IndexShardState.CLOSED : - "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state; - this.shardRouting = newRouting; - persistMetadata(path, indexSettings, newRouting, currentRouting, logger); - } - - private void updatePrimaryTerm( - final long newPrimaryTerm, final CheckedBiConsumer, IOException> primaryReplicaSyncer) { - assert Thread.holdsLock(mutex); - assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard"; - if (newPrimaryTerm != primaryTerm) { - /* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned - * in one state causing it's term to be incremented. Note that if both current shard state and new - * shard state are initializing, we could replace the current shard and reinitialize it. It is however - * possible that this shard is being started. This can happen if: - * 1) Shard is post recovery and sends shard started to the master - * 2) Node gets disconnected and rejoins - * 3) Master assigns the shard back to the node - * 4) Master processes the shard started and starts the shard - * 5) The node process the cluster state where the shard is both started and primary term is incremented. - * - * We could fail the shard in that case, but this will cause it to be removed from the insync allocations list - * potentially preventing re-allocation. - */ - assert shardRouting.initializing() == false : - "a started primary shard should never update its term; " - + "shard " + shardRouting + ", " - + "current term [" + primaryTerm + "], " - + "new term [" + newPrimaryTerm + "]"; - assert newPrimaryTerm > primaryTerm : - "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]"; - /* - * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we - * increment the primary term. 
The latch is needed to ensure that we do not unblock operations before the primary term is - * incremented. - */ - final CountDownLatch latch = new CountDownLatch(1); - // to prevent primary relocation handoff while resync is not completed - boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true); - if (resyncStarted == false) { - throw new IllegalStateException("cannot start resync while it's already in progress"); - } - indexShardOperationPermits.asyncBlockOperations( - 30, - TimeUnit.MINUTES, - () -> { - latch.await(); - try { - getEngine().fillSeqNoGaps(newPrimaryTerm); - primaryReplicaSyncer.accept(IndexShard.this, new ActionListener() { - @Override - public void onResponse(ResyncTask resyncTask) { - logger.info("primary-replica resync completed with {} operations", - resyncTask.getResyncedOperations()); - boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); - assert resyncCompleted : "primary-replica resync finished but was not started"; - } - - @Override - public void onFailure(Exception e) { - boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); - assert resyncCompleted : "primary-replica resync finished but was not started"; - if (state == IndexShardState.CLOSED) { - // ignore, shutting down - } else { - failShard("exception during primary-replica resync", e); - } - } - }); - } catch (final AlreadyClosedException e) { - // okay, the index was deleted - } - }, - e -> failShard("exception during primary term transition", e)); - primaryTerm = newPrimaryTerm; - latch.countDown(); - } - } - /** * Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set. */ @@ -537,7 +540,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @throws InterruptedException if blocking operations is interrupted */ public void relocated( - final String reason, final Consumer consumer) throws IllegalIndexShardStateException, InterruptedException { + final String reason, final Consumer consumer) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { @@ -549,16 +552,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. */ verifyRelocatingState(); - final PrimaryContext primaryContext = getEngine().seqNoService().primaryContext(); + final GlobalCheckpointTracker.PrimaryContext primaryContext = getEngine().seqNoService().startRelocationHandoff(); try { consumer.accept(primaryContext); synchronized (mutex) { verifyRelocatingState(); changeState(IndexShardState.RELOCATED, reason); } + getEngine().seqNoService().completeRelocationHandoff(); } catch (final Exception e) { try { - getEngine().seqNoService().releasePrimaryContext(); + getEngine().seqNoService().abortRelocationHandoff(); } catch (final Exception inner) { e.addSuppressed(inner); } @@ -1644,6 +1648,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl getEngine().seqNoService().waitForOpsToComplete(seqNo); } + /** + * Called when the recovery process for a shard is ready to open the engine on the target shard. + * See {@link GlobalCheckpointTracker#initiateTracking(String)} for details. 
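The relocated(...) change above wraps the transfer of the primary context in a start/complete/abort protocol. As a rough, self-contained sketch of that shape (a toy stand-in, not Elasticsearch code; every name here is made up for illustration):

import java.util.function.Consumer;

final class HandoffSketch {

    /** Toy stand-in for GlobalCheckpointTracker.PrimaryContext; the real one carries checkpoint state for all tracked copies. */
    static final class PrimaryContext {
        final long clusterStateVersion;

        PrimaryContext(final long clusterStateVersion) {
            this.clusterStateVersion = clusterStateVersion;
        }
    }

    private boolean primaryMode = true;
    private boolean handoffInProgress = false;

    PrimaryContext startRelocationHandoff() {
        if (handoffInProgress) {
            throw new IllegalStateException("handoff already in progress");
        }
        handoffInProgress = true; // while true, the in-sync set must not grow
        return new PrimaryContext(42L);
    }

    void completeRelocationHandoff() {
        // the relocation target now owns the global checkpoint computation
        handoffInProgress = false;
        primaryMode = false;
    }

    void abortRelocationHandoff() {
        // transfer failed; this shard stays in primary mode
        handoffInProgress = false;
    }

    /** Same shape as the patched IndexShard#relocated: start, transfer, then complete, or abort on failure. */
    void relocate(final Consumer<PrimaryContext> transferToTarget) {
        final PrimaryContext primaryContext = startRelocationHandoff();
        try {
            transferToTarget.accept(primaryContext); // e.g. the network call to the relocation target
            completeRelocationHandoff();
        } catch (final RuntimeException e) {
            abortRelocationHandoff();
            throw e;
        }
    }

    public static void main(final String[] args) {
        final HandoffSketch shard = new HandoffSketch();
        shard.relocate(context ->
            System.out.println("handing off primary context at cluster state version " + context.clusterStateVersion));
        System.out.println("primary mode after handoff: " + shard.primaryMode); // false: the target is primary now
    }
}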
+ * + * @param allocationId the allocation ID of the shard for which recovery was initiated + */ + public void initiateTracking(final String allocationId) { + verifyPrimary(); + getEngine().seqNoService().initiateTracking(allocationId); + /* + * We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to + * the replica; mark our self as active to force a future background sync. + */ + active.compareAndSet(false, true); + } + /** * Marks the shard with the provided allocation ID as in-sync with the primary shard. See * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} @@ -1710,13 +1730,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * * @param primaryContext the sequence number context */ - public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { + public void activateWithPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) { verifyPrimary(); assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting; - final Engine engine = getEngineOrNull(); - if (engine != null) { - engine.seqNoService().updateAllocationIdsFromPrimaryContext(primaryContext); - } + assert primaryContext.getLocalCheckpoints().containsKey(routingEntry().allocationId().getId()) && + getEngine().seqNoService().getLocalCheckpoint() == + primaryContext.getLocalCheckpoints().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); + getEngine().seqNoService().activateWithPrimaryContext(primaryContext); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java deleted file mode 100644 index 8a067d37181..00000000000 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.shard; - -import com.carrotsearch.hppc.ObjectLongHashMap; -import com.carrotsearch.hppc.ObjectLongMap; -import com.carrotsearch.hppc.cursors.ObjectLongCursor; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; - -import java.io.IOException; - -/** - * Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing - * shards and their local checkpoints. 
- */ -public class PrimaryContext implements Writeable { - - private long clusterStateVersion; - - public long clusterStateVersion() { - return clusterStateVersion; - } - - private ObjectLongMap inSyncLocalCheckpoints; - - public ObjectLongMap inSyncLocalCheckpoints() { - return inSyncLocalCheckpoints; - } - - private ObjectLongMap trackingLocalCheckpoints; - - public ObjectLongMap trackingLocalCheckpoints() { - return trackingLocalCheckpoints; - } - - public PrimaryContext( - final long clusterStateVersion, - final ObjectLongMap inSyncLocalCheckpoints, - final ObjectLongMap trackingLocalCheckpoints) { - this.clusterStateVersion = clusterStateVersion; - this.inSyncLocalCheckpoints = inSyncLocalCheckpoints; - this.trackingLocalCheckpoints = trackingLocalCheckpoints; - } - - public PrimaryContext(final StreamInput in) throws IOException { - clusterStateVersion = in.readVLong(); - inSyncLocalCheckpoints = readMap(in); - trackingLocalCheckpoints = readMap(in); - } - - private static ObjectLongMap readMap(final StreamInput in) throws IOException { - final int length = in.readVInt(); - final ObjectLongMap map = new ObjectLongHashMap<>(length); - for (int i = 0; i < length; i++) { - final String key = in.readString(); - final long value = in.readZLong(); - map.addTo(key, value); - } - return map; - } - - @Override - public void writeTo(final StreamOutput out) throws IOException { - out.writeVLong(clusterStateVersion); - writeMap(out, inSyncLocalCheckpoints); - writeMap(out, trackingLocalCheckpoints); - } - - private static void writeMap(final StreamOutput out, final ObjectLongMap map) throws IOException { - out.writeVInt(map.size()); - for (ObjectLongCursor cursor : map) { - out.writeString(cursor.key); - out.writeZLong(cursor.value); - } - } - - @Override - public String toString() { - return "PrimaryContext{" + - "clusterStateVersion=" + clusterStateVersion + - ", inSyncLocalCheckpoints=" + inSyncLocalCheckpoints + - ", trackingLocalCheckpoints=" + trackingLocalCheckpoints + - '}'; - } - -} diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 0f8ae2185e5..9568399ea3f 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -554,17 +554,23 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple + "cluster state: " + shardRouting + " local: " + currentRoutingEntry; try { - final long primaryTerm = clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()); + final IndexMetaData indexMetaData = clusterState.metaData().index(shard.shardId().getIndex()); + final long primaryTerm = indexMetaData.primaryTerm(shard.shardId().id()); + final Set inSyncIds = indexMetaData.inSyncAllocationIds(shard.shardId().id()); final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); - /* - * Filter to shards that track sequence numbers and should be taken into consideration for checkpoint tracking. Shards on old - * nodes will go through a file-based recovery which will also transfer sequence number information. 
- */ - final Set activeIds = allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); - final Set initializingIds = - allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); - shard.updateShardState( - shardRouting, primaryTerm, primaryReplicaSyncer::resync, clusterState.version(), activeIds, initializingIds); + final Set initializingIds = indexShardRoutingTable.getAllInitializingShards() + .stream() + .map(ShardRouting::allocationId) + .map(AllocationId::getId) + .collect(Collectors.toSet()); + final Set pre60AllocationIds = indexShardRoutingTable.assignedShards() + .stream() + .filter(shr -> nodes.get(shr.currentNodeId()).getVersion().before(Version.V_6_0_0_alpha1)) + .map(ShardRouting::allocationId) + .map(AllocationId::getId) + .collect(Collectors.toSet()); + shard.updateShardState(shardRouting, primaryTerm, primaryReplicaSyncer::resync, clusterState.version(), + inSyncIds, initializingIds, pre60AllocationIds); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); return; @@ -587,17 +593,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple } } - private Set allocationIdsForShardsOnNodesThatUnderstandSeqNos( - final List shardRoutings, - final DiscoveryNodes nodes) { - return shardRoutings - .stream() - .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1)) - .map(ShardRouting::allocationId) - .map(AllocationId::getId) - .collect(Collectors.toSet()); - } - /** * Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard * routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to check if its needed or not. @@ -735,13 +730,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple * - Updates and persists the new routing value. * - Updates the primary term if this shard is a primary. * - Updates the allocation ids that are tracked by the shard if it is a primary. - * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details. + * See {@link GlobalCheckpointTracker#updateFromMaster(long, Set, Set, Set)} for details. 
* * @param shardRouting the new routing entry * @param primaryTerm the new primary term * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master - * @param activeAllocationIds the allocation ids of the currently active shard copies + * @param inSyncAllocationIds the allocation ids of the currently in-sync shard copies * @param initializingAllocationIds the allocation ids of the currently initializing shard copies * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted * @throws IOException if shard state could not be persisted @@ -750,8 +745,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple long primaryTerm, CheckedBiConsumer, IOException> primaryReplicaSyncer, long applyingClusterStateVersion, - Set activeAllocationIds, - Set initializingAllocationIds) throws IOException; + Set inSyncAllocationIds, + Set initializingAllocationIds, + Set pre60AllocationIds) throws IOException; } public interface AllocatedIndex extends Iterable, IndexComponent { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java index 6646f6cea5d..3a472c447b2 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java @@ -21,7 +21,7 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.PrimaryContext; +import org.elasticsearch.index.seqno.GlobalCheckpointTracker; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.TransportRequest; @@ -34,7 +34,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest { private long recoveryId; private ShardId shardId; - private PrimaryContext primaryContext; + private GlobalCheckpointTracker.PrimaryContext primaryContext; /** * Initialize an empty request (used to serialize into when reading from a stream). 
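For reference, the checkpoint maps carried by the primary context travel over the wire as a count followed by one (allocation ID, zig-zag encoded long) pair per entry, as the removed PrimaryContext#writeMap/readMap above show; the zig-zag encoding is what lets negative sentinels such as UNASSIGNED_SEQ_NO survive the round trip. The sketch below replays that pattern with the stream classes already used elsewhere in this change; the class and helper names are invented for illustration, and it assumes the Elasticsearch core library is on the classpath.

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

final class CheckpointMapWireSketch {

    static void writeCheckpoints(final BytesStreamOutput out, final Map<String, Long> checkpoints) throws IOException {
        out.writeVInt(checkpoints.size());
        for (final Map.Entry<String, Long> entry : checkpoints.entrySet()) {
            out.writeString(entry.getKey());   // allocation ID
            out.writeZLong(entry.getValue());  // local checkpoint; may be a negative sentinel, hence zig-zag
        }
    }

    static Map<String, Long> readCheckpoints(final StreamInput in) throws IOException {
        final int size = in.readVInt();
        final Map<String, Long> checkpoints = new HashMap<>(size);
        for (int i = 0; i < size; i++) {
            checkpoints.put(in.readString(), in.readZLong());
        }
        return checkpoints;
    }

    public static void main(final String[] args) throws IOException {
        final Map<String, Long> checkpoints = new HashMap<>();
        checkpoints.put("alloc-1", 17L);
        checkpoints.put("alloc-2", -2L); // an unassigned checkpoint
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            writeCheckpoints(out, checkpoints);
            try (StreamInput in = out.bytes().streamInput()) {
                System.out.println(readCheckpoints(in)); // round-trips to the same map
            }
        }
    }
}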
@@ -49,7 +49,8 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest { * @param shardId the shard ID of the relocation * @param primaryContext the primary context */ - RecoveryHandoffPrimaryContextRequest(final long recoveryId, final ShardId shardId, final PrimaryContext primaryContext) { + RecoveryHandoffPrimaryContextRequest(final long recoveryId, final ShardId shardId, + final GlobalCheckpointTracker.PrimaryContext primaryContext) { this.recoveryId = recoveryId; this.shardId = shardId; this.primaryContext = primaryContext; @@ -63,7 +64,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest { return shardId; } - PrimaryContext primaryContext() { + GlobalCheckpointTracker.PrimaryContext primaryContext() { return primaryContext; } @@ -72,7 +73,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest { super.readFrom(in); recoveryId = in.readLong(); shardId = ShardId.readShardId(in); - primaryContext = new PrimaryContext(in); + primaryContext = new GlobalCheckpointTracker.PrimaryContext(in); } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 6a39700545b..55d7eee908e 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -42,7 +42,6 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CancellableThreads; -import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -63,9 +62,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.StreamSupport; @@ -171,6 +168,8 @@ public class RecoverySourceHandler { } } + cancellableThreads.execute(() -> runUnderOperationPermit(() -> shard.initiateTracking(request.targetAllocationId()))); + try { prepareTargetForTranslog(translogView.estimateTotalOperations(startingSeqNo)); } catch (final Exception e) { @@ -208,6 +207,17 @@ public class RecoverySourceHandler { return response; } + private void runUnderOperationPermit(CancellableThreads.Interruptable runnable) throws InterruptedException { + final PlainActionFuture onAcquired = new PlainActionFuture<>(); + shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME); + try (Releasable ignored = onAcquired.actionGet()) { + if (shard.state() == IndexShardState.RELOCATED) { + throw new IndexShardRelocatedException(shard.shardId()); + } + runnable.run(); + } + } + /** * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source * translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source. @@ -465,15 +475,7 @@ public class RecoverySourceHandler { * marking the shard as in-sync. 
If the relocation handoff holds all the permits then after the handoff completes and we acquire * the permit then the state of the shard will be relocated and this recovery will fail. */ - final PlainActionFuture onAcquired = new PlainActionFuture<>(); - shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME); - try (Releasable ignored = onAcquired.actionGet()) { - if (shard.state() == IndexShardState.RELOCATED) { - throw new IndexShardRelocatedException(shard.shardId()); - } - shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint); - } - + runUnderOperationPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint)); recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint()); }); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 2837a85d1ae..3b47e84a901 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -41,10 +41,10 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperException; +import org.elasticsearch.index.seqno.GlobalCheckpointTracker; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotRecoveringException; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -379,8 +379,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } @Override - public void handoffPrimaryContext(final PrimaryContext primaryContext) { - indexShard.updateAllocationIdsFromPrimaryContext(primaryContext); + public void handoffPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) { + indexShard.activateWithPrimaryContext(primaryContext); } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 34b0df2293f..804c4f50484 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -19,7 +19,7 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.index.shard.PrimaryContext; +import org.elasticsearch.index.seqno.GlobalCheckpointTracker; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; @@ -55,7 +55,7 @@ public interface RecoveryTargetHandler { * * @param primaryContext the primary context from the relocation source */ - void handoffPrimaryContext(PrimaryContext primaryContext); + void handoffPrimaryContext(GlobalCheckpointTracker.PrimaryContext primaryContext); /** * Index a set of translog operations on the target diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java 
index 14c8f762e6d..279bec186a4 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -23,7 +23,7 @@ import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.index.shard.PrimaryContext; +import org.elasticsearch.index.seqno.GlobalCheckpointTracker; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -100,7 +100,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { } @Override - public void handoffPrimaryContext(final PrimaryContext primaryContext) { + public void handoffPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) { transportService.submitRequest( targetNode, PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT, diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index a923b331042..31ffb026e3a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -188,7 +188,6 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25415") public void testMultipleNodesShutdownNonMasterNodes() throws Exception { Settings settings = Settings.builder() .put("discovery.zen.minimum_master_nodes", 3) diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 78d49647235..14d0cbb44b7 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2020,12 +2020,9 @@ public class InternalEngineTests extends ESTestCase { try { initialEngine = engine; - initialEngine - .seqNoService() - .updateAllocationIdsFromMaster( - randomNonNegativeLong(), - new HashSet<>(Arrays.asList("primary", "replica")), - Collections.emptySet()); + initialEngine.seqNoService().updateAllocationIdsFromMaster(1L, new HashSet<>(Arrays.asList("primary", "replica")), + Collections.emptySet(), Collections.emptySet()); + initialEngine.seqNoService().activatePrimaryMode("primary", primarySeqNo); for (int op = 0; op < opCount; op++) { final String id; // mostly index, sometimes delete diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index be1b4661c53..251e744a876 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -123,7 +123,6 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } protected class ReplicationGroup implements AutoCloseable, Iterable { - private long clusterStateVersion; private IndexShard primary; private IndexMetaData indexMetaData; private final List replicas; @@ -144,7 +143,6 @@ public abstract class ESIndexLevelReplicationTestCase 
extends IndexShardTestCase primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting)); replicas = new ArrayList<>(); this.indexMetaData = indexMetaData; - clusterStateVersion = 1; updateAllocationIDsOnPrimary(); for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) { addReplica(); @@ -231,7 +229,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase initializingIds.addAll(initializingIds()); initializingIds.remove(primary.routingEntry().allocationId().getId()); primary.updateShardState(ShardRoutingHelper.moveToStarted(primary.routingEntry()), primary.getPrimaryTerm(), null, - ++clusterStateVersion, activeIds, initializingIds); + currentClusterStateVersion.incrementAndGet(), activeIds, initializingIds, Collections.emptySet()); for (final IndexShard replica : replicas) { recoverReplica(replica); } @@ -250,7 +248,6 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; replicas.add(replica); - clusterStateVersion++; updateAllocationIDsOnPrimary(); } @@ -265,7 +262,6 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting)); replicas.add(newReplica); - clusterStateVersion++; updateAllocationIDsOnPrimary(); return newReplica; } @@ -284,13 +280,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase assertTrue(replicas.remove(replica)); closeShards(primary); primary = replica; + assert primary.routingEntry().active() : "only active replicas can be promoted to primary: " + primary.routingEntry(); PlainActionFuture fut = new PlainActionFuture<>(); - HashSet activeIds = new HashSet<>(); - activeIds.addAll(activeIds()); - activeIds.add(replica.routingEntry().allocationId().getId()); - HashSet initializingIds = new HashSet<>(); - initializingIds.addAll(initializingIds()); - initializingIds.remove(replica.routingEntry().allocationId().getId()); primary.updateShardState(replica.routingEntry().moveActiveReplicaToPrimary(), newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard, new ActionListener() { @@ -305,7 +296,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase listener.onFailure(e); fut.onFailure(e); } - }), ++clusterStateVersion, activeIds, initializingIds); + }), currentClusterStateVersion.incrementAndGet(), activeIds(), initializingIds(), Collections.emptySet()); return fut; } @@ -323,7 +314,6 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase synchronized boolean removeReplica(IndexShard replica) throws IOException { final boolean removed = replicas.remove(replica); if (removed) { - clusterStateVersion++; updateAllocationIDsOnPrimary(); } return removed; @@ -342,9 +332,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase IndexShard replica, BiFunction targetSupplier, boolean markAsRecovering) throws IOException { - ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering); - clusterStateVersion++; - updateAllocationIDsOnPrimary(); + ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering, activeIds(), + initializingIds()); } public 
synchronized DiscoveryNode getPrimaryNode() { @@ -422,8 +411,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } private void updateAllocationIDsOnPrimary() throws IOException { - primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, clusterStateVersion, - activeIds(), initializingIds()); + primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), + activeIds(), initializingIds(), Collections.emptySet()); } } diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 33a1cfed0b6..878f4bfce90 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.xcontent.XContentType; @@ -46,6 +47,7 @@ import org.hamcrest.Matcher; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -149,7 +151,6 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase startedShards = shards.startReplicas(randomIntBetween(1, 2)); } while (startedShards > 0); - final long unassignedSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; for (IndexShard shard : shards) { final SeqNoStats shardStats = shard.seqNoStats(); final ShardRouting shardRouting = shard.routingEntry(); @@ -164,9 +165,10 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase */ final Matcher globalCheckpointMatcher; if (shardRouting.primary()) { - globalCheckpointMatcher = numDocs == 0 ? equalTo(unassignedSeqNo) : equalTo(numDocs - 1L); + globalCheckpointMatcher = numDocs == 0 ? equalTo(SequenceNumbersService.NO_OPS_PERFORMED) : equalTo(numDocs - 1L); } else { - globalCheckpointMatcher = numDocs == 0 ? equalTo(unassignedSeqNo) : anyOf(equalTo(numDocs - 1L), equalTo(numDocs - 2L)); + globalCheckpointMatcher = numDocs == 0 ? 
equalTo(SequenceNumbersService.NO_OPS_PERFORMED) + : anyOf(equalTo(numDocs - 1L), equalTo(numDocs - 2L)); } assertThat(shardRouting + " global checkpoint mismatch", shardStats.getGlobalCheckpoint(), globalCheckpointMatcher); assertThat(shardRouting + " max seq no mismatch", shardStats.getMaxSeqNo(), equalTo(numDocs - 1L)); @@ -194,12 +196,15 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) { shards.startAll(); - IndexShard replica1 = shards.getReplicas().get(0); - logger.info("--> isolated replica " + replica1.routingEntry()); - shards.removeReplica(replica1); + List replicas = shards.getReplicas(); + IndexShard replica1 = replicas.get(0); IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON); - shards.index(indexRequest); - shards.addReplica(replica1); + logger.info("--> isolated replica " + replica1.routingEntry()); + BulkShardRequest replicationRequest = indexOnPrimary(indexRequest, shards.getPrimary()); + for (int i = 1; i < replicas.size(); i++) { + indexOnReplica(replicationRequest, replicas.get(i)); + } + logger.info("--> promoting replica to primary " + replica1.routingEntry()); shards.promoteReplicaToPrimary(replica1); indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"2\"}", XContentType.JSON); diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 87608dedd1e..d464897c861 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -245,6 +246,12 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } shards.promoteReplicaToPrimary(newPrimary); + + // check that local checkpoint of new primary is properly tracked after primary promotion + assertThat(newPrimary.getLocalCheckpoint(), equalTo(totalDocs - 1L)); + assertThat(IndexShardTestCase.getEngine(newPrimary).seqNoService() + .getTrackedLocalCheckpointForShard(newPrimary.routingEntry().allocationId().getId()), equalTo(totalDocs - 1L)); + // index some more totalDocs += shards.indexDocs(randomIntBetween(0, 5)); diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java index 0eee4eb8a44..8c16c76b9b2 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java @@ -19,18 +19,18 @@ package org.elasticsearch.index.seqno; -import com.carrotsearch.hppc.ObjectLongHashMap; -import com.carrotsearch.hppc.ObjectLongMap; import 
org.elasticsearch.common.Randomness; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.junit.Before; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -46,15 +46,13 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import java.util.stream.StreamSupport; +import static java.util.Collections.emptySet; +import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.not; -import static org.mockito.Mockito.mock; public class GlobalCheckpointTrackerTests extends ESTestCase { @@ -88,7 +86,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { public void testGlobalCheckpointUpdate() { final long initialClusterStateVersion = randomNonNegativeLong(); Map allocations = new HashMap<>(); - Map activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5); + Map activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 5); Set active = new HashSet<>(activeWithCheckpoints.keySet()); allocations.putAll(activeWithCheckpoints); Map initializingWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5); @@ -115,8 +113,9 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type); }); - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, active, initializing); - initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId, tracker.getGlobalCheckpoint())); + tracker.updateFromMaster(initialClusterStateVersion, active, initializing, emptySet()); + tracker.activatePrimaryMode(active.iterator().next(), NO_OPS_PERFORMED); + initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId, NO_OPS_PERFORMED)); allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId))); assertThat(tracker.getGlobalCheckpoint(), equalTo(minLocalCheckpoint)); @@ -134,30 +133,37 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { // first check that adding it without the master blessing doesn't change anything. 
tracker.updateLocalCheckpoint(extraId, minLocalCheckpointAfterUpdates + 1 + randomInt(4)); - assertThat(tracker.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO)); + assertNull(tracker.localCheckpoints.get(extraId)); + expectThrows(IllegalStateException.class, () -> tracker.initiateTracking(extraId)); - Set newActive = new HashSet<>(active); - newActive.add(extraId); - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, newActive, initializing); + Set newInitializing = new HashSet<>(initializing); + newInitializing.add(extraId); + tracker.updateFromMaster(initialClusterStateVersion + 1, active, newInitializing, emptySet()); + + tracker.initiateTracking(extraId); // now notify for the new id - tracker.updateLocalCheckpoint(extraId, minLocalCheckpointAfterUpdates + 1 + randomInt(4)); + if (randomBoolean()) { + tracker.updateLocalCheckpoint(extraId, minLocalCheckpointAfterUpdates + 1 + randomInt(4)); + markAllocationIdAsInSyncQuietly(tracker, extraId, randomInt((int) minLocalCheckpointAfterUpdates)); + } else { + markAllocationIdAsInSyncQuietly(tracker, extraId, minLocalCheckpointAfterUpdates + 1 + randomInt(4)); + } // now it should be incremented assertThat(tracker.getGlobalCheckpoint(), greaterThan(minLocalCheckpoint)); } public void testMissingActiveIdsPreventAdvance() { - final Map active = randomAllocationsWithLocalCheckpoints(1, 5); + final Map active = randomAllocationsWithLocalCheckpoints(2, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(0, 5); final Map assigned = new HashMap<>(); assigned.putAll(active); assigned.putAll(initializing); - tracker.updateAllocationIdsFromMaster( - randomNonNegativeLong(), - active.keySet(), - initializing.keySet()); - randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); + tracker.updateFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet(), emptySet()); + String primary = active.keySet().iterator().next(); + tracker.activatePrimaryMode(primary, NO_OPS_PERFORMED); + randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, NO_OPS_PERFORMED)); final String missingActiveID = randomFrom(active.keySet()); assigned .entrySet() @@ -165,24 +171,27 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { .filter(e -> !e.getKey().equals(missingActiveID)) .forEach(e -> tracker.updateLocalCheckpoint(e.getKey(), e.getValue())); - assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); - + if (missingActiveID.equals(primary) == false) { + assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + } // now update all knowledge of all shards assigned.forEach(tracker::updateLocalCheckpoint); assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); } public void testMissingInSyncIdsPreventAdvance() { - final Map active = randomAllocationsWithLocalCheckpoints(0, 5); - final Map initializing = randomAllocationsWithLocalCheckpoints(1, 5); - tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet()); - initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); - randomSubsetOf(randomInt(initializing.size() - 1), - initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId))); + final Map active = randomAllocationsWithLocalCheckpoints(1, 5); + final Map initializing = 
randomAllocationsWithLocalCheckpoints(2, 5); + logger.info("active: {}, initializing: {}", active, initializing); + tracker.updateFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet(), emptySet()); + String primary = active.keySet().iterator().next(); + tracker.activatePrimaryMode(primary, NO_OPS_PERFORMED); + randomSubsetOf(randomIntBetween(1, initializing.size() - 1), + initializing.keySet()).forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId, NO_OPS_PERFORMED)); active.forEach(tracker::updateLocalCheckpoint); - assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(tracker.getGlobalCheckpoint(), equalTo(NO_OPS_PERFORMED)); // update again initializing.forEach(tracker::updateLocalCheckpoint); @@ -193,9 +202,11 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final Map active = randomAllocationsWithLocalCheckpoints(1, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(1, 5); final Map nonApproved = randomAllocationsWithLocalCheckpoints(1, 5); - tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet()); - initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); - nonApproved.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); + tracker.updateFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet(), emptySet()); + tracker.activatePrimaryMode(active.keySet().iterator().next(), NO_OPS_PERFORMED); + initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, NO_OPS_PERFORMED)); + nonApproved.keySet().forEach(k -> + expectThrows(IllegalStateException.class, () -> markAllocationIdAsInSyncQuietly(tracker, k, NO_OPS_PERFORMED))); List> allocations = Arrays.asList(active, initializing, nonApproved); Collections.shuffle(allocations, random()); @@ -221,11 +232,12 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { if (randomBoolean()) { allocations.putAll(initializingToBeRemoved); } - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, active, initializing); + tracker.updateFromMaster(initialClusterStateVersion, active, initializing, emptySet()); + tracker.activatePrimaryMode(active.iterator().next(), NO_OPS_PERFORMED); if (randomBoolean()) { - initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); + initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, NO_OPS_PERFORMED)); } else { - initializing.forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); + initializing.forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, NO_OPS_PERFORMED)); } if (randomBoolean()) { allocations.forEach(tracker::updateLocalCheckpoint); @@ -233,11 +245,13 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { // now remove shards if (randomBoolean()) { - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, activeToStay.keySet(), initializingToStay.keySet()); + tracker.updateFromMaster(initialClusterStateVersion + 1, activeToStay.keySet(), initializingToStay.keySet(), + emptySet()); allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L)); } else { allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L)); - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 2, activeToStay.keySet(), 
initializingToStay.keySet()); + tracker.updateFromMaster(initialClusterStateVersion + 2, activeToStay.keySet(), initializingToStay.keySet(), + emptySet()); } final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream()) @@ -246,16 +260,16 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { assertThat(tracker.getGlobalCheckpoint(), equalTo(checkpoint)); } - public void testWaitForAllocationIdToBeInSync() throws BrokenBarrierException, InterruptedException { + public void testWaitForAllocationIdToBeInSync() throws Exception { final int localCheckpoint = randomIntBetween(1, 32); final int globalCheckpoint = randomIntBetween(localCheckpoint + 1, 64); final CyclicBarrier barrier = new CyclicBarrier(2); final AtomicBoolean complete = new AtomicBoolean(); final String inSyncAllocationId =randomAlphaOfLength(16); final String trackingAllocationId = randomAlphaOfLength(16); - tracker.updateAllocationIdsFromMaster( - randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); - tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint); + tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), + Collections.singleton(trackingAllocationId), emptySet()); + tracker.activatePrimaryMode(inSyncAllocationId, globalCheckpoint); final Thread thread = new Thread(() -> { try { // synchronize starting with the test thread @@ -279,18 +293,16 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { for (int i = 0; i < elements.size(); i++) { tracker.updateLocalCheckpoint(trackingAllocationId, elements.get(i)); assertFalse(complete.get()); - assertTrue(awaitBusy(() -> tracker.trackingLocalCheckpoints.containsKey(trackingAllocationId))); - assertTrue(awaitBusy(() -> tracker.pendingInSync.contains(trackingAllocationId))); - assertFalse(tracker.inSyncLocalCheckpoints.containsKey(trackingAllocationId)); + assertFalse(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId).inSync); + assertBusy(() -> assertTrue(tracker.pendingInSync.contains(trackingAllocationId))); } tracker.updateLocalCheckpoint(trackingAllocationId, randomIntBetween(globalCheckpoint, 64)); // synchronize with the waiting thread to mark that it is complete barrier.await(); assertTrue(complete.get()); - assertTrue(tracker.trackingLocalCheckpoints.isEmpty()); - assertTrue(tracker.pendingInSync.isEmpty()); - assertTrue(tracker.inSyncLocalCheckpoints.containsKey(trackingAllocationId)); + assertTrue(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId).inSync); + assertFalse(tracker.pendingInSync.contains(trackingAllocationId)); thread.join(); } @@ -302,9 +314,9 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final AtomicBoolean interrupted = new AtomicBoolean(); final String inSyncAllocationId = randomAlphaOfLength(16); final String trackingAllocationId = randomAlphaOfLength(32); - tracker.updateAllocationIdsFromMaster( - randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); - tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint); + tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), + Collections.singleton(trackingAllocationId), emptySet()); + tracker.activatePrimaryMode(inSyncAllocationId, globalCheckpoint); final Thread thread = new Thread(() -> { try { // synchronize starting with the test thread @@ -348,19 +360,25 @@ public class 
GlobalCheckpointTrackerTests extends ESTestCase { randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); final Set initializingIds = activeAndInitializingAllocationIds.v2(); - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingIds); + tracker.updateFromMaster(initialClusterStateVersion, activeAllocationIds, initializingIds, emptySet()); + String primaryId = activeAllocationIds.iterator().next(); + tracker.activatePrimaryMode(primaryId, NO_OPS_PERFORMED); // first we assert that the in-sync and tracking sets are set up correctly - assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); + assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a).inSync)); assertTrue( activeAllocationIds .stream() - .allMatch(a -> tracker.inSyncLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO)); - assertTrue(initializingIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a))); + .filter(a -> a.equals(primaryId) == false) + .allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a).getLocalCheckpoint() + == SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertTrue(initializingIds.stream().noneMatch(a -> tracker.getTrackedLocalCheckpointForShard(a).inSync)); assertTrue( initializingIds .stream() - .allMatch(a -> tracker.trackingLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO)); + .filter(a -> a.equals(primaryId) == false) + .allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a).getLocalCheckpoint() + == SequenceNumbersService.UNASSIGNED_SEQ_NO)); // now we will remove some allocation IDs from these and ensure that they propagate through final List removingActiveAllocationIds = randomSubsetOf(activeAllocationIds); @@ -369,29 +387,32 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final List removingInitializingAllocationIds = randomSubsetOf(initializingIds); final Set newInitializingAllocationIds = initializingIds.stream().filter(a -> !removingInitializingAllocationIds.contains(a)).collect(Collectors.toSet()); - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, newActiveAllocationIds, newInitializingAllocationIds); - assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); - assertTrue(removingActiveAllocationIds.stream().noneMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); - assertTrue(newInitializingAllocationIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a))); - assertTrue(removingInitializingAllocationIds.stream().noneMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a))); + tracker.updateFromMaster(initialClusterStateVersion + 1, newActiveAllocationIds, newInitializingAllocationIds, + emptySet()); + assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a).inSync)); + assertTrue(removingActiveAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a) == null)); + assertTrue(newInitializingAllocationIds.stream().noneMatch(a -> tracker.getTrackedLocalCheckpointForShard(a).inSync)); + assertTrue(removingInitializingAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a) == null)); /* * Now we will add an allocation ID to each of active and initializing and ensure they propagate 
through. Using different lengths * than we have been using above ensures that we can not collide with a previous allocation ID */ - newActiveAllocationIds.add(randomAlphaOfLength(32)); newInitializingAllocationIds.add(randomAlphaOfLength(64)); - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 2, newActiveAllocationIds, newInitializingAllocationIds); - assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); + tracker.updateFromMaster(initialClusterStateVersion + 2, newActiveAllocationIds, newInitializingAllocationIds, emptySet()); + assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a).inSync)); assertTrue( newActiveAllocationIds .stream() - .allMatch(a -> tracker.inSyncLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO)); - assertTrue(newInitializingAllocationIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a))); + .filter(a -> a.equals(primaryId) == false) + .allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a).getLocalCheckpoint() + == SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertTrue(newInitializingAllocationIds.stream().noneMatch(a -> tracker.getTrackedLocalCheckpointForShard(a).inSync)); assertTrue( newInitializingAllocationIds .stream() - .allMatch(a -> tracker.trackingLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO)); + .allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a).getLocalCheckpoint() + == SequenceNumbersService.UNASSIGNED_SEQ_NO)); // the tracking allocation IDs should play no role in determining the global checkpoint final Map activeLocalCheckpoints = @@ -404,12 +425,12 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { activeLocalCheckpoints .entrySet() .stream() - .allMatch(e -> tracker.getLocalCheckpointForAllocationId(e.getKey()) == e.getValue())); + .allMatch(e -> tracker.getTrackedLocalCheckpointForShard(e.getKey()).getLocalCheckpoint() == e.getValue())); assertTrue( initializingLocalCheckpoints .entrySet() .stream() - .allMatch(e -> tracker.trackingLocalCheckpoints.get(e.getKey()) == e.getValue())); + .allMatch(e -> tracker.getTrackedLocalCheckpointForShard(e.getKey()).getLocalCheckpoint() == e.getValue())); final long minimumActiveLocalCheckpoint = (long) activeLocalCheckpoints.values().stream().min(Integer::compareTo).get(); assertThat(tracker.getGlobalCheckpoint(), equalTo(minimumActiveLocalCheckpoint)); final long minimumInitailizingLocalCheckpoint = (long) initializingLocalCheckpoints.values().stream().min(Integer::compareTo).get(); @@ -421,7 +442,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { // using a different length than we have been using above ensures that we can not collide with a previous allocation ID final String newSyncingAllocationId = randomAlphaOfLength(128); newInitializingAllocationIds.add(newSyncingAllocationId); - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 3, newActiveAllocationIds, newInitializingAllocationIds); + tracker.updateFromMaster(initialClusterStateVersion + 3, newActiveAllocationIds, newInitializingAllocationIds, emptySet()); final CyclicBarrier barrier = new CyclicBarrier(2); final Thread thread = new Thread(() -> { try { @@ -439,7 +460,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { assertBusy(() -> { assertTrue(tracker.pendingInSync.contains(newSyncingAllocationId)); - assertTrue(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId)); + 
assertFalse(tracker.getTrackedLocalCheckpointForShard(newSyncingAllocationId).inSync); }); tracker.updateLocalCheckpoint(newSyncingAllocationId, randomIntBetween(Math.toIntExact(minimumActiveLocalCheckpoint), 1024)); @@ -447,17 +468,16 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { barrier.await(); assertFalse(tracker.pendingInSync.contains(newSyncingAllocationId)); - assertFalse(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId)); - assertTrue(tracker.inSyncLocalCheckpoints.containsKey(newSyncingAllocationId)); + assertTrue(tracker.getTrackedLocalCheckpointForShard(newSyncingAllocationId).inSync); /* * The new in-sync allocation ID is in the in-sync set now yet the master does not know this; the allocation ID should still be in * the in-sync set even if we receive a cluster state update that does not reflect this. * */ - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 4, newActiveAllocationIds, newInitializingAllocationIds); - assertFalse(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId)); - assertTrue(tracker.inSyncLocalCheckpoints.containsKey(newSyncingAllocationId)); + tracker.updateFromMaster(initialClusterStateVersion + 4, newActiveAllocationIds, newInitializingAllocationIds, emptySet()); + assertTrue(tracker.getTrackedLocalCheckpointForShard(newSyncingAllocationId).inSync); + assertFalse(tracker.pendingInSync.contains(newSyncingAllocationId)); } /** @@ -476,12 +496,11 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final String active = randomAlphaOfLength(16); final String initializing = randomAlphaOfLength(32); - tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(active), Collections.singleton(initializing)); - final CyclicBarrier barrier = new CyclicBarrier(4); final int activeLocalCheckpoint = randomIntBetween(0, Integer.MAX_VALUE - 1); - tracker.updateLocalCheckpoint(active, activeLocalCheckpoint); + tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active), Collections.singleton(initializing), emptySet()); + tracker.activatePrimaryMode(active, activeLocalCheckpoint); final int nextActiveLocalCheckpoint = randomIntBetween(activeLocalCheckpoint + 1, Integer.MAX_VALUE); final Thread activeThread = new Thread(() -> { try { @@ -523,205 +542,194 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { assertThat(tracker.getGlobalCheckpoint(), equalTo((long) nextActiveLocalCheckpoint)); } - public void testPrimaryContextOlderThanAppliedClusterState() { - final long initialClusterStateVersion = randomIntBetween(0, Integer.MAX_VALUE - 1) + 1; - final int numberOfActiveAllocationsIds = randomIntBetween(0, 8); - final int numberOfInitializingIds = randomIntBetween(0, 8); - final Tuple, Set> activeAndInitializingAllocationIds = - randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); - final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); - final Set initializingAllocationIds = activeAndInitializingAllocationIds.v2(); - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds); + public void testPrimaryContextHandoff() throws IOException { + GlobalCheckpointTracker oldPrimary = new GlobalCheckpointTracker(new ShardId("test", "_na_", 0), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO); + GlobalCheckpointTracker newPrimary = new GlobalCheckpointTracker(new ShardId("test", "_na_", 0), + 
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO); - /* - * We are going to establish a primary context from a cluster state version older than the applied cluster state version on the - * tracker. Because of recovery barriers established during relocation handoff, we know that the set of active allocation IDs in the - * newer cluster state is a superset of the allocation IDs in the applied cluster state with the caveat that an existing - * initializing allocation ID could have moved to an in-sync allocation ID within the tracker due to recovery finalization, and the - * set of initializing allocation IDs is otherwise arbitrary. - */ - final int numberOfAdditionalInitializingAllocationIds = randomIntBetween(0, 8); - final Set initializedAllocationIds = new HashSet<>(randomSubsetOf(initializingAllocationIds)); - final Set newInitializingAllocationIds = - randomAllocationIdsExcludingExistingIds( - Sets.union(activeAllocationIds, initializingAllocationIds), numberOfAdditionalInitializingAllocationIds); - final Set contextInitializingIds = Sets.union( - new HashSet<>(randomSubsetOf(Sets.difference(initializingAllocationIds, initializedAllocationIds))), - newInitializingAllocationIds); + FakeClusterState clusterState = initialState(); + clusterState.apply(oldPrimary); + clusterState.apply(newPrimary); - final int numberOfAdditionalActiveAllocationIds = randomIntBetween(0, 8); - final Set contextActiveAllocationIds = Sets.union( - Sets.union( - activeAllocationIds, - randomAllocationIdsExcludingExistingIds(activeAllocationIds, numberOfAdditionalActiveAllocationIds)), - initializedAllocationIds); + activatePrimary(clusterState, oldPrimary); - final ObjectLongMap activeAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>(); - for (final String allocationId : contextActiveAllocationIds) { - activeAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); - } - final ObjectLongMap initializingAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>(); - for (final String allocationId : contextInitializingIds) { - initializingAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); - } - - final PrimaryContext primaryContext = new PrimaryContext( - initialClusterStateVersion - randomIntBetween(0, Math.toIntExact(initialClusterStateVersion) - 1), - activeAllocationIdsLocalCheckpoints, - initializingAllocationIdsLocalCheckpoints); - - tracker.updateAllocationIdsFromPrimaryContext(primaryContext); - - // the primary context carries an older cluster state version - assertThat(tracker.appliedClusterStateVersion, equalTo(initialClusterStateVersion)); - - // only existing active allocation IDs and initializing allocation IDs that moved to initialized should be in-sync - assertThat( - Sets.union(activeAllocationIds, initializedAllocationIds), - equalTo( - StreamSupport - .stream(tracker.inSyncLocalCheckpoints.keys().spliterator(), false) - .map(e -> e.value) - .collect(Collectors.toSet()))); - - // the local checkpoints known to the tracker for in-sync shards should match what is known in the primary context - for (final String allocationId : Sets.union(activeAllocationIds, initializedAllocationIds)) { - assertThat( - tracker.inSyncLocalCheckpoints.get(allocationId), equalTo(primaryContext.inSyncLocalCheckpoints().get(allocationId))); - } - - // only existing initializing allocation IDs that did not moved to initialized should be tracked - assertThat( - Sets.difference(initializingAllocationIds, initializedAllocationIds), - equalTo( - 
StreamSupport - .stream(tracker.trackingLocalCheckpoints.keys().spliterator(), false) - .map(e -> e.value) - .collect(Collectors.toSet()))); - - // the local checkpoints known to the tracker for initializing shards should match what is known in the primary context - for (final String allocationId : Sets.difference(initializingAllocationIds, initializedAllocationIds)) { - if (primaryContext.trackingLocalCheckpoints().containsKey(allocationId)) { - assertThat( - tracker.trackingLocalCheckpoints.get(allocationId), - equalTo(primaryContext.trackingLocalCheckpoints().get(allocationId))); - } else { - assertThat(tracker.trackingLocalCheckpoints.get(allocationId), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + final int numUpdates = randomInt(10); + for (int i = 0; i < numUpdates; i++) { + if (rarely()) { + clusterState = randomUpdateClusterState(clusterState); + clusterState.apply(oldPrimary); + clusterState.apply(newPrimary); + } + if (randomBoolean()) { + randomLocalCheckpointUpdate(oldPrimary); + } + if (randomBoolean()) { + randomMarkInSync(oldPrimary); } } - // the global checkpoint can only be computed from active allocation IDs and initializing allocation IDs that moved to initializing - final long globalCheckpoint = - StreamSupport - .stream(activeAllocationIdsLocalCheckpoints.spliterator(), false) - .filter(e -> tracker.inSyncLocalCheckpoints.containsKey(e.key) || initializedAllocationIds.contains(e.key)) - .mapToLong(e -> e.value) - .min() - .orElse(SequenceNumbersService.UNASSIGNED_SEQ_NO); - assertThat(tracker.getGlobalCheckpoint(), equalTo(globalCheckpoint)); + GlobalCheckpointTracker.PrimaryContext primaryContext = oldPrimary.startRelocationHandoff(); + + if (randomBoolean()) { + // cluster state update after primary context handoff + if (randomBoolean()) { + clusterState = randomUpdateClusterState(clusterState); + clusterState.apply(oldPrimary); + clusterState.apply(newPrimary); + } + + // abort handoff, check that we can continue updates and retry handoff + oldPrimary.abortRelocationHandoff(); + + if (rarely()) { + clusterState = randomUpdateClusterState(clusterState); + clusterState.apply(oldPrimary); + clusterState.apply(newPrimary); + } + if (randomBoolean()) { + randomLocalCheckpointUpdate(oldPrimary); + } + if (randomBoolean()) { + randomMarkInSync(oldPrimary); + } + + // do another handoff + primaryContext = oldPrimary.startRelocationHandoff(); + } + + // send primary context through the wire + BytesStreamOutput output = new BytesStreamOutput(); + primaryContext.writeTo(output); + StreamInput streamInput = output.bytes().streamInput(); + primaryContext = new GlobalCheckpointTracker.PrimaryContext(streamInput); + + switch (randomInt(3)) { + case 0: { + // apply cluster state update on old primary while primary context is being transferred + clusterState = randomUpdateClusterState(clusterState); + clusterState.apply(oldPrimary); + // activate new primary + newPrimary.activateWithPrimaryContext(primaryContext); + // apply cluster state update on new primary so that the states on old and new primary are comparable + clusterState.apply(newPrimary); + break; + } + case 1: { + // apply cluster state update on new primary while primary context is being transferred + clusterState = randomUpdateClusterState(clusterState); + clusterState.apply(newPrimary); + // activate new primary + newPrimary.activateWithPrimaryContext(primaryContext); + // apply cluster state update on old primary so that the states on old and new primary are comparable + clusterState.apply(oldPrimary); + 
break; + } + case 2: { + // apply cluster state update on both copies while primary context is being transferred + clusterState = randomUpdateClusterState(clusterState); + clusterState.apply(oldPrimary); + clusterState.apply(newPrimary); + newPrimary.activateWithPrimaryContext(primaryContext); + break; + } + case 3: { + // no cluster state update + newPrimary.activateWithPrimaryContext(primaryContext); + break; + } + } + + assertTrue(oldPrimary.primaryMode); + assertTrue(newPrimary.primaryMode); + assertThat(newPrimary.appliedClusterStateVersion, equalTo(oldPrimary.appliedClusterStateVersion)); + assertThat(newPrimary.localCheckpoints, equalTo(oldPrimary.localCheckpoints)); + assertThat(newPrimary.globalCheckpoint, equalTo(oldPrimary.globalCheckpoint)); + + oldPrimary.completeRelocationHandoff(); + assertFalse(oldPrimary.primaryMode); } - public void testPrimaryContextNewerThanAppliedClusterState() { - final long initialClusterStateVersion = randomIntBetween(0, Integer.MAX_VALUE); - final int numberOfActiveAllocationsIds = randomIntBetween(0, 8); + public void testIllegalStateExceptionIfUnknownAllocationId() { + final String active = randomAlphaOfLength(16); + final String initializing = randomAlphaOfLength(32); + tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active), Collections.singleton(initializing), emptySet()); + tracker.activatePrimaryMode(active, NO_OPS_PERFORMED); + + expectThrows(IllegalStateException.class, () -> tracker.initiateTracking(randomAlphaOfLength(10))); + expectThrows(IllegalStateException.class, () -> tracker.markAllocationIdAsInSync(randomAlphaOfLength(10), randomNonNegativeLong())); + } + + private static class FakeClusterState { + final long version; + final Set inSyncIds; + final Set initializingIds; + + private FakeClusterState(long version, Set inSyncIds, Set initializingIds) { + this.version = version; + this.inSyncIds = Collections.unmodifiableSet(inSyncIds); + this.initializingIds = Collections.unmodifiableSet(initializingIds); + } + + public Set allIds() { + return Sets.union(initializingIds, inSyncIds); + } + + public void apply(GlobalCheckpointTracker gcp) { + gcp.updateFromMaster(version, inSyncIds, initializingIds, Collections.emptySet()); + } + } + + private static FakeClusterState initialState() { + final long initialClusterStateVersion = randomIntBetween(1, Integer.MAX_VALUE); + final int numberOfActiveAllocationsIds = randomIntBetween(1, 8); final int numberOfInitializingIds = randomIntBetween(0, 8); final Tuple, Set> activeAndInitializingAllocationIds = - randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); + randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); final Set initializingAllocationIds = activeAndInitializingAllocationIds.v2(); - tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds); - - /* - * We are going to establish a primary context from a cluster state version older than the applied cluster state version on the - * tracker. 
Because of recovery barriers established during relocation handoff, we know that the set of active allocation IDs in the - * newer cluster state is a subset of the allocation IDs in the applied cluster state with the caveat that an existing initializing - * allocation ID could have moved to an in-sync allocation ID within the tracker due to recovery finalization, and the set of - * initializing allocation IDs is otherwise arbitrary. - */ - final int numberOfNewInitializingAllocationIds = randomIntBetween(0, 8); - final Set initializedAllocationIds = new HashSet<>(randomSubsetOf(initializingAllocationIds)); - final Set newInitializingAllocationIds = - randomAllocationIdsExcludingExistingIds( - Sets.union(activeAllocationIds, initializingAllocationIds), numberOfNewInitializingAllocationIds); - - final ObjectLongMap activeAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>(); - for (final String allocationId : Sets.union(new HashSet<>(randomSubsetOf(activeAllocationIds)), initializedAllocationIds)) { - activeAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); - } - final ObjectLongMap initializingIdsLocalCheckpoints = new ObjectLongHashMap<>(); - final Set contextInitializingAllocationIds = Sets.union( - new HashSet<>(randomSubsetOf(Sets.difference(initializingAllocationIds, initializedAllocationIds))), - newInitializingAllocationIds); - for (final String allocationId : contextInitializingAllocationIds) { - initializingIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); - } - - final PrimaryContext primaryContext = - new PrimaryContext( - initialClusterStateVersion + randomIntBetween(0, Integer.MAX_VALUE) + 1, - activeAllocationIdsLocalCheckpoints, - initializingIdsLocalCheckpoints); - - tracker.updateAllocationIdsFromPrimaryContext(primaryContext); - - final PrimaryContext trackerPrimaryContext = tracker.primaryContext(); - try { - assertTrue(tracker.sealed()); - final long globalCheckpoint = - StreamSupport - .stream(activeAllocationIdsLocalCheckpoints.values().spliterator(), false) - .mapToLong(e -> e.value) - .min() - .orElse(SequenceNumbersService.UNASSIGNED_SEQ_NO); - - // the primary context contains knowledge of the state of the entire universe - assertThat(primaryContext.clusterStateVersion(), equalTo(trackerPrimaryContext.clusterStateVersion())); - assertThat(primaryContext.inSyncLocalCheckpoints(), equalTo(trackerPrimaryContext.inSyncLocalCheckpoints())); - assertThat(primaryContext.trackingLocalCheckpoints(), equalTo(trackerPrimaryContext.trackingLocalCheckpoints())); - assertThat(tracker.getGlobalCheckpoint(), equalTo(globalCheckpoint)); - } finally { - tracker.releasePrimaryContext(); - assertFalse(tracker.sealed()); - } + return new FakeClusterState(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds); } - public void testPrimaryContextSealing() { - // the tracker should start in the state of not being sealed - assertFalse(tracker.sealed()); - - // sampling the primary context should seal the tracker - tracker.primaryContext(); - assertTrue(tracker.sealed()); - - /* - * Invoking methods that mutates the state of the tracker should fail (with the exception of updating allocation IDs and updating - * global checkpoint on replica which can happen on the relocation source). 
- */ - assertIllegalStateExceptionWhenSealed(() -> tracker.updateLocalCheckpoint(randomAlphaOfLength(16), randomNonNegativeLong())); - assertIllegalStateExceptionWhenSealed(() -> tracker.updateAllocationIdsFromPrimaryContext(mock(PrimaryContext.class))); - assertIllegalStateExceptionWhenSealed(() -> tracker.primaryContext()); - assertIllegalStateExceptionWhenSealed(() -> tracker.markAllocationIdAsInSync(randomAlphaOfLength(16), randomNonNegativeLong())); - - // closing the releasable should unseal the tracker - tracker.releasePrimaryContext(); - assertFalse(tracker.sealed()); + private static void activatePrimary(FakeClusterState clusterState, GlobalCheckpointTracker gcp) { + gcp.activatePrimaryMode(randomFrom(clusterState.inSyncIds), randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10)); } - private void assertIllegalStateExceptionWhenSealed(final ThrowingRunnable runnable) { - final IllegalStateException e = expectThrows(IllegalStateException.class, runnable); - assertThat(e, hasToString(containsString("global checkpoint tracker is sealed"))); + private static void randomLocalCheckpointUpdate(GlobalCheckpointTracker gcp) { + String allocationId = randomFrom(gcp.localCheckpoints.keySet()); + long currentLocalCheckpoint = gcp.localCheckpoints.get(allocationId).getLocalCheckpoint(); + gcp.updateLocalCheckpoint(allocationId, currentLocalCheckpoint + randomInt(5)); } - private Tuple, Set> randomActiveAndInitializingAllocationIds( + private static void randomMarkInSync(GlobalCheckpointTracker gcp) { + String allocationId = randomFrom(gcp.localCheckpoints.keySet()); + long newLocalCheckpoint = Math.max(NO_OPS_PERFORMED, gcp.getGlobalCheckpoint() + randomInt(5)); + markAllocationIdAsInSyncQuietly(gcp, allocationId, newLocalCheckpoint); + } + + private static FakeClusterState randomUpdateClusterState(FakeClusterState clusterState) { + final Set initializingIdsToAdd = randomAllocationIdsExcludingExistingIds(clusterState.allIds(), randomInt(2)); + final Set initializingIdsToRemove = new HashSet<>( + randomSubsetOf(randomInt(clusterState.initializingIds.size()), clusterState.initializingIds)); + final Set inSyncIdsToRemove = new HashSet<>( + randomSubsetOf(randomInt(clusterState.inSyncIds.size()), clusterState.inSyncIds)); + final Set remainingInSyncIds = Sets.difference(clusterState.inSyncIds, inSyncIdsToRemove); + return new FakeClusterState(clusterState.version + randomIntBetween(1, 5), + remainingInSyncIds.isEmpty() ? 
clusterState.inSyncIds : remainingInSyncIds, + Sets.difference(Sets.union(clusterState.initializingIds, initializingIdsToAdd), initializingIdsToRemove)); + } + + private static Tuple, Set> randomActiveAndInitializingAllocationIds( final int numberOfActiveAllocationsIds, final int numberOfInitializingIds) { final Set activeAllocationIds = - IntStream.range(0, numberOfActiveAllocationsIds).mapToObj(i -> randomAlphaOfLength(16) + i).collect(Collectors.toSet()); + IntStream.range(0, numberOfActiveAllocationsIds).mapToObj(i -> randomAlphaOfLength(16) + i).collect(Collectors.toSet()); final Set initializingIds = randomAllocationIdsExcludingExistingIds(activeAllocationIds, numberOfInitializingIds); return Tuple.tuple(activeAllocationIds, initializingIds); } - private Set randomAllocationIdsExcludingExistingIds(final Set existingAllocationIds, final int numberOfAllocationIds) { + private static Set randomAllocationIdsExcludingExistingIds(final Set existingAllocationIds, + final int numberOfAllocationIds) { return IntStream.range(0, numberOfAllocationIds).mapToObj(i -> { do { final String newAllocationId = randomAlphaOfLength(16); @@ -733,7 +741,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { }).collect(Collectors.toSet()); } - private void markAllocationIdAsInSyncQuietly( + private static void markAllocationIdAsInSyncQuietly( final GlobalCheckpointTracker tracker, final String allocationId, final long localCheckpoint) { try { tracker.markAllocationIdAsInSync(allocationId, localCheckpoint); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e709780437a..b8ad55b6046 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -347,7 +347,7 @@ public class IndexShardTests extends IndexShardTestCase { ShardRoutingState.STARTED, replicaRouting.allocationId()); indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, - 0L, Collections.emptySet(), Collections.emptySet()); + 0L, Collections.singleton(primaryRouting.allocationId().getId()), Collections.emptySet(), Collections.emptySet()); final int delayedOperations = scaledRandomIntBetween(1, 64); final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations); @@ -422,7 +422,7 @@ public class IndexShardTests extends IndexShardTestCase { ShardRoutingState.STARTED, replicaRouting.allocationId()); indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, - 0L, Collections.emptySet(), Collections.emptySet()); + 0L, Collections.singleton(primaryRouting.allocationId().getId()), Collections.emptySet(), Collections.emptySet()); /* * This operation completing means that the delay operation executed as part of increasing the primary term has completed and the @@ -463,8 +463,8 @@ public class IndexShardTests extends IndexShardTestCase { ShardRouting replicaRouting = indexShard.routingEntry(); ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true, ShardRoutingState.STARTED, replicaRouting.allocationId()); - indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, - 0L, Collections.emptySet(), Collections.emptySet()); + indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, 
listener) -> {}, 0L, + Collections.singleton(indexShard.routingEntry().allocationId().getId()), Collections.emptySet(), Collections.emptySet()); } else { indexShard = newStartedShard(true); } @@ -598,11 +598,11 @@ public class IndexShardTests extends IndexShardTestCase { final long newPrimaryTerm = primaryTerm + 1 + randomInt(20); if (engineClosed == false) { assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); - assertThat(indexShard.getGlobalCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(indexShard.getGlobalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); } final long newGlobalCheckPoint; if (engineClosed || randomBoolean()) { - newGlobalCheckPoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + newGlobalCheckPoint = SequenceNumbersService.NO_OPS_PERFORMED; } else { long localCheckPoint = indexShard.getGlobalCheckpoint() + randomInt(100); // advance local checkpoint @@ -1267,7 +1267,6 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(shard); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25419") public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, InterruptedException, BrokenBarrierException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); @@ -1321,8 +1320,11 @@ public class IndexShardTests extends IndexShardTestCase { public void testRecoverFromStore() throws IOException { final IndexShard shard = newStartedShard(true); - int translogOps = 1; - indexDoc(shard, "test", "0"); + int totalOps = randomInt(10); + int translogOps = totalOps; + for (int i = 0; i < totalOps; i++) { + indexDoc(shard, "test", Integer.toString(i)); + } if (randomBoolean()) { flushShard(shard); translogOps = 0; @@ -1336,10 +1338,33 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); - assertDocCount(newShard, 1); + // check that local checkpoint of new primary is properly tracked after recovery + assertThat(newShard.getLocalCheckpoint(), equalTo(totalOps - 1L)); + assertThat(IndexShardTestCase.getEngine(newShard).seqNoService() + .getTrackedLocalCheckpointForShard(newShard.routingEntry().allocationId().getId()), equalTo(totalOps - 1L)); + assertDocCount(newShard, totalOps); closeShards(newShard); } + public void testPrimaryHandOffUpdatesLocalCheckpoint() throws IOException { + final IndexShard primarySource = newStartedShard(true); + int totalOps = randomInt(10); + for (int i = 0; i < totalOps; i++) { + indexDoc(primarySource, "test", Integer.toString(i)); + } + IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate("n2", -1)); + final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard()); + updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetaData()); + recoverReplica(primaryTarget, primarySource); + + // check that local checkpoint of new primary is properly tracked after primary relocation + assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L)); + assertThat(IndexShardTestCase.getEngine(primaryTarget).seqNoService() + .getTrackedLocalCheckpointForShard(primaryTarget.routingEntry().allocationId().getId()), 
equalTo(totalOps - 1L)); + assertDocCount(primaryTarget, totalOps); + closeShards(primarySource, primaryTarget); + } + /* This test just verifies that we fill up local checkpoint up to max seen seqID on primary recovery */ public void testRecoverFromStoreWithNoOps() throws IOException { final IndexShard shard = newStartedShard(true); @@ -1888,6 +1913,10 @@ public class IndexShardTests extends IndexShardTestCase { } } IndexShardTestCase.updateRoutingEntry(targetShard, ShardRoutingHelper.moveToStarted(targetShard.routingEntry())); + // check that local checkpoint of new primary is properly tracked after recovery + assertThat(targetShard.getLocalCheckpoint(), equalTo(1L)); + assertThat(IndexShardTestCase.getEngine(targetShard).seqNoService() + .getTrackedLocalCheckpointForShard(targetShard.routingEntry().allocationId().getId()), equalTo(1L)); assertDocCount(targetShard, 2); } // now check that it's persistent ie. that the added shards are committed diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index c2c44421b84..7779dca1929 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -63,7 +63,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { String allocationId = shard.routingEntry().allocationId().getId(); shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId), - Collections.emptySet()); + Collections.emptySet(), Collections.emptySet()); shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint); assertEquals(globalCheckPoint, shard.getGlobalCheckpoint()); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 419b7a430da..235d2ab4d32 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -35,7 +35,6 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -141,12 +140,12 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC if (shard.routingEntry().primary() && shard.routingEntry().active()) { IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shard.shardId()); - Set activeIds = shardRoutingTable.activeShards().stream() - .map(r -> r.allocationId().getId()).collect(Collectors.toSet()); + Set inSyncIds = state.metaData().index(shard.shardId().getIndex()) + .inSyncAllocationIds(shard.shardId().id()); Set initializingIds = shardRoutingTable.getAllInitializingShards().stream() .map(r -> r.allocationId().getId()).collect(Collectors.toSet()); - assertThat(shard.routingEntry() + " isn't updated with active aIDs", shard.activeAllocationIds, - equalTo(activeIds)); + assertThat(shard.routingEntry() 
+ " isn't updated with in-sync aIDs", shard.inSyncAllocationIds, + equalTo(inSyncIds)); assertThat(shard.routingEntry() + " isn't updated with init aIDs", shard.initializingAllocationIds, equalTo(initializingIds)); } @@ -326,7 +325,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC private volatile long clusterStateVersion; private volatile ShardRouting shardRouting; private volatile RecoveryState recoveryState; - private volatile Set activeAllocationIds; + private volatile Set inSyncAllocationIds; private volatile Set initializingAllocationIds; private volatile long term; @@ -350,8 +349,9 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC long newPrimaryTerm, CheckedBiConsumer, IOException> primaryReplicaSyncer, long applyingClusterStateVersion, - Set activeAllocationIds, - Set initializingAllocationIds) throws IOException { + Set inSyncAllocationIds, + Set initializingAllocationIds, + Set pre60AllocationIds) throws IOException { failRandomly(); assertThat(this.shardId(), equalTo(shardRouting.shardId())); assertTrue("current: " + this.shardRouting + ", got: " + shardRouting, this.shardRouting.isSameAllocation(shardRouting)); @@ -363,7 +363,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC if (shardRouting.primary()) { term = newPrimaryTerm; this.clusterStateVersion = applyingClusterStateVersion; - this.activeAllocationIds = activeAllocationIds; + this.inSyncAllocationIds = inSyncAllocationIds; this.initializingAllocationIds = initializingAllocationIds; } } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 5532ad040f2..156a66e3304 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -86,6 +86,7 @@ import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -378,6 +379,10 @@ public class RecoverySourceHandlerTests extends ESTestCase { when(shard.acquireTranslogView()).thenReturn(translogView); when(shard.state()).thenReturn(IndexShardState.RELOCATED); when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class)); + doAnswer(invocation -> { + ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); + return null; + }).when(shard).acquirePrimaryOperationPermit(any(), anyString()); final AtomicBoolean phase1Called = new AtomicBoolean(); // final Engine.IndexCommitRef indexCommitRef = mock(Engine.IndexCommitRef.class); // when(shard.acquireIndexCommit(anyBoolean())).thenReturn(indexCommitRef); @@ -420,7 +425,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { expectThrows(IndexShardRelocatedException.class, handler::recoverToTarget); // phase1 should only be attempted if we are not doing a sequence-number-based recovery assertThat(phase1Called.get(), equalTo(!isTranslogReadyForSequenceNumberBasedRecovery)); - assertTrue(prepareTargetForTranslogCalled.get()); + assertFalse(prepareTargetForTranslogCalled.get()); assertFalse(phase2Called.get()); } diff --git 
a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java index d6daf3509f5..50035e1027b 100644 --- a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java @@ -54,7 +54,6 @@ public class FullRollingRestartIT extends ESIntegTestCase { return 1; } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25420") public void testFullRollingRestart() throws Exception { Settings settings = Settings.builder().put(ZenDiscovery.JOIN_TIMEOUT_SETTING.getKey(), "30s").build(); internalCluster().startNode(settings); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index a59f490e6ee..9cd0a6cce00 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -76,6 +76,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; import java.io.IOException; import java.util.Arrays; @@ -84,6 +85,7 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -110,11 +112,13 @@ public abstract class IndexShardTestCase extends ESTestCase { }; protected ThreadPool threadPool; + private long primaryTerm; @Override public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); + primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards } @Override @@ -164,7 +168,7 @@ public abstract class IndexShardTestCase extends ESTestCase { .build(); IndexMetaData.Builder metaData = IndexMetaData.builder(shardRouting.getIndexName()) .settings(settings) - .primaryTerm(0, randomIntBetween(1, 100)); + .primaryTerm(0, primaryTerm); return newShard(shardRouting, metaData.build(), listeners); } @@ -360,8 +364,15 @@ public abstract class IndexShardTestCase extends ESTestCase { updateRoutingEntry(primary, ShardRoutingHelper.moveToStarted(primary.routingEntry())); } + protected static AtomicLong currentClusterStateVersion = new AtomicLong(); + public static void updateRoutingEntry(IndexShard shard, ShardRouting shardRouting) throws IOException { - shard.updateShardState(shardRouting, shard.getPrimaryTerm(), null, 0L, Collections.emptySet(), Collections.emptySet()); + Set inSyncIds = + shardRouting.active() ? Collections.singleton(shardRouting.allocationId().getId()) : Collections.emptySet(); + Set initializingIds = + shardRouting.initializing() ? 
Collections.singleton(shardRouting.allocationId().getId()) : Collections.emptySet(); + shard.updateShardState(shardRouting, shard.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), + inSyncIds, initializingIds, Collections.emptySet()); } protected void recoveryEmptyReplica(IndexShard replica) throws IOException { @@ -387,6 +398,16 @@ public abstract class IndexShardTestCase extends ESTestCase { true); } + /** recovers a replica from the given primary **/ + protected void recoverReplica(final IndexShard replica, + final IndexShard primary, + final BiFunction targetSupplier, + final boolean markAsRecovering) throws IOException { + recoverReplica(replica, primary, targetSupplier, markAsRecovering, + Collections.singleton(primary.routingEntry().allocationId().getId()), + Collections.singleton(replica.routingEntry().allocationId().getId())); + } + /** * Recovers a replica from the give primary, allow the user to supply a custom recovery target. A typical usage of a custom recovery * target is to assert things in the various stages of recovery. @@ -398,7 +419,9 @@ public abstract class IndexShardTestCase extends ESTestCase { protected final void recoverReplica(final IndexShard replica, final IndexShard primary, final BiFunction targetSupplier, - final boolean markAsRecovering) throws IOException { + final boolean markAsRecovering, + final Set inSyncIds, + final Set initializingIds) throws IOException { final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId()); final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId()); if (markAsRecovering) { @@ -419,7 +442,7 @@ public abstract class IndexShardTestCase extends ESTestCase { } final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId, - pNode, rNode, snapshot, false, 0, startingSeqNo); + pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo); final RecoverySourceHandler recovery = new RecoverySourceHandler( primary, recoveryTarget, @@ -428,9 +451,19 @@ public abstract class IndexShardTestCase extends ESTestCase { e -> () -> {}, (int) ByteSizeUnit.MB.toBytes(1), Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), pNode.getName()).build()); + primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), + inSyncIds, initializingIds, Collections.emptySet()); recovery.recoverToTarget(); recoveryTarget.markAsDone(); - updateRoutingEntry(replica, ShardRoutingHelper.moveToStarted(replica.routingEntry())); + Set initializingIdsWithoutReplica = new HashSet<>(initializingIds); + initializingIdsWithoutReplica.remove(replica.routingEntry().allocationId().getId()); + Set inSyncIdsWithReplica = new HashSet<>(inSyncIds); + inSyncIdsWithReplica.add(replica.routingEntry().allocationId().getId()); + // update both primary and replica shard state + primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), + inSyncIdsWithReplica, initializingIdsWithoutReplica, Collections.emptySet()); + replica.updateShardState(replica.routingEntry().moveToStarted(), replica.getPrimaryTerm(), null, + currentClusterStateVersion.get(), inSyncIdsWithReplica, initializingIdsWithoutReplica, Collections.emptySet()); } private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException { @@ -529,4 +562,11 @@ public abstract class IndexShardTestCase extends ESTestCase { protected void 
flushShard(IndexShard shard, boolean force) { shard.flush(new FlushRequest(shard.shardId().getIndexName()).force(force)); } + + /** + * Helper method to access (package-protected) engine from tests + */ + public static Engine getEngine(IndexShard indexShard) { + return indexShard.getEngine(); + } }
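
The relocation handoff API exercised by testPrimaryContextHandoff above (startRelocationHandoff, PrimaryContext serialization, activateWithPrimaryContext, completeRelocationHandoff/abortRelocationHandoff) can be hard to follow inside the randomized test. The following condensed sketch is not part of the patch; it is a minimal happy-path illustration that reuses only calls shown in the hunks above and assumes the same test utilities (IndexSettingsModule, BytesStreamOutput, Settings.EMPTY). The allocation IDs "source-aid" and "target-aid" are placeholders invented for the example.

    // Minimal handoff round-trip, mirroring the calls made in testPrimaryContextHandoff.
    // Assumes the imports of GlobalCheckpointTrackerTests; allocation IDs are placeholders.
    GlobalCheckpointTracker oldPrimary = new GlobalCheckpointTracker(new ShardId("test", "_na_", 0),
            IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), SequenceNumbersService.UNASSIGNED_SEQ_NO);
    GlobalCheckpointTracker newPrimary = new GlobalCheckpointTracker(new ShardId("test", "_na_", 0),
            IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), SequenceNumbersService.UNASSIGNED_SEQ_NO);

    // both copies apply the same cluster state; the relocation source is the active primary
    oldPrimary.updateFromMaster(1L, Collections.singleton("source-aid"),
            Collections.singleton("target-aid"), Collections.emptySet());
    newPrimary.updateFromMaster(1L, Collections.singleton("source-aid"),
            Collections.singleton("target-aid"), Collections.emptySet());
    oldPrimary.activatePrimaryMode("source-aid", SequenceNumbersService.NO_OPS_PERFORMED);

    // start the handoff on the source, ship the primary context over the wire, activate the target
    GlobalCheckpointTracker.PrimaryContext context = oldPrimary.startRelocationHandoff();
    BytesStreamOutput out = new BytesStreamOutput();
    context.writeTo(out);
    GlobalCheckpointTracker.PrimaryContext received =
            new GlobalCheckpointTracker.PrimaryContext(out.bytes().streamInput());
    newPrimary.activateWithPrimaryContext(received);

    // on success the source drops back to replica mode; on failure it would call
    // abortRelocationHandoff() and could retry the handoff later, as the test also covers
    oldPrimary.completeRelocationHandoff();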