Block global checkpoint advances when recovering

After a replica shard finishes recovery, it will be marked as active and
its local checkpoint will be considered in the calculation of the global
checkpoint on the primary. If there were operations in flight during
recovery, when the replica is activated its local checkpoint could be
lagging behind the global checkpoint on the primary. This means that
when the replica shard is activated, we can end up in a situtaion where
a global checkpoint update would want to move the global checkpoint
backwards, violating an invariant of the system. This only arises if a
background global checkpoint sync executes, which today is only a
scheduled operation and might be delayed until the in-flight operations
complete and the replica catches up to the primary. Yet, we are going to
move to inlining global checkpoints which will cause this issue to be
more likely to manifest. Additionally, the global checkpoint on the
replica, which is the local knowledge on the replica updated under the
mandate of the primary, could be higher than the local checkpoint on the
replica, again violating an invariant of the system. This commit
addresses these issues by blocking global checkpoint on the primary when
a replica shard is finalizing recovery. While we have blocked global
checkpoint advancement, recovery on the replica shard will not be
considered complete until its local checkpoint advances to the blocked
global checkpoint.

Relates #24404
This commit is contained in:
Jason Tedor 2017-05-03 06:48:09 -04:00 committed by GitHub
parent c99cc8a896
commit 070963658b
17 changed files with 1106 additions and 435 deletions

View File

