mirror of https://github.com/apache/lucene.git
LUCENE-3528: Prevent possible DeadLock & large delays if waiting for a already reopened generation
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1199405 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2102f03c12
commit
b6bdf5bc34
|
@ -147,7 +147,8 @@ public final class SearcherManager {
|
|||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* This method returns true if a new reader was in fact opened.
|
||||
* This method returns true if a new reader was in fact opened or
|
||||
* if the current searcher has no pending changes.
|
||||
* </p>
|
||||
*/
|
||||
public boolean maybeReopen() throws IOException {
|
||||
|
@ -173,10 +174,8 @@ public final class SearcherManager {
|
|||
release(newSearcher);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
} finally {
|
||||
reopenLock.release();
|
||||
}
|
||||
|
@ -184,7 +183,7 @@ public final class SearcherManager {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if no changes have occured since this searcher
|
||||
* ie. reader was opened, otherwise <code>false</code>.
|
||||
|
@ -251,4 +250,5 @@ public final class SearcherManager {
|
|||
currentSearcher = newSearcher;
|
||||
release(oldSearcher);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,15 +19,24 @@ package org.apache.lucene.index;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.TextField;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
import org.apache.lucene.search.SearcherWarmer;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.LockObtainFailedException;
|
||||
import org.apache.lucene.store.NRTCachingDirectory;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.LuceneTestCase.UseNoMemoryExpensiveCodec;
|
||||
import org.apache.lucene.util.ThreadInterruptedException;
|
||||
|
||||
@UseNoMemoryExpensiveCodec
|
||||
public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
|
||||
|
@ -244,4 +253,100 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
|
|||
nrtThread.close();
|
||||
nrt.close();
|
||||
}
|
||||
|
||||
/*
|
||||
* LUCENE-3528 - NRTManager hangs in certain situations
|
||||
*/
|
||||
public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException {
|
||||
IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
|
||||
Directory d = newDirectory();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final CountDownLatch signal = new CountDownLatch(1);
|
||||
|
||||
LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal);
|
||||
final NRTManager manager = new NRTManager(writer, null, null, false);
|
||||
Document doc = new Document();
|
||||
doc.add(newField("test","test", TextField.TYPE_STORED));
|
||||
long gen = manager.addDocument(doc);
|
||||
assertTrue(manager.maybeReopen(false));
|
||||
assertFalse(gen < manager.getCurrentSearchingGen(false));
|
||||
Thread t = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
signal.await();
|
||||
assertTrue(manager.maybeReopen(false));
|
||||
manager.deleteDocuments(new TermQuery(new Term("foo", "barista")));
|
||||
manager.maybeReopen(false); // kick off another reopen so we inc. the internal gen
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
latch.countDown(); // let the add below finish
|
||||
}
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
|
||||
final long lastGen = manager.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
|
||||
assertFalse(manager.getSearcherManager(false).isSearcherCurrent()); // false since there is a delete in the queue
|
||||
|
||||
IndexSearcher acquire = manager.getSearcherManager(false).acquire();
|
||||
try {
|
||||
assertEquals(2, acquire.getIndexReader().numDocs());
|
||||
} finally {
|
||||
acquire.getIndexReader().decRef();
|
||||
}
|
||||
NRTManagerReopenThread thread = new NRTManagerReopenThread(manager, 0.01, 0.01);
|
||||
thread.start(); // start reopening
|
||||
if (VERBOSE) {
|
||||
System.out.println("waiting now for generation " + lastGen);
|
||||
}
|
||||
|
||||
final AtomicBoolean finished = new AtomicBoolean(false);
|
||||
Thread waiter = new Thread() {
|
||||
public void run() {
|
||||
manager.waitForGeneration(lastGen, false);
|
||||
finished.set(true);
|
||||
}
|
||||
};
|
||||
waiter.start();
|
||||
manager.maybeReopen(false);
|
||||
waiter.join(1000);
|
||||
if (!finished.get()) {
|
||||
waiter.interrupt();
|
||||
fail("thread deadlocked on waitForGeneration");
|
||||
}
|
||||
thread.close();
|
||||
thread.join();
|
||||
IOUtils.close(manager, writer, d);
|
||||
}
|
||||
|
||||
public static class LatchedIndexWriter extends IndexWriter {
|
||||
|
||||
private CountDownLatch latch;
|
||||
boolean waitAfterUpdate = false;
|
||||
private CountDownLatch signal;
|
||||
|
||||
public LatchedIndexWriter(Directory d, IndexWriterConfig conf,
|
||||
CountDownLatch latch, CountDownLatch signal)
|
||||
throws CorruptIndexException, LockObtainFailedException, IOException {
|
||||
super(d, conf);
|
||||
this.latch = latch;
|
||||
this.signal = signal;
|
||||
|
||||
}
|
||||
|
||||
public void updateDocument(Term term,
|
||||
Iterable<? extends IndexableField> doc, Analyzer analyzer)
|
||||
throws CorruptIndexException, IOException {
|
||||
super.updateDocument(term, doc, analyzer);
|
||||
try {
|
||||
if (waitAfterUpdate) {
|
||||
signal.countDown();
|
||||
latch.await();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new ThreadInterruptedException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue