LUCENE-2050: enhance contrib/benchmark for running NRT tests

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@836154 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2009-11-14 11:14:18 +00:00
parent c35c0ec4fe
commit 443d0093e9
20 changed files with 494 additions and 208 deletions

View File

@ -4,6 +4,16 @@ The Benchmark contrib package contains code for benchmarking Lucene in a variety
$Id:$
11/13/2009
LUCENE-2050: Added ability to run tasks within a serial sequence in
the background, by appending "&". The tasks are stopped & joined at
the end of the sequence. Also added Wait and RollbackIndex tasks.
Genericized NearRealTimeReaderTask to only reopen the reader
(previously it spawned its own thread, and also did searching).
Also changed the API of PerfRunData.getIndexReader: it now returns a
reference, and it's your job to decRef the reader when you're done
using it. (Mike McCandless)
11/12/2009
LUCENE-2059: allow TrecContentSource not to change the docname.
Previously, it would always append the iteration # to the docname.

View File

@ -173,25 +173,41 @@ public class PerfRunData {
}
/**
* @return Returns the indexReader.
* @return Returns the indexReader. NOTE: this returns a
* reference. You must call IndexReader.decRef() when
* you're done.
*/
public IndexReader getIndexReader() {
public synchronized IndexReader getIndexReader() {
if (indexReader != null) {
indexReader.incRef();
}
return indexReader;
}
/**
* @return Returns the indexSearcher.
* @return Returns the indexSearcher. NOTE: this returns
* a reference to the underlying IndexReader. You must
* call IndexReader.decRef() when you're done.
*/
public IndexSearcher getIndexSearcher() {
public synchronized IndexSearcher getIndexSearcher() {
if (indexReader != null) {
indexReader.incRef();
}
return indexSearcher;
}
/**
* @param indexReader The indexReader to set.
*/
public void setIndexReader(IndexReader indexReader) {
public synchronized void setIndexReader(IndexReader indexReader) throws IOException {
if (this.indexReader != null) {
// Release current IR
this.indexReader.decRef();
}
this.indexReader = indexReader;
if (indexReader != null) {
// Hold reference to new IR
indexReader.incRef();
indexSearcher = new IndexSearcher(indexReader);
} else {
indexSearcher = null;

View File

@ -35,11 +35,12 @@ public class CloseReaderTask extends PerfTask {
@Override
public int doLogic() throws IOException {
IndexReader reader= getRunData().getIndexReader();
if (reader!=null) {
reader.close();
}
IndexReader reader = getRunData().getIndexReader();
getRunData().setIndexReader(null);
if (reader.getRefCount() != 1) {
System.out.println("WARNING: CloseReader: reference count is currently " + reader.getRefCount());
}
reader.decRef();
return 1;
}

View File

@ -54,6 +54,7 @@ public class CommitIndexTask extends PerfTask {
IndexReader r = getRunData().getIndexReader();
if (r != null) {
r.commit(commitUserData);
r.decRef();
} else {
throw new IllegalStateException("neither IndexWriter nor IndexReader is currently open");
}

View File

@ -127,11 +127,10 @@ public class CreateIndexTask extends PerfTask {
PerfRunData runData = getRunData();
Config config = runData.getConfig();
IndexDeletionPolicy indexDeletionPolicy = getIndexDeletionPolicy(config);
IndexWriter writer = new IndexWriter(runData.getDirectory(),
runData.getAnalyzer(),
true, indexDeletionPolicy,
true,
getIndexDeletionPolicy(config),
IndexWriter.MaxFieldLength.LIMITED);
setIndexWriterConfig(writer, config);
runData.setIndexWriter(writer);

View File

@ -88,6 +88,7 @@ public class DeleteByPercentTask extends PerfTask {
termDocs.close();
}
System.out.println("--> processed (delete) " + numDeleted + " docs");
r.decRef();
return numDeleted;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.lucene.benchmark.byTask.tasks;
*/
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.index.IndexReader;
/**
* Delete a document by docid. If no docid param is supplied, deletes doc with
@ -42,8 +43,10 @@ public class DeleteDocTask extends PerfTask {
@Override
public int doLogic() throws Exception {
getRunData().getIndexReader().deleteDocument(docid);
IndexReader r = getRunData().getIndexReader();
r.deleteDocument(docid);
lastDeleted = docid;
r.decRef();
return 1; // one work item done here
}

View File

@ -52,6 +52,7 @@ public class FlushReaderTask extends PerfTask {
} else {
reader.flush();
}
reader.decRef();
return 1;
}
}

View File

@ -17,18 +17,9 @@ package org.apache.lucene.benchmark.byTask.tasks;
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.index.Term;
/**
* Spawns a BG thread that periodically (defaults to 3.0
@ -43,97 +34,67 @@ import org.apache.lucene.index.Term;
*/
public class NearRealtimeReaderTask extends PerfTask {
ReopenThread t;
float pauseSec = 3.0f;
private static class ReopenThread extends Thread {
final IndexWriter writer;
final int pauseMsec;
public volatile boolean done;
ReopenThread(IndexWriter writer, float pauseSec) {
this.writer = writer;
this.pauseMsec = (int) (1000*pauseSec);
setDaemon(true);
}
@Override
public void run() {
IndexReader reader = null;
final Query query = new TermQuery(new Term("body", "1"));
final SortField sf = new SortField("docdate", SortField.LONG);
final Sort sort = new Sort(sf);
try {
while(!done) {
final long t0 = System.currentTimeMillis();
if (reader == null) {
reader = writer.getReader();
} else {
final IndexReader newReader = reader.reopen();
if (reader != newReader) {
reader.close();
reader = newReader;
}
}
final long t1 = System.currentTimeMillis();
final TopFieldDocs hits = new IndexSearcher(reader).search(query, null, 10, sort);
final long t2 = System.currentTimeMillis();
System.out.println("nrt: open " + (t1-t0) + " msec; search " + (t2-t1) + " msec, " + hits.totalHits +
" results; " + reader.numDocs() + " docs");
final long t4 = System.currentTimeMillis();
final int delay = (int) (pauseMsec - (t4-t0));
if (delay > 0) {
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
long pauseMSec = 3000L;
public NearRealtimeReaderTask(PerfRunData runData) {
super(runData);
}
@Override
public int doLogic() throws IOException {
if (t == null) {
IndexWriter w = getRunData().getIndexWriter();
t = new ReopenThread(w, pauseSec);
t.start();
public int doLogic() throws Exception {
final PerfRunData runData = getRunData();
// Get initial reader
IndexWriter w = runData.getIndexWriter();
if (w == null) {
throw new RuntimeException("please open the writer before invoking NearRealtimeReader");
}
return 1;
if (runData.getIndexReader() != null) {
throw new RuntimeException("please close the existing reader before invoking NearRealtimeReader");
}
long t = System.currentTimeMillis();
IndexReader r = w.getReader();
runData.setIndexReader(r);
// Transfer our reference to runData
r.decRef();
// TODO: gather basic metrics for reporting -- eg mean,
// stddev, min/max reopen latencies
// Parent sequence sets stopNow
int reopenCount = 0;
while(!stopNow) {
long waitForMsec = (long) (pauseMSec - (System.currentTimeMillis() - t));
if (waitForMsec > 0) {
Thread.sleep(waitForMsec);
}
t = System.currentTimeMillis();
final IndexReader newReader = r.reopen();
if (r != newReader) {
// TODO: somehow we need to enable warming, here
runData.setIndexReader(newReader);
// Transfer our reference to runData
newReader.decRef();
r = newReader;
reopenCount++;
}
}
return reopenCount;
}
@Override
public void setParams(String params) {
super.setParams(params);
pauseSec = Float.parseFloat(params);
pauseMSec = (long) (1000.0*Float.parseFloat(params));
}
@Override
public boolean supportsParams() {
return true;
}
// Close the thread
@Override
public void close() throws InterruptedException {
if (t != null) {
t.done = true;
t.join();
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.lucene.benchmark.byTask.tasks;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.utils.Config;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.LogMergePolicy;
import java.io.IOException;
@ -30,6 +31,11 @@ import java.io.IOException;
* <br>Other side effects: index writer object in perfRunData is set.
* <br>Relevant properties: <code>merge.factor, max.buffered,
* max.field.length, ram.flush.mb [default 0]</code>.
*
* <p> Accepts a param specifying the commit point as
* previously saved with CommitIndexTask. If you specify
* this, it rolls the index back to that commit on opening
* the IndexWriter.
*/
public class OpenIndexTask extends PerfTask {
@ -37,6 +43,7 @@ public class OpenIndexTask extends PerfTask {
public static final int DEFAULT_MAX_FIELD_LENGTH = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
public static final int DEFAULT_MERGE_PFACTOR = LogMergePolicy.DEFAULT_MERGE_FACTOR;
public static final double DEFAULT_RAM_FLUSH_MB = (int) IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB;
private String commitUserData;
public OpenIndexTask(PerfRunData runData) {
super(runData);
@ -46,12 +53,34 @@ public class OpenIndexTask extends PerfTask {
public int doLogic() throws IOException {
PerfRunData runData = getRunData();
Config config = runData.getConfig();
final IndexCommit ic;
if (commitUserData != null) {
ic = OpenReaderTask.findIndexCommit(runData.getDirectory(), commitUserData);
} else {
ic = null;
}
IndexWriter writer = new IndexWriter(runData.getDirectory(),
runData.getAnalyzer(),
false,
IndexWriter.MaxFieldLength.UNLIMITED);
CreateIndexTask.getIndexDeletionPolicy(config),
IndexWriter.MaxFieldLength.UNLIMITED,
ic);
CreateIndexTask.setIndexWriterConfig(writer, config);
runData.setIndexWriter(writer);
return 1;
}
@Override
public void setParams(String params) {
super.setParams(params);
if (params != null) {
// specifies which commit point to open
commitUserData = params;
}
}
@Override
public boolean supportsParams() {
return true;
}
}

View File

@ -47,35 +47,27 @@ public class OpenReaderTask extends PerfTask {
Directory dir = getRunData().getDirectory();
Config config = getRunData().getConfig();
IndexReader r = null;
if (commitUserData != null) {
r = openCommitPoint(commitUserData, dir, config, readOnly);
final IndexDeletionPolicy deletionPolicy;
if (readOnly) {
deletionPolicy = null;
} else {
IndexDeletionPolicy indexDeletionPolicy = CreateIndexTask.getIndexDeletionPolicy(config);
r = IndexReader.open(dir, indexDeletionPolicy, readOnly);
deletionPolicy = CreateIndexTask.getIndexDeletionPolicy(config);
}
if (commitUserData != null) {
r = IndexReader.open(OpenReaderTask.findIndexCommit(dir, commitUserData),
deletionPolicy,
readOnly);
} else {
r = IndexReader.open(dir,
deletionPolicy,
readOnly);
}
getRunData().setIndexReader(r);
// We transfer reference to the run data
r.decRef();
return 1;
}
public static IndexReader openCommitPoint(String userData, Directory dir, Config config, boolean readOnly) throws IOException {
IndexReader r = null;
Collection<IndexCommit> commits = IndexReader.listCommits(dir);
for (final IndexCommit ic : commits) {
Map<String,String> map = ic.getUserData();
String ud = null;
if (map != null) {
ud = map.get(USER_DATA);
}
if (ud != null && ud.equals(userData)) {
IndexDeletionPolicy indexDeletionPolicy = CreateIndexTask.getIndexDeletionPolicy(config);
r = IndexReader.open(ic, indexDeletionPolicy, readOnly);
break;
}
}
if (r == null) throw new IOException("cannot find commitPoint userData:"+userData);
return r;
}
@Override
public void setParams(String params) {
super.setParams(params);
@ -94,4 +86,20 @@ public class OpenReaderTask extends PerfTask {
public boolean supportsParams() {
return true;
}
public static IndexCommit findIndexCommit(Directory dir, String userData) throws IOException {
Collection<IndexCommit> commits = IndexReader.listCommits(dir);
for (final IndexCommit ic : commits) {
Map<String,String> map = ic.getUserData();
String ud = null;
if (map != null) {
ud = map.get(USER_DATA);
}
if (ud != null && ud.equals(userData)) {
return ic;
}
}
throw new IOException("index does not contain commit with userData: " + userData);
}
}

View File

@ -59,7 +59,9 @@ public abstract class PerfTask implements Cloneable {
private int maxDepthLogStart = 0;
private boolean disableCounting = false;
protected String params = null;
private boolean runInBackground;
protected static final String NEW_LINE = System.getProperty("line.separator");
/** Should not be used externally */
@ -69,7 +71,21 @@ public abstract class PerfTask implements Cloneable {
name = name.substring(0, name.length() - 4);
}
}
public void setRunInBackground() {
runInBackground = true;
}
public boolean getRunInBackground() {
return runInBackground;
}
protected volatile boolean stopNow;
public void stopNow() {
stopNow = true;
}
public PerfTask(PerfRunData runData) {
this();
this.runData = runData;
@ -112,9 +128,7 @@ public abstract class PerfTask implements Cloneable {
* @return number of work items done by this task.
*/
public final int runAndMaybeStats(boolean reportStats) throws Exception {
if (reportStats && depth <= maxDepthLogStart && !shouldNeverLogAtStart()) {
System.out.println("------------> starting task: " + getName());
}
stopNow = false;
if (!reportStats || shouldNotRecordStats()) {
setup();
int count = doLogic();
@ -122,9 +136,12 @@ public abstract class PerfTask implements Cloneable {
tearDown();
return count;
}
if (reportStats && depth <= maxDepthLogStart && !shouldNeverLogAtStart()) {
System.out.println("------------> starting task: " + getName());
}
setup();
Points pnts = runData.getPoints();
TaskStats ts = pnts.markTaskStart(this,runData.getConfig().getRoundNumber());
TaskStats ts = pnts.markTaskStart(this, runData.getConfig().getRoundNumber());
int count = doLogic();
count = disableCounting ? 0 : count;
pnts.markTaskEnd(ts, count);
@ -197,6 +214,9 @@ public abstract class PerfTask implements Cloneable {
sb.append('-');
}
sb.append(getName());
if (getRunInBackground()) {
sb.append(" &");
}
return sb.toString();
}

View File

@ -18,7 +18,6 @@ package org.apache.lucene.benchmark.byTask.tasks;
*/
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.utils.Config;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory;
@ -43,12 +42,13 @@ public class PrintReaderTask extends PerfTask {
@Override
public int doLogic() throws Exception {
Directory dir = getRunData().getDirectory();
Config config = getRunData().getConfig();
IndexReader r = null;
if (userData == null)
r = IndexReader.open(dir, true);
else
r = OpenReaderTask.openCommitPoint(userData, dir, config, true);
r = IndexReader.open(OpenReaderTask.findIndexCommit(dir, userData),
null,
true);
System.out.println("--> numDocs:"+r.numDocs()+" dels:"+r.numDeletedDocs());
r.close();
return 1;

View File

@ -25,7 +25,6 @@ import java.util.List;
import java.util.Set;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.feeds.QueryMaker;
import org.apache.lucene.document.Document;
@ -35,11 +34,10 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.MultiTermQuery;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.highlight.Highlighter;
import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
import org.apache.lucene.store.Directory;
@ -60,29 +58,44 @@ import org.apache.lucene.store.Directory;
*/
public abstract class ReadTask extends PerfTask {
private final QueryMaker queryMaker;
public ReadTask(PerfRunData runData) {
super(runData);
if (withSearch()) {
queryMaker = getQueryMaker();
} else {
queryMaker = null;
}
}
@Override
public int doLogic() throws Exception {
int res = 0;
boolean closeReader = false;
// open reader or use existing one
IndexReader ir = getRunData().getIndexReader();
if (ir == null) {
IndexSearcher searcher = getRunData().getIndexSearcher();
IndexReader reader;
final boolean closeSearcher;
if (searcher == null) {
// open our own reader
Directory dir = getRunData().getDirectory();
ir = IndexReader.open(dir, true);
closeReader = true;
//res++; //this is confusing, comment it out
reader = IndexReader.open(dir, true);
searcher = new IndexSearcher(reader);
closeSearcher = true;
} else {
// use existing one; this passes +1 ref to us
reader = searcher.getIndexReader();
closeSearcher = false;
}
// optionally warm and add num docs traversed to count
if (withWarm()) {
Document doc = null;
for (int m = 0; m < ir.maxDoc(); m++) {
if (!ir.isDeleted(m)) {
doc = ir.document(m);
for (int m = 0; m < reader.maxDoc(); m++) {
if (!reader.isDeleted(m)) {
doc = reader.document(m);
res += (doc == null ? 0 : 1);
}
}
@ -90,24 +103,18 @@ public abstract class ReadTask extends PerfTask {
if (withSearch()) {
res++;
final IndexSearcher searcher;
if (closeReader) {
searcher = new IndexSearcher(ir);
} else {
searcher = getRunData().getIndexSearcher();
}
QueryMaker queryMaker = getQueryMaker();
Query q = queryMaker.makeQuery();
Sort sort = getSort();
TopDocs hits;
final int numHits = numHits();
if (numHits > 0) {
if (sort != null) {
// TODO: change the following to create TFC with in/out-of order
// according to whether the query's Scorer.
Weight w = q.weight(searcher);
TopFieldCollector collector = TopFieldCollector.create(sort, numHits,
true, withScore(), withMaxScore(), false);
searcher.search(q, collector);
true, withScore(),
withMaxScore(),
!w.scoresDocsOutOfOrder());
searcher.search(w, null, collector);
hits = collector.topDocs();
} else {
hits = searcher.search(q, numHits);
@ -115,22 +122,19 @@ public abstract class ReadTask extends PerfTask {
final String printHitsField = getRunData().getConfig().get("print.hits.field", null);
if (printHitsField != null && printHitsField.length() > 0) {
final IndexReader r = searcher.getIndexReader();
if (q instanceof MultiTermQuery) {
System.out.println("MultiTermQuery term count = " + ((MultiTermQuery) q).getTotalNumberOfTerms());
}
System.out.println("totalHits = " + hits.totalHits);
System.out.println("maxDoc() = " + r.maxDoc());
System.out.println("numDocs() = " + r.numDocs());
System.out.println("maxDoc() = " + reader.maxDoc());
System.out.println("numDocs() = " + reader.numDocs());
for(int i=0;i<hits.scoreDocs.length;i++) {
final int docID = hits.scoreDocs[i].doc;
final Document doc = r.document(docID);
final Document doc = reader.document(docID);
System.out.println(" " + i + ": doc=" + docID + " score=" + hits.scoreDocs[i].score + " " + printHitsField + " =" + doc.get(printHitsField));
}
}
//System.out.println("q=" + q + ":" + hits.totalHits + " total hits");
if (withTraverse()) {
final ScoreDoc[] scoreDocs = hits.scoreDocs;
int traversalSize = Math.min(scoreDocs.length, traversalSize());
@ -147,13 +151,13 @@ public abstract class ReadTask extends PerfTask {
int id = scoreDocs[m].doc;
res++;
if (retrieve) {
Document document = retrieveDoc(ir, id);
Document document = retrieveDoc(reader, id);
res += document != null ? 1 : 0;
if (numHighlight > 0 && m < numHighlight) {
Collection<String> fieldsToHighlight = getFieldsToHighlight(document);
for (final String field : fieldsToHighlight) {
String text = document.get(field);
res += highlighter.doHighlight(ir, id, field, document, analyzer, text);
res += highlighter.doHighlight(reader, id, field, document, analyzer, text);
}
}
}
@ -161,12 +165,13 @@ public abstract class ReadTask extends PerfTask {
}
}
}
searcher.close();
}
if (closeReader) {
ir.close();
if (closeSearcher) {
searcher.close();
} else {
// Release our +1 ref from above
reader.decRef();
}
return res;
}

View File

@ -33,13 +33,13 @@ public class ReopenReaderTask extends PerfTask {
@Override
public int doLogic() throws IOException {
IndexReader ir = getRunData().getIndexReader();
IndexReader or = ir;
IndexReader nr = ir.reopen();
if(nr != or) {
IndexReader r = getRunData().getIndexReader();
IndexReader nr = r.reopen();
if (nr != r) {
getRunData().setIndexReader(nr);
or.close();
nr.decRef();
}
r.decRef();
return 1;
}
}

View File

@ -0,0 +1,52 @@
package org.apache.lucene.benchmark.byTask.tasks;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.PrintStream;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.index.IndexWriter;
/**
* Rollback the index writer.
*/
public class RollbackIndexTask extends PerfTask {
public RollbackIndexTask(PerfRunData runData) {
super(runData);
}
boolean doWait = true;
@Override
public int doLogic() throws IOException {
IndexWriter iw = getRunData().getIndexWriter();
if (iw != null) {
// If infoStream was set to output to a file, close it.
PrintStream infoStream = iw.getInfoStream();
if (infoStream != null && infoStream != System.out
&& infoStream != System.err) {
infoStream.close();
}
iw.rollback();
getRunData().setIndexWriter(null);
}
return 1;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.lucene.benchmark.byTask.tasks;
*/
import java.util.ArrayList;
import java.util.List;
import java.text.NumberFormat;
import org.apache.lucene.benchmark.byTask.PerfRunData;
@ -131,6 +132,33 @@ public class TaskSequence extends PerfTask {
return ( parallel ? doParallelTasks() : doSerialTasks());
}
private static class RunBackgroundTask extends Thread {
private final PerfTask task;
private final boolean letChildReport;
private volatile int count;
public RunBackgroundTask(PerfTask task, boolean letChildReport) {
this.task = task;
this.letChildReport = letChildReport;
}
public void stopNow() throws InterruptedException {
task.stopNow();
}
public int getCount() {
return count;
}
public void run() {
try {
count = task.runAndMaybeStats(letChildReport);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private int doSerialTasks() throws Exception {
if (rate > 0) {
return doSerialTasksWithRate();
@ -142,22 +170,46 @@ public class TaskSequence extends PerfTask {
final long t0 = System.currentTimeMillis();
final long runTime = (long) (runTimeSec*1000);
List<RunBackgroundTask> bgTasks = null;
for (int k=0; fixedTime || (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
for(int l=0;l<tasksArray.length;l++)
try {
final PerfTask task = tasksArray[l];
count += task.runAndMaybeStats(letChildReport);
if (anyExhaustibleTasks)
updateExhausted(task);
} catch (NoMoreDataException e) {
exhausted = true;
if (stopNow) {
break;
}
for(int l=0;l<tasksArray.length;l++) {
final PerfTask task = tasksArray[l];
if (task.getRunInBackground()) {
if (bgTasks == null) {
bgTasks = new ArrayList<RunBackgroundTask>();
}
RunBackgroundTask bgTask = new RunBackgroundTask(task, letChildReport);
bgTask.start();
bgTasks.add(bgTask);
} else {
try {
count += task.runAndMaybeStats(letChildReport);
if (anyExhaustibleTasks)
updateExhausted(task);
} catch (NoMoreDataException e) {
exhausted = true;
}
}
}
if (fixedTime && System.currentTimeMillis()-t0 > runTime) {
repetitions = k+1;
break;
}
}
if (bgTasks != null) {
for(RunBackgroundTask bgTask : bgTasks) {
bgTask.stopNow();
}
for(RunBackgroundTask bgTask : bgTasks) {
bgTask.join();
count += bgTask.getCount();
}
}
return count;
}
@ -167,12 +219,22 @@ public class TaskSequence extends PerfTask {
long nextStartTime = System.currentTimeMillis();
int count = 0;
for (int k=0; (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
if (stopNow) {
break;
}
for (int l=0;l<tasksArray.length;l++) {
final PerfTask task = tasksArray[l];
long waitMore = nextStartTime - System.currentTimeMillis();
if (waitMore > 0) {
//System.out.println("wait: "+waitMore+" for rate: "+ratePerMin+" (delayStep="+delayStep+")");
Thread.sleep(waitMore);
while(!stopNow) {
long waitMore = nextStartTime - System.currentTimeMillis();
if (waitMore > 0) {
// TODO: better to use condition to notify
Thread.sleep(1);
} else {
break;
}
}
if (stopNow) {
break;
}
nextStartTime += delayStep; // this aims at avarage rate.
try {
@ -204,46 +266,71 @@ public class TaskSequence extends PerfTask {
}
}
private class ParallelTask extends Thread {
public int count;
public final PerfTask task;
public ParallelTask(PerfTask task) {
this.task = task;
}
@Override
public void run() {
try {
int n = task.runAndMaybeStats(letChildReport);
if (anyExhaustibleTasks) {
updateExhausted(task);
}
count += n;
} catch (NoMoreDataException e) {
exhausted = true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@Override
public void stopNow() {
super.stopNow();
// Forwards top request to children
if (runningParallelTasks != null) {
for(ParallelTask t : runningParallelTasks) {
t.task.stopNow();
}
}
}
ParallelTask[] runningParallelTasks;
private int doParallelTasks() throws Exception {
initTasksArray();
final int count [] = {0};
Thread t[] = new Thread [repetitions * tasks.size()];
ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
// prepare threads
int indx = 0;
int index = 0;
for (int k=0; k<repetitions; k++) {
for (int i = 0; i < tasksArray.length; i++) {
final PerfTask task = (PerfTask) tasksArray[i].clone();
t[indx++] = new Thread() {
@Override
public void run() {
try {
int n = task.runAndMaybeStats(letChildReport);
if (anyExhaustibleTasks)
updateExhausted(task);
synchronized (count) {
count[0] += n;
}
} catch (NoMoreDataException e) {
exhausted = true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
t[index++] = new ParallelTask(task);
}
}
// run threads
startThreads(t);
// wait for all threads to complete
int count = 0;
for (int i = 0; i < t.length; i++) {
t[i].join();
count += t[i].count;
}
// return total count
return count[0];
return count;
}
// run threads
private void startThreads(Thread[] t) throws InterruptedException {
private void startThreads(ParallelTask[] t) throws InterruptedException {
if (rate > 0) {
startlThreadsWithRate(t);
return;
@ -254,13 +341,12 @@ public class TaskSequence extends PerfTask {
}
// run threads with rate
private void startlThreadsWithRate(Thread[] t) throws InterruptedException {
private void startlThreadsWithRate(ParallelTask[] t) throws InterruptedException {
long delayStep = (perMin ? 60000 : 1000) /rate;
long nextStartTime = System.currentTimeMillis();
for (int i = 0; i < t.length; i++) {
long waitMore = nextStartTime - System.currentTimeMillis();
if (waitMore > 0) {
//System.out.println("thread wait: "+waitMore+" for rate: "+ratePerMin+" (delayStep="+delayStep+")");
Thread.sleep(waitMore);
}
nextStartTime += delayStep; // this aims at average rate of starting threads.
@ -298,6 +384,9 @@ public class TaskSequence extends PerfTask {
if (rate>0) {
sb.append(", rate: " + rate+"/"+(perMin?"min":"sec"));
}
if (getRunInBackground()) {
sb.append(" &");
}
return sb.toString();
}

View File

@ -21,6 +21,7 @@ import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.feeds.DocMaker;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.IndexWriter;
/**
* Update a document, using IndexWriter.updateDocument,
@ -62,7 +63,8 @@ public class UpdateDocTask extends PerfTask {
if (docID == null) {
throw new IllegalStateException("document must define the docid field");
}
getRunData().getIndexWriter().updateDocument(new Term(DocMaker.ID_FIELD, docID), doc);
final IndexWriter iw = getRunData().getIndexWriter();
iw.updateDocument(new Term(DocMaker.ID_FIELD, docID), doc);
return 1;
}

View File

@ -0,0 +1,75 @@
package org.apache.lucene.benchmark.byTask.tasks;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.benchmark.byTask.PerfRunData;
/**
* Simply waits for the specified (via the parameter) amount
* of time. For example Wait(30s) waits for 30 seconds.
* This is useful with background tasks to control how long
* the tasks run.
*
*<p>You can specify h, m, or s (hours, minutes, seconds) as
*the trailing time unit. No unit is interpreted as
*seconds.</p>
*/
public class WaitTask extends PerfTask {
private double waitTimeSec;
public WaitTask(PerfRunData runData) {
super(runData);
}
@Override
public void setParams(String params) {
super.setParams(params);
if (params != null) {
int multiplier;
if (params.endsWith("s")) {
multiplier = 1;
params = params.substring(0, params.length()-1);
} else if (params.endsWith("m")) {
multiplier = 60;
params = params.substring(0, params.length()-1);
} else if (params.endsWith("h")) {
multiplier = 3600;
params = params.substring(0, params.length()-1);
} else {
// Assume seconds
multiplier = 1;
}
waitTimeSec = Double.parseDouble(params) * multiplier;
} else {
throw new IllegalArgumentException("you must specify the wait time, eg: 10.0s, 4.5m, 2h");
}
}
@Override
public int doLogic() throws Exception {
Thread.sleep((long) (1000*waitTimeSec));
return 0;
}
@Override
public boolean supportsParams() {
return true;
}
}

View File

@ -186,6 +186,19 @@ public class Algorithm {
currSequence = seq2;
colonOk = false;
break;
case '&' :
if (currSequence.isParallel()) {
throw new Exception("Can only create background tasks within a serial task");
}
if (prevTask == null) {
throw new Exception("& was unexpected");
} else if (prevTask.getRunInBackground()) {
throw new Exception("double & was unexpected");
} else {
prevTask.setRunInBackground();
}
break;
case '>' :
currSequence.setNoChildReport();