diff --git a/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 385607d89ba..e3d787779c7 100644 --- a/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -173,6 +173,28 @@ public class Lucene { return SegmentInfos.readCommit(directory, segmentsFileName); } + /** + * Tries to acquire the {@link IndexWriter#WRITE_LOCK_NAME} on the given directory. The returned lock must be closed once + * the lock is released. If the lock can't be obtained a {@link LockObtainFailedException} is thrown. + * This method uses the {@link IndexWriterConfig#getDefaultWriteLockTimeout()} as the lock timeout. + */ + public static Lock acquireWriteLock(Directory directory) throws IOException { + return acquireLock(directory, IndexWriter.WRITE_LOCK_NAME, IndexWriterConfig.getDefaultWriteLockTimeout()); + } + + /** + * Tries to acquire a lock on the given directory. The returned lock must be closed once + * the lock is released. If the lock can't be obtained a {@link LockObtainFailedException} is thrown. + */ + @SuppressForbidden(reason = "this method uses trappy Directory#makeLock API") + public static Lock acquireLock(Directory directory, String lockName, long timeout) throws IOException { + final Lock writeLock = directory.makeLock(lockName); + if (writeLock.obtain(timeout) == false) { + throw new LockObtainFailedException("failed to obtain lock: " + writeLock); + } + return writeLock; + } + /** * This method removes all files from the given directory that are not referenced by the given segments file. * This method will open an IndexWriter and relies on index file deleter to remove all unreferenced files. Segment files @@ -184,10 +206,7 @@ public class Lucene { */ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Directory directory) throws IOException { final SegmentInfos si = readSegmentInfos(segmentsFileName, directory); - try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) { - if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock - throw new LockObtainFailedException("Index locked for write: " + writeLock); - } + try (Lock writeLock = acquireWriteLock(directory)) { int foundSegmentFiles = 0; for (final String file : directory.listAll()) { /** @@ -226,10 +245,7 @@ public class Lucene { * this operation fails. */ public static void cleanLuceneIndex(Directory directory) throws IOException { - try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) { - if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock - throw new LockObtainFailedException("Index locked for write: " + writeLock); - } + try (Lock writeLock = acquireWriteLock(directory)) { for (final String file : directory.listAll()) { if (file.startsWith(IndexFileNames.SEGMENTS) || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) { directory.deleteFile(file); // remove all segment_N files diff --git a/src/main/java/org/elasticsearch/common/util/MultiDataPathUpgrader.java b/src/main/java/org/elasticsearch/common/util/MultiDataPathUpgrader.java index 1cb700cff60..3425d151c34 100644 --- a/src/main/java/org/elasticsearch/common/util/MultiDataPathUpgrader.java +++ b/src/main/java/org/elasticsearch/common/util/MultiDataPathUpgrader.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.util.IOUtils; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -32,6 +33,7 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; @@ -84,13 +86,12 @@ public class MultiDataPathUpgrader { ShardStateMetaData.FORMAT.write(loaded, loaded.version, targetPath.getShardStatePath()); Files.createDirectories(targetPath.resolveIndex()); try (SimpleFSDirectory directory = new SimpleFSDirectory(targetPath.resolveIndex())) { - try (final Lock lock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) { - if (lock.obtain(5000)) { - upgradeFiles(shard, targetPath, targetPath.resolveIndex(), ShardPath.INDEX_FOLDER_NAME, paths); - } else { - throw new IllegalStateException("Can't obtain lock on " + targetPath.resolveIndex()); - } + try (final Lock lock = Lucene.acquireWriteLock(directory)) { + upgradeFiles(shard, targetPath, targetPath.resolveIndex(), ShardPath.INDEX_FOLDER_NAME, paths); + } catch (LockObtainFailedException ex) { + throw new IllegalStateException("Can't obtain lock on " + targetPath.resolveIndex(), ex); } + } diff --git a/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 26a725f9072..75ef6914eae 100644 --- a/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; @@ -146,18 +147,17 @@ public class NodeEnvironment extends AbstractComponent implements Closeable { try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) { logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath()); - Lock tmpLock = luceneDir.makeLock(NODE_LOCK_FILENAME); - boolean obtained = tmpLock.obtain(); - if (obtained) { + try { + locks[dirIndex] = Lucene.acquireLock(luceneDir, NODE_LOCK_FILENAME, 0); nodePaths[dirIndex] = new NodePath(dir, environment); - locks[dirIndex] = tmpLock; localNodeId = possibleLockId; - } else { + } catch (LockObtainFailedException ex) { logger.trace("failed to obtain node lock on {}", dir.toAbsolutePath()); // release all the ones that were obtained up until now releaseAndNullLocks(locks); break; } + } catch (IOException e) { logger.trace("failed to obtain node lock on {}", e, dir.toAbsolutePath()); lastException = new IOException("failed to obtain lock on " + dir.toAbsolutePath(), e); @@ -314,8 +314,9 @@ public class NodeEnvironment extends AbstractComponent implements Closeable { // open a directory (will be immediately closed) on the shard's location dirs[i] = new SimpleFSDirectory(p, FsDirectoryService.buildLockFactory(indexSettings)); // create a lock for the "write.lock" file - locks[i] = dirs[i].makeLock(IndexWriter.WRITE_LOCK_NAME); - if (locks[i].obtain() == false) { + try { + locks[i] = Lucene.acquireWriteLock(dirs[i]); + } catch (IOException ex) { throw new ElasticsearchException("unable to acquire " + IndexWriter.WRITE_LOCK_NAME + " for " + p); } diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index c889dd16c20..d92128a319c 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -259,10 +259,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref metadataLock.writeLock().lock(); // we make sure that nobody fetches the metadata while we do this rename operation here to ensure we don't // get exceptions if files are still open. - try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) { - if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock - throw new LockObtainFailedException("Index locked for write: " + writeLock); - } + try (Lock writeLock = Lucene.acquireWriteLock(directory())) { for (Map.Entry entry : entries) { String tempFile = entry.getKey(); String origFile = entry.getValue(); @@ -586,10 +583,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref */ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException { metadataLock.writeLock().lock(); - try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) { - if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock - throw new LockObtainFailedException("Index locked for write: " + writeLock); - } + try (Lock writeLock = Lucene.acquireWriteLock(directory)) { final StoreDirectory dir = directory; for (String existingFile : dir.listAll()) { if (existingFile.equals(IndexWriter.WRITE_LOCK_NAME) || Store.isChecksum(existingFile) || sourceMetaData.contains(existingFile)) {