diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java index b998905a147..f6060707913 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java @@ -46,6 +46,10 @@ public abstract class AbstractFsBlobContainer extends AbstractBlobContainer { this.path = path; } + public File filePath() { + return this.path; + } + public ImmutableMap listBlobs() throws IOException { File[] files = path.listFiles(); if (files == null || files.length == 0) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index 215f077c564..37b787f2e9f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -64,6 +64,17 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo */ boolean requiresSnapshotScheduling(); + SnapshotLock obtainSnapshotLock() throws Exception; + + public static interface SnapshotLock { + void release(); + } + + public static final SnapshotLock NO_SNAPSHOT_LOCK = new SnapshotLock() { + @Override public void release() { + } + }; + public static class Snapshot { private final SnapshotIndexCommit indexCommit; private final Translog.Snapshot translogSnapshot; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 1d60d5566e7..f188e6144f4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -32,7 +32,6 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.*; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; -import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.threadpool.ThreadPool; @@ -53,8 +52,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem private final IndexShardGateway shardGateway; - private final Store store; - private volatile long lastIndexVersion; @@ -70,14 +67,14 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem private RecoveryStatus recoveryStatus; + private IndexShardGateway.SnapshotLock snapshotLock; + @Inject public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings, - ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway, - Store store) { + ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway) { super(shardId, indexSettings); this.threadPool = threadPool; this.indexShard = (InternalIndexShard) indexShard; this.shardGateway = shardGateway; - this.store = store; this.snapshotOnClose = componentSettings.getAsBoolean("snapshot_on_close", true); this.snapshotInterval = componentSettings.getAsTime("snapshot_interval", TimeValue.timeValueSeconds(10)); @@ -220,6 +217,16 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem // shard is recovering, don't snapshot return; } + + if (snapshotLock == null) { + try { + snapshotLock = shardGateway.obtainSnapshotLock(); + } catch (Exception e) { + logger.warn("failed to obtain snapshot lock, ignoring snapshot", e); + return; + } + } + try { SnapshotStatus snapshotStatus = indexShard.snapshot(new Engine.SnapshotHandler() { @Override public SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException { @@ -283,6 +290,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem delete = false; } shardGateway.close(delete); + if (snapshotLock != null) { + snapshotLock.release(); + } } private synchronized void scheduleSnapshotIfNeeded() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index 536f1011cbb..26c4ccf95eb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -113,6 +113,10 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo return true; } + @Override public SnapshotLock obtainSnapshotLock() throws Exception { + return NO_SNAPSHOT_LOCK; + } + @Override public void close(boolean delete) throws ElasticSearchException { if (delete) { blobStore.delete(shardPath); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java index abc505806d0..f4113f9808d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java @@ -19,6 +19,10 @@ package org.elasticsearch.index.gateway.fs; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.NativeFSLockFactory; +import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.common.blobstore.fs.AbstractFsBlobContainer; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.gateway.IndexGateway; @@ -29,17 +33,53 @@ import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.store.Store; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; + /** * @author kimchy (shay.banon) */ public class FsIndexShardGateway extends BlobStoreIndexShardGateway { + private final boolean snapshotLock; + @Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway fsIndexGateway, IndexShard indexShard, Store store) { super(shardId, indexSettings, threadPool, fsIndexGateway, indexShard, store); + this.snapshotLock = indexSettings.getAsBoolean("gateway.fs.snapshot_lock", true); } @Override public String type() { return "fs"; } + + @Override public SnapshotLock obtainSnapshotLock() throws Exception { + if (!snapshotLock) { + return NO_SNAPSHOT_LOCK; + } + AbstractFsBlobContainer fsBlobContainer = (AbstractFsBlobContainer) blobContainer; + NativeFSLockFactory lockFactory = new NativeFSLockFactory(fsBlobContainer.filePath()); + + Lock lock = lockFactory.makeLock("snapshot.lock"); + boolean obtained = lock.obtain(); + if (!obtained) { + throw new ElasticSearchIllegalStateException("failed to obtain snapshot lock [" + lock + "]"); + } + return new FsSnapshotLock(lock); + } + + public class FsSnapshotLock implements SnapshotLock { + private final Lock lock; + + public FsSnapshotLock(Lock lock) { + this.lock = lock; + } + + @Override public void release() { + try { + lock.release(); + } catch (IOException e) { + logger.warn("failed to release snapshot lock [{}]", e, lock); + } + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index 123e04fe4a7..147b5ad8b4d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -173,6 +173,10 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen } } + @Override public SnapshotLock obtainSnapshotLock() throws Exception { + return NO_SNAPSHOT_LOCK; + } + private class Sync implements Runnable { @Override public void run() { if (indexShard.state() == IndexShardState.STARTED) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java index afcb6601f70..14429d42686 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java @@ -93,4 +93,8 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement @Override public void close(boolean delete) { } + + @Override public SnapshotLock obtainSnapshotLock() throws Exception { + return NO_SNAPSHOT_LOCK; + } }