@ -22,7 +22,7 @@ package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
@ -31,8 +31,6 @@ import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
/**
* This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
* equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the master starts
@ -49,14 +47,20 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. This map is keyed by
* allocation IDs. All accesses to this set are guarded by a lock on this.
*/
private final ObjectLongMap<String> inSyncLocalCheckpoints;
final ObjectLongMap<String> inSyncLocalCheckpoints;
/*
* This set holds the last set of known valid allocation ids as received by the master. This is important to make sure shard that are
* failed or relocated are cleaned up from {@link #inSyncLocalCheckpoints} and do not hold the global checkpoint back. All accesses to
* this set are guarded by a lock on this.
* This map holds the last known local checkpoint for initializing shards that are undergoing recovery. Such shards do not participate
* in determining the global checkpoint. We must track these local checkpoints so that when a shard is activated we use the highest
* known checkpoint.
*/
private final Set<String> assignedAllocationIds;
final ObjectLongMap<String> trackingLocalCheckpoints;
/*
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
* current global checkpoint.
*/
final Set<String> pendingInSync;
/*
* The current global checkpoint for this shard. Note that this field is guarded by a lock on this and thus this field does not need to
@ -74,10 +78,11 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
*/
GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
super(shardId, indexSettings);
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
assignedAllocationIds = new HashSet<>(1 + indexSettings.getNumberOfReplicas());
assert globalCheckpoint >= SequenceNumbersService.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
this.trackingLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
this.globalCheckpoint = globalCheckpoint;
this.pendingInSync = new HashSet<>();
}
/**
@ -86,60 +91,86 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* shards that are removed to be re-added.
*
* @param allocationId the allocation ID of the shard to update the local checkpoint for
* @param checkpoint the local checkpoint for the shard
* @param localCheckpoint the local checkpoint for the shard
*/
public synchronized void updateLocalCheckpoint(final String allocationId, final long checkpoint) {
final int indexOfKey = inSyncLocalCheckpoints.indexOf(allocationId);
if (indexOfKey >= 0) {
final long current = inSyncLocalCheckpoints.indexGet(indexOfKey);
if (current < checkpoint) {
inSyncLocalCheckpoints.indexReplace(indexOfKey, checkpoint);
if (logger.isTraceEnabled()) {
logger.trace("updated local checkpoint of [{}] to [{}] (was [{}])", allocationId, checkpoint, current);
public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) {
final boolean updated;
if (updateLocalCheckpoint(allocationId, localCheckpoint, inSyncLocalCheckpoints, "in-sync")) {
updated = true;
} else if (updateLocalCheckpoint(allocationId, localCheckpoint, trackingLocalCheckpoints, "tracking")) {
updated = true;
} else {
logger.trace("ignored local checkpoint [{}] of [{}], allocation ID is not tracked", localCheckpoint, allocationId);
updated = false;
}
if (updated) {
notifyAllWaiters();
}
}
@SuppressForbidden(reason = "Object#notifyAll waiters for local checkpoint advancement")
private synchronized void notifyAllWaiters() {
this.notifyAll();
}
private boolean updateLocalCheckpoint(
final String allocationId, final long localCheckpoint, ObjectLongMap<String> map, final String reason) {
final int index = map.indexOf(allocationId);
if (index >= 0) {
final long current = map.indexGet(index);
if (current < localCheckpoint) {
map.indexReplace(index, localCheckpoint);
logger.trace("updated local checkpoint of [{}] in [{}] from [{}] to [{}]", allocationId, reason, current, localCheckpoint);
} else {
logger.trace(
"skipping update of local checkpoint [{}], current checkpoint is higher (current [{}], incoming [{}], type [{}])",
"skipped updating local checkpoint of [{}] in [{}] from [{}] to [{}], current checkpoint is higher",
allocationId,
reason,
current,
checkpoint,
allocationId);
localCheckpoint);
}
return true;
} else {
logger.trace("[{}] isn't marked as in sync. ignoring local checkpoint of [{}].", allocationId, checkpoint);
return false;
}
}
/**
* Scans through the currently known local checkpoint and updates the global checkpoint accordingly.
*
* @return {@code true} if the checkpoint has been updated or if it can not be updated since one of the local checkpoints of one of the
* active allocations is not known.
* @return {@code true} if the checkpoint has been updated or if it can not be updated since the local checkpoints of one of the active
* allocations is not known.
*/
synchronized boolean updateCheckpointOnPrimary() {
long minCheckpoint = Long.MAX_VALUE;
if (inSyncLocalCheckpoints.isEmpty()) {
long minLocalCheckpoint = Long.MAX_VALUE;
if (inSyncLocalCheckpoints.isEmpty() || !pendingInSync.isEmpty()) {
return false;
}
for (final ObjectLongCursor<String> cp : inSyncLocalCheckpoints) {
if (cp.value == UNASSIGNED_SEQ_NO) {
logger.trace("unknown local checkpoint for active allocationId [{}], requesting a sync", cp.key);
for (final ObjectLongCursor<String> localCheckpoint : inSyncLocalCheckpoints) {
if (localCheckpoint.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
logger.trace("unknown local checkpoint for active allocation ID [{}], requesting a sync", localCheckpoint.key);
return true;
}
minCheckpoint = Math.min(cp.value, minCheckpoint);
minLocalCheckpoint = Math.min(localCheckpoint.value, minLocalCheckpoint);
}
if (minCheckpoint < globalCheckpoint) {
assert minLocalCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO : "new global checkpoint must be assigned";
if (minLocalCheckpoint < globalCheckpoint) {
final String message =
String.format(Locale.ROOT, "new global checkpoint [%d] is lower than previous one [%d]", minCheckpoint, globalCheckpoint);
String.format(
Locale.ROOT,
"new global checkpoint [%d] is lower than previous one [%d]",
minLocalCheckpoint,
globalCheckpoint);
throw new IllegalStateException(message);
}
if (globalCheckpoint != minCheckpoint) {
logger.trace("global checkpoint updated to [{}]", minCheckpoint);
globalCheckpoint = minCheckpoint;
if (globalCheckpoint != minLocalCheckpoint) {
logger.trace("global checkpoint updated to [{}]", minLocalCheckpoint);
globalCheckpoint = minLocalCheckpoint;
return true;
}
} else {
return false;
}
}
/**
* Returns the global checkpoint for the shard.
@ -153,17 +184,17 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
/**
* Updates the global checkpoint on a replica shard after it has been updated by the primary.
*
* @param checkpoint the global checkpoint
* @param globalCheckpoint the global checkpoint
*/
synchronized void updateCheckpointOnReplica(final long checkpoint) {
synchronized void updateGlobalCheckpointOnReplica(final long globalCheckpoint) {
/*
* The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary
* information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other
* replica shards). In these cases, the local knowledge of the global checkpoint could be higher than sync from the lagging primary.
*/
if (this.globalCheckpoint <= checkpoint) {
this.globalCheckpoint = checkpoint;
logger.trace("global checkpoint updated from primary to [{}]", checkpoint);
if (this.globalCheckpoint <= globalCheckpoint) {
this.globalCheckpoint = globalCheckpoint;
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);
}
}
@ -173,33 +204,98 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public synchronized void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds,
final Set<String> initializingAllocationIds) {
assignedAllocationIds.removeIf(
aId -> activeAllocationIds.contains(aId) == false && initializingAllocationIds.contains(aId) == false);
assignedAllocationIds.addAll(activeAllocationIds);
assignedAllocationIds.addAll(initializingAllocationIds);
for (String activeId : activeAllocationIds) {
if (inSyncLocalCheckpoints.containsKey(activeId) == false) {
inSyncLocalCheckpoints.put(activeId, UNASSIGNED_SEQ_NO);
public synchronized void updateAllocationIdsFromMaster(
final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
// remove shards whose allocation ID no longer exists
inSyncLocalCheckpoints.removeAll(a -> !activeAllocationIds.contains(a) && !initializingAllocationIds.contains(a));
// add any new active allocation IDs
for (final String a : activeAllocationIds) {
if (!inSyncLocalCheckpoints.containsKey(a)) {
final long localCheckpoint = trackingLocalCheckpoints.getOrDefault(a, SequenceNumbersService.UNASSIGNED_SEQ_NO);
inSyncLocalCheckpoints.put(a, localCheckpoint);
logger.trace("marked [{}] as in-sync with local checkpoint [{}] via cluster state update from master", a, localCheckpoint);
}
}
inSyncLocalCheckpoints.removeAll(key -> assignedAllocationIds.contains(key) == false);
trackingLocalCheckpoints.removeAll(a -> !initializingAllocationIds.contains(a));
for (final String a : initializingAllocationIds) {
if (inSyncLocalCheckpoints.containsKey(a)) {
/*
* This can happen if we mark the allocation ID as in sync at the end of recovery before seeing a cluster state update from
* marking the shard as active.
*/
continue;
}
if (trackingLocalCheckpoints.containsKey(a)) {
// we are already tracking this allocation ID
continue;
}
// this is a new allocation ID
trackingLocalCheckpoints.put(a, SequenceNumbersService.UNASSIGNED_SEQ_NO);
logger.trace("tracking [{}] via cluster state update from master", a);
}
}
/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. This should be called at the end of recovery where
* the primary knows all operations below the global checkpoint have been completed on this shard.
* Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint
* on the specified shard advances above the current global checkpoint.
*
* @param allocationId the allocation ID of the shard to mark as in-sync
* @param localCheckpoint the current local checkpoint on the shard
*
* @throws InterruptedException if the thread is interrupted waiting for the local checkpoint on the shard to advance
*/
public synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
if (!trackingLocalCheckpoints.containsKey(allocationId)) {
/*
* This can happen if the recovery target has been failed and the cluster state update from the master has triggered removing
* this allocation ID from the tracking map but this recovery thread has not yet been made aware that the recovery is
* cancelled.
*/
public synchronized void markAllocationIdAsInSync(final String allocationId) {
if (assignedAllocationIds.contains(allocationId) == false) {
// master has removed this allocation, ignore
return;
}
logger.trace("marked [{}] as in sync", allocationId);
inSyncLocalCheckpoints.put(allocationId, UNASSIGNED_SEQ_NO);
updateLocalCheckpoint(allocationId, localCheckpoint, trackingLocalCheckpoints, "tracking");
waitForAllocationIdToBeInSync(allocationId);
}
private synchronized void waitForAllocationIdToBeInSync(final String allocationId) throws InterruptedException {
if (!pendingInSync.add(allocationId)) {
throw new IllegalStateException("there is already a pending sync in progress for allocation ID [" + allocationId + "]");
}
try {
while (true) {
/*
* If the allocation has been cancelled and so removed from the tracking map from a cluster state update from the master it
* means that this recovery will be cancelled; we are here on a cancellable recovery thread and so this thread will throw
* an interrupted exception as soon as it tries to wait on the monitor.
*/
final long current = trackingLocalCheckpoints.getOrDefault(allocationId, Long.MIN_VALUE);
if (current >= globalCheckpoint) {
logger.trace("marked [{}] as in-sync with local checkpoint [{}]", allocationId, current);
trackingLocalCheckpoints.remove(allocationId);
/*
* This is prematurely adding the allocation ID to the in-sync map as at this point recovery is not yet finished and
* could still abort. At this point we will end up with a shard in the in-sync map holding back the global checkpoint
* because the shard never recovered and we would have to wait until either the recovery retries and completes
* successfully, or the master fails the shard and issues a cluster state update that removes the shard from the set of
* active allocation IDs.
*/
inSyncLocalCheckpoints.put(allocationId, current);
break;
} else {
waitForLocalCheckpointToAdvance();
}
}
} finally {
pendingInSync.remove(allocationId);
}
}
@SuppressForbidden(reason = "Object#wait for local checkpoint advancement")
private synchronized void waitForLocalCheckpointToAdvance() throws InterruptedException {
this.wait();
}
/**
@ -213,7 +309,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
if (inSyncLocalCheckpoints.containsKey(allocationId)) {
return inSyncLocalCheckpoints.get(allocationId);
}
return UNASSIGNED_SEQ_NO;
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

View File

@ -127,12 +127,13 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. See
* {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String)} for additional details.
* {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} for additional details.
*
* @param allocationId the allocation ID of the shard to mark as in-sync
* @param localCheckpoint the current local checkpoint on the shard
*/
public void markAllocationIdAsInSync(final String allocationId) {
globalCheckpointTracker.markAllocationIdAsInSync(allocationId);
public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
globalCheckpointTracker.markAllocationIdAsInSync(allocationId, localCheckpoint);
}
/**
@ -166,10 +167,10 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
/**
* Updates the global checkpoint on a replica shard after it has been updated by the primary.
*
* @param checkpoint the global checkpoint
* @param globalCheckpoint the global checkpoint
*/
public void updateGlobalCheckpointOnReplica(final long checkpoint) {
globalCheckpointTracker.updateCheckpointOnReplica(checkpoint);
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) {
globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint);
}
/**

View File

@ -1475,13 +1475,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. See
* {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String)} for additional details.
* {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} for additional details.
*
* @param allocationId the allocation ID of the shard to mark as in-sync
* @param localCheckpoint the current local checkpoint on the shard
*/
public void markAllocationIdAsInSync(final String allocationId) {
public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
verifyPrimary();
getEngine().seqNoService().markAllocationIdAsInSync(allocationId);
getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint);
}
/**
@ -1516,11 +1517,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Updates the global checkpoint on a replica shard after it has been updated by the primary.
*
* @param checkpoint the global checkpoint
* @param globalCheckpoint the global checkpoint
*/
public void updateGlobalCheckpointOnReplica(final long checkpoint) {
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) {
verifyReplicationTarget();
getEngine().seqNoService().updateGlobalCheckpointOnReplica(checkpoint);
final SequenceNumbersService seqNoService = getEngine().seqNoService();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
if (globalCheckpoint <= localCheckpoint) {
seqNoService.updateGlobalCheckpointOnReplica(globalCheckpoint);
} else {
/*
* This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global
* checkpoint updates from in-flight operations. However, since this shard is not yet contributing to calculating the global
* checkpoint, it can be the case that the global checkpoint update from the primary is ahead of the local checkpoint on this
* shard. In this case, we ignore the global checkpoint update. This should only happen if we are in the translog stage of
* recovery. Prior to this, the engine is not opened and this shard will not receive global checkpoint updates, and after this
* the shard will be contributing to calculations of the the global checkpoint.
*/
assert recoveryState().getStage() == RecoveryState.Stage.TRANSLOG
: "expected recovery stage [" + RecoveryState.Stage.TRANSLOG + "] but was [" + recoveryState().getStage() + "]";
}
}
/**

View File

@ -28,7 +28,7 @@ import java.nio.file.StandardOpenOption;
* only for testing until we have a disk-full FileSystem
*/
@FunctionalInterface
interface ChannelFactory {
public interface ChannelFactory {
default FileChannel open(Path path) throws IOException {
return open(path, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
}

View File

@ -348,20 +348,24 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
*/
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.indexShard().shardPath().resolveTranslog());
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint);
if (seqNoStats.getMaxSeqNo() <= seqNoStats.getGlobalCheckpoint()) {
// commit point is good for seq no based recovery as the maximum seq# including in it
// is below the global checkpoint (i.e., it excludes any ops thay may not be on the primary)
// Recovery will start at the first op after the local check point stored in the commit.
/*
* Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global
* checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation
* after the local checkpoint stored in the commit.
*/
return seqNoStats.getLocalCheckpoint() + 1;
} else {
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
} catch (final IOException e) {
// this can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
// translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and
// proceeds to attempt a sequence-number-based recovery
/*
* This can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
* translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and
* proceeds to attempt a sequence-number-based recovery.
*/
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}
@ -418,7 +422,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
final RecoveryTarget recoveryTarget = recoveryRef.target();
try {
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
} catch (TranslogRecoveryPerformer.BatchOperationException exception) {
MapperException mapperException = (MapperException) ExceptionsHelper.unwrap(exception, MapperException.class);
if (mapperException == null) {

View File

@ -58,6 +58,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
@ -179,14 +180,16 @@ public class RecoverySourceHandler {
}
logger.trace("snapshot translog for recovery; current size is [{}]", translogView.totalOperations());
final long targetLocalCheckpoint;
try {
phase2(isSequenceNumberBasedRecoveryPossible ? request.startingSeqNo() : SequenceNumbersService.UNASSIGNED_SEQ_NO,
translogView.snapshot());
final long startingSeqNo =
isSequenceNumberBasedRecoveryPossible ? request.startingSeqNo() : SequenceNumbersService.UNASSIGNED_SEQ_NO;
targetLocalCheckpoint = phase2(startingSeqNo, translogView.snapshot());
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
finalizeRecovery();
finalizeRecovery(targetLocalCheckpoint);
}
return response;
}
@ -410,8 +413,10 @@ public class RecoverySourceHandler {
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param snapshot a snapshot of the translog
*
* @return the local checkpoint on the target
*/
void phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
@ -422,18 +427,19 @@ public class RecoverySourceHandler {
logger.trace("recovery [phase2]: sending transaction log operations");
// send all the snapshot's translog operations to the target
final int totalOperations = sendSnapshot(startingSeqNo, snapshot);
final SendSnapshotResult result = sendSnapshot(startingSeqNo, snapshot);
stopWatch.stop();
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
response.phase2Time = stopWatch.totalTime().millis();
response.phase2Operations = totalOperations;
response.phase2Operations = result.totalOperations;
return result.targetLocalCheckpoint;
}
/*
* finalizes the recovery process
*/
public void finalizeRecovery() {
public void finalizeRecovery(final long targetLocalCheckpoint) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
@ -441,7 +447,7 @@ public class RecoverySourceHandler {
StopWatch stopWatch = new StopWatch().start();
logger.trace("finalizing recovery");
cancellableThreads.execute(() -> {
shard.markAllocationIdAsInSync(request.targetAllocationId());
shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint);
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
});
@ -467,6 +473,18 @@ public class RecoverySourceHandler {
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
}
static class SendSnapshotResult {
final long targetLocalCheckpoint;
final int totalOperations;
SendSnapshotResult(final long targetLocalCheckpoint, final int totalOperations) {
this.targetLocalCheckpoint = targetLocalCheckpoint;
this.totalOperations = totalOperations;
}
}
/**
* Send the given snapshot's operations with a sequence number greater than the specified staring sequence number to this handler's
* target node.
@ -475,19 +493,25 @@ public class RecoverySourceHandler {
*
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param snapshot the translog snapshot to replay operations from
* @return the total number of translog operations that were sent
* @return the local checkpoint on the target and the total number of operations sent
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
int ops = 0;
long size = 0;
int totalOperations = 0;
int skippedOps = 0;
int totalSentOps = 0;
final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO);
final List<Translog.Operation> operations = new ArrayList<>();
if (snapshot.totalOperations() == 0) {
final int expectedTotalOps = snapshot.totalOperations();
if (expectedTotalOps == 0) {
logger.trace("no translog operations to send");
}
final CancellableThreads.Interruptable sendBatch =
() -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps));
// send operations in batches
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
@ -495,39 +519,41 @@ public class RecoverySourceHandler {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
// if we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
// any ops before the starting sequence number
/*
* If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
* any ops before the starting sequence number.
*/
final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 && (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) continue;
if (startingSeqNo >= 0 && (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
skippedOps++;
continue;
}
operations.add(operation);
ops++;
size += operation.estimateSize();
totalOperations++;
totalSentOps++;
// check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations()));
if (logger.isTraceEnabled()) {
logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size),
snapshot.totalOperations());
}
cancellableThreads.execute(sendBatch);
logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
ops = 0;
size = 0;
operations.clear();
}
}
// send the leftover operations
if (!operations.isEmpty()) {
cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations()));
if (!operations.isEmpty() || totalSentOps == 0) {
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
cancellableThreads.execute(sendBatch);
}
if (logger.isTraceEnabled()) {
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size),
snapshot.totalOperations());
}
assert expectedTotalOps == skippedOps + totalSentOps
: "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]";
return totalOperations;
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps);
}
/**

View File

@ -48,6 +48,7 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@ -374,12 +375,13 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws TranslogRecoveryPerformer
.BatchOperationException {
public long indexTranslogOperations(
List<Translog.Operation> operations, int totalTranslogOps) throws TranslogRecoveryPerformer.BatchOperationException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
indexShard().performBatchRecovery(operations);
return indexShard().getLocalCheckpoint();
}
@Override
@ -470,4 +472,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
}
Path translogLocation() {
return indexShard().shardPath().resolveTranslog();
}
}

View File

@ -53,8 +53,10 @@ public interface RecoveryTargetHandler {
* Index a set of translog operations on the target
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
*
* @return the local checkpoint on the target shard
*/
void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps);
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps);
/**
* Notifies the target of the files it is going to receive

View File

@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import java.io.IOException;
public class RecoveryTranslogOperationsResponse extends TransportResponse {
long localCheckpoint;
RecoveryTranslogOperationsResponse() {
}
RecoveryTranslogOperationsResponse(final long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
// before 6.0.0 we responded with an empty response so we have to maintain that
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(localCheckpoint);
}
}
@Override
public void readFrom(final StreamInput in) throws IOException {
// before 6.0.0 we received an empty response so we have to maintain that
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
localCheckpoint = in.readZLong();
}
else {
localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}
static TransportResponseHandler<RecoveryTranslogOperationsResponse> HANDLER =
new FutureTransportResponseHandler<RecoveryTranslogOperationsResponse>() {
@Override
public RecoveryTranslogOperationsResponse newInstance() {
return new RecoveryTranslogOperationsResponse();
}
};
}

View File

@ -28,7 +28,10 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportFuture;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -98,11 +101,16 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
recoveryId, shardId, operations, totalTranslogOps);
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, translogOperationsRequest,
translogOpsRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
final RecoveryTranslogOperationsRequest translogOperationsRequest =
new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps);
final TransportFuture<RecoveryTranslogOperationsResponse> future = transportService.submitRequest(
targetNode,
PeerRecoveryTargetService.Actions.TRANSLOG_OPS,
translogOperationsRequest,
translogOpsRequestOptions,
RecoveryTranslogOperationsResponse.HANDLER);
return future.txGet().localCheckpoint;
}
@Override

View File

@ -296,9 +296,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return getDiscoveryNode(primary.routingEntry().currentNodeId());
}
public Future<Void> asyncRecoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier)
throws IOException {
FutureTask<Void> task = new FutureTask<>(() -> {
public Future<Void> asyncRecoverReplica(
final IndexShard replica, final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier) throws IOException {
final FutureTask<Void> task = new FutureTask<>(() -> {
recoverReplica(replica, targetSupplier);
return null;
});

View File

@ -43,16 +43,17 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
@ -205,45 +206,27 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
}
@TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE," +
"org.elasticsearch.discovery:TRACE," +
"org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," +
"org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE," +
"org.elasticsearch.index.seqno:TRACE"
)
@TestLogging(
"_root:DEBUG,"
+ "org.elasticsearch.action.bulk:TRACE,"
+ "org.elasticsearch.action.get:TRACE,"
+ "org.elasticsearch.cluster.service:TRACE,"
+ "org.elasticsearch.discovery:TRACE,"
+ "org.elasticsearch.indices.cluster:TRACE,"
+ "org.elasticsearch.indices.recovery:TRACE,"
+ "org.elasticsearch.index.seqno:TRACE,"
+ "org.elasticsearch.index.shard:TRACE")
public void testWaitForPendingSeqNo() throws Exception {
IndexMetaData metaData = buildIndexMetaData(1);
final int pendingDocs = randomIntBetween(1, 5);
final AtomicReference<Semaphore> blockIndexingOnPrimary = new AtomicReference<>();
final CountDownLatch blockedIndexers = new CountDownLatch(pendingDocs);
final BlockingEngineFactory primaryEngineFactory = new BlockingEngineFactory();
try (ReplicationGroup shards = new ReplicationGroup(metaData) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
if (routing.primary()) {
return new EngineFactory() {
@Override
public Engine newReadWriteEngine(EngineConfig config) {
return InternalEngineTests.createInternalEngine((directory, writerConfig) ->
new IndexWriter(directory, writerConfig) {
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
Semaphore block = blockIndexingOnPrimary.get();
if (block != null) {
blockedIndexers.countDown();
try {
block.acquire();
} catch (InterruptedException e) {
throw new AssertionError("unexpectedly interrupted", e);
}
}
return super.addDocument(doc);
}
}, null, config);
}
};
return primaryEngineFactory;
} else {
return null;
}
@ -256,9 +239,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
closeShards(replica);
docs += pendingDocs;
final Semaphore pendingDocsSemaphore = new Semaphore(pendingDocs);
blockIndexingOnPrimary.set(pendingDocsSemaphore);
blockIndexingOnPrimary.get().acquire(pendingDocs);
primaryEngineFactory.latchIndexers();
CountDownLatch pendingDocsDone = new CountDownLatch(pendingDocs);
for (int i = 0; i < pendingDocs; i++) {
final String id = "pending_" + i;
@ -274,9 +255,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
// wait for the pending ops to "hang"
blockedIndexers.await();
primaryEngineFactory.awaitIndexersLatch();
blockIndexingOnPrimary.set(null);
primaryEngineFactory.allowIndexing();
// index some more
docs += shards.indexDocs(randomInt(5));
@ -298,11 +279,12 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
recoveryStart.await();
for (int i = 0; i < pendingDocs; i++) {
assertFalse((pendingDocs - i) + " pending operations, recovery should wait", preparedForTranslog.get());
pendingDocsSemaphore.release();
}
// index some more
docs += shards.indexDocs(randomInt(5));
assertFalse("recovery should wait on pending docs", preparedForTranslog.get());
primaryEngineFactory.releaseLatchedIndexers();
pendingDocsDone.await();
// now recovery can finish
@ -312,6 +294,114 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(docs));
shards.assertAllEqual(docs);
} finally {
primaryEngineFactory.close();
}
}
@TestLogging(
"_root:DEBUG,"
+ "org.elasticsearch.action.bulk:TRACE,"
+ "org.elasticsearch.action.get:TRACE,"
+ "org.elasticsearch.cluster.service:TRACE,"
+ "org.elasticsearch.discovery:TRACE,"
+ "org.elasticsearch.indices.cluster:TRACE,"
+ "org.elasticsearch.indices.recovery:TRACE,"
+ "org.elasticsearch.index.seqno:TRACE,"
+ "org.elasticsearch.index.shard:TRACE")
public void testCheckpointsAndMarkingInSync() throws Exception {
final IndexMetaData metaData = buildIndexMetaData(0);
final BlockingEngineFactory replicaEngineFactory = new BlockingEngineFactory();
try (
ReplicationGroup shards = new ReplicationGroup(metaData) {
@Override
protected EngineFactory getEngineFactory(final ShardRouting routing) {
if (routing.primary()) {
return null;
} else {
return replicaEngineFactory;
}
}
};
AutoCloseable ignored = replicaEngineFactory // make sure we release indexers before closing
) {
shards.startPrimary();
final int docs = shards.indexDocs(randomIntBetween(1, 10));
logger.info("indexed [{}] docs", docs);
final CountDownLatch pendingDocDone = new CountDownLatch(1);
final CountDownLatch pendingDocActiveWithExtraDocIndexed = new CountDownLatch(1);
final IndexShard replica = shards.addReplica();
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(
replica,
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
@Override
public long indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps) {
// index a doc which is not part of the snapshot, but also does not complete on replica
replicaEngineFactory.latchIndexers();
threadPool.generic().submit(() -> {
try {
shards.index(new IndexRequest(index.getName(), "type", "pending").source("{}", XContentType.JSON));
} catch (final Exception e) {
throw new RuntimeException(e);
} finally {
pendingDocDone.countDown();
}
});
try {
// the pending doc is latched in the engine
replicaEngineFactory.awaitIndexersLatch();
// unblock indexing for the next doc
replicaEngineFactory.allowIndexing();
shards.index(new IndexRequest(index.getName(), "type", "completed").source("{}", XContentType.JSON));
/*
* We want to test that the global checkpoint is blocked from advancing on the primary when a replica shard
* is pending being marked in-sync. We also want to test the the global checkpoint does not advance on the
* replica when its local checkpoint is behind the global checkpoint on the primary. Finally, advancing the
* global checkpoint here forces recovery to block until the pending doc is indexing on the replica.
*/
shards.getPrimary().updateGlobalCheckpointOnPrimary();
pendingDocActiveWithExtraDocIndexed.countDown();
} catch (final Exception e) {
throw new AssertionError(e);
}
return super.indexTranslogOperations(operations, totalTranslogOps);
}
});
pendingDocActiveWithExtraDocIndexed.await();
assertThat(pendingDocDone.getCount(), equalTo(1L));
{
final long expectedDocs = docs + 2L;
assertThat(shards.getPrimary().getLocalCheckpoint(), equalTo(expectedDocs - 1));
// recovery has not completed, therefore the global checkpoint can have advance on the primary
assertThat(shards.getPrimary().getGlobalCheckpoint(), equalTo(expectedDocs - 1));
// the pending document is not done, the checkpoints can not have advanced on the replica
assertThat(replica.getLocalCheckpoint(), lessThan(expectedDocs - 1));
assertThat(replica.getGlobalCheckpoint(), lessThan(expectedDocs - 1));
}
shards.getPrimary().updateGlobalCheckpointOnPrimary();
{
final long expectedDocs = docs + 3L;
shards.index(new IndexRequest(index.getName(), "type", "last").source("{}", XContentType.JSON));
assertThat(shards.getPrimary().getLocalCheckpoint(), equalTo(expectedDocs - 1));
assertThat(shards.getPrimary().getGlobalCheckpoint(), equalTo(expectedDocs - 2));
assertThat(replica.getLocalCheckpoint(), lessThan(expectedDocs - 2));
assertThat(replica.getGlobalCheckpoint(), lessThan(expectedDocs - 2));
}
replicaEngineFactory.releaseLatchedIndexers();
pendingDocDone.await();
recoveryFuture.get();
shards.getPrimary().updateGlobalCheckpointOnPrimary();
{
final long expectedDocs = docs + 3L;
assertBusy(() -> {
assertThat(shards.getPrimary().getLocalCheckpoint(), equalTo(expectedDocs - 1));
assertThat(shards.getPrimary().getGlobalCheckpoint(), equalTo(expectedDocs - 1));
assertThat(replica.getLocalCheckpoint(), equalTo(expectedDocs - 1));
assertThat(replica.getGlobalCheckpoint(), equalTo(expectedDocs - 1));
});
}
}
}
@ -354,11 +444,11 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
if (hasBlocked() == false) {
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
super.indexTranslogOperations(operations, totalTranslogOps);
return super.indexTranslogOperations(operations, totalTranslogOps);
}
@Override
@ -379,4 +469,66 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
static class BlockingEngineFactory implements EngineFactory, AutoCloseable {
private final List<CountDownLatch> blocks = new ArrayList<>();
private final AtomicReference<CountDownLatch> blockReference = new AtomicReference<>();
private final AtomicReference<CountDownLatch> blockedIndexers = new AtomicReference<>();
public synchronized void latchIndexers() {
final CountDownLatch block = new CountDownLatch(1);
blocks.add(block);
blockedIndexers.set(new CountDownLatch(1));
assert blockReference.compareAndSet(null, block);
}
public void awaitIndexersLatch() throws InterruptedException {
blockedIndexers.get().await();
}
public synchronized void allowIndexing() {
final CountDownLatch previous = blockReference.getAndSet(null);
assert previous == null || blocks.contains(previous);
}
public synchronized void releaseLatchedIndexers() {
allowIndexing();
blocks.forEach(CountDownLatch::countDown);
blocks.clear();
}
@Override
public Engine newReadWriteEngine(final EngineConfig config) {
return InternalEngineTests.createInternalEngine(
(directory, writerConfig) ->
new IndexWriter(directory, writerConfig) {
@Override
public long addDocument(final Iterable<? extends IndexableField> doc) throws IOException {
final CountDownLatch block = blockReference.get();
if (block != null) {
final CountDownLatch latch = blockedIndexers.get();
if (latch != null) {
latch.countDown();
}
try {
block.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
return super.addDocument(doc);
}
},
null,
config);
}
@Override
public void close() throws Exception {
releaseLatchedIndexers();
}
}
}

View File

@ -1,247 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
public class GlobalCheckpointTests extends ESTestCase {
GlobalCheckpointTracker tracker;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
tracker =
new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
}
public void testEmptyShards() {
assertFalse("checkpoint shouldn't be updated when the are no active shards", tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
}
private final AtomicInteger aIdGenerator = new AtomicInteger();
private Map<String, Long> randomAllocationsWithLocalCheckpoints(int min, int max) {
Map<String, Long> allocations = new HashMap<>();
for (int i = randomIntBetween(min, max); i > 0; i--) {
allocations.put("id_" + aIdGenerator.incrementAndGet(), (long) randomInt(1000));
}
return allocations;
}
public void testGlobalCheckpointUpdate() {
Map<String, Long> allocations = new HashMap<>();
Map<String, Long> activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5);
Set<String> active = new HashSet<>(activeWithCheckpoints.keySet());
allocations.putAll(activeWithCheckpoints);
Map<String, Long> initializingWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5);
Set<String> initializing = new HashSet<>(initializingWithCheckpoints.keySet());
allocations.putAll(initializingWithCheckpoints);
assertThat(allocations.size(), equalTo(active.size() + initializing.size()));
// note: allocations can never be empty in practice as we always have at least one primary shard active/in sync
// it is however nice not to assume this on this level and check we do the right thing.
final long maxLocalCheckpoint = allocations.values().stream().min(Long::compare).orElse(UNASSIGNED_SEQ_NO);
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
logger.info("--> using allocations");
allocations.keySet().forEach(aId -> {
final String type;
if (active.contains(aId)) {
type = "active";
} else if (initializing.contains(aId)) {
type = "init";
} else {
throw new IllegalStateException(aId + " not found in any map");
}
logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type);
});
tracker.updateAllocationIdsFromMaster(active, initializing);
initializing.forEach(aId -> tracker.markAllocationIdAsInSync(aId));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId)));
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(tracker.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != UNASSIGNED_SEQ_NO));
assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint));
// increment checkpoints
active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId)));
// now insert an unknown active/insync id , the checkpoint shouldn't change but a refresh should be requested.
final String extraId = "extra_" + randomAlphaOfLength(5);
// first check that adding it without the master blessing doesn't change anything.
tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
assertThat(tracker.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO));
Set<String> newActive = new HashSet<>(active);
newActive.add(extraId);
tracker.updateAllocationIdsFromMaster(newActive, initializing);
// we should ask for a refresh , but not update the checkpoint
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint));
// now notify for the new id
tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
// now it should be incremented
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), greaterThan(maxLocalCheckpoint));
}
public void testMissingActiveIdsPreventAdvance() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(0, 5);
final Map<String, Long> assigned = new HashMap<>();
assigned.putAll(active);
assigned.putAll(initializing);
tracker.updateAllocationIdsFromMaster(
new HashSet<>(randomSubsetOf(randomInt(active.size() - 1), active.keySet())),
initializing.keySet());
randomSubsetOf(initializing.keySet()).forEach(tracker::markAllocationIdAsInSync);
assigned.forEach(tracker::updateLocalCheckpoint);
// now mark all active shards
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
// global checkpoint can't be advanced, but we need a sync
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
// update again
assigned.forEach(tracker::updateLocalCheckpoint);
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testMissingInSyncIdsPreventAdvance() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(0, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
initializing.keySet().forEach(tracker::markAllocationIdAsInSync);
randomSubsetOf(randomInt(initializing.size() - 1),
initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId)));
active.forEach(tracker::updateLocalCheckpoint);
// global checkpoint can't be advanced, but we need a sync
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
// update again
initializing.forEach(tracker::updateLocalCheckpoint);
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
initializing.keySet().forEach(tracker::markAllocationIdAsInSync);
nonApproved.keySet().forEach(tracker::markAllocationIdAsInSync);
List<Map<String, Long>> allocations = Arrays.asList(active, initializing, nonApproved);
Collections.shuffle(allocations, random());
allocations.forEach(a -> a.forEach(tracker::updateLocalCheckpoint));
// global checkpoint can be advanced, but we need a sync
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testInSyncIdsAreRemovedIfNotValidatedByMaster() {
final Map<String, Long> activeToStay = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializingToStay = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> activeToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializingToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5);
final Set<String> active = Sets.union(activeToStay.keySet(), activeToBeRemoved.keySet());
final Set<String> initializing = Sets.union(initializingToStay.keySet(), initializingToBeRemoved.keySet());
final Map<String, Long> allocations = new HashMap<>();
allocations.putAll(activeToStay);
if (randomBoolean()) {
allocations.putAll(activeToBeRemoved);
}
allocations.putAll(initializingToStay);
if (randomBoolean()) {
allocations.putAll(initializingToBeRemoved);
}
tracker.updateAllocationIdsFromMaster(active, initializing);
if (randomBoolean()) {
initializingToStay.keySet().forEach(tracker::markAllocationIdAsInSync);
} else {
initializing.forEach(tracker::markAllocationIdAsInSync);
}
if (randomBoolean()) {
allocations.forEach(tracker::updateLocalCheckpoint);
}
// global checkpoint may be advanced, but we need a sync in any case
assertTrue(tracker.updateCheckpointOnPrimary());
// now remove shards
if (randomBoolean()) {
tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L));
} else {
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L));
tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
}
final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream())
.min(Long::compare).get() + 10; // we added 10 to make sure it's advanced in the second time
// global checkpoint is advanced and we need a sync
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(checkpoint));
}
}

