Fix primary relocation for shadow replicas (#22474)
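The recovery process that is started during primary relocation of shadow replicas accesses the engine on the source shard after that engine has already been closed, which causes the source shard to fail itself. The fix reads the max unsafe auto-ID timestamp from the source shard before the engine is closed and passes that value to prepareTargetForTranslog, so the closed engine is no longer accessed.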
This commit is contained in:
parent
b8934945b4
commit
8741691511
|
@@ -329,7 +329,7 @@ public class RecoverySourceHandler {
|
|||
}
|
||||
}
|
||||
|
||||
prepareTargetForTranslog(translogView.totalOperations());
|
||||
prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());
|
||||
|
||||
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
|
||||
response.phase1Time = stopWatch.totalTime().millis();
|
||||
|
@@ -341,15 +341,14 @@ public class RecoverySourceHandler {
|
|||
}
|
||||
|
||||
|
||||
protected void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
|
||||
protected void prepareTargetForTranslog(final int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
|
||||
StopWatch stopWatch = new StopWatch().start();
|
||||
logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode());
|
||||
final long startEngineStart = stopWatch.totalTime().millis();
|
||||
// Send a request preparing the new shard's translog to receive
|
||||
// operations. This ensures the shard engine is started and disables
|
||||
// garbage collection (not the JVM's GC!) of tombstone deletes
|
||||
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps,
|
||||
shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp()));
|
||||
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp));
|
||||
stopWatch.stop();
|
||||
|
||||
response.startTime = stopWatch.totalTime().millis() - startEngineStart;
|
||||
|
|
|
@@ -50,6 +50,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
|
|||
boolean engineClosed = false;
|
||||
try {
|
||||
logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
|
||||
long maxUnsafeAutoIdTimestamp = shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp();
|
||||
if (request.isPrimaryRelocation()) {
|
||||
logger.debug("[phase1] closing engine on primary for shared filesystem recovery");
|
||||
try {
|
||||
|
@@ -62,7 +63,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
|
|||
shard.failShard("failed to close engine (phase1)", e);
|
||||
}
|
||||
}
|
||||
prepareTargetForTranslog(0);
|
||||
prepareTargetForTranslog(0, maxUnsafeAutoIdTimestamp);
|
||||
finalizeRecovery();
|
||||
return response;
|
||||
} catch (Exception e) {
|
||||
|
|
|
@@ -362,6 +362,9 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
|||
client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
|
||||
|
||||
ensureGreen(IDX);
|
||||
// check if primary has relocated to node3
|
||||
assertEquals(internalCluster().clusterService(node3).localNode().getId(),
|
||||
client().admin().cluster().prepareState().get().getState().routingTable().index(IDX).shard(0).primaryShard().currentNodeId());
|
||||
logger.info("--> performing query");
|
||||
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
|
||||
assertHitCount(resp, 2);
|
||||
|
|
Loading…
Reference in New Issue