mirror of https://github.com/apache/lucene.git
tests: write a lucene version of the NRT stress test
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1151526 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0dc4c9582f
commit
cca29c44c2
|
@ -16,18 +16,31 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.search;
|
package org.apache.solr.search;
|
||||||
|
|
||||||
|
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
|
||||||
|
import org.apache.lucene.document.Document;
|
||||||
|
import org.apache.lucene.document.Field;
|
||||||
import org.apache.lucene.index.IndexReader;
|
import org.apache.lucene.index.IndexReader;
|
||||||
import org.apache.lucene.index.IndexReader.AtomicReaderContext;
|
import org.apache.lucene.index.IndexReader.AtomicReaderContext;
|
||||||
import org.apache.lucene.index.IndexReader.ReaderContext;
|
import org.apache.lucene.index.IndexReader.ReaderContext;
|
||||||
|
import org.apache.lucene.index.IndexWriter;
|
||||||
|
import org.apache.lucene.index.IndexWriterConfig;
|
||||||
|
import org.apache.lucene.index.Term;
|
||||||
import org.apache.lucene.queries.function.DocValues;
|
import org.apache.lucene.queries.function.DocValues;
|
||||||
import org.apache.lucene.queries.function.ValueSource;
|
import org.apache.lucene.queries.function.ValueSource;
|
||||||
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.apache.lucene.search.Query;
|
||||||
|
import org.apache.lucene.search.TermQuery;
|
||||||
|
import org.apache.lucene.search.TopDocs;
|
||||||
|
import org.apache.lucene.store.RAMDirectory;
|
||||||
import org.apache.lucene.util.ReaderUtil;
|
import org.apache.lucene.util.ReaderUtil;
|
||||||
|
import org.apache.lucene.util.Version;
|
||||||
import org.apache.noggit.ObjectBuilder;
|
import org.apache.noggit.ObjectBuilder;
|
||||||
import org.apache.solr.SolrTestCaseJ4;
|
import org.apache.solr.SolrTestCaseJ4;
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.request.SolrQueryRequest;
|
import org.apache.solr.request.SolrQueryRequest;
|
||||||
import org.apache.solr.schema.SchemaField;
|
import org.apache.solr.schema.SchemaField;
|
||||||
|
import org.apache.solr.schema.StrField;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -84,6 +97,22 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
|
||||||
long committedModelClock;
|
long committedModelClock;
|
||||||
volatile int lastId;
|
volatile int lastId;
|
||||||
final String field = "val_l";
|
final String field = "val_l";
|
||||||
|
Object[] syncArr;
|
||||||
|
|
||||||
|
private void initModel(int ndocs) {
|
||||||
|
snapshotCount = 0;
|
||||||
|
committedModelClock = 0;
|
||||||
|
lastId = 0;
|
||||||
|
|
||||||
|
syncArr = new Object[ndocs];
|
||||||
|
|
||||||
|
for (int i=0; i<ndocs; i++) {
|
||||||
|
model.put(i, -1L);
|
||||||
|
syncArr[i] = new Object();
|
||||||
|
}
|
||||||
|
committedModel.putAll(model);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStressGetRealtime() throws Exception {
|
public void testStressGetRealtime() throws Exception {
|
||||||
|
@ -101,13 +130,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
|
||||||
final AtomicLong operations = new AtomicLong(10000); // number of query operations to perform in total
|
final AtomicLong operations = new AtomicLong(10000); // number of query operations to perform in total
|
||||||
int nReadThreads = 10;
|
int nReadThreads = 10;
|
||||||
|
|
||||||
final Object[] syncArr = new Object[ndocs];
|
initModel(ndocs);
|
||||||
|
|
||||||
for (int i=0; i<ndocs; i++) {
|
|
||||||
model.put(i, -1L);
|
|
||||||
syncArr[i] = new Object();
|
|
||||||
}
|
|
||||||
committedModel.putAll(model);
|
|
||||||
|
|
||||||
final AtomicInteger numCommitting = new AtomicInteger();
|
final AtomicInteger numCommitting = new AtomicInteger();
|
||||||
|
|
||||||
|
@ -174,6 +197,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
|
||||||
model.put(id, -nextVal);
|
model.put(id, -nextVal);
|
||||||
} else {
|
} else {
|
||||||
assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
|
assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
|
||||||
|
model.put(id, nextVal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,4 +279,210 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
volatile IndexReader reader;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStressLuceneNRT() throws Exception {
|
||||||
|
// update variables
|
||||||
|
final int commitPercent = 10;
|
||||||
|
final int softCommitPercent = 50; // what percent of the commits are soft
|
||||||
|
final int deletePercent = 8;
|
||||||
|
final int deleteByQueryPercent = 4;
|
||||||
|
final int ndocs = 100;
|
||||||
|
int nWriteThreads = 10;
|
||||||
|
final int maxConcurrentCommits = 2; // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
|
||||||
|
|
||||||
|
// query variables
|
||||||
|
final AtomicLong operations = new AtomicLong(100000); // number of query operations to perform in total
|
||||||
|
int nReadThreads = 10;
|
||||||
|
|
||||||
|
initModel(ndocs);
|
||||||
|
|
||||||
|
final AtomicInteger numCommitting = new AtomicInteger();
|
||||||
|
|
||||||
|
List<Thread> threads = new ArrayList<Thread>();
|
||||||
|
|
||||||
|
RAMDirectory dir = new RAMDirectory();
|
||||||
|
final IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_40, new WhitespaceAnalyzer(Version.LUCENE_40)));
|
||||||
|
writer.commit();
|
||||||
|
reader = IndexReader.open(dir);
|
||||||
|
|
||||||
|
for (int i=0; i<nWriteThreads; i++) {
|
||||||
|
Thread thread = new Thread("WRITER"+i) {
|
||||||
|
Random rand = new Random(random.nextInt());
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
while (operations.get() > 0) {
|
||||||
|
int oper = rand.nextInt(100);
|
||||||
|
|
||||||
|
if (oper < commitPercent) {
|
||||||
|
if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
|
||||||
|
Map<Integer,Long> newCommittedModel;
|
||||||
|
long version;
|
||||||
|
|
||||||
|
synchronized(TestRealTimeGet.this) {
|
||||||
|
newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
|
||||||
|
version = snapshotCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
IndexReader newReader;
|
||||||
|
if (rand.nextInt(100) < softCommitPercent) {
|
||||||
|
// assertU(h.commit("softCommit","true"));
|
||||||
|
newReader = reader.reopen(writer, true);
|
||||||
|
} else {
|
||||||
|
// assertU(commit());
|
||||||
|
writer.commit();
|
||||||
|
newReader = reader.reopen();
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized(TestRealTimeGet.this) {
|
||||||
|
// install the new reader if it's newest
|
||||||
|
if (newReader.getVersion() > reader.getVersion()) {
|
||||||
|
reader.decRef();
|
||||||
|
reader = newReader;
|
||||||
|
} else if (newReader != reader) {
|
||||||
|
newReader.decRef();
|
||||||
|
}
|
||||||
|
|
||||||
|
// install this snapshot only if it's newer than the current one
|
||||||
|
if (version >= committedModelClock) {
|
||||||
|
committedModel = newCommittedModel;
|
||||||
|
committedModelClock = version;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
numCommitting.decrementAndGet();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int id = rand.nextInt(ndocs);
|
||||||
|
Object sync = syncArr[id];
|
||||||
|
|
||||||
|
// set the lastId before we actually change it sometimes to try and
|
||||||
|
// uncover more race conditions between writing and reading
|
||||||
|
boolean before = random.nextBoolean();
|
||||||
|
if (before) {
|
||||||
|
lastId = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can't concurrently update the same document and retain our invariants of increasing values
|
||||||
|
// since we can't guarantee what order the updates will be executed.
|
||||||
|
synchronized (sync) {
|
||||||
|
Long val = model.get(id);
|
||||||
|
long nextVal = Math.abs(val)+1;
|
||||||
|
|
||||||
|
if (oper < commitPercent + deletePercent) {
|
||||||
|
// assertU("<delete><id>" + id + "</id></delete>");
|
||||||
|
writer.deleteDocuments(new Term("id",Integer.toString(id)));
|
||||||
|
model.put(id, -nextVal);
|
||||||
|
} else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
|
||||||
|
//assertU("<delete><query>id:" + id + "</query></delete>");
|
||||||
|
writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
|
||||||
|
model.put(id, -nextVal);
|
||||||
|
} else {
|
||||||
|
// assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
|
||||||
|
Document d = new Document();
|
||||||
|
d.add(new Field("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
|
||||||
|
d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
|
||||||
|
writer.updateDocument(new Term("id", Integer.toString(id)), d);
|
||||||
|
model.put(id, nextVal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!before) {
|
||||||
|
lastId = id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
threads.add(thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
for (int i=0; i<nReadThreads; i++) {
|
||||||
|
Thread thread = new Thread("READER"+i) {
|
||||||
|
Random rand = new Random(random.nextInt());
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
while (operations.decrementAndGet() >= 0) {
|
||||||
|
int oper = rand.nextInt(100);
|
||||||
|
// bias toward a recently changed doc
|
||||||
|
int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
|
||||||
|
|
||||||
|
// when indexing, we update the index, then the model
|
||||||
|
// so when querying, we should first check the model, and then the index
|
||||||
|
|
||||||
|
long val;
|
||||||
|
|
||||||
|
synchronized(TestRealTimeGet.this) {
|
||||||
|
val = committedModel.get(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
IndexReader r;
|
||||||
|
synchronized(TestRealTimeGet.this) {
|
||||||
|
r = reader;
|
||||||
|
r.incRef();
|
||||||
|
}
|
||||||
|
|
||||||
|
// sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
|
||||||
|
IndexSearcher searcher = new IndexSearcher(r);
|
||||||
|
Query q = new TermQuery(new Term("id",Integer.toString(id)));
|
||||||
|
TopDocs results = searcher.search(q, 10);
|
||||||
|
|
||||||
|
if (results.totalHits == 0) {
|
||||||
|
// there's no info we can get back with a delete, so not much we can check without further synchronization
|
||||||
|
} else {
|
||||||
|
assertEquals(1, results.totalHits);
|
||||||
|
Document doc = searcher.doc(results.scoreDocs[0].doc);
|
||||||
|
long foundVal = Long.parseLong(doc.get(field));
|
||||||
|
if (foundVal < Math.abs(val)) {
|
||||||
|
System.out.println("model_val="+val+" foundVal="+foundVal);
|
||||||
|
}
|
||||||
|
assertTrue(foundVal >= Math.abs(val));
|
||||||
|
}
|
||||||
|
|
||||||
|
r.decRef();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Throwable e) {
|
||||||
|
operations.set(-1L);
|
||||||
|
SolrException.log(log,e);
|
||||||
|
fail(e.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
threads.add(thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.close();
|
||||||
|
reader.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue