Seq Number based recovery should validate last lucene commit max seq# (#22851)
The seq# base recovery logic relies on rolling back lucene to remove any operations above the global checkpoint. This part of the plan is not implemented yet but have to have these guarantees. Instead we should make the seq# logic validate that the last commit point (and the only one we have) maintains the invariant and if not, fall back to file based recovery. This commit adds a test that creates situation where rollback is needed (primary failover with ops in flight) and fixes another issue that was surfaced by it - if a primary can't serve a seq# based recovery request and does a file copy, it still used the incoming `startSeqNo` as a filter. Relates to #22484 & #10708
This commit is contained in:
parent
29f63c78cc
commit
eb36b82de4
|
@ -139,6 +139,16 @@ public class LocalCheckpointTracker {
|
|||
return nextSeqNo - 1;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* constructs a {@link SeqNoStats} object, using local state and the supplied global checkpoint
|
||||
*
|
||||
* @implNote this is needed to make sure the local checkpoint and max seq no are consistent
|
||||
*/
|
||||
synchronized SeqNoStats getStats(final long globalCheckpoint) {
|
||||
return new SeqNoStats(getMaxSeqNo(), getCheckpoint(), globalCheckpoint);
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for all operations up to the provided sequence number to complete.
|
||||
*
|
||||
|
|
|
@ -39,6 +39,10 @@ public class SeqNoStats implements ToXContent, Writeable {
|
|||
private final long globalCheckpoint;
|
||||
|
||||
public SeqNoStats(long maxSeqNo, long localCheckpoint, long globalCheckpoint) {
|
||||
assert localCheckpoint <= maxSeqNo:
|
||||
"local checkpoint [" + localCheckpoint + "] is above maximum seq no [" + maxSeqNo + "]";
|
||||
// note that the the global checkpoint can be higher from both maxSeqNo and localCheckpoint
|
||||
// as we use this stats object to describe lucene commits as well as live statistic.
|
||||
this.maxSeqNo = maxSeqNo;
|
||||
this.localCheckpoint = localCheckpoint;
|
||||
this.globalCheckpoint = globalCheckpoint;
|
||||
|
|
|
@ -111,7 +111,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
|
|||
* @return stats encapuslating the maximum sequence number, the local checkpoint and the global checkpoint
|
||||
*/
|
||||
public SeqNoStats stats() {
|
||||
return new SeqNoStats(getMaxSeqNo(), getLocalCheckpoint(), getGlobalCheckpoint());
|
||||
return localCheckpointTracker.getStats(getGlobalCheckpoint());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -199,10 +199,10 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
|
|||
Supplier<Long> currentClusterStateVersionSupplier = () -> clusterService.state().getVersion();
|
||||
if (shard.indexSettings().isOnSharedFilesystem()) {
|
||||
handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,
|
||||
this::delayNewRecoveries, logger);
|
||||
this::delayNewRecoveries, settings);
|
||||
} else {
|
||||
handler = new RecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,
|
||||
this::delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), logger);
|
||||
this::delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), settings);
|
||||
}
|
||||
return handler;
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.engine.RecoveryEngineException;
|
||||
import org.elasticsearch.index.mapper.MapperException;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
|
@ -61,7 +62,6 @@ import org.elasticsearch.transport.TransportResponse;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -365,7 +365,15 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
|
|||
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
|
||||
try {
|
||||
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.indexShard().shardPath().resolveTranslog());
|
||||
return recoveryTarget.store().loadSeqNoStats(globalCheckpoint).getLocalCheckpoint() + 1;
|
||||
final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint);
|
||||
if (seqNoStats.getMaxSeqNo() <= seqNoStats.getGlobalCheckpoint()) {
|
||||
// commit point is good for seq no based recovery as the maximum seq# including in it
|
||||
// is below the global checkpoint (i.e., it excludes any ops thay may not be on the primary)
|
||||
// Recovery will start at the first op after the local check point stored in the commit.
|
||||
return seqNoStats.getLocalCheckpoint() + 1;
|
||||
} else {
|
||||
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
// this can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
|
||||
// translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and
|
||||
|
|
|
@ -30,14 +30,15 @@ import org.apache.lucene.store.IndexInput;
|
|||
import org.apache.lucene.store.RateLimiter;
|
||||
import org.apache.lucene.util.ArrayUtil;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.CancellableThreads;
|
||||
import org.elasticsearch.index.engine.RecoveryEngineException;
|
||||
|
@ -109,15 +110,15 @@ public class RecoverySourceHandler {
|
|||
final Supplier<Long> currentClusterStateVersionSupplier,
|
||||
Function<String, Releasable> delayNewRecoveries,
|
||||
final int fileChunkSizeInBytes,
|
||||
final Logger logger) {
|
||||
final Settings nodeSettings) {
|
||||
this.shard = shard;
|
||||
this.recoveryTarget = recoveryTarget;
|
||||
this.request = request;
|
||||
this.currentClusterStateVersionSupplier = currentClusterStateVersionSupplier;
|
||||
this.delayNewRecoveries = delayNewRecoveries;
|
||||
this.logger = logger;
|
||||
this.indexName = this.request.shardId().getIndex().getName();
|
||||
this.shardId = this.request.shardId().id();
|
||||
this.logger = Loggers.getLogger(getClass(), nodeSettings, request.shardId(), "recover to " + request.targetNode().getName());
|
||||
this.chunkSizeInBytes = fileChunkSizeInBytes;
|
||||
this.response = new RecoveryResponse();
|
||||
}
|
||||
|
@ -127,12 +128,14 @@ public class RecoverySourceHandler {
|
|||
*/
|
||||
public RecoveryResponse recoverToTarget() throws IOException {
|
||||
try (final Translog.View translogView = shard.acquireTranslogView()) {
|
||||
logger.trace("{} captured translog id [{}] for recovery", shard.shardId(), translogView.minTranslogGeneration());
|
||||
logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
|
||||
|
||||
boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO &&
|
||||
isTranslogReadyForSequenceNumberBasedRecovery(translogView);
|
||||
|
||||
if (!isSequenceNumberBasedRecoveryPossible) {
|
||||
if (isSequenceNumberBasedRecoveryPossible) {
|
||||
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
|
||||
} else {
|
||||
final IndexCommit phase1Snapshot;
|
||||
try {
|
||||
phase1Snapshot = shard.acquireIndexCommit(false);
|
||||
|
@ -177,9 +180,10 @@ public class RecoverySourceHandler {
|
|||
throw new IndexShardRelocatedException(request.shardId());
|
||||
}
|
||||
|
||||
logger.trace("{} snapshot translog for recovery; current size is [{}]", shard.shardId(), translogView.totalOperations());
|
||||
logger.trace("snapshot translog for recovery; current size is [{}]", translogView.totalOperations());
|
||||
try {
|
||||
phase2(translogView.snapshot());
|
||||
phase2(isSequenceNumberBasedRecoveryPossible ? request.startingSeqNo() : SequenceNumbersService.UNASSIGNED_SEQ_NO,
|
||||
translogView.snapshot());
|
||||
} catch (Exception e) {
|
||||
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
|
||||
}
|
||||
|
@ -201,20 +205,17 @@ public class RecoverySourceHandler {
|
|||
final long startingSeqNo = request.startingSeqNo();
|
||||
assert startingSeqNo >= 0;
|
||||
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
|
||||
logger.trace("{} starting: [{}], ending: [{}]", shard.shardId(), startingSeqNo, endingSeqNo);
|
||||
logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, endingSeqNo);
|
||||
// the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
|
||||
if (startingSeqNo - 1 <= endingSeqNo) {
|
||||
logger.trace(
|
||||
"{} waiting for all operations in the range [{}, {}] to complete",
|
||||
shard.shardId(),
|
||||
startingSeqNo,
|
||||
endingSeqNo);
|
||||
/*
|
||||
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
|
||||
* operations in the required range will be available for replaying from the translog of the source.
|
||||
*/
|
||||
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
|
||||
|
||||
logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);
|
||||
|
||||
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1);
|
||||
final Translog.Snapshot snapshot = translogView.snapshot();
|
||||
Translog.Operation operation;
|
||||
|
@ -283,9 +284,7 @@ public class RecoverySourceHandler {
|
|||
}
|
||||
// we shortcut recovery here because we have nothing to copy. but we must still start the engine on the target.
|
||||
// so we don't return here
|
||||
logger.trace("[{}][{}] skipping [phase1] to {} - identical sync id [{}] found on both source and target", indexName,
|
||||
shardId,
|
||||
request.targetNode(), recoverySourceSyncId);
|
||||
logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target", recoverySourceSyncId);
|
||||
} else {
|
||||
final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
|
||||
for (StoreFileMetaData md : diff.identical) {
|
||||
|
@ -293,9 +292,8 @@ public class RecoverySourceHandler {
|
|||
response.phase1ExistingFileSizes.add(md.length());
|
||||
existingTotalSize += md.length();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exist in local store and has checksum [{}]," +
|
||||
" size [{}]",
|
||||
indexName, shardId, request.targetNode(), md.name(), md.checksum(), md.length());
|
||||
logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +
|
||||
" size [{}]", md.name(), md.checksum(), md.length());
|
||||
}
|
||||
totalSize += md.length();
|
||||
}
|
||||
|
@ -304,12 +302,10 @@ public class RecoverySourceHandler {
|
|||
phase1Files.addAll(diff.missing);
|
||||
for (StoreFileMetaData md : phase1Files) {
|
||||
if (request.metadataSnapshot().asMap().containsKey(md.name())) {
|
||||
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote " +
|
||||
"[{}], local [{}]",
|
||||
indexName, shardId, request.targetNode(), md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
|
||||
logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
|
||||
md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
|
||||
} else {
|
||||
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exist in remote",
|
||||
indexName, shardId, request.targetNode(), md.name());
|
||||
logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());
|
||||
}
|
||||
response.phase1FileNames.add(md.name());
|
||||
response.phase1FileSizes.add(md.length());
|
||||
|
@ -319,9 +315,8 @@ public class RecoverySourceHandler {
|
|||
response.phase1TotalSize = totalSize;
|
||||
response.phase1ExistingTotalSize = existingTotalSize;
|
||||
|
||||
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with " +
|
||||
"total_size [{}]",
|
||||
indexName, shardId, request.targetNode(), response.phase1FileNames.size(),
|
||||
logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
|
||||
response.phase1FileNames.size(),
|
||||
new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
|
||||
cancellableThreads.execute(() ->
|
||||
recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames,
|
||||
|
@ -357,10 +352,10 @@ public class RecoverySourceHandler {
|
|||
});
|
||||
for (StoreFileMetaData md : metadata) {
|
||||
cancellableThreads.checkForCancel();
|
||||
logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
|
||||
logger.debug("checking integrity for file {} after remove corruption exception", md);
|
||||
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
|
||||
shard.failShard("recovery", corruptIndexException);
|
||||
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
|
||||
logger.warn("Corrupted file detected {} checksum mismatch", md);
|
||||
throw corruptIndexException;
|
||||
}
|
||||
}
|
||||
|
@ -385,7 +380,7 @@ public class RecoverySourceHandler {
|
|||
}
|
||||
}
|
||||
|
||||
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
|
||||
logger.trace("recovery [phase1]: took [{}]", stopWatch.totalTime());
|
||||
response.phase1Time = stopWatch.totalTime().millis();
|
||||
} catch (Exception e) {
|
||||
throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
|
||||
|
@ -396,7 +391,7 @@ public class RecoverySourceHandler {
|
|||
|
||||
void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException {
|
||||
StopWatch stopWatch = new StopWatch().start();
|
||||
logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode());
|
||||
logger.trace("recovery [phase1]: prepare remote engine for translog");
|
||||
final long startEngineStart = stopWatch.totalTime().millis();
|
||||
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
|
||||
// garbage collection (not the JVM's GC!) of tombstone deletes.
|
||||
|
@ -404,8 +399,7 @@ public class RecoverySourceHandler {
|
|||
stopWatch.stop();
|
||||
|
||||
response.startTime = stopWatch.totalTime().millis() - startEngineStart;
|
||||
logger.trace("{} recovery [phase1] to {}: remote engine start took [{}]",
|
||||
request.shardId(), request.targetNode(), stopWatch.totalTime());
|
||||
logger.trace("recovery [phase1]: remote engine start took [{}]", stopWatch.totalTime());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -415,9 +409,11 @@ public class RecoverySourceHandler {
|
|||
* point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
|
||||
* shard.
|
||||
*
|
||||
* @param snapshot a snapshot of the translog
|
||||
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if all
|
||||
* ops should be sent
|
||||
* @param snapshot a snapshot of the translog
|
||||
*/
|
||||
void phase2(final Translog.Snapshot snapshot) throws IOException {
|
||||
void phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
|
||||
if (shard.state() == IndexShardState.CLOSED) {
|
||||
throw new IndexShardClosedException(request.shardId());
|
||||
}
|
||||
|
@ -425,13 +421,13 @@ public class RecoverySourceHandler {
|
|||
|
||||
final StopWatch stopWatch = new StopWatch().start();
|
||||
|
||||
logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode());
|
||||
logger.trace("recovery [phase2]: sending transaction log operations");
|
||||
|
||||
// send all the snapshot's translog operations to the target
|
||||
final int totalOperations = sendSnapshot(request.startingSeqNo(), snapshot);
|
||||
final int totalOperations = sendSnapshot(startingSeqNo, snapshot);
|
||||
|
||||
stopWatch.stop();
|
||||
logger.trace("{} recovery [phase2] to {}: took [{}]", request.shardId(), request.targetNode(), stopWatch.totalTime());
|
||||
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
|
||||
response.phase2Time = stopWatch.totalTime().millis();
|
||||
response.phase2Operations = totalOperations;
|
||||
}
|
||||
|
@ -445,7 +441,7 @@ public class RecoverySourceHandler {
|
|||
}
|
||||
cancellableThreads.checkForCancel();
|
||||
StopWatch stopWatch = new StopWatch().start();
|
||||
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode());
|
||||
logger.trace("finalizing recovery");
|
||||
cancellableThreads.execute(() -> {
|
||||
shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId());
|
||||
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
|
||||
|
@ -458,11 +454,10 @@ public class RecoverySourceHandler {
|
|||
// state. This means that no new recovery can be completed based on information of a newer cluster state than the current one.
|
||||
try (Releasable ignored = delayNewRecoveries.apply("primary relocation hand-off in progress or completed for " + shardId)) {
|
||||
final long currentClusterStateVersion = currentClusterStateVersionSupplier.get();
|
||||
logger.trace("[{}][{}] waiting on {} to have cluster state with version [{}]", indexName, shardId, request.targetNode(),
|
||||
currentClusterStateVersion);
|
||||
logger.trace("waiting on remote node to have cluster state with version [{}]", currentClusterStateVersion);
|
||||
cancellableThreads.execute(() -> recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion));
|
||||
|
||||
logger.trace("[{}][{}] performing relocation hand-off to {}", indexName, shardId, request.targetNode());
|
||||
logger.trace("performing relocation hand-off");
|
||||
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode()));
|
||||
}
|
||||
/*
|
||||
|
@ -471,8 +466,7 @@ public class RecoverySourceHandler {
|
|||
*/
|
||||
}
|
||||
stopWatch.stop();
|
||||
logger.trace("[{}][{}] finalizing recovery to {}: took [{}]",
|
||||
indexName, shardId, request.targetNode(), stopWatch.totalTime());
|
||||
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -493,7 +487,7 @@ public class RecoverySourceHandler {
|
|||
final List<Translog.Operation> operations = new ArrayList<>();
|
||||
|
||||
if (snapshot.totalOperations() == 0) {
|
||||
logger.trace("[{}][{}] no translog operations to send to {}", indexName, shardId, request.targetNode());
|
||||
logger.trace("no translog operations to send");
|
||||
}
|
||||
|
||||
// send operations in batches
|
||||
|
@ -514,13 +508,8 @@ public class RecoverySourceHandler {
|
|||
if (size >= chunkSizeInBytes) {
|
||||
cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations()));
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}",
|
||||
indexName,
|
||||
shardId,
|
||||
ops,
|
||||
new ByteSizeValue(size),
|
||||
snapshot.totalOperations(),
|
||||
request.targetNode());
|
||||
logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size),
|
||||
snapshot.totalOperations());
|
||||
}
|
||||
ops = 0;
|
||||
size = 0;
|
||||
|
@ -534,13 +523,8 @@ public class RecoverySourceHandler {
|
|||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}][{}] sent final batch of [{}][{}] (total: [{}]) translog operations to {}",
|
||||
indexName,
|
||||
shardId,
|
||||
ops,
|
||||
new ByteSizeValue(size),
|
||||
snapshot.totalOperations(),
|
||||
request.targetNode());
|
||||
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size),
|
||||
snapshot.totalOperations());
|
||||
}
|
||||
|
||||
return totalOperations;
|
||||
|
|
|
@ -19,9 +19,8 @@
|
|||
|
||||
package org.elasticsearch.indices.recovery;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
||||
|
@ -39,8 +38,8 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
|
|||
|
||||
SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request,
|
||||
Supplier<Long> currentClusterStateVersionSupplier,
|
||||
Function<String, Releasable> delayNewRecoveries, Logger logger) {
|
||||
super(shard, recoveryTarget, request, currentClusterStateVersionSupplier, delayNewRecoveries, -1, logger);
|
||||
Function<String, Releasable> delayNewRecoveries, Settings nodeSettings) {
|
||||
super(shard, recoveryTarget, request, currentClusterStateVersionSupplier, delayNewRecoveries, -1, nodeSettings);
|
||||
this.shard = shard;
|
||||
this.request = request;
|
||||
}
|
||||
|
@ -49,7 +48,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
|
|||
public RecoveryResponse recoverToTarget() throws IOException {
|
||||
boolean engineClosed = false;
|
||||
try {
|
||||
logger.trace("{} recovery [phase1] to {}: skipping phase1 for shared filesystem", request.shardId(), request.targetNode());
|
||||
logger.trace("recovery [phase1]: skipping phase1 for shared filesystem");
|
||||
final long maxUnsafeAutoIdTimestamp = shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp();
|
||||
if (request.isPrimaryRelocation()) {
|
||||
logger.debug("[phase1] closing engine on primary for shared filesystem recovery");
|
||||
|
@ -84,7 +83,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
|
|||
|
||||
@Override
|
||||
protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) {
|
||||
logger.trace("{} skipping recovery of translog snapshot on shared filesystem to: {}", shard.shardId(), request.targetNode());
|
||||
logger.trace("skipping recovery of translog snapshot on shared filesystem");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.elasticsearch.discovery.zen.UnicastZenPing;
|
|||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
|
||||
import org.elasticsearch.monitor.jvm.HotThreads;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
@ -492,6 +493,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
.setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
|
||||
.put(IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL.getKey(), randomBoolean() ? "5s" : "200ms")
|
||||
));
|
||||
ensureGreen();
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.index.replication;
|
||||
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
|
@ -101,9 +100,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
}
|
||||
|
||||
protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard> {
|
||||
private final IndexShard primary;
|
||||
private IndexShard primary;
|
||||
private IndexMetaData indexMetaData;
|
||||
private final List<IndexShard> replicas;
|
||||
private final IndexMetaData indexMetaData;
|
||||
private final AtomicInteger replicaId = new AtomicInteger();
|
||||
private final AtomicInteger docId = new AtomicInteger();
|
||||
boolean closed = false;
|
||||
|
@ -174,13 +173,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
updateAllocationIDsOnPrimary();
|
||||
}
|
||||
|
||||
private final Runnable replicaGlobalCheckpointSyncer = () -> {
|
||||
throw new AssertionError("replicas can not sync global checkpoint");
|
||||
};
|
||||
|
||||
public synchronized IndexShard addReplica() throws IOException {
|
||||
final IndexShard replica =
|
||||
newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData, replicaGlobalCheckpointSyncer, null);
|
||||
newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData, this::syncGlobalCheckpoint, null);
|
||||
replicas.add(replica);
|
||||
updateAllocationIDsOnPrimary();
|
||||
return replica;
|
||||
|
@ -193,7 +188,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
false, ShardRoutingState.INITIALIZING,
|
||||
RecoverySource.PeerRecoverySource.INSTANCE);
|
||||
|
||||
final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, replicaGlobalCheckpointSyncer);
|
||||
final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, this::syncGlobalCheckpoint);
|
||||
replicas.add(newReplica);
|
||||
updateAllocationIDsOnPrimary();
|
||||
return newReplica;
|
||||
|
@ -203,6 +198,24 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
return Collections.unmodifiableList(replicas);
|
||||
}
|
||||
|
||||
/**
|
||||
* promotes the specific replica as the new primary
|
||||
*/
|
||||
public synchronized void promoteReplicaToPrimary(IndexShard replica) throws IOException {
|
||||
final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1;
|
||||
IndexMetaData.Builder newMetaData =
|
||||
IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm);
|
||||
indexMetaData = newMetaData.build();
|
||||
for (IndexShard shard: replicas) {
|
||||
shard.updatePrimaryTerm(newTerm);
|
||||
}
|
||||
boolean found = replicas.remove(replica);
|
||||
assert found;
|
||||
primary = replica;
|
||||
replica.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
|
||||
updateAllocationIDsOnPrimary();
|
||||
}
|
||||
|
||||
synchronized boolean removeReplica(IndexShard replica) {
|
||||
final boolean removed = replicas.remove(replica);
|
||||
if (removed) {
|
||||
|
@ -462,35 +475,50 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
|
||||
@Override
|
||||
protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception {
|
||||
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary,
|
||||
null);
|
||||
if (indexResult.hasFailure() == false) {
|
||||
// update the version on request so it will happen on the replicas
|
||||
final long version = indexResult.getVersion();
|
||||
request.version(version);
|
||||
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
|
||||
request.setSeqNo(indexResult.getSeqNo());
|
||||
assert request.versionType().validateVersionForWrites(request.version());
|
||||
}
|
||||
request.primaryTerm(primary.getPrimaryTerm());
|
||||
TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getTranslogLocation(), logger);
|
||||
IndexResponse response = new IndexResponse(
|
||||
primary.shardId(),
|
||||
request.type(),
|
||||
request.id(),
|
||||
indexResult.getSeqNo(),
|
||||
indexResult.getVersion(),
|
||||
indexResult.isCreated());
|
||||
IndexResponse response = indexOnPrimary(request, primary);
|
||||
return new PrimaryResult(request, response);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void performOnReplica(IndexRequest request, IndexShard replica) throws IOException {
|
||||
final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica);
|
||||
TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger);
|
||||
indexOnReplica(request, replica);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* indexes the given requests on the supplied primary, modifying it for replicas
|
||||
*/
|
||||
protected IndexResponse indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
|
||||
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary,
|
||||
null);
|
||||
if (indexResult.hasFailure() == false) {
|
||||
// update the version on request so it will happen on the replicas
|
||||
final long version = indexResult.getVersion();
|
||||
request.version(version);
|
||||
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
|
||||
request.setSeqNo(indexResult.getSeqNo());
|
||||
assert request.versionType().validateVersionForWrites(request.version());
|
||||
}
|
||||
request.primaryTerm(primary.getPrimaryTerm());
|
||||
TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getTranslogLocation(), logger);
|
||||
return new IndexResponse(
|
||||
primary.shardId(),
|
||||
request.type(),
|
||||
request.id(),
|
||||
indexResult.getSeqNo(),
|
||||
indexResult.getVersion(),
|
||||
indexResult.isCreated());
|
||||
}
|
||||
|
||||
/**
|
||||
* indexes the given requests on the supplied replica shard
|
||||
*/
|
||||
protected void indexOnReplica(IndexRequest request, IndexShard replica) throws IOException {
|
||||
final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica);
|
||||
TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger);
|
||||
}
|
||||
|
||||
|
||||
class GlobalCheckpointSync extends
|
||||
ReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest, GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.index.replication;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
|
@ -29,6 +30,7 @@ import org.elasticsearch.index.translog.Translog;
|
|||
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
@ -128,6 +130,67 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
|||
}
|
||||
}
|
||||
|
||||
@TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE")
|
||||
public void testRecoveryAfterPrimaryPromotion() throws Exception {
|
||||
try (final ReplicationGroup shards = createGroup(2)) {
|
||||
shards.startAll();
|
||||
int totalDocs = shards.indexDocs(randomInt(10));
|
||||
int committedDocs = 0;
|
||||
if (randomBoolean()) {
|
||||
shards.flush();
|
||||
committedDocs = totalDocs;
|
||||
}
|
||||
// we need some indexing to happen to transfer local checkpoint information to the primary
|
||||
// so it can update the global checkpoint and communicate to replicas
|
||||
boolean expectSeqNoRecovery = totalDocs > 0;
|
||||
|
||||
|
||||
final IndexShard oldPrimary = shards.getPrimary();
|
||||
final IndexShard newPrimary = shards.getReplicas().get(0);
|
||||
final IndexShard replica = shards.getReplicas().get(1);
|
||||
if (randomBoolean()) {
|
||||
// simulate docs that were inflight when primary failed, these will be rolled back
|
||||
final int rollbackDocs = randomIntBetween(1, 5);
|
||||
logger.info("--> indexing {} rollback docs", rollbackDocs);
|
||||
for (int i = 0; i < rollbackDocs; i++) {
|
||||
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "rollback_" + i).source("{}");
|
||||
indexOnPrimary(indexRequest, oldPrimary);
|
||||
indexOnReplica(indexRequest, replica);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
oldPrimary.flush(new FlushRequest(index.getName()));
|
||||
expectSeqNoRecovery = false;
|
||||
}
|
||||
}
|
||||
|
||||
shards.promoteReplicaToPrimary(newPrimary);
|
||||
// index some more
|
||||
totalDocs += shards.indexDocs(randomIntBetween(0, 5));
|
||||
|
||||
oldPrimary.close("demoted", false);
|
||||
oldPrimary.store().close();
|
||||
|
||||
IndexShard newReplica = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId());
|
||||
shards.recoverReplica(newReplica);
|
||||
|
||||
if (expectSeqNoRecovery) {
|
||||
assertThat(newReplica.recoveryState().getIndex().fileDetails(), empty());
|
||||
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
|
||||
} else {
|
||||
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
|
||||
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
|
||||
}
|
||||
|
||||
shards.removeReplica(replica);
|
||||
replica.close("resync", false);
|
||||
replica.store().close();
|
||||
newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
|
||||
shards.recoverReplica(newReplica);
|
||||
|
||||
shards.assertAllEqual(totalDocs);
|
||||
}
|
||||
}
|
||||
|
||||
private static class BlockingTarget extends RecoveryTarget {
|
||||
|
||||
private final CountDownLatch recoveryBlocked;
|
||||
|
|
|
@ -125,7 +125,8 @@ public class EvilPeerRecoveryIT extends ESIntegTestCase {
|
|||
* Sequence-number-based recovery on this replica has to wait until these in-flight operations complete to proceed. We verify at the end
|
||||
* of recovery that a file-based recovery was not completed, and that the expected number of operations was replayed via the translog.
|
||||
*/
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE,discovery:TRACE," +
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE," +
|
||||
"org.elasticsearch.discovery:TRACE," +
|
||||
"org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," +
|
||||
"org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE")
|
||||
@AwaitsFix(bugUrl =
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices.recovery;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
|
||||
|
||||
public void testGetStartingSeqNo() throws Exception {
|
||||
IndexShard replica = newShard(false);
|
||||
RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
|
||||
try {
|
||||
recoveryEmptyReplica(replica);
|
||||
int docs = randomIntBetween(1, 10);
|
||||
final String index = replica.shardId().getIndexName();
|
||||
long seqNo = 0;
|
||||
for (int i = 0; i < docs; i++) {
|
||||
Engine.Index indexOp = replica.prepareIndexOnReplica(
|
||||
SourceToParse.source(SourceToParse.Origin.REPLICA, index, "type", "doc_" + i, new BytesArray("{}")),
|
||||
seqNo++, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
|
||||
replica.index(indexOp);
|
||||
if (rarely()) {
|
||||
// insert a gap
|
||||
seqNo++;
|
||||
}
|
||||
}
|
||||
|
||||
final long maxSeqNo = replica.seqNoStats().getMaxSeqNo();
|
||||
final long localCheckpoint = replica.getLocalCheckpoint();
|
||||
|
||||
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
|
||||
|
||||
replica.updateGlobalCheckpointOnReplica(maxSeqNo - 1);
|
||||
replica.getTranslog().sync();
|
||||
|
||||
// commit is enough, global checkpoint is below max *committed* which is NO_OPS_PERFORMED
|
||||
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
|
||||
|
||||
replica.flush(new FlushRequest());
|
||||
|
||||
// commit is still not good enough, global checkpoint is below max
|
||||
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
|
||||
|
||||
replica.updateGlobalCheckpointOnReplica(maxSeqNo);
|
||||
replica.getTranslog().sync();
|
||||
// commit is enough, global checkpoint is below max
|
||||
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(localCheckpoint + 1));
|
||||
} finally {
|
||||
closeShards(replica);
|
||||
recoveryTarget.decRef();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -94,7 +94,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
|
||||
Store store = newStore(createTempDir());
|
||||
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {},
|
||||
recoverySettings.getChunkSize().bytesAsInt(), logger);
|
||||
recoverySettings.getChunkSize().bytesAsInt(), Settings.EMPTY);
|
||||
Directory dir = store.directory();
|
||||
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
|
||||
int numDocs = randomIntBetween(10, 100);
|
||||
|
@ -153,7 +153,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
Store store = newStore(tempDir, false);
|
||||
AtomicBoolean failedEngine = new AtomicBoolean(false);
|
||||
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {},
|
||||
recoverySettings.getChunkSize().bytesAsInt(), logger) {
|
||||
recoverySettings.getChunkSize().bytesAsInt(), Settings.EMPTY) {
|
||||
@Override
|
||||
protected void failEngine(IOException cause) {
|
||||
assertFalse(failedEngine.get());
|
||||
|
@ -222,7 +222,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
Store store = newStore(tempDir, false);
|
||||
AtomicBoolean failedEngine = new AtomicBoolean(false);
|
||||
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {},
|
||||
recoverySettings.getChunkSize().bytesAsInt(), logger) {
|
||||
recoverySettings.getChunkSize().bytesAsInt(), Settings.EMPTY) {
|
||||
@Override
|
||||
protected void failEngine(IOException cause) {
|
||||
assertFalse(failedEngine.get());
|
||||
|
@ -298,10 +298,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
mock(RecoveryTargetHandler.class),
|
||||
request,
|
||||
() -> 0L,
|
||||
e -> () -> {
|
||||
},
|
||||
e -> () -> {},
|
||||
recoverySettings.getChunkSize().bytesAsInt(),
|
||||
logger) {
|
||||
Settings.EMPTY) {
|
||||
|
||||
@Override
|
||||
boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) {
|
||||
|
@ -319,7 +318,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void phase2(Translog.Snapshot snapshot) {
|
||||
void phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException {
|
||||
phase2Called.set(true);
|
||||
}
|
||||
|
||||
|
@ -391,7 +390,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
currentClusterStateVersionSupplier,
|
||||
delayNewRecoveries,
|
||||
recoverySettings.getChunkSize().bytesAsInt(),
|
||||
logger) {
|
||||
Settings.EMPTY) {
|
||||
|
||||
@Override
|
||||
boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) {
|
||||
|
@ -409,7 +408,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void phase2(final Translog.Snapshot snapshot) {
|
||||
void phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException {
|
||||
phase2Called.set(true);
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.lucene.index.LeafReaderContext;
|
|||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
|
@ -41,7 +40,6 @@ import org.elasticsearch.common.bytes.BytesArray;
|
|||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
|
@ -50,14 +48,12 @@ import org.elasticsearch.index.VersionType;
|
|||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.cache.query.DisabledQueryCache;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.RecoveryEngineException;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataService;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.mapper.UidFieldMapper;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.DirectoryService;
|
||||
|
@ -70,6 +66,7 @@ import org.elasticsearch.indices.recovery.RecoverySourceHandler;
|
|||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.DummyShardLock;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -78,16 +75,14 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
/**
|
||||
* A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily,
|
||||
|
@ -373,7 +368,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
private DiscoveryNode getFakeDiscoNode(String id) {
|
||||
return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class),
|
||||
Version.CURRENT);
|
||||
}
|
||||
|
||||
/** recovers a replica from the given primary **/
|
||||
|
@ -423,7 +419,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
() -> 0L,
|
||||
e -> () -> {},
|
||||
(int) ByteSizeUnit.MB.toBytes(1),
|
||||
logger);
|
||||
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), pNode.getName()).build());
|
||||
recovery.recoverToTarget();
|
||||
recoveryTarget.markAsDone();
|
||||
replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry()));
|
||||
|
|
Loading…
Reference in New Issue