From 8741691511a93429170ce830d14df3f67395c09f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Sun, 8 Jan 2017 12:18:52 +0100 Subject: [PATCH] Fix primary relocation for shadow replicas (#22474) The recovery process started during primary relocation of shadow replicas accesses the engine on the source shard after it's been closed, which results in the source shard failing itself. --- .../indices/recovery/RecoverySourceHandler.java | 7 +++---- .../indices/recovery/SharedFSRecoverySourceHandler.java | 3 ++- .../org/elasticsearch/index/IndexWithShadowReplicasIT.java | 3 +++ 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index be055531813..3ed9282be59 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -329,7 +329,7 @@ public class RecoverySourceHandler { } } - prepareTargetForTranslog(translogView.totalOperations()); + prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp()); logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime()); response.phase1Time = stopWatch.totalTime().millis(); @@ -341,15 +341,14 @@ public class RecoverySourceHandler { } - protected void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { + protected void prepareTargetForTranslog(final int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { StopWatch stopWatch = new StopWatch().start(); logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode()); final long startEngineStart = stopWatch.totalTime().millis(); // Send a request preparing the new shard's translog to receive // operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes - cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps, - shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp())); + cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp)); stopWatch.stop(); response.startTime = stopWatch.totalTime().millis() - startEngineStart; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java index 591176f047a..509dd996d19 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java @@ -50,6 +50,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { boolean engineClosed = false; try { logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode()); + long maxUnsafeAutoIdTimestamp = shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp(); if (request.isPrimaryRelocation()) { logger.debug("[phase1] closing engine on primary for shared filesystem recovery"); try { @@ -62,7 +63,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { shard.failShard("failed to close engine (phase1)", e); } } - prepareTargetForTranslog(0); + prepareTargetForTranslog(0, maxUnsafeAutoIdTimestamp); finalizeRecovery(); return response; } catch (Exception e) { diff --git a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java index e0f5d2a4377..28e6dd82fee 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java +++ b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java @@ -362,6 +362,9 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet(); ensureGreen(IDX); + // check if primary has relocated to node3 + assertEquals(internalCluster().clusterService(node3).localNode().getId(), + client().admin().cluster().prepareState().get().getState().routingTable().index(IDX).shard(0).primaryShard().currentNodeId()); logger.info("--> performing query"); SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get(); assertHitCount(resp, 2);