From b609162be3679419074d815f92e22eca5a670c77 Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 20 Aug 2010 15:58:24 +0300 Subject: [PATCH] don't ignore recovery on throttling unless the shard is closed --- .../common/lucene/Directories.java | 4 ++- .../gateway/IndexShardGatewayService.java | 29 ++++++++++--------- .../shard/service/InternalIndexShard.java | 3 ++ 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/Directories.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/Directories.java index 4d085c86596..a1e3459a273 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/Directories.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/Directories.java @@ -43,7 +43,7 @@ public class Directories { /** * Deletes all the files from a directory. * - * @param directory The directoy to delete all the files from + * @param directory The directory to delete all the files from * @throws IOException if an exception occurs during the delete process */ public static void deleteFiles(Directory directory) throws IOException { @@ -52,6 +52,8 @@ public class Directories { for (String file : files) { try { directory.deleteFile(file); + } catch (FileNotFoundException e) { + // ignore } catch (IOException e) { lastException = e; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 9971032ff73..8efbfc843bf 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -38,7 +38,6 @@ import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.unit.TimeValue.*; @@ -68,8 +67,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem private volatile long lastTranslogLength; - private final AtomicBoolean recovered = new AtomicBoolean(); - private final TimeValue snapshotInterval; private volatile ScheduledFuture snapshotScheduleFuture; @@ -127,10 +124,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem * Recovers the state of the shard from the gateway. */ public void recover(final RecoveryListener listener) throws IndexShardGatewayRecoveryException, IgnoreGatewayRecoveryException { - if (!recovered.compareAndSet(false, true)) { - listener.onIgnoreRecovery("already recovered"); - return; - } if (indexShard.state() == IndexShardState.CLOSED) { // got closed on us, just ignore this recovery listener.onIgnoreRecovery("shard closed"); @@ -140,18 +133,23 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Trying to recover when the shard is in backup state", null)); return; } + try { + indexShard.recovering(); + } catch (IllegalIndexShardStateException e) { + // that's fine, since we might be called concurrently, just ignore this, we are already recovering + listener.onIgnoreRecovery("already in recovering process, " + e.getMessage()); + return; + } threadPool.cached().execute(new Runnable() { @Override public void run() { - indexShard.recovering(); - recoveryStatus = new RecoveryStatus(); recoveryStatus.updateStage(RecoveryStatus.Stage.INIT); // we know we are on a thread, we can spin till we can engage in recovery while (!recoveryThrottler.tryRecovery(shardId, "gateway")) { - if (indexShard.ignoreRecoveryAttempt()) { - listener.onIgnoreRecovery("ignoring recovery while waiting on retry"); + if (indexShard.state() == IndexShardState.CLOSED) { + listener.onIgnoreRecovery("ignoring recovery while waiting on retry, closed"); return; } recoveryStatus.updateStage(RecoveryStatus.Stage.RETRY); @@ -159,8 +157,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem Thread.sleep(recoveryThrottler.throttleInterval().millis()); recoveryStatus.retryTime(System.currentTimeMillis() - recoveryStatus.startTime()); } catch (InterruptedException e) { - if (indexShard.ignoreRecoveryAttempt()) { - listener.onIgnoreRecovery("Interrupted while waiting for recovery, but we should ignore ..."); + recoveryStatus = null; + if (indexShard.state() == IndexShardState.CLOSED) { + listener.onIgnoreRecovery("Interrupted while waiting for recovery, but we should ignore since closed"); } else { listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recovery", e)); } @@ -233,6 +232,10 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem // shard has just been created, ignore it and return return; } + if (indexShard.state() == IndexShardState.RECOVERING) { + // shard is recovering, don't snapshot + return; + } try { SnapshotStatus snapshotStatus = indexShard.snapshot(new Engine.SnapshotHandler() { @Override public SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 93d8a7e30fb..75b4d3b6684 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -129,6 +129,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I return this; } + /** + * Marks the shard as recovering, fails with exception is recovering is not allowed to be set. + */ public IndexShardState recovering() throws IndexShardStartedException, IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException { synchronized (mutex) {