mirror of https://github.com/apache/lucene.git
LUCENE-6507: don't let NativeFSLock.close release other locks
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1682327 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8847842a1f
commit
225e4a5803
|
@ -221,6 +221,9 @@ Bug Fixes
|
||||||
* LUCENE-6505: NRT readers now reflect segments_N filename and commit
|
* LUCENE-6505: NRT readers now reflect segments_N filename and commit
|
||||||
user data from previous commits (Mike McCandless)
|
user data from previous commits (Mike McCandless)
|
||||||
|
|
||||||
|
* LUCENE-6507: Don't let NativeFSLock.close() release other locks
|
||||||
|
(Simon Willnauer, Robert Muir, Uwe Schindler, Mike McCandless)
|
||||||
|
|
||||||
API Changes
|
API Changes
|
||||||
|
|
||||||
* LUCENE-6377: SearcherFactory#newSearcher now accepts the previous reader
|
* LUCENE-6377: SearcherFactory#newSearcher now accepts the previous reader
|
||||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.lucene.store;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
import java.nio.channels.FileLock;
|
|
||||||
import java.nio.channels.OverlappingFileLockException;
|
import java.nio.channels.OverlappingFileLockException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
@ -87,25 +86,26 @@ public final class NativeFSLockFactory extends FSLockFactory {
|
||||||
|
|
||||||
static final class NativeFSLock extends Lock {
|
static final class NativeFSLock extends Lock {
|
||||||
|
|
||||||
private FileChannel channel;
|
private final Path path;
|
||||||
private FileLock lock;
|
private final Path lockDir;
|
||||||
private Path path;
|
|
||||||
private Path lockDir;
|
|
||||||
private static final Set<String> LOCK_HELD = Collections.synchronizedSet(new HashSet<String>());
|
private static final Set<String> LOCK_HELD = Collections.synchronizedSet(new HashSet<String>());
|
||||||
|
|
||||||
|
private FileChannel channel; // set when we have the lock
|
||||||
|
private Path realPath; // unconditionally set in obtain(), for use in close()
|
||||||
|
|
||||||
public NativeFSLock(Path lockDir, String lockFileName) {
|
public NativeFSLock(Path lockDir, String lockFileName) {
|
||||||
this.lockDir = lockDir;
|
this.lockDir = lockDir;
|
||||||
path = lockDir.resolve(lockFileName);
|
path = lockDir.resolve(lockFileName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean obtain() throws IOException {
|
public synchronized boolean obtain() throws IOException {
|
||||||
|
|
||||||
if (lock != null) {
|
if (channel != null) {
|
||||||
// Our instance is already locked:
|
// Our instance is already locked:
|
||||||
return false;
|
assert channel.isOpen();
|
||||||
|
assert realPath != null;
|
||||||
|
throw new LockObtainFailedException("this lock instance was already obtained");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure that lockDir exists and is a directory.
|
// Ensure that lockDir exists and is a directory.
|
||||||
|
@ -116,7 +116,7 @@ public final class NativeFSLockFactory extends FSLockFactory {
|
||||||
// we must create the file to have a truly canonical path.
|
// we must create the file to have a truly canonical path.
|
||||||
// if it's already created, we don't care. if it cant be created, it will fail below.
|
// if it's already created, we don't care. if it cant be created, it will fail below.
|
||||||
}
|
}
|
||||||
final Path canonicalPath = path.toRealPath();
|
realPath = path.toRealPath();
|
||||||
// Make sure nobody else in-process has this lock held
|
// Make sure nobody else in-process has this lock held
|
||||||
// already, and, mark it held if not:
|
// already, and, mark it held if not:
|
||||||
// This is a pretty crazy workaround for some documented
|
// This is a pretty crazy workaround for some documented
|
||||||
|
@ -131,12 +131,15 @@ public final class NativeFSLockFactory extends FSLockFactory {
|
||||||
// is that we can't re-obtain the lock in the same JVM but from a different process if that happens. Nevertheless
|
// is that we can't re-obtain the lock in the same JVM but from a different process if that happens. Nevertheless
|
||||||
// this is super trappy. See LUCENE-5738
|
// this is super trappy. See LUCENE-5738
|
||||||
boolean obtained = false;
|
boolean obtained = false;
|
||||||
if (LOCK_HELD.add(canonicalPath.toString())) {
|
if (LOCK_HELD.add(realPath.toString())) {
|
||||||
|
FileChannel ch = null;
|
||||||
try {
|
try {
|
||||||
channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
|
ch = FileChannel.open(realPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
|
||||||
try {
|
try {
|
||||||
lock = channel.tryLock();
|
if (ch.tryLock() != null) {
|
||||||
obtained = lock != null;
|
channel = ch;
|
||||||
|
obtained = true;
|
||||||
|
}
|
||||||
} catch (IOException | OverlappingFileLockException e) {
|
} catch (IOException | OverlappingFileLockException e) {
|
||||||
// At least on OS X, we will sometimes get an
|
// At least on OS X, we will sometimes get an
|
||||||
// intermittent "Permission Denied" IOException,
|
// intermittent "Permission Denied" IOException,
|
||||||
|
@ -151,10 +154,8 @@ public final class NativeFSLockFactory extends FSLockFactory {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (obtained == false) { // not successful - clear up and move out
|
if (obtained == false) { // not successful - clear up and move out
|
||||||
clearLockHeld(path);
|
IOUtils.closeWhileHandlingException(ch);
|
||||||
final FileChannel toClose = channel;
|
clearLockHeld(realPath); // clear LOCK_HELD last
|
||||||
channel = null;
|
|
||||||
IOUtils.closeWhileHandlingException(toClose);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -163,23 +164,17 @@ public final class NativeFSLockFactory extends FSLockFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void close() throws IOException {
|
public synchronized void close() throws IOException {
|
||||||
try {
|
if (channel != null) {
|
||||||
if (lock != null) {
|
try {
|
||||||
try {
|
IOUtils.close(channel);
|
||||||
lock.release();
|
} finally {
|
||||||
lock = null;
|
channel = null;
|
||||||
} finally {
|
clearLockHeld(realPath); // clear LOCK_HELD last
|
||||||
clearLockHeld(path);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
IOUtils.close(channel);
|
|
||||||
channel = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final void clearLockHeld(Path path) throws IOException {
|
private static final void clearLockHeld(Path path) {
|
||||||
path = path.toRealPath();
|
|
||||||
boolean remove = LOCK_HELD.remove(path.toString());
|
boolean remove = LOCK_HELD.remove(path.toString());
|
||||||
assert remove : "Lock was cleared but never marked as held";
|
assert remove : "Lock was cleared but never marked as held";
|
||||||
}
|
}
|
||||||
|
@ -189,10 +184,14 @@ public final class NativeFSLockFactory extends FSLockFactory {
|
||||||
// The test for is isLocked is not directly possible with native file locks:
|
// The test for is isLocked is not directly possible with native file locks:
|
||||||
|
|
||||||
// First a shortcut, if a lock reference in this instance is available
|
// First a shortcut, if a lock reference in this instance is available
|
||||||
if (lock != null) return true;
|
if (channel != null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// Look if lock file is definitely not present; if not, there can definitely be no lock!
|
// Look if lock file is definitely not present; if not, there can definitely be no lock!
|
||||||
if (Files.notExists(path)) return false;
|
if (Files.notExists(path)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// Try to obtain and release (if was locked) the lock
|
// Try to obtain and release (if was locked) the lock
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -78,6 +78,7 @@ public final class SimpleFSLockFactory extends FSLockFactory {
|
||||||
|
|
||||||
Path lockFile;
|
Path lockFile;
|
||||||
Path lockDir;
|
Path lockDir;
|
||||||
|
boolean obtained = false;
|
||||||
|
|
||||||
public SimpleFSLock(Path lockDir, String lockFileName) {
|
public SimpleFSLock(Path lockDir, String lockFileName) {
|
||||||
this.lockDir = lockDir;
|
this.lockDir = lockDir;
|
||||||
|
@ -85,28 +86,38 @@ public final class SimpleFSLockFactory extends FSLockFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean obtain() throws IOException {
|
public synchronized boolean obtain() throws IOException {
|
||||||
|
if (obtained) {
|
||||||
|
// Our instance is already locked:
|
||||||
|
throw new LockObtainFailedException("this lock instance was already obtained");
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Files.createDirectories(lockDir);
|
Files.createDirectories(lockDir);
|
||||||
Files.createFile(lockFile);
|
Files.createFile(lockFile);
|
||||||
return true;
|
obtained = true;
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
// On Windows, on concurrent createNewFile, the 2nd process gets "access denied".
|
// On Windows, on concurrent createNewFile, the 2nd process gets "access denied".
|
||||||
// In that case, the lock was not aquired successfully, so return false.
|
// In that case, the lock was not aquired successfully, so return false.
|
||||||
// We record the failure reason here; the obtain with timeout (usually the
|
// We record the failure reason here; the obtain with timeout (usually the
|
||||||
// one calling us) will use this as "root cause" if it fails to get the lock.
|
// one calling us) will use this as "root cause" if it fails to get the lock.
|
||||||
failureReason = ioe;
|
failureReason = ioe;
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return obtained;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws LockReleaseFailedException {
|
public synchronized void close() throws LockReleaseFailedException {
|
||||||
// TODO: wierd that clearLock() throws the raw IOException...
|
// TODO: wierd that clearLock() throws the raw IOException...
|
||||||
try {
|
if (obtained) {
|
||||||
Files.deleteIfExists(lockFile);
|
try {
|
||||||
} catch (Throwable cause) {
|
Files.deleteIfExists(lockFile);
|
||||||
throw new LockReleaseFailedException("failed to delete " + lockFile, cause);
|
} catch (Throwable cause) {
|
||||||
|
throw new LockReleaseFailedException("failed to delete " + lockFile, cause);
|
||||||
|
} finally {
|
||||||
|
obtained = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,7 @@ public final class SingleInstanceLockFactory extends LockFactory {
|
||||||
|
|
||||||
private final String lockName;
|
private final String lockName;
|
||||||
private final HashSet<String> locks;
|
private final HashSet<String> locks;
|
||||||
|
private boolean obtained = false;
|
||||||
|
|
||||||
public SingleInstanceLock(HashSet<String> locks, String lockName) {
|
public SingleInstanceLock(HashSet<String> locks, String lockName) {
|
||||||
this.locks = locks;
|
this.locks = locks;
|
||||||
|
@ -53,14 +54,23 @@ public final class SingleInstanceLockFactory extends LockFactory {
|
||||||
@Override
|
@Override
|
||||||
public boolean obtain() throws IOException {
|
public boolean obtain() throws IOException {
|
||||||
synchronized(locks) {
|
synchronized(locks) {
|
||||||
return locks.add(lockName);
|
if (obtained) {
|
||||||
|
// Our instance is already locked:
|
||||||
|
throw new LockObtainFailedException("this lock instance was already obtained");
|
||||||
|
}
|
||||||
|
obtained = locks.add(lockName);
|
||||||
|
|
||||||
|
return obtained;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
synchronized(locks) {
|
synchronized(locks) {
|
||||||
locks.remove(lockName);
|
if (obtained) {
|
||||||
|
locks.remove(lockName);
|
||||||
|
obtained = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,7 @@ public final class VerifyingLockFactory extends LockFactory {
|
||||||
|
|
||||||
private class CheckedLock extends Lock {
|
private class CheckedLock extends Lock {
|
||||||
private final Lock lock;
|
private final Lock lock;
|
||||||
|
private boolean obtained = false;
|
||||||
|
|
||||||
public CheckedLock(Lock lock) {
|
public CheckedLock(Lock lock) {
|
||||||
this.lock = lock;
|
this.lock = lock;
|
||||||
|
@ -62,9 +63,10 @@ public final class VerifyingLockFactory extends LockFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean obtain() throws IOException {
|
public synchronized boolean obtain() throws IOException {
|
||||||
boolean obtained = lock.obtain();
|
obtained = lock.obtain();
|
||||||
if (obtained)
|
if (obtained) {
|
||||||
verify((byte) 1);
|
verify((byte) 1);
|
||||||
|
}
|
||||||
return obtained;
|
return obtained;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,10 +77,11 @@ public final class VerifyingLockFactory extends LockFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void close() throws IOException {
|
public synchronized void close() throws IOException {
|
||||||
if (isLocked()) {
|
if (obtained) {
|
||||||
|
assert isLocked();
|
||||||
verify((byte) 0);
|
verify((byte) 0);
|
||||||
lock.close();
|
|
||||||
}
|
}
|
||||||
|
lock.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,11 @@ package org.apache.lucene.store;
|
||||||
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
|
|
||||||
public class TestLock extends LuceneTestCase {
|
public class TestLock extends LuceneTestCase {
|
||||||
|
@ -52,4 +57,112 @@ public class TestLock extends LuceneTestCase {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testObtainConcurrently() throws InterruptedException, IOException {
|
||||||
|
final Directory directory;
|
||||||
|
if (random().nextBoolean()) {
|
||||||
|
directory = newDirectory();
|
||||||
|
} else {
|
||||||
|
LockFactory lf = random().nextBoolean() ? SimpleFSLockFactory.INSTANCE : NativeFSLockFactory.INSTANCE;
|
||||||
|
directory = newFSDirectory(createTempDir(), lf);
|
||||||
|
}
|
||||||
|
final AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
final AtomicInteger atomicCounter = new AtomicInteger(0);
|
||||||
|
final ReentrantLock assertingLock = new ReentrantLock();
|
||||||
|
int numThreads = 2 + random().nextInt(10);
|
||||||
|
final int runs = 500 + random().nextInt(1000);
|
||||||
|
CyclicBarrier barrier = new CyclicBarrier(numThreads);
|
||||||
|
Thread[] threads = new Thread[numThreads];
|
||||||
|
for (int i = 0; i < threads.length; i++) {
|
||||||
|
threads[i] = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
barrier.await();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
while (running.get()) {
|
||||||
|
try (Lock lock = directory.makeLock("foo.lock")) {
|
||||||
|
if (lock.isLocked() == false && lock.obtain()) {
|
||||||
|
assertTrue(lock.isLocked());
|
||||||
|
assertFalse(assertingLock.isLocked());
|
||||||
|
if (assertingLock.tryLock()) {
|
||||||
|
assertingLock.unlock();
|
||||||
|
} else {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException ex) {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
if (atomicCounter.incrementAndGet() > runs) {
|
||||||
|
running.set(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
threads[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < threads.length; i++) {
|
||||||
|
threads[i].join();
|
||||||
|
}
|
||||||
|
directory.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSingleInstanceLockFactoryDoubleObtain() throws Exception {
|
||||||
|
LockFactory lf = new SingleInstanceLockFactory();
|
||||||
|
Directory dir = newFSDirectory(createTempDir(), lf);
|
||||||
|
Lock lock = dir.makeLock("foo");
|
||||||
|
assertTrue(lock.obtain());
|
||||||
|
try {
|
||||||
|
lock.obtain();
|
||||||
|
fail("did not hit double-obtain failure");
|
||||||
|
} catch (LockObtainFailedException lofe) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
lock.close();
|
||||||
|
|
||||||
|
lock = dir.makeLock("foo");
|
||||||
|
assertTrue(lock.obtain());
|
||||||
|
lock.close();
|
||||||
|
dir.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSimpleFSLockFactoryDoubleObtain() throws Exception {
|
||||||
|
Directory dir = newFSDirectory(createTempDir(), SimpleFSLockFactory.INSTANCE);
|
||||||
|
Lock lock = dir.makeLock("foo");
|
||||||
|
assertTrue(lock.obtain());
|
||||||
|
try {
|
||||||
|
lock.obtain();
|
||||||
|
fail("did not hit double-obtain failure");
|
||||||
|
} catch (LockObtainFailedException lofe) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
lock.close();
|
||||||
|
|
||||||
|
lock = dir.makeLock("foo");
|
||||||
|
assertTrue(lock.obtain());
|
||||||
|
lock.close();
|
||||||
|
dir.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNativeFSLockFactoryDoubleObtain() throws Exception {
|
||||||
|
Directory dir = newFSDirectory(createTempDir(), NativeFSLockFactory.INSTANCE);
|
||||||
|
Lock lock = dir.makeLock("foo");
|
||||||
|
assertTrue(lock.obtain());
|
||||||
|
try {
|
||||||
|
lock.obtain();
|
||||||
|
fail("did not hit double-obtain failure");
|
||||||
|
} catch (LockObtainFailedException lofe) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
lock.close();
|
||||||
|
|
||||||
|
lock = dir.makeLock("foo");
|
||||||
|
assertTrue(lock.obtain());
|
||||||
|
lock.close();
|
||||||
|
dir.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,8 @@ import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.lucene.index.DirectoryReader;
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
|
@ -82,7 +84,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
|
||||||
private Set<String> unSyncedFiles;
|
private Set<String> unSyncedFiles;
|
||||||
private Set<String> createdFiles;
|
private Set<String> createdFiles;
|
||||||
private Set<String> openFilesForWrite = new HashSet<>();
|
private Set<String> openFilesForWrite = new HashSet<>();
|
||||||
Map<String,Exception> openLocks = Collections.synchronizedMap(new HashMap<String,Exception>());
|
ConcurrentMap<String,RuntimeException> openLocks = new ConcurrentHashMap<>();
|
||||||
volatile boolean crashed;
|
volatile boolean crashed;
|
||||||
private ThrottledIndexOutput throttledOutput;
|
private ThrottledIndexOutput throttledOutput;
|
||||||
private Throttling throttling = LuceneTestCase.TEST_NIGHTLY ? Throttling.SOMETIMES : Throttling.NEVER;
|
private Throttling throttling = LuceneTestCase.TEST_NIGHTLY ? Throttling.SOMETIMES : Throttling.NEVER;
|
||||||
|
@ -748,7 +750,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
|
||||||
}
|
}
|
||||||
if (openLocks.size() > 0) {
|
if (openLocks.size() > 0) {
|
||||||
Exception cause = null;
|
Exception cause = null;
|
||||||
Iterator<Exception> stacktraces = openLocks.values().iterator();
|
Iterator<RuntimeException> stacktraces = openLocks.values().iterator();
|
||||||
if (stacktraces.hasNext()) {
|
if (stacktraces.hasNext()) {
|
||||||
cause = stacktraces.next();
|
cause = stacktraces.next();
|
||||||
}
|
}
|
||||||
|
@ -1004,6 +1006,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
|
||||||
private final class AssertingLock extends Lock {
|
private final class AssertingLock extends Lock {
|
||||||
private final Lock delegateLock;
|
private final Lock delegateLock;
|
||||||
private final String name;
|
private final String name;
|
||||||
|
private boolean obtained = false;
|
||||||
|
|
||||||
AssertingLock(Lock delegate, String name) {
|
AssertingLock(Lock delegate, String name) {
|
||||||
this.delegateLock = delegate;
|
this.delegateLock = delegate;
|
||||||
|
@ -1013,24 +1016,38 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
|
||||||
@Override
|
@Override
|
||||||
public boolean obtain() throws IOException {
|
public boolean obtain() throws IOException {
|
||||||
if (delegateLock.obtain()) {
|
if (delegateLock.obtain()) {
|
||||||
assert delegateLock == NoLockFactory.SINGLETON_LOCK || !openLocks.containsKey(name);
|
final RuntimeException exception = openLocks.putIfAbsent(name, new RuntimeException("lock \"" + name + "\" was not released: " + delegateLock));
|
||||||
openLocks.put(name, new RuntimeException("lock \"" + name + "\" was not released"));
|
if (exception != null && delegateLock != NoLockFactory.SINGLETON_LOCK) {
|
||||||
return true;
|
throw exception;
|
||||||
|
}
|
||||||
|
obtained = true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
obtained = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return obtained;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
if (obtained) {
|
||||||
|
RuntimeException remove = openLocks.remove(name);
|
||||||
|
// TODO: fix stupid tests like TestIndexWriter.testNoSegmentFile to not do this!
|
||||||
|
assert remove != null || delegateLock == NoLockFactory.SINGLETON_LOCK;
|
||||||
|
obtained = false;
|
||||||
|
}
|
||||||
delegateLock.close();
|
delegateLock.close();
|
||||||
openLocks.remove(name);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isLocked() throws IOException {
|
public boolean isLocked() throws IOException {
|
||||||
return delegateLock.isLocked();
|
return delegateLock.isLocked();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "AssertingLock(" + delegateLock + ")";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Use this when throwing fake {@code IOException},
|
/** Use this when throwing fake {@code IOException},
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.Lock;
|
import org.apache.lucene.store.Lock;
|
||||||
import org.apache.lucene.store.LockFactory;
|
import org.apache.lucene.store.LockFactory;
|
||||||
|
import org.apache.lucene.store.LockObtainFailedException;
|
||||||
import org.apache.lucene.store.LockReleaseFailedException;
|
import org.apache.lucene.store.LockReleaseFailedException;
|
||||||
import org.apache.solr.common.util.IOUtils;
|
import org.apache.solr.common.util.IOUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -54,6 +55,7 @@ public class HdfsLockFactory extends LockFactory {
|
||||||
private final Path lockPath;
|
private final Path lockPath;
|
||||||
private final String lockName;
|
private final String lockName;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
|
private boolean obtained;
|
||||||
|
|
||||||
public HdfsLock(Path lockPath, String lockName, Configuration conf) {
|
public HdfsLock(Path lockPath, String lockName, Configuration conf) {
|
||||||
this.lockPath = lockPath;
|
this.lockPath = lockPath;
|
||||||
|
@ -63,6 +65,12 @@ public class HdfsLockFactory extends LockFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean obtain() throws IOException {
|
public boolean obtain() throws IOException {
|
||||||
|
|
||||||
|
if (obtained) {
|
||||||
|
// Our instance is already locked:
|
||||||
|
throw new LockObtainFailedException("this lock instance was already obtained");
|
||||||
|
}
|
||||||
|
|
||||||
FSDataOutputStream file = null;
|
FSDataOutputStream file = null;
|
||||||
FileSystem fs = FileSystem.get(lockPath.toUri(), conf);
|
FileSystem fs = FileSystem.get(lockPath.toUri(), conf);
|
||||||
try {
|
try {
|
||||||
|
@ -77,12 +85,11 @@ public class HdfsLockFactory extends LockFactory {
|
||||||
// just to check for safe mode
|
// just to check for safe mode
|
||||||
fs.mkdirs(lockPath);
|
fs.mkdirs(lockPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
file = fs.create(new Path(lockPath, lockName), false);
|
file = fs.create(new Path(lockPath, lockName), false);
|
||||||
break;
|
break;
|
||||||
} catch (FileAlreadyExistsException e) {
|
} catch (FileAlreadyExistsException e) {
|
||||||
return false;
|
return obtained = false;
|
||||||
} catch (RemoteException e) {
|
} catch (RemoteException e) {
|
||||||
if (e.getClassName().equals(
|
if (e.getClassName().equals(
|
||||||
"org.apache.hadoop.hdfs.server.namenode.SafeModeException")) {
|
"org.apache.hadoop.hdfs.server.namenode.SafeModeException")) {
|
||||||
|
@ -95,10 +102,10 @@ public class HdfsLockFactory extends LockFactory {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
log.error("Error creating lock file", e);
|
log.error("Error creating lock file", e);
|
||||||
return false;
|
return obtained = false;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.error("Error creating lock file", e);
|
log.error("Error creating lock file", e);
|
||||||
return false;
|
return obtained = false;
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeQuietly(file);
|
IOUtils.closeQuietly(file);
|
||||||
}
|
}
|
||||||
|
@ -106,18 +113,21 @@ public class HdfsLockFactory extends LockFactory {
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeQuietly(fs);
|
IOUtils.closeQuietly(fs);
|
||||||
}
|
}
|
||||||
return true;
|
return obtained = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
FileSystem fs = FileSystem.get(lockPath.toUri(), conf);
|
if (obtained) {
|
||||||
try {
|
FileSystem fs = FileSystem.get(lockPath.toUri(), conf);
|
||||||
if (fs.exists(new Path(lockPath, lockName))
|
try {
|
||||||
&& !fs.delete(new Path(lockPath, lockName), false)) throw new LockReleaseFailedException(
|
if (fs.exists(new Path(lockPath, lockName))
|
||||||
"failed to delete " + new Path(lockPath, lockName));
|
&& !fs.delete(new Path(lockPath, lockName), false)) throw new LockReleaseFailedException(
|
||||||
} finally {
|
"failed to delete " + new Path(lockPath, lockName));
|
||||||
IOUtils.closeQuietly(fs);
|
} finally {
|
||||||
|
obtained = false;
|
||||||
|
IOUtils.closeQuietly(fs);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,7 +142,5 @@ public class HdfsLockFactory extends LockFactory {
|
||||||
}
|
}
|
||||||
return isLocked;
|
return isLocked;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.lucene.store.Lock;
|
import org.apache.lucene.store.Lock;
|
||||||
|
import org.apache.lucene.store.LockObtainFailedException;
|
||||||
import org.apache.solr.SolrTestCaseJ4;
|
import org.apache.solr.SolrTestCaseJ4;
|
||||||
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
|
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
|
||||||
import org.apache.solr.util.BadHdfsThreadsFilter;
|
import org.apache.solr.util.BadHdfsThreadsFilter;
|
||||||
|
@ -32,7 +33,6 @@ import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||||
|
|
||||||
@ThreadLeakFilters(defaultFilters = true, filters = {
|
@ThreadLeakFilters(defaultFilters = true, filters = {
|
||||||
|
@ -82,5 +82,24 @@ public class HdfsLockFactoryTest extends SolrTestCaseJ4 {
|
||||||
dir.close();
|
dir.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testDoubleObtain() throws Exception {
|
||||||
}
|
String uri = HdfsTestUtil.getURI(dfsCluster);
|
||||||
|
Path lockPath = new Path(uri, "/basedir/lock");
|
||||||
|
Configuration conf = HdfsTestUtil.getClientConfiguration(dfsCluster);
|
||||||
|
HdfsDirectory dir = new HdfsDirectory(lockPath, conf);
|
||||||
|
Lock lock = dir.makeLock("foo");
|
||||||
|
assertTrue(lock.obtain());
|
||||||
|
try {
|
||||||
|
lock.obtain();
|
||||||
|
fail("did not hit double-obtain failure");
|
||||||
|
} catch (LockObtainFailedException lofe) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
lock.close();
|
||||||
|
|
||||||
|
lock = dir.makeLock("foo");
|
||||||
|
assertTrue(lock.obtain());
|
||||||
|
lock.close();
|
||||||
|
dir.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue