From 1ef8761b70cd32ce1caaf6142a5ae709120b33dd Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 8 May 2013 16:23:16 +0200 Subject: [PATCH] Handle optional term and field statistics gracefully Lucene provides a set of statistics that depend on the codec / postingsformat as well as on the index options used when the field is created / indexed. If a certain stats value is not available lucene return `-1` instead of the correct value. We need to ensure that those values are encoded correctly if we try to write vLongs as well as when we aggregate those values. Closes #3012 --- .../elasticsearch/common/collect/XMaps.java | 92 +++++++++++++ .../controller/SearchPhaseController.java | 42 +++--- .../search/dfs/AggregatedDfs.java | 58 +++----- .../elasticsearch/search/dfs/DfsPhase.java | 22 ++- .../search/dfs/DfsSearchResult.java | 130 +++++++++++++----- .../search/query/SimpleQueryTests.java | 102 ++++++++++++++ 6 files changed, 346 insertions(+), 100 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/collect/XMaps.java diff --git a/src/main/java/org/elasticsearch/common/collect/XMaps.java b/src/main/java/org/elasticsearch/common/collect/XMaps.java new file mode 100644 index 00000000000..63b1d260b15 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/collect/XMaps.java @@ -0,0 +1,92 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.collect; + +import gnu.trove.impl.Constants; + +import java.util.Map; + +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.common.trove.ExtTHashMap; + +import com.google.common.collect.ForwardingMap; + +/** + * This class provides factory methods for Maps. The returned {@link Map} + * instances are general purpose maps and non of the method guarantees a + * concrete implementation unless the return type is a concrete type. The + * implementations used might change over time, if you rely on a specific + * Implementation you should use a concrete constructor. + */ +public final class XMaps { + + public static final int DEFAULT_CAPACITY = Constants.DEFAULT_CAPACITY; + + /** + * Returns a new map with the given initial capacity + */ + public static Map newMap(int capacity) { + return new ExtTHashMap(capacity, Constants.DEFAULT_LOAD_FACTOR); + } + + /** + * Returns a new map with a default initial capacity of + * {@value #DEFAULT_CAPACITY} + */ + public static Map newMap() { + return newMap(DEFAULT_CAPACITY); + } + + /** + * Returns a map like {@link #newMap()} that does not accept null keys + */ + public static Map newNoNullKeysMap() { + Map delegate = newMap(); + return ensureNoNullKeys(delegate); + } + + /** + * Returns a map like {@link #newMap(in)} that does not accept null keys + */ + public static Map newNoNullKeysMap(int capacity) { + Map delegate = newMap(capacity); + return ensureNoNullKeys(delegate); + } + + /** + * Wraps the given map and prevent adding of null keys. + */ + public static Map ensureNoNullKeys(final Map delegate) { + return new ForwardingMap() { + @Override + public V put(K key, V value) { + if (key == null) { + throw new ElasticSearchIllegalArgumentException("Map key must not be null"); + } + return super.put(key, value); + } + + @Override + protected Map delegate() { + return delegate; + } + }; + } +} diff --git a/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java b/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java index 5c739c0de53..60bca7cb22b 100644 --- a/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java +++ b/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java @@ -21,19 +21,16 @@ package org.elasticsearch.search.controller; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Ordering; -import gnu.trove.impl.Constants; -import gnu.trove.map.TMap; import org.apache.lucene.index.Term; import org.apache.lucene.search.*; import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.XMaps; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.search.ShardFieldDocSortedHitQueue; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.trove.ExtTHashMap; import org.elasticsearch.common.trove.ExtTIntArrayList; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.AggregatedDfs; @@ -49,9 +46,6 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.suggest.Suggest; -import org.elasticsearch.search.suggest.Suggest.Suggestion; -import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry; -import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry.Option; import java.util.ArrayList; import java.util.Collection; @@ -90,27 +84,37 @@ public class SearchPhaseController extends AbstractComponent { } public AggregatedDfs aggregateDfs(Iterable results) { - TMap termStatistics = new ExtTHashMap(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR); - TMap fieldStatistics = new ExtTHashMap(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR); + Map termStatistics = XMaps.newNoNullKeysMap(); + Map fieldStatistics = XMaps.newNoNullKeysMap(); long aggMaxDoc = 0; for (DfsSearchResult result : results) { - for (int i = 0; i < result.termStatistics().length; i++) { - TermStatistics existing = termStatistics.get(result.terms()[i]); + final Term[] terms = result.terms(); + final TermStatistics[] stats = result.termStatistics(); + assert terms.length == stats.length; + for (int i = 0; i < terms.length; i++) { + assert terms[i] != null; + TermStatistics existing = termStatistics.get(terms[i]); if (existing != null) { - termStatistics.put(result.terms()[i], new TermStatistics(existing.term(), existing.docFreq() + result.termStatistics()[i].docFreq(), existing.totalTermFreq() + result.termStatistics()[i].totalTermFreq())); + assert terms[i].bytes().equals(existing.term()); + // totalTermFrequency is an optional statistic we need to check if either one or both + // are set to -1 which means not present and then set it globally to -1 + termStatistics.put(terms[i], new TermStatistics(existing.term(), + existing.docFreq() + stats[i].docFreq(), + optionalSum(existing.totalTermFreq(), stats[i].totalTermFreq()))); } else { - termStatistics.put(result.terms()[i], result.termStatistics()[i]); + termStatistics.put(terms[i], stats[i]); } } for (Map.Entry entry : result.fieldStatistics().entrySet()) { + assert entry.getKey() != null; CollectionStatistics existing = fieldStatistics.get(entry.getKey()); if (existing != null) { CollectionStatistics merged = new CollectionStatistics( entry.getKey(), existing.maxDoc() + entry.getValue().maxDoc(), - existing.docCount() + entry.getValue().docCount(), - existing.sumTotalTermFreq() + entry.getValue().sumTotalTermFreq(), - existing.sumDocFreq() + entry.getValue().sumDocFreq() + optionalSum(existing.docCount(), entry.getValue().docCount()), + optionalSum(existing.sumTotalTermFreq(), entry.getValue().sumTotalTermFreq()), + optionalSum(existing.sumDocFreq(), entry.getValue().sumDocFreq()) ); fieldStatistics.put(entry.getKey(), merged); } else { @@ -121,6 +125,10 @@ public class SearchPhaseController extends AbstractComponent { } return new AggregatedDfs(termStatistics, fieldStatistics, aggMaxDoc); } + + private static long optionalSum(long left, long right) { + return Math.min(left, right) == -1 ? -1 : left + right; + } public ShardDoc[] sortDocs(Collection results1) { if (results1.isEmpty()) { @@ -267,7 +275,7 @@ public class SearchPhaseController extends AbstractComponent { } public Map docIdsToLoad(ShardDoc[] shardDocs) { - Map result = Maps.newHashMap(); + Map result = XMaps.newMap(); for (ShardDoc shardDoc : shardDocs) { ExtTIntArrayList list = result.get(shardDoc.shardTarget()); if (list == null) { diff --git a/src/main/java/org/elasticsearch/search/dfs/AggregatedDfs.java b/src/main/java/org/elasticsearch/search/dfs/AggregatedDfs.java index 6b750e153e6..ee52b0398e9 100644 --- a/src/main/java/org/elasticsearch/search/dfs/AggregatedDfs.java +++ b/src/main/java/org/elasticsearch/search/dfs/AggregatedDfs.java @@ -19,43 +19,38 @@ package org.elasticsearch.search.dfs; -import gnu.trove.impl.Constants; -import gnu.trove.map.TMap; -import org.apache.lucene.index.Term; -import org.apache.lucene.search.CollectionStatistics; -import org.apache.lucene.search.TermStatistics; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.trove.ExtTHashMap; import java.io.IOException; import java.util.Map; -/** - * - */ +import org.apache.lucene.index.Term; +import org.apache.lucene.search.CollectionStatistics; +import org.apache.lucene.search.TermStatistics; +import org.elasticsearch.common.collect.XMaps; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; + public class AggregatedDfs implements Streamable { - private TMap termStatistics; - private TMap fieldStatistics; + private Map termStatistics; + private Map fieldStatistics; private long maxDoc; private AggregatedDfs() { - } - public AggregatedDfs(TMap termStatistics, TMap fieldStatistics, long maxDoc) { + public AggregatedDfs(Map termStatistics, Map fieldStatistics, long maxDoc) { this.termStatistics = termStatistics; this.fieldStatistics = fieldStatistics; this.maxDoc = maxDoc; } - public TMap termStatistics() { + public Map termStatistics() { return termStatistics; } - public TMap fieldStatistics() { + public Map fieldStatistics() { return fieldStatistics; } @@ -72,19 +67,15 @@ public class AggregatedDfs implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { int size = in.readVInt(); - termStatistics = new ExtTHashMap(size, Constants.DEFAULT_LOAD_FACTOR); + termStatistics = XMaps.newMap(size); for (int i = 0; i < size; i++) { Term term = new Term(in.readString(), in.readBytesRef()); - TermStatistics stats = new TermStatistics(in.readBytesRef(), in.readVLong(), in.readVLong()); + TermStatistics stats = new TermStatistics(in.readBytesRef(), + in.readVLong(), + DfsSearchResult.toNotAvailable(in.readVLong())); termStatistics.put(term, stats); } - size = in.readVInt(); - fieldStatistics = new ExtTHashMap(size, Constants.DEFAULT_LOAD_FACTOR); - for (int i = 0; i < size; i++) { - String field = in.readString(); - CollectionStatistics stats = new CollectionStatistics(field, in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); - fieldStatistics.put(field, stats); - } + fieldStatistics = DfsSearchResult.readFieldStats(in); maxDoc = in.readVLong(); } @@ -98,18 +89,9 @@ public class AggregatedDfs implements Streamable { TermStatistics stats = termTermStatisticsEntry.getValue(); out.writeBytesRef(stats.term()); out.writeVLong(stats.docFreq()); - out.writeVLong(stats.totalTermFreq()); + out.writeVLong(DfsSearchResult.makePositive(stats.totalTermFreq())); } - - out.writeVInt(fieldStatistics.size()); - for (Map.Entry entry : fieldStatistics.entrySet()) { - out.writeString(entry.getKey()); - out.writeVLong(entry.getValue().maxDoc()); - out.writeVLong(entry.getValue().docCount()); - out.writeVLong(entry.getValue().sumTotalTermFreq()); - out.writeVLong(entry.getValue().sumDocFreq()); - } - + DfsSearchResult.writeFieldStats(out, fieldStatistics); out.writeVLong(maxDoc); } } diff --git a/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java b/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java index c8861d97824..8cfec9b1044 100644 --- a/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java +++ b/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java @@ -20,14 +20,13 @@ package org.elasticsearch.search.dfs; import com.google.common.collect.ImmutableMap; -import gnu.trove.map.TMap; import gnu.trove.set.hash.THashSet; import org.apache.lucene.index.IndexReaderContext; import org.apache.lucene.index.Term; import org.apache.lucene.index.TermContext; import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.TermStatistics; -import org.elasticsearch.common.trove.ExtTHashMap; +import org.elasticsearch.common.collect.XMaps; import org.elasticsearch.common.util.concurrent.ThreadLocals; import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.SearchPhase; @@ -57,18 +56,21 @@ public class DfsPhase implements SearchPhase { } public void execute(SearchContext context) { + THashSet termsSet = null; try { if (!context.queryRewritten()) { context.updateRewriteQuery(context.searcher().rewrite(context.query())); } - THashSet termsSet = cachedTermsSet.get().get(); - termsSet.clear(); + termsSet = cachedTermsSet.get().get(); + if (!termsSet.isEmpty()) { + termsSet.clear(); + } context.query().extractTerms(termsSet); if (context.rescore() != null) { context.rescore().rescorer().extractTerms(context, context.rescore(), termsSet); } - + Term[] terms = termsSet.toArray(new Term[termsSet.size()]); TermStatistics[] termStatistics = new TermStatistics[terms.length]; IndexReaderContext indexReaderContext = context.searcher().getTopReaderContext(); @@ -78,10 +80,12 @@ public class DfsPhase implements SearchPhase { termStatistics[i] = context.searcher().termStatistics(terms[i], termContext); } - TMap fieldStatistics = new ExtTHashMap(); + Map fieldStatistics = XMaps.newNoNullKeysMap(); for (Term term : terms) { + assert term.field() != null : "field is null"; if (!fieldStatistics.containsKey(term.field())) { - fieldStatistics.put(term.field(), context.searcher().collectionStatistics(term.field())); + final CollectionStatistics collectionStatistics = context.searcher().collectionStatistics(term.field()); + fieldStatistics.put(term.field(), collectionStatistics); } } @@ -90,6 +94,10 @@ public class DfsPhase implements SearchPhase { .maxDoc(context.searcher().getIndexReader().maxDoc()); } catch (Exception e) { throw new DfsPhaseExecutionException(context, "Exception during dfs phase", e); + } finally { + if (termsSet != null) { + termsSet.clear(); // don't hold on to terms + } } } } diff --git a/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java b/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java index 680e99b2a6a..4c8cbb60d20 100644 --- a/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java +++ b/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java @@ -19,21 +19,20 @@ package org.elasticsearch.search.dfs; -import gnu.trove.map.TMap; +import java.io.IOException; +import java.util.Map; + import org.apache.lucene.index.Term; import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.TermStatistics; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.collect.XMaps; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.trove.ExtTHashMap; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.transport.TransportResponse; -import java.io.IOException; -import java.util.Map; - /** * */ @@ -46,7 +45,7 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes private long id; private Term[] terms; private TermStatistics[] termStatistics; - private TMap fieldStatistics = new ExtTHashMap(); + private Map fieldStatistics = XMaps.newNoNullKeysMap(); private int maxDoc; public DfsSearchResult() { @@ -86,7 +85,7 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes return this; } - public DfsSearchResult fieldStatistics(TMap fieldStatistics) { + public DfsSearchResult fieldStatistics(Map fieldStatistics) { this.fieldStatistics = fieldStatistics; return this; } @@ -99,7 +98,7 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes return termStatistics; } - public TMap fieldStatistics() { + public Map fieldStatistics() { return fieldStatistics; } @@ -113,7 +112,6 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readLong(); -// shardTarget = readSearchShardTarget(in); int termsSize = in.readVInt(); if (termsSize == 0) { terms = EMPTY_TERMS; @@ -123,52 +121,108 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes terms[i] = new Term(in.readString(), in.readBytesRef()); } } - int termsStatsSize = in.readVInt(); - if (termsStatsSize == 0) { - termStatistics = EMPTY_TERM_STATS; - } else { - termStatistics = new TermStatistics[termsStatsSize]; - for (int i = 0; i < termStatistics.length; i++) { - BytesRef term = terms[i].bytes(); - long docFreq = in.readVLong(); - long totalTermFreq = in.readVLong(); - termStatistics[i] = new TermStatistics(term, docFreq, totalTermFreq); - } - } - int numFieldStatistics = in.readVInt(); - for (int i = 0; i < numFieldStatistics; i++) { - String field = in.readString(); - CollectionStatistics stats = new CollectionStatistics(field, in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); - fieldStatistics.put(field, stats); - } + this.termStatistics = readTermStats(in, terms); + readFieldStats(in, fieldStatistics); + maxDoc = in.readVInt(); } - @Override + + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(id); -// shardTarget.writeTo(out); out.writeVInt(terms.length); for (Term term : terms) { out.writeString(term.field()); out.writeBytesRef(term.bytes()); } - out.writeVInt(termStatistics.length); - for (TermStatistics termStatistic : termStatistics) { - out.writeVLong(termStatistic.docFreq()); - out.writeVLong(termStatistic.totalTermFreq()); - } + writeTermStats(out, termStatistics); + writeFieldStats(out, fieldStatistics); + out.writeVInt(maxDoc); + } + + public static void writeFieldStats(StreamOutput out, Map fieldStatistics) throws IOException { out.writeVInt(fieldStatistics.size()); for (Map.Entry entry : fieldStatistics.entrySet()) { out.writeString(entry.getKey()); + assert entry.getValue().maxDoc() >= 0; out.writeVLong(entry.getValue().maxDoc()); - out.writeVLong(entry.getValue().docCount()); - out.writeVLong(entry.getValue().sumTotalTermFreq()); - out.writeVLong(entry.getValue().sumDocFreq()); + out.writeVLong(makePositive(entry.getValue().docCount())); + out.writeVLong(makePositive(entry.getValue().sumTotalTermFreq())); + out.writeVLong(makePositive(entry.getValue().sumDocFreq())); } - out.writeVInt(maxDoc); + } + + public static void writeTermStats(StreamOutput out, TermStatistics[] termStatistics) throws IOException { + out.writeVInt(termStatistics.length); + for (TermStatistics termStatistic : termStatistics) { + writeSingleTermStats(out, termStatistic); + } + } + + public static void writeSingleTermStats(StreamOutput out, TermStatistics termStatistic) throws IOException { + assert termStatistic.docFreq() >= 0; + out.writeVLong(termStatistic.docFreq()); + out.writeVLong(makePositive(termStatistic.totalTermFreq())); + } + + public static Map readFieldStats(StreamInput in) throws IOException { + return readFieldStats(in, null); + } + + public static Map readFieldStats(StreamInput in, Map fieldStatistics) throws IOException { + final int numFieldStatistics = in.readVInt(); + if (fieldStatistics == null) { + fieldStatistics = XMaps.newNoNullKeysMap(numFieldStatistics); + } + for (int i = 0; i < numFieldStatistics; i++) { + final String field = in.readString(); + assert field != null; + final long maxDoc = in.readVLong(); + final long docCount = toNotAvailable(in.readVLong()); + final long sumTotalTermFreq = toNotAvailable(in.readVLong()); + final long sumDocFreq = toNotAvailable(in.readVLong()); + CollectionStatistics stats = new CollectionStatistics(field, maxDoc, docCount, sumTotalTermFreq, sumDocFreq); + fieldStatistics.put(field, stats); + } + return fieldStatistics; + } + + public static TermStatistics[] readTermStats(StreamInput in, Term[] terms) throws IOException { + int termsStatsSize = in.readVInt(); + final TermStatistics[] termStatistics; + if (termsStatsSize == 0) { + termStatistics = EMPTY_TERM_STATS; + } else { + termStatistics = new TermStatistics[termsStatsSize]; + assert terms.length == termsStatsSize; + for (int i = 0; i < termStatistics.length; i++) { + BytesRef term = terms[i].bytes(); + final long docFreq = in.readVLong(); + assert docFreq >= 0; + final long totalTermFreq = toNotAvailable(in.readVLong()); + termStatistics[i] = new TermStatistics(term, docFreq, totalTermFreq); + } + } + return termStatistics; + } + + + /* + * optional statistics are set to -1 in lucene by default. + * Since we are using var longs to encode values we add one to each value + * to ensure we don't waste space and don't add negative values. + */ + public static long makePositive(long value) { + assert Math.signum(value+1) >= 0; + return value+1; + } + + public static long toNotAvailable(long value) { + assert Math.signum(value) >= 0; + return value-1; } } diff --git a/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java b/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java index b24d3433633..706a8966e27 100644 --- a/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java +++ b/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java @@ -60,6 +60,7 @@ public class SimpleQueryTests extends AbstractNodesTests { @BeforeClass public void createNodes() throws Exception { startNode("node1"); + startNode("node2"); client = getClient(); } @@ -1351,5 +1352,106 @@ public class SimpleQueryTests extends AbstractNodesTests { assertNoFailures(response); assertHitCount(response, 3l); } + + @Test + public void testSimpleDFSQuery() throws ElasticSearchException, IOException { + + client.admin().indices().prepareDelete().execute().actionGet(); + client.admin().indices().prepareCreate("test").setSettings( + ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 5) + .put("index.number_of_replicas", 0) + ).addMapping("s", jsonBuilder() + .startObject() + .startObject("s") + .startObject("_routing") + .field("required", true) + .field("path", "bs") + .endObject() + .startObject("properties") + .startObject("online") + .field("type", "boolean") + .endObject() + .startObject("ts") + .field("type", "date") + .field("ignore_malformed", false) + .field("format", "dateOptionalTime") + .endObject() + .startObject("bs") + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + .endObject() + .endObject() + .endObject()) + .addMapping("bs", jsonBuilder() + .startObject() + .startObject("s") + .startObject("properties") + .startObject("online") + .field("type", "boolean") + .endObject() + .startObject("ts") + .field("type", "date") + .field("ignore_malformed", false) + .field("format", "dateOptionalTime") + .endObject() + .endObject() + .endObject() + .endObject()) + + + .execute().actionGet(); + + client.prepareIndex("test", "s", "1").setSource(jsonBuilder().startObject() + .field("online", false) + .field("bs", "Y") + .field("ts", System.currentTimeMillis()- 100) + .endObject()) + .execute().actionGet(); + + client.prepareIndex("test", "s", "2").setSource(jsonBuilder().startObject() + .field("online", true) + .field("bs", "X") + .field("ts", System.currentTimeMillis()- 10000000) + .endObject()) + .execute().actionGet(); + + client.prepareIndex("test", "bs", "3").setSource(jsonBuilder().startObject() + .field("online", false) + .field("ts", System.currentTimeMillis()- 100) + .endObject()) + .execute().actionGet(); + + client.prepareIndex("test", "bs", "4").setSource(jsonBuilder().startObject() + .field("online", true) + .field("ts", System.currentTimeMillis() - 123123) + .endObject()) + .execute().actionGet(); + + client.admin().indices().prepareRefresh().execute().actionGet(); + SearchResponse response = client.prepareSearch("test") + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) + .setQuery( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("online", true)) + .must(QueryBuilders.boolQuery() + .should(QueryBuilders.boolQuery() + .must(QueryBuilders.rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))) + .must(QueryBuilders.termQuery("_type", "bs")) + ) + .should(QueryBuilders.boolQuery() + .must(QueryBuilders.rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))) + .must(QueryBuilders.termQuery("_type", "s")) + ) + ) + ) + .setVersion(true) + .setFrom(0).setSize(100).setExplain(true) + .execute() + .actionGet(); + assertNoFailures(response); + + } }