From d515ff3d9e2bd27595a9610f935ec5f85484214b Mon Sep 17 00:00:00 2001 From: Yonik Seeley Date: Wed, 2 Nov 2011 21:02:17 +0000 Subject: [PATCH] SOLR-2861: NRT test, debugging improvements git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1196797 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/solr/core/SolrCore.java | 28 +- .../component/RealTimeGetComponent.java | 3 + .../apache/solr/search/SolrIndexSearcher.java | 2 +- .../solr/update/DirectUpdateHandler2.java | 22 +- .../org/apache/solr/update/FSUpdateLog.java | 33 +- .../apache/solr/search/TestRealTimeGet.java | 400 +++++++++++++++++- 6 files changed, 454 insertions(+), 34 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index e5524221162..fbea86f02e8 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -51,6 +51,7 @@ import org.apache.solr.util.plugin.NamedListInitializedPlugin; import org.apache.solr.util.plugin.SolrCoreAware; import org.apache.solr.util.plugin.PluginInfoInitialized; import org.apache.commons.io.IOUtils; +import org.eclipse.jdt.core.dom.ThisExpression; import org.xml.sax.SAXException; import javax.xml.parsers.ParserConfigurationException; @@ -818,6 +819,21 @@ public final class SolrCore implements SolrInfoMBean { closeHooks.add( hook ); } + /** @lucene.internal + * Debugging aid only. No non-test code should be released with uncommented verbose() calls. */ + public static boolean VERBOSE = Boolean.parseBoolean(System.getProperty("tests.verbose","false")); + public static void verbose(Object... args) { + if (!VERBOSE) return; + StringBuilder sb = new StringBuilder("VERBOSE:"); + sb.append(Thread.currentThread().getName()); + sb.append(':'); + for (Object o : args) { + sb.append(' '); + sb.append(o==null ? "(null)" : o.toString()); + } + System.out.println(sb.toString()); + } + //////////////////////////////////////////////////////////////////////////////// // Request Handler @@ -1109,18 +1125,20 @@ public final class SolrCore implements SolrInfoMBean { if (newestSearcher != null && solrConfig.reopenReaders && indexDirFile.equals(newIndexDirFile)) { - + if (updateHandlerReopens) { tmp = getUpdateHandler().reopenSearcher(newestSearcher.get()); - } else { IndexReader currentReader = newestSearcher.get().getIndexReader(); IndexReader newReader; + // verbose("start reopen without writer, reader=", currentReader); newReader = IndexReader.openIfChanged(currentReader); - + // verbose("reopen result", newReader); + + if (newReader == null) { currentReader.incRef(); newReader = currentReader; @@ -1129,8 +1147,11 @@ public final class SolrCore implements SolrInfoMBean { tmp = new SolrIndexSearcher(this, schema, "main", newReader, true, true, true, directoryFactory); } + } else { + // verbose("non-reopen START:"); tmp = new SolrIndexSearcher(this, newIndexDir, schema, getSolrConfig().mainIndexConfig, "main", true, true, directoryFactory); + // verbose("non-reopen DONE: searcher=",tmp); } } catch (Throwable th) { synchronized(searcherLock) { @@ -1163,6 +1184,7 @@ public final class SolrCore implements SolrInfoMBean { boolean alreadyRegistered = false; synchronized (searcherLock) { _searchers.add(newSearchHolder); + // verbose("added searcher ",newSearchHolder.get()," to _searchers"); if (_searcher == null) { // if there isn't a current searcher then we may diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index cabe2f929d8..75f3f32695e 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -27,6 +27,7 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.SolrCore; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.response.transform.DocTransformer; @@ -138,6 +139,8 @@ public class RealTimeGetComponent extends SearchComponent searcher = searcherHolder.get(); } + // SolrCore.verbose("RealTimeGet using searcher ", searcher); + int docid = searcher.getFirstMatch(new Term(idField.getName(), idBytes)); if (docid < 0) continue; Document luceneDocument = searcher.doc(docid); diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java index 3b91d650e7e..9e929728c3e 100644 --- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java +++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java @@ -184,7 +184,7 @@ public class SolrIndexSearcher extends IndexSearcher implements SolrInfoMBean { @Override public String toString() { - return name; + return name + "{" + reader + "}"; } public SolrCore getCore() { diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java index 1bb23259dde..1bba1eaf715 100644 --- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java +++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.Term; @@ -157,7 +158,11 @@ public class DirectUpdateHandler2 extends UpdateHandler { updateTerm = cmd.updateTerm; } - writer.updateDocument(updateTerm, cmd.getLuceneDocument()); + Document luceneDocument = cmd.getLuceneDocument(); + // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer); + writer.updateDocument(updateTerm, luceneDocument); + // SolrCore.verbose("updateDocument",updateTerm,"DONE"); + if(del) { // ensure id remains unique BooleanQuery bq = new BooleanQuery(); bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT)); @@ -195,7 +200,12 @@ public class DirectUpdateHandler2 extends UpdateHandler { deleteByIdCommands.incrementAndGet(); deleteByIdCommandsCumulative.incrementAndGet(); - solrCoreState.getIndexWriter(core).deleteDocuments(new Term(idField.getName(), cmd.getIndexedId())); + IndexWriter writer = solrCoreState.getIndexWriter(core); + Term deleteTerm = new Term(idField.getName(), cmd.getIndexedId()); + + // SolrCore.verbose("deleteDocuments",deleteTerm,writer); + writer.deleteDocuments(deleteTerm); + // SolrCore.verbose("deleteDocuments",deleteTerm,"DONE"); ulog.delete(cmd); @@ -312,7 +322,9 @@ public class DirectUpdateHandler2 extends UpdateHandler { ulog.preCommit(cmd); } + // SolrCore.verbose("writer.commit() start writer=",writer); writer.commit(); + // SolrCore.verbose("writer.commit() end"); numDocsPending.set(0); callPostCommitCallbacks(); } else { @@ -385,8 +397,10 @@ public class DirectUpdateHandler2 extends UpdateHandler { IndexReader currentReader = previousSearcher.getIndexReader(); IndexReader newReader; - newReader = IndexReader.openIfChanged(currentReader, solrCoreState.getIndexWriter(core), true); - + IndexWriter writer = solrCoreState.getIndexWriter(core); + // SolrCore.verbose("start reopen from",previousSearcher,"writer=",writer); + newReader = IndexReader.openIfChanged(currentReader, writer, true); + // SolrCore.verbose("reopen result", newReader); if (newReader == null) { currentReader.incRef(); diff --git a/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java b/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java index 136b72d3cf9..b82ebf6a855 100644 --- a/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java @@ -164,7 +164,7 @@ public class FSUpdateLog extends UpdateLog { long pos = tlog.write(cmd); LogPtr ptr = new LogPtr(pos); map.put(cmd.getIndexedId(), ptr); - // System.out.println("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); + // SolrCore.verbose("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); } } @@ -177,7 +177,7 @@ public class FSUpdateLog extends UpdateLog { long pos = tlog.writeDelete(cmd); LogPtr ptr = new LogPtr(pos); map.put(br, ptr); - // System.out.println("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); + // SolrCore.verbose("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); } } @@ -191,7 +191,7 @@ public class FSUpdateLog extends UpdateLog { // optimistic concurrency? Maybe we shouldn't support deleteByQuery w/ optimistic concurrency long pos = tlog.writeDeleteByQuery(cmd); LogPtr ptr = new LogPtr(pos); - // System.out.println("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); + // SolrCore.verbose("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); } } @@ -251,7 +251,7 @@ public class FSUpdateLog extends UpdateLog { // But we do know that any updates already added will definitely // show up in the latest reader after the commit succeeds. map = new HashMap(); - // System.out.println("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map)); + // SolrCore.verbose("TLOG: preSoftCommit: prevMap="+ System.identityHashCode(prevMap) + " new map=" + System.identityHashCode(map)); } } @@ -264,7 +264,7 @@ public class FSUpdateLog extends UpdateLog { // If this DUH2 synchronization were to be removed, preSoftCommit should // record what old maps were created and only remove those. clearOldMaps(); - // System.out.println("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap)); + // SolrCore.verbose("TLOG: postSoftCommit: disposing of prevMap="+ System.identityHashCode(prevMap)); } } @@ -276,18 +276,18 @@ public class FSUpdateLog extends UpdateLog { synchronized (this) { entry = map.get(indexedId); lookupLog = tlog; // something found in "map" will always be in "tlog" - // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in map " + System.identityHashCode(map) + " got " + entry + " lookupLog=" + lookupLog); + // SolrCore.verbose("TLOG: lookup: for id " + indexedId.utf8ToString() + " in map " + System.identityHashCode(map) + " got " + entry + " lookupLog=" + lookupLog); if (entry == null && prevMap != null) { entry = prevMap.get(indexedId); // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog) lookupLog = prevMapLog; - // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog); + // SolrCore.verbose("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog); } if (entry == null && prevMap2 != null) { entry = prevMap2.get(indexedId); // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog) lookupLog = prevMapLog2; - // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap2 " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog); + // SolrCore.verbose("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap2 " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog); } if (entry == null) { @@ -431,6 +431,7 @@ class TransactionLog { this.tlogFile = tlogFile; raf = new RandomAccessFile(this.tlogFile, "rw"); start = raf.length(); + // System.out.println("###start= "+start); channel = raf.getChannel(); os = Channels.newOutputStream(channel); fos = FastOutputStream.wrap(os); @@ -481,12 +482,22 @@ class TransactionLog { pos = start + fos.size(); } + /*** + System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length()); + if (pos != fos.size()) { + throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length()); + } + ***/ + codec.init(fos); codec.writeTag(JavaBinCodec.ARR, 3); codec.writeInt(UpdateLog.ADD); // should just take one byte codec.writeLong(0); // the version... should also just be one byte if 0 codec.writeSolrInputDocument(cmd.getSolrInputDocument()); // fos.flushBuffer(); // flush later + + + return pos; } catch (IOException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); @@ -546,6 +557,12 @@ class TransactionLog { synchronized (fos) { // TODO: optimize this by keeping track of what we have flushed up to fos.flushBuffer(); + /*** + System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos); + if (fos.size() != raf.length() || pos >= fos.size() ) { + throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos); + } + ***/ } ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos); diff --git a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java index f9a4bdadc76..f0bd4906be3 100644 --- a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java +++ b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java @@ -17,6 +17,14 @@ package org.apache.solr.search; +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.FieldType; +import org.apache.lucene.index.*; +import org.apache.lucene.search.*; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; import org.apache.noggit.ObjectBuilder; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.common.SolrException; @@ -25,6 +33,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.Ignore; +import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -32,6 +41,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static org.apache.solr.core.SolrCore.verbose; + public class TestRealTimeGet extends SolrTestCaseJ4 { @BeforeClass @@ -124,19 +135,6 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { ***/ - public static void verbose(Object... args) { - if (!VERBOSE) return; - StringBuilder sb = new StringBuilder("TEST:"); - sb.append(Thread.currentThread().getName()); - sb.append(':'); - for (Object o : args) { - sb.append(' '); - sb.append(o.toString()); - } - System.out.println(sb.toString()); - } - - final ConcurrentHashMap model = new ConcurrentHashMap(); Map committedModel = new HashMap(); long snapshotCount; @@ -159,14 +157,15 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { committedModel.putAll(model); } - @Test public void testStressGetRealtime() throws Exception { clearIndex(); assertU(commit()); + // req().getCore().getUpdateHandler().getIndexWriterProvider().getIndexWriter(req().getCore()).setInfoStream(System.out); + final int commitPercent = 5 + random.nextInt(20); - final int softCommitPercent = 30+random.nextInt(60); // what percent of the commits are soft + final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft final int deletePercent = 4+random.nextInt(25); final int deleteByQueryPercent = 0; // real-time get isn't currently supported with delete-by-query final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200)); @@ -176,10 +175,22 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { // query variables final int percentRealtimeQuery = 60; + // final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total int nReadThreads = 5 + random.nextInt(25); + verbose("commitPercent=", commitPercent); + verbose("softCommitPercent=",softCommitPercent); + verbose("deletePercent=",deletePercent); + verbose("deleteByQueryPercent=", deleteByQueryPercent); + verbose("ndocs=", ndocs); + verbose("nWriteThreads=", nWriteThreads); + verbose("nReadThreads=", nReadThreads); + verbose("percentRealtimeQuery=", percentRealtimeQuery); + verbose("maxConcurrentCommits=", maxConcurrentCommits); + verbose("operations=", operations); + initModel(ndocs); @@ -205,6 +216,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { synchronized(TestRealTimeGet.this) { newCommittedModel = new HashMap(model); // take a snapshot version = snapshotCount++; + verbose("took snapshot version=",version); } if (rand.nextInt(100) < softCommitPercent) { @@ -212,9 +224,9 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { assertU(h.commit("softCommit","true")); verbose("softCommit end"); } else { - verbose("commit start"); + verbose("hardCommit start"); assertU(commit()); - verbose("commit end"); + verbose("hardCommit end"); } synchronized(TestRealTimeGet.this) { @@ -344,7 +356,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { assertEquals(1, doclist.size()); long foundVal = (Long)(((Map)doclist.get(0)).get(field)); if (foundVal < Math.abs(val)) { - verbose("ERROR, id=", id, "foundVal=",foundVal,"model val=",val); + verbose("ERROR, id", id, "foundVal=",foundVal,"model val=",val,"realTime=",realTime); assertTrue(foundVal >= Math.abs(val)); } } @@ -372,4 +384,356 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { } + + + + // The purpose of this test is to roughly model how solr uses lucene + IndexReader reader; + @Test + public void testStressLuceneNRT() throws Exception { + final int commitPercent = 5 + random.nextInt(20); + final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft + final int deletePercent = 4+random.nextInt(25); + final int deleteByQueryPercent = 0; // real-time get isn't currently supported with delete-by-query + final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200)); + int nWriteThreads = 5 + random.nextInt(25); + + final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers + + final AtomicLong operations = new AtomicLong(10000); // number of query operations to perform in total - crank up if + int nReadThreads = 5 + random.nextInt(25); + final boolean tombstones = random.nextBoolean(); + final boolean syncCommits = random.nextBoolean(); + + verbose("commitPercent=", commitPercent); + verbose("softCommitPercent=",softCommitPercent); + verbose("deletePercent=",deletePercent); + verbose("deleteByQueryPercent=", deleteByQueryPercent); + verbose("ndocs=", ndocs); + verbose("nWriteThreads=", nWriteThreads); + verbose("nReadThreads=", nReadThreads); + verbose("maxConcurrentCommits=", maxConcurrentCommits); + verbose("operations=", operations); + verbose("tombstones=", tombstones); + verbose("syncCommits=", syncCommits); + + initModel(ndocs); + + final AtomicInteger numCommitting = new AtomicInteger(); + + List threads = new ArrayList(); + + + final FieldType idFt = new FieldType(); + idFt.setIndexed(true); + idFt.setStored(true); + idFt.setOmitNorms(true); + idFt.setTokenized(false); + idFt.setIndexOptions(FieldInfo.IndexOptions.DOCS_ONLY); + + final FieldType ft2 = new FieldType(); + ft2.setIndexed(false); + ft2.setStored(true); + + + // model how solr does locking - only allow one thread to do a hard commit at once, and only one thread to do a soft commit, but + // a hard commit in progress does not stop a soft commit. + final Lock hardCommitLock = syncCommits ? new ReentrantLock() : null; + final Lock reopenLock = syncCommits ? new ReentrantLock() : null; + + + // RAMDirectory dir = new RAMDirectory(); + // final IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_40, new WhitespaceAnalyzer(Version.LUCENE_40))); + + Directory dir = newDirectory(); + + final RandomIndexWriter writer = new RandomIndexWriter(random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))); + writer.setDoRandomOptimizeAssert(false); + writer.w.setInfoStream(VERBOSE ? System.out : null); + writer.w.setInfoStream(null); + + // writer.commit(); + // reader = IndexReader.open(dir); + // make this reader an NRT reader from the start to avoid the first non-writer openIfChanged + // to only opening at the last commit point. + reader = IndexReader.open(writer.w, true); + + for (int i=0; i 0) { + int oper = rand.nextInt(100); + + if (oper < commitPercent) { + if (numCommitting.incrementAndGet() <= maxConcurrentCommits) { + Map newCommittedModel; + long version; + IndexReader oldReader; + + boolean softCommit = rand.nextInt(100) < softCommitPercent; + + if (!softCommit) { + // only allow one hard commit to proceed at once + if (hardCommitLock != null) hardCommitLock.lock(); + verbose("hardCommit start"); + + writer.commit(); + } + + if (reopenLock != null) reopenLock.lock(); + + synchronized(TestRealTimeGet.this) { + newCommittedModel = new HashMap(model); // take a snapshot + version = snapshotCount++; + oldReader = reader; + oldReader.incRef(); // increment the reference since we will use this for reopening + } + + if (!softCommit) { + // must commit after taking a snapshot of the model + // writer.commit(); + } + + verbose("reopen start using", oldReader); + + IndexReader newReader; + if (softCommit) { + newReader = IndexReader.openIfChanged(oldReader, writer.w, true); + } else { + // will only open to last commit + newReader = IndexReader.openIfChanged(oldReader); + } + + + if (newReader == null) { + oldReader.incRef(); + newReader = oldReader; + } + oldReader.decRef(); + + verbose("reopen result", newReader); + + synchronized(TestRealTimeGet.this) { + assert newReader.getRefCount() > 0; + assert reader.getRefCount() > 0; + + // install the new reader if it's newest (and check the current version since another reader may have already been installed) + if (newReader.getVersion() > reader.getVersion()) { + reader.decRef(); + reader = newReader; + + // install this snapshot only if it's newer than the current one + if (version >= committedModelClock) { + committedModel = newCommittedModel; + committedModelClock = version; + } + + } else { + // close if unused + newReader.decRef(); + } + + } + + if (reopenLock != null) reopenLock.unlock(); + + if (!softCommit) { + if (hardCommitLock != null) hardCommitLock.unlock(); + } + + } + numCommitting.decrementAndGet(); + continue; + } + + + int id = rand.nextInt(ndocs); + Object sync = syncArr[id]; + + // set the lastId before we actually change it sometimes to try and + // uncover more race conditions between writing and reading + boolean before = rand.nextBoolean(); + if (before) { + lastId = id; + } + + // We can't concurrently update the same document and retain our invariants of increasing values + // since we can't guarantee what order the updates will be executed. + synchronized (sync) { + Long val = model.get(id); + long nextVal = Math.abs(val)+1; + + if (oper < commitPercent + deletePercent) { + // add tombstone first + if (tombstones) { + Document d = new Document(); + d.add(new Field("id","-"+Integer.toString(id), idFt)); + d.add(new Field(field, Long.toString(nextVal), ft2)); + verbose("adding tombstone for id",id,"val=",nextVal); + writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d); + } + + verbose("deleting id",id,"val=",nextVal); + writer.deleteDocuments(new Term("id",Integer.toString(id))); + model.put(id, -nextVal); + verbose("deleting id",id,"val=",nextVal,"DONE"); + + } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) { + //assertU("id:" + id + ""); + + // add tombstone first + if (tombstones) { + Document d = new Document(); + d.add(new Field("id","-"+Integer.toString(id), idFt)); + d.add(new Field(field, Long.toString(nextVal), ft2)); + verbose("adding tombstone for id",id,"val=",nextVal); + writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d); + } + + verbose("deleteByQuery",id,"val=",nextVal); + writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id)))); + model.put(id, -nextVal); + verbose("deleteByQuery",id,"val=",nextVal,"DONE"); + } else { + // model.put(id, nextVal); // uncomment this and this test should fail. + + // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal))); + Document d = new Document(); + d.add(new Field("id",Integer.toString(id), idFt)); + d.add(new Field(field, Long.toString(nextVal), ft2)); + verbose("adding id",id,"val=",nextVal); + writer.updateDocument(new Term("id", Integer.toString(id)), d); + if (tombstones) { + // remove tombstone after new addition (this should be optional?) + verbose("deleting tombstone for id",id); + writer.deleteDocuments(new Term("id","-"+Integer.toString(id))); + verbose("deleting tombstone for id",id,"DONE"); + } + + model.put(id, nextVal); + verbose("adding id",id,"val=",nextVal,"DONE"); + } + } + + if (!before) { + lastId = id; + } + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + }; + + threads.add(thread); + } + + + for (int i=0; i= 0) { + // bias toward a recently changed doc + int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs); + + // when indexing, we update the index, then the model + // so when querying, we should first check the model, and then the index + + long val; + + synchronized(TestRealTimeGet.this) { + val = committedModel.get(id); + } + + + IndexReader r; + synchronized(TestRealTimeGet.this) { + r = reader; + r.incRef(); + } + + int docid = getFirstMatch(r, new Term("id",Integer.toString(id))); + + if (docid < 0 && tombstones) { + // if we couldn't find the doc, look for it's tombstone + docid = getFirstMatch(r, new Term("id","-"+Integer.toString(id))); + if (docid < 0) { + if (val == -1L) { + // expected... no doc was added yet + r.decRef(); + continue; + } + verbose("ERROR: Couldn't find a doc or tombstone for id", id, "using reader",r,"expected value",val); + fail("No documents or tombstones found for id " + id + ", expected at least " + val); + } + } + + if (docid < 0 && !tombstones) { + // nothing to do - we can't tell anything from a deleted doc without tombstones + } else { + if (docid < 0) { + verbose("ERROR: Couldn't find a doc for id", id, "using reader",r); + } + assertTrue(docid >= 0); // we should have found the document, or it's tombstone + Document doc = r.document(docid); + long foundVal = Long.parseLong(doc.get(field)); + if (foundVal < Math.abs(val)) { + verbose("ERROR: id",id,"model_val=",val," foundVal=",foundVal,"reader=",reader); + } + assertTrue(foundVal >= Math.abs(val)); + } + + r.decRef(); + } + } + catch (Throwable e) { + operations.set(-1L); + SolrException.log(log,e); + fail(e.toString()); + } + } + }; + + threads.add(thread); + } + + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + writer.close(); + reader.close(); + dir.close(); + } + + + public int getFirstMatch(IndexReader r, Term t) throws IOException { + Fields fields = MultiFields.getFields(r); + if (fields == null) return -1; + Terms terms = fields.terms(t.field()); + if (terms == null) return -1; + BytesRef termBytes = t.bytes(); + DocsEnum docs = terms.docs(MultiFields.getLiveDocs(r), termBytes, null); + if (docs == null) return -1; + int id = docs.nextDoc(); + if (id != DocIdSetIterator.NO_MORE_DOCS) { + int next = docs.nextDoc(); + assertEquals(DocIdSetIterator.NO_MORE_DOCS, next); + } + return id == DocIdSetIterator.NO_MORE_DOCS ? -1 : id; + } + }