stop the recovery process by interrupting network threads when closing a shard

This commit is contained in:
kimchy 2010-02-28 11:15:35 +02:00
parent 372bdec45f
commit f51e2cf905
1 changed file with 19 additions and 6 deletions

View File

@ -23,10 +23,12 @@ import com.google.inject.Inject;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.Node; import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*; import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
@ -59,7 +61,7 @@ import static java.util.concurrent.TimeUnit.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class RecoveryAction extends AbstractIndexShardComponent { public class RecoveryAction extends AbstractIndexShardComponent {
@ -117,11 +119,6 @@ public class RecoveryAction extends AbstractIndexShardComponent {
cleanOpenIndex(); cleanOpenIndex();
if (true) {
// disable the interruptions for now
return;
}
// interrupt the startRecovery thread if it's performing recovery // interrupt the startRecovery thread if it's performing recovery
if (sendStartRecoveryThread != null) { if (sendStartRecoveryThread != null) {
sendStartRecoveryThread.interrupt(); sendStartRecoveryThread.interrupt();
@ -191,6 +188,13 @@ public class RecoveryAction extends AbstractIndexShardComponent {
// the remote shard has not yet registered the action or not started yet, we need to ignore this recovery attempt, and restore the state previous to recovering // the remote shard has not yet registered the action or not started yet, we need to ignore this recovery attempt, and restore the state previous to recovering
indexShard.restoreRecoveryState(preRecoveringState); indexShard.restoreRecoveryState(preRecoveringState);
throw new IgnoreRecoveryException("Ignoring recovery attempt, remote shard not started", e); throw new IgnoreRecoveryException("Ignoring recovery attempt, remote shard not started", e);
} else if (cause instanceof RecoveryEngineException) {
// it might be wrapped
if (cause.getCause() instanceof IgnoreRecoveryException) {
throw (IgnoreRecoveryException) cause.getCause();
}
} else if (cause instanceof IgnoreRecoveryException) {
throw (IgnoreRecoveryException) cause;
} }
throw new RecoveryFailedException(shardId, node, targetNode, e); throw new RecoveryFailedException(shardId, node, targetNode, e);
} catch (Exception e) { } catch (Exception e) {
@ -315,6 +319,9 @@ public class RecoveryAction extends AbstractIndexShardComponent {
stopWatch.stop(); stopWatch.stop();
logger.trace("Recovery [phase1] to {}: took [{}]", node, stopWatch.totalTime()); logger.trace("Recovery [phase1] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase1Time = stopWatch.totalTime().millis(); recoveryStatus.phase1Time = stopWatch.totalTime().millis();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted while recovering files");
} catch (Throwable e) { } catch (Throwable e) {
throw new RecoverFilesRecoveryException(shardId, snapshot.getFiles().length, new SizeValue(totalSize), e); throw new RecoverFilesRecoveryException(shardId, snapshot.getFiles().length, new SizeValue(totalSize), e);
} finally { } finally {
@ -335,6 +342,9 @@ public class RecoveryAction extends AbstractIndexShardComponent {
logger.trace("Recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime()); logger.trace("Recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase2Time = stopWatch.totalTime().millis(); recoveryStatus.phase2Time = stopWatch.totalTime().millis();
recoveryStatus.phase2Operations = snapshot.size(); recoveryStatus.phase2Operations = snapshot.size();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted in phase 2 files");
} finally { } finally {
sendSnapshotRecoveryThread = null; sendSnapshotRecoveryThread = null;
} }
@ -363,6 +373,9 @@ public class RecoveryAction extends AbstractIndexShardComponent {
logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime()); logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase3Time = stopWatch.totalTime().millis(); recoveryStatus.phase3Time = stopWatch.totalTime().millis();
recoveryStatus.phase3Operations = snapshot.size(); recoveryStatus.phase3Operations = snapshot.size();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
// (the message names phase 3 so it agrees with the "Recovery [phase3]" trace logged for this stage)
throw new IgnoreRecoveryException("Interrupted in phase 3 files");
} finally { } finally {
sendSnapshotRecoveryThread = null; sendSnapshotRecoveryThread = null;
} }