Query DSL: Terms filter to allow for terms lookup from another document

closes #2674
This commit is contained in:
Shay Banon 2013-02-22 14:04:10 +01:00
parent 6978aa2189
commit 03fdc6aa80
9 changed files with 554 additions and 4 deletions

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -48,12 +49,14 @@ import static com.google.common.collect.Lists.newArrayList;
public class TransportClearIndicesCacheAction extends TransportBroadcastOperationAction<ClearIndicesCacheRequest, ClearIndicesCacheResponse, ShardClearIndicesCacheRequest, ShardClearIndicesCacheResponse> { public class TransportClearIndicesCacheAction extends TransportBroadcastOperationAction<ClearIndicesCacheRequest, ClearIndicesCacheResponse, ShardClearIndicesCacheRequest, ShardClearIndicesCacheResponse> {
private final IndicesService indicesService; private final IndicesService indicesService;
private final IndicesTermsFilterCache termsFilterCache;
@Inject @Inject
public TransportClearIndicesCacheAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, public TransportClearIndicesCacheAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService) { TransportService transportService, IndicesService indicesService, IndicesTermsFilterCache termsFilterCache) {
super(settings, threadPool, clusterService, transportService); super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService; this.indicesService = indicesService;
this.termsFilterCache = termsFilterCache;
} }
@Override @Override
@ -123,10 +126,12 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
if (request.isFilterCache()) { if (request.isFilterCache()) {
clearedAtLeastOne = true; clearedAtLeastOne = true;
service.cache().filter().clear("api"); service.cache().filter().clear("api");
termsFilterCache.clear("api");
} }
if (request.getFilterKeys() != null && request.getFilterKeys().length > 0) { if (request.getFilterKeys() != null && request.getFilterKeys().length > 0) {
clearedAtLeastOne = true; clearedAtLeastOne = true;
service.cache().filter().clear("api", request.getFilterKeys()); service.cache().filter().clear("api", request.getFilterKeys());
termsFilterCache.clear("api", request.getFilterKeys());
} }
if (request.isFieldDataCache()) { if (request.isFieldDataCache()) {
clearedAtLeastOne = true; clearedAtLeastOne = true;
@ -150,9 +155,10 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
} }
} else { } else {
service.cache().clear("api"); service.cache().clear("api");
termsFilterCache.clear("api");
} }
} }
service.cache().invalidateCache(); service.cache().invalidateStatsCache();
} }
return new ShardClearIndicesCacheResponse(request.getIndex(), request.getShardId()); return new ShardClearIndicesCacheResponse(request.getIndex(), request.getShardId());
} }

View File

