LUCENE-3639: fix a few bugs in shard searching; add basic shard search test

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1220426 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2011-12-18 15:24:58 +00:00
parent 8472b04326
commit aee83bf7ee
6 changed files with 1012 additions and 16 deletions

View File

@ -760,6 +760,12 @@ Bug fixes
where they would create invalid offsets in some situations, leading to problems
in highlighting. (Max Beutel via Robert Muir)
* LUCENE-3639: TopDocs.merge was incorrectly setting TopDocs.maxScore to
Float.MIN_VALUE when it should be Float.NaN, when there were 0
hits. Improved age calculation in SearcherLifetimeManager, to have
double precision and to compute age to be how long ago the searcher
was replaced with a new searcher (Mike McCandless)
Documentation
* LUCENE-3597: Fixed incorrect grouping documentation. (Martijn van Groningen, Robert Muir)

View File

@ -46,6 +46,6 @@ public class ScoreDoc {
// A convenience method for debugging.
@Override
public String toString() {
return "doc=" + doc + " score=" + score;
return "doc=" + doc + " score=" + score + " shardIndex=" + shardIndex;
}
}

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.search.NRTManager; // javadocs
import org.apache.lucene.index.IndexReader; // javadocs
@ -101,9 +100,11 @@ import org.apache.lucene.util.IOUtils;
public class SearcherLifetimeManager implements Closeable {
static final double NANOS_PER_SEC = 1000000000.0;
private static class SearcherTracker implements Comparable<SearcherTracker>, Closeable {
public final IndexSearcher searcher;
public final long recordTimeSec;
public final double recordTimeSec;
public final long version;
public SearcherTracker(IndexSearcher searcher) {
@ -112,7 +113,7 @@ public class SearcherLifetimeManager implements Closeable {
searcher.getIndexReader().incRef();
// Use nanoTime not currentTimeMillis since it [in
// theory] reduces risk from clock shift
recordTimeSec = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime());
recordTimeSec = System.nanoTime() / NANOS_PER_SEC;
}
// Newer searchers are sort before older ones:
@ -170,6 +171,7 @@ public class SearcherLifetimeManager implements Closeable {
final long version = searcher.getIndexReader().getVersion();
SearcherTracker tracker = searchers.get(version);
if (tracker == null) {
//System.out.println("RECORD version=" + version + " ms=" + System.currentTimeMillis());
tracker = new SearcherTracker(searcher);
if (searchers.putIfAbsent(version, tracker) != null) {
// Another thread beat us -- must decRef to undo
@ -217,29 +219,28 @@ public class SearcherLifetimeManager implements Closeable {
/** See {@link #prune}. */
public interface Pruner {
/** Return true if this searcher should be removed.
* @param ageSec how long ago this searcher was
* recorded vs the most recently recorded
* searcher
* @param ageSec how much time has passed since this
* searcher was the current (live) searcher
* @param searcher Searcher
**/
public boolean doPrune(int ageSec, IndexSearcher searcher);
public boolean doPrune(double ageSec, IndexSearcher searcher);
}
/** Simple pruner that drops any searcher older by
* more than the specified seconds, than the newest
* searcher. */
public final static class PruneByAge implements Pruner {
private final int maxAgeSec;
private final double maxAgeSec;
public PruneByAge(int maxAgeSec) {
if (maxAgeSec < 1) {
public PruneByAge(double maxAgeSec) {
if (maxAgeSec < 0) {
throw new IllegalArgumentException("maxAgeSec must be > 0 (got " + maxAgeSec + ")");
}
this.maxAgeSec = maxAgeSec;
}
@Override
public boolean doPrune(int ageSec, IndexSearcher searcher) {
public boolean doPrune(double ageSec, IndexSearcher searcher) {
return ageSec > maxAgeSec;
}
}
@ -261,14 +262,25 @@ public class SearcherLifetimeManager implements Closeable {
trackers.add(tracker);
}
Collections.sort(trackers);
final long newestSec = trackers.isEmpty() ? 0L : trackers.get(0).recordTimeSec;
double lastRecordTimeSec = 0.0;
final double now = System.nanoTime()/NANOS_PER_SEC;
for (SearcherTracker tracker: trackers) {
final int ageSec = (int) (newestSec - tracker.recordTimeSec);
assert ageSec >= 0;
final double ageSec;
if (lastRecordTimeSec == 0.0) {
ageSec = 0.0;
} else {
ageSec = now - lastRecordTimeSec;
}
// First tracker is always age 0.0 sec, since it's
// still "live"; second tracker's age (= seconds since
// it was "live") is now minus first tracker's
// recordTime, etc:
if (pruner.doPrune(ageSec, tracker.searcher)) {
//System.out.println("PRUNE version=" + tracker.version + " age=" + ageSec + " ms=" + System.currentTimeMillis());
searchers.remove(tracker.version);
tracker.close();
}
lastRecordTimeSec = tracker.recordTimeSec;
}
}

View File

@ -216,8 +216,10 @@ public class TopDocs {
float maxScore = Float.MIN_VALUE;
for(int shardIDX=0;shardIDX<shardHits.length;shardIDX++) {
final TopDocs shard = shardHits[shardIDX];
// totalHits can be non-zero even if no hits were
// collected, when searchAfter was used:
totalHitCount += shard.totalHits;
if (shard.scoreDocs != null && shard.scoreDocs.length > 0) {
totalHitCount += shard.totalHits;
availHitCount += shard.scoreDocs.length;
queue.add(new ShardRef(shardIDX));
maxScore = Math.max(maxScore, shard.getMaxScore());
@ -225,6 +227,10 @@ public class TopDocs {
}
}
if (availHitCount == 0) {
maxScore = Float.NaN;
}
final ScoreDoc[] hits = new ScoreDoc[Math.min(topN, availHitCount)];
int hitUpto = 0;

View File

@ -0,0 +1,579 @@
package org.apache.lucene.search;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TermContext;
// TODO
// - doc blocks? so we can test joins/grouping...
// - controlled consistency (NRTMgr)
public abstract class ShardSearchingTestBase extends LuceneTestCase {
// TODO: maybe SLM should throw this instead of returning null...
public static class SearcherExpiredException extends RuntimeException {
public SearcherExpiredException(String message) {
super(message);
}
}
private static class FieldAndShardVersion {
private final long version;
private final int nodeID;
private final String field;
public FieldAndShardVersion(int nodeID, long version, String field) {
this.nodeID = nodeID;
this.version = version;
this.field = field;
}
@Override
public int hashCode() {
return (int) (version * nodeID + field.hashCode());
}
@Override
public boolean equals(Object _other) {
if (!(_other instanceof FieldAndShardVersion)) {
return false;
}
final FieldAndShardVersion other = (FieldAndShardVersion) _other;
return field.equals(other.field) && version == other.version && nodeID == other.nodeID;
}
@Override
public String toString() {
return "FieldAndShardVersion(field=" + field + " nodeID=" + nodeID + " version=" + version+ ")";
}
}
private static class TermAndShardVersion {
private final long version;
private final int nodeID;
private final Term term;
public TermAndShardVersion(int nodeID, long version, Term term) {
this.nodeID = nodeID;
this.version = version;
this.term = term;
}
@Override
public int hashCode() {
return (int) (version * nodeID + term.hashCode());
}
@Override
public boolean equals(Object _other) {
if (!(_other instanceof TermAndShardVersion)) {
return false;
}
final TermAndShardVersion other = (TermAndShardVersion) _other;
return term.equals(other.term) && version == other.version && nodeID == other.nodeID;
}
}
// We share collection stats for these fields on each node
// reopen:
private final String[] fieldsToShare = new String[] {"body", "title"};
// Called by one node once it has reopened, to notify all
// other nodes. This is just a mock (since it goes and
// directly updates all other nodes, in RAM)... in a real
// env this would hit the wire, sending version &
// collection stats to all other nodes:
void broadcastNodeReopen(int nodeID, long version, IndexSearcher newSearcher) throws IOException {
if (VERBOSE) {
System.out.println("REOPEN: nodeID=" + nodeID + " version=" + version + " maxDoc=" + newSearcher.getIndexReader().maxDoc());
}
// Broadcast new collection stats for this node to all
// other nodes:
for(String field : fieldsToShare) {
final CollectionStatistics stats = newSearcher.collectionStatistics(field);
for (NodeState node : nodes) {
// Don't put my own collection stats into the cache;
// we pull locally:
if (node.myNodeID != nodeID) {
node.collectionStatsCache.put(new FieldAndShardVersion(nodeID, version, field), stats);
}
}
}
for (NodeState node : nodes) {
node.updateNodeVersion(nodeID, version);
}
}
// TODO: broadcastNodeExpire? then we can purge the
// known-stale cache entries...
// MOCK: in a real env you have to hit the wire
// (send this query to all remote nodes
// concurrently):
TopDocs searchNode(int nodeID, long[] nodeVersions, Query q, Sort sort, int numHits, ScoreDoc searchAfter) throws IOException {
final NodeState.ShardIndexSearcher s = nodes[nodeID].acquire(nodeVersions);
try {
if (sort == null) {
if (searchAfter != null) {
return s.localSearchAfter(searchAfter, q, numHits);
} else {
return s.localSearch(q, numHits);
}
} else {
assert searchAfter == null; // not supported yet
return s.localSearch(q, numHits, sort);
}
} finally {
nodes[nodeID].release(s);
}
}
// Mock: in a real env, this would hit the wire and get
// term stats from remote node
Map<Term,TermStatistics> getNodeTermStats(Set<Term> terms, int nodeID, long version) throws IOException {
final NodeState node = nodes[nodeID];
final Map<Term,TermStatistics> stats = new HashMap<Term,TermStatistics>();
final IndexSearcher s = node.searchers.acquire(version);
if (s == null) {
throw new SearcherExpiredException("node=" + nodeID + " version=" + version);
}
try {
for(Term term : terms) {
final TermContext termContext = TermContext.build(s.getIndexReader().getTopReaderContext(), term, false);
stats.put(term, s.termStatistics(term, termContext));
}
} finally {
node.searchers.release(s);
}
return stats;
}
protected final class NodeState implements Closeable {
public final Directory dir;
public final IndexWriter writer;
public final SearcherLifetimeManager searchers;
public final SearcherManager mgr;
public final int myNodeID;
public final long[] currentNodeVersions;
// TODO: nothing evicts from here!!! Somehow, on searcher
// expiration on remote nodes we must evict from our
// local cache...? And still LRU otherwise (for the
// still-live searchers).
private final Map<FieldAndShardVersion,CollectionStatistics> collectionStatsCache = new ConcurrentHashMap<FieldAndShardVersion,CollectionStatistics>();
private final Map<TermAndShardVersion,TermStatistics> termStatsCache = new ConcurrentHashMap<TermAndShardVersion,TermStatistics>();
/** Matches docs in the local shard but scores based on
* aggregated stats ("mock distributed scoring") from all
* nodes. */
public class ShardIndexSearcher extends IndexSearcher {
// Version for the node searchers we search:
public final long[] nodeVersions;
public final int myNodeID;
public ShardIndexSearcher(long[] nodeVersions, IndexReader localReader, int nodeID) {
super(localReader);
this.nodeVersions = nodeVersions;
myNodeID = nodeID;
assert myNodeID == NodeState.this.myNodeID: "myNodeID=" + nodeID + " NodeState.this.myNodeID=" + NodeState.this.myNodeID;
}
@Override
public Query rewrite(Query original) throws IOException {
final Query rewritten = super.rewrite(original);
final Set<Term> terms = new HashSet<Term>();
rewritten.extractTerms(terms);
// Make a single request to remote nodes for term
// stats:
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
if (nodeID == myNodeID) {
continue;
}
final Set<Term> missing = new HashSet<Term>();
for(Term term : terms) {
final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], term);
if (!termStatsCache.containsKey(key)) {
missing.add(term);
}
}
if (missing.size() != 0) {
for(Map.Entry<Term,TermStatistics> ent : getNodeTermStats(missing, nodeID, nodeVersions[nodeID]).entrySet()) {
final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], ent.getKey());
termStatsCache.put(key, ent.getValue());
}
}
}
return rewritten;
}
@Override
public TermStatistics termStatistics(Term term, TermContext context) throws IOException {
assert term != null;
int docFreq = 0;
long totalTermFreq = 0;
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
final TermStatistics subStats;
if (nodeID == myNodeID) {
subStats = super.termStatistics(term, context);
} else {
final TermAndShardVersion key = new TermAndShardVersion(nodeID, nodeVersions[nodeID], term);
subStats = termStatsCache.get(key);
// We pre-cached during rewrite so all terms
// better be here...
assert subStats != null;
}
docFreq += subStats.docFreq();
totalTermFreq += subStats.totalTermFreq();
}
return new TermStatistics(term.bytes(), docFreq, totalTermFreq);
}
@Override
public CollectionStatistics collectionStatistics(String field) throws IOException {
// TODO: we could compute this on init and cache,
// since we are re-inited whenever any nodes have a
// new reader
int docCount = 0;
long sumTotalTermFreq = 0;
long sumDocFreq = 0;
int maxDoc = 0;
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
final FieldAndShardVersion key = new FieldAndShardVersion(nodeID, nodeVersions[nodeID], field);
final CollectionStatistics nodeStats;
if (nodeID == myNodeID) {
nodeStats = super.collectionStatistics(field);
} else {
nodeStats = collectionStatsCache.get(key);
}
if (nodeStats == null) {
System.out.println("coll stats myNodeID=" + myNodeID + ": " + collectionStatsCache.keySet());
}
// Collection stats are pre-shared on reopen, so,
// we better not have a cache miss:
assert nodeStats != null: "myNodeID=" + myNodeID + " nodeID=" + nodeID + " version=" + nodeVersions[nodeID] + " field=" + field;
docCount += nodeStats.docCount();
sumTotalTermFreq += nodeStats.sumTotalTermFreq();
sumDocFreq += nodeStats.sumDocFreq();
maxDoc += nodeStats.maxDoc();
}
return new CollectionStatistics(field, maxDoc, docCount, sumTotalTermFreq, sumDocFreq);
}
@Override
public TopDocs search(Query query, int numHits) throws IOException {
final TopDocs[] shardHits = new TopDocs[nodeVersions.length];
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
if (nodeID == myNodeID) {
// My node; run using local shard searcher we
// already aquired:
shardHits[nodeID] = localSearch(query, numHits);
} else {
shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, null, numHits, null);
}
}
// Merge:
return TopDocs.merge(null, numHits, shardHits);
}
public TopDocs localSearch(Query query, int numHits) throws IOException {
return super.search(query, numHits);
}
@Override
public TopDocs searchAfter(ScoreDoc after, Query query, int numHits) throws IOException {
final TopDocs[] shardHits = new TopDocs[nodeVersions.length];
ScoreDoc shardAfter = new ScoreDoc(after.doc, after.score);
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
if (nodeID < after.shardIndex) {
// If score is tied then no docs in this shard
// should be collected:
shardAfter.doc = Integer.MAX_VALUE;
} else if (nodeID == after.shardIndex) {
// If score is tied then we break according to
// docID (like normal):
shardAfter.doc = after.doc;
} else {
// If score is tied then all docs in this shard
// should be collected, because they come after
// the previous bottom:
shardAfter.doc = -1;
}
if (nodeID == myNodeID) {
// My node; run using local shard searcher we
// already aquired:
shardHits[nodeID] = localSearchAfter(shardAfter, query, numHits);
} else {
shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, null, numHits, shardAfter);
}
//System.out.println(" node=" + nodeID + " totHits=" + shardHits[nodeID].totalHits);
}
// Merge:
return TopDocs.merge(null, numHits, shardHits);
}
public TopDocs localSearchAfter(ScoreDoc after, Query query, int numHits) throws IOException {
return super.searchAfter(after, query, numHits);
}
@Override
public TopFieldDocs search(Query query, int numHits, Sort sort) throws IOException {
assert sort != null;
final TopDocs[] shardHits = new TopDocs[nodeVersions.length];
for(int nodeID=0;nodeID<nodeVersions.length;nodeID++) {
if (nodeID == myNodeID) {
// My node; run using local shard searcher we
// already aquired:
shardHits[nodeID] = localSearch(query, numHits, sort);
} else {
shardHits[nodeID] = searchNode(nodeID, nodeVersions, query, sort, numHits, null);
}
}
// Merge:
return (TopFieldDocs) TopDocs.merge(sort, numHits, shardHits);
}
public TopFieldDocs localSearch(Query query, int numHits, Sort sort) throws IOException {
return super.search(query, numHits, sort);
}
}
private volatile ShardIndexSearcher currentShardSearcher;
public NodeState(Random random, String baseDir, int nodeID, int numNodes) throws IOException {
myNodeID = nodeID;
dir = newFSDirectory(new File(baseDir + "." + myNodeID));
// TODO: set warmer
writer = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
mgr = new SearcherManager(writer, true, null);
searchers = new SearcherLifetimeManager();
// Init w/ 0s... caller above will do initial
// "broadcast" by calling initSearcher:
currentNodeVersions = new long[numNodes];
}
public void initSearcher(long[] nodeVersions) {
assert currentShardSearcher == null;
System.arraycopy(nodeVersions, 0, currentNodeVersions, 0, currentNodeVersions.length);
currentShardSearcher = new ShardIndexSearcher(currentNodeVersions.clone(),
mgr.acquire().getIndexReader(),
myNodeID);
}
public void updateNodeVersion(int nodeID, long version) throws IOException {
currentNodeVersions[nodeID] = version;
if (currentShardSearcher != null) {
currentShardSearcher.getIndexReader().decRef();
}
currentShardSearcher = new ShardIndexSearcher(currentNodeVersions.clone(),
mgr.acquire().getIndexReader(),
myNodeID);
}
// Get the current (fresh) searcher for this node
public ShardIndexSearcher acquire() {
final ShardIndexSearcher s = currentShardSearcher;
// TODO: this isn't thread safe.... in theory the
// reader could get decRef'd to 0 before we have a
// chance to incRef, ie if a reopen happens right
// after the above line, this thread gets stalled, and
// the old IR is closed. But because we use SLM in
// this test, this will be exceptionally rare:
s.getIndexReader().incRef();
return s;
}
public void release(ShardIndexSearcher s) throws IOException {
s.getIndexReader().decRef();
}
// Get and old searcher matching the specified versions:
public ShardIndexSearcher acquire(long[] nodeVersions) {
final IndexSearcher s = searchers.acquire(nodeVersions[myNodeID]);
if (s == null) {
throw new SearcherExpiredException("nodeID=" + myNodeID + " version=" + nodeVersions[myNodeID]);
}
return new ShardIndexSearcher(nodeVersions, s.getIndexReader(), myNodeID);
}
// Reopen local reader
public void reopen() throws IOException {
final IndexSearcher before = mgr.acquire();
mgr.release(before);
mgr.maybeReopen();
final IndexSearcher after = mgr.acquire();
try {
if (after != before) {
// New searcher was opened
final long version = searchers.record(after);
searchers.prune(new SearcherLifetimeManager.PruneByAge(maxSearcherAgeSeconds));
broadcastNodeReopen(myNodeID, version, after);
}
} finally {
mgr.release(after);
}
}
@Override
public void close() throws IOException {
if (currentShardSearcher != null) {
currentShardSearcher.getIndexReader().decRef();
}
searchers.close();
mgr.close();
writer.close();
dir.close();
}
}
// TODO: make this more realistic, ie, each node should
// have its own thread, so we have true node to node
// concurrency
private final class ChangeIndices extends Thread {
@Override
public void run() {
try {
final LineFileDocs docs = new LineFileDocs(random);
int numDocs = 0;
while (System.nanoTime() < endTimeNanos) {
final int what = random.nextInt(3);
final NodeState node = nodes[random.nextInt(nodes.length)];
if (numDocs == 0 || what == 0) {
node.writer.addDocument(docs.nextDoc());
numDocs++;
} else if (what == 1) {
node.writer.updateDocument(new Term("docid", ""+random.nextInt(numDocs)),
docs.nextDoc());
numDocs++;
} else {
node.writer.deleteDocuments(new Term("docid", ""+random.nextInt(numDocs)));
}
// TODO: doc blocks too
if (random.nextInt(17) == 12) {
node.writer.commit();
}
if (random.nextInt(17) == 12) {
nodes[random.nextInt(nodes.length)].reopen();
}
}
} catch (Throwable t) {
System.out.println("FAILED:");
t.printStackTrace(System.out);
throw new RuntimeException(t);
}
}
}
protected NodeState[] nodes;
int maxSearcherAgeSeconds;
long endTimeNanos;
private Thread changeIndicesThread;
protected void start(String baseDirName, int numNodes, double runTimeSec, int maxSearcherAgeSeconds) throws IOException {
endTimeNanos = System.nanoTime() + (long) (runTimeSec*1000000000);
this.maxSearcherAgeSeconds = maxSearcherAgeSeconds;
nodes = new NodeState[numNodes];
for(int nodeID=0;nodeID<numNodes;nodeID++) {
nodes[nodeID] = new NodeState(random, baseDirName, nodeID, numNodes);
}
long[] nodeVersions = new long[nodes.length];
for(int nodeID=0;nodeID<numNodes;nodeID++) {
final IndexSearcher s = nodes[nodeID].mgr.acquire();
try {
nodeVersions[nodeID] = nodes[nodeID].searchers.record(s);
} finally {
nodes[nodeID].mgr.release(s);
}
}
for(int nodeID=0;nodeID<numNodes;nodeID++) {
final IndexSearcher s = nodes[nodeID].mgr.acquire();
assert nodeVersions[nodeID] == nodes[nodeID].searchers.record(s);
assert s != null;
try {
broadcastNodeReopen(nodeID, nodeVersions[nodeID], s);
} finally {
nodes[nodeID].mgr.release(s);
}
}
changeIndicesThread = new ChangeIndices();
changeIndicesThread.start();
}
protected void finish() throws InterruptedException, IOException {
changeIndicesThread.join();
for(NodeState node : nodes) {
node.close();
}
}
protected static class SearcherAndVersion {
public final IndexSearcher searcher;
public final long version;
public SearcherAndVersion(IndexSearcher searcher, long version) {
this.searcher = searcher;
this.version = version;
}
}
}

View File

@ -0,0 +1,393 @@
package org.apache.lucene.search;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiFields;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase.UseNoMemoryExpensiveCodec;
import org.apache.lucene.util._TestUtil;
// TODO
// - other queries besides PrefixQuery & TermQuery (but:
// FuzzyQ will be problematic... the top N terms it
// takes means results will differ)
// - NRQ/F
// - BQ, negated clauses, negated prefix clauses
// - test pulling docs in 2nd round trip...
// - filter too
@UseNoMemoryExpensiveCodec
public class TestShardSearching extends ShardSearchingTestBase {
private static class PreviousSearchState {
public final long searchTimeNanos;
public final long[] versions;
public final ScoreDoc searchAfterLocal;
public final ScoreDoc searchAfterShard;
public final Sort sort;
public final Query query;
public final int numHitsPaged;
public PreviousSearchState(Query query, Sort sort, ScoreDoc searchAfterLocal, ScoreDoc searchAfterShard, long[] versions, int numHitsPaged) {
this.versions = versions.clone();
this.searchAfterLocal = searchAfterLocal;
this.searchAfterShard = searchAfterShard;
this.sort = sort;
this.query = query;
this.numHitsPaged = numHitsPaged;
searchTimeNanos = System.nanoTime();
}
}
public void testSimple() throws Exception {
final int numNodes = _TestUtil.nextInt(random, 1, 10);
final double runTimeSec = atLeast(5) * RANDOM_MULTIPLIER;
final int minDocsToMakeTerms = _TestUtil.nextInt(random, 5, 20);
final int maxSearcherAgeSeconds = _TestUtil.nextInt(random, 1, 4);
if (VERBOSE) {
System.out.println("TEST: numNodes=" + numNodes + " runTimeSec=" + runTimeSec + " maxSearcherAgeSeconds=" + maxSearcherAgeSeconds);
}
start(_TestUtil.getTempDir("TestShardSearching").toString(),
numNodes,
runTimeSec,
maxSearcherAgeSeconds
);
final List<PreviousSearchState> priorSearches = new ArrayList<PreviousSearchState>();
List<BytesRef> terms = null;
while (System.nanoTime() < endTimeNanos) {
final boolean doFollowon = priorSearches.size() > 0 && random.nextInt(7) == 1;
// Pick a random node; we will run the query on this node:
final int myNodeID = random.nextInt(numNodes);
final NodeState.ShardIndexSearcher localShardSearcher;
final PreviousSearchState prevSearchState;
if (doFollowon) {
// Pretend user issued a followon query:
prevSearchState = priorSearches.get(random.nextInt(priorSearches.size()));
if (VERBOSE) {
System.out.println("\nTEST: follow-on query age=" + ((System.nanoTime() - prevSearchState.searchTimeNanos)/1000000000.0));
}
try {
localShardSearcher = nodes[myNodeID].acquire(prevSearchState.versions);
} catch (SearcherExpiredException see) {
// Expected, sometimes; in a "real" app we would
// either forward this error to the user ("too
// much time has passed; please re-run your
// search") or sneakily just switch to newest
// searcher w/o telling them...
if (VERBOSE) {
System.out.println(" searcher expired during local shard searcher init: " + see);
}
priorSearches.remove(prevSearchState);
continue;
}
} else {
if (VERBOSE) {
System.out.println("\nTEST: fresh query");
}
// Do fresh query:
localShardSearcher = nodes[myNodeID].acquire();
prevSearchState = null;
}
final IndexReader[] subs = new IndexReader[numNodes];
PreviousSearchState searchState = null;
try {
// Mock: now make a single reader (MultiReader) from all node
// searchers. In a real shard env you can't do this... we
// do it to confirm results from the shard searcher
// are correct:
int docCount = 0;
try {
for(int nodeID=0;nodeID<numNodes;nodeID++) {
final long subVersion = localShardSearcher.nodeVersions[nodeID];
final IndexSearcher sub = nodes[nodeID].searchers.acquire(subVersion);
if (sub == null) {
nodeID--;
while(nodeID >= 0) {
subs[nodeID].decRef();
subs[nodeID] = null;
nodeID--;
}
throw new SearcherExpiredException("nodeID=" + nodeID + " version=" + subVersion);
}
subs[nodeID] = sub.getIndexReader();
docCount += subs[nodeID].maxDoc();
}
} catch (SearcherExpiredException see) {
// Expected
if (VERBOSE) {
System.out.println(" searcher expired during mock reader init: " + see);
}
continue;
}
final IndexReader mockReader = new MultiReader(subs);
final IndexSearcher mockSearcher = new IndexSearcher(mockReader);
Query query;
Sort sort;
if (prevSearchState != null) {
query = prevSearchState.query;
sort = prevSearchState.sort;
} else {
if (terms == null && docCount > minDocsToMakeTerms) {
// TODO: try to "focus" on high freq terms sometimes too
// TODO: maybe also periodically reset the terms...?
final TermsEnum termsEnum = MultiFields.getTerms(mockReader, "body").iterator(null);
terms = new ArrayList<BytesRef>();
while(termsEnum.next() != null) {
terms.add(BytesRef.deepCopyOf(termsEnum.term()));
}
if (VERBOSE) {
System.out.println("TEST: init terms: " + terms.size() + " terms");
}
if (terms.size() == 0) {
terms = null;
}
}
if (VERBOSE) {
System.out.println(" maxDoc=" + mockReader.maxDoc());
}
if (terms != null) {
if (random.nextBoolean()) {
query = new TermQuery(new Term("body", terms.get(random.nextInt(terms.size()))));
} else {
final String t = terms.get(random.nextInt(terms.size())).utf8ToString();
final String prefix;
if (t.length() <= 1) {
prefix = t;
} else {
prefix = t.substring(0, _TestUtil.nextInt(random, 1, 2));
}
query = new PrefixQuery(new Term("body", prefix));
}
if (random.nextBoolean()) {
sort = null;
} else {
// TODO: sort by more than 1 field
final int what = random.nextInt(3);
if (what == 0) {
sort = new Sort(SortField.FIELD_SCORE);
} else if (what == 1) {
// TODO: this sort doesn't merge
// correctly... it's tricky because you
// could have > 2.1B docs across all shards:
//sort = new Sort(SortField.FIELD_DOC);
sort = null;
} else if (what == 2) {
sort = new Sort(new SortField[] {new SortField("docid", SortField.Type.INT, random.nextBoolean())});
} else {
sort = new Sort(new SortField[] {new SortField("title", SortField.Type.STRING, random.nextBoolean())});
}
}
} else {
query = null;
sort = null;
}
}
if (query != null) {
try {
searchState = assertSame(mockSearcher, localShardSearcher, query, sort, prevSearchState);
} catch (SearcherExpiredException see) {
// Expected; in a "real" app we would
// either forward this error to the user ("too
// much time has passed; please re-run your
// search") or sneakily just switch to newest
// searcher w/o telling them...
if (VERBOSE) {
System.out.println(" searcher expired during search: " + see);
see.printStackTrace(System.out);
}
assert prevSearchState != null;
priorSearches.remove(prevSearchState);
}
}
} finally {
nodes[myNodeID].release(localShardSearcher);
for(IndexReader sub : subs) {
if (sub != null) {
sub.decRef();
}
}
}
if (searchState != null && searchState.searchAfterLocal != null && random.nextInt(5) == 3) {
priorSearches.add(searchState);
if (priorSearches.size() > 200) {
Collections.shuffle(priorSearches, random);
priorSearches.subList(100, priorSearches.size()).clear();
}
}
}
finish();
}
private PreviousSearchState assertSame(IndexSearcher mockSearcher, NodeState.ShardIndexSearcher shardSearcher, Query q, Sort sort, PreviousSearchState state) throws IOException {
int numHits = _TestUtil.nextInt(random, 1, 100);
if (state != null && state.searchAfterLocal == null) {
// In addition to what we last searched:
numHits += state.numHitsPaged;
}
if (VERBOSE) {
System.out.println("TEST: query=" + q + " sort=" + sort + " numHits=" + numHits);
if (state != null) {
System.out.println(" prev: searchAfterLocal=" + state.searchAfterLocal + " searchAfterShard=" + state.searchAfterShard + " numHitsPaged=" + state.numHitsPaged);
}
}
// Single (mock local) searcher:
final TopDocs hits;
if (sort == null) {
if (state != null && state.searchAfterLocal != null) {
hits = mockSearcher.searchAfter(state.searchAfterLocal, q, numHits);
} else {
hits = mockSearcher.search(q, numHits);
}
} else {
hits = mockSearcher.search(q, numHits, sort);
}
// Shard searcher
final TopDocs shardHits;
if (sort == null) {
if (state != null && state.searchAfterShard != null) {
shardHits = shardSearcher.searchAfter(state.searchAfterShard, q, numHits);
} else {
shardHits = shardSearcher.search(q, numHits);
}
} else {
shardHits = shardSearcher.search(q, numHits, sort);
}
final int numNodes = shardSearcher.nodeVersions.length;
int[] base = new int[numNodes];
final IndexReader[] subs = mockSearcher.getIndexReader().getSequentialSubReaders();
assertEquals(numNodes, subs.length);
int docCount = 0;
for(int nodeID=0;nodeID<numNodes;nodeID++) {
base[nodeID] = docCount;
docCount += subs[nodeID].maxDoc();
}
if (VERBOSE) {
/*
for(int shardID=0;shardID<shardSearchers.length;shardID++) {
System.out.println(" shard=" + shardID + " maxDoc=" + shardSearchers[shardID].searcher.getIndexReader().maxDoc());
}
*/
System.out.println(" single searcher: " + hits.totalHits + " totalHits maxScore=" + hits.getMaxScore());
for(int i=0;i<hits.scoreDocs.length;i++) {
final ScoreDoc sd = hits.scoreDocs[i];
System.out.println(" doc=" + sd.doc + " score=" + sd.score);
}
System.out.println(" shard searcher: " + shardHits.totalHits + " totalHits maxScore=" + shardHits.getMaxScore());
for(int i=0;i<shardHits.scoreDocs.length;i++) {
final ScoreDoc sd = shardHits.scoreDocs[i];
System.out.println(" doc=" + sd.doc + " (rebased: " + (sd.doc + base[sd.shardIndex]) + ") score=" + sd.score + " shard=" + sd.shardIndex);
}
}
int numHitsPaged;
if (state != null && state.searchAfterLocal != null) {
numHitsPaged = hits.scoreDocs.length;
if (state != null) {
numHitsPaged += state.numHitsPaged;
}
} else {
numHitsPaged = hits.scoreDocs.length;
}
final boolean moreHits;
final ScoreDoc bottomHit;
final ScoreDoc bottomHitShards;
if (numHitsPaged < hits.totalHits) {
// More hits to page through
moreHits = true;
if (sort == null) {
bottomHit = hits.scoreDocs[hits.scoreDocs.length-1];
final ScoreDoc sd = shardHits.scoreDocs[shardHits.scoreDocs.length-1];
// Must copy because below we rebase:
bottomHitShards = new ScoreDoc(sd.doc, sd.score, sd.shardIndex);
if (VERBOSE) {
System.out.println(" save bottomHit=" + bottomHit);
}
} else {
bottomHit = null;
bottomHitShards = null;
}
} else {
assertEquals(hits.totalHits, numHitsPaged);
bottomHit = null;
bottomHitShards = null;
moreHits = false;
}
// Must rebase so assertEquals passes:
for(int hitID=0;hitID<shardHits.scoreDocs.length;hitID++) {
final ScoreDoc sd = shardHits.scoreDocs[hitID];
sd.doc += base[sd.shardIndex];
}
_TestUtil.assertEquals(hits, shardHits);
if (moreHits) {
// Return a continuation:
return new PreviousSearchState(q, sort, bottomHit, bottomHitShards, shardSearcher.nodeVersions, numHitsPaged);
} else {
return null;
}
}
}