diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java index fe549169fcc..dbcf594517a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java @@ -99,37 +99,39 @@ public class RecoveryTarget extends AbstractComponent { }); } - public void startRecovery(final StartRecoveryRequest request, final RecoveryListener listener) { + public void startRecovery(final StartRecoveryRequest request, boolean fromRetry, final RecoveryListener listener) { if (request.sourceNode() == null) { listener.onIgnoreRecovery(false, "No node to recovery from, retry on next cluster state update"); return; } final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); - // mark the shard as recovering - IndexShardState preRecoveryState; - try { - preRecoveryState = shard.recovering(); - } catch (IllegalIndexShardStateException e) { - // that's fine, since we might be called concurrently, just ignore this, we are already recovering - listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage()); + if (!fromRetry) { + try { + shard.recovering(); + } catch (IllegalIndexShardStateException e) { + // that's fine, since we might be called concurrently, just ignore this, we are already recovering + listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage()); + return; + } + } + if (shard.state() == IndexShardState.CLOSED) { + listener.onIgnoreRecovery(false, "shard closed, stop recovery"); return; } - final IndexShardState fPreRecoveryState = preRecoveryState; threadPool.cached().execute(new Runnable() { @Override public void run() { - doRecovery(shard, fPreRecoveryState, request, listener); + doRecovery(shard, request, listener); } }); } - private void doRecovery(final InternalIndexShard shard, final IndexShardState preRecoveryState, final StartRecoveryRequest request, final RecoveryListener listener) { - // we know we are on a thread, we can spin till we can engage in recovery + private void doRecovery(final InternalIndexShard shard, final StartRecoveryRequest request, final RecoveryListener listener) { + if (shard.state() == IndexShardState.CLOSED) { + listener.onIgnoreRecovery(false, "shard closed, stop recovery"); + return; + } + if (!recoveryThrottler.tryRecovery(shard.shardId(), "peer recovery target")) { - if (shard.state() == IndexShardState.CLOSED) { - listener.onIgnoreRecovery(false, "shard closed, stop recovery"); - return; - } - shard.restoreRecoveryState(preRecoveryState); listener.onRetryRecovery(recoveryThrottler.throttleInterval()); return; } @@ -151,7 +153,6 @@ public class RecoveryTarget extends AbstractComponent { } logger.trace("[{}][{}] retrying recovery in [{}], source shard is busy", request.shardId().index().name(), request.shardId().id(), recoveryThrottler.throttleInterval()); removeAndCleanOnGoingRecovery(request.shardId()); - shard.restoreRecoveryState(preRecoveryState); listener.onRetryRecovery(recoveryThrottler.throttleInterval()); return; } @@ -193,11 +194,6 @@ public class RecoveryTarget extends AbstractComponent { } if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) { - try { - shard.restoreRecoveryState(preRecoveryState); - } catch (IndexShardNotRecoveringException e1) { - // ignore this, we might be closing... - } listener.onRetryRecovery(recoveryThrottler.throttleInterval()); return; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 7c72725f0dc..88347d6aab4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -148,17 +148,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } } - public InternalIndexShard restoreRecoveryState(IndexShardState stateToRestore) { - synchronized (mutex) { - if (this.state != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, state); - } - logger.debug("state: [{}]->[{}], restored after recovery", state, stateToRestore); - this.state = stateToRestore; - } - return this; - } - public InternalIndexShard relocated() throws IndexShardNotStartedException { synchronized (mutex) { if (state != IndexShardState.STARTED) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index a88d4a9057c..3cb1f2c617f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -324,7 +324,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent