LUCENE-2822: prevent TimeLimitedCollector from starting a statically referenced thread

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1201165 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2011-11-12 02:22:06 +00:00
parent 789cb13f69
commit 03cc612036
5 changed files with 208 additions and 123 deletions

View File

@ -654,6 +654,14 @@ Changes in backwards compatibility policy
keep a buffer in your IndexInput, do this yourself in your implementation, keep a buffer in your IndexInput, do this yourself in your implementation,
and be sure to do the right thing on clone()! (Robert Muir) and be sure to do the right thing on clone()! (Robert Muir)
* LUCENE-2822: TimeLimitingCollector now expects a counter clock instead of
relying on a private daemon thread. The global time limiting clock thread
has been exposed and is now lazily loaded and fully optional.
TimeLimitingCollector now supports setting clock baseline manually to include
prelude of a search. Previous versions set the baseline on construction time,
now baseline is set once the first IndexReader is passed to the collector
unless set before. (Simon Willnauer)
Changes in runtime behavior Changes in runtime behavior
* LUCENE-3520: IndexReader.openIfChanged, when passed a near-real-time * LUCENE-3520: IndexReader.openIfChanged, when passed a near-real-time

View File

@ -20,6 +20,7 @@ package org.apache.lucene.search;
import java.io.IOException; import java.io.IOException;
import org.apache.lucene.index.IndexReader.AtomicReaderContext; import org.apache.lucene.index.IndexReader.AtomicReaderContext;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
/** /**
@ -30,69 +31,9 @@ import org.apache.lucene.util.ThreadInterruptedException;
*/ */
public class TimeLimitingCollector extends Collector { public class TimeLimitingCollector extends Collector {
/**
* Default timer resolution.
* @see #setResolution(long)
*/
public static final int DEFAULT_RESOLUTION = 20;
/**
* Default for {@link #isGreedy()}.
* @see #isGreedy()
*/
public boolean DEFAULT_GREEDY = false;
private static long resolution = DEFAULT_RESOLUTION;
private boolean greedy = DEFAULT_GREEDY ;
private static final class TimerThread extends Thread {
// NOTE: we can avoid explicit synchronization here for several reasons:
// * updates to volatile long variables are atomic
// * only single thread modifies this value
// * use of volatile keyword ensures that it does not reside in
// a register, but in main memory (so that changes are visible to
// other threads).
// * visibility of changes does not need to be instantaneous, we can
// afford losing a tick or two.
//
// See section 17 of the Java Language Specification for details.
private volatile long time = 0;
/**
* TimerThread provides a pseudo-clock service to all searching
* threads, so that they can count elapsed time with less overhead
* than repeatedly calling System.currentTimeMillis. A single
* thread should be created to be used for all searches.
*/
private TimerThread() {
super("TimeLimitedCollector timer thread");
this.setDaemon( true );
}
@Override
public void run() {
while (true) {
// TODO: Use System.nanoTime() when Lucene moves to Java SE 5.
time += resolution;
try {
Thread.sleep( resolution );
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
}
/**
* Get the timer value in milliseconds.
*/
public long getMilliseconds() {
return time;
}
}
/** Thrown when elapsed search time exceeds allowed search time. */ /** Thrown when elapsed search time exceeds allowed search time. */
@SuppressWarnings("serial")
public static class TimeExceededException extends RuntimeException { public static class TimeExceededException extends RuntimeException {
private long timeAllowed; private long timeAllowed;
private long timeElapsed; private long timeElapsed;
@ -117,19 +58,12 @@ public class TimeLimitingCollector extends Collector {
} }
} }
// Declare and initialize a single static timer thread to be used by private long t0 = Long.MIN_VALUE;
// all TimeLimitedCollector instances. The JVM assures that private long timeout = Long.MIN_VALUE;
// this only happens once.
private final static TimerThread TIMER_THREAD = new TimerThread();
static {
TIMER_THREAD.start();
}
private final long t0;
private final long timeout;
private final Collector collector; private final Collector collector;
private final Counter clock;
private final long ticksAllowed;
private boolean greedy = false;
private int docBase; private int docBase;
/** /**
@ -137,36 +71,43 @@ public class TimeLimitingCollector extends Collector {
* @param collector the wrapped {@link Collector} * @param collector the wrapped {@link Collector}
* @param timeAllowed max time allowed for collecting hits after which {@link TimeExceededException} is thrown * @param timeAllowed max time allowed for collecting hits after which {@link TimeExceededException} is thrown
*/ */
public TimeLimitingCollector(final Collector collector, final long timeAllowed ) { public TimeLimitingCollector(final Collector collector, Counter clock, final long ticksAllowed ) {
this.collector = collector; this.collector = collector;
t0 = TIMER_THREAD.getMilliseconds(); this.clock = clock;
this.timeout = t0 + timeAllowed; this.ticksAllowed = ticksAllowed;
} }
/** /**
* Return the timer resolution. * Sets the baseline for this collector. By default the collectors baseline is
* @see #setResolution(long) * initialized once the first reader is passed to
* {@link #setNextReader(AtomicReaderContext)}. To include operations executed
* in prior to the actual document collection set the baseline through this method
* in your prelude.
* <p>
* Example usage:
* <pre>
* Counter clock = ...;
* long baseline = clock.get();
* // ... prepare search
* TimeLimitingCollector collector = new TimeLimitingCollector(c, clock, numTicks);
* collector.setBaseline(baseline);
* indexSearcher.search(query, collector);
* </pre>
* </p>
* @see #setBaseline()
* @param clockTime
*/ */
public static long getResolution() { public void setBaseline(long clockTime) {
return resolution; t0 = clockTime;
timeout = t0 + ticksAllowed;
} }
/** /**
* Set the timer resolution. * Syntactic sugar for {@link #setBaseline(long)} using {@link Counter#get()}
* The default timer resolution is 20 milliseconds. * on the clock passed to the construcutor.
* This means that a search required to take no longer than
* 800 milliseconds may be stopped after 780 to 820 milliseconds.
* <br>Note that:
* <ul>
* <li>Finer (smaller) resolution is more accurate but less efficient.</li>
* <li>Setting resolution to less than 5 milliseconds will be silently modified to 5 milliseconds.</li>
* <li>Setting resolution smaller than current resolution might take effect only after current
* resolution. (Assume current resolution of 20 milliseconds is modified to 5 milliseconds,
* then it can take up to 20 milliseconds for the change to have effect.</li>
* </ul>
*/ */
public static void setResolution(long newResolution) { public void setBaseline() {
resolution = Math.max(newResolution,5); // 5 milliseconds is about the minimum reasonable time for a Object.wait(long) call. setBaseline(clock.get());
} }
/** /**
@ -199,7 +140,7 @@ public class TimeLimitingCollector extends Collector {
*/ */
@Override @Override
public void collect(final int doc) throws IOException { public void collect(final int doc) throws IOException {
long time = TIMER_THREAD.getMilliseconds(); final long time = clock.get();
if (timeout < time) { if (timeout < time) {
if (greedy) { if (greedy) {
//System.out.println(this+" greedy: before failing, collecting doc: "+(docBase + doc)+" "+(time-t0)); //System.out.println(this+" greedy: before failing, collecting doc: "+(docBase + doc)+" "+(time-t0));
@ -216,6 +157,9 @@ public class TimeLimitingCollector extends Collector {
public void setNextReader(AtomicReaderContext context) throws IOException { public void setNextReader(AtomicReaderContext context) throws IOException {
collector.setNextReader(context); collector.setNextReader(context);
this.docBase = context.docBase; this.docBase = context.docBase;
if (Long.MIN_VALUE == t0) {
setBaseline();
}
} }
@Override @Override
@ -228,4 +172,131 @@ public class TimeLimitingCollector extends Collector {
return collector.acceptsDocsOutOfOrder(); return collector.acceptsDocsOutOfOrder();
} }
/**
* Returns the global TimerThreads {@link Counter}
* <p>
* Invoking this creates may create a new instance of {@link TimerThread} iff
* the global {@link TimerThread} has never been accessed before. The thread
* returned from this method is started on creation and will be alive unless
* you stop the {@link TimerThread} via {@link TimerThread#stopTimer()}.
* </p>
* @return the global TimerThreads {@link Counter}
* @lucene.experimental
*/
public static Counter getGlobalCounter() {
return TimerThreadHolder.THREAD.counter;
}
/**
* Returns the global {@link TimerThread}.
* <p>
* Invoking this creates may create a new instance of {@link TimerThread} iff
* the global {@link TimerThread} has never been accessed before. The thread
* returned from this method is started on creation and will be alive unless
* you stop the {@link TimerThread} via {@link TimerThread#stopTimer()}.
* </p>
*
* @return the global {@link TimerThread}
* @lucene.experimental
*/
public static TimerThread getGlobalTimerThread() {
return TimerThreadHolder.THREAD;
}
private static final class TimerThreadHolder {
static final TimerThread THREAD;
static {
THREAD = new TimerThread(Counter.newCounter(true));
THREAD.start();
}
}
/**
* @lucene.experimental
*/
public static final class TimerThread extends Thread {
public static final String THREAD_NAME = "TimeLimitedCollector timer thread";
public static final int DEFAULT_RESOLUTION = 20;
// NOTE: we can avoid explicit synchronization here for several reasons:
// * updates to volatile long variables are atomic
// * only single thread modifies this value
// * use of volatile keyword ensures that it does not reside in
// a register, but in main memory (so that changes are visible to
// other threads).
// * visibility of changes does not need to be instantaneous, we can
// afford losing a tick or two.
//
// See section 17 of the Java Language Specification for details.
private volatile long time = 0;
private volatile boolean stop = false;
private volatile long resolution;
final Counter counter;
public TimerThread(long resolution, Counter counter) {
super(THREAD_NAME);
this.resolution = resolution;
this.counter = counter;
this.setDaemon(true);
}
public TimerThread(Counter counter) {
this(DEFAULT_RESOLUTION, counter);
}
@Override
public void run() {
while (!stop) {
// TODO: Use System.nanoTime() when Lucene moves to Java SE 5.
counter.addAndGet(resolution);
try {
Thread.sleep( resolution );
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
}
/**
* Get the timer value in milliseconds.
*/
public long getMilliseconds() {
return time;
}
/**
* Stops the timer thread
*/
public void stopTimer() {
stop = true;
}
/**
* Return the timer resolution.
* @see #setResolution(long)
*/
public long getResolution() {
return resolution;
}
/**
* Set the timer resolution.
* The default timer resolution is 20 milliseconds.
* This means that a search required to take no longer than
* 800 milliseconds may be stopped after 780 to 820 milliseconds.
* <br>Note that:
* <ul>
* <li>Finer (smaller) resolution is more accurate but less efficient.</li>
* <li>Setting resolution to less than 5 milliseconds will be silently modified to 5 milliseconds.</li>
* <li>Setting resolution smaller than current resolution might take effect only after current
* resolution. (Assume current resolution of 20 milliseconds is modified to 5 milliseconds,
* then it can take up to 20 milliseconds for the change to have effect.</li>
* </ul>
*/
public void setResolution(long resolution) {
this.resolution = Math.max(resolution, 5); // 5 milliseconds is about the minimum reasonable time for a Object.wait(long) call.
}
}
} }

View File

@ -77,7 +77,7 @@ public abstract class Counter {
} }
private final static class AtomicCounter extends Counter { private final static class AtomicCounter extends Counter {
private AtomicLong count; private final AtomicLong count = new AtomicLong();
@Override @Override
public long addAndGet(long delta) { public long addAndGet(long delta) {

View File

@ -28,17 +28,17 @@ import org.apache.lucene.index.IndexReader.AtomicReaderContext;
import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.search.TimeLimitingCollector.TimeExceededException; import org.apache.lucene.search.TimeLimitingCollector.TimeExceededException;
import org.apache.lucene.search.TimeLimitingCollector.TimerThread;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
import org.junit.Ignore;
/** /**
* Tests the {@link TimeLimitingCollector}. This test checks (1) search * Tests the {@link TimeLimitingCollector}. This test checks (1) search
* correctness (regardless of timeout), (2) expected timeout behavior, * correctness (regardless of timeout), (2) expected timeout behavior,
* and (3) a sanity test with multiple searching threads. * and (3) a sanity test with multiple searching threads.
*/ */
@Ignore("broken: see https://issues.apache.org/jira/browse/LUCENE-2822")
public class TestTimeLimitingCollector extends LuceneTestCase { public class TestTimeLimitingCollector extends LuceneTestCase {
private static final int SLOW_DOWN = 3; private static final int SLOW_DOWN = 3;
private static final long TIME_ALLOWED = 17 * SLOW_DOWN; // so searches can find about 17 docs. private static final long TIME_ALLOWED = 17 * SLOW_DOWN; // so searches can find about 17 docs.
@ -57,6 +57,8 @@ public class TestTimeLimitingCollector extends LuceneTestCase {
private final String FIELD_NAME = "body"; private final String FIELD_NAME = "body";
private Query query; private Query query;
private Counter counter;
private TimerThread counterThread;
/** /**
* initializes searcher with a document set * initializes searcher with a document set
@ -64,6 +66,9 @@ public class TestTimeLimitingCollector extends LuceneTestCase {
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
counter = Counter.newCounter(true);
counterThread = new TimerThread(counter);
counterThread.start();
final String docText[] = { final String docText[] = {
"docThatNeverMatchesSoWeCanRequireLastDocCollectedToBeGreaterThanZero", "docThatNeverMatchesSoWeCanRequireLastDocCollectedToBeGreaterThanZero",
"one blah three", "one blah three",
@ -98,7 +103,6 @@ public class TestTimeLimitingCollector extends LuceneTestCase {
// warm the searcher // warm the searcher
searcher.search(query, null, 1000); searcher.search(query, null, 1000);
} }
@Override @Override
@ -106,6 +110,8 @@ public class TestTimeLimitingCollector extends LuceneTestCase {
searcher.close(); searcher.close();
reader.close(); reader.close();
directory.close(); directory.close();
counterThread.stopTimer();
counterThread.join();
super.tearDown(); super.tearDown();
} }
@ -147,7 +153,7 @@ public class TestTimeLimitingCollector extends LuceneTestCase {
} }
private Collector createTimedCollector(MyHitCollector hc, long timeAllowed, boolean greedy) { private Collector createTimedCollector(MyHitCollector hc, long timeAllowed, boolean greedy) {
TimeLimitingCollector res = new TimeLimitingCollector(hc, timeAllowed); TimeLimitingCollector res = new TimeLimitingCollector(hc, counter, timeAllowed);
res.setGreedy(greedy); // set to true to make sure at least one doc is collected. res.setGreedy(greedy); // set to true to make sure at least one doc is collected.
return res; return res;
} }
@ -199,8 +205,8 @@ public class TestTimeLimitingCollector extends LuceneTestCase {
// verify that elapsed time at exception is within valid limits // verify that elapsed time at exception is within valid limits
assertEquals( timoutException.getTimeAllowed(), TIME_ALLOWED); assertEquals( timoutException.getTimeAllowed(), TIME_ALLOWED);
// a) Not too early // a) Not too early
assertTrue ( "elapsed="+timoutException.getTimeElapsed()+" <= (allowed-resolution)="+(TIME_ALLOWED-TimeLimitingCollector.getResolution()), assertTrue ( "elapsed="+timoutException.getTimeElapsed()+" <= (allowed-resolution)="+(TIME_ALLOWED-counterThread.getResolution()),
timoutException.getTimeElapsed() > TIME_ALLOWED-TimeLimitingCollector.getResolution()); timoutException.getTimeElapsed() > TIME_ALLOWED-counterThread.getResolution());
// b) Not too late. // b) Not too late.
// This part is problematic in a busy test system, so we just print a warning. // This part is problematic in a busy test system, so we just print a warning.
// We already verified that a timeout occurred, we just can't be picky about how long it took. // We already verified that a timeout occurred, we just can't be picky about how long it took.
@ -215,7 +221,7 @@ public class TestTimeLimitingCollector extends LuceneTestCase {
} }
private long maxTime(boolean multiThreaded) { private long maxTime(boolean multiThreaded) {
long res = 2 * TimeLimitingCollector.getResolution() + TIME_ALLOWED + SLOW_DOWN; // some slack for less noise in this test long res = 2 * counterThread.getResolution() + TIME_ALLOWED + SLOW_DOWN; // some slack for less noise in this test
if (multiThreaded) { if (multiThreaded) {
res *= MULTI_THREAD_SLACK; // larger slack res *= MULTI_THREAD_SLACK; // larger slack
} }
@ -226,7 +232,7 @@ public class TestTimeLimitingCollector extends LuceneTestCase {
String s = String s =
"( " + "( " +
"2*resolution + TIME_ALLOWED + SLOW_DOWN = " + "2*resolution + TIME_ALLOWED + SLOW_DOWN = " +
"2*" + TimeLimitingCollector.getResolution() + " + " + TIME_ALLOWED + " + " + SLOW_DOWN + "2*" + counterThread.getResolution() + " + " + TIME_ALLOWED + " + " + SLOW_DOWN +
")"; ")";
if (multiThreaded) { if (multiThreaded) {
s = MULTI_THREAD_SLACK + " * "+s; s = MULTI_THREAD_SLACK + " * "+s;
@ -240,22 +246,22 @@ public class TestTimeLimitingCollector extends LuceneTestCase {
public void testModifyResolution() { public void testModifyResolution() {
try { try {
// increase and test // increase and test
long resolution = 20 * TimeLimitingCollector.DEFAULT_RESOLUTION; //400 long resolution = 20 * TimerThread.DEFAULT_RESOLUTION; //400
TimeLimitingCollector.setResolution(resolution); counterThread.setResolution(resolution);
assertEquals(resolution, TimeLimitingCollector.getResolution()); assertEquals(resolution, counterThread.getResolution());
doTestTimeout(false,true); doTestTimeout(false,true);
// decrease much and test // decrease much and test
resolution = 5; resolution = 5;
TimeLimitingCollector.setResolution(resolution); counterThread.setResolution(resolution);
assertEquals(resolution, TimeLimitingCollector.getResolution()); assertEquals(resolution, counterThread.getResolution());
doTestTimeout(false,true); doTestTimeout(false,true);
// return to default and test // return to default and test
resolution = TimeLimitingCollector.DEFAULT_RESOLUTION; resolution = TimerThread.DEFAULT_RESOLUTION;
TimeLimitingCollector.setResolution(resolution); counterThread.setResolution(resolution);
assertEquals(resolution, TimeLimitingCollector.getResolution()); assertEquals(resolution, counterThread.getResolution());
doTestTimeout(false,true); doTestTimeout(false,true);
} finally { } finally {
TimeLimitingCollector.setResolution(TimeLimitingCollector.DEFAULT_RESOLUTION); counterThread.setResolution(TimerThread.DEFAULT_RESOLUTION);
} }
} }

View File

@ -1294,7 +1294,7 @@ public class SolrIndexSearcher extends IndexSearcher implements SolrInfoMBean {
} }
if( timeAllowed > 0 ) { if( timeAllowed > 0 ) {
collector = new TimeLimitingCollector(collector, timeAllowed); collector = new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), timeAllowed);
} }
if (pf.postFilter != null) { if (pf.postFilter != null) {
pf.postFilter.setLastDelegate(collector); pf.postFilter.setLastDelegate(collector);
@ -1323,7 +1323,7 @@ public class SolrIndexSearcher extends IndexSearcher implements SolrInfoMBean {
} }
Collector collector = topCollector; Collector collector = topCollector;
if( timeAllowed > 0 ) { if( timeAllowed > 0 ) {
collector = new TimeLimitingCollector(collector, timeAllowed); collector = new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), timeAllowed);
} }
if (pf.postFilter != null) { if (pf.postFilter != null) {
pf.postFilter.setLastDelegate(collector); pf.postFilter.setLastDelegate(collector);
@ -1413,7 +1413,7 @@ public class SolrIndexSearcher extends IndexSearcher implements SolrInfoMBean {
} }
if( timeAllowed > 0 ) { if( timeAllowed > 0 ) {
collector = new TimeLimitingCollector(collector, timeAllowed); collector = new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), timeAllowed);
} }
if (pf.postFilter != null) { if (pf.postFilter != null) {
pf.postFilter.setLastDelegate(collector); pf.postFilter.setLastDelegate(collector);
@ -1449,7 +1449,7 @@ public class SolrIndexSearcher extends IndexSearcher implements SolrInfoMBean {
Collector collector = setCollector; Collector collector = setCollector;
if( timeAllowed > 0 ) { if( timeAllowed > 0 ) {
collector = new TimeLimitingCollector(collector, timeAllowed ); collector = new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), timeAllowed );
} }
if (pf.postFilter != null) { if (pf.postFilter != null) {
pf.postFilter.setLastDelegate(collector); pf.postFilter.setLastDelegate(collector);