View File

@ -0,0 +1,490 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.not;
public class GlobalCheckpointTrackerTests extends ESTestCase {
GlobalCheckpointTracker tracker;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
tracker =
new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
}
public void testEmptyShards() {
assertFalse("checkpoint shouldn't be updated when the are no active shards", tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
}
private final AtomicInteger aIdGenerator = new AtomicInteger();
private Map<String, Long> randomAllocationsWithLocalCheckpoints(int min, int max) {
Map<String, Long> allocations = new HashMap<>();
for (int i = randomIntBetween(min, max); i > 0; i--) {
allocations.put("id_" + aIdGenerator.incrementAndGet(), (long) randomInt(1000));
}
return allocations;
}
public void testGlobalCheckpointUpdate() {
Map<String, Long> allocations = new HashMap<>();
Map<String, Long> activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5);
Set<String> active = new HashSet<>(activeWithCheckpoints.keySet());
allocations.putAll(activeWithCheckpoints);
Map<String, Long> initializingWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5);
Set<String> initializing = new HashSet<>(initializingWithCheckpoints.keySet());
allocations.putAll(initializingWithCheckpoints);
assertThat(allocations.size(), equalTo(active.size() + initializing.size()));
// note: allocations can never be empty in practice as we always have at least one primary shard active/in sync
// it is however nice not to assume this on this level and check we do the right thing.
final long maxLocalCheckpoint = allocations.values().stream().min(Long::compare).orElse(UNASSIGNED_SEQ_NO);
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
logger.info("--> using allocations");
allocations.keySet().forEach(aId -> {
final String type;
if (active.contains(aId)) {
type = "active";
} else if (initializing.contains(aId)) {
type = "init";
} else {
throw new IllegalStateException(aId + " not found in any map");
}
logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type);
});
tracker.updateAllocationIdsFromMaster(active, initializing);
initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId, tracker.getCheckpoint()));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId)));
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(tracker.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != UNASSIGNED_SEQ_NO));
assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint));
// increment checkpoints
active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId)));
// now insert an unknown active/insync id , the checkpoint shouldn't change but a refresh should be requested.
final String extraId = "extra_" + randomAlphaOfLength(5);
// first check that adding it without the master blessing doesn't change anything.
tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
assertThat(tracker.getLocalCheckpointForAllocationId(extraId), equalTo(UNASSIGNED_SEQ_NO));
Set<String> newActive = new HashSet<>(active);
newActive.add(extraId);
tracker.updateAllocationIdsFromMaster(newActive, initializing);
// we should ask for a refresh , but not update the checkpoint
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(maxLocalCheckpoint));
// now notify for the new id
tracker.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
// now it should be incremented
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), greaterThan(maxLocalCheckpoint));
}
public void testMissingActiveIdsPreventAdvance() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(0, 5);
final Map<String, Long> assigned = new HashMap<>();
assigned.putAll(active);
assigned.putAll(initializing);
tracker.updateAllocationIdsFromMaster(
new HashSet<>(randomSubsetOf(randomInt(active.size() - 1), active.keySet())),
initializing.keySet());
randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint()));
assigned.forEach(tracker::updateLocalCheckpoint);
// now mark all active shards
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
// global checkpoint can't be advanced, but we need a sync
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
// update again
assigned.forEach(tracker::updateLocalCheckpoint);
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testMissingInSyncIdsPreventAdvance() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(0, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint()));
randomSubsetOf(randomInt(initializing.size() - 1),
initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId)));
active.forEach(tracker::updateLocalCheckpoint);
// global checkpoint can't be advanced, but we need a sync
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
// update again
initializing.forEach(tracker::updateLocalCheckpoint);
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint()));
nonApproved.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint()));
List<Map<String, Long>> allocations = Arrays.asList(active, initializing, nonApproved);
Collections.shuffle(allocations, random());
allocations.forEach(a -> a.forEach(tracker::updateLocalCheckpoint));
// global checkpoint can be advanced, but we need a sync
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testInSyncIdsAreRemovedIfNotValidatedByMaster() {
final Map<String, Long> activeToStay = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializingToStay = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> activeToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializingToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5);
final Set<String> active = Sets.union(activeToStay.keySet(), activeToBeRemoved.keySet());
final Set<String> initializing = Sets.union(initializingToStay.keySet(), initializingToBeRemoved.keySet());
final Map<String, Long> allocations = new HashMap<>();
allocations.putAll(activeToStay);
if (randomBoolean()) {
allocations.putAll(activeToBeRemoved);
}
allocations.putAll(initializingToStay);
if (randomBoolean()) {
allocations.putAll(initializingToBeRemoved);
}
tracker.updateAllocationIdsFromMaster(active, initializing);
if (randomBoolean()) {
initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint()));
} else {
initializing.forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getCheckpoint()));
}
if (randomBoolean()) {
allocations.forEach(tracker::updateLocalCheckpoint);
}
// global checkpoint may be advanced, but we need a sync in any case
assertTrue(tracker.updateCheckpointOnPrimary());
// now remove shards
if (randomBoolean()) {
tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L));
} else {
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L));
tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
}
final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream())
.min(Long::compare).get() + 10; // we added 10 to make sure it's advanced in the second time
// global checkpoint is advanced and we need a sync
assertTrue(tracker.updateCheckpointOnPrimary());
assertThat(tracker.getCheckpoint(), equalTo(checkpoint));
}
public void testWaitForAllocationIdToBeInSync() throws BrokenBarrierException, InterruptedException {
final int localCheckpoint = randomIntBetween(1, 32);
final int globalCheckpoint = randomIntBetween(localCheckpoint + 1, 64);
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean complete = new AtomicBoolean();
final String inSyncAllocationId =randomAlphaOfLength(16);
final String trackingAllocationId = randomAlphaOfLength(16);
tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId));
tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint);
tracker.updateCheckpointOnPrimary();
final Thread thread = new Thread(() -> {
try {
// synchronize starting with the test thread
barrier.await();
tracker.markAllocationIdAsInSync(trackingAllocationId, localCheckpoint);
complete.set(true);
// synchronize with the test thread checking if we are no longer waiting
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
});
thread.start();
// synchronize starting with the waiting thread
barrier.await();
final List<Integer> elements = IntStream.rangeClosed(0, globalCheckpoint - 1).boxed().collect(Collectors.toList());
Randomness.shuffle(elements);
for (int i = 0; i < elements.size(); i++) {
tracker.updateLocalCheckpoint(trackingAllocationId, elements.get(i));
assertFalse(complete.get());
assertTrue(awaitBusy(() -> tracker.trackingLocalCheckpoints.containsKey(trackingAllocationId)));
assertTrue(awaitBusy(() -> tracker.pendingInSync.contains(trackingAllocationId)));
assertFalse(tracker.inSyncLocalCheckpoints.containsKey(trackingAllocationId));
}
tracker.updateLocalCheckpoint(trackingAllocationId, randomIntBetween(globalCheckpoint, 64));
// synchronize with the waiting thread to mark that it is complete
barrier.await();
assertTrue(complete.get());
assertTrue(tracker.trackingLocalCheckpoints.isEmpty());
assertTrue(tracker.pendingInSync.isEmpty());
assertTrue(tracker.inSyncLocalCheckpoints.containsKey(trackingAllocationId));
thread.join();
}
public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException {
final int localCheckpoint = randomIntBetween(1, 32);
final int globalCheckpoint = randomIntBetween(localCheckpoint + 1, 64);
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean interrupted = new AtomicBoolean();
final String inSyncAllocationId = randomAlphaOfLength(16);
final String trackingAllocationId = randomAlphaOfLength(32);
tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId));
tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint);
tracker.updateCheckpointOnPrimary();
final Thread thread = new Thread(() -> {
try {
// synchronize starting with the test thread
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
try {
tracker.markAllocationIdAsInSync(trackingAllocationId, localCheckpoint);
} catch (final InterruptedException e) {
interrupted.set(true);
// synchronize with the test thread checking if we are interrupted
}
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
});
thread.start();
// synchronize starting with the waiting thread
barrier.await();
thread.interrupt();
// synchronize with the waiting thread to mark that it is complete
barrier.await();
assertTrue(interrupted.get());
thread.join();
}
public void testUpdateAllocationIdsFromMaster() throws Exception {
final int numberOfActiveAllocationsIds = randomIntBetween(2, 16);
final Set<String> activeAllocationIds =
IntStream.range(0, numberOfActiveAllocationsIds).mapToObj(i -> randomAlphaOfLength(16)).collect(Collectors.toSet());
final int numberOfInitializingIds = randomIntBetween(2, 16);
final Set<String> initializingIds =
IntStream.range(0, numberOfInitializingIds).mapToObj(i -> {
do {
final String initializingId = randomAlphaOfLength(16);
// ensure we do not duplicate an allocation ID in active and initializing sets
if (!activeAllocationIds.contains(initializingId)) {
return initializingId;
}
} while (true);
}).collect(Collectors.toSet());
tracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingIds);
// first we assert that the in-sync and tracking sets are set up correctly
assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
assertTrue(
activeAllocationIds
.stream()
.allMatch(a -> tracker.inSyncLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertTrue(initializingIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a)));
assertTrue(
initializingIds
.stream()
.allMatch(a -> tracker.trackingLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO));
// now we will remove some allocation IDs from these and ensure that they propagate through
final List<String> removingActiveAllocationIds = randomSubsetOf(activeAllocationIds);
final Set<String> newActiveAllocationIds =
activeAllocationIds.stream().filter(a -> !removingActiveAllocationIds.contains(a)).collect(Collectors.toSet());
final List<String> removingInitializingAllocationIds = randomSubsetOf(initializingIds);
final Set<String> newInitializingAllocationIds =
initializingIds.stream().filter(a -> !removingInitializingAllocationIds.contains(a)).collect(Collectors.toSet());
tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
assertTrue(removingActiveAllocationIds.stream().noneMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
assertTrue(newInitializingAllocationIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a)));
assertTrue(removingInitializingAllocationIds.stream().noneMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a)));
/*
* Now we will add an allocation ID to each of active and initializing and ensure they propagate through. Using different lengths
* than we have been using above ensures that we can not collide with a previous allocation ID
*/
newActiveAllocationIds.add(randomAlphaOfLength(32));
newInitializingAllocationIds.add(randomAlphaOfLength(64));
tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a)));
assertTrue(
newActiveAllocationIds
.stream()
.allMatch(a -> tracker.inSyncLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertTrue(newInitializingAllocationIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a)));
assertTrue(
newInitializingAllocationIds
.stream()
.allMatch(a -> tracker.trackingLocalCheckpoints.get(a) == SequenceNumbersService.UNASSIGNED_SEQ_NO));
// the tracking allocation IDs should play no role in determining the global checkpoint
final Map<String, Integer> activeLocalCheckpoints =
newActiveAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024)));
activeLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a, l));
final Map<String, Integer> initializingLocalCheckpoints =
newInitializingAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024)));
initializingLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a, l));
assertTrue(
activeLocalCheckpoints
.entrySet()
.stream()
.allMatch(e -> tracker.getLocalCheckpointForAllocationId(e.getKey()) == e.getValue()));
assertTrue(
initializingLocalCheckpoints
.entrySet()
.stream()
.allMatch(e -> tracker.trackingLocalCheckpoints.get(e.getKey()) == e.getValue()));
assertTrue(tracker.updateCheckpointOnPrimary());
final long minimumActiveLocalCheckpoint = (long) activeLocalCheckpoints.values().stream().min(Integer::compareTo).get();
assertThat(tracker.getCheckpoint(), equalTo(minimumActiveLocalCheckpoint));
final long minimumInitailizingLocalCheckpoint = (long) initializingLocalCheckpoints.values().stream().min(Integer::compareTo).get();
// now we are going to add a new allocation ID and bring it in sync which should move it to the in-sync allocation IDs
final long localCheckpoint =
randomIntBetween(0, Math.toIntExact(Math.min(minimumActiveLocalCheckpoint, minimumInitailizingLocalCheckpoint) - 1));
// using a different length than we have been using above ensures that we can not collide with a previous allocation ID
final String newSyncingAllocationId = randomAlphaOfLength(128);
newInitializingAllocationIds.add(newSyncingAllocationId);
tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
final CyclicBarrier barrier = new CyclicBarrier(2);
final Thread thread = new Thread(() -> {
try {
barrier.await();
tracker.markAllocationIdAsInSync(newSyncingAllocationId, localCheckpoint);
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
});
thread.start();
barrier.await();
assertBusy(() -> {
assertTrue(tracker.pendingInSync.contains(newSyncingAllocationId));
assertTrue(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId));
});
tracker.updateLocalCheckpoint(newSyncingAllocationId, randomIntBetween(Math.toIntExact(minimumActiveLocalCheckpoint), 1024));
barrier.await();
assertFalse(tracker.pendingInSync.contains(newSyncingAllocationId));
assertFalse(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId));
assertTrue(tracker.inSyncLocalCheckpoints.containsKey(newSyncingAllocationId));
/*
* The new in-sync allocation ID is in the in-sync set now yet the master does not know this; the allocation ID should still be in
* the in-sync set even if we receive a cluster state update that does not reflect this.
*
*/
tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds);
assertFalse(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId));
assertTrue(tracker.inSyncLocalCheckpoints.containsKey(newSyncingAllocationId));
}
private void markAllocationIdAsInSyncQuietly(
final GlobalCheckpointTracker tracker, final String allocationId, final long localCheckpoint) {
try {
tracker.markAllocationIdAsInSync(allocationId, localCheckpoint);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1275,9 +1275,10 @@ public class IndexShardTests extends IndexShardTestCase {
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
super.indexTranslogOperations(operations, totalTranslogOps);
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps);
assertFalse(replica.getTranslog().syncNeeded());
return localCheckpoint;
}
}, true);
@ -1331,10 +1332,11 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
super.indexTranslogOperations(operations, totalTranslogOps);
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps);
// Shard should now be active since we did recover:
assertTrue(replica.isActive());
return localCheckpoint;
}
}, false);

