LUCENE-2079: more improvements to contrib/benchmark for testing NRT

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@882648 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2009-11-20 17:23:34 +00:00
parent 0a1cce0f63
commit 85fbeddf44
11 changed files with 197 additions and 15 deletions

View File

@ -3,6 +3,11 @@ Lucene Benchmark Contrib Change Log
The Benchmark contrib package contains code for benchmarking Lucene in a variety of ways.
$Id:$
11/17/2009
LUCENE-2079: Allow specifying delta thread priority after the "&";
added log.time.step.msec to print per-time-period counts; fixed
NearRealTimeTask to print reopen times (in msec) of each reopen, at
the end. (Mike McCandless)
11/13/2009
LUCENE-2050: Added ability to run tasks within a serial sequence in

View File

@ -38,6 +38,8 @@ public class Points {
private int nextTaskRunNum = 0;
private TaskStats currentStats;
/**
* Create a Points statistics object.
*/
@ -62,10 +64,15 @@ public class Points {
*/
public synchronized TaskStats markTaskStart (PerfTask task, int round) {
TaskStats stats = new TaskStats(task, nextTaskRunNum(), round);
this.currentStats = stats;
points.add(stats);
return stats;
}
public TaskStats getCurrentStats() {
return currentStats;
}
// return next task num
private synchronized int nextTaskRunNum() {
return nextTaskRunNum++;

View File

@ -92,6 +92,22 @@ public class TaskStats implements Cloneable {
this.count = count;
}
private int[] countsByTime;
private long countsByTimeStepMSec;
public void setCountsByTime(int[] counts, long msecStep) {
countsByTime = counts;
countsByTimeStepMSec = msecStep;
}
public int[] getCountsByTime() {
return countsByTime;
}
public long getCountsByTimeStepMSec() {
return countsByTimeStepMSec;
}
/**
* @return the taskRunNum.
*/
@ -174,6 +190,18 @@ public class TaskStats implements Cloneable {
if (round != stat2.round) {
round = -1; // no meaning if aggregating tasks of different round.
}
if (countsByTime != null && stat2.countsByTime != null) {
if (countsByTimeStepMSec != stat2.countsByTimeStepMSec) {
throw new IllegalStateException("different by-time msec step");
}
if (countsByTime.length != stat2.countsByTime.length) {
throw new IllegalStateException("different by-time msec count");
}
for(int i=0;i<stat2.countsByTime.length;i++) {
countsByTime[i] += stat2.countsByTime[i];
}
}
}
/* (non-Javadoc)
@ -181,7 +209,11 @@ public class TaskStats implements Cloneable {
*/
@Override
public Object clone() throws CloneNotSupportedException {
return super.clone();
TaskStats c = (TaskStats) super.clone();
if (c.countsByTime != null) {
c.countsByTime = (int[]) c.countsByTime.clone();
}
return c;
}
/**

View File

@ -20,6 +20,7 @@ package org.apache.lucene.benchmark.byTask.tasks;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.ArrayUtil;
/**
* Spawns a BG thread that periodically (defaults to 3.0
@ -36,6 +37,9 @@ public class NearRealtimeReaderTask extends PerfTask {
long pauseMSec = 3000L;
int reopenCount;
int[] reopenTimes = new int[1];
public NearRealtimeReaderTask(PerfRunData runData) {
super(runData);
}
@ -65,22 +69,27 @@ public class NearRealtimeReaderTask extends PerfTask {
// stddev, min/max reopen latencies
// Parent sequence sets stopNow
int reopenCount = 0;
reopenCount = 0;
while(!stopNow) {
long waitForMsec = (long) (pauseMSec - (System.currentTimeMillis() - t));
if (waitForMsec > 0) {
Thread.sleep(waitForMsec);
//System.out.println("NRT wait: " + waitForMsec + " msec");
}
t = System.currentTimeMillis();
final IndexReader newReader = r.reopen();
if (r != newReader) {
final int delay = (int) (System.currentTimeMillis()-t);
if (reopenTimes.length == reopenCount) {
reopenTimes = ArrayUtil.grow(reopenTimes, 1+reopenCount);
}
reopenTimes[reopenCount++] = delay;
// TODO: somehow we need to enable warming, here
runData.setIndexReader(newReader);
// Transfer our reference to runData
newReader.decRef();
r = newReader;
reopenCount++;
}
}
@ -93,6 +102,15 @@ public class NearRealtimeReaderTask extends PerfTask {
pauseMSec = (long) (1000.0*Float.parseFloat(params));
}
@Override
public void close() {
System.out.println("NRT reopen times:");
for(int i=0;i<reopenCount;i++) {
System.out.print(" " + reopenTimes[i]);
}
System.out.println();
}
@Override
public boolean supportsParams() {
return true;

View File

@ -61,6 +61,7 @@ public abstract class PerfTask implements Cloneable {
protected String params = null;
private boolean runInBackground;
private int deltaPri;
protected static final String NEW_LINE = System.getProperty("line.separator");
@ -72,14 +73,19 @@ public abstract class PerfTask implements Cloneable {
}
}
public void setRunInBackground() {
public void setRunInBackground(int deltaPri) {
runInBackground = true;
this.deltaPri = deltaPri;
}
public boolean getRunInBackground() {
return runInBackground;
}
public int getBackgroundDeltaPriority() {
return deltaPri;
}
protected volatile boolean stopNow;
public void stopNow() {
@ -216,6 +222,10 @@ public abstract class PerfTask implements Cloneable {
sb.append(getName());
if (getRunInBackground()) {
sb.append(" &");
int x = getBackgroundDeltaPriority();
if (x != 0) {
sb.append(x);
}
}
return sb.toString();
}

View File

@ -151,11 +151,26 @@ public abstract class ReportTask extends PerfTask {
line = line.replaceAll(" "," - ");
}
sb.append(line);
int[] byTime = stat.getCountsByTime();
if (byTime != null) {
sb.append(newline);
int end = -1;
for(int i=byTime.length-1;i>=0;i--) {
if (byTime[i] != 0) {
end = i;
break;
}
}
if (end != -1) {
sb.append(" by time:");
for(int i=0;i<end;i++) {
sb.append(' ').append(byTime[i]);
}
}
}
}
String reptxt = (reported==0 ? "No Matching Entries Were Found!" : sb.toString());
return new Report(reptxt,partOfTasks.size(),reported,totalSize);
}
}

View File

@ -23,6 +23,8 @@ import java.text.NumberFormat;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
import org.apache.lucene.benchmark.byTask.stats.TaskStats;
import org.apache.lucene.util.ArrayUtil;
/**
* Sequence of parallel or sequential tasks.
@ -45,6 +47,7 @@ public class TaskSequence extends PerfTask {
private boolean fixedTime; // true if we run for fixed time
private double runTimeSec; // how long to run for
private final long logByTimeMsec;
public TaskSequence (PerfRunData runData, String name, TaskSequence parent, boolean parallel) {
super(runData);
@ -55,6 +58,7 @@ public class TaskSequence extends PerfTask {
this.parent = parent;
this.parallel = parallel;
tasks = new ArrayList<PerfTask>();
logByTimeMsec = runData.getConfig().get("report.time.step.msec", 0);
}
@Override
@ -76,6 +80,9 @@ public class TaskSequence extends PerfTask {
anyExhaustibleTasks |= tasksArray[k] instanceof TaskSequence;
}
}
if (!parallel && logByTimeMsec != 0 && !letChildReport) {
countsByTime = new int[1];
}
}
/**
@ -92,6 +99,8 @@ public class TaskSequence extends PerfTask {
return repetitions;
}
private int[] countsByTime;
public void setRunTime(double sec) throws Exception {
runTimeSec = sec;
fixedTime = true;
@ -108,9 +117,6 @@ public class TaskSequence extends PerfTask {
if (isParallel()) {
throw new Exception("REPEAT_EXHAUST is not allowed for parallel tasks");
}
if (getRunData().getConfig().get("content.source.forever",true)) {
throw new Exception("REPEAT_EXHAUST requires setting content.source.forever=false");
}
}
setSequenceName();
}
@ -167,11 +173,10 @@ public class TaskSequence extends PerfTask {
initTasksArray();
int count = 0;
final long t0 = System.currentTimeMillis();
final long runTime = (long) (runTimeSec*1000);
List<RunBackgroundTask> bgTasks = null;
final long t0 = System.currentTimeMillis();
for (int k=0; fixedTime || (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
if (stopNow) {
break;
@ -183,11 +188,20 @@ public class TaskSequence extends PerfTask {
bgTasks = new ArrayList<RunBackgroundTask>();
}
RunBackgroundTask bgTask = new RunBackgroundTask(task, letChildReport);
bgTask.setPriority(getBackgroundDeltaPriority() + Thread.currentThread().getPriority());
bgTask.start();
bgTasks.add(bgTask);
} else {
try {
count += task.runAndMaybeStats(letChildReport);
final int inc = task.runAndMaybeStats(letChildReport);
count += inc;
if (countsByTime != null) {
final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
if (slot >= countsByTime.length) {
countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
}
countsByTime[slot] += inc;
}
if (anyExhaustibleTasks)
updateExhausted(task);
} catch (NoMoreDataException e) {
@ -210,6 +224,11 @@ public class TaskSequence extends PerfTask {
count += bgTask.getCount();
}
}
if (countsByTime != null) {
getRunData().getPoints().getCurrentStats().setCountsByTime(countsByTime, logByTimeMsec);
}
return count;
}
@ -218,6 +237,7 @@ public class TaskSequence extends PerfTask {
long delayStep = (perMin ? 60000 : 1000) /rate;
long nextStartTime = System.currentTimeMillis();
int count = 0;
final long t0 = System.currentTimeMillis();
for (int k=0; (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
if (stopNow) {
break;
@ -238,7 +258,16 @@ public class TaskSequence extends PerfTask {
}
nextStartTime += delayStep; // this aims at avarage rate.
try {
count += task.runAndMaybeStats(letChildReport);
final int inc = task.runAndMaybeStats(letChildReport);
count += inc;
if (countsByTime != null) {
final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
if (slot >= countsByTime.length) {
countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
}
countsByTime[slot] += inc;
}
if (anyExhaustibleTasks)
updateExhausted(task);
} catch (NoMoreDataException e) {
@ -305,6 +334,9 @@ public class TaskSequence extends PerfTask {
ParallelTask[] runningParallelTasks;
private int doParallelTasks() throws Exception {
final TaskStats stats = getRunData().getPoints().getCurrentStats();
initTasksArray();
ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
// prepare threads
@ -323,6 +355,23 @@ public class TaskSequence extends PerfTask {
for (int i = 0; i < t.length; i++) {
t[i].join();
count += t[i].count;
if (t[i].task instanceof TaskSequence) {
TaskSequence sub = (TaskSequence) t[i].task;
if (sub.countsByTime != null) {
if (countsByTime == null) {
countsByTime = new int[sub.countsByTime.length];
} else if (countsByTime.length < sub.countsByTime.length) {
countsByTime = ArrayUtil.grow(countsByTime, sub.countsByTime.length);
}
for(int j=0;j<sub.countsByTime.length;j++) {
countsByTime[j] += sub.countsByTime[j];
}
}
}
}
if (countsByTime != null) {
stats.setCountsByTime(countsByTime, logByTimeMsec);
}
// return total count
@ -386,6 +435,10 @@ public class TaskSequence extends PerfTask {
}
if (getRunInBackground()) {
sb.append(" &");
int x = getBackgroundDeltaPriority();
if (x != 0) {
sb.append(x);
}
}
return sb.toString();
}

View File

@ -191,12 +191,22 @@ public class Algorithm {
if (currSequence.isParallel()) {
throw new Exception("Can only create background tasks within a serial task");
}
stok.nextToken();
final int deltaPri;
if (stok.ttype != StreamTokenizer.TT_NUMBER) {
stok.pushBack();
deltaPri = 0;
} else {
// priority
deltaPri = (int) stok.nval;
}
if (prevTask == null) {
throw new Exception("& was unexpected");
} else if (prevTask.getRunInBackground()) {
throw new Exception("double & was unexpected");
} else {
prevTask.setRunInBackground();
prevTask.setRunInBackground(deltaPri);
}
break;

View File

@ -123,6 +123,28 @@ public class TestPerfTasksLogic extends TestCase {
assertTrue("elapsed time was " + elapsed + " msec", elapsed <= 1500);
}
public void testBGSearchTaskThreads() throws Exception {
String algLines[] = {
"log.time.step.msec = 100",
"ResetSystemErase",
"CreateIndex",
"{ AddDoc } : 1000",
"Optimize",
"CloseIndex",
"OpenReader",
"{",
" [ \"XSearch\" { CountingSearchTest > : * ] : 2 &-1",
" Wait(1.0)",
"}",
"CloseReader",
"RepSumByPref X"
};
CountingSearchTestTask.numSearches = 0;
execBenchmark(algLines);
assertTrue(CountingSearchTestTask.numSearches > 0);
}
public void testHighlighting() throws Exception {
// 1. alg definition (required in every "logic" test)
String algLines[] = {

View File

@ -926,6 +926,8 @@ final class DocumentsWriter {
if (!hasDeletes())
return false;
final long t0 = System.currentTimeMillis();
if (infoStream != null)
message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " +
deletesFlushed.docIDs.size() + " deleted docIDs and " +
@ -952,6 +954,9 @@ final class DocumentsWriter {
}
deletesFlushed.clear();
if (infoStream != null) {
message("apply deletes took " + (System.currentTimeMillis()-t0) + " msec");
}
return any;
}

View File

@ -3889,6 +3889,8 @@ public class IndexWriter implements Closeable {
boolean success = false;
final long t0 = System.currentTimeMillis();
try {
try {
try {
@ -3924,6 +3926,9 @@ public class IndexWriter implements Closeable {
} catch (OutOfMemoryError oom) {
handleOOM(oom, "merge");
}
if (infoStream != null) {
message("merge time " + (System.currentTimeMillis()-t0) + " msec");
}
}
/** Hook that's called when the specified merge is complete. */