Consume the entire weight and pre-compute the DocIdSets for all segments, instead of keeping the weight around and building a DocIdSet only when a segment is being processed. This fixes issues where the has_child / has_parent filters produce no results or errors on subsequent scan requests.
Also made CustomQueryWrappingFilter implement Releasable in order to clean up the pre-computed DocIdSets.

Closes #4703
This commit is contained in:
parent: db394117c4
commit: a7a2f4747d
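For readers skimming the change, here is a minimal, self-contained sketch of the pattern the diff below introduces. It is illustrative only: the class and method names are made up, Lucene 4.x APIs are assumed, and a plain Lucene FixedBitSet stands in for Elasticsearch's DocIdSets.toCacheable helper used in the actual patch. The point is that the weight is consumed once, up front, producing one DocIdSet per segment keyed by its reader, so later scan/scroll round-trips only look up pre-computed sets and never need the original weight or searcher again.

    import java.io.IOException;
    import java.util.IdentityHashMap;
    import java.util.Map;

    import org.apache.lucene.index.AtomicReader;
    import org.apache.lucene.index.AtomicReaderContext;
    import org.apache.lucene.search.DocIdSet;
    import org.apache.lucene.search.DocIdSetIterator;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.Scorer;
    import org.apache.lucene.search.Weight;
    import org.apache.lucene.util.FixedBitSet;

    // Hypothetical helper, not part of the patch below.
    final class EagerPerSegmentDocIdSets {

        // Consume the weight once and materialize a DocIdSet per segment,
        // keyed by the segment reader, so the weight (and the searcher that
        // created it) is never needed again on later requests.
        static Map<AtomicReader, DocIdSet> precompute(IndexSearcher searcher, Query query) throws IOException {
            Weight weight = searcher.createNormalizedWeight(query);
            Map<AtomicReader, DocIdSet> perSegment = new IdentityHashMap<AtomicReader, DocIdSet>();
            for (AtomicReaderContext leaf : searcher.getTopReaderContext().leaves()) {
                FixedBitSet bits = new FixedBitSet(leaf.reader().maxDoc());
                Scorer scorer = weight.scorer(leaf, true, false, null);
                if (scorer != null) {
                    for (int doc = scorer.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = scorer.nextDoc()) {
                        bits.set(doc);
                    }
                }
                perSegment.put(leaf.reader(), bits); // FixedBitSet is a DocIdSet in Lucene 4.x
            }
            return perSegment;
        }
    }

Clearing such a map once the owning search context is released is what the new release() method in the patch does, so the per-segment sets do not outlive the request.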
CustomQueryWrappingFilter.java

@@ -18,13 +18,18 @@
  */
 package org.elasticsearch.index.search.child;

+import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.search.*;
 import org.apache.lucene.util.Bits;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lucene.docset.DocIdSets;
 import org.elasticsearch.common.lucene.search.NoCacheFilter;
 import org.elasticsearch.search.internal.SearchContext;

 import java.io.IOException;
+import java.util.IdentityHashMap;

 /**
  * Forked from {@link QueryWrapperFilter} to make sure the weight is only created once.
@@ -32,12 +37,12 @@ import java.io.IOException;
  *
  * @elasticsearch.internal
  */
-public class CustomQueryWrappingFilter extends NoCacheFilter {
+public class CustomQueryWrappingFilter extends NoCacheFilter implements Releasable {

     private final Query query;

     private IndexSearcher searcher;
-    private Weight weight;
+    private IdentityHashMap<AtomicReader, DocIdSet> docIdSets;

     /** Constructs a filter which only matches documents matching
      * <code>query</code>.
@@ -55,24 +60,43 @@ public class CustomQueryWrappingFilter extends NoCacheFilter {

     @Override
     public DocIdSet getDocIdSet(final AtomicReaderContext context, final Bits acceptDocs) throws IOException {
-        SearchContext searchContext = SearchContext.current();
-        if (weight == null) {
+        final SearchContext searchContext = SearchContext.current();
+        if (docIdSets == null) {
             assert searcher == null;
             IndexSearcher searcher = searchContext.searcher();
-            weight = searcher.createNormalizedWeight(query);
+            docIdSets = new IdentityHashMap<AtomicReader, DocIdSet>();
             this.searcher = searcher;
+            searchContext.addReleasable(this);
+
+            final Weight weight = searcher.createNormalizedWeight(query);
+            for (final AtomicReaderContext leaf : searcher.getTopReaderContext().leaves()) {
+                final DocIdSet set = DocIdSets.toCacheable(leaf.reader(), new DocIdSet() {
+                    @Override
+                    public DocIdSetIterator iterator() throws IOException {
+                        return weight.scorer(leaf, true, false, null);
+                    }
+                    @Override
+                    public boolean isCacheable() { return false; }
+                });
+                docIdSets.put(leaf.reader(), set);
+            }
         } else {
             assert searcher == SearchContext.current().searcher();
         }
+        final DocIdSet set = docIdSets.get(context.reader());
+        if (set != null && acceptDocs != null) {
+            return BitsFilteredDocIdSet.wrap(set, acceptDocs);
+        }
+        return set;
+    }
-
-        return new DocIdSet() {
-            @Override
-            public DocIdSetIterator iterator() throws IOException {
-                return weight.scorer(context, true, false, acceptDocs);
-            }
-            @Override
-            public boolean isCacheable() { return false; }
-        };
+
+    @Override
+    public boolean release() throws ElasticsearchException {
+        // We need to clear the docIdSets; otherwise unused DocIdSets are left
+        // around and can potentially become a memory leak.
+        docIdSets = null;
+        searcher = null;
+        return true;
     }

     @Override
@@ -82,9 +106,15 @@ public class CustomQueryWrappingFilter extends NoCacheFilter {

     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof CustomQueryWrappingFilter))
-            return false;
-        return this.query.equals(((CustomQueryWrappingFilter)o).query);
+        if (o == this) {
+            return true;
+        }
+        if (o != null && o instanceof CustomQueryWrappingFilter &&
+                this.query.equals(((CustomQueryWrappingFilter)o).query)) {
+            return true;
+        }
+
+        return false;
     }

     @Override
SimpleChildQuerySearchTests.java

@@ -33,6 +33,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.lucene.search.function.CombineFunction;
 import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.mapper.MergeMappingException;
 import org.elasticsearch.index.query.*;
 import org.elasticsearch.index.search.child.ScoreType;
@@ -2224,6 +2225,61 @@ public class SimpleChildQuerySearchTests extends ElasticsearchIntegrationTest {
         assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
     }

+    @Test
+    public void testParentChildQueriesViaScrollApi() throws Exception {
+        client().admin().indices().prepareCreate("test")
+                .setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
+                .execute().actionGet();
+        client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
+        client().admin()
+                .indices()
+                .preparePutMapping("test")
+                .setType("child")
+                .setSource(
+                        jsonBuilder().startObject().startObject("child").startObject("_parent").field("type", "parent").endObject()
+                                .endObject().endObject()).execute().actionGet();
+
+        for (int i = 0; i < 10; i++) {
+            client().prepareIndex("test", "parent", "p" + i).setSource("{}").execute().actionGet();
+            client().prepareIndex("test", "child", "c" + i).setSource("{}").setParent("p" + i).execute().actionGet();
+        }
+
+        client().admin().indices().prepareRefresh().execute().actionGet();
+
+        QueryBuilder[] queries = new QueryBuilder[]{
+                hasChildQuery("child", matchAllQuery()),
+                filteredQuery(matchAllQuery(), hasChildFilter("child", matchAllQuery())),
+                hasParentQuery("parent", matchAllQuery()),
+                filteredQuery(matchAllQuery(), hasParentFilter("parent", matchAllQuery())),
+                topChildrenQuery("child", matchAllQuery()).factor(10)
+        };
+
+        for (QueryBuilder query : queries) {
+            SearchResponse scrollResponse = client().prepareSearch("test")
+                    .setScroll(TimeValue.timeValueSeconds(30))
+                    .setSize(1)
+                    .addField("_id")
+                    .setQuery(query)
+                    .setSearchType("scan")
+                    .execute()
+                    .actionGet();
+
+            assertThat(scrollResponse.getFailedShards(), equalTo(0));
+            assertThat(scrollResponse.getHits().totalHits(), equalTo(10l));
+
+            int scannedDocs = 0;
+            do {
+                scrollResponse = client()
+                        .prepareSearchScroll(scrollResponse.getScrollId())
+                        .setScroll(TimeValue.timeValueSeconds(30)).execute().actionGet();
+                assertThat(scrollResponse.getHits().totalHits(), equalTo(10l));
+                scannedDocs += scrollResponse.getHits().getHits().length;
+            } while (scrollResponse.getHits().getHits().length > 0);
+            assertThat(scannedDocs, equalTo(10));
+        }
+    }
+
+    private static HasChildFilterBuilder hasChildFilter(String type, QueryBuilder queryBuilder) {
+        HasChildFilterBuilder hasChildFilterBuilder = FilterBuilders.hasChildFilter(type, queryBuilder);
+        hasChildFilterBuilder.setShortCircuitCutoff(randomInt(10));
+        return hasChildFilterBuilder;
+    }