From 03fdc6aa8099e30b875efe5369ac98fe955c59a4 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 22 Feb 2013 14:04:10 +0100 Subject: [PATCH] Query DSL: Terms filter to allow for terms lookup from another document closes #2674 --- .../TransportClearIndicesCacheAction.java | 10 +- .../elasticsearch/index/cache/IndexCache.java | 2 +- .../index/query/FilterBuilders.java | 8 + .../index/query/TermsFilterParser.java | 59 ++++- .../index/query/TermsLookupFilterBuilder.java | 111 ++++++++++ .../elasticsearch/indices/IndicesModule.java | 2 + .../filter/terms/IndicesTermsFilterCache.java | 206 ++++++++++++++++++ .../cache/filter/terms/TermsLookup.java | 68 ++++++ .../search/query/SimpleQueryTests.java | 92 ++++++++ 9 files changed, 554 insertions(+), 4 deletions(-) create mode 100644 src/main/java/org/elasticsearch/index/query/TermsLookupFilterBuilder.java create mode 100644 src/main/java/org/elasticsearch/indices/cache/filter/terms/IndicesTermsFilterCache.java create mode 100644 src/main/java/org/elasticsearch/indices/cache/filter/terms/TermsLookup.java diff --git a/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index 582b626f64c..6f809ee694a 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -48,12 +49,14 @@ import static com.google.common.collect.Lists.newArrayList; public class TransportClearIndicesCacheAction extends TransportBroadcastOperationAction { private final IndicesService indicesService; + private final IndicesTermsFilterCache termsFilterCache; @Inject public TransportClearIndicesCacheAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportService transportService, IndicesService indicesService) { + TransportService transportService, IndicesService indicesService, IndicesTermsFilterCache termsFilterCache) { super(settings, threadPool, clusterService, transportService); this.indicesService = indicesService; + this.termsFilterCache = termsFilterCache; } @Override @@ -123,10 +126,12 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio if (request.isFilterCache()) { clearedAtLeastOne = true; service.cache().filter().clear("api"); + termsFilterCache.clear("api"); } if (request.getFilterKeys() != null && request.getFilterKeys().length > 0) { clearedAtLeastOne = true; service.cache().filter().clear("api", request.getFilterKeys()); + termsFilterCache.clear("api", request.getFilterKeys()); } if (request.isFieldDataCache()) { clearedAtLeastOne = true; @@ -150,9 +155,10 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio } } else { service.cache().clear("api"); + termsFilterCache.clear("api"); } } - service.cache().invalidateCache(); + service.cache().invalidateStatsCache(); } return new ShardClearIndicesCacheResponse(request.getIndex(), request.getShardId()); } diff --git a/src/main/java/org/elasticsearch/index/cache/IndexCache.java b/src/main/java/org/elasticsearch/index/cache/IndexCache.java index 28d4e6212ba..8405d8af2f6 100644 --- a/src/main/java/org/elasticsearch/index/cache/IndexCache.java +++ b/src/main/java/org/elasticsearch/index/cache/IndexCache.java @@ -75,7 +75,7 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo } } - public synchronized void invalidateCache() { + public synchronized void invalidateStatsCache() { FilterCache.EntriesStats filterEntriesStats = filterCache.entriesStats(); latestCacheStats = new CacheStats(filterCache.evictions(), filterEntriesStats.sizeInBytes, filterEntriesStats.count, idCache.sizeInBytes()); latestCacheStatsTimestamp = System.currentTimeMillis(); diff --git a/src/main/java/org/elasticsearch/index/query/FilterBuilders.java b/src/main/java/org/elasticsearch/index/query/FilterBuilders.java index af0e71a4062..d6c96ed851c 100644 --- a/src/main/java/org/elasticsearch/index/query/FilterBuilders.java +++ b/src/main/java/org/elasticsearch/index/query/FilterBuilders.java @@ -195,6 +195,14 @@ public abstract class FilterBuilders { return new TermsFilterBuilder(name, values); } + /** + * A terms lookup filter for the provided field name. A lookup terms filter can + * extract the terms to filter by from another doc in an index. + */ + public static TermsLookupFilterBuilder termsLookupFilter(String name) { + return new TermsLookupFilterBuilder(name); + } + /** * A filer for a field based on several terms matching on any of them. * diff --git a/src/main/java/org/elasticsearch/index/query/TermsFilterParser.java b/src/main/java/org/elasticsearch/index/query/TermsFilterParser.java index f758d52db4b..f60737dbf6f 100644 --- a/src/main/java/org/elasticsearch/index/query/TermsFilterParser.java +++ b/src/main/java/org/elasticsearch/index/query/TermsFilterParser.java @@ -27,11 +27,16 @@ import org.apache.lucene.search.Filter; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.BytesRefs; -import org.elasticsearch.common.lucene.search.*; +import org.elasticsearch.common.lucene.search.AndFilter; +import org.elasticsearch.common.lucene.search.OrFilter; +import org.elasticsearch.common.lucene.search.TermFilter; +import org.elasticsearch.common.lucene.search.XBooleanFilter; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.cache.filter.support.CacheKeyFilter; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache; +import org.elasticsearch.indices.cache.filter.terms.TermsLookup; import java.io.IOException; import java.util.List; @@ -45,6 +50,8 @@ public class TermsFilterParser implements FilterParser { public static final String NAME = "terms"; + private IndicesTermsFilterCache termsFilterCache; + @Inject public TermsFilterParser() { } @@ -54,6 +61,11 @@ public class TermsFilterParser implements FilterParser { return new String[]{NAME, "in"}; } + @Inject(optional = true) + public void setIndicesTermsFilterCache(IndicesTermsFilterCache termsFilterCache) { + this.termsFilterCache = termsFilterCache; + } + @Override public Filter parse(QueryParseContext parseContext) throws IOException, QueryParsingException { XContentParser parser = parseContext.parser(); @@ -62,6 +74,12 @@ public class TermsFilterParser implements FilterParser { Boolean cache = null; String filterName = null; String currentFieldName = null; + + String lookupIndex = parseContext.index().name(); + String lookupType = null; + String lookupId = null; + String lookupPath = null; + CacheKeyFilter.Key cacheKey = null; XContentParser.Token token; String execution = "plain"; @@ -80,6 +98,34 @@ public class TermsFilterParser implements FilterParser { } terms.add(value); } + } else if (token == XContentParser.Token.START_OBJECT) { + fieldName = currentFieldName; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("index".equals(currentFieldName)) { + lookupIndex = parser.text(); + } else if ("type".equals(currentFieldName)) { + lookupType = parser.text(); + } else if ("id".equals(currentFieldName)) { + lookupId = parser.text(); + } else if ("path".equals(currentFieldName)) { + lookupPath = parser.text(); + } else { + throw new QueryParsingException(parseContext.index(), "[terms] filter does not support [" + currentFieldName + "] within lookup element"); + } + } + } + if (lookupType == null) { + throw new QueryParsingException(parseContext.index(), "[terms] filter lookup element requires specifying the type"); + } + if (lookupId == null) { + throw new QueryParsingException(parseContext.index(), "[terms] filter lookup element requires specifying the id"); + } + if (lookupPath == null) { + throw new QueryParsingException(parseContext.index(), "[terms] filter lookup element requires specifying the path"); + } } else if (token.isValue()) { if ("execution".equals(currentFieldName)) { execution = parser.text(); @@ -113,6 +159,17 @@ public class TermsFilterParser implements FilterParser { } } + if (lookupId != null) { + // external lookup, use it + TermsLookup termsLookup = new TermsLookup(fieldMapper, lookupIndex, lookupType, lookupId, lookupPath); + if (cacheKey == null) { + cacheKey = new CacheKeyFilter.Key(termsLookup.toString()); + } + Filter filter = termsFilterCache.lookupTermsFilter(cacheKey, termsLookup); + filter = parseContext.cacheFilter(filter, null); // cacheKey is passed as null, so we don't double cache the key + return filter; + } + try { Filter filter; if ("plain".equals(execution)) { diff --git a/src/main/java/org/elasticsearch/index/query/TermsLookupFilterBuilder.java b/src/main/java/org/elasticsearch/index/query/TermsLookupFilterBuilder.java new file mode 100644 index 00000000000..cb5947e507e --- /dev/null +++ b/src/main/java/org/elasticsearch/index/query/TermsLookupFilterBuilder.java @@ -0,0 +1,111 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.query; + +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * A filer for a field based on several terms matching on any of them. + */ +public class TermsLookupFilterBuilder extends BaseFilterBuilder { + + private final String name; + private String lookupIndex; + private String lookupType; + private String lookupId; + private String lookupPath; + + private String cacheKey; + private String filterName; + + public TermsLookupFilterBuilder(String name) { + this.name = name; + } + + /** + * Sets the filter name for the filter that can be used when searching for matched_filters per hit. + */ + public TermsLookupFilterBuilder filterName(String filterName) { + this.filterName = filterName; + return this; + } + + /** + * Sets the index name to lookup the terms from. + */ + public TermsLookupFilterBuilder lookupIndex(String lookupIndex) { + this.lookupIndex = lookupIndex; + return this; + } + + /** + * Sets the index type to lookup the terms from. + */ + public TermsLookupFilterBuilder lookupType(String lookupType) { + this.lookupType = lookupType; + return this; + } + + /** + * Sets the doc id to lookup the terms from. + */ + public TermsLookupFilterBuilder lookupId(String lookupId) { + this.lookupId = lookupId; + return this; + } + + /** + * Sets the path within the document to lookup the terms from. + */ + public TermsLookupFilterBuilder lookupPath(String lookupPath) { + this.lookupPath = lookupPath; + return this; + } + + public TermsLookupFilterBuilder cacheKey(String cacheKey) { + this.cacheKey = cacheKey; + return this; + } + + @Override + public void doXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(TermsFilterParser.NAME); + + builder.startObject(name); + if (lookupIndex != null) { + builder.field("index", lookupIndex); + } + builder.field("type", lookupType); + builder.field("id", lookupId); + builder.field("path", lookupPath); + builder.endObject(); + + if (filterName != null) { + builder.field("_name", filterName); + } + if (cacheKey != null) { + builder.field("_cache_key", cacheKey); + } + + builder.endObject(); + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index ac61b3b594c..341ba0510a6 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.inject.SpawnModules; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.indices.analysis.IndicesAnalysisModule; import org.elasticsearch.indices.cache.filter.IndicesFilterCache; +import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.indices.query.IndicesQueriesModule; @@ -68,6 +69,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules { bind(IndicesClusterStateService.class).asEagerSingleton(); bind(IndexingMemoryController.class).asEagerSingleton(); bind(IndicesFilterCache.class).asEagerSingleton(); + bind(IndicesTermsFilterCache.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(IndicesTTLService.class).asEagerSingleton(); bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/indices/cache/filter/terms/IndicesTermsFilterCache.java b/src/main/java/org/elasticsearch/indices/cache/filter/terms/IndicesTermsFilterCache.java new file mode 100644 index 00000000000..095c18a2bd3 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/cache/filter/terms/IndicesTermsFilterCache.java @@ -0,0 +1,206 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.cache.filter.terms; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.Weigher; +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.Filter; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.cache.filter.support.CacheKeyFilter; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** + */ +public class IndicesTermsFilterCache extends AbstractComponent { + + private static TermsFilterValue NO_TERMS = new TermsFilterValue(0, null); + + private final Client client; + + private final Cache cache; + + @Inject + public IndicesTermsFilterCache(Settings settings, Client client) { + super(settings); + this.client = client; + + ByteSizeValue size = componentSettings.getAsBytesSize("size", new ByteSizeValue(10, ByteSizeUnit.MB)); + TimeValue expireAfterWrite = componentSettings.getAsTime("expire_after_write", null); + TimeValue expireAfterAccess = componentSettings.getAsTime("expire_after_access", null); + + CacheBuilder builder = CacheBuilder.newBuilder() + .maximumWeight(size.bytes()) + .weigher(new TermsFilterValueWeigher()); + + if (expireAfterAccess != null) { + builder.expireAfterAccess(expireAfterAccess.millis(), TimeUnit.MILLISECONDS); + } + if (expireAfterWrite != null) { + builder.expireAfterWrite(expireAfterWrite.millis(), TimeUnit.MILLISECONDS); + } + + this.cache = builder.build(); + } + + /** + * An external lookup terms filter. Note, already implements the {@link CacheKeyFilter} so no need + * to double cache key it. + */ + public Filter lookupTermsFilter(final CacheKeyFilter.Key cacheKey, final TermsLookup lookup) { + return new LookupTermsFilter(lookup, cacheKey, this); + } + + @Nullable + private Filter termsFilter(final CacheKeyFilter.Key cacheKey, final TermsLookup lookup) throws RuntimeException { + try { + return cache.get(cacheKey, new Callable() { + @Override + public TermsFilterValue call() throws Exception { + GetResponse getResponse = client.get(new GetRequest(lookup.getIndex(), lookup.getType(), lookup.getId()).setPreference("_local")).actionGet(); + if (!getResponse.isExists()) { + return NO_TERMS; + } + List values = XContentMapValues.extractRawValues(lookup.getPath(), getResponse.getSourceAsMap()); + if (values.isEmpty()) { + return NO_TERMS; + } + Filter filter = lookup.getFieldMapper().termsFilter(values, null); + return new TermsFilterValue(estimateSizeInBytes(values), filter); + } + }).filter; + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } + throw new ElasticSearchException(e.getMessage(), e.getCause()); + } + } + + long estimateSizeInBytes(List terms) { + long size = 8; + for (Object term : terms) { + if (term instanceof BytesRef) { + size += ((BytesRef) term).length; + } else if (term instanceof String) { + size += ((String) term).length() / 2; + } else { + size += 4; + } + } + return size; + } + + public void clear(String reason) { + cache.invalidateAll(); + } + + public void clear(String reason, String[] keys) { + for (String key : keys) { + cache.invalidate(new CacheKeyFilter.Key(key)); + } + } + + static class LookupTermsFilter extends Filter implements CacheKeyFilter { + + private final TermsLookup lookup; + private final CacheKeyFilter.Key cacheKey; + private final IndicesTermsFilterCache cache; + + LookupTermsFilter(TermsLookup lookup, CacheKeyFilter.Key cacheKey, IndicesTermsFilterCache cache) { + this.lookup = lookup; + this.cacheKey = cacheKey; + this.cache = cache; + } + + @Override + public DocIdSet getDocIdSet(AtomicReaderContext context, Bits acceptDocs) throws IOException { + Filter filter = cache.termsFilter(cacheKey, lookup); + if (filter == null) return null; + return filter.getDocIdSet(context, acceptDocs); + } + + @Override + public Key cacheKey() { + return this.cacheKey; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LookupTermsFilter that = (LookupTermsFilter) o; + + if (!cacheKey.equals(that.cacheKey)) return false; + + return true; + } + + @Override + public int hashCode() { + return cacheKey.hashCode(); + } + + @Override + public String toString() { + return "terms(" + lookup.toString() + ")"; + } + } + + static class TermsFilterValueWeigher implements Weigher { + + @Override + public int weigh(CacheKeyFilter.Key key, TermsFilterValue value) { + return (int) (key.bytes().length + value.sizeInBytes); + } + } + + // TODO: if TermsFilter exposed sizeInBytes, we won't need this wrapper + static class TermsFilterValue { + public final long sizeInBytes; + public final Filter filter; + + TermsFilterValue(long sizeInBytes, Filter filter) { + this.sizeInBytes = sizeInBytes; + this.filter = filter; + } + } +} diff --git a/src/main/java/org/elasticsearch/indices/cache/filter/terms/TermsLookup.java b/src/main/java/org/elasticsearch/indices/cache/filter/terms/TermsLookup.java new file mode 100644 index 00000000000..a0fc0a02879 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/cache/filter/terms/TermsLookup.java @@ -0,0 +1,68 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.cache.filter.terms; + +import org.elasticsearch.index.cache.filter.support.CacheKeyFilter; +import org.elasticsearch.index.mapper.FieldMapper; + +/** + */ +public class TermsLookup { + + private final FieldMapper fieldMapper; + + private final String index; + private final String type; + private final String id; + private final String path; + + public TermsLookup(FieldMapper fieldMapper, String index, String type, String id, String path) { + // TODO: do we want to intern index, type and path? + this.fieldMapper = fieldMapper; + this.index = index; + this.type = type; + this.id = id; + this.path = path; + } + + public FieldMapper getFieldMapper() { + return fieldMapper; + } + + public String getIndex() { + return index; + } + + public String getType() { + return type; + } + + public String getId() { + return id; + } + + public String getPath() { + return path; + } + + public String toString() { + return fieldMapper.names().fullName() + ":" + index + "/" + type + "/" + id + "/" + path; + } +} diff --git a/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java b/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java index 098e23ccdeb..41834d3971f 100644 --- a/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java +++ b/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java @@ -23,11 +23,13 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.*; import org.elasticsearch.index.query.CommonTermsQueryBuilder.Operator; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.facet.FacetBuilders; import org.elasticsearch.test.integration.AbstractNodesTests; +import org.hamcrest.Matchers; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -704,4 +706,94 @@ public class SimpleQueryTests extends AbstractNodesTests { assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); } + + @Test + public void testTermsLookupFilter() throws Exception { + client.admin().indices().prepareDelete().execute().actionGet(); + + client.prepareIndex("lookup", "type", "1").setSource("terms", new String[]{"1", "3"}).execute().actionGet(); + client.prepareIndex("lookup", "type", "2").setSource("terms", new String[]{"2"}).execute().actionGet(); + client.prepareIndex("lookup", "type", "3").setSource("terms", new String[]{"2", "4"}).execute().actionGet(); + + client.prepareIndex("lookup2", "type", "1").setSource(XContentFactory.jsonBuilder().startObject() + .startArray("arr") + .startObject().field("term", "1").endObject() + .startObject().field("term", "3").endObject() + .endArray() + .endObject()).execute().actionGet(); + client.prepareIndex("lookup2", "type", "2").setSource(XContentFactory.jsonBuilder().startObject() + .startArray("arr") + .startObject().field("term", "2").endObject() + .endArray() + .endObject()).execute().actionGet(); + client.prepareIndex("lookup2", "type", "3").setSource(XContentFactory.jsonBuilder().startObject() + .startArray("arr") + .startObject().field("term", "2").endObject() + .startObject().field("term", "4").endObject() + .endArray() + .endObject()).execute().actionGet(); + + client.prepareIndex("test", "type", "1").setSource("term", "1").execute().actionGet(); + client.prepareIndex("test", "type", "2").setSource("term", "2").execute().actionGet(); + client.prepareIndex("test", "type", "3").setSource("term", "3").execute().actionGet(); + client.prepareIndex("test", "type", "4").setSource("term", "4").execute().actionGet(); + + client.admin().indices().prepareRefresh().execute().actionGet(); + + SearchResponse searchResponse = client.prepareSearch("test") + .setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup").lookupType("type").lookupId("1").lookupPath("terms")) + ).execute().actionGet(); + assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); + assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); + assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("1"), equalTo("3"))); + assertThat(searchResponse.getHits().getHits()[1].getId(), anyOf(equalTo("1"), equalTo("3"))); + + // another search with same parameters... + searchResponse = client.prepareSearch("test") + .setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup").lookupType("type").lookupId("1").lookupPath("terms")) + ).execute().actionGet(); + assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); + assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); + assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("1"), equalTo("3"))); + assertThat(searchResponse.getHits().getHits()[1].getId(), anyOf(equalTo("1"), equalTo("3"))); + + searchResponse = client.prepareSearch("test") + .setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup").lookupType("type").lookupId("2").lookupPath("terms")) + ).execute().actionGet(); + assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); + assertThat(searchResponse.getHits().getTotalHits(), equalTo(1l)); + assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("1"), equalTo("2"))); + + searchResponse = client.prepareSearch("test") + .setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup").lookupType("type").lookupId("3").lookupPath("terms")) + ).execute().actionGet(); + assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); + assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); + assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("2"), equalTo("4"))); + assertThat(searchResponse.getHits().getHits()[1].getId(), anyOf(equalTo("2"), equalTo("4"))); + + + searchResponse = client.prepareSearch("test") + .setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup2").lookupType("type").lookupId("1").lookupPath("arr.term")) + ).execute().actionGet(); + assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); + assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); + assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("1"), equalTo("3"))); + assertThat(searchResponse.getHits().getHits()[1].getId(), anyOf(equalTo("1"), equalTo("3"))); + + searchResponse = client.prepareSearch("test") + .setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup2").lookupType("type").lookupId("2").lookupPath("arr.term")) + ).execute().actionGet(); + assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); + assertThat(searchResponse.getHits().getTotalHits(), equalTo(1l)); + assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("1"), equalTo("2"))); + + searchResponse = client.prepareSearch("test") + .setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup2").lookupType("type").lookupId("3").lookupPath("arr.term")) + ).execute().actionGet(); + assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); + assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); + assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("2"), equalTo("4"))); + assertThat(searchResponse.getHits().getHits()[1].getId(), anyOf(equalTo("2"), equalTo("4"))); + } }