diff --git a/contrib/benchmark/CHANGES.txt b/contrib/benchmark/CHANGES.txt
index 5be4dfb8b81..961138085c5 100644
--- a/contrib/benchmark/CHANGES.txt
+++ b/contrib/benchmark/CHANGES.txt
@@ -4,6 +4,16 @@ The Benchmark contrib package contains code for benchmarking Lucene in a variety
$Id:$
+11/13/2009
+ LUCENE-2050: Added ability to run tasks within a serial sequence in
+ the background, by appending "&". The tasks are stopped & joined at
+ the end of the sequence. Also added Wait and RollbackIndex tasks.
+ Genericized NearRealTimeReaderTask to only reopen the reader
+ (previously it spawned its own thread, and also did searching).
+ Also changed the API of PerfRunData.getIndexReader: it now returns a
+ reference, and it's your job to decRef the reader when you're done
+ using it. (Mike McCandless)
+
11/12/2009
LUCENE-2059: allow TrecContentSource not to change the docname.
Previously, it would always append the iteration # to the docname.
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/PerfRunData.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/PerfRunData.java
index be2eb2651db..06edf9f6bc0 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/PerfRunData.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/PerfRunData.java
@@ -173,25 +173,41 @@ public class PerfRunData {
}
/**
- * @return Returns the indexReader.
+ * @return Returns the indexReader. NOTE: this returns a
+ * reference. You must call IndexReader.decRef() when
+ * you're done.
*/
- public IndexReader getIndexReader() {
+ public synchronized IndexReader getIndexReader() {
+ if (indexReader != null) {
+ indexReader.incRef();
+ }
return indexReader;
}
/**
- * @return Returns the indexSearcher.
+ * @return Returns the indexSearcher. NOTE: this returns
+ * a reference to the underlying IndexReader. You must
+ * call IndexReader.decRef() when you're done.
*/
- public IndexSearcher getIndexSearcher() {
+ public synchronized IndexSearcher getIndexSearcher() {
+ if (indexReader != null) {
+ indexReader.incRef();
+ }
return indexSearcher;
}
/**
* @param indexReader The indexReader to set.
*/
- public void setIndexReader(IndexReader indexReader) {
+ public synchronized void setIndexReader(IndexReader indexReader) throws IOException {
+ if (this.indexReader != null) {
+ // Release current IR
+ this.indexReader.decRef();
+ }
this.indexReader = indexReader;
if (indexReader != null) {
+ // Hold reference to new IR
+ indexReader.incRef();
indexSearcher = new IndexSearcher(indexReader);
} else {
indexSearcher = null;
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CloseReaderTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CloseReaderTask.java
index 635fff6152e..85c4d65cc0d 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CloseReaderTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CloseReaderTask.java
@@ -35,11 +35,12 @@ public class CloseReaderTask extends PerfTask {
@Override
public int doLogic() throws IOException {
- IndexReader reader= getRunData().getIndexReader();
- if (reader!=null) {
- reader.close();
- }
+ IndexReader reader = getRunData().getIndexReader();
getRunData().setIndexReader(null);
+ if (reader.getRefCount() != 1) {
+ System.out.println("WARNING: CloseReader: reference count is currently " + reader.getRefCount());
+ }
+ reader.decRef();
return 1;
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CommitIndexTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CommitIndexTask.java
index ba6b99ffd10..63a52ff5283 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CommitIndexTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CommitIndexTask.java
@@ -54,6 +54,7 @@ public class CommitIndexTask extends PerfTask {
IndexReader r = getRunData().getIndexReader();
if (r != null) {
r.commit(commitUserData);
+ r.decRef();
} else {
throw new IllegalStateException("neither IndexWriter nor IndexReader is currently open");
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
index 666ccb5cac9..5c8cfc92719 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
@@ -127,11 +127,10 @@ public class CreateIndexTask extends PerfTask {
PerfRunData runData = getRunData();
Config config = runData.getConfig();
- IndexDeletionPolicy indexDeletionPolicy = getIndexDeletionPolicy(config);
-
IndexWriter writer = new IndexWriter(runData.getDirectory(),
runData.getAnalyzer(),
- true, indexDeletionPolicy,
+ true,
+ getIndexDeletionPolicy(config),
IndexWriter.MaxFieldLength.LIMITED);
setIndexWriterConfig(writer, config);
runData.setIndexWriter(writer);
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteByPercentTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteByPercentTask.java
index 8a052da32ec..a801ceed696 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteByPercentTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteByPercentTask.java
@@ -88,6 +88,7 @@ public class DeleteByPercentTask extends PerfTask {
termDocs.close();
}
System.out.println("--> processed (delete) " + numDeleted + " docs");
+ r.decRef();
return numDeleted;
}
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteDocTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteDocTask.java
index 67a00c5a780..52bf50149be 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteDocTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/DeleteDocTask.java
@@ -18,6 +18,7 @@ package org.apache.lucene.benchmark.byTask.tasks;
*/
import org.apache.lucene.benchmark.byTask.PerfRunData;
+import org.apache.lucene.index.IndexReader;
/**
* Delete a document by docid. If no docid param is supplied, deletes doc with
@@ -42,8 +43,10 @@ public class DeleteDocTask extends PerfTask {
@Override
public int doLogic() throws Exception {
- getRunData().getIndexReader().deleteDocument(docid);
+ IndexReader r = getRunData().getIndexReader();
+ r.deleteDocument(docid);
lastDeleted = docid;
+ r.decRef();
return 1; // one work item done here
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushReaderTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushReaderTask.java
index ce1adc9a847..1a9d8ec5f21 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushReaderTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushReaderTask.java
@@ -52,6 +52,7 @@ public class FlushReaderTask extends PerfTask {
} else {
reader.flush();
}
+ reader.decRef();
return 1;
}
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java
index a89e9195406..0d78a1306a3 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/NearRealtimeReaderTask.java
@@ -17,18 +17,9 @@ package org.apache.lucene.benchmark.byTask.tasks;
* limitations under the License.
*/
-import java.io.IOException;
-
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TopFieldDocs;
-import org.apache.lucene.index.Term;
/**
* Spawns a BG thread that periodically (defaults to 3.0
@@ -43,97 +34,67 @@ import org.apache.lucene.index.Term;
*/
public class NearRealtimeReaderTask extends PerfTask {
- ReopenThread t;
- float pauseSec = 3.0f;
-
- private static class ReopenThread extends Thread {
-
- final IndexWriter writer;
- final int pauseMsec;
-
- public volatile boolean done;
-
- ReopenThread(IndexWriter writer, float pauseSec) {
- this.writer = writer;
- this.pauseMsec = (int) (1000*pauseSec);
- setDaemon(true);
- }
-
- @Override
- public void run() {
-
- IndexReader reader = null;
-
- final Query query = new TermQuery(new Term("body", "1"));
- final SortField sf = new SortField("docdate", SortField.LONG);
- final Sort sort = new Sort(sf);
-
- try {
- while(!done) {
- final long t0 = System.currentTimeMillis();
- if (reader == null) {
- reader = writer.getReader();
- } else {
- final IndexReader newReader = reader.reopen();
- if (reader != newReader) {
- reader.close();
- reader = newReader;
- }
- }
-
- final long t1 = System.currentTimeMillis();
- final TopFieldDocs hits = new IndexSearcher(reader).search(query, null, 10, sort);
- final long t2 = System.currentTimeMillis();
- System.out.println("nrt: open " + (t1-t0) + " msec; search " + (t2-t1) + " msec, " + hits.totalHits +
- " results; " + reader.numDocs() + " docs");
-
- final long t4 = System.currentTimeMillis();
- final int delay = (int) (pauseMsec - (t4-t0));
- if (delay > 0) {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
- }
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
+ long pauseMSec = 3000L;
public NearRealtimeReaderTask(PerfRunData runData) {
super(runData);
}
@Override
- public int doLogic() throws IOException {
- if (t == null) {
- IndexWriter w = getRunData().getIndexWriter();
- t = new ReopenThread(w, pauseSec);
- t.start();
+ public int doLogic() throws Exception {
+
+ final PerfRunData runData = getRunData();
+
+ // Get initial reader
+ IndexWriter w = runData.getIndexWriter();
+ if (w == null) {
+ throw new RuntimeException("please open the writer before invoking NearRealtimeReader");
}
- return 1;
+
+ if (runData.getIndexReader() != null) {
+ throw new RuntimeException("please close the existing reader before invoking NearRealtimeReader");
+ }
+
+ long t = System.currentTimeMillis();
+ IndexReader r = w.getReader();
+ runData.setIndexReader(r);
+ // Transfer our reference to runData
+ r.decRef();
+
+ // TODO: gather basic metrics for reporting -- eg mean,
+ // stddev, min/max reopen latencies
+
+ // Parent sequence sets stopNow
+ int reopenCount = 0;
+ while(!stopNow) {
+ long waitForMsec = (long) (pauseMSec - (System.currentTimeMillis() - t));
+ if (waitForMsec > 0) {
+ Thread.sleep(waitForMsec);
+ }
+
+ t = System.currentTimeMillis();
+ final IndexReader newReader = r.reopen();
+ if (r != newReader) {
+ // TODO: somehow we need to enable warming, here
+ runData.setIndexReader(newReader);
+ // Transfer our reference to runData
+ newReader.decRef();
+ r = newReader;
+ reopenCount++;
+ }
+ }
+
+ return reopenCount;
}
@Override
public void setParams(String params) {
super.setParams(params);
- pauseSec = Float.parseFloat(params);
+ pauseMSec = (long) (1000.0*Float.parseFloat(params));
}
@Override
public boolean supportsParams() {
return true;
}
-
- // Close the thread
- @Override
- public void close() throws InterruptedException {
- if (t != null) {
- t.done = true;
- t.join();
- }
- }
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenIndexTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenIndexTask.java
index 0a7cb30bc72..c82cbba24dc 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenIndexTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenIndexTask.java
@@ -20,6 +20,7 @@ package org.apache.lucene.benchmark.byTask.tasks;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.utils.Config;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.LogMergePolicy;
import java.io.IOException;
@@ -30,6 +31,11 @@ import java.io.IOException;
*
Other side effects: index writer object in perfRunData is set.
*
Relevant properties: merge.factor, max.buffered,
* max.field.length, ram.flush.mb [default 0]
.
+ *
+ *
Accepts a param specifying the commit point as
+ * previously saved with CommitIndexTask. If you specify
+ * this, it rolls the index back to that commit on opening
+ * the IndexWriter.
*/
public class OpenIndexTask extends PerfTask {
@@ -37,6 +43,7 @@ public class OpenIndexTask extends PerfTask {
public static final int DEFAULT_MAX_FIELD_LENGTH = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
public static final int DEFAULT_MERGE_PFACTOR = LogMergePolicy.DEFAULT_MERGE_FACTOR;
public static final double DEFAULT_RAM_FLUSH_MB = (int) IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB;
+ private String commitUserData;
public OpenIndexTask(PerfRunData runData) {
super(runData);
@@ -46,12 +53,34 @@ public class OpenIndexTask extends PerfTask {
public int doLogic() throws IOException {
PerfRunData runData = getRunData();
Config config = runData.getConfig();
+ final IndexCommit ic;
+ if (commitUserData != null) {
+ ic = OpenReaderTask.findIndexCommit(runData.getDirectory(), commitUserData);
+ } else {
+ ic = null;
+ }
+
IndexWriter writer = new IndexWriter(runData.getDirectory(),
runData.getAnalyzer(),
- false,
- IndexWriter.MaxFieldLength.UNLIMITED);
+ CreateIndexTask.getIndexDeletionPolicy(config),
+ IndexWriter.MaxFieldLength.UNLIMITED,
+ ic);
CreateIndexTask.setIndexWriterConfig(writer, config);
runData.setIndexWriter(writer);
return 1;
}
+
+ @Override
+ public void setParams(String params) {
+ super.setParams(params);
+ if (params != null) {
+ // specifies which commit point to open
+ commitUserData = params;
+ }
+ }
+
+ @Override
+ public boolean supportsParams() {
+ return true;
+ }
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenReaderTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenReaderTask.java
index a21ed7f8645..8bc1c94cc17 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenReaderTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/OpenReaderTask.java
@@ -47,35 +47,27 @@ public class OpenReaderTask extends PerfTask {
Directory dir = getRunData().getDirectory();
Config config = getRunData().getConfig();
IndexReader r = null;
- if (commitUserData != null) {
- r = openCommitPoint(commitUserData, dir, config, readOnly);
+ final IndexDeletionPolicy deletionPolicy;
+ if (readOnly) {
+ deletionPolicy = null;
} else {
- IndexDeletionPolicy indexDeletionPolicy = CreateIndexTask.getIndexDeletionPolicy(config);
- r = IndexReader.open(dir, indexDeletionPolicy, readOnly);
+ deletionPolicy = CreateIndexTask.getIndexDeletionPolicy(config);
+ }
+ if (commitUserData != null) {
+ r = IndexReader.open(OpenReaderTask.findIndexCommit(dir, commitUserData),
+ deletionPolicy,
+ readOnly);
+ } else {
+ r = IndexReader.open(dir,
+ deletionPolicy,
+ readOnly);
}
getRunData().setIndexReader(r);
+ // We transfer reference to the run data
+ r.decRef();
return 1;
}
- public static IndexReader openCommitPoint(String userData, Directory dir, Config config, boolean readOnly) throws IOException {
- IndexReader r = null;
- Collection commits = IndexReader.listCommits(dir);
- for (final IndexCommit ic : commits) {
- Map map = ic.getUserData();
- String ud = null;
- if (map != null) {
- ud = map.get(USER_DATA);
- }
- if (ud != null && ud.equals(userData)) {
- IndexDeletionPolicy indexDeletionPolicy = CreateIndexTask.getIndexDeletionPolicy(config);
- r = IndexReader.open(ic, indexDeletionPolicy, readOnly);
- break;
- }
- }
- if (r == null) throw new IOException("cannot find commitPoint userData:"+userData);
- return r;
- }
-
@Override
public void setParams(String params) {
super.setParams(params);
@@ -94,4 +86,20 @@ public class OpenReaderTask extends PerfTask {
public boolean supportsParams() {
return true;
}
+
+ public static IndexCommit findIndexCommit(Directory dir, String userData) throws IOException {
+ Collection commits = IndexReader.listCommits(dir);
+ for (final IndexCommit ic : commits) {
+ Map map = ic.getUserData();
+ String ud = null;
+ if (map != null) {
+ ud = map.get(USER_DATA);
+ }
+ if (ud != null && ud.equals(userData)) {
+ return ic;
+ }
+ }
+
+ throw new IOException("index does not contain commit with userData: " + userData);
+ }
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java
index a8b85ecf0e8..ccc5f78baa8 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PerfTask.java
@@ -59,7 +59,9 @@ public abstract class PerfTask implements Cloneable {
private int maxDepthLogStart = 0;
private boolean disableCounting = false;
protected String params = null;
-
+
+ private boolean runInBackground;
+
protected static final String NEW_LINE = System.getProperty("line.separator");
/** Should not be used externally */
@@ -69,7 +71,21 @@ public abstract class PerfTask implements Cloneable {
name = name.substring(0, name.length() - 4);
}
}
-
+
+ public void setRunInBackground() {
+ runInBackground = true;
+ }
+
+ public boolean getRunInBackground() {
+ return runInBackground;
+ }
+
+ protected volatile boolean stopNow;
+
+ public void stopNow() {
+ stopNow = true;
+ }
+
public PerfTask(PerfRunData runData) {
this();
this.runData = runData;
@@ -112,9 +128,7 @@ public abstract class PerfTask implements Cloneable {
* @return number of work items done by this task.
*/
public final int runAndMaybeStats(boolean reportStats) throws Exception {
- if (reportStats && depth <= maxDepthLogStart && !shouldNeverLogAtStart()) {
- System.out.println("------------> starting task: " + getName());
- }
+ stopNow = false;
if (!reportStats || shouldNotRecordStats()) {
setup();
int count = doLogic();
@@ -122,9 +136,12 @@ public abstract class PerfTask implements Cloneable {
tearDown();
return count;
}
+ if (reportStats && depth <= maxDepthLogStart && !shouldNeverLogAtStart()) {
+ System.out.println("------------> starting task: " + getName());
+ }
setup();
Points pnts = runData.getPoints();
- TaskStats ts = pnts.markTaskStart(this,runData.getConfig().getRoundNumber());
+ TaskStats ts = pnts.markTaskStart(this, runData.getConfig().getRoundNumber());
int count = doLogic();
count = disableCounting ? 0 : count;
pnts.markTaskEnd(ts, count);
@@ -197,6 +214,9 @@ public abstract class PerfTask implements Cloneable {
sb.append('-');
}
sb.append(getName());
+ if (getRunInBackground()) {
+ sb.append(" &");
+ }
return sb.toString();
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PrintReaderTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PrintReaderTask.java
index f7d3bf6dcd6..f8d9f96eae1 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PrintReaderTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/PrintReaderTask.java
@@ -18,7 +18,6 @@ package org.apache.lucene.benchmark.byTask.tasks;
*/
import org.apache.lucene.benchmark.byTask.PerfRunData;
-import org.apache.lucene.benchmark.byTask.utils.Config;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory;
@@ -43,12 +42,13 @@ public class PrintReaderTask extends PerfTask {
@Override
public int doLogic() throws Exception {
Directory dir = getRunData().getDirectory();
- Config config = getRunData().getConfig();
IndexReader r = null;
if (userData == null)
r = IndexReader.open(dir, true);
else
- r = OpenReaderTask.openCommitPoint(userData, dir, config, true);
+ r = IndexReader.open(OpenReaderTask.findIndexCommit(dir, userData),
+ null,
+ true);
System.out.println("--> numDocs:"+r.numDocs()+" dels:"+r.numDeletedDocs());
r.close();
return 1;
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReadTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReadTask.java
index 7b77439328d..9a27473177f 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReadTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReadTask.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Set;
import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.feeds.QueryMaker;
import org.apache.lucene.document.Document;
@@ -35,11 +34,10 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.MultiTermQuery;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Weight;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.highlight.Highlighter;
-import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
import org.apache.lucene.store.Directory;
@@ -60,29 +58,44 @@ import org.apache.lucene.store.Directory;
*/
public abstract class ReadTask extends PerfTask {
+ private final QueryMaker queryMaker;
+
public ReadTask(PerfRunData runData) {
super(runData);
+ if (withSearch()) {
+ queryMaker = getQueryMaker();
+ } else {
+ queryMaker = null;
+ }
}
@Override
public int doLogic() throws Exception {
int res = 0;
- boolean closeReader = false;
// open reader or use existing one
- IndexReader ir = getRunData().getIndexReader();
- if (ir == null) {
+ IndexSearcher searcher = getRunData().getIndexSearcher();
+
+ IndexReader reader;
+
+ final boolean closeSearcher;
+ if (searcher == null) {
+ // open our own reader
Directory dir = getRunData().getDirectory();
- ir = IndexReader.open(dir, true);
- closeReader = true;
- //res++; //this is confusing, comment it out
+ reader = IndexReader.open(dir, true);
+ searcher = new IndexSearcher(reader);
+ closeSearcher = true;
+ } else {
+ // use existing one; this passes +1 ref to us
+ reader = searcher.getIndexReader();
+ closeSearcher = false;
}
// optionally warm and add num docs traversed to count
if (withWarm()) {
Document doc = null;
- for (int m = 0; m < ir.maxDoc(); m++) {
- if (!ir.isDeleted(m)) {
- doc = ir.document(m);
+ for (int m = 0; m < reader.maxDoc(); m++) {
+ if (!reader.isDeleted(m)) {
+ doc = reader.document(m);
res += (doc == null ? 0 : 1);
}
}
@@ -90,24 +103,18 @@ public abstract class ReadTask extends PerfTask {
if (withSearch()) {
res++;
- final IndexSearcher searcher;
- if (closeReader) {
- searcher = new IndexSearcher(ir);
- } else {
- searcher = getRunData().getIndexSearcher();
- }
- QueryMaker queryMaker = getQueryMaker();
Query q = queryMaker.makeQuery();
Sort sort = getSort();
TopDocs hits;
final int numHits = numHits();
if (numHits > 0) {
if (sort != null) {
- // TODO: change the following to create TFC with in/out-of order
- // according to whether the query's Scorer.
+ Weight w = q.weight(searcher);
TopFieldCollector collector = TopFieldCollector.create(sort, numHits,
- true, withScore(), withMaxScore(), false);
- searcher.search(q, collector);
+ true, withScore(),
+ withMaxScore(),
+ !w.scoresDocsOutOfOrder());
+ searcher.search(w, null, collector);
hits = collector.topDocs();
} else {
hits = searcher.search(q, numHits);
@@ -115,22 +122,19 @@ public abstract class ReadTask extends PerfTask {
final String printHitsField = getRunData().getConfig().get("print.hits.field", null);
if (printHitsField != null && printHitsField.length() > 0) {
- final IndexReader r = searcher.getIndexReader();
if (q instanceof MultiTermQuery) {
System.out.println("MultiTermQuery term count = " + ((MultiTermQuery) q).getTotalNumberOfTerms());
}
System.out.println("totalHits = " + hits.totalHits);
- System.out.println("maxDoc() = " + r.maxDoc());
- System.out.println("numDocs() = " + r.numDocs());
+ System.out.println("maxDoc() = " + reader.maxDoc());
+ System.out.println("numDocs() = " + reader.numDocs());
for(int i=0;i 0 && m < numHighlight) {
Collection fieldsToHighlight = getFieldsToHighlight(document);
for (final String field : fieldsToHighlight) {
String text = document.get(field);
- res += highlighter.doHighlight(ir, id, field, document, analyzer, text);
+ res += highlighter.doHighlight(reader, id, field, document, analyzer, text);
}
}
}
@@ -161,12 +165,13 @@ public abstract class ReadTask extends PerfTask {
}
}
}
-
- searcher.close();
}
- if (closeReader) {
- ir.close();
+ if (closeSearcher) {
+ searcher.close();
+ } else {
+ // Release our +1 ref from above
+ reader.decRef();
}
return res;
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReopenReaderTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReopenReaderTask.java
index a8fa2e12e65..10198c51fd6 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReopenReaderTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ReopenReaderTask.java
@@ -33,13 +33,13 @@ public class ReopenReaderTask extends PerfTask {
@Override
public int doLogic() throws IOException {
- IndexReader ir = getRunData().getIndexReader();
- IndexReader or = ir;
- IndexReader nr = ir.reopen();
- if(nr != or) {
+ IndexReader r = getRunData().getIndexReader();
+ IndexReader nr = r.reopen();
+ if (nr != r) {
getRunData().setIndexReader(nr);
- or.close();
+ nr.decRef();
}
+ r.decRef();
return 1;
}
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/RollbackIndexTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/RollbackIndexTask.java
new file mode 100644
index 00000000000..64ee89d7a8e
--- /dev/null
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/RollbackIndexTask.java
@@ -0,0 +1,52 @@
+package org.apache.lucene.benchmark.byTask.tasks;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.lucene.benchmark.byTask.PerfRunData;
+import org.apache.lucene.index.IndexWriter;
+
+/**
+ * Rollback the index writer.
+ */
+public class RollbackIndexTask extends PerfTask {
+
+ public RollbackIndexTask(PerfRunData runData) {
+ super(runData);
+ }
+
+ boolean doWait = true;
+
+ @Override
+ public int doLogic() throws IOException {
+ IndexWriter iw = getRunData().getIndexWriter();
+ if (iw != null) {
+ // If infoStream was set to output to a file, close it.
+ PrintStream infoStream = iw.getInfoStream();
+ if (infoStream != null && infoStream != System.out
+ && infoStream != System.err) {
+ infoStream.close();
+ }
+ iw.rollback();
+ getRunData().setIndexWriter(null);
+ }
+ return 1;
+ }
+}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
index 23afd4ef328..4f7f15c834e 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
@@ -18,6 +18,7 @@ package org.apache.lucene.benchmark.byTask.tasks;
*/
import java.util.ArrayList;
+import java.util.List;
import java.text.NumberFormat;
import org.apache.lucene.benchmark.byTask.PerfRunData;
@@ -131,6 +132,33 @@ public class TaskSequence extends PerfTask {
return ( parallel ? doParallelTasks() : doSerialTasks());
}
+ private static class RunBackgroundTask extends Thread {
+ private final PerfTask task;
+ private final boolean letChildReport;
+ private volatile int count;
+
+ public RunBackgroundTask(PerfTask task, boolean letChildReport) {
+ this.task = task;
+ this.letChildReport = letChildReport;
+ }
+
+ public void stopNow() throws InterruptedException {
+ task.stopNow();
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ public void run() {
+ try {
+ count = task.runAndMaybeStats(letChildReport);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
private int doSerialTasks() throws Exception {
if (rate > 0) {
return doSerialTasksWithRate();
@@ -142,22 +170,46 @@ public class TaskSequence extends PerfTask {
final long t0 = System.currentTimeMillis();
final long runTime = (long) (runTimeSec*1000);
+ List bgTasks = null;
for (int k=0; fixedTime || (repetitions==REPEAT_EXHAUST && !exhausted) || k();
+ }
+ RunBackgroundTask bgTask = new RunBackgroundTask(task, letChildReport);
+ bgTask.start();
+ bgTasks.add(bgTask);
+ } else {
+ try {
+ count += task.runAndMaybeStats(letChildReport);
+ if (anyExhaustibleTasks)
+ updateExhausted(task);
+ } catch (NoMoreDataException e) {
+ exhausted = true;
+ }
}
+ }
if (fixedTime && System.currentTimeMillis()-t0 > runTime) {
repetitions = k+1;
break;
}
}
+
+ if (bgTasks != null) {
+ for(RunBackgroundTask bgTask : bgTasks) {
+ bgTask.stopNow();
+ }
+ for(RunBackgroundTask bgTask : bgTasks) {
+ bgTask.join();
+ count += bgTask.getCount();
+ }
+ }
return count;
}
@@ -167,12 +219,22 @@ public class TaskSequence extends PerfTask {
long nextStartTime = System.currentTimeMillis();
int count = 0;
for (int k=0; (repetitions==REPEAT_EXHAUST && !exhausted) || k 0) {
- //System.out.println("wait: "+waitMore+" for rate: "+ratePerMin+" (delayStep="+delayStep+")");
- Thread.sleep(waitMore);
+ while(!stopNow) {
+ long waitMore = nextStartTime - System.currentTimeMillis();
+ if (waitMore > 0) {
+ // TODO: better to use condition to notify
+ Thread.sleep(1);
+ } else {
+ break;
+ }
+ }
+ if (stopNow) {
+ break;
}
nextStartTime += delayStep; // this aims at avarage rate.
try {
@@ -204,46 +266,71 @@ public class TaskSequence extends PerfTask {
}
}
+ private class ParallelTask extends Thread {
+
+ public int count;
+ public final PerfTask task;
+
+ public ParallelTask(PerfTask task) {
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ try {
+ int n = task.runAndMaybeStats(letChildReport);
+ if (anyExhaustibleTasks) {
+ updateExhausted(task);
+ }
+ count += n;
+ } catch (NoMoreDataException e) {
+ exhausted = true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void stopNow() {
+ super.stopNow();
+ // Forwards top request to children
+ if (runningParallelTasks != null) {
+ for(ParallelTask t : runningParallelTasks) {
+ t.task.stopNow();
+ }
+ }
+ }
+
+ ParallelTask[] runningParallelTasks;
+
private int doParallelTasks() throws Exception {
initTasksArray();
- final int count [] = {0};
- Thread t[] = new Thread [repetitions * tasks.size()];
+ ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
// prepare threads
- int indx = 0;
+ int index = 0;
for (int k=0; k 0) {
startlThreadsWithRate(t);
return;
@@ -254,13 +341,12 @@ public class TaskSequence extends PerfTask {
}
// run threads with rate
- private void startlThreadsWithRate(Thread[] t) throws InterruptedException {
+ private void startlThreadsWithRate(ParallelTask[] t) throws InterruptedException {
long delayStep = (perMin ? 60000 : 1000) /rate;
long nextStartTime = System.currentTimeMillis();
for (int i = 0; i < t.length; i++) {
long waitMore = nextStartTime - System.currentTimeMillis();
if (waitMore > 0) {
- //System.out.println("thread wait: "+waitMore+" for rate: "+ratePerMin+" (delayStep="+delayStep+")");
Thread.sleep(waitMore);
}
nextStartTime += delayStep; // this aims at average rate of starting threads.
@@ -298,6 +384,9 @@ public class TaskSequence extends PerfTask {
if (rate>0) {
sb.append(", rate: " + rate+"/"+(perMin?"min":"sec"));
}
+ if (getRunInBackground()) {
+ sb.append(" &");
+ }
return sb.toString();
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java
index d7ee6b0ca46..759029b142b 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/UpdateDocTask.java
@@ -21,6 +21,7 @@ import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.feeds.DocMaker;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;
+import org.apache.lucene.index.IndexWriter;
/**
* Update a document, using IndexWriter.updateDocument,
@@ -62,7 +63,8 @@ public class UpdateDocTask extends PerfTask {
if (docID == null) {
throw new IllegalStateException("document must define the docid field");
}
- getRunData().getIndexWriter().updateDocument(new Term(DocMaker.ID_FIELD, docID), doc);
+ final IndexWriter iw = getRunData().getIndexWriter();
+ iw.updateDocument(new Term(DocMaker.ID_FIELD, docID), doc);
return 1;
}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/WaitTask.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/WaitTask.java
new file mode 100644
index 00000000000..39d526cb3ac
--- /dev/null
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/WaitTask.java
@@ -0,0 +1,75 @@
+package org.apache.lucene.benchmark.byTask.tasks;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.benchmark.byTask.PerfRunData;
+
+/**
+ * Simply waits for the specified (via the parameter) amount
+ * of time. For example Wait(30s) waits for 30 seconds.
+ * This is useful with background tasks to control how long
+ * the tasks run.
+ *
+ *You can specify h, m, or s (hours, minutes, seconds) as
+ *the trailing time unit. No unit is interpreted as
+ *seconds.
+ */
+public class WaitTask extends PerfTask {
+
+ private double waitTimeSec;
+
+ public WaitTask(PerfRunData runData) {
+ super(runData);
+ }
+
+ @Override
+ public void setParams(String params) {
+ super.setParams(params);
+ if (params != null) {
+ int multiplier;
+ if (params.endsWith("s")) {
+ multiplier = 1;
+ params = params.substring(0, params.length()-1);
+ } else if (params.endsWith("m")) {
+ multiplier = 60;
+ params = params.substring(0, params.length()-1);
+ } else if (params.endsWith("h")) {
+ multiplier = 3600;
+ params = params.substring(0, params.length()-1);
+ } else {
+ // Assume seconds
+ multiplier = 1;
+ }
+
+ waitTimeSec = Double.parseDouble(params) * multiplier;
+ } else {
+ throw new IllegalArgumentException("you must specify the wait time, eg: 10.0s, 4.5m, 2h");
+ }
+ }
+
+ @Override
+ public int doLogic() throws Exception {
+ Thread.sleep((long) (1000*waitTimeSec));
+ return 0;
+ }
+
+ @Override
+ public boolean supportsParams() {
+ return true;
+ }
+}
diff --git a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
index 58f9ca8b0fa..2f5e3e74af0 100644
--- a/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
+++ b/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
@@ -186,6 +186,19 @@ public class Algorithm {
currSequence = seq2;
colonOk = false;
break;
+
+ case '&' :
+ if (currSequence.isParallel()) {
+ throw new Exception("Can only create background tasks within a serial task");
+ }
+ if (prevTask == null) {
+ throw new Exception("& was unexpected");
+ } else if (prevTask.getRunInBackground()) {
+ throw new Exception("double & was unexpected");
+ } else {
+ prevTask.setRunInBackground();
+ }
+ break;
case '>' :
currSequence.setNoChildReport();