mirror of
synced 2025-03-24 17:09:48 +00:00
Handle optional term and field statistics gracefully
Lucene provides a set of statistics that depend on the codec / postings format as well as on the index options used when the field is created / indexed. If a certain statistic is not available, Lucene returns `-1` instead of the actual value. We need to ensure that those values are encoded correctly when we write them as vLongs, as well as when we aggregate them. Closes #3012
This commit is contained in:
Normal file
Normal file
@ -0,0 +1,92 @@
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.common.collect;
import gnu.trove.impl.Constants;
import java.util.Map;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.trove.ExtTHashMap;
import com.google.common.collect.ForwardingMap;
* This class provides factory methods for Maps. The returned {@link Map}
* instances are general purpose maps and none of the methods guarantees a
* concrete implementation unless the return type is a concrete type. The
* implementations used might change over time; if you rely on a specific
* implementation you should use a concrete constructor.
public final class XMaps {
public static final int DEFAULT_CAPACITY = Constants.DEFAULT_CAPACITY;
* Returns a new map with the given initial capacity
public static <K, V> Map<K, V> newMap(int capacity) {
return new ExtTHashMap<K, V>(capacity, Constants.DEFAULT_LOAD_FACTOR);
* Returns a new map with a default initial capacity of {@link #DEFAULT_CAPACITY}.
public static <K, V> Map<K, V> newMap() {
return newMap(DEFAULT_CAPACITY);
* Returns a map like {@link #newMap()} that does not accept <code>null</code> keys
public static <K, V> Map<K, V> newNoNullKeysMap() {
Map<K, V> delegate = newMap();
return ensureNoNullKeys(delegate);
* Returns a map like {@link #newMap(int)} that does not accept <code>null</code> keys
public static <K, V> Map<K, V> newNoNullKeysMap(int capacity) {
Map<K, V> delegate = newMap(capacity);
return ensureNoNullKeys(delegate);
* Wraps the given map and prevents adding <code>null</code> keys.
public static <K, V> Map<K, V> ensureNoNullKeys(final Map<K, V> delegate) {
return new ForwardingMap<K, V>() {
public V put(K key, V value) {
if (key == null) {
throw new ElasticSearchIllegalArgumentException("Map key must not be null");
return super.put(key, value);
protected Map<K, V> delegate() {
return delegate;
@ -21,19 +21,16 @@ package org.elasticsearch.search.controller;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import gnu.trove.impl.Constants;
import gnu.trove.map.TMap;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.XMaps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.search.ShardFieldDocSortedHitQueue;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.ExtTHashMap;
import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.AggregatedDfs;
@ -49,9 +46,6 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.Suggest.Suggestion;
import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry;
import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry.Option;
import java.util.ArrayList;
import java.util.Collection;
@ -90,27 +84,37 @@ public class SearchPhaseController extends AbstractComponent {
public AggregatedDfs aggregateDfs(Iterable<DfsSearchResult> results) {
TMap<Term, TermStatistics> termStatistics = new ExtTHashMap<Term, TermStatistics>(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR);
TMap<String, CollectionStatistics> fieldStatistics = new ExtTHashMap<String, CollectionStatistics>(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR);
Map<Term, TermStatistics> termStatistics = XMaps.newNoNullKeysMap();
Map<String, CollectionStatistics> fieldStatistics = XMaps.newNoNullKeysMap();
long aggMaxDoc = 0;
for (DfsSearchResult result : results) {
for (int i = 0; i < result.termStatistics().length; i++) {
TermStatistics existing = termStatistics.get(result.terms()[i]);
final Term[] terms = result.terms();
final TermStatistics[] stats = result.termStatistics();
assert terms.length == stats.length;
for (int i = 0; i < terms.length; i++) {
assert terms[i] != null;
TermStatistics existing = termStatistics.get(terms[i]);
if (existing != null) {
termStatistics.put(result.terms()[i], new TermStatistics(existing.term(), existing.docFreq() + result.termStatistics()[i].docFreq(), existing.totalTermFreq() + result.termStatistics()[i].totalTermFreq()));
assert terms[i].bytes().equals(existing.term());
// totalTermFreq is an optional statistic: if either one or both values
// are -1 (not present), the aggregated value is set to -1 as well
termStatistics.put(terms[i], new TermStatistics(existing.term(),
existing.docFreq() + stats[i].docFreq(),
optionalSum(existing.totalTermFreq(), stats[i].totalTermFreq())));
} else {
termStatistics.put(result.terms()[i], result.termStatistics()[i]);
termStatistics.put(terms[i], stats[i]);
for (Map.Entry<String, CollectionStatistics> entry : result.fieldStatistics().entrySet()) {
assert entry.getKey() != null;
CollectionStatistics existing = fieldStatistics.get(entry.getKey());
if (existing != null) {
CollectionStatistics merged = new CollectionStatistics(
entry.getKey(), existing.maxDoc() + entry.getValue().maxDoc(),
existing.docCount() + entry.getValue().docCount(),
existing.sumTotalTermFreq() + entry.getValue().sumTotalTermFreq(),
existing.sumDocFreq() + entry.getValue().sumDocFreq()
optionalSum(existing.docCount(), entry.getValue().docCount()),
optionalSum(existing.sumTotalTermFreq(), entry.getValue().sumTotalTermFreq()),
optionalSum(existing.sumDocFreq(), entry.getValue().sumDocFreq())
fieldStatistics.put(entry.getKey(), merged);
} else {
@ -121,6 +125,10 @@ public class SearchPhaseController extends AbstractComponent {
return new AggregatedDfs(termStatistics, fieldStatistics, aggMaxDoc);
// Adds two optional statistics. When the smaller of the two operands is -1
// (the "not available" marker), the result is -1 instead of the sum.
private static long optionalSum(long left, long right) {
// Math.min picks up a -1 on either side, since -1 is the lowest value expected here.
return Math.min(left, right) == -1 ? -1 : left + right;
public ShardDoc[] sortDocs(Collection<? extends QuerySearchResultProvider> results1) {
if (results1.isEmpty()) {
@ -267,7 +275,7 @@ public class SearchPhaseController extends AbstractComponent {
public Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad(ShardDoc[] shardDocs) {
Map<SearchShardTarget, ExtTIntArrayList> result = Maps.newHashMap();
Map<SearchShardTarget, ExtTIntArrayList> result = XMaps.newMap();
for (ShardDoc shardDoc : shardDocs) {
ExtTIntArrayList list = result.get(shardDoc.shardTarget());
if (list == null) {
@ -19,43 +19,38 @@
package org.elasticsearch.search.dfs;
import gnu.trove.impl.Constants;
import gnu.trove.map.TMap;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.trove.ExtTHashMap;
import java.io.IOException;
import java.util.Map;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
import org.elasticsearch.common.collect.XMaps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
public class AggregatedDfs implements Streamable {
private TMap<Term, TermStatistics> termStatistics;
private TMap<String, CollectionStatistics> fieldStatistics;
private Map<Term, TermStatistics> termStatistics;
private Map<String, CollectionStatistics> fieldStatistics;
private long maxDoc;
private AggregatedDfs() {
public AggregatedDfs(TMap<Term, TermStatistics> termStatistics, TMap<String, CollectionStatistics> fieldStatistics, long maxDoc) {
public AggregatedDfs(Map<Term, TermStatistics> termStatistics, Map<String, CollectionStatistics> fieldStatistics, long maxDoc) {
this.termStatistics = termStatistics;
this.fieldStatistics = fieldStatistics;
this.maxDoc = maxDoc;
public TMap<Term, TermStatistics> termStatistics() {
public Map<Term, TermStatistics> termStatistics() {
return termStatistics;
public TMap<String, CollectionStatistics> fieldStatistics() {
public Map<String, CollectionStatistics> fieldStatistics() {
return fieldStatistics;
@ -72,19 +67,15 @@ public class AggregatedDfs implements Streamable {
public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
termStatistics = new ExtTHashMap<Term, TermStatistics>(size, Constants.DEFAULT_LOAD_FACTOR);
termStatistics = XMaps.newMap(size);
for (int i = 0; i < size; i++) {
Term term = new Term(in.readString(), in.readBytesRef());
TermStatistics stats = new TermStatistics(in.readBytesRef(), in.readVLong(), in.readVLong());
TermStatistics stats = new TermStatistics(in.readBytesRef(),
termStatistics.put(term, stats);
size = in.readVInt();
fieldStatistics = new ExtTHashMap<String, CollectionStatistics>(size, Constants.DEFAULT_LOAD_FACTOR);
for (int i = 0; i < size; i++) {
String field = in.readString();
CollectionStatistics stats = new CollectionStatistics(field, in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
fieldStatistics.put(field, stats);
fieldStatistics = DfsSearchResult.readFieldStats(in);
maxDoc = in.readVLong();
@ -98,18 +89,9 @@ public class AggregatedDfs implements Streamable {
TermStatistics stats = termTermStatisticsEntry.getValue();
for (Map.Entry<String, CollectionStatistics> entry : fieldStatistics.entrySet()) {
DfsSearchResult.writeFieldStats(out, fieldStatistics);
@ -20,14 +20,13 @@
package org.elasticsearch.search.dfs;
import com.google.common.collect.ImmutableMap;
import gnu.trove.map.TMap;
import gnu.trove.set.hash.THashSet;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermContext;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
import org.elasticsearch.common.trove.ExtTHashMap;
import org.elasticsearch.common.collect.XMaps;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchPhase;
@ -57,18 +56,21 @@ public class DfsPhase implements SearchPhase {
public void execute(SearchContext context) {
THashSet<Term> termsSet = null;
try {
if (!context.queryRewritten()) {
THashSet<Term> termsSet = cachedTermsSet.get().get();
termsSet = cachedTermsSet.get().get();
if (!termsSet.isEmpty()) {
if (context.rescore() != null) {
context.rescore().rescorer().extractTerms(context, context.rescore(), termsSet);
Term[] terms = termsSet.toArray(new Term[termsSet.size()]);
TermStatistics[] termStatistics = new TermStatistics[terms.length];
IndexReaderContext indexReaderContext = context.searcher().getTopReaderContext();
@ -78,10 +80,12 @@ public class DfsPhase implements SearchPhase {
termStatistics[i] = context.searcher().termStatistics(terms[i], termContext);
TMap<String, CollectionStatistics> fieldStatistics = new ExtTHashMap<String, CollectionStatistics>();
Map<String, CollectionStatistics> fieldStatistics = XMaps.newNoNullKeysMap();
for (Term term : terms) {
assert term.field() != null : "field is null";
if (!fieldStatistics.containsKey(term.field())) {
fieldStatistics.put(term.field(), context.searcher().collectionStatistics(term.field()));
final CollectionStatistics collectionStatistics = context.searcher().collectionStatistics(term.field());
fieldStatistics.put(term.field(), collectionStatistics);
@ -90,6 +94,10 @@ public class DfsPhase implements SearchPhase {
} catch (Exception e) {
throw new DfsPhaseExecutionException(context, "Exception during dfs phase", e);
} finally {
if (termsSet != null) {
termsSet.clear(); // don't hold on to terms
@ -19,21 +19,20 @@
package org.elasticsearch.search.dfs;
import gnu.trove.map.TMap;
import java.io.IOException;
import java.util.Map;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.collect.XMaps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.trove.ExtTHashMap;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.Map;
@ -46,7 +45,7 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes
private long id;
private Term[] terms;
private TermStatistics[] termStatistics;
private TMap<String, CollectionStatistics> fieldStatistics = new ExtTHashMap<String, CollectionStatistics>();
private Map<String, CollectionStatistics> fieldStatistics = XMaps.newNoNullKeysMap();
private int maxDoc;
public DfsSearchResult() {
@ -86,7 +85,7 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes
return this;
public DfsSearchResult fieldStatistics(TMap<String, CollectionStatistics> fieldStatistics) {
public DfsSearchResult fieldStatistics(Map<String, CollectionStatistics> fieldStatistics) {
this.fieldStatistics = fieldStatistics;
return this;
@ -99,7 +98,7 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes
return termStatistics;
public TMap<String, CollectionStatistics> fieldStatistics() {
public Map<String, CollectionStatistics> fieldStatistics() {
return fieldStatistics;
@ -113,7 +112,6 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes
public void readFrom(StreamInput in) throws IOException {
id = in.readLong();
// shardTarget = readSearchShardTarget(in);
int termsSize = in.readVInt();
if (termsSize == 0) {
terms = EMPTY_TERMS;
@ -123,52 +121,108 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes
terms[i] = new Term(in.readString(), in.readBytesRef());
int termsStatsSize = in.readVInt();
if (termsStatsSize == 0) {
termStatistics = EMPTY_TERM_STATS;
} else {
termStatistics = new TermStatistics[termsStatsSize];
for (int i = 0; i < termStatistics.length; i++) {
BytesRef term = terms[i].bytes();
long docFreq = in.readVLong();
long totalTermFreq = in.readVLong();
termStatistics[i] = new TermStatistics(term, docFreq, totalTermFreq);
int numFieldStatistics = in.readVInt();
for (int i = 0; i < numFieldStatistics; i++) {
String field = in.readString();
CollectionStatistics stats = new CollectionStatistics(field, in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
fieldStatistics.put(field, stats);
this.termStatistics = readTermStats(in, terms);
readFieldStats(in, fieldStatistics);
maxDoc = in.readVInt();
public void writeTo(StreamOutput out) throws IOException {
// shardTarget.writeTo(out);
for (Term term : terms) {
for (TermStatistics termStatistic : termStatistics) {
writeTermStats(out, termStatistics);
writeFieldStats(out, fieldStatistics);
public static void writeFieldStats(StreamOutput out, Map<String, CollectionStatistics> fieldStatistics) throws IOException {
for (Map.Entry<String, CollectionStatistics> entry : fieldStatistics.entrySet()) {
assert entry.getValue().maxDoc() >= 0;
public static void writeTermStats(StreamOutput out, TermStatistics[] termStatistics) throws IOException {
for (TermStatistics termStatistic : termStatistics) {
writeSingleTermStats(out, termStatistic);
public static void writeSingleTermStats(StreamOutput out, TermStatistics termStatistic) throws IOException {
assert termStatistic.docFreq() >= 0;
public static Map<String, CollectionStatistics> readFieldStats(StreamInput in) throws IOException {
return readFieldStats(in, null);
public static Map<String, CollectionStatistics> readFieldStats(StreamInput in, Map<String, CollectionStatistics> fieldStatistics) throws IOException {
final int numFieldStatistics = in.readVInt();
if (fieldStatistics == null) {
fieldStatistics = XMaps.newNoNullKeysMap(numFieldStatistics);
for (int i = 0; i < numFieldStatistics; i++) {
final String field = in.readString();
assert field != null;
final long maxDoc = in.readVLong();
final long docCount = toNotAvailable(in.readVLong());
final long sumTotalTermFreq = toNotAvailable(in.readVLong());
final long sumDocFreq = toNotAvailable(in.readVLong());
CollectionStatistics stats = new CollectionStatistics(field, maxDoc, docCount, sumTotalTermFreq, sumDocFreq);
fieldStatistics.put(field, stats);
return fieldStatistics;
public static TermStatistics[] readTermStats(StreamInput in, Term[] terms) throws IOException {
int termsStatsSize = in.readVInt();
final TermStatistics[] termStatistics;
if (termsStatsSize == 0) {
termStatistics = EMPTY_TERM_STATS;
} else {
termStatistics = new TermStatistics[termsStatsSize];
assert terms.length == termsStatsSize;
for (int i = 0; i < termStatistics.length; i++) {
BytesRef term = terms[i].bytes();
final long docFreq = in.readVLong();
assert docFreq >= 0;
final long totalTermFreq = toNotAvailable(in.readVLong());
termStatistics[i] = new TermStatistics(term, docFreq, totalTermFreq);
return termStatistics;
* Optional statistics are set to -1 in Lucene by default.
* Since we are using variable-length longs (vLongs) to encode values, we add one to each value
* to ensure we don't waste space and don't write negative values.
// Shifts a statistic up by one so that the "not available" marker -1 becomes 0.
public static long makePositive(long value) {
// Sanity check (only evaluated when assertions are enabled): value + 1 must not be negative.
assert Math.signum(value+1) >= 0;
return value+1;
// Shifts a value down by one, so that 0 becomes -1; the counterpart of makePositive.
public static long toNotAvailable(long value) {
// Sanity check (only evaluated when assertions are enabled): the input must not be negative.
assert Math.signum(value) >= 0;
return value-1;
@ -60,6 +60,7 @@ public class SimpleQueryTests extends AbstractNodesTests {
public void createNodes() throws Exception {
client = getClient();
@ -1351,5 +1352,106 @@ public class SimpleQueryTests extends AbstractNodesTests {
assertHitCount(response, 3l);
public void testSimpleDFSQuery() throws ElasticSearchException, IOException {
.put("index.number_of_shards", 5)
.put("index.number_of_replicas", 0)
).addMapping("s", jsonBuilder()
.field("required", true)
.field("path", "bs")
.field("type", "boolean")
.field("type", "date")
.field("ignore_malformed", false)
.field("format", "dateOptionalTime")
.field("type", "string")
.field("index", "not_analyzed")
.addMapping("bs", jsonBuilder()
.field("type", "boolean")
.field("type", "date")
.field("ignore_malformed", false)
.field("format", "dateOptionalTime")
client.prepareIndex("test", "s", "1").setSource(jsonBuilder().startObject()
.field("online", false)
.field("bs", "Y")
.field("ts", System.currentTimeMillis()- 100)
client.prepareIndex("test", "s", "2").setSource(jsonBuilder().startObject()
.field("online", true)
.field("bs", "X")
.field("ts", System.currentTimeMillis()- 10000000)
client.prepareIndex("test", "bs", "3").setSource(jsonBuilder().startObject()
.field("online", false)
.field("ts", System.currentTimeMillis()- 100)
client.prepareIndex("test", "bs", "4").setSource(jsonBuilder().startObject()
.field("online", true)
.field("ts", System.currentTimeMillis() - 123123)
SearchResponse response = client.prepareSearch("test")
.must(QueryBuilders.termQuery("online", true))
.must(QueryBuilders.rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000)))
.must(QueryBuilders.termQuery("_type", "bs"))
.must(QueryBuilders.rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000)))
.must(QueryBuilders.termQuery("_type", "s"))
Reference in New Issue
Block a user