mirror of https://github.com/apache/lucene.git
LUCENE-3102: first cut - some refactoring, bug fixes, add test, move to core (trunk)
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1103872 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3b45ee8cae
commit
f5fdea8dda
|
@ -1,4 +1,4 @@
|
|||
package org.apache.lucene.search.grouping;
|
||||
package org.apache.lucene.search;
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
|
@ -22,8 +22,6 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.lucene.index.IndexReader.AtomicReaderContext;
|
||||
import org.apache.lucene.search.Collector;
|
||||
import org.apache.lucene.search.Scorer;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
|
||||
/**
|
||||
|
@ -41,6 +39,9 @@ import org.apache.lucene.util.RamUsageEstimator;
|
|||
* set is large this can easily be a very substantial amount
|
||||
* of RAM!
|
||||
*
|
||||
* <p><b>NOTE</b>: this class caches at least 128 documents
|
||||
* before checking RAM limits.
|
||||
*
|
||||
* <p>See {@link org.apache.lucene.search.grouping} for more
|
||||
* details including a full code example.</p>
|
||||
*
|
||||
|
@ -48,6 +49,11 @@ import org.apache.lucene.util.RamUsageEstimator;
|
|||
*/
|
||||
public class CachingCollector extends Collector {
|
||||
|
||||
// Max out at 512K arrays
|
||||
private static final int MAX_ARRAY_SIZE = 512 * 1024;
|
||||
private static final int INITIAL_ARRAY_SIZE = 128;
|
||||
private final static int[] EMPTY_INT_ARRAY = new int[0];
|
||||
|
||||
private static class SegStart {
|
||||
public final AtomicReaderContext readerContext;
|
||||
public final int end;
|
||||
|
@ -57,6 +63,33 @@ public class CachingCollector extends Collector {
|
|||
this.end = end;
|
||||
}
|
||||
}
|
||||
|
||||
private static class CachedScorer extends Scorer {
|
||||
|
||||
// NOTE: these members are package-private b/c that way accessing them from
|
||||
// the outer class does not incur access check by the JVM. The same
|
||||
// situation would be if they were defined in the outer class as private
|
||||
// members.
|
||||
int doc;
|
||||
float score;
|
||||
|
||||
private CachedScorer() { super(null); }
|
||||
|
||||
@Override
|
||||
public float score() { return score; }
|
||||
|
||||
@Override
|
||||
public int advance(int target) { throw new UnsupportedOperationException(); }
|
||||
|
||||
@Override
|
||||
public int docID() { return doc; }
|
||||
|
||||
@Override
|
||||
public float freq() { throw new UnsupportedOperationException(); }
|
||||
|
||||
@Override
|
||||
public int nextDoc() { throw new UnsupportedOperationException(); }
|
||||
}
|
||||
|
||||
// TODO: would be nice if a collector defined a
|
||||
// needsScores() method so we can specialize / do checks
|
||||
|
@ -64,7 +97,8 @@ public class CachingCollector extends Collector {
|
|||
private final Collector other;
|
||||
private final int maxDocsToCache;
|
||||
|
||||
private final Scorer cachedScorer;
|
||||
private final boolean cacheScores;
|
||||
private final CachedScorer cachedScorer;
|
||||
private final List<int[]> cachedDocs;
|
||||
private final List<float[]> cachedScores;
|
||||
private final List<SegStart> cachedSegs = new ArrayList<SegStart>();
|
||||
|
@ -74,39 +108,13 @@ public class CachingCollector extends Collector {
|
|||
private float[] curScores;
|
||||
private int upto;
|
||||
private AtomicReaderContext lastReaderContext;
|
||||
private float score;
|
||||
private int base;
|
||||
private int doc;
|
||||
|
||||
public CachingCollector(Collector other, boolean cacheScores, double maxRAMMB) {
|
||||
this.other = other;
|
||||
this.cacheScores = cacheScores;
|
||||
if (cacheScores) {
|
||||
cachedScorer = new Scorer(null) {
|
||||
@Override
|
||||
public float score() {
|
||||
return score;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int advance(int target) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int docID() {
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float freq() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int nextDoc() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
cachedScorer = new CachedScorer();
|
||||
cachedScores = new ArrayList<float[]>();
|
||||
curScores = new float[128];
|
||||
cachedScores.add(curScores);
|
||||
|
@ -115,16 +123,14 @@ public class CachingCollector extends Collector {
|
|||
cachedScores = null;
|
||||
}
|
||||
cachedDocs = new ArrayList<int[]>();
|
||||
curDocs = new int[128];
|
||||
curDocs = new int[INITIAL_ARRAY_SIZE];
|
||||
cachedDocs.add(curDocs);
|
||||
|
||||
final int bytesPerDoc;
|
||||
if (curScores != null) {
|
||||
bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT + RamUsageEstimator.NUM_BYTES_FLOAT;
|
||||
} else {
|
||||
bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT;
|
||||
int bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT;
|
||||
if (cacheScores) {
|
||||
bytesPerDoc += RamUsageEstimator.NUM_BYTES_FLOAT;
|
||||
}
|
||||
maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024)/bytesPerDoc);
|
||||
maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024) / bytesPerDoc);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -143,52 +149,60 @@ public class CachingCollector extends Collector {
|
|||
|
||||
if (curDocs == null) {
|
||||
// Cache was too large
|
||||
if (curScores != null) {
|
||||
score = scorer.score();
|
||||
if (cacheScores) {
|
||||
cachedScorer.score = scorer.score();
|
||||
}
|
||||
this.doc = doc;
|
||||
cachedScorer.doc = doc;
|
||||
other.collect(doc);
|
||||
return;
|
||||
}
|
||||
|
||||
// Allocate a bigger array or abort caching
|
||||
if (upto == curDocs.length) {
|
||||
base += upto;
|
||||
final int nextLength;
|
||||
// Max out at 512K arrays:
|
||||
if (curDocs.length < 524288) {
|
||||
nextLength = 8*curDocs.length;
|
||||
} else {
|
||||
nextLength = curDocs.length;
|
||||
|
||||
// Compute next array length - don't allocate too big arrays
|
||||
int nextLength = 8*curDocs.length;
|
||||
if (nextLength > MAX_ARRAY_SIZE) {
|
||||
nextLength = MAX_ARRAY_SIZE;
|
||||
}
|
||||
|
||||
if (base + nextLength > maxDocsToCache) {
|
||||
// Too many docs to collect -- clear cache
|
||||
curDocs = null;
|
||||
if (curScores != null) {
|
||||
score = scorer.score();
|
||||
// try to allocate a smaller array
|
||||
nextLength = maxDocsToCache - base;
|
||||
if (nextLength <= 0) {
|
||||
// Too many docs to collect -- clear cache
|
||||
curDocs = null;
|
||||
curScores = null;
|
||||
cachedSegs.clear();
|
||||
cachedDocs.clear();
|
||||
cachedScores.clear();
|
||||
if (cacheScores) {
|
||||
cachedScorer.score = scorer.score();
|
||||
}
|
||||
cachedScorer.doc = doc;
|
||||
other.collect(doc);
|
||||
return;
|
||||
}
|
||||
this.doc = doc;
|
||||
other.collect(doc);
|
||||
cachedDocs.clear();
|
||||
cachedScores.clear();
|
||||
return;
|
||||
}
|
||||
|
||||
curDocs = new int[nextLength];
|
||||
cachedDocs.add(curDocs);
|
||||
if (curScores != null) {
|
||||
if (cacheScores) {
|
||||
curScores = new float[nextLength];
|
||||
cachedScores.add(curScores);
|
||||
}
|
||||
upto = 0;
|
||||
}
|
||||
|
||||
curDocs[upto] = doc;
|
||||
// TODO: maybe specialize private subclass so we don't
|
||||
// null check per collect...
|
||||
if (curScores != null) {
|
||||
score = curScores[upto] = scorer.score();
|
||||
if (cacheScores) {
|
||||
cachedScorer.score = curScores[upto] = scorer.score();
|
||||
}
|
||||
upto++;
|
||||
this.doc = doc;
|
||||
cachedScorer.doc = doc;
|
||||
other.collect(doc);
|
||||
}
|
||||
|
||||
|
@ -205,55 +219,65 @@ public class CachingCollector extends Collector {
|
|||
lastReaderContext = context;
|
||||
}
|
||||
|
||||
private final static int[] EMPTY_INT_ARRAY = new int[0];
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (isCached()) {
|
||||
return "CachingCollector (" + (base+upto) + " docs " + (curScores != null ? " & scores" : "") + " cached)";
|
||||
return "CachingCollector (" + (base+upto) + " docs " + (cacheScores ? " & scores" : "") + " cached)";
|
||||
} else {
|
||||
return "CachingCollector (cache was cleared)";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Replays the cached doc IDs (and scores) to the given Collector.
|
||||
*
|
||||
* @throws IllegalStateException
|
||||
* if this collector is not cached (i.e., if the RAM limits were too
|
||||
* low for the number of documents + scores to cache).
|
||||
* @throws IllegalArgumentException
|
||||
* if the given Collect's does not support out-of-order collection,
|
||||
* while the collector passed to the ctor does.
|
||||
*/
|
||||
public void replay(Collector other) throws IOException {
|
||||
if (!isCached()) {
|
||||
throw new IllegalStateException("cannot replay: cache was cleared because too much RAM was required");
|
||||
}
|
||||
|
||||
if (!other.acceptsDocsOutOfOrder() && this.other.acceptsDocsOutOfOrder()) {
|
||||
throw new IllegalArgumentException(
|
||||
"cannot replay: given collector does not support "
|
||||
+ "out-of-order collection, while the wrapped collector does. "
|
||||
+ "Therefore cached documents may be out-of-order.");
|
||||
}
|
||||
|
||||
//System.out.println("CC: replay totHits=" + (upto + base));
|
||||
if (lastReaderContext != null) {
|
||||
cachedSegs.add(new SegStart(lastReaderContext, base+upto));
|
||||
lastReaderContext = null;
|
||||
}
|
||||
final int uptoSav = upto;
|
||||
final int baseSav = base;
|
||||
try {
|
||||
upto = 0;
|
||||
base = 0;
|
||||
int chunkUpto = 0;
|
||||
other.setScorer(cachedScorer);
|
||||
curDocs = EMPTY_INT_ARRAY;
|
||||
for(SegStart seg : cachedSegs) {
|
||||
other.setNextReader(seg.readerContext);
|
||||
while(base+upto < seg.end) {
|
||||
if (upto == curDocs.length) {
|
||||
base += curDocs.length;
|
||||
curDocs = cachedDocs.get(chunkUpto);
|
||||
if (curScores != null) {
|
||||
curScores = cachedScores.get(chunkUpto);
|
||||
}
|
||||
chunkUpto++;
|
||||
upto = 0;
|
||||
|
||||
int curupto = 0;
|
||||
int curbase = 0;
|
||||
int chunkUpto = 0;
|
||||
other.setScorer(cachedScorer);
|
||||
curDocs = EMPTY_INT_ARRAY;
|
||||
for(SegStart seg : cachedSegs) {
|
||||
other.setNextReader(seg.readerContext);
|
||||
while(curbase+curupto < seg.end) {
|
||||
if (curupto == curDocs.length) {
|
||||
curbase += curDocs.length;
|
||||
curDocs = cachedDocs.get(chunkUpto);
|
||||
if (cacheScores) {
|
||||
curScores = cachedScores.get(chunkUpto);
|
||||
}
|
||||
if (curScores != null) {
|
||||
score = curScores[upto];
|
||||
}
|
||||
other.collect(curDocs[upto++]);
|
||||
chunkUpto++;
|
||||
curupto = 0;
|
||||
}
|
||||
if (cacheScores) {
|
||||
cachedScorer.score = curScores[curupto];
|
||||
}
|
||||
other.collect(curDocs[curupto++]);
|
||||
}
|
||||
} finally {
|
||||
upto = uptoSav;
|
||||
base = baseSav;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,169 @@
|
|||
package org.apache.lucene.search;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;

import org.apache.lucene.index.IndexReader.AtomicReaderContext;
import org.apache.lucene.search.CachingCollector;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.LuceneTestCase;

/**
 * Tests for {@link CachingCollector}: basic collect/replay round-trips,
 * the failure modes of {@link CachingCollector#replay(Collector)}, and the
 * cached-array growth policy when the RAM limit is nearly exhausted.
 */
public class TestCachingCollector extends LuceneTestCase {

  // CachingCollector's maxRAMMB ctor argument is expressed in MB; this
  // constant lets tests specify a budget of N bytes as N * ONE_BYTE.
  private static final double ONE_BYTE = 1.0 / (1024 * 1024); // 1 byte out of MB

  /**
   * Minimal Scorer stub: every method returns a constant 0 so the
   * CachingCollector under test has something to call score() on.
   */
  private static class MockScorer extends Scorer {
    
    private MockScorer() {
      // no Weight needed; this scorer is never used for real scoring
      super((Weight) null);
    }
    
    @Override
    public float score() throws IOException { return 0; }

    @Override
    public int docID() { return 0; }

    @Override
    public int nextDoc() throws IOException { return 0; }

    @Override
    public int advance(int target) throws IOException { return 0; }
    
  }
  
  /**
   * Collector that ignores every callback; only its
   * acceptsDocsOutOfOrder() answer (fixed at construction) matters,
   * which is what replay() validates against.
   */
  private static class NoOpCollector extends Collector {

    // value returned from acceptsDocsOutOfOrder()
    private final boolean acceptDocsOutOfOrder;
    
    public NoOpCollector(boolean acceptDocsOutOfOrder) {
      this.acceptDocsOutOfOrder = acceptDocsOutOfOrder;
    }
    
    @Override
    public void setScorer(Scorer scorer) throws IOException {}

    @Override
    public void collect(int doc) throws IOException {}

    @Override
    public void setNextReader(AtomicReaderContext context) throws IOException {}

    @Override
    public boolean acceptsDocsOutOfOrder() {
      return acceptDocsOutOfOrder;
    }
    
  }

  /**
   * Collects 1000 docs (with a 1 MB budget, ample for that many) and
   * verifies replay() delivers them back in the original in-order
   * sequence 0..999.
   */
  public void testBasic() throws Exception {
    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 1);
    cc.setScorer(new MockScorer());
    
    // collect 1000 docs
    for (int i = 0; i < 1000; i++) {
      cc.collect(i);
    }
    
    // now replay them
    cc.replay(new Collector() {
      // docs were collected in order, so replay must hand them back
      // as consecutive IDs starting at 0
      int prevDocID = -1;
      
      @Override
      public void setScorer(Scorer scorer) throws IOException {}
      
      @Override
      public void setNextReader(AtomicReaderContext context) throws IOException {}
      
      @Override
      public void collect(int doc) throws IOException {
        assertEquals(prevDocID + 1, doc);
        prevDocID = doc;
      }
      
      @Override
      public boolean acceptsDocsOutOfOrder() {
        return false;
      }
    });
  }
  
  /**
   * With only ~50 bytes of budget, caching must abort once enough docs
   * arrive; replay() on an uncached collector must then throw
   * IllegalStateException.
   */
  public void testIllegalStateOnReplay() throws Exception {
    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 50 * ONE_BYTE);
    cc.setScorer(new MockScorer());
    
    // collect 130 docs, this should be enough for triggering cache abort.
    for (int i = 0; i < 130; i++) {
      cc.collect(i);
    }
    
    assertFalse("CachingCollector should not be cached due to low memory limit", cc.isCached());
    
    try {
      cc.replay(new NoOpCollector(false));
      fail("replay should fail if CachingCollector is not cached");
    } catch (IllegalStateException e) {
      // expected
    }
  }
  
  /**
   * replay() must reject a target collector that cannot accept
   * out-of-order docs when the wrapped (ctor) collector could produce
   * them; all other combinations are allowed.
   */
  public void testIllegalCollectorOnReplay() throws Exception {
    // tests that the Collector passed to replay() has an out-of-order mode that
    // is valid with the Collector passed to the ctor
    
    // 'src' Collector does not support out-of-order
    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 50 * ONE_BYTE);
    cc.setScorer(new MockScorer());
    for (int i = 0; i < 10; i++) cc.collect(i);
    cc.replay(new NoOpCollector(true)); // this call should not fail
    cc.replay(new NoOpCollector(false)); // this call should not fail

    // 'src' Collector supports out-of-order
    cc = new CachingCollector(new NoOpCollector(true), true, 50 * ONE_BYTE);
    cc.setScorer(new MockScorer());
    for (int i = 0; i < 10; i++) cc.collect(i);
    cc.replay(new NoOpCollector(true)); // this call should not fail
    try {
      cc.replay(new NoOpCollector(false)); // this call should fail
      fail("should have failed if an in-order Collector was given to replay(), " +
          "while CachingCollector was initialized with out-of-order collection");
    } catch (IllegalArgumentException e) {
      // ok
    }
  }
  
  /**
   * Verifies the growth policy of the cached doc/score arrays: with a
   * budget sized exactly for numDocs docs (8 bytes each: int doc +
   * float score — assumes scores are cached; see ctor arg), caching
   * survives numDocs collects and aborts only on the next one.
   */
  public void testCachedArraysAllocation() throws Exception {
    // tests the cached arrays allocation -- if the 'nextLength' was too high,
    // caching would terminate even if a smaller length would suffice.
    
    // set RAM limit enough for 150 docs + random(10000)
    int numDocs = random.nextInt(10000) + 150;
    CachingCollector cc = new CachingCollector(new NoOpCollector(false), true, 8 * ONE_BYTE * numDocs);
    cc.setScorer(new MockScorer());
    for (int i = 0; i < numDocs; i++) cc.collect(i);
    assertTrue(cc.isCached());
    
    // The 151's document should terminate caching
    cc.collect(numDocs);
    assertFalse(cc.isCached());
  }
  
}
|
|
@ -49,7 +49,7 @@ field fall into a single group.</p>
|
|||
org.apache.lucene.search.grouping.SecondPassGroupingCollector})
|
||||
gathers documents within those groups. If the search is costly to
|
||||
run you may want to use the {@link
|
||||
org.apache.lucene.search.grouping.CachingCollector} class, which
|
||||
org.apache.lucene.search.CachingCollector} class, which
|
||||
caches hits and can (quickly) replay them for the second pass. This
|
||||
way you only run the query once, but you pay a RAM cost to (briefly)
|
||||
hold all hits. Results are returned as a {@link
|
||||
|
@ -66,7 +66,7 @@ field fall into a single group.</p>
|
|||
group yourself.
|
||||
</ul>
|
||||
|
||||
<p>Typical usage looks like this (using the {@link org.apache.lucene.search.grouping.CachingCollector}):</p>
|
||||
<p>Typical usage looks like this (using the {@link org.apache.lucene.search.CachingCollector}):</p>
|
||||
|
||||
<pre>
|
||||
FirstPassGroupingCollector c1 = new FirstPassGroupingCollector("author", groupSort, groupOffset+topNGroups);
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.lucene.document.NumericField;
|
|||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.RandomIndexWriter;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.CachingCollector;
|
||||
import org.apache.lucene.search.Collector;
|
||||
import org.apache.lucene.search.FieldCache;
|
||||
import org.apache.lucene.search.FieldDoc;
|
||||
|
|
Loading…
Reference in New Issue