@ -75,7 +75,7 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo
} }
} }
public synchronized void invalidateCache() { public synchronized void invalidateStatsCache() {
FilterCache.EntriesStats filterEntriesStats = filterCache.entriesStats(); FilterCache.EntriesStats filterEntriesStats = filterCache.entriesStats();
latestCacheStats = new CacheStats(filterCache.evictions(), filterEntriesStats.sizeInBytes, filterEntriesStats.count, idCache.sizeInBytes()); latestCacheStats = new CacheStats(filterCache.evictions(), filterEntriesStats.sizeInBytes, filterEntriesStats.count, idCache.sizeInBytes());
latestCacheStatsTimestamp = System.currentTimeMillis(); latestCacheStatsTimestamp = System.currentTimeMillis();

View File

@ -195,6 +195,14 @@ public abstract class FilterBuilders {
return new TermsFilterBuilder(name, values); return new TermsFilterBuilder(name, values);
} }
/**
* A terms lookup filter for the provided field name. A lookup terms filter can
* extract the terms to filter by from another doc in an index.
*/
public static TermsLookupFilterBuilder termsLookupFilter(String name) {
return new TermsLookupFilterBuilder(name);
}
/** /**
* A filer for a field based on several terms matching on any of them. * A filer for a field based on several terms matching on any of them.
* *

View File

@ -27,11 +27,16 @@ import org.apache.lucene.search.Filter;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.lucene.search.*; import org.elasticsearch.common.lucene.search.AndFilter;
import org.elasticsearch.common.lucene.search.OrFilter;
import org.elasticsearch.common.lucene.search.TermFilter;
import org.elasticsearch.common.lucene.search.XBooleanFilter;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.cache.filter.support.CacheKeyFilter; import org.elasticsearch.index.cache.filter.support.CacheKeyFilter;
import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cache.filter.terms.TermsLookup;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
@ -45,6 +50,8 @@ public class TermsFilterParser implements FilterParser {
public static final String NAME = "terms"; public static final String NAME = "terms";
private IndicesTermsFilterCache termsFilterCache;
@Inject @Inject
public TermsFilterParser() { public TermsFilterParser() {
} }
@ -54,6 +61,11 @@ public class TermsFilterParser implements FilterParser {
return new String[]{NAME, "in"}; return new String[]{NAME, "in"};
} }
@Inject(optional = true)
public void setIndicesTermsFilterCache(IndicesTermsFilterCache termsFilterCache) {
this.termsFilterCache = termsFilterCache;
}
@Override @Override
public Filter parse(QueryParseContext parseContext) throws IOException, QueryParsingException { public Filter parse(QueryParseContext parseContext) throws IOException, QueryParsingException {
XContentParser parser = parseContext.parser(); XContentParser parser = parseContext.parser();
@ -62,6 +74,12 @@ public class TermsFilterParser implements FilterParser {
Boolean cache = null; Boolean cache = null;
String filterName = null; String filterName = null;
String currentFieldName = null; String currentFieldName = null;
String lookupIndex = parseContext.index().name();
String lookupType = null;
String lookupId = null;
String lookupPath = null;
CacheKeyFilter.Key cacheKey = null; CacheKeyFilter.Key cacheKey = null;
XContentParser.Token token; XContentParser.Token token;
String execution = "plain"; String execution = "plain";
@ -80,6 +98,34 @@ public class TermsFilterParser implements FilterParser {
} }
terms.add(value); terms.add(value);
} }
} else if (token == XContentParser.Token.START_OBJECT) {
fieldName = currentFieldName;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
lookupIndex = parser.text();
} else if ("type".equals(currentFieldName)) {
lookupType = parser.text();
} else if ("id".equals(currentFieldName)) {
lookupId = parser.text();
} else if ("path".equals(currentFieldName)) {
lookupPath = parser.text();
} else {
throw new QueryParsingException(parseContext.index(), "[terms] filter does not support [" + currentFieldName + "] within lookup element");
}
}
}
if (lookupType == null) {
throw new QueryParsingException(parseContext.index(), "[terms] filter lookup element requires specifying the type");
}
if (lookupId == null) {
throw new QueryParsingException(parseContext.index(), "[terms] filter lookup element requires specifying the id");
}
if (lookupPath == null) {
throw new QueryParsingException(parseContext.index(), "[terms] filter lookup element requires specifying the path");
}
} else if (token.isValue()) { } else if (token.isValue()) {
if ("execution".equals(currentFieldName)) { if ("execution".equals(currentFieldName)) {
execution = parser.text(); execution = parser.text();
@ -113,6 +159,17 @@ public class TermsFilterParser implements FilterParser {
} }
} }
if (lookupId != null) {
// external lookup, use it
TermsLookup termsLookup = new TermsLookup(fieldMapper, lookupIndex, lookupType, lookupId, lookupPath);
if (cacheKey == null) {
cacheKey = new CacheKeyFilter.Key(termsLookup.toString());
}
Filter filter = termsFilterCache.lookupTermsFilter(cacheKey, termsLookup);
filter = parseContext.cacheFilter(filter, null); // cacheKey is passed as null, so we don't double cache the key
return filter;
}
try { try {
Filter filter; Filter filter;
if ("plain".equals(execution)) { if ("plain".equals(execution)) {

View File

@ -0,0 +1,111 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.query;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
/**
* A filer for a field based on several terms matching on any of them.
*/
public class TermsLookupFilterBuilder extends BaseFilterBuilder {
private final String name;
private String lookupIndex;
private String lookupType;
private String lookupId;
private String lookupPath;
private String cacheKey;
private String filterName;
public TermsLookupFilterBuilder(String name) {
this.name = name;
}
/**
* Sets the filter name for the filter that can be used when searching for matched_filters per hit.
*/
public TermsLookupFilterBuilder filterName(String filterName) {
this.filterName = filterName;
return this;
}
/**
* Sets the index name to lookup the terms from.
*/
public TermsLookupFilterBuilder lookupIndex(String lookupIndex) {
this.lookupIndex = lookupIndex;
return this;
}
/**
* Sets the index type to lookup the terms from.
*/
public TermsLookupFilterBuilder lookupType(String lookupType) {
this.lookupType = lookupType;
return this;
}
/**
* Sets the doc id to lookup the terms from.
*/
public TermsLookupFilterBuilder lookupId(String lookupId) {
this.lookupId = lookupId;
return this;
}
/**
* Sets the path within the document to lookup the terms from.
*/
public TermsLookupFilterBuilder lookupPath(String lookupPath) {
this.lookupPath = lookupPath;
return this;
}
public TermsLookupFilterBuilder cacheKey(String cacheKey) {
this.cacheKey = cacheKey;
return this;
}
@Override
public void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(TermsFilterParser.NAME);
builder.startObject(name);
if (lookupIndex != null) {
builder.field("index", lookupIndex);
}
builder.field("type", lookupType);
builder.field("id", lookupId);
builder.field("path", lookupPath);
builder.endObject();
if (filterName != null) {
builder.field("_name", filterName);
}
if (cacheKey != null) {
builder.field("_cache_key", cacheKey);
}
builder.endObject();
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.analysis.IndicesAnalysisModule; import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.indices.query.IndicesQueriesModule;
@ -68,6 +69,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(IndicesClusterStateService.class).asEagerSingleton(); bind(IndicesClusterStateService.class).asEagerSingleton();
bind(IndexingMemoryController.class).asEagerSingleton(); bind(IndexingMemoryController.class).asEagerSingleton();
bind(IndicesFilterCache.class).asEagerSingleton(); bind(IndicesFilterCache.class).asEagerSingleton();
bind(IndicesTermsFilterCache.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton(); bind(IndicesTTLService.class).asEagerSingleton();
bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton(); bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton();

View File

@ -0,0 +1,206 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cache.filter.terms;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.Filter;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.cache.filter.support.CacheKeyFilter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
*/
public class IndicesTermsFilterCache extends AbstractComponent {
private static TermsFilterValue NO_TERMS = new TermsFilterValue(0, null);
private final Client client;
private final Cache<CacheKeyFilter.Key, TermsFilterValue> cache;
@Inject
public IndicesTermsFilterCache(Settings settings, Client client) {
super(settings);
this.client = client;
ByteSizeValue size = componentSettings.getAsBytesSize("size", new ByteSizeValue(10, ByteSizeUnit.MB));
TimeValue expireAfterWrite = componentSettings.getAsTime("expire_after_write", null);
TimeValue expireAfterAccess = componentSettings.getAsTime("expire_after_access", null);
CacheBuilder<CacheKeyFilter.Key, TermsFilterValue> builder = CacheBuilder.newBuilder()
.maximumWeight(size.bytes())
.weigher(new TermsFilterValueWeigher());
if (expireAfterAccess != null) {
builder.expireAfterAccess(expireAfterAccess.millis(), TimeUnit.MILLISECONDS);
}
if (expireAfterWrite != null) {
builder.expireAfterWrite(expireAfterWrite.millis(), TimeUnit.MILLISECONDS);
}
this.cache = builder.build();
}
/**
* An external lookup terms filter. Note, already implements the {@link CacheKeyFilter} so no need
* to double cache key it.
*/
public Filter lookupTermsFilter(final CacheKeyFilter.Key cacheKey, final TermsLookup lookup) {
return new LookupTermsFilter(lookup, cacheKey, this);
}
@Nullable
private Filter termsFilter(final CacheKeyFilter.Key cacheKey, final TermsLookup lookup) throws RuntimeException {
try {
return cache.get(cacheKey, new Callable<TermsFilterValue>() {
@Override
public TermsFilterValue call() throws Exception {
GetResponse getResponse = client.get(new GetRequest(lookup.getIndex(), lookup.getType(), lookup.getId()).setPreference("_local")).actionGet();
if (!getResponse.isExists()) {
return NO_TERMS;
}
List<Object> values = XContentMapValues.extractRawValues(lookup.getPath(), getResponse.getSourceAsMap());
if (values.isEmpty()) {
return NO_TERMS;
}
Filter filter = lookup.getFieldMapper().termsFilter(values, null);
return new TermsFilterValue(estimateSizeInBytes(values), filter);
}
}).filter;
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new ElasticSearchException(e.getMessage(), e.getCause());
}
}
long estimateSizeInBytes(List<Object> terms) {
long size = 8;
for (Object term : terms) {
if (term instanceof BytesRef) {
size += ((BytesRef) term).length;
} else if (term instanceof String) {
size += ((String) term).length() / 2;
} else {
size += 4;
}
}
return size;
}
public void clear(String reason) {
cache.invalidateAll();
}
public void clear(String reason, String[] keys) {
for (String key : keys) {
cache.invalidate(new CacheKeyFilter.Key(key));
}
}
static class LookupTermsFilter extends Filter implements CacheKeyFilter {
private final TermsLookup lookup;
private final CacheKeyFilter.Key cacheKey;
private final IndicesTermsFilterCache cache;
LookupTermsFilter(TermsLookup lookup, CacheKeyFilter.Key cacheKey, IndicesTermsFilterCache cache) {
this.lookup = lookup;
this.cacheKey = cacheKey;
this.cache = cache;
}
@Override
public DocIdSet getDocIdSet(AtomicReaderContext context, Bits acceptDocs) throws IOException {
Filter filter = cache.termsFilter(cacheKey, lookup);
if (filter == null) return null;
return filter.getDocIdSet(context, acceptDocs);
}
@Override
public Key cacheKey() {
return this.cacheKey;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LookupTermsFilter that = (LookupTermsFilter) o;
if (!cacheKey.equals(that.cacheKey)) return false;
return true;
}
@Override
public int hashCode() {
return cacheKey.hashCode();
}
@Override
public String toString() {
return "terms(" + lookup.toString() + ")";
}
}
static class TermsFilterValueWeigher implements Weigher<CacheKeyFilter.Key, TermsFilterValue> {
@Override
public int weigh(CacheKeyFilter.Key key, TermsFilterValue value) {
return (int) (key.bytes().length + value.sizeInBytes);
}
}
// TODO: if TermsFilter exposed sizeInBytes, we won't need this wrapper
static class TermsFilterValue {
public final long sizeInBytes;
public final Filter filter;
TermsFilterValue(long sizeInBytes, Filter filter) {
this.sizeInBytes = sizeInBytes;
this.filter = filter;
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cache.filter.terms;
import org.elasticsearch.index.cache.filter.support.CacheKeyFilter;
import org.elasticsearch.index.mapper.FieldMapper;
/**
*/
public class TermsLookup {
private final FieldMapper fieldMapper;
private final String index;
private final String type;
private final String id;
private final String path;
public TermsLookup(FieldMapper fieldMapper, String index, String type, String id, String path) {
// TODO: do we want to intern index, type and path?
this.fieldMapper = fieldMapper;
this.index = index;
this.type = type;
this.id = id;
this.path = path;
}
public FieldMapper getFieldMapper() {
return fieldMapper;
}
public String getIndex() {
return index;
}
public String getType() {
return type;
}
public String getId() {
return id;
}
public String getPath() {
return path;
}
public String toString() {
return fieldMapper.names().fullName() + ":" + index + "/" + type + "/" + id + "/" + path;
}
}

View File

@ -23,11 +23,13 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.*; import org.elasticsearch.index.query.*;
import org.elasticsearch.index.query.CommonTermsQueryBuilder.Operator; import org.elasticsearch.index.query.CommonTermsQueryBuilder.Operator;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.facet.FacetBuilders; import org.elasticsearch.search.facet.FacetBuilders;
import org.elasticsearch.test.integration.AbstractNodesTests; import org.elasticsearch.test.integration.AbstractNodesTests;
import org.hamcrest.Matchers;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@ -704,4 +706,94 @@ public class SimpleQueryTests extends AbstractNodesTests {
assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0));
assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
} }
@Test
public void testTermsLookupFilter() throws Exception {
client.admin().indices().prepareDelete().execute().actionGet();
client.prepareIndex("lookup", "type", "1").setSource("terms", new String[]{"1", "3"}).execute().actionGet();
client.prepareIndex("lookup", "type", "2").setSource("terms", new String[]{"2"}).execute().actionGet();
client.prepareIndex("lookup", "type", "3").setSource("terms", new String[]{"2", "4"}).execute().actionGet();
client.prepareIndex("lookup2", "type", "1").setSource(XContentFactory.jsonBuilder().startObject()
.startArray("arr")
.startObject().field("term", "1").endObject()
.startObject().field("term", "3").endObject()
.endArray()
.endObject()).execute().actionGet();
client.prepareIndex("lookup2", "type", "2").setSource(XContentFactory.jsonBuilder().startObject()
.startArray("arr")
.startObject().field("term", "2").endObject()
.endArray()
.endObject()).execute().actionGet();
client.prepareIndex("lookup2", "type", "3").setSource(XContentFactory.jsonBuilder().startObject()
.startArray("arr")
.startObject().field("term", "2").endObject()
.startObject().field("term", "4").endObject()
.endArray()
.endObject()).execute().actionGet();
client.prepareIndex("test", "type", "1").setSource("term", "1").execute().actionGet();
client.prepareIndex("test", "type", "2").setSource("term", "2").execute().actionGet();
client.prepareIndex("test", "type", "3").setSource("term", "3").execute().actionGet();
client.prepareIndex("test", "type", "4").setSource("term", "4").execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
SearchResponse searchResponse = client.prepareSearch("test")
.setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup").lookupType("type").lookupId("1").lookupPath("terms"))
).execute().actionGet();
assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0));
assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l));
assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("1"), equalTo("3")));
assertThat(searchResponse.getHits().getHits()[1].getId(), anyOf(equalTo("1"), equalTo("3")));
// another search with same parameters...
searchResponse = client.prepareSearch("test")
.setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup").lookupType("type").lookupId("1").lookupPath("terms"))
).execute().actionGet();
assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0));
assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l));
assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("1"), equalTo("3")));
assertThat(searchResponse.getHits().getHits()[1].getId(), anyOf(equalTo("1"), equalTo("3")));
searchResponse = client.prepareSearch("test")
.setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup").lookupType("type").lookupId("2").lookupPath("terms"))
).execute().actionGet();
assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0));
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1l));
assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("1"), equalTo("2")));
searchResponse = client.prepareSearch("test")
.setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup").lookupType("type").lookupId("3").lookupPath("terms"))
).execute().actionGet();
assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0));
assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l));
assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("2"), equalTo("4")));
assertThat(searchResponse.getHits().getHits()[1].getId(), anyOf(equalTo("2"), equalTo("4")));
searchResponse = client.prepareSearch("test")
.setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup2").lookupType("type").lookupId("1").lookupPath("arr.term"))
).execute().actionGet();
assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0));
assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l));
assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("1"), equalTo("3")));
assertThat(searchResponse.getHits().getHits()[1].getId(), anyOf(equalTo("1"), equalTo("3")));
searchResponse = client.prepareSearch("test")
.setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup2").lookupType("type").lookupId("2").lookupPath("arr.term"))
).execute().actionGet();
assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0));
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1l));
assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("1"), equalTo("2")));
searchResponse = client.prepareSearch("test")
.setQuery(filteredQuery(matchAllQuery(), termsLookupFilter("term").lookupIndex("lookup2").lookupType("type").lookupId("3").lookupPath("arr.term"))
).execute().actionGet();
assertThat("Failures " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0));
assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l));
assertThat(searchResponse.getHits().getHits()[0].getId(), anyOf(equalTo("2"), equalTo("4")));
assertThat(searchResponse.getHits().getHits()[1].getId(), anyOf(equalTo("2"), equalTo("4")));
}
} }