Consolidate directory lock obtain code
The Directory#makeLock API is trappy and can easily lead to unexpected lock release if native locks are used. see LUCENE-6507 for details. This commit consolidates the lock lock into one place and only returns the lock instance if we actually acquired it.
This commit is contained in:
parent
283b0931ff
commit
a4c88b7233
|
@ -173,6 +173,28 @@ public class Lucene {
|
|||
return SegmentInfos.readCommit(directory, segmentsFileName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to acquire the {@link IndexWriter#WRITE_LOCK_NAME} on the given directory. The returned lock must be closed once
|
||||
* the lock is released. If the lock can't be obtained a {@link LockObtainFailedException} is thrown.
|
||||
* This method uses the {@link IndexWriterConfig#getDefaultWriteLockTimeout()} as the lock timeout.
|
||||
*/
|
||||
public static Lock acquireWriteLock(Directory directory) throws IOException {
|
||||
return acquireLock(directory, IndexWriter.WRITE_LOCK_NAME, IndexWriterConfig.getDefaultWriteLockTimeout());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to acquire a lock on the given directory. The returned lock must be closed once
|
||||
* the lock is released. If the lock can't be obtained a {@link LockObtainFailedException} is thrown.
|
||||
*/
|
||||
@SuppressForbidden(reason = "this method uses trappy Directory#makeLock API")
|
||||
public static Lock acquireLock(Directory directory, String lockName, long timeout) throws IOException {
|
||||
final Lock writeLock = directory.makeLock(lockName);
|
||||
if (writeLock.obtain(timeout) == false) {
|
||||
throw new LockObtainFailedException("failed to obtain lock: " + writeLock);
|
||||
}
|
||||
return writeLock;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method removes all files from the given directory that are not referenced by the given segments file.
|
||||
* This method will open an IndexWriter and relies on index file deleter to remove all unreferenced files. Segment files
|
||||
|
@ -184,10 +206,7 @@ public class Lucene {
|
|||
*/
|
||||
public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Directory directory) throws IOException {
|
||||
final SegmentInfos si = readSegmentInfos(segmentsFileName, directory);
|
||||
try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
|
||||
if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
|
||||
throw new LockObtainFailedException("Index locked for write: " + writeLock);
|
||||
}
|
||||
try (Lock writeLock = acquireWriteLock(directory)) {
|
||||
int foundSegmentFiles = 0;
|
||||
for (final String file : directory.listAll()) {
|
||||
/**
|
||||
|
@ -226,10 +245,7 @@ public class Lucene {
|
|||
* this operation fails.
|
||||
*/
|
||||
public static void cleanLuceneIndex(Directory directory) throws IOException {
|
||||
try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
|
||||
if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
|
||||
throw new LockObtainFailedException("Index locked for write: " + writeLock);
|
||||
}
|
||||
try (Lock writeLock = acquireWriteLock(directory)) {
|
||||
for (final String file : directory.listAll()) {
|
||||
if (file.startsWith(IndexFileNames.SEGMENTS) || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
|
||||
directory.deleteFile(file); // remove all segment_N files
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.lucene.index.CheckIndex;
|
|||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.Lock;
|
||||
import org.apache.lucene.store.LockObtainFailedException;
|
||||
import org.apache.lucene.store.SimpleFSDirectory;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -32,6 +33,7 @@ import org.elasticsearch.common.io.FileSystemUtils;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
|
@ -84,13 +86,12 @@ public class MultiDataPathUpgrader {
|
|||
ShardStateMetaData.FORMAT.write(loaded, loaded.version, targetPath.getShardStatePath());
|
||||
Files.createDirectories(targetPath.resolveIndex());
|
||||
try (SimpleFSDirectory directory = new SimpleFSDirectory(targetPath.resolveIndex())) {
|
||||
try (final Lock lock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
|
||||
if (lock.obtain(5000)) {
|
||||
upgradeFiles(shard, targetPath, targetPath.resolveIndex(), ShardPath.INDEX_FOLDER_NAME, paths);
|
||||
} else {
|
||||
throw new IllegalStateException("Can't obtain lock on " + targetPath.resolveIndex());
|
||||
}
|
||||
try (final Lock lock = Lucene.acquireWriteLock(directory)) {
|
||||
upgradeFiles(shard, targetPath, targetPath.resolveIndex(), ShardPath.INDEX_FOLDER_NAME, paths);
|
||||
} catch (LockObtainFailedException ex) {
|
||||
throw new IllegalStateException("Can't obtain lock on " + targetPath.resolveIndex(), ex);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.io.PathUtils;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
@ -146,18 +147,17 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
|||
|
||||
try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) {
|
||||
logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath());
|
||||
Lock tmpLock = luceneDir.makeLock(NODE_LOCK_FILENAME);
|
||||
boolean obtained = tmpLock.obtain();
|
||||
if (obtained) {
|
||||
try {
|
||||
locks[dirIndex] = Lucene.acquireLock(luceneDir, NODE_LOCK_FILENAME, 0);
|
||||
nodePaths[dirIndex] = new NodePath(dir, environment);
|
||||
locks[dirIndex] = tmpLock;
|
||||
localNodeId = possibleLockId;
|
||||
} else {
|
||||
} catch (LockObtainFailedException ex) {
|
||||
logger.trace("failed to obtain node lock on {}", dir.toAbsolutePath());
|
||||
// release all the ones that were obtained up until now
|
||||
releaseAndNullLocks(locks);
|
||||
break;
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.trace("failed to obtain node lock on {}", e, dir.toAbsolutePath());
|
||||
lastException = new IOException("failed to obtain lock on " + dir.toAbsolutePath(), e);
|
||||
|
@ -314,8 +314,9 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
|||
// open a directory (will be immediately closed) on the shard's location
|
||||
dirs[i] = new SimpleFSDirectory(p, FsDirectoryService.buildLockFactory(indexSettings));
|
||||
// create a lock for the "write.lock" file
|
||||
locks[i] = dirs[i].makeLock(IndexWriter.WRITE_LOCK_NAME);
|
||||
if (locks[i].obtain() == false) {
|
||||
try {
|
||||
locks[i] = Lucene.acquireWriteLock(dirs[i]);
|
||||
} catch (IOException ex) {
|
||||
throw new ElasticsearchException("unable to acquire " +
|
||||
IndexWriter.WRITE_LOCK_NAME + " for " + p);
|
||||
}
|
||||
|
|
|
@ -259,10 +259,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
metadataLock.writeLock().lock();
|
||||
// we make sure that nobody fetches the metadata while we do this rename operation here to ensure we don't
|
||||
// get exceptions if files are still open.
|
||||
try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
|
||||
if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
|
||||
throw new LockObtainFailedException("Index locked for write: " + writeLock);
|
||||
}
|
||||
try (Lock writeLock = Lucene.acquireWriteLock(directory())) {
|
||||
for (Map.Entry<String, String> entry : entries) {
|
||||
String tempFile = entry.getKey();
|
||||
String origFile = entry.getValue();
|
||||
|
@ -586,10 +583,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
*/
|
||||
public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException {
|
||||
metadataLock.writeLock().lock();
|
||||
try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
|
||||
if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
|
||||
throw new LockObtainFailedException("Index locked for write: " + writeLock);
|
||||
}
|
||||
try (Lock writeLock = Lucene.acquireWriteLock(directory)) {
|
||||
final StoreDirectory dir = directory;
|
||||
for (String existingFile : dir.listAll()) {
|
||||
if (existingFile.equals(IndexWriter.WRITE_LOCK_NAME) || Store.isChecksum(existingFile) || sourceMetaData.contains(existingFile)) {
|
||||
|
|
Loading…
Reference in New Issue