Fs Gateway: Add (native) file lock to ensure two nodes in a split brain are not updating same gateway, closes #481.

This commit is contained in:
kimchy 2010-11-06 23:27:46 +02:00
parent 998bde0820
commit 6b952f6719
7 changed files with 83 additions and 6 deletions

View File

@ -46,6 +46,10 @@ public abstract class AbstractFsBlobContainer extends AbstractBlobContainer {
this.path = path;
}
public File filePath() {
return this.path;
}
public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
File[] files = path.listFiles();
if (files == null || files.length == 0) {

View File

@ -64,6 +64,17 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
*/
boolean requiresSnapshotScheduling();
SnapshotLock obtainSnapshotLock() throws Exception;
public static interface SnapshotLock {
void release();
}
public static final SnapshotLock NO_SNAPSHOT_LOCK = new SnapshotLock() {
@Override public void release() {
}
};
public static class Snapshot {
private final SnapshotIndexCommit indexCommit;
private final Translog.Snapshot translogSnapshot;

View File

@ -32,7 +32,6 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
@ -53,8 +52,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private final IndexShardGateway shardGateway;
private final Store store;
private volatile long lastIndexVersion;
@ -70,14 +67,14 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private RecoveryStatus recoveryStatus;
private IndexShardGateway.SnapshotLock snapshotLock;
@Inject public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings,
ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway,
Store store) {
ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.indexShard = (InternalIndexShard) indexShard;
this.shardGateway = shardGateway;
this.store = store;
this.snapshotOnClose = componentSettings.getAsBoolean("snapshot_on_close", true);
this.snapshotInterval = componentSettings.getAsTime("snapshot_interval", TimeValue.timeValueSeconds(10));
@ -220,6 +217,16 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
// shard is recovering, don't snapshot
return;
}
if (snapshotLock == null) {
try {
snapshotLock = shardGateway.obtainSnapshotLock();
} catch (Exception e) {
logger.warn("failed to obtain snapshot lock, ignoring snapshot", e);
return;
}
}
try {
SnapshotStatus snapshotStatus = indexShard.snapshot(new Engine.SnapshotHandler<SnapshotStatus>() {
@Override public SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
@ -283,6 +290,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
delete = false;
}
shardGateway.close(delete);
if (snapshotLock != null) {
snapshotLock.release();
}
}
private synchronized void scheduleSnapshotIfNeeded() {

View File

@ -113,6 +113,10 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
return true;
}
@Override public SnapshotLock obtainSnapshotLock() throws Exception {
return NO_SNAPSHOT_LOCK;
}
@Override public void close(boolean delete) throws ElasticSearchException {
if (delete) {
blobStore.delete(shardPath);

View File

@ -19,6 +19,10 @@
package org.elasticsearch.index.gateway.fs;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NativeFSLockFactory;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.blobstore.fs.AbstractFsBlobContainer;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.gateway.IndexGateway;
@ -29,17 +33,53 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class FsIndexShardGateway extends BlobStoreIndexShardGateway {
private final boolean snapshotLock;
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway fsIndexGateway,
IndexShard indexShard, Store store) {
super(shardId, indexSettings, threadPool, fsIndexGateway, indexShard, store);
this.snapshotLock = indexSettings.getAsBoolean("gateway.fs.snapshot_lock", true);
}
@Override public String type() {
return "fs";
}
@Override public SnapshotLock obtainSnapshotLock() throws Exception {
if (!snapshotLock) {
return NO_SNAPSHOT_LOCK;
}
AbstractFsBlobContainer fsBlobContainer = (AbstractFsBlobContainer) blobContainer;
NativeFSLockFactory lockFactory = new NativeFSLockFactory(fsBlobContainer.filePath());
Lock lock = lockFactory.makeLock("snapshot.lock");
boolean obtained = lock.obtain();
if (!obtained) {
throw new ElasticSearchIllegalStateException("failed to obtain snapshot lock [" + lock + "]");
}
return new FsSnapshotLock(lock);
}
public class FsSnapshotLock implements SnapshotLock {
private final Lock lock;
public FsSnapshotLock(Lock lock) {
this.lock = lock;
}
@Override public void release() {
try {
lock.release();
} catch (IOException e) {
logger.warn("failed to release snapshot lock [{}]", e, lock);
}
}
}
}

View File

@ -173,6 +173,10 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
}
}
@Override public SnapshotLock obtainSnapshotLock() throws Exception {
return NO_SNAPSHOT_LOCK;
}
private class Sync implements Runnable {
@Override public void run() {
if (indexShard.state() == IndexShardState.STARTED) {

View File

@ -93,4 +93,8 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
@Override public void close(boolean delete) {
}
@Override public SnapshotLock obtainSnapshotLock() throws Exception {
return NO_SNAPSHOT_LOCK;
}
}