don't ignore recovery on throttling unless the shard is closed

This commit is contained in:
kimchy 2010-08-20 15:58:24 +03:00
parent 4e74001bde
commit b609162be3
3 changed files with 22 additions and 14 deletions

View File

@ -43,7 +43,7 @@ public class Directories {
/**
* Deletes all the files from a directory.
*
* @param directory The directoy to delete all the files from
* @param directory The directory to delete all the files from
* @throws IOException if an exception occurs during the delete process
*/
public static void deleteFiles(Directory directory) throws IOException {
@ -52,6 +52,8 @@ public class Directories {
for (String file : files) {
try {
directory.deleteFile(file);
} catch (FileNotFoundException e) {
// ignore
} catch (IOException e) {
lastException = e;
}

View File

@ -38,7 +38,6 @@ import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.unit.TimeValue.*;
@ -68,8 +67,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private volatile long lastTranslogLength;
private final AtomicBoolean recovered = new AtomicBoolean();
private final TimeValue snapshotInterval;
private volatile ScheduledFuture snapshotScheduleFuture;
@ -127,10 +124,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
* Recovers the state of the shard from the gateway.
*/
public void recover(final RecoveryListener listener) throws IndexShardGatewayRecoveryException, IgnoreGatewayRecoveryException {
if (!recovered.compareAndSet(false, true)) {
listener.onIgnoreRecovery("already recovered");
return;
}
if (indexShard.state() == IndexShardState.CLOSED) {
// got closed on us, just ignore this recovery
listener.onIgnoreRecovery("shard closed");
@ -140,18 +133,23 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Trying to recover when the shard is in backup state", null));
return;
}
try {
indexShard.recovering();
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
listener.onIgnoreRecovery("already in recovering process, " + e.getMessage());
return;
}
threadPool.cached().execute(new Runnable() {
@Override public void run() {
indexShard.recovering();
recoveryStatus = new RecoveryStatus();
recoveryStatus.updateStage(RecoveryStatus.Stage.INIT);
// we know we are on a thread, we can spin till we can engage in recovery
while (!recoveryThrottler.tryRecovery(shardId, "gateway")) {
if (indexShard.ignoreRecoveryAttempt()) {
listener.onIgnoreRecovery("ignoring recovery while waiting on retry");
if (indexShard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery("ignoring recovery while waiting on retry, closed");
return;
}
recoveryStatus.updateStage(RecoveryStatus.Stage.RETRY);
@ -159,8 +157,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
Thread.sleep(recoveryThrottler.throttleInterval().millis());
recoveryStatus.retryTime(System.currentTimeMillis() - recoveryStatus.startTime());
} catch (InterruptedException e) {
if (indexShard.ignoreRecoveryAttempt()) {
listener.onIgnoreRecovery("Interrupted while waiting for recovery, but we should ignore ...");
recoveryStatus = null;
if (indexShard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery("Interrupted while waiting for recovery, but we should ignore since closed");
} else {
listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recovery", e));
}
@ -233,6 +232,10 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
// shard has just been created, ignore it and return
return;
}
if (indexShard.state() == IndexShardState.RECOVERING) {
// shard is recovering, don't snapshot
return;
}
try {
SnapshotStatus snapshotStatus = indexShard.snapshot(new Engine.SnapshotHandler<SnapshotStatus>() {
@Override public SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {

View File

@ -129,6 +129,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return this;
}
/**
* Marks the shard as recovering, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState recovering() throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
synchronized (mutex) {