LUCENE-5461: fix thread hazard in ControlledRealTimeReopenThread causing a possible too-long wait time when a thread was waiting for a specific generation

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1570559 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2014-02-21 12:39:15 +00:00
parent fef5221747
commit 79f0a57874
3 changed files with 115 additions and 19 deletions

View File

@ -277,6 +277,11 @@ Bug fixes
estimate memory usage of segments. This used to make estimate memory usage of segments. This used to make
SegmentReader.ramBytesUsed very CPU-intensive. (Adrien Grand) SegmentReader.ramBytesUsed very CPU-intensive. (Adrien Grand)
* LUCENE-5461: ControlledRealTimeReopenThread would sometimes wait too
long (up to targetMaxStaleSec) when a searcher is waiting for a
specific generation, when it should have waited for at most
targetMinStaleSec. (Hans Lund via Mike McCandless)
API Changes API Changes
* LUCENE-5339: The facet module was simplified/reworked to make the * LUCENE-5339: The facet module was simplified/reworked to make the

View File

@ -87,11 +87,11 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
@Override @Override
public void afterRefresh(boolean didRefresh) { public void afterRefresh(boolean didRefresh) {
refreshDone(didRefresh); refreshDone();
} }
} }
private synchronized void refreshDone(boolean didRefresh) { private synchronized void refreshDone() {
searchingGen = refreshStartGen; searchingGen = refreshStartGen;
notifyAll(); notifyAll();
} }
@ -160,12 +160,15 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")"); throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")");
} }
if (targetGen > searchingGen) { if (targetGen > searchingGen) {
waitingGen = Math.max(waitingGen, targetGen);
// Notify the reopen thread that the waitingGen has // Notify the reopen thread that the waitingGen has
// changed, so it may wake up and realize it should // changed, so it may wake up and realize it should
// not sleep for much or any longer before reopening: // not sleep for much or any longer before reopening:
reopenLock.lock(); reopenLock.lock();
// Need to find waitingGen inside lock as its used to determine
// stale time
waitingGen = Math.max(waitingGen, targetGen);
try { try {
reopenCond.signal(); reopenCond.signal();
} finally { } finally {
@ -178,7 +181,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
if (maxMS < 0) { if (maxMS < 0) {
wait(); wait();
} else { } else {
long msLeft = ((startMS + maxMS) - (System.nanoTime())/1000000); long msLeft = (startMS + maxMS) - (System.nanoTime())/1000000;
if (msLeft <= 0) { if (msLeft <= 0) {
return false; return false;
} else { } else {
@ -207,6 +210,9 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
// next reopen: // next reopen:
while (!finish) { while (!finish) {
// Need lock before finding out if has waiting
reopenLock.lock();
try {
// True if we have someone waiting for reopened searcher: // True if we have someone waiting for reopened searcher:
boolean hasWaiting = waitingGen > searchingGen; boolean hasWaiting = waitingGen > searchingGen;
final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS); final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS);
@ -214,18 +220,16 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
final long sleepNS = nextReopenStartNS - System.nanoTime(); final long sleepNS = nextReopenStartNS - System.nanoTime();
if (sleepNS > 0) { if (sleepNS > 0) {
reopenLock.lock();
try {
reopenCond.awaitNanos(sleepNS); reopenCond.awaitNanos(sleepNS);
} else {
break;
}
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return; return;
} finally { } finally {
reopenLock.unlock(); reopenLock.unlock();
} }
} else {
break;
}
} }
if (finish) { if (finish) {

View File

@ -18,6 +18,7 @@ package org.apache.lucene.search;
*/ */
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -27,13 +28,17 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDocument; import org.apache.lucene.index.IndexDocument;
import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.index.TrackingIndexWriter; import org.apache.lucene.index.TrackingIndexWriter;
@ -42,7 +47,9 @@ import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util.Version;
@SuppressCodecs({ "SimpleText", "Memory", "Direct" }) @SuppressCodecs({ "SimpleText", "Memory", "Direct" })
public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearchingTestCase { public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearchingTestCase {
@ -414,6 +421,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
try { try {
new SearcherManager(w.w, false, theEvilOne); new SearcherManager(w.w, false, theEvilOne);
fail("didn't hit expected exception");
} catch (IllegalStateException ise) { } catch (IllegalStateException ise) {
// expected // expected
} }
@ -447,4 +455,83 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
iw.close(); iw.close();
dir.close(); dir.close();
} }
// LUCENE-5461
public void testCRTReopen() throws Exception {
//test behaving badly
//should be high enough
int maxStaleSecs = 20;
//build crap data just to store it.
String s = " abcdefghijklmnopqrstuvwxyz ";
char[] chars = s.toCharArray();
StringBuilder builder = new StringBuilder(2048);
for (int i = 0; i < 2048; i++) {
builder.append(chars[random().nextInt(chars.length)]);
}
String content = builder.toString();
final SnapshotDeletionPolicy sdp = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
final Directory dir = new NRTCachingDirectory(newFSDirectory(TestUtil.getTempDir("nrt")), 5, 128);
IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_46,
new MockAnalyzer(random()));
config.setIndexDeletionPolicy(sdp);
config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
final IndexWriter iw = new IndexWriter(dir, config);
SearcherManager sm = new SearcherManager(iw, true, new SearcherFactory());
final TrackingIndexWriter tiw = new TrackingIndexWriter(iw);
ControlledRealTimeReopenThread<IndexSearcher> controlledRealTimeReopenThread =
new ControlledRealTimeReopenThread<IndexSearcher>(tiw, sm, maxStaleSecs, 0);
controlledRealTimeReopenThread.setDaemon(true);
controlledRealTimeReopenThread.start();
List<Thread> commitThreads = new ArrayList<Thread>();
for (int i = 0; i < 500; i++) {
if (i > 0 && i % 50 == 0) {
Thread commitThread = new Thread(new Runnable() {
@Override
public void run() {
try {
iw.commit();
IndexCommit ic = sdp.snapshot();
for (String name : ic.getFileNames()) {
//distribute, and backup
//System.out.println(names);
assertTrue(dir.fileExists(name));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
commitThread.start();
commitThreads.add(commitThread);
}
Document d = new Document();
d.add(new TextField("count", i + "", Field.Store.NO));
d.add(new TextField("content", content, Field.Store.YES));
long start = System.currentTimeMillis();
long l = tiw.addDocument(d);
controlledRealTimeReopenThread.waitForGeneration(l);
long wait = System.currentTimeMillis() - start;
assertTrue("waited too long for generation " + wait,
wait < (maxStaleSecs *1000));
IndexSearcher searcher = sm.acquire();
TopDocs td = searcher.search(new TermQuery(new Term("count", i + "")), 10);
sm.release(searcher);
assertEquals(1, td.totalHits);
}
for(Thread commitThread : commitThreads) {
commitThread.join();
}
controlledRealTimeReopenThread.close();
sm.close();
iw.close();
dir.close();
}
} }