diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java index fd9b2f4687f..9c9f9a3ca49 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java @@ -22,7 +22,7 @@ package org.elasticsearch.index.seqno; import com.carrotsearch.hppc.ObjectLongHashMap; import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.cursors.ObjectLongCursor; -import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; @@ -31,8 +31,6 @@ import java.util.HashSet; import java.util.Locale; import java.util.Set; -import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO; - /** * This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or * equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the master starts @@ -49,14 +47,20 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { * through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. This map is keyed by * allocation IDs. All accesses to this set are guarded by a lock on this. */ - private final ObjectLongMap inSyncLocalCheckpoints; + final ObjectLongMap inSyncLocalCheckpoints; /* - * This set holds the last set of known valid allocation ids as received by the master. This is important to make sure shard that are - * failed or relocated are cleaned up from {@link #inSyncLocalCheckpoints} and do not hold the global checkpoint back. All accesses to - * this set are guarded by a lock on this. + * This map holds the last known local checkpoint for initializing shards that are undergoing recovery. Such shards do not participate + * in determining the global checkpoint. We must track these local checkpoints so that when a shard is activated we use the highest + * known checkpoint. */ - private final Set assignedAllocationIds; + final ObjectLongMap trackingLocalCheckpoints; + + /* + * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the + * current global checkpoint. + */ + final Set pendingInSync; /* * The current global checkpoint for this shard. 
Note that this field is guarded by a lock on this and thus this field does not need to @@ -74,10 +78,11 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { */ GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) { super(shardId, indexSettings); - assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; - inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas()); - assignedAllocationIds = new HashSet<>(1 + indexSettings.getNumberOfReplicas()); + assert globalCheckpoint >= SequenceNumbersService.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; + this.inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas()); + this.trackingLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); this.globalCheckpoint = globalCheckpoint; + this.pendingInSync = new HashSet<>(); } /** @@ -86,59 +91,85 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { * shards that are removed to be re-added. * * @param allocationId the allocation ID of the shard to update the local checkpoint for - * @param checkpoint the local checkpoint for the shard + * @param localCheckpoint the local checkpoint for the shard */ - public synchronized void updateLocalCheckpoint(final String allocationId, final long checkpoint) { - final int indexOfKey = inSyncLocalCheckpoints.indexOf(allocationId); - if (indexOfKey >= 0) { - final long current = inSyncLocalCheckpoints.indexGet(indexOfKey); - if (current < checkpoint) { - inSyncLocalCheckpoints.indexReplace(indexOfKey, checkpoint); - if (logger.isTraceEnabled()) { - logger.trace("updated local checkpoint of [{}] to [{}] (was [{}])", allocationId, checkpoint, current); - } + public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) { + final boolean updated; + if (updateLocalCheckpoint(allocationId, localCheckpoint, inSyncLocalCheckpoints, "in-sync")) { + updated = true; + } else if (updateLocalCheckpoint(allocationId, localCheckpoint, trackingLocalCheckpoints, "tracking")) { + updated = true; + } else { + logger.trace("ignored local checkpoint [{}] of [{}], allocation ID is not tracked", localCheckpoint, allocationId); + updated = false; + } + if (updated) { + notifyAllWaiters(); + } + } + + @SuppressForbidden(reason = "Object#notifyAll waiters for local checkpoint advancement") + private synchronized void notifyAllWaiters() { + this.notifyAll(); + } + + private boolean updateLocalCheckpoint( + final String allocationId, final long localCheckpoint, ObjectLongMap map, final String reason) { + final int index = map.indexOf(allocationId); + if (index >= 0) { + final long current = map.indexGet(index); + if (current < localCheckpoint) { + map.indexReplace(index, localCheckpoint); + logger.trace("updated local checkpoint of [{}] in [{}] from [{}] to [{}]", allocationId, reason, current, localCheckpoint); } else { logger.trace( - "skipping update of local checkpoint [{}], current checkpoint is higher (current [{}], incoming [{}], type [{}])", - allocationId, - current, - checkpoint, - allocationId); + "skipped updating local checkpoint of [{}] in [{}] from [{}] to [{}], current checkpoint is higher", + allocationId, + reason, + current, + localCheckpoint); } + return true; } else { - logger.trace("[{}] isn't marked as in sync. 
ignoring local checkpoint of [{}].", allocationId, checkpoint); + return false; } } /** * Scans through the currently known local checkpoint and updates the global checkpoint accordingly. * - * @return {@code true} if the checkpoint has been updated or if it can not be updated since one of the local checkpoints of one of the - * active allocations is not known. + * @return {@code true} if the checkpoint has been updated or if it can not be updated since the local checkpoints of one of the active + * allocations is not known. */ synchronized boolean updateCheckpointOnPrimary() { - long minCheckpoint = Long.MAX_VALUE; - if (inSyncLocalCheckpoints.isEmpty()) { + long minLocalCheckpoint = Long.MAX_VALUE; + if (inSyncLocalCheckpoints.isEmpty() || !pendingInSync.isEmpty()) { return false; } - for (final ObjectLongCursor cp : inSyncLocalCheckpoints) { - if (cp.value == UNASSIGNED_SEQ_NO) { - logger.trace("unknown local checkpoint for active allocationId [{}], requesting a sync", cp.key); + for (final ObjectLongCursor localCheckpoint : inSyncLocalCheckpoints) { + if (localCheckpoint.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + logger.trace("unknown local checkpoint for active allocation ID [{}], requesting a sync", localCheckpoint.key); return true; } - minCheckpoint = Math.min(cp.value, minCheckpoint); + minLocalCheckpoint = Math.min(localCheckpoint.value, minLocalCheckpoint); } - if (minCheckpoint < globalCheckpoint) { + assert minLocalCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO : "new global checkpoint must be assigned"; + if (minLocalCheckpoint < globalCheckpoint) { final String message = - String.format(Locale.ROOT, "new global checkpoint [%d] is lower than previous one [%d]", minCheckpoint, globalCheckpoint); + String.format( + Locale.ROOT, + "new global checkpoint [%d] is lower than previous one [%d]", + minLocalCheckpoint, + globalCheckpoint); throw new IllegalStateException(message); } - if (globalCheckpoint != minCheckpoint) { - logger.trace("global checkpoint updated to [{}]", minCheckpoint); - globalCheckpoint = minCheckpoint; + if (globalCheckpoint != minLocalCheckpoint) { + logger.trace("global checkpoint updated to [{}]", minLocalCheckpoint); + globalCheckpoint = minLocalCheckpoint; return true; + } else { + return false; } - return false; } /** @@ -153,17 +184,17 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { /** * Updates the global checkpoint on a replica shard after it has been updated by the primary. * - * @param checkpoint the global checkpoint + * @param globalCheckpoint the global checkpoint */ - synchronized void updateCheckpointOnReplica(final long checkpoint) { + synchronized void updateGlobalCheckpointOnReplica(final long globalCheckpoint) { /* * The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary * information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other * replica shards). In these cases, the local knowledge of the global checkpoint could be higher than sync from the lagging primary. 
*/ - if (this.globalCheckpoint <= checkpoint) { - this.globalCheckpoint = checkpoint; - logger.trace("global checkpoint updated from primary to [{}]", checkpoint); + if (this.globalCheckpoint <= globalCheckpoint) { + this.globalCheckpoint = globalCheckpoint; + logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint); } } @@ -173,33 +204,98 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { * @param activeAllocationIds the allocation IDs of the currently active shard copies * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies */ - public synchronized void updateAllocationIdsFromMaster(final Set activeAllocationIds, - final Set initializingAllocationIds) { - assignedAllocationIds.removeIf( - aId -> activeAllocationIds.contains(aId) == false && initializingAllocationIds.contains(aId) == false); - assignedAllocationIds.addAll(activeAllocationIds); - assignedAllocationIds.addAll(initializingAllocationIds); - for (String activeId : activeAllocationIds) { - if (inSyncLocalCheckpoints.containsKey(activeId) == false) { - inSyncLocalCheckpoints.put(activeId, UNASSIGNED_SEQ_NO); + public synchronized void updateAllocationIdsFromMaster( + final Set activeAllocationIds, final Set initializingAllocationIds) { + // remove shards whose allocation ID no longer exists + inSyncLocalCheckpoints.removeAll(a -> !activeAllocationIds.contains(a) && !initializingAllocationIds.contains(a)); + + // add any new active allocation IDs + for (final String a : activeAllocationIds) { + if (!inSyncLocalCheckpoints.containsKey(a)) { + final long localCheckpoint = trackingLocalCheckpoints.getOrDefault(a, SequenceNumbersService.UNASSIGNED_SEQ_NO); + inSyncLocalCheckpoints.put(a, localCheckpoint); + logger.trace("marked [{}] as in-sync with local checkpoint [{}] via cluster state update from master", a, localCheckpoint); } } - inSyncLocalCheckpoints.removeAll(key -> assignedAllocationIds.contains(key) == false); + + trackingLocalCheckpoints.removeAll(a -> !initializingAllocationIds.contains(a)); + for (final String a : initializingAllocationIds) { + if (inSyncLocalCheckpoints.containsKey(a)) { + /* + * This can happen if we mark the allocation ID as in sync at the end of recovery before seeing a cluster state update from + * marking the shard as active. + */ + continue; + } + if (trackingLocalCheckpoints.containsKey(a)) { + // we are already tracking this allocation ID + continue; + } + // this is a new allocation ID + trackingLocalCheckpoints.put(a, SequenceNumbersService.UNASSIGNED_SEQ_NO); + logger.trace("tracking [{}] via cluster state update from master", a); + } } /** - * Marks the shard with the provided allocation ID as in-sync with the primary shard. This should be called at the end of recovery where - * the primary knows all operations below the global checkpoint have been completed on this shard. + * Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint + * on the specified shard advances above the current global checkpoint. 
* - * @param allocationId the allocation ID of the shard to mark as in-sync + * @param allocationId the allocation ID of the shard to mark as in-sync + * @param localCheckpoint the current local checkpoint on the shard + * + * @throws InterruptedException if the thread is interrupted waiting for the local checkpoint on the shard to advance */ - public synchronized void markAllocationIdAsInSync(final String allocationId) { - if (assignedAllocationIds.contains(allocationId) == false) { - // master has removed this allocation, ignore + public synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { + if (!trackingLocalCheckpoints.containsKey(allocationId)) { + /* + * This can happen if the recovery target has been failed and the cluster state update from the master has triggered removing + * this allocation ID from the tracking map but this recovery thread has not yet been made aware that the recovery is + * cancelled. + */ return; } - logger.trace("marked [{}] as in sync", allocationId); - inSyncLocalCheckpoints.put(allocationId, UNASSIGNED_SEQ_NO); + + updateLocalCheckpoint(allocationId, localCheckpoint, trackingLocalCheckpoints, "tracking"); + waitForAllocationIdToBeInSync(allocationId); + } + + private synchronized void waitForAllocationIdToBeInSync(final String allocationId) throws InterruptedException { + if (!pendingInSync.add(allocationId)) { + throw new IllegalStateException("there is already a pending sync in progress for allocation ID [" + allocationId + "]"); + } + try { + while (true) { + /* + * If the allocation has been cancelled and so removed from the tracking map from a cluster state update from the master it + * means that this recovery will be cancelled; we are here on a cancellable recovery thread and so this thread will throw + * an interrupted exception as soon as it tries to wait on the monitor. + */ + final long current = trackingLocalCheckpoints.getOrDefault(allocationId, Long.MIN_VALUE); + if (current >= globalCheckpoint) { + logger.trace("marked [{}] as in-sync with local checkpoint [{}]", allocationId, current); + trackingLocalCheckpoints.remove(allocationId); + /* + * This is prematurely adding the allocation ID to the in-sync map as at this point recovery is not yet finished and + * could still abort. At this point we will end up with a shard in the in-sync map holding back the global checkpoint + * because the shard never recovered and we would have to wait until either the recovery retries and completes + * successfully, or the master fails the shard and issues a cluster state update that removes the shard from the set of + * active allocation IDs. 
+ */ + inSyncLocalCheckpoints.put(allocationId, current); + break; + } else { + waitForLocalCheckpointToAdvance(); + } + } + } finally { + pendingInSync.remove(allocationId); + } + } + + @SuppressForbidden(reason = "Object#wait for local checkpoint advancement") + private synchronized void waitForLocalCheckpointToAdvance() throws InterruptedException { + this.wait(); } /** @@ -213,7 +309,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { if (inSyncLocalCheckpoints.containsKey(allocationId)) { return inSyncLocalCheckpoints.get(allocationId); } - return UNASSIGNED_SEQ_NO; + return SequenceNumbersService.UNASSIGNED_SEQ_NO; } } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index ddc0669f892..4b14ce8fff5 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -127,12 +127,13 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { /** * Marks the shard with the provided allocation ID as in-sync with the primary shard. See - * {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String)} for additional details. + * {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} for additional details. * - * @param allocationId the allocation ID of the shard to mark as in-sync + * @param allocationId the allocation ID of the shard to mark as in-sync + * @param localCheckpoint the current local checkpoint on the shard */ - public void markAllocationIdAsInSync(final String allocationId) { - globalCheckpointTracker.markAllocationIdAsInSync(allocationId); + public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { + globalCheckpointTracker.markAllocationIdAsInSync(allocationId, localCheckpoint); } /** @@ -166,10 +167,10 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { /** * Updates the global checkpoint on a replica shard after it has been updated by the primary. * - * @param checkpoint the global checkpoint + * @param globalCheckpoint the global checkpoint */ - public void updateGlobalCheckpointOnReplica(final long checkpoint) { - globalCheckpointTracker.updateCheckpointOnReplica(checkpoint); + public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) { + globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 1da5e6763bc..f1cef1fb663 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1475,13 +1475,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl /** * Marks the shard with the provided allocation ID as in-sync with the primary shard. See - * {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String)} for additional details. + * {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} for additional details. 
* - * @param allocationId the allocation ID of the shard to mark as in-sync + * @param allocationId the allocation ID of the shard to mark as in-sync + * @param localCheckpoint the current local checkpoint on the shard */ - public void markAllocationIdAsInSync(final String allocationId) { + public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { verifyPrimary(); - getEngine().seqNoService().markAllocationIdAsInSync(allocationId); + getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint); } /** @@ -1516,11 +1517,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl /** * Updates the global checkpoint on a replica shard after it has been updated by the primary. * - * @param checkpoint the global checkpoint + * @param globalCheckpoint the global checkpoint */ - public void updateGlobalCheckpointOnReplica(final long checkpoint) { + public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) { verifyReplicationTarget(); - getEngine().seqNoService().updateGlobalCheckpointOnReplica(checkpoint); + final SequenceNumbersService seqNoService = getEngine().seqNoService(); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); + if (globalCheckpoint <= localCheckpoint) { + seqNoService.updateGlobalCheckpointOnReplica(globalCheckpoint); + } else { + /* + * This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global + * checkpoint updates from in-flight operations. However, since this shard is not yet contributing to calculating the global + * checkpoint, it can be the case that the global checkpoint update from the primary is ahead of the local checkpoint on this + * shard. In this case, we ignore the global checkpoint update. This should only happen if we are in the translog stage of + * recovery. Prior to this, the engine is not opened and this shard will not receive global checkpoint updates, and after this + * the shard will be contributing to calculations of the the global checkpoint. 
+ */ + assert recoveryState().getStage() == RecoveryState.Stage.TRANSLOG + : "expected recovery stage [" + RecoveryState.Stage.TRANSLOG + "] but was [" + recoveryState().getStage() + "]"; + } } /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/ChannelFactory.java b/core/src/main/java/org/elasticsearch/index/translog/ChannelFactory.java index 7e1dcec14df..ccb362a3507 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/ChannelFactory.java +++ b/core/src/main/java/org/elasticsearch/index/translog/ChannelFactory.java @@ -28,7 +28,7 @@ import java.nio.file.StandardOpenOption; * only for testing until we have a disk-full FileSystem */ @FunctionalInterface -interface ChannelFactory { +public interface ChannelFactory { default FileChannel open(Path path) throws IOException { return open(path, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 400395d1b20..8435fe4ee1e 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -348,20 +348,24 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde */ public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) { try { - final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.indexShard().shardPath().resolveTranslog()); + final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation()); final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint); if (seqNoStats.getMaxSeqNo() <= seqNoStats.getGlobalCheckpoint()) { - // commit point is good for seq no based recovery as the maximum seq# including in it - // is below the global checkpoint (i.e., it excludes any ops thay may not be on the primary) - // Recovery will start at the first op after the local check point stored in the commit. + /* + * Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global + * checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation + * after the local checkpoint stored in the commit. + */ return seqNoStats.getLocalCheckpoint() + 1; } else { return SequenceNumbersService.UNASSIGNED_SEQ_NO; } } catch (final IOException e) { - // this can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the - // translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and - // proceeds to attempt a sequence-number-based recovery + /* + * This can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the + * translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and + * proceeds to attempt a sequence-number-based recovery. 
+ */ return SequenceNumbersService.UNASSIGNED_SEQ_NO; } } @@ -418,7 +422,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde final RecoveryTarget recoveryTarget = recoveryRef.target(); try { recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps()); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint())); } catch (TranslogRecoveryPerformer.BatchOperationException exception) { MapperException mapperException = (MapperException) ExceptionsHelper.unwrap(exception, MapperException.class); if (mapperException == null) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index c53a46ef222..4c8c31779ac 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -58,6 +58,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.StreamSupport; @@ -179,14 +180,16 @@ public class RecoverySourceHandler { } logger.trace("snapshot translog for recovery; current size is [{}]", translogView.totalOperations()); + final long targetLocalCheckpoint; try { - phase2(isSequenceNumberBasedRecoveryPossible ? request.startingSeqNo() : SequenceNumbersService.UNASSIGNED_SEQ_NO, - translogView.snapshot()); + final long startingSeqNo = + isSequenceNumberBasedRecoveryPossible ? 
request.startingSeqNo() : SequenceNumbersService.UNASSIGNED_SEQ_NO; + targetLocalCheckpoint = phase2(startingSeqNo, translogView.snapshot()); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } - finalizeRecovery(); + finalizeRecovery(targetLocalCheckpoint); } return response; } @@ -410,8 +413,10 @@ public class RecoverySourceHandler { * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if all * ops should be sent * @param snapshot a snapshot of the translog + * + * @return the local checkpoint on the target */ - void phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { + long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } @@ -422,18 +427,19 @@ public class RecoverySourceHandler { logger.trace("recovery [phase2]: sending transaction log operations"); // send all the snapshot's translog operations to the target - final int totalOperations = sendSnapshot(startingSeqNo, snapshot); + final SendSnapshotResult result = sendSnapshot(startingSeqNo, snapshot); stopWatch.stop(); logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime()); response.phase2Time = stopWatch.totalTime().millis(); - response.phase2Operations = totalOperations; + response.phase2Operations = result.totalOperations; + return result.targetLocalCheckpoint; } /* * finalizes the recovery process */ - public void finalizeRecovery() { + public void finalizeRecovery(final long targetLocalCheckpoint) { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } @@ -441,7 +447,7 @@ public class RecoverySourceHandler { StopWatch stopWatch = new StopWatch().start(); logger.trace("finalizing recovery"); cancellableThreads.execute(() -> { - shard.markAllocationIdAsInSync(request.targetAllocationId()); + shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint); recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint()); }); @@ -467,6 +473,18 @@ public class RecoverySourceHandler { logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); } + static class SendSnapshotResult { + + final long targetLocalCheckpoint; + final int totalOperations; + + SendSnapshotResult(final long targetLocalCheckpoint, final int totalOperations) { + this.targetLocalCheckpoint = targetLocalCheckpoint; + this.totalOperations = totalOperations; + } + + } + /** * Send the given snapshot's operations with a sequence number greater than the specified staring sequence number to this handler's * target node. 
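
Note on the change above: finalizeRecovery now hands the target's local checkpoint to markAllocationIdAsInSync, which (per the tracker changes earlier in this diff) parks the recovery thread on the tracker's monitor until that shard has caught up with the global checkpoint. The following is a minimal, self-contained sketch of that wait/notify handshake; the class, field, and method names are illustrative stand-ins, not the actual GlobalCheckpointTracker members.

```java
import java.util.HashMap;
import java.util.Map;

final class InSyncWaitSketch {

    private final Map<String, Long> trackingLocalCheckpoints = new HashMap<>();
    private long globalCheckpoint;

    InSyncWaitSketch(final long globalCheckpoint) {
        this.globalCheckpoint = globalCheckpoint;
    }

    /** Start tracking a recovering shard; Long.MIN_VALUE stands in for "checkpoint unknown". */
    synchronized void track(final String allocationId) {
        trackingLocalCheckpoints.putIfAbsent(allocationId, Long.MIN_VALUE);
    }

    /** Called whenever the recovering shard reports a new local checkpoint. */
    synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) {
        trackingLocalCheckpoints.merge(allocationId, localCheckpoint, Math::max);
        notifyAll(); // wake any thread blocked in markAllocationIdAsInSync
    }

    /** Blocks until the shard's local checkpoint has reached the current global checkpoint. */
    synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint)
            throws InterruptedException {
        updateLocalCheckpoint(allocationId, localCheckpoint); // synchronized is reentrant
        while (trackingLocalCheckpoints.getOrDefault(allocationId, Long.MIN_VALUE) < globalCheckpoint) {
            wait(); // releases the monitor so updateLocalCheckpoint can run; re-check on wake-up
        }
        // in the real tracker the allocation ID is promoted from the tracking map to the in-sync map here
        trackingLocalCheckpoints.remove(allocationId);
    }
}
```

Because wait() releases the monitor while parked, updateLocalCheckpoint calls driven by ongoing replication can still come in and wake the waiter, which re-checks the condition in the loop before promoting the shard.
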
@@ -475,19 +493,25 @@ public class RecoverySourceHandler { * * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent * @param snapshot the translog snapshot to replay operations from - * @return the total number of translog operations that were sent + * @return the local checkpoint on the target and the total number of operations sent * @throws IOException if an I/O exception occurred reading the translog snapshot */ - protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { + protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { int ops = 0; long size = 0; - int totalOperations = 0; + int skippedOps = 0; + int totalSentOps = 0; + final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); final List operations = new ArrayList<>(); - if (snapshot.totalOperations() == 0) { + final int expectedTotalOps = snapshot.totalOperations(); + if (expectedTotalOps == 0) { logger.trace("no translog operations to send"); } + final CancellableThreads.Interruptable sendBatch = + () -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps)); + // send operations in batches Translog.Operation operation; while ((operation = snapshot.next()) != null) { @@ -495,39 +519,41 @@ public class RecoverySourceHandler { throw new IndexShardClosedException(request.shardId()); } cancellableThreads.checkForCancel(); - // if we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and - // any ops before the starting sequence number + /* + * If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and + * any ops before the starting sequence number. 
+ */ final long seqNo = operation.seqNo(); - if (startingSeqNo >= 0 && (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) continue; + if (startingSeqNo >= 0 && (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) { + skippedOps++; + continue; + } operations.add(operation); ops++; size += operation.estimateSize(); - totalOperations++; + totalSentOps++; // check if this request is past bytes threshold, and if so, send it off if (size >= chunkSizeInBytes) { - cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations())); - if (logger.isTraceEnabled()) { - logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), - snapshot.totalOperations()); - } + cancellableThreads.execute(sendBatch); + logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); ops = 0; size = 0; operations.clear(); } } - // send the leftover operations - if (!operations.isEmpty()) { - cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations())); + if (!operations.isEmpty() || totalSentOps == 0) { + // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint + cancellableThreads.execute(sendBatch); } - if (logger.isTraceEnabled()) { - logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), - snapshot.totalOperations()); - } + assert expectedTotalOps == skippedOps + totalSentOps + : "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]"; - return totalOperations; + logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); + + return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps); } /** diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 7b48edfa042..d9540019e80 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; import java.io.IOException; +import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -374,12 +375,13 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps) throws TranslogRecoveryPerformer - .BatchOperationException { + public long indexTranslogOperations( + List operations, int totalTranslogOps) throws TranslogRecoveryPerformer.BatchOperationException { final RecoveryState.Translog translog = state().getTranslog(); translog.totalOperations(totalTranslogOps); assert indexShard().recoveryState() == state(); indexShard().performBatchRecovery(operations); + return indexShard().getLocalCheckpoint(); } @Override @@ -470,4 +472,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget assert remove == null || remove == indexOutput; // remove maybe null if we got finished } } + + Path translogLocation() { + return indexShard().shardPath().resolveTranslog(); + } + } 
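
To make the data flow in the two hunks above easier to follow, here is a condensed sketch of what the reworked sendSnapshot/indexTranslogOperations round trip amounts to; the Op and Target types are illustrative stand-ins rather than the real Translog.Operation and RecoveryTargetHandler. Operations below the starting sequence number are skipped when a sequence-number-based recovery is possible, the rest go out in size-bounded batches, and at least one request is always made so the target can report its local checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class SendSnapshotSketch {

    static final long UNASSIGNED_SEQ_NO = -2L; // assumed sentinel, mirroring SequenceNumbersService

    /** Illustrative stand-in for a translog operation. */
    static final class Op {
        final long seqNo;
        final long sizeInBytes;
        Op(final long seqNo, final long sizeInBytes) {
            this.seqNo = seqNo;
            this.sizeInBytes = sizeInBytes;
        }
    }

    /** Illustrative stand-in for the recovery target: indexing a batch yields its local checkpoint. */
    interface Target {
        long indexTranslogOperations(List<Op> batch, int expectedTotalOps);
    }

    /** Returns the target's local checkpoint after replaying the snapshot. */
    static long send(final long startingSeqNo, final List<Op> snapshot, final Target target, final long chunkSizeInBytes) {
        final AtomicLong targetLocalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO);
        final List<Op> batch = new ArrayList<>();
        final int expectedTotalOps = snapshot.size();
        long batchSizeInBytes = 0;
        int skippedOps = 0;
        int totalSentOps = 0;

        for (final Op op : snapshot) {
            // a sequence-number-based recovery only replays ops at or above the starting sequence number
            if (startingSeqNo >= 0 && (op.seqNo == UNASSIGNED_SEQ_NO || op.seqNo < startingSeqNo)) {
                skippedOps++;
                continue;
            }
            batch.add(op);
            batchSizeInBytes += op.sizeInBytes;
            totalSentOps++;
            if (batchSizeInBytes >= chunkSizeInBytes) {
                // flush the batch and remember the latest local checkpoint the target reported
                targetLocalCheckpoint.set(target.indexTranslogOperations(new ArrayList<>(batch), expectedTotalOps));
                batch.clear();
                batchSizeInBytes = 0;
            }
        }

        // send leftovers; if nothing was sent at all, still make one request so the target reports its checkpoint
        if (!batch.isEmpty() || totalSentOps == 0) {
            targetLocalCheckpoint.set(target.indexTranslogOperations(new ArrayList<>(batch), expectedTotalOps));
        }

        assert expectedTotalOps == skippedOps + totalSentOps;
        return targetLocalCheckpoint.get();
    }
}
```

The trailing request when no operations were sent matches the comment in the diff: even an empty phase 2 needs the target's local checkpoint so that finalizeRecovery has something to pass to markAllocationIdAsInSync.
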
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index dec63877448..38f412fed73 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -53,8 +53,10 @@ public interface RecoveryTargetHandler { * Index a set of translog operations on the target * @param operations operations to index * @param totalTranslogOps current number of total operations expected to be indexed + * + * @return the local checkpoint on the target shard */ - void indexTranslogOperations(List operations, int totalTranslogOps); + long indexTranslogOperations(List operations, int totalTranslogOps); /** * Notifies the target of the files it is going to receive diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java new file mode 100644 index 00000000000..7427e631fb0 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java @@ -0,0 +1,71 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.transport.FutureTransportResponseHandler; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; + +import java.io.IOException; + +public class RecoveryTranslogOperationsResponse extends TransportResponse { + + long localCheckpoint; + + RecoveryTranslogOperationsResponse() { + + } + + RecoveryTranslogOperationsResponse(final long localCheckpoint) { + this.localCheckpoint = localCheckpoint; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + // before 6.0.0 we responded with an empty response so we have to maintain that + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeZLong(localCheckpoint); + } + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + // before 6.0.0 we received an empty response so we have to maintain that + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + localCheckpoint = in.readZLong(); + } + else { + localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } + } + + static TransportResponseHandler HANDLER = + new FutureTransportResponseHandler() { + @Override + public RecoveryTranslogOperationsResponse newInstance() { + return new RecoveryTranslogOperationsResponse(); + } + }; + +} diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index f21d61f2f71..a4f24b710b2 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -28,7 +28,10 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.FutureTransportResponseHandler; +import org.elasticsearch.transport.TransportFuture; import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -98,11 +101,16 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps) { - final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( - recoveryId, shardId, operations, totalTranslogOps); - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, translogOperationsRequest, - translogOpsRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + public long indexTranslogOperations(List operations, int totalTranslogOps) { + final RecoveryTranslogOperationsRequest translogOperationsRequest = + new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps); + final TransportFuture future = transportService.submitRequest( + targetNode, + PeerRecoveryTargetService.Actions.TRANSLOG_OPS, + translogOperationsRequest, + translogOpsRequestOptions, + 
RecoveryTranslogOperationsResponse.HANDLER); + return future.txGet().localCheckpoint; } @Override diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 2243a5769b9..68373ce529a 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -296,9 +296,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase return getDiscoveryNode(primary.routingEntry().currentNodeId()); } - public Future asyncRecoverReplica(IndexShard replica, BiFunction targetSupplier) - throws IOException { - FutureTask task = new FutureTask<>(() -> { + public Future asyncRecoverReplica( + final IndexShard replica, final BiFunction targetSupplier) throws IOException { + final FutureTask task = new FutureTask<>(() -> { recoverReplica(replica, targetSupplier); return null; }); diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 349258785f0..7ee40c5c90e 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -43,16 +43,17 @@ import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; +import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase { @@ -205,60 +206,40 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } } - @TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE," + - "org.elasticsearch.discovery:TRACE," + - "org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," + - "org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE," + - "org.elasticsearch.index.seqno:TRACE" - ) + @TestLogging( + "_root:DEBUG," + + "org.elasticsearch.action.bulk:TRACE," + + "org.elasticsearch.action.get:TRACE," + + "org.elasticsearch.cluster.service:TRACE," + + "org.elasticsearch.discovery:TRACE," + + "org.elasticsearch.indices.cluster:TRACE," + + "org.elasticsearch.indices.recovery:TRACE," + + "org.elasticsearch.index.seqno:TRACE," + + "org.elasticsearch.index.shard:TRACE") public void testWaitForPendingSeqNo() throws Exception { IndexMetaData metaData = buildIndexMetaData(1); final int pendingDocs = randomIntBetween(1, 5); - final AtomicReference blockIndexingOnPrimary = new AtomicReference<>(); - final CountDownLatch blockedIndexers = new CountDownLatch(pendingDocs); + final BlockingEngineFactory primaryEngineFactory = new BlockingEngineFactory(); try (ReplicationGroup shards = new ReplicationGroup(metaData) { @Override 
protected EngineFactory getEngineFactory(ShardRouting routing) { if (routing.primary()) { - return new EngineFactory() { - @Override - public Engine newReadWriteEngine(EngineConfig config) { - return InternalEngineTests.createInternalEngine((directory, writerConfig) -> - new IndexWriter(directory, writerConfig) { - @Override - public long addDocument(Iterable doc) throws IOException { - Semaphore block = blockIndexingOnPrimary.get(); - if (block != null) { - blockedIndexers.countDown(); - try { - block.acquire(); - } catch (InterruptedException e) { - throw new AssertionError("unexpectedly interrupted", e); - } - } - return super.addDocument(doc); - } - - }, null, config); - } - }; + return primaryEngineFactory; } else { return null; } } }) { shards.startAll(); - int docs = shards.indexDocs(randomIntBetween(1,10)); + int docs = shards.indexDocs(randomIntBetween(1, 10)); IndexShard replica = shards.getReplicas().get(0); shards.removeReplica(replica); closeShards(replica); docs += pendingDocs; - final Semaphore pendingDocsSemaphore = new Semaphore(pendingDocs); - blockIndexingOnPrimary.set(pendingDocsSemaphore); - blockIndexingOnPrimary.get().acquire(pendingDocs); + primaryEngineFactory.latchIndexers(); CountDownLatch pendingDocsDone = new CountDownLatch(pendingDocs); for (int i = 0; i < pendingDocs; i++) { final String id = "pending_" + i; @@ -274,9 +255,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } // wait for the pending ops to "hang" - blockedIndexers.await(); + primaryEngineFactory.awaitIndexersLatch(); - blockIndexingOnPrimary.set(null); + primaryEngineFactory.allowIndexing(); // index some more docs += shards.indexDocs(randomInt(5)); @@ -298,11 +279,12 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC recoveryStart.await(); - for (int i = 0; i < pendingDocs; i++) { - assertFalse((pendingDocs - i) + " pending operations, recovery should wait", preparedForTranslog.get()); - pendingDocsSemaphore.release(); - } + // index some more + docs += shards.indexDocs(randomInt(5)); + assertFalse("recovery should wait on pending docs", preparedForTranslog.get()); + + primaryEngineFactory.releaseLatchedIndexers(); pendingDocsDone.await(); // now recovery can finish @@ -312,6 +294,114 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(docs)); shards.assertAllEqual(docs); + } finally { + primaryEngineFactory.close(); + } + } + + @TestLogging( + "_root:DEBUG," + + "org.elasticsearch.action.bulk:TRACE," + + "org.elasticsearch.action.get:TRACE," + + "org.elasticsearch.cluster.service:TRACE," + + "org.elasticsearch.discovery:TRACE," + + "org.elasticsearch.indices.cluster:TRACE," + + "org.elasticsearch.indices.recovery:TRACE," + + "org.elasticsearch.index.seqno:TRACE," + + "org.elasticsearch.index.shard:TRACE") + public void testCheckpointsAndMarkingInSync() throws Exception { + final IndexMetaData metaData = buildIndexMetaData(0); + final BlockingEngineFactory replicaEngineFactory = new BlockingEngineFactory(); + try ( + ReplicationGroup shards = new ReplicationGroup(metaData) { + @Override + protected EngineFactory getEngineFactory(final ShardRouting routing) { + if (routing.primary()) { + return null; + } else { + return replicaEngineFactory; + } + } + }; + AutoCloseable ignored = replicaEngineFactory // make sure we release indexers before closing + ) { + shards.startPrimary(); + final int docs = 
shards.indexDocs(randomIntBetween(1, 10)); + logger.info("indexed [{}] docs", docs); + final CountDownLatch pendingDocDone = new CountDownLatch(1); + final CountDownLatch pendingDocActiveWithExtraDocIndexed = new CountDownLatch(1); + final IndexShard replica = shards.addReplica(); + final Future recoveryFuture = shards.asyncRecoverReplica( + replica, + (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { + @Override + public long indexTranslogOperations(final List operations, final int totalTranslogOps) { + // index a doc which is not part of the snapshot, but also does not complete on replica + replicaEngineFactory.latchIndexers(); + threadPool.generic().submit(() -> { + try { + shards.index(new IndexRequest(index.getName(), "type", "pending").source("{}", XContentType.JSON)); + } catch (final Exception e) { + throw new RuntimeException(e); + } finally { + pendingDocDone.countDown(); + } + }); + try { + // the pending doc is latched in the engine + replicaEngineFactory.awaitIndexersLatch(); + // unblock indexing for the next doc + replicaEngineFactory.allowIndexing(); + shards.index(new IndexRequest(index.getName(), "type", "completed").source("{}", XContentType.JSON)); + /* + * We want to test that the global checkpoint is blocked from advancing on the primary when a replica shard + * is pending being marked in-sync. We also want to test the the global checkpoint does not advance on the + * replica when its local checkpoint is behind the global checkpoint on the primary. Finally, advancing the + * global checkpoint here forces recovery to block until the pending doc is indexing on the replica. + */ + shards.getPrimary().updateGlobalCheckpointOnPrimary(); + pendingDocActiveWithExtraDocIndexed.countDown(); + } catch (final Exception e) { + throw new AssertionError(e); + } + return super.indexTranslogOperations(operations, totalTranslogOps); + } + }); + pendingDocActiveWithExtraDocIndexed.await(); + assertThat(pendingDocDone.getCount(), equalTo(1L)); + { + final long expectedDocs = docs + 2L; + assertThat(shards.getPrimary().getLocalCheckpoint(), equalTo(expectedDocs - 1)); + // recovery has not completed, therefore the global checkpoint can have advance on the primary + assertThat(shards.getPrimary().getGlobalCheckpoint(), equalTo(expectedDocs - 1)); + // the pending document is not done, the checkpoints can not have advanced on the replica + assertThat(replica.getLocalCheckpoint(), lessThan(expectedDocs - 1)); + assertThat(replica.getGlobalCheckpoint(), lessThan(expectedDocs - 1)); + } + + shards.getPrimary().updateGlobalCheckpointOnPrimary(); + { + final long expectedDocs = docs + 3L; + shards.index(new IndexRequest(index.getName(), "type", "last").source("{}", XContentType.JSON)); + assertThat(shards.getPrimary().getLocalCheckpoint(), equalTo(expectedDocs - 1)); + assertThat(shards.getPrimary().getGlobalCheckpoint(), equalTo(expectedDocs - 2)); + assertThat(replica.getLocalCheckpoint(), lessThan(expectedDocs - 2)); + assertThat(replica.getGlobalCheckpoint(), lessThan(expectedDocs - 2)); + } + + replicaEngineFactory.releaseLatchedIndexers(); + pendingDocDone.await(); + recoveryFuture.get(); + shards.getPrimary().updateGlobalCheckpointOnPrimary(); + { + final long expectedDocs = docs + 3L; + assertBusy(() -> { + assertThat(shards.getPrimary().getLocalCheckpoint(), equalTo(expectedDocs - 1)); + assertThat(shards.getPrimary().getGlobalCheckpoint(), equalTo(expectedDocs - 1)); + assertThat(replica.getLocalCheckpoint(), equalTo(expectedDocs - 1)); + 
assertThat(replica.getGlobalCheckpoint(), equalTo(expectedDocs - 1)); + }); + } } } @@ -354,11 +444,11 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps) { + public long indexTranslogOperations(List operations, int totalTranslogOps) { if (hasBlocked() == false) { blockIfNeeded(RecoveryState.Stage.TRANSLOG); } - super.indexTranslogOperations(operations, totalTranslogOps); + return super.indexTranslogOperations(operations, totalTranslogOps); } @Override @@ -379,4 +469,66 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } + static class BlockingEngineFactory implements EngineFactory, AutoCloseable { + + private final List blocks = new ArrayList<>(); + + private final AtomicReference blockReference = new AtomicReference<>(); + private final AtomicReference blockedIndexers = new AtomicReference<>(); + + public synchronized void latchIndexers() { + final CountDownLatch block = new CountDownLatch(1); + blocks.add(block); + blockedIndexers.set(new CountDownLatch(1)); + assert blockReference.compareAndSet(null, block); + } + + public void awaitIndexersLatch() throws InterruptedException { + blockedIndexers.get().await(); + } + + public synchronized void allowIndexing() { + final CountDownLatch previous = blockReference.getAndSet(null); + assert previous == null || blocks.contains(previous); + } + + public synchronized void releaseLatchedIndexers() { + allowIndexing(); + blocks.forEach(CountDownLatch::countDown); + blocks.clear(); + } + + @Override + public Engine newReadWriteEngine(final EngineConfig config) { + return InternalEngineTests.createInternalEngine( + (directory, writerConfig) -> + new IndexWriter(directory, writerConfig) { + @Override + public long addDocument(final Iterable doc) throws IOException { + final CountDownLatch block = blockReference.get(); + if (block != null) { + final CountDownLatch latch = blockedIndexers.get(); + if (latch != null) { + latch.countDown(); + } + try { + block.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + return super.addDocument(doc); + } + }, + null, + config); + } + + @Override + public void close() throws Exception { + releaseLatchedIndexers(); + } + + } + } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java deleted file mode 100644 index 58f66dc62e7..00000000000 --- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.elasticsearch.index.seqno; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.IndexSettingsModule; -import org.junit.Before; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Stream; - -import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.not; - -public class GlobalCheckpointTests extends ESTestCase { - - GlobalCheckpointTracker tracker; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - tracker = - new GlobalCheckpointTracker( - new ShardId("test", "_na_", 0), - IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), - UNASSIGNED_SEQ_NO); - } - - public void testEmptyShards() { - assertFalse("checkpoint shouldn't be updated when the are no active shards", tracker.updateCheckpointOnPrimary()); - assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); - } - - private final AtomicInteger aIdGenerator = new AtomicInteger(); - - private Map randomAllocationsWithLocalCheckpoints(int min, int max) { - Map allocations = new HashMap<>(); - for (int i = randomIntBetween(min, max); i > 0; i--) { - allocations.put("id_" + aIdGenerator.incrementAndGet(), (long) randomInt(1000)); - } - return allocations; - } - - public void testGlobalCheckpointUpdate() { - Map allocations = new HashMap<>(); - Map activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5); - Set active = new HashSet<>(activeWithCheckpoints.keySet()); - allocations.putAll(activeWithCheckpoints); - Map initializingWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5); - Set initializing = new HashSet<>(initializingWithCheckpoints.keySet()); - allocations.putAll(initializingWithCheckpoints); - assertThat(allocations.size(), equalTo(active.size() + initializing.size())); - - // note: allocations can never be empty in practice as we always have at least one primary shard active/in sync - // it is however nice not to assume this on this level and check we do the right thing. 
- final long maxLocalCheckpoint = allocations.values().stream().min(Long::compare).orElse(UNASSIGNED_SEQ_NO); - - assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); - - logger.info("--> using allocations"); - allocations.keySet().forEach(aId -> { - final String type; - if (active.contains(aId)) { - type = "active"; - } else if (initializing.contains(aId)) { - type = "init"; - } else { - throw new IllegalStateException(aId + " not found in any map"); - } - logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type); - }); - - tracker.updateAllocationIdsFromMaster(active, initializing); - initializing.forEach(aId -> tracker.markAllocationIdAsInSync(aId)); - allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId))); - - - assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); - - assertThat(tracker.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != UNASSIGNED_SEQ_NO)); - assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint)); - - // increment checkpoints - active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); - initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); - allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId))); - - // now insert an unknown active/insync id , the checkpoint shouldn't change but a refresh should be requested. - final String extraId = "extra_" + randomAlphaOfLength(5); - - // first check that adding it without the master blessing doesn't change anything. - tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4)); - assertThat(tracker.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO)); - - Set newActive = new HashSet<>(active); - newActive.add(extraId); - tracker.updateAllocationIdsFromMaster(newActive, initializing); - - // we should ask for a refresh , but not update the checkpoint - assertTrue(tracker.updateCheckpointOnPrimary()); - assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint)); - - // now notify for the new id - tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4)); - - // now it should be incremented - assertTrue(tracker.updateCheckpointOnPrimary()); - assertThat(tracker.getCheckpoint(), greaterThan(maxLocalCheckpoint)); - } - - public void testMissingActiveIdsPreventAdvance() { - final Map active = randomAllocationsWithLocalCheckpoints(1, 5); - final Map initializing = randomAllocationsWithLocalCheckpoints(0, 5); - final Map assigned = new HashMap<>(); - assigned.putAll(active); - assigned.putAll(initializing); - tracker.updateAllocationIdsFromMaster( - new HashSet<>(randomSubsetOf(randomInt(active.size() - 1), active.keySet())), - initializing.keySet()); - randomSubsetOf(initializing.keySet()).forEach(tracker::markAllocationIdAsInSync); - assigned.forEach(tracker::updateLocalCheckpoint); - - // now mark all active shards - tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet()); - - // global checkpoint can't be advanced, but we need a sync - assertTrue(tracker.updateCheckpointOnPrimary()); - assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); - - // update again - assigned.forEach(tracker::updateLocalCheckpoint); - assertTrue(tracker.updateCheckpointOnPrimary()); - assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); - } - - public void testMissingInSyncIdsPreventAdvance() { - final Map active = 
randomAllocationsWithLocalCheckpoints(0, 5); - final Map initializing = randomAllocationsWithLocalCheckpoints(1, 5); - tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet()); - initializing.keySet().forEach(tracker::markAllocationIdAsInSync); - randomSubsetOf(randomInt(initializing.size() - 1), - initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId))); - - active.forEach(tracker::updateLocalCheckpoint); - - // global checkpoint can't be advanced, but we need a sync - assertTrue(tracker.updateCheckpointOnPrimary()); - assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); - - // update again - initializing.forEach(tracker::updateLocalCheckpoint); - assertTrue(tracker.updateCheckpointOnPrimary()); - assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); - } - - public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() { - final Map active = randomAllocationsWithLocalCheckpoints(1, 5); - final Map initializing = randomAllocationsWithLocalCheckpoints(1, 5); - final Map nonApproved = randomAllocationsWithLocalCheckpoints(1, 5); - tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet()); - initializing.keySet().forEach(tracker::markAllocationIdAsInSync); - nonApproved.keySet().forEach(tracker::markAllocationIdAsInSync); - - List> allocations = Arrays.asList(active, initializing, nonApproved); - Collections.shuffle(allocations, random()); - allocations.forEach(a -> a.forEach(tracker::updateLocalCheckpoint)); - - // global checkpoint can be advanced, but we need a sync - assertTrue(tracker.updateCheckpointOnPrimary()); - assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); - } - - public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { - final Map activeToStay = randomAllocationsWithLocalCheckpoints(1, 5); - final Map initializingToStay = randomAllocationsWithLocalCheckpoints(1, 5); - final Map activeToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5); - final Map initializingToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5); - final Set active = Sets.union(activeToStay.keySet(), activeToBeRemoved.keySet()); - final Set initializing = Sets.union(initializingToStay.keySet(), initializingToBeRemoved.keySet()); - final Map allocations = new HashMap<>(); - allocations.putAll(activeToStay); - if (randomBoolean()) { - allocations.putAll(activeToBeRemoved); - } - allocations.putAll(initializingToStay); - if (randomBoolean()) { - allocations.putAll(initializingToBeRemoved); - } - tracker.updateAllocationIdsFromMaster(active, initializing); - if (randomBoolean()) { - initializingToStay.keySet().forEach(tracker::markAllocationIdAsInSync); - } else { - initializing.forEach(tracker::markAllocationIdAsInSync); - } - if (randomBoolean()) { - allocations.forEach(tracker::updateLocalCheckpoint); - } - - // global checkpoint may be advanced, but we need a sync in any case - assertTrue(tracker.updateCheckpointOnPrimary()); - - // now remove shards - if (randomBoolean()) { - tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet()); - allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L)); - } else { - allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L)); - tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet()); - } - - final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream()) - 
.min(Long::compare).get() + 10; // we added 10 to make sure it's advanced in the second time - - // global checkpoint is advanced and we need a sync - assertTrue(tracker.updateCheckpointOnPrimary()); - assertThat(tracker.getCheckpoint(), equalTo(checkpoint)); - } -} diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java new file mode 100644 index 00000000000..3f5882e6c57 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java @@ -0,0 +1,490 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.comparesEqualTo; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.not; + +public class GlobalCheckpointTrackerTests extends ESTestCase { + + GlobalCheckpointTracker tracker; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + tracker = + new GlobalCheckpointTracker( + new ShardId("test", "_na_", 0), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + UNASSIGNED_SEQ_NO); + } + + public void testEmptyShards() { + assertFalse("checkpoint shouldn't be updated when there are no active shards", tracker.updateCheckpointOnPrimary()); + assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + } + + private final AtomicInteger aIdGenerator = new AtomicInteger(); + + private Map<String, Long> randomAllocationsWithLocalCheckpoints(int min, int max) { + Map<String, Long> allocations = new HashMap<>(); + for (int i = randomIntBetween(min, max); i > 0; i--) { + allocations.put("id_" + aIdGenerator.incrementAndGet(), (long) randomInt(1000)); + } + return
allocations; + } + + public void testGlobalCheckpointUpdate() { + Map<String, Long> allocations = new HashMap<>(); + Map<String, Long> activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5); + Set<String> active = new HashSet<>(activeWithCheckpoints.keySet()); + allocations.putAll(activeWithCheckpoints); + Map<String, Long> initializingWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5); + Set<String> initializing = new HashSet<>(initializingWithCheckpoints.keySet()); + allocations.putAll(initializingWithCheckpoints); + assertThat(allocations.size(), equalTo(active.size() + initializing.size())); + + // note: allocations can never be empty in practice as we always have at least one primary shard active/in sync + // it is however nice not to assume this on this level and check we do the right thing. + final long maxLocalCheckpoint = allocations.values().stream().min(Long::compare).orElse(UNASSIGNED_SEQ_NO); + + assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + + logger.info("--> using allocations"); + allocations.keySet().forEach(aId -> { + final String type; + if (active.contains(aId)) { + type = "active"; + } else if (initializing.contains(aId)) { + type = "init"; + } else { + throw new IllegalStateException(aId + " not found in any map"); + } + logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type); + }); + + tracker.updateAllocationIdsFromMaster(active, initializing); + initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId, tracker.getCheckpoint())); + allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId))); + + + assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + + assertThat(tracker.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != UNASSIGNED_SEQ_NO)); + assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint)); + + // increment checkpoints + active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); + initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); + allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId))); + + // now insert an unknown active/in-sync id, the checkpoint shouldn't change but a refresh should be requested. + final String extraId = "extra_" + randomAlphaOfLength(5); + + // first check that adding it without the master blessing doesn't change anything.
+ tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4)); + assertThat(tracker.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO)); + + Set<String> newActive = new HashSet<>(active); + newActive.add(extraId); + tracker.updateAllocationIdsFromMaster(newActive, initializing); + + // we should ask for a refresh, but not update the checkpoint + assertTrue(tracker.updateCheckpointOnPrimary()); + assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint)); + + // now notify for the new id + tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4)); + + // now it should be incremented + assertTrue(tracker.updateCheckpointOnPrimary()); + assertThat(tracker.getCheckpoint(), greaterThan(maxLocalCheckpoint)); + } + + public void testMissingActiveIdsPreventAdvance() { + final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5); + final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(0, 5); + final Map<String, Long> assigned = new HashMap<>(); + assigned.putAll(active); + assigned.putAll(initializing); + tracker.updateAllocationIdsFromMaster( + new HashSet<>(randomSubsetOf(randomInt(active.size() - 1), active.keySet())), + initializing.keySet()); + randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint())); + assigned.forEach(tracker::updateLocalCheckpoint); + + // now mark all active shards + tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet()); + + // global checkpoint can't be advanced, but we need a sync + assertTrue(tracker.updateCheckpointOnPrimary()); + assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + + // update again + assigned.forEach(tracker::updateLocalCheckpoint); + assertTrue(tracker.updateCheckpointOnPrimary()); + assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); + } + + public void testMissingInSyncIdsPreventAdvance() { + final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(0, 5); + final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5); + tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet()); + initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint())); + randomSubsetOf(randomInt(initializing.size() - 1), + initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId))); + + active.forEach(tracker::updateLocalCheckpoint); + + // global checkpoint can't be advanced, but we need a sync + assertTrue(tracker.updateCheckpointOnPrimary()); + assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + + // update again + initializing.forEach(tracker::updateLocalCheckpoint); + assertTrue(tracker.updateCheckpointOnPrimary()); + assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); + } + + public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() { + final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5); + final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5); + final Map<String, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5); + tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet()); + initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint())); + nonApproved.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint())); + + List<Map<String, Long>> allocations = Arrays.asList(active, initializing, nonApproved); + Collections.shuffle(allocations, random()); +
allocations.forEach(a -> a.forEach(tracker::updateLocalCheckpoint)); + + // global checkpoint can be advanced, but we need a sync + assertTrue(tracker.updateCheckpointOnPrimary()); + assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); + } + + public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { + final Map<String, Long> activeToStay = randomAllocationsWithLocalCheckpoints(1, 5); + final Map<String, Long> initializingToStay = randomAllocationsWithLocalCheckpoints(1, 5); + final Map<String, Long> activeToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5); + final Map<String, Long> initializingToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5); + final Set<String> active = Sets.union(activeToStay.keySet(), activeToBeRemoved.keySet()); + final Set<String> initializing = Sets.union(initializingToStay.keySet(), initializingToBeRemoved.keySet()); + final Map<String, Long> allocations = new HashMap<>(); + allocations.putAll(activeToStay); + if (randomBoolean()) { + allocations.putAll(activeToBeRemoved); + } + allocations.putAll(initializingToStay); + if (randomBoolean()) { + allocations.putAll(initializingToBeRemoved); + } + tracker.updateAllocationIdsFromMaster(active, initializing); + if (randomBoolean()) { + initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint())); + } else { + initializing.forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint())); + } + if (randomBoolean()) { + allocations.forEach(tracker::updateLocalCheckpoint); + } + + // global checkpoint may be advanced, but we need a sync in any case + assertTrue(tracker.updateCheckpointOnPrimary()); + + // now remove shards + if (randomBoolean()) { + tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet()); + allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L)); + } else { + allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L)); + tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet()); + } + + final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream()) + .min(Long::compare).get() + 10; // we added 10 to make sure it's advanced in the second time + + // global checkpoint is advanced and we need a sync + assertTrue(tracker.updateCheckpointOnPrimary()); + assertThat(tracker.getCheckpoint(), equalTo(checkpoint)); + } + + public void testWaitForAllocationIdToBeInSync() throws BrokenBarrierException, InterruptedException { + final int localCheckpoint = randomIntBetween(1, 32); + final int globalCheckpoint = randomIntBetween(localCheckpoint + 1, 64); + final CyclicBarrier barrier = new CyclicBarrier(2); + final AtomicBoolean complete = new AtomicBoolean(); + final String inSyncAllocationId = randomAlphaOfLength(16); + final String trackingAllocationId = randomAlphaOfLength(16); + tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); + tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint); + tracker.updateCheckpointOnPrimary(); + final Thread thread = new Thread(() -> { + try { + // synchronize starting with the test thread + barrier.await(); + tracker.markAllocationIdAsInSync(trackingAllocationId, localCheckpoint); + complete.set(true); + // synchronize with the test thread checking if we are no longer waiting + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + + thread.start(); + 
// synchronize starting with the waiting thread + barrier.await(); + + final List<Integer> elements = IntStream.rangeClosed(0, globalCheckpoint - 1).boxed().collect(Collectors.toList()); + Randomness.shuffle(elements); + for (int i = 0; i < elements.size(); i++) { + tracker.updateLocalCheckpoint(trackingAllocationId, elements.get(i)); + assertFalse(complete.get()); + assertTrue(awaitBusy(() -> tracker.trackingLocalCheckpoints.containsKey(trackingAllocationId))); + assertTrue(awaitBusy(() -> tracker.pendingInSync.contains(trackingAllocationId))); + assertFalse(tracker.inSyncLocalCheckpoints.containsKey(trackingAllocationId)); + } + + tracker.updateLocalCheckpoint(trackingAllocationId, randomIntBetween(globalCheckpoint, 64)); + // synchronize with the waiting thread to mark that it is complete + barrier.await(); + assertTrue(complete.get()); + assertTrue(tracker.trackingLocalCheckpoints.isEmpty()); + assertTrue(tracker.pendingInSync.isEmpty()); + assertTrue(tracker.inSyncLocalCheckpoints.containsKey(trackingAllocationId)); + + thread.join(); + } + + public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException { + final int localCheckpoint = randomIntBetween(1, 32); + final int globalCheckpoint = randomIntBetween(localCheckpoint + 1, 64); + final CyclicBarrier barrier = new CyclicBarrier(2); + final AtomicBoolean interrupted = new AtomicBoolean(); + final String inSyncAllocationId = randomAlphaOfLength(16); + final String trackingAllocationId = randomAlphaOfLength(32); + tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); + tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint); + tracker.updateCheckpointOnPrimary(); + final Thread thread = new Thread(() -> { + try { + // synchronize starting with the test thread + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + try { + tracker.markAllocationIdAsInSync(trackingAllocationId, localCheckpoint); + } catch (final InterruptedException e) { + interrupted.set(true); + // synchronize with the test thread checking if we are interrupted + } + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + + thread.start(); + + // synchronize starting with the waiting thread + barrier.await(); + + thread.interrupt(); + + // synchronize with the waiting thread to mark that it is complete + barrier.await(); + + assertTrue(interrupted.get()); + + thread.join(); + } + + public void testUpdateAllocationIdsFromMaster() throws Exception { + final int numberOfActiveAllocationIds = randomIntBetween(2, 16); + final Set<String> activeAllocationIds = + IntStream.range(0, numberOfActiveAllocationIds).mapToObj(i -> randomAlphaOfLength(16)).collect(Collectors.toSet()); + final int numberOfInitializingIds = randomIntBetween(2, 16); + final Set<String> initializingIds = + IntStream.range(0, numberOfInitializingIds).mapToObj(i -> { + do { + final String initializingId = randomAlphaOfLength(16); + // ensure we do not duplicate an allocation ID in active and initializing sets + if (!activeAllocationIds.contains(initializingId)) { + return initializingId; + } + } while (true); + }).collect(Collectors.toSet()); + tracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingIds); + + // first we assert that the in-sync and tracking sets are set up correctly + 
assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); + assertTrue( + activeAllocationIds + .stream() + .allMatch(a -> tracker.inSyncLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertTrue(initializingIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a))); + assertTrue( + initializingIds + .stream() + .allMatch(a -> tracker.trackingLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO)); + + // now we will remove some allocation IDs from these and ensure that they propagate through + final List<String> removingActiveAllocationIds = randomSubsetOf(activeAllocationIds); + final Set<String> newActiveAllocationIds = + activeAllocationIds.stream().filter(a -> !removingActiveAllocationIds.contains(a)).collect(Collectors.toSet()); + final List<String> removingInitializingAllocationIds = randomSubsetOf(initializingIds); + final Set<String> newInitializingAllocationIds = + initializingIds.stream().filter(a -> !removingInitializingAllocationIds.contains(a)).collect(Collectors.toSet()); + tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); + assertTrue(removingActiveAllocationIds.stream().noneMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); + assertTrue(newInitializingAllocationIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a))); + assertTrue(removingInitializingAllocationIds.stream().noneMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a))); + + /* + * Now we will add an allocation ID to each of active and initializing and ensure they propagate through. Using different lengths + * than we have been using above ensures that we cannot collide with a previous allocation ID + */ + newActiveAllocationIds.add(randomAlphaOfLength(32)); + newInitializingAllocationIds.add(randomAlphaOfLength(64)); + tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); + assertTrue( + newActiveAllocationIds + .stream() + .allMatch(a -> tracker.inSyncLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertTrue(newInitializingAllocationIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a))); + assertTrue( + newInitializingAllocationIds + .stream() + .allMatch(a -> tracker.trackingLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO)); + + // the tracking allocation IDs should play no role in determining the global checkpoint + final Map<String, Integer> activeLocalCheckpoints = + newActiveAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024))); + activeLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a, l)); + final Map<String, Integer> initializingLocalCheckpoints = + newInitializingAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024))); + initializingLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a, l)); + assertTrue( + activeLocalCheckpoints + .entrySet() + .stream() + .allMatch(e -> tracker.getLocalCheckpointForAllocationId(e.getKey()) == e.getValue())); + assertTrue( + initializingLocalCheckpoints + .entrySet() + .stream() + .allMatch(e -> tracker.trackingLocalCheckpoints.get(e.getKey()) == e.getValue())); + assertTrue(tracker.updateCheckpointOnPrimary()); + 
final long minimumActiveLocalCheckpoint = (long) activeLocalCheckpoints.values().stream().min(Integer::compareTo).get(); + assertThat(tracker.getCheckpoint(), equalTo(minimumActiveLocalCheckpoint)); + final long minimumInitializingLocalCheckpoint = (long) initializingLocalCheckpoints.values().stream().min(Integer::compareTo).get(); + + // now we are going to add a new allocation ID and bring it in sync, which should move it to the in-sync allocation IDs + final long localCheckpoint = + randomIntBetween(0, Math.toIntExact(Math.min(minimumActiveLocalCheckpoint, minimumInitializingLocalCheckpoint) - 1)); + + // using a different length than we have been using above ensures that we cannot collide with a previous allocation ID + final String newSyncingAllocationId = randomAlphaOfLength(128); + newInitializingAllocationIds.add(newSyncingAllocationId); + tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + final CyclicBarrier barrier = new CyclicBarrier(2); + final Thread thread = new Thread(() -> { + try { + barrier.await(); + tracker.markAllocationIdAsInSync(newSyncingAllocationId, localCheckpoint); + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + + thread.start(); + + barrier.await(); + + assertBusy(() -> { + assertTrue(tracker.pendingInSync.contains(newSyncingAllocationId)); + assertTrue(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId)); + }); + + tracker.updateLocalCheckpoint(newSyncingAllocationId, randomIntBetween(Math.toIntExact(minimumActiveLocalCheckpoint), 1024)); + + barrier.await(); + + assertFalse(tracker.pendingInSync.contains(newSyncingAllocationId)); + assertFalse(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId)); + assertTrue(tracker.inSyncLocalCheckpoints.containsKey(newSyncingAllocationId)); + + /* + * The new in-sync allocation ID is in the in-sync set now, yet the master does not know this; the allocation ID should still be in + * the in-sync set even if we receive a cluster state update that does not reflect this.
+ * + */ + tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + assertFalse(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId)); + assertTrue(tracker.inSyncLocalCheckpoints.containsKey(newSyncingAllocationId)); + } + + + private void markAllocationIdAsInSyncQuietly( + final GlobalCheckpointTracker tracker, final String allocationId, final long localCheckpoint) { + try { + tracker.markAllocationIdAsInSync(allocationId, localCheckpoint); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 3f01a0c0a9a..a0d356c03e7 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1275,9 +1275,10 @@ public class IndexShardTests extends IndexShardTestCase { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) { - super.indexTranslogOperations(operations, totalTranslogOps); + public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) { + final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); assertFalse(replica.getTranslog().syncNeeded()); + return localCheckpoint; } }, true); @@ -1331,10 +1332,11 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) { - super.indexTranslogOperations(operations, totalTranslogOps); + public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) { + final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); // Shard should now be active since we did recover: assertTrue(replica.isActive()); + return localCheckpoint; } }, false); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 261e53064fe..1c588caadcd 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -29,6 +29,16 @@ import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.index.translog.TranslogWriter; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; @@ -36,7 +46,13 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { public void testGetStartingSeqNo() throws Exception { IndexShard replica = newShard(false); - RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + final AtomicReference<Path> translogLocation = new AtomicReference<>(); + RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null) { + 
@Override + Path translogLocation() { + return translogLocation.get(); + } + }; try { recoveryEmptyReplica(replica); int docs = randomIntBetween(1, 10); @@ -56,22 +72,28 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { final long maxSeqNo = replica.seqNoStats().getMaxSeqNo(); final long localCheckpoint = replica.getLocalCheckpoint(); + translogLocation.set(replica.getTranslog().location()); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); - replica.updateGlobalCheckpointOnReplica(maxSeqNo - 1); - replica.getTranslog().sync(); + final Translog translog = replica.getTranslog(); + translogLocation.set( + writeTranslog(replica.shardId(), translog.getTranslogUUID(), translog.currentFileGeneration(), maxSeqNo - 1)); - // commit is enough, global checkpoint is below max *committed* which is NO_OPS_PERFORMED + // commit is good, global checkpoint is at least max *committed* which is NO_OPS_PERFORMED assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L)); replica.flush(new FlushRequest()); - // commit is still not good enough, global checkpoint is below max + translogLocation.set(replica.getTranslog().location()); + + // commit is not good, global checkpoint is below max assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); - replica.updateGlobalCheckpointOnReplica(maxSeqNo); - replica.getTranslog().sync(); - // commit is enough, global checkpoint is below max + translogLocation.set( + writeTranslog(replica.shardId(), translog.getTranslogUUID(), translog.currentFileGeneration(), maxSeqNo)); + + // commit is good, global checkpoint is above max assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(localCheckpoint + 1)); } finally { closeShards(replica); @@ -79,4 +101,23 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { } } + private Path writeTranslog( + final ShardId shardId, + final String translogUUID, + final long generation, + final long globalCheckpoint + ) throws IOException { + final Path tempDir = createTempDir(); + final Path resolve = tempDir.resolve(Translog.getFilename(generation)); + Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME)); + try (TranslogWriter ignored = TranslogWriter.create( + shardId, + translogUUID, + generation, + resolve, + FileChannel::open, + TranslogConfig.DEFAULT_BUFFER_SIZE, () -> globalCheckpoint)) {} + return tempDir; + } + } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 468a5a5500e..17497af8838 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -180,7 +180,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true))); } operations.add(null); - int totalOperations = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() { + RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() { private int counter = 0; @Override @@ -194,9 +194,9 @@ public class RecoverySourceHandlerTests extends ESTestCase { } }); if (startingSeqNo == 
SequenceNumbersService.UNASSIGNED_SEQ_NO) { - assertThat(totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers)); + assertThat(result.totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers)); } else { - assertThat(totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo))); + assertThat(result.totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo))); } } @@ -403,8 +403,9 @@ public class RecoverySourceHandlerTests extends ESTestCase { } @Override - void phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException { + long phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException { phase2Called.set(true); + return SequenceNumbersService.UNASSIGNED_SEQ_NO; } }; @@ -494,8 +495,9 @@ public class RecoverySourceHandlerTests extends ESTestCase { } @Override - void phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException { + long phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException { phase2Called.set(true); + return SequenceNumbersService.UNASSIGNED_SEQ_NO; } };
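The new GlobalCheckpointTrackerTests above pin down a specific contract for markAllocationIdAsInSync: the call blocks the recovering thread until the tracked copy's local checkpoint has caught up with the current global checkpoint (the allocation ID sits in pendingInSync and trackingLocalCheckpoints while it waits, and the wait can be interrupted), and only then is the ID promoted into inSyncLocalCheckpoints. The sketch below is purely illustrative of that contract as exercised by the tests; it is not the production GlobalCheckpointTracker implementation, and it assumes that updateLocalCheckpoint notifies the tracker's monitor whenever a local checkpoint advances.

    // Illustrative sketch only: the waiting/promotion behaviour asserted by the tests above,
    // not the actual GlobalCheckpointTracker implementation.
    synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
        if (trackingLocalCheckpoints.containsKey(allocationId) == false) {
            // the master has not instructed this primary to track the given allocation ID
            return;
        }
        // record the local checkpoint reported by the recovering copy
        if (trackingLocalCheckpoints.get(allocationId) < localCheckpoint) {
            trackingLocalCheckpoints.put(allocationId, localCheckpoint);
        }
        pendingInSync.add(allocationId);
        try {
            // block until the copy has processed every operation up to the global checkpoint;
            // assumes updateLocalCheckpoint invokes notifyAll() on this monitor after each advance
            while (trackingLocalCheckpoints.get(allocationId) < getCheckpoint()) {
                wait();
            }
        } finally {
            pendingInSync.remove(allocationId);
        }
        // promote the copy: from now on it participates in the global checkpoint calculation
        inSyncLocalCheckpoints.put(allocationId, trackingLocalCheckpoints.remove(allocationId));
    }

Waiting until the copy reaches the global checkpoint before promotion is what keeps the global checkpoint from moving backwards when a recovering copy joins the in-sync set, which is the property testWaitForAllocationIdToBeInSync exercises.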