mirror of https://github.com/apache/lucene.git
LUCENE-3445: Make SearcherManager#acquire lock free
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1176772 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
67c13bd2fe
commit
1f731984ba
|
@ -20,7 +20,7 @@ package org.apache.lucene.search;
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
|
@ -67,7 +67,7 @@ public class SearcherManager implements Closeable {
|
|||
// Current searcher
|
||||
private volatile IndexSearcher currentSearcher;
|
||||
private final SearcherWarmer warmer;
|
||||
private final AtomicBoolean reopening = new AtomicBoolean();
|
||||
private final Semaphore reopening = new Semaphore(1);
|
||||
private final ExecutorService es;
|
||||
|
||||
/** Opens an initial searcher from the Directory.
|
||||
|
@ -136,7 +136,7 @@ public class SearcherManager implements Closeable {
|
|||
|
||||
// Ensure only 1 thread does reopen at once; other
|
||||
// threads just return immediately:
|
||||
if (!reopening.getAndSet(true)) {
|
||||
if (reopening.tryAcquire()) {
|
||||
try {
|
||||
IndexReader newReader = currentSearcher.getIndexReader().reopen();
|
||||
if (newReader != currentSearcher.getIndexReader()) {
|
||||
|
@ -158,7 +158,7 @@ public class SearcherManager implements Closeable {
|
|||
return false;
|
||||
}
|
||||
} finally {
|
||||
reopening.set(false);
|
||||
reopening.release();
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
|
@ -168,12 +168,14 @@ public class SearcherManager implements Closeable {
|
|||
/** Obtain the current IndexSearcher. You must match
|
||||
* every call to get with one call to {@link #release};
|
||||
* it's best to do so in a finally clause. */
|
||||
public synchronized IndexSearcher get() {
|
||||
if (currentSearcher == null) {
|
||||
throw new AlreadyClosedException("this SearcherManager is closed");
|
||||
}
|
||||
currentSearcher.getIndexReader().incRef();
|
||||
return currentSearcher;
|
||||
public IndexSearcher acquire() {
|
||||
IndexSearcher searcher;
|
||||
do {
|
||||
if ((searcher = currentSearcher) == null) {
|
||||
throw new AlreadyClosedException("this SearcherManager is closed");
|
||||
}
|
||||
} while (!searcher.getIndexReader().tryIncRef());
|
||||
return searcher;
|
||||
}
|
||||
|
||||
/** Release the searcher previously obtained with {@link
|
||||
|
@ -186,7 +188,7 @@ public class SearcherManager implements Closeable {
|
|||
searcher.getIndexReader().decRef();
|
||||
}
|
||||
|
||||
// Replaces old searcher with new one
|
||||
// Replaces old searcher with new one - needs to be synced to make close() work
|
||||
private synchronized void swapSearcher(IndexSearcher newSearcher)
|
||||
throws IOException {
|
||||
IndexSearcher oldSearcher = currentSearcher;
|
||||
|
|
|
@ -36,7 +36,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
|
|||
protected IndexSearcher getFinalSearcher() throws Exception {
|
||||
writer.commit();
|
||||
mgr.maybeReopen();
|
||||
return mgr.get();
|
||||
return mgr.acquire();
|
||||
}
|
||||
|
||||
private SearcherManager mgr;
|
||||
|
@ -94,7 +94,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
|
|||
mgr.maybeReopen();
|
||||
}
|
||||
|
||||
return mgr.get();
|
||||
return mgr.acquire();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -200,11 +200,45 @@ public abstract class IndexReader implements Cloneable,Closeable {
|
|||
* references.
|
||||
*
|
||||
* @see #decRef
|
||||
* @see #tryIncRef
|
||||
*/
|
||||
public void incRef() {
|
||||
ensureOpen();
|
||||
refCount.incrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Expert: increments the refCount of this IndexReader
|
||||
* instance only if the IndexReader has not been closed yet
|
||||
* and returns <code>true</code> iff the refCount was
|
||||
* successfully incremented, otherwise <code>false</code>.
|
||||
* If this method returns <code>false</code> the reader is either
|
||||
* already closed or is currently been closed. Either way this
|
||||
* reader instance shouldn't be used by an application unless
|
||||
* <code>true</code> is returned.
|
||||
* <p>
|
||||
* RefCounts are used to determine when a
|
||||
* reader can be closed safely, i.e. as soon as there are
|
||||
* no more references. Be sure to always call a
|
||||
* corresponding {@link #decRef}, in a finally clause;
|
||||
* otherwise the reader may never be closed. Note that
|
||||
* {@link #close} simply calls decRef(), which means that
|
||||
* the IndexReader will not really be closed until {@link
|
||||
* #decRef} has been called for all outstanding
|
||||
* references.
|
||||
*
|
||||
* @see #decRef
|
||||
* @see #incRef
|
||||
*/
|
||||
public boolean tryIncRef() {
|
||||
int count;
|
||||
while ((count = refCount.get()) > 0) {
|
||||
if(refCount.compareAndSet(count, count+1)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import org.junit.Assume;
|
||||
|
@ -1403,4 +1404,70 @@ public class TestIndexReader extends LuceneTestCase
|
|||
r.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
public void testTryIncRef() throws CorruptIndexException, LockObtainFailedException, IOException {
|
||||
Directory dir = newDirectory();
|
||||
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
|
||||
writer.addDocument(new Document());
|
||||
writer.commit();
|
||||
IndexReader r = IndexReader.open(dir);
|
||||
assertTrue(r.tryIncRef());
|
||||
r.decRef();
|
||||
r.close();
|
||||
assertFalse(r.tryIncRef());
|
||||
writer.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
public void testStressTryIncRef() throws CorruptIndexException, LockObtainFailedException, IOException, InterruptedException {
|
||||
Directory dir = newDirectory();
|
||||
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
|
||||
writer.addDocument(new Document());
|
||||
writer.commit();
|
||||
IndexReader r = IndexReader.open(dir);
|
||||
int numThreads = atLeast(2);
|
||||
|
||||
IncThread[] threads = new IncThread[numThreads];
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
threads[i] = new IncThread(r, random);
|
||||
threads[i].start();
|
||||
}
|
||||
Thread.sleep(100);
|
||||
|
||||
assertTrue(r.tryIncRef());
|
||||
r.decRef();
|
||||
r.close();
|
||||
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
threads[i].join();
|
||||
assertNull(threads[i].failed);
|
||||
}
|
||||
assertFalse(r.tryIncRef());
|
||||
writer.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
static class IncThread extends Thread {
|
||||
final IndexReader toInc;
|
||||
final Random random;
|
||||
Throwable failed;
|
||||
|
||||
IncThread(IndexReader toInc, Random random) {
|
||||
this.toInc = toInc;
|
||||
this.random = random;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (toInc.tryIncRef()) {
|
||||
assertFalse(toInc.hasDeletions());
|
||||
toInc.decRef();
|
||||
}
|
||||
assertFalse(toInc.tryIncRef());
|
||||
} catch (Throwable e) {
|
||||
failed = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue