LUCENE-3102: add factory method to CachingCollector

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1104683 13f79535-47bb-0310-9956-ffa450edef68
Shai Erera 2011-05-18 04:00:48 +00:00
parent f1a1844fb9
commit fa23344e40
5 changed files with 300 additions and 191 deletions

View File: CHANGES.txt

@@ -411,6 +411,10 @@ New features
   it's able to handle multi-valued fields and does not hold the term
   bytes in RAM.  (Mike McCandless)
 
+* LUCENE-1421, LUCENE-3102: added CachingCollector, which allows you to cache
+  document IDs and scores encountered during the search, and "replay" them to
+  another Collector.  (Mike McCandless, Shai Erera)
+
 Optimizations
 
 * LUCENE-2588: Don't store unnecessary suffixes when writing the terms
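
For reference, the new entry point in use — a minimal sketch, assuming a searcher, a query, and two collectors (firstPassCollector, secondPassCollector) exist in the surrounding code; none of those names are part of this commit:

  // Cache doc IDs (and, optionally, scores) while delegating to the wrapped collector.
  boolean cacheScores = true;
  double maxCacheRAMMB = 4.0;
  CachingCollector cache = CachingCollector.create(firstPassCollector, cacheScores, maxCacheRAMMB);
  searcher.search(query, cache);

  // Replay the cached hits to another collector without re-running the query.
  // If the RAM budget was exceeded during collection, the cache was cleared.
  if (cache.isCached()) {
    cache.replay(secondPassCollector);
  }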

View File: CachingCollector.java

@@ -47,7 +47,7 @@ import org.apache.lucene.util.RamUsageEstimator;
  *
  * @lucene.experimental
  */
-public class CachingCollector extends Collector {
+public abstract class CachingCollector extends Collector {
 
   // Max out at 512K arrays
   private static final int MAX_ARRAY_SIZE = 512 * 1024;
@@ -64,8 +64,8 @@ public class CachingCollector extends Collector {
     }
   }
 
-  private static class CachedScorer extends Scorer {
+  private static final class CachedScorer extends Scorer {
 
     // NOTE: these members are package-private b/c that way accessing them from
     // the outer class does not incur access check by the JVM. The same
     // situation would be if they were defined in the outer class as private
@@ -76,52 +76,246 @@ public class CachingCollector extends Collector {
 
     private CachedScorer() { super(null); }
 
     @Override
-    public float score() { return score; }
+    public final float score() { return score; }
 
     @Override
-    public int advance(int target) { throw new UnsupportedOperationException(); }
+    public final int advance(int target) { throw new UnsupportedOperationException(); }
 
     @Override
-    public int docID() { return doc; }
+    public final int docID() { return doc; }
 
     @Override
-    public float freq() { throw new UnsupportedOperationException(); }
+    public final float freq() { throw new UnsupportedOperationException(); }
 
     @Override
-    public int nextDoc() { throw new UnsupportedOperationException(); }
+    public final int nextDoc() { throw new UnsupportedOperationException(); }
   }
 
-  // TODO: would be nice if a collector defined a
-  // needsScores() method so we can specialize / do checks
-  // up front:
-  private final Collector other;
-  private final int maxDocsToCache;
-  private final boolean cacheScores;
-  private final CachedScorer cachedScorer;
-  private final List<int[]> cachedDocs;
-  private final List<float[]> cachedScores;
-  private final List<SegStart> cachedSegs = new ArrayList<SegStart>();
-  private Scorer scorer;
-  private int[] curDocs;
-  private float[] curScores;
-  private int upto;
-  private AtomicReaderContext lastReaderContext;
-  private int base;
-
-  public CachingCollector(Collector other, boolean cacheScores, double maxRAMMB) {
-    this.other = other;
-    this.cacheScores = cacheScores;
-    if (cacheScores) {
-      cachedScorer = new CachedScorer();
-      cachedScores = new ArrayList<float[]>();
-      curScores = new float[128];
-      cachedScores.add(curScores);
-    } else {
-      cachedScorer = null;
-      cachedScores = null;
-    }
+  // A CachingCollector which caches scores
+  private static final class ScoreCachingCollector extends CachingCollector {
+
+    private final CachedScorer cachedScorer;
+    private final List<float[]> cachedScores;
+
+    private Scorer scorer;
+    private float[] curScores;
+
+    ScoreCachingCollector(Collector other, double maxRAMMB) {
+      super(other, maxRAMMB, true);
+      cachedScorer = new CachedScorer();
+      cachedScores = new ArrayList<float[]>();
+      curScores = new float[128];
+      cachedScores.add(curScores);
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+      if (curDocs == null) {
+        // Cache was too large
+        cachedScorer.score = scorer.score();
+        cachedScorer.doc = doc;
+        other.collect(doc);
+        return;
+      }
+
+      // Allocate a bigger array or abort caching
+      if (upto == curDocs.length) {
+        base += upto;
+
+        // Compute next array length - don't allocate too big arrays
+        int nextLength = 8*curDocs.length;
+        if (nextLength > MAX_ARRAY_SIZE) {
+          nextLength = MAX_ARRAY_SIZE;
+        }
+
+        if (base + nextLength > maxDocsToCache) {
+          // try to allocate a smaller array
+          nextLength = maxDocsToCache - base;
+          if (nextLength <= 0) {
+            // Too many docs to collect -- clear cache
+            curDocs = null;
+            curScores = null;
+            cachedSegs.clear();
+            cachedDocs.clear();
+            cachedScores.clear();
+            cachedScorer.score = scorer.score();
+            cachedScorer.doc = doc;
+            other.collect(doc);
+            return;
+          }
+        }
+
+        curDocs = new int[nextLength];
+        cachedDocs.add(curDocs);
+        curScores = new float[nextLength];
+        cachedScores.add(curScores);
+        upto = 0;
+      }
+
+      curDocs[upto] = doc;
+      cachedScorer.score = curScores[upto] = scorer.score();
+      upto++;
+      cachedScorer.doc = doc;
+      other.collect(doc);
+    }
+
+    @Override
+    public void replay(Collector other) throws IOException {
+      replayInit(other);
+
+      int curUpto = 0;
+      int curBase = 0;
+      int chunkUpto = 0;
+      other.setScorer(cachedScorer);
+      curDocs = EMPTY_INT_ARRAY;
+      for (SegStart seg : cachedSegs) {
+        other.setNextReader(seg.readerContext);
+        while (curBase + curUpto < seg.end) {
+          if (curUpto == curDocs.length) {
+            curBase += curDocs.length;
+            curDocs = cachedDocs.get(chunkUpto);
+            curScores = cachedScores.get(chunkUpto);
+            chunkUpto++;
+            curUpto = 0;
+          }
+          cachedScorer.score = curScores[curUpto];
+          other.collect(curDocs[curUpto++]);
+        }
+      }
+    }
+
+    @Override
+    public void setScorer(Scorer scorer) throws IOException {
+      this.scorer = scorer;
+      other.setScorer(cachedScorer);
+    }
+
+    @Override
+    public String toString() {
+      if (isCached()) {
+        return "CachingCollector (" + (base+upto) + " docs & scores cached)";
+      } else {
+        return "CachingCollector (cache was cleared)";
+      }
+    }
+  }
+
+  // A CachingCollector which does not cache scores
+  private static final class NoScoreCachingCollector extends CachingCollector {
+
+    NoScoreCachingCollector(Collector other, double maxRAMMB) {
+      super(other, maxRAMMB, false);
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+      if (curDocs == null) {
+        // Cache was too large
+        other.collect(doc);
+        return;
+      }
+
+      // Allocate a bigger array or abort caching
+      if (upto == curDocs.length) {
+        base += upto;
+
+        // Compute next array length - don't allocate too big arrays
+        int nextLength = 8*curDocs.length;
+        if (nextLength > MAX_ARRAY_SIZE) {
+          nextLength = MAX_ARRAY_SIZE;
+        }
+
+        if (base + nextLength > maxDocsToCache) {
+          // try to allocate a smaller array
+          nextLength = maxDocsToCache - base;
+          if (nextLength <= 0) {
+            // Too many docs to collect -- clear cache
+            curDocs = null;
+            cachedSegs.clear();
+            cachedDocs.clear();
+            other.collect(doc);
+            return;
+          }
+        }
+
+        curDocs = new int[nextLength];
+        cachedDocs.add(curDocs);
+        upto = 0;
+      }
+
+      curDocs[upto] = doc;
+      upto++;
+      other.collect(doc);
+    }
+
+    @Override
+    public void replay(Collector other) throws IOException {
+      replayInit(other);
+
+      int curUpto = 0;
+      int curBase = 0;
+      int chunkUpto = 0;
+      curDocs = EMPTY_INT_ARRAY;
+      for (SegStart seg : cachedSegs) {
+        other.setNextReader(seg.readerContext);
+        while (curBase + curUpto < seg.end) {
+          if (curUpto == curDocs.length) {
+            curBase += curDocs.length;
+            curDocs = cachedDocs.get(chunkUpto);
+            chunkUpto++;
+            curUpto = 0;
+          }
+          other.collect(curDocs[curUpto++]);
+        }
+      }
+    }
+
+    @Override
+    public void setScorer(Scorer scorer) throws IOException {
+      other.setScorer(scorer);
+    }
+
+    @Override
+    public String toString() {
+      if (isCached()) {
+        return "CachingCollector (" + (base+upto) + " docs cached)";
+      } else {
+        return "CachingCollector (cache was cleared)";
+      }
+    }
+  }
+
+  // TODO: would be nice if a collector defined a
+  // needsScores() method so we can specialize / do checks
+  // up front. This is only relevant for the ScoreCaching
+  // version -- if the wrapped Collector does not need
+  // scores, it can avoid cachedScorer entirely.
+  protected final Collector other;
+  protected final int maxDocsToCache;
+  protected final List<SegStart> cachedSegs = new ArrayList<SegStart>();
+  protected final List<int[]> cachedDocs;
+
+  private AtomicReaderContext lastReaderContext;
+
+  protected int[] curDocs;
+  protected int upto;
+  protected int base;
+  protected int lastDocBase;
+
+  public static CachingCollector create(Collector other, boolean cacheScores, double maxRAMMB) {
+    return cacheScores ? new ScoreCachingCollector(other, maxRAMMB) : new NoScoreCachingCollector(other, maxRAMMB);
+  }
+
+  // Prevent extension from non-internal classes
+  private CachingCollector(Collector other, double maxRAMMB, boolean cacheScores) {
+    this.other = other;
 
     cachedDocs = new ArrayList<int[]>();
     curDocs = new int[INITIAL_ARRAY_SIZE];
     cachedDocs.add(curDocs);
@@ -133,79 +327,11 @@ public class CachingCollector extends Collector {
     maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024) / bytesPerDoc);
   }
 
-  @Override
-  public void setScorer(Scorer scorer) throws IOException {
-    this.scorer = scorer;
-    other.setScorer(cachedScorer);
-  }
-
   @Override
   public boolean acceptsDocsOutOfOrder() {
     return other.acceptsDocsOutOfOrder();
   }
 
-  @Override
-  public void collect(int doc) throws IOException {
-    if (curDocs == null) {
-      // Cache was too large
-      if (cacheScores) {
-        cachedScorer.score = scorer.score();
-      }
-      cachedScorer.doc = doc;
-      other.collect(doc);
-      return;
-    }
-
-    // Allocate a bigger array or abort caching
-    if (upto == curDocs.length) {
-      base += upto;
-
-      // Compute next array length - don't allocate too big arrays
-      int nextLength = 8*curDocs.length;
-      if (nextLength > MAX_ARRAY_SIZE) {
-        nextLength = MAX_ARRAY_SIZE;
-      }
-
-      if (base + nextLength > maxDocsToCache) {
-        // try to allocate a smaller array
-        nextLength = maxDocsToCache - base;
-        if (nextLength <= 0) {
-          // Too many docs to collect -- clear cache
-          curDocs = null;
-          curScores = null;
-          cachedSegs.clear();
-          cachedDocs.clear();
-          cachedScores.clear();
-          if (cacheScores) {
-            cachedScorer.score = scorer.score();
-          }
-          cachedScorer.doc = doc;
-          other.collect(doc);
-          return;
-        }
-      }
-
-      curDocs = new int[nextLength];
-      cachedDocs.add(curDocs);
-      if (cacheScores) {
-        curScores = new float[nextLength];
-        cachedScores.add(curScores);
-      }
-      upto = 0;
-    }
-
-    curDocs[upto] = doc;
-    // TODO: maybe specialize private subclass so we don't
-    // null check per collect...
-    if (cacheScores) {
-      cachedScorer.score = curScores[upto] = scorer.score();
-    }
-    upto++;
-    cachedScorer.doc = doc;
-    other.collect(doc);
-  }
-
   public boolean isCached() {
     return curDocs != null;
   }
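
A quick sanity check of the maxDocsToCache computation above. The 8-vs-4 bytes-per-doc figures mirror the bytesPerDoc constant used in the updated tests below; the concrete budget here is illustrative only:

  // One cached doc costs an int (doc ID) plus, when caching scores, a float:
  int bytesPerDoc = cacheScores ? 8 : 4;  // (4 + 4) vs. 4 bytes per document
  // With maxRAMMB = 1.0 and cacheScores = true:
  //   (1.0 * 1024 * 1024) / 8 = 131072 docs cached per MB of budget.
  // With cacheScores = false, the same budget holds twice as many doc IDs.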
@@ -219,26 +345,8 @@
     lastReaderContext = context;
   }
 
-  @Override
-  public String toString() {
-    if (isCached()) {
-      return "CachingCollector (" + (base+upto) + " docs " + (cacheScores ? " & scores" : "") + " cached)";
-    } else {
-      return "CachingCollector (cache was cleared)";
-    }
-  }
-
-  /**
-   * Replays the cached doc IDs (and scores) to the given Collector.
-   *
-   * @throws IllegalStateException
-   *           if this collector is not cached (i.e., if the RAM limits were too
-   *           low for the number of documents + scores to cache).
-   * @throws IllegalArgumentException
-   *           if the given Collect's does not support out-of-order collection,
-   *           while the collector passed to the ctor does.
-   */
-  public void replay(Collector other) throws IOException {
+  /** Reused by the specialized inner classes. */
+  void replayInit(Collector other) {
     if (!isCached()) {
       throw new IllegalStateException("cannot replay: cache was cleared because too much RAM was required");
     }
@@ -249,35 +357,26 @@
         + "out-of-order collection, while the wrapped collector does. "
         + "Therefore cached documents may be out-of-order.");
     }
 
     //System.out.println("CC: replay totHits=" + (upto + base));
     if (lastReaderContext != null) {
       cachedSegs.add(new SegStart(lastReaderContext, base+upto));
       lastReaderContext = null;
     }
-
-    int curupto = 0;
-    int curbase = 0;
-    int chunkUpto = 0;
-    other.setScorer(cachedScorer);
-    curDocs = EMPTY_INT_ARRAY;
-    for(SegStart seg : cachedSegs) {
-      other.setNextReader(seg.readerContext);
-      while(curbase+curupto < seg.end) {
-        if (curupto == curDocs.length) {
-          curbase += curDocs.length;
-          curDocs = cachedDocs.get(chunkUpto);
-          if (cacheScores) {
-            curScores = cachedScores.get(chunkUpto);
-          }
-          chunkUpto++;
-          curupto = 0;
-        }
-        if (cacheScores) {
-          cachedScorer.score = curScores[curupto];
-        }
-        other.collect(curDocs[curupto++]);
-      }
-    }
   }
+
+  /**
+   * Replays the cached doc IDs (and scores) to the given Collector. If this
+   * instance does not cache scores, then no Scorer is set via
+   * {@code other.setScorer}, and scores are not replayed.
+   *
+   * @throws IllegalStateException
+   *           if this collector is not cached (i.e., if the RAM limits were too
+   *           low for the number of documents + scores to cache).
+   * @throws IllegalArgumentException
+   *           if the given Collector does not support out-of-order collection,
+   *           while the collector passed to the ctor does.
+   */
+  public abstract void replay(Collector other) throws IOException;
 }
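
The ordering check described in the replay javadoc can be seen end to end in a short sketch; "ooo" and "inOrder" are hypothetical collectors whose acceptsDocsOutOfOrder() return true and false respectively, and "searcher"/"query" are assumed surrounding code:

  CachingCollector cc = CachingCollector.create(ooo, true, 16.0);
  searcher.search(query, cc);   // hits may be cached out of order
  cc.replay(anotherOoo);        // allowed: target accepts any order
  cc.replay(inOrder);           // throws IllegalArgumentException, since the
                                // cached documents may be out-of-order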

View File: TestCachingCollector.java

@@ -75,39 +75,41 @@ public class TestCachingCollector extends LuceneTestCase {
   }
 
   public void testBasic() throws Exception {
-    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 1);
-    cc.setScorer(new MockScorer());
-
-    // collect 1000 docs
-    for (int i = 0; i < 1000; i++) {
-      cc.collect(i);
-    }
-
-    // now replay them
-    cc.replay(new Collector() {
-      int prevDocID = -1;
-
-      @Override
-      public void setScorer(Scorer scorer) throws IOException {}
-
-      @Override
-      public void setNextReader(AtomicReaderContext context) throws IOException {}
-
-      @Override
-      public void collect(int doc) throws IOException {
-        assertEquals(prevDocID + 1, doc);
-        prevDocID = doc;
-      }
-
-      @Override
-      public boolean acceptsDocsOutOfOrder() {
-        return false;
-      }
-    });
+    for (boolean cacheScores : new boolean[] { false, true }) {
+      CachingCollector cc = CachingCollector.create(new NoOpCollector(false), cacheScores, 1);
+      cc.setScorer(new MockScorer());
+
+      // collect 1000 docs
+      for (int i = 0; i < 1000; i++) {
+        cc.collect(i);
+      }
+
+      // now replay them
+      cc.replay(new Collector() {
+        int prevDocID = -1;
+
+        @Override
+        public void setScorer(Scorer scorer) throws IOException {}
+
+        @Override
+        public void setNextReader(AtomicReaderContext context) throws IOException {}
+
+        @Override
+        public void collect(int doc) throws IOException {
+          assertEquals(prevDocID + 1, doc);
+          prevDocID = doc;
+        }
+
+        @Override
+        public boolean acceptsDocsOutOfOrder() {
+          return false;
+        }
+      });
+    }
   }
 
   public void testIllegalStateOnReplay() throws Exception {
-    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 50 * ONE_BYTE);
+    CachingCollector cc = CachingCollector.create(new NoOpCollector(false), true, 50 * ONE_BYTE);
     cc.setScorer(new MockScorer());
 
     // collect 130 docs, this should be enough for triggering cache abort.
@@ -130,14 +132,14 @@ public class TestCachingCollector extends LuceneTestCase {
     // is valid with the Collector passed to the ctor
 
     // 'src' Collector does not support out-of-order
-    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 50 * ONE_BYTE);
+    CachingCollector cc = CachingCollector.create(new NoOpCollector(false), true, 50 * ONE_BYTE);
     cc.setScorer(new MockScorer());
     for (int i = 0; i < 10; i++) cc.collect(i);
     cc.replay(new NoOpCollector(true)); // this call should not fail
     cc.replay(new NoOpCollector(false)); // this call should not fail
 
     // 'src' Collector supports out-of-order
-    cc = new CachingCollector(new NoOpCollector(true), true, 50 * ONE_BYTE);
+    cc = CachingCollector.create(new NoOpCollector(true), true, 50 * ONE_BYTE);
     cc.setScorer(new MockScorer());
     for (int i = 0; i < 10; i++) cc.collect(i);
     cc.replay(new NoOpCollector(true)); // this call should not fail
@@ -156,14 +158,18 @@ public class TestCachingCollector extends LuceneTestCase {
     // set RAM limit enough for 150 docs + random(10000)
     int numDocs = random.nextInt(10000) + 150;
-    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 8 * ONE_BYTE * numDocs);
-    cc.setScorer(new MockScorer());
-    for (int i = 0; i < numDocs; i++) cc.collect(i);
-    assertTrue(cc.isCached());
-
-    // The 151's document should terminate caching
-    cc.collect(numDocs);
-    assertFalse(cc.isCached());
+    for (boolean cacheScores : new boolean[] { false, true }) {
+      int bytesPerDoc = cacheScores ? 8 : 4;
+      CachingCollector cc = CachingCollector.create(new NoOpCollector(false),
+          cacheScores, bytesPerDoc * ONE_BYTE * numDocs);
+      cc.setScorer(new MockScorer());
+      for (int i = 0; i < numDocs; i++) cc.collect(i);
+      assertTrue(cc.isCached());
+
+      // One more document should exceed the RAM limit and terminate caching
+      cc.collect(numDocs);
+      assertFalse(cc.isCached());
+    }
   }
 }

View File

@@ -73,7 +73,7 @@ field fall into a single group.</p>
 
   boolean cacheScores = true;
   double maxCacheRAMMB = 4.0;
-  CachingCollector cachedCollector = new CachingCollector(c1, cacheScores, maxCacheRAMMB);
+  CachingCollector cachedCollector = CachingCollector.create(c1, cacheScores, maxCacheRAMMB);
   s.search(new TermQuery(new Term("content", searchTerm)), cachedCollector);
 
   Collection<SearchGroup> topGroups = c1.getTopGroups(groupOffset, fillFields);

View File: TestGrouping.java

@@ -452,10 +452,10 @@ public class TestGrouping extends LuceneTestCase {
       }
 
       if (doAllGroups) {
-        cCache = new CachingCollector(c1, true, maxCacheMB);
+        cCache = CachingCollector.create(c1, true, maxCacheMB);
         c = MultiCollector.wrap(cCache, allGroupsCollector);
       } else {
-        c = cCache = new CachingCollector(c1, true, maxCacheMB);
+        c = cCache = CachingCollector.create(c1, true, maxCacheMB);
       }
     } else if (doAllGroups) {
       c = MultiCollector.wrap(c1, allGroupsCollector);