From f51e2cf90540b08e38d77f07ccec773041598736 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 28 Feb 2010 11:15:35 +0200 Subject: [PATCH] stop the recovery process by interrupting network threads when closing a shard --- .../index/shard/recovery/RecoveryAction.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java index 1c0d6517d48..7196c44ca8d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java @@ -23,10 +23,12 @@ import com.google.inject.Inject; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchInterruptedException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.node.Node; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.*; import org.elasticsearch.index.shard.service.IndexShard; @@ -59,7 +61,7 @@ import static java.util.concurrent.TimeUnit.*; import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class RecoveryAction extends AbstractIndexShardComponent { @@ -117,11 +119,6 @@ public class RecoveryAction extends AbstractIndexShardComponent { cleanOpenIndex(); - if (true) { - // disable the interruptions for now - return; - } - // interrupt the startRecovery thread if its performing recovery if (sendStartRecoveryThread != null) { sendStartRecoveryThread.interrupt(); @@ -191,6 +188,13 @@ public class RecoveryAction extends AbstractIndexShardComponent { // the remote shard has not yet registered the action or not started yet, we need to ignore this recovery attempt, and restore the state previous to recovering indexShard.restoreRecoveryState(preRecoveringState); throw new IgnoreRecoveryException("Ignoring recovery attempt, remote shard not started", e); + } else if (cause instanceof RecoveryEngineException) { + // it might be wrapped + if (cause.getCause() instanceof IgnoreRecoveryException) { + throw (IgnoreRecoveryException) cause.getCause(); + } + } else if (cause instanceof IgnoreRecoveryException) { + throw (IgnoreRecoveryException) cause; } throw new RecoveryFailedException(shardId, node, targetNode, e); } catch (Exception e) { @@ -315,6 +319,9 @@ public class RecoveryAction extends AbstractIndexShardComponent { stopWatch.stop(); logger.trace("Recovery [phase1] to {}: took [{}]", node, stopWatch.totalTime()); recoveryStatus.phase1Time = stopWatch.totalTime().millis(); + } catch (ElasticSearchInterruptedException e) { + // we got interrupted since we are closing, ignore the recovery + throw new IgnoreRecoveryException("Interrupted while recovering files"); } catch (Throwable e) { throw new RecoverFilesRecoveryException(shardId, snapshot.getFiles().length, new SizeValue(totalSize), e); } finally { @@ -335,6 +342,9 @@ public class RecoveryAction extends AbstractIndexShardComponent { logger.trace("Recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime()); recoveryStatus.phase2Time = stopWatch.totalTime().millis(); recoveryStatus.phase2Operations = snapshot.size(); + } catch (ElasticSearchInterruptedException e) { + // we got interrupted since we are closing, ignore the recovery + throw new IgnoreRecoveryException("Interrupted in phase 2 files"); } finally { sendSnapshotRecoveryThread = null; } @@ -363,6 +373,9 @@ public class RecoveryAction extends AbstractIndexShardComponent { logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime()); recoveryStatus.phase3Time = stopWatch.totalTime().millis(); recoveryStatus.phase3Operations = snapshot.size(); + } catch (ElasticSearchInterruptedException e) { + // we got interrupted since we are closing, ignore the recovery + throw new IgnoreRecoveryException("Interrupted in phase 2 files"); } finally { sendSnapshotRecoveryThread = null; }