View File

@ -29,6 +29,16 @@ import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogWriter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
@ -36,7 +46,13 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
public void testGetStartingSeqNo() throws Exception {
IndexShard replica = newShard(false);
RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
final AtomicReference<Path> translogLocation = new AtomicReference<>();
RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null) {
@Override
Path translogLocation() {
return translogLocation.get();
}
};
try {
recoveryEmptyReplica(replica);
int docs = randomIntBetween(1, 10);
@ -56,22 +72,28 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
final long maxSeqNo = replica.seqNoStats().getMaxSeqNo();
final long localCheckpoint = replica.getLocalCheckpoint();
translogLocation.set(replica.getTranslog().location());
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
replica.updateGlobalCheckpointOnReplica(maxSeqNo - 1);
replica.getTranslog().sync();
final Translog translog = replica.getTranslog();
translogLocation.set(
writeTranslog(replica.shardId(), translog.getTranslogUUID(), translog.currentFileGeneration(), maxSeqNo - 1));
// commit is enough, global checkpoint is below max *committed* which is NO_OPS_PERFORMED
// commit is good, global checkpoint is at least max *committed* which is NO_OPS_PERFORMED
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
replica.flush(new FlushRequest());
// commit is still not good enough, global checkpoint is below max
translogLocation.set(replica.getTranslog().location());
// commit is not good, global checkpoint is below max
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
replica.updateGlobalCheckpointOnReplica(maxSeqNo);
replica.getTranslog().sync();
// commit is enough, global checkpoint is below max
translogLocation.set(
writeTranslog(replica.shardId(), translog.getTranslogUUID(), translog.currentFileGeneration(), maxSeqNo));
// commit is good, global checkpoint is above max
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(localCheckpoint + 1));
} finally {
closeShards(replica);
@ -79,4 +101,23 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
}
}
private Path writeTranslog(
final ShardId shardId,
final String translogUUID,
final long generation,
final long globalCheckpoint
) throws IOException {
final Path tempDir = createTempDir();
final Path resolve = tempDir.resolve(Translog.getFilename(generation));
Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME));
try (TranslogWriter ignored = TranslogWriter.create(
shardId,
translogUUID,
generation,
resolve,
FileChannel::open,
TranslogConfig.DEFAULT_BUFFER_SIZE, () -> globalCheckpoint)) {}
return tempDir;
}
}

View File

@ -180,7 +180,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true)));
}
operations.add(null);
int totalOperations = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() {
RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() {
private int counter = 0;
@Override
@ -194,9 +194,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
});
if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
assertThat(totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers));
assertThat(result.totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers));
} else {
assertThat(totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo)));
assertThat(result.totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo)));
}
}
@ -403,8 +403,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
@Override
void phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException {
long phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException {
phase2Called.set(true);
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
};
@ -494,8 +495,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
@Override
void phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException {
long phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException {
phase2Called.set(true);
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
};