Ensure query resources are fetched asynchronously during rewrite (#25791)

The `QueryRewriteContext` used to provide a client object that can
be used to fetch geo-shapes, terms or documents for percolation. Unfortunately
all client calls used to be blocking calls which can have significant impact on the
rewrite phase since it occupies an entire search thread until the resource is
received. In the case that the index the resource is fetched from isn't on the local
node this can have significant impact on query throughput.

Note: this doesn't fix MLT since it fetches stuff in doQuery which is a different beast. Yet, it is a huge step in the right direction
This commit is contained in:
Simon Willnauer 2017-07-20 15:37:50 +02:00 committed by GitHub
parent 3e4bc027eb
commit 5e629cfba0
36 changed files with 994 additions and 179 deletions

View File

@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.BiFunction;
import java.util.function.Supplier;
@ -317,23 +318,57 @@ public class SearchTransportService extends AbstractComponent {
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
() -> TransportResponse.Empty.INSTANCE);
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
DfsSearchResult result = searchService.executeDfsPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
});
}
});
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
});
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new);
@ -389,8 +424,8 @@ public class SearchTransportService extends AbstractComponent {
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
// this is super cheap and should not hit thread-pool rejections
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
false, true, new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
boolean canMatch = searchService.canMatch(request);

View File

@ -144,7 +144,7 @@ public class AliasValidator extends AbstractComponent {
private static void validateAliasFilter(XContentParser parser, QueryShardContext queryShardContext) throws IOException {
QueryBuilder parseInnerQueryBuilder = parseInnerQueryBuilder(parser);
QueryBuilder queryBuilder = Rewriteable.rewrite(parseInnerQueryBuilder, queryShardContext);
QueryBuilder queryBuilder = Rewriteable.rewrite(parseInnerQueryBuilder, queryShardContext, true);
queryBuilder.toFilter(queryShardContext);
}
}

View File

@ -28,6 +28,8 @@ import org.apache.lucene.spatial.prefix.PrefixTreeStrategy;
import org.apache.lucene.spatial.prefix.RecursivePrefixTreeStrategy;
import org.apache.lucene.spatial.query.SpatialArgs;
import org.apache.lucene.spatial.query.SpatialOperation;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
@ -48,6 +50,7 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;
/**
* {@link QueryBuilder} that builds a GeoShape Query
@ -77,6 +80,7 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
private final String fieldName;
private final ShapeBuilder shape;
private final Supplier<ShapeBuilder> supplier;
private SpatialStrategy strategy;
@ -133,6 +137,15 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
this.shape = shape;
this.indexedShapeId = indexedShapeId;
this.indexedShapeType = indexedShapeType;
this.supplier = null;
}
private GeoShapeQueryBuilder(String fieldName, Supplier<ShapeBuilder> supplier, String indexedShapeId, String indexedShapeType) {
this.fieldName = fieldName;
this.shape = null;
this.supplier = supplier;
this.indexedShapeId = indexedShapeId;
this.indexedShapeType = indexedShapeType;
}
/**
@ -155,10 +168,14 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
relation = ShapeRelation.readFromStream(in);
strategy = in.readOptionalWriteable(SpatialStrategy::readFromStream);
ignoreUnmapped = in.readBoolean();
supplier = null;
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
if (supplier != null) {
throw new IllegalStateException("supplier must be null, can't serialize suppliers, missing a rewriteAndFetch?");
}
out.writeString(fieldName);
boolean hasShape = shape != null;
out.writeBoolean(hasShape);
@ -312,7 +329,7 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
@Override
protected Query doToQuery(QueryShardContext context) {
if (shape == null) {
if (shape == null || supplier != null) {
throw new UnsupportedOperationException("query must be rewritten first");
}
final ShapeBuilder shapeToQuery = shape;
@ -361,20 +378,21 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
* @param path
* Name or path of the field in the Shape Document where the
* Shape itself is located
* @return Shape with the given ID
* @throws IOException
* Can be thrown while parsing the Shape Document and extracting
* the Shape
*/
private ShapeBuilder fetch(Client client, GetRequest getRequest, String path) throws IOException {
private void fetch(Client client, GetRequest getRequest, String path, ActionListener<ShapeBuilder> listener) {
if (ShapesAvailability.JTS_AVAILABLE == false) {
throw new IllegalStateException("JTS not available");
}
getRequest.preference("_local");
getRequest.operationThreaded(false);
GetResponse response = client.get(getRequest).actionGet();
client.get(getRequest, new ActionListener<GetResponse>(){
@Override
public void onResponse(GetResponse response) {
try {
if (!response.isExists()) {
throw new IllegalArgumentException("Shape with ID [" + getRequest.id() + "] in type [" + getRequest.type() + "] not found");
throw new IllegalArgumentException("Shape with ID [" + getRequest.id() + "] in type [" + getRequest.type()
+ "] not found");
}
if (response.isSourceEmpty()) {
throw new IllegalArgumentException("Shape with ID [" + getRequest.id() + "] in type [" + getRequest.type() +
@ -392,7 +410,7 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
if (pathElements[currentPathSlot].equals(parser.currentName())) {
parser.nextToken();
if (++currentPathSlot == pathElements.length) {
return ShapeBuilder.parse(parser);
listener.onResponse(ShapeBuilder.parse(parser));
}
} else {
parser.nextToken();
@ -402,6 +420,17 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
}
throw new IllegalStateException("Shape with name [" + getRequest.id() + "] found but missing " + path + " field");
}
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
public static SpatialArgs getArgs(ShapeBuilder shape, ShapeRelation relation) {
@ -573,6 +602,7 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
&& Objects.equals(indexedShapeType, other.indexedShapeType)
&& Objects.equals(relation, other.relation)
&& Objects.equals(shape, other.shape)
&& Objects.equals(supplier, other.supplier)
&& Objects.equals(strategy, other.strategy)
&& Objects.equals(ignoreUnmapped, other.ignoreUnmapped);
}
@ -580,7 +610,7 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
@Override
protected int doHashCode() {
return Objects.hash(fieldName, indexedShapeId, indexedShapeIndex,
indexedShapePath, indexedShapeType, relation, shape, strategy, ignoreUnmapped);
indexedShapePath, indexedShapeType, relation, shape, strategy, ignoreUnmapped, supplier);
}
@Override
@ -589,11 +619,21 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
}
@Override
protected QueryBuilder doRewrite(QueryRewriteContext queryShardContext) throws IOException {
if (this.shape == null) {
protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
if (supplier != null) {
return supplier.get() == null ? this : new GeoShapeQueryBuilder(this.fieldName, supplier.get()).relation(relation).strategy
(strategy);
} else if (this.shape == null) {
SetOnce<ShapeBuilder> supplier = new SetOnce<>();
queryRewriteContext.registerAsyncAction((client, listener) -> {
GetRequest getRequest = new GetRequest(indexedShapeIndex, indexedShapeType, indexedShapeId);
ShapeBuilder shape = fetch(queryShardContext.getClient(), getRequest, indexedShapePath);
return new GeoShapeQueryBuilder(this.fieldName, shape).relation(relation).strategy(strategy);
fetch(client, getRequest, indexedShapePath, ActionListener.wrap(builder-> {
supplier.set(builder);
listener.onResponse(null);
}, listener::onFailure));
});
return new GeoShapeQueryBuilder(this.fieldName, supplier::get, this.indexedShapeId, this.indexedShapeType).relation(relation)
.strategy(strategy);
}
return this;
}

View File

@ -18,20 +18,26 @@
*/
package org.elasticsearch.index.query;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
/**
* Context object used to rewrite {@link QueryBuilder} instances into simplified version.
*/
public class QueryRewriteContext {
private final NamedXContentRegistry xContentRegistry;
protected final Client client;
protected final LongSupplier nowInMillis;
private final List<BiConsumer<Client, ActionListener<?>>> asyncActions = new ArrayList<>();
public QueryRewriteContext(NamedXContentRegistry xContentRegistry, Client client, LongSupplier nowInMillis) {
this.xContentRegistry = xContentRegistry;
@ -39,13 +45,6 @@ public class QueryRewriteContext {
this.nowInMillis = nowInMillis;
}
/**
* Returns a clients to fetch resources from local or remove nodes.
*/
public Client getClient() {
return client;
}
/**
* The registry used to build new {@link XContentParser}s. Contains registered named parsers needed to parse the query.
*/
@ -53,6 +52,9 @@ public class QueryRewriteContext {
return xContentRegistry;
}
/**
* Returns the time in milliseconds that is shared across all resources involved. Even across shards and nodes.
*/
public long nowInMillis() {
return nowInMillis.getAsLong();
}
@ -63,4 +65,54 @@ public class QueryRewriteContext {
public QueryShardContext convertToShardContext() {
return null;
}
/**
* Registers an async action that must be executed before the next rewrite round in order to make progress.
* This should be used if a rewriteabel needs to fetch some external resources in order to be executed ie. a document
* from an index.
*/
public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncAction) {
asyncActions.add(asyncAction);
}
/**
* Returns <code>true</code> if there are any registered async actions.
*/
public boolean hasAsyncActions() {
return asyncActions.isEmpty() == false;
}
/**
* Executes all registered async actions and notifies the listener once it's done. The value that is passed to the listener is always
* <code>null</code>. The list of registered actions is cleared once this method returns.
*/
public void executeAsyncActions(ActionListener listener) {
if (asyncActions.isEmpty()) {
listener.onResponse(null);
} else {
CountDown countDown = new CountDown(asyncActions.size());
ActionListener<?> internalListener = new ActionListener() {
@Override
public void onResponse(Object o) {
if (countDown.countDown()) {
listener.onResponse(null);
}
}
@Override
public void onFailure(Exception e) {
if (countDown.fastForward()) {
listener.onFailure(e);
}
}
};
// make a copy to prevent concurrent modification exception
List<BiConsumer<Client, ActionListener<?>>> biConsumers = new ArrayList<>(asyncActions);
asyncActions.clear();
for (BiConsumer<Client, ActionListener<?>> action : biConsumers) {
action.accept(client, internalListener);
}
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParsingException;
@ -58,6 +59,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import static java.util.Collections.unmodifiableMap;
@ -304,7 +306,7 @@ public class QueryShardContext extends QueryRewriteContext {
private ParsedQuery toQuery(QueryBuilder queryBuilder, CheckedFunction<QueryBuilder, Query, IOException> filterOrQuery) {
reset();
try {
QueryBuilder rewriteQuery = Rewriteable.rewrite(queryBuilder, this);
QueryBuilder rewriteQuery = Rewriteable.rewrite(queryBuilder, this, true);
return new ParsedQuery(filterOrQuery.apply(rewriteQuery), copyNamedQueries());
} catch(QueryShardException | ParsingException e ) {
throw e;
@ -327,7 +329,7 @@ public class QueryShardContext extends QueryRewriteContext {
/**
* if this method is called the query context will throw exception if methods are accessed
* that could yield different results across executions like {@link #getTemplateBytes(Script)}
* that could yield different results across executions like {@link #getClient()}
*/
public final void freezeContext() {
this.frozen.set(Boolean.TRUE);
@ -351,10 +353,16 @@ public class QueryShardContext extends QueryRewriteContext {
}
}
public final String getTemplateBytes(Script template) {
@Override
public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncAction) {
failIfFrozen();
TemplateScript compiledTemplate = scriptService.compile(template, TemplateScript.CONTEXT).newInstance(template.getParams());
return compiledTemplate.execute();
super.registerAsyncAction(asyncAction);
}
@Override
public void executeAsyncActions(ActionListener listener) {
failIfFrozen();
super.executeAsyncActions(listener);
}
/**
@ -377,10 +385,9 @@ public class QueryShardContext extends QueryRewriteContext {
return super.nowInMillis();
}
@Override
public Client getClient() {
failIfFrozen(); // we somebody uses a terms filter with lookup for instance can't be cached...
return super.getClient();
return client;
}
public QueryBuilder parseInnerQueryBuilder(XContentParser parser) throws IOException {

View File

@ -18,13 +18,19 @@
*/
package org.elasticsearch.index.query;
import org.elasticsearch.action.ActionListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* A basic interface for rewriteable classes.
*/
public interface Rewriteable<T> {
int MAX_REWRITE_ROUNDS = 16;
/**
* Rewrites this instance based on the provided context. The returned
* objects will be the same instance as this if no changes during the
@ -36,14 +42,97 @@ public interface Rewriteable<T> {
* Rewrites the given {@link Rewriteable} into its primitive form. Rewriteables that for instance fetch resources from remote hosts or
* can simplify / optimize itself should do their heavy lifting during {@link #rewrite(QueryRewriteContext)}. This method
* rewrites the rewriteable until it doesn't change anymore.
* @param original the original rewriteable to rewrite
* @param context the rewrite context to use
* @throws IOException if an {@link IOException} occurs
*/
static <T extends Rewriteable<T>> T rewrite(T original, QueryRewriteContext context) throws IOException {
return rewrite(original, context, false);
}
/**
* Rewrites the given {@link Rewriteable} into its primitive form. Rewriteables that for instance fetch resources from remote hosts or
* can simplify / optimize itself should do their heavy lifting during
* {@link #rewriteAndFetch(Rewriteable, QueryRewriteContext, ActionListener)} (QueryRewriteContext)}. This method rewrites the
* rewriteable until it doesn't change anymore.
* @param original the original rewriteable to rewrite
* @param context the rewrite context to use
* @param assertNoAsyncTasks if <code>true</code> the rewrite will fail if there are any pending async tasks on the context after the
* rewrite. See {@link QueryRewriteContext#executeAsyncActions(ActionListener)} for detals
* @throws IOException if an {@link IOException} occurs
*/
static <T extends Rewriteable<T>> T rewrite(T original, QueryRewriteContext context, boolean assertNoAsyncTasks) throws IOException {
T builder = original;
int iteration = 0;
for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder;
rewrittenBuilder = builder.rewrite(context)) {
if (assertNoAsyncTasks && context.hasAsyncActions()) {
throw new IllegalStateException("async actions are left after rewrite");
}
builder = rewrittenBuilder;
if (iteration++ >= MAX_REWRITE_ROUNDS) {
// this is some protection against user provided queries if they don't obey the contract of rewrite we allow 16 rounds
// and then we fail to prevent infinite loops
throw new IllegalStateException("too many rewrite rounds, rewriteable might return new objects even if they are not " +
"rewritten");
}
}
return builder;
}
/**
* Rewrites the given rewriteable and fetches pending async tasks for each round before rewriting again.
*/
static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener<T> rewriteResponse) {
rewriteAndFetch(original, context, rewriteResponse, 0);
}
/**
* Rewrites the given rewriteable and fetches pending async tasks for each round before rewriting again.
*/
static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener<T>
rewriteResponse, int iteration) {
T builder = original;
try {
for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder;
rewrittenBuilder = builder.rewrite(context)) {
builder = rewrittenBuilder;
if (iteration++ >= MAX_REWRITE_ROUNDS) {
// this is some protection against user provided queries if they don't obey the contract of rewrite we allow 16 rounds
// and then we fail to prevent infinite loops
throw new IllegalStateException("too many rewrite rounds, rewriteable might return new objects even if they are not " +
"rewritten");
}
if (context.hasAsyncActions()) {
T finalBuilder = builder;
final int currentIterationNumber = iteration;
context.executeAsyncActions(ActionListener.wrap(n -> rewriteAndFetch(finalBuilder, context, rewriteResponse,
currentIterationNumber), rewriteResponse::onFailure));
return;
}
}
rewriteResponse.onResponse(builder);
} catch (IOException ex) {
rewriteResponse.onFailure(ex);
}
}
/**
* Rewrites each element of the list until it doesn't change and returns a new list iff there is at least one element of the list that
* changed during it's rewrite. Otherwise the given list instance is returned unchanged.
*/
static <T extends Rewriteable<T>> List<T> rewrite(List<T> rewritables, QueryRewriteContext context) throws IOException {
List<T> list = rewritables;
boolean changed = false;
if (rewritables != null && rewritables.isEmpty() == false) {
list = new ArrayList<>(rewritables.size());
for (T instance : rewritables) {
T rewrite = rewrite(instance, context);
if (instance != rewrite) {
changed = true;
}
list.add(rewrite);
}
}
return changed ? list : rewritables;
}
}

View File

@ -23,6 +23,8 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
@ -49,6 +51,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -61,6 +64,7 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
private final String fieldName;
private final List<?> values;
private final TermsLookup termsLookup;
private final Supplier<List<?>> supplier;
public TermsQueryBuilder(String fieldName, TermsLookup termsLookup) {
this(fieldName, null, termsLookup);
@ -82,6 +86,7 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
this.fieldName = fieldName;
this.values = values == null ? null : convert(values);
this.termsLookup = termsLookup;
this.supplier = null;
}
/**
@ -161,6 +166,14 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
this.fieldName = fieldName;
this.values = convert(values);
this.termsLookup = null;
this.supplier = null;
}
private TermsQueryBuilder(String fieldName, Supplier<List<?>> supplier) {
this.fieldName = fieldName;
this.values = null;
this.termsLookup = null;
this.supplier = supplier;
}
/**
@ -171,10 +184,14 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
fieldName = in.readString();
termsLookup = in.readOptionalWriteable(TermsLookup::new);
values = (List<?>) in.readGenericValue();
this.supplier = null;
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
if (supplier != null) {
throw new IllegalStateException("supplier must be null, can't serialize suppliers, missing a rewriteAndFetch?");
}
out.writeString(fieldName);
out.writeOptionalWriteable(termsLookup);
out.writeGenericValue(values);
@ -393,7 +410,7 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
@Override
protected Query doToQuery(QueryShardContext context) throws IOException {
if (termsLookup != null) {
if (termsLookup != null || supplier != null) {
throw new UnsupportedOperationException("query must be rewritten first");
}
if (values == null || values.isEmpty()) {
@ -412,37 +429,55 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
}
}
private List<Object> fetch(TermsLookup termsLookup, Client client) {
List<Object> terms = new ArrayList<>();
private void fetch(TermsLookup termsLookup, Client client, ActionListener<List<Object>> actionListener) {
GetRequest getRequest = new GetRequest(termsLookup.index(), termsLookup.type(), termsLookup.id())
.preference("_local").routing(termsLookup.routing());
final GetResponse getResponse = client.get(getRequest).actionGet();
client.get(getRequest, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
List<Object> terms = new ArrayList<>();
if (getResponse.isSourceEmpty() == false) { // extract terms only if the doc source exists
List<Object> extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap());
terms.addAll(extractedValues);
}
return terms;
actionListener.onResponse(terms);
}
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
});
}
@Override
protected int doHashCode() {
return Objects.hash(fieldName, values, termsLookup);
return Objects.hash(fieldName, values, termsLookup, supplier);
}
@Override
protected boolean doEquals(TermsQueryBuilder other) {
return Objects.equals(fieldName, other.fieldName) &&
Objects.equals(values, other.values) &&
Objects.equals(termsLookup, other.termsLookup);
Objects.equals(termsLookup, other.termsLookup) &&
Objects.equals(supplier, other.supplier);
}
@Override
protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) {
if (this.termsLookup != null) {
List<Object> values = fetch(termsLookup, queryRewriteContext.getClient());
return new TermsQueryBuilder(this.fieldName, values);
if (supplier != null) {
return supplier.get() == null ? this : new TermsQueryBuilder(this.fieldName, supplier.get());
} else if (this.termsLookup != null) {
SetOnce<List<?>> supplier = new SetOnce<>();
queryRewriteContext.registerAsyncAction((client, listener) -> {
fetch(termsLookup, client, ActionListener.wrap(list -> {
supplier.set(list);
listener.onResponse(null);
}, listener::onFailure));
});
return new TermsQueryBuilder(this.fieldName, supplier::get);
}
return this;
}
}

View File

@ -85,6 +85,7 @@ import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
@ -127,6 +128,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -1237,4 +1239,8 @@ public class IndicesService extends AbstractLifecycleComponent
return new AliasFilter(ShardSearchRequest.parseAliasFilter(filterParser, indexMetaData, aliases), aliases);
}
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
return new QueryRewriteContext(xContentRegistry, client, nowInMillis);
}
}

View File

@ -239,7 +239,7 @@ final class DefaultSearchContext extends SearchContext {
// initialize the filtering alias based on the provided filters
try {
final QueryBuilder queryBuilder = request.filteringAliases();
final QueryBuilder queryBuilder = request.getAliasFilter().getQueryBuilder();
aliasFilter = queryBuilder == null ? null : queryBuilder.toFilter(queryShardContext);
} catch (IOException e) {
throw new UncheckedIOException(e);

View File

@ -24,6 +24,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
@ -36,6 +37,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.index.Index;
@ -46,7 +48,9 @@ import org.elasticsearch.index.query.InnerHitContextBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
@ -227,7 +231,25 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
keepAliveReaper.cancel();
}
public DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException {
public void executeDfsPhase(ShardSearchRequest request, SearchTask task, ActionListener<SearchPhaseResult> listener) {
rewriteShardRequest(request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest request) {
try {
listener.onResponse(executeDfsPhase(request, task));
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException {
final SearchContext context = createAndPutContext(request);
context.incRef();
try {
@ -258,7 +280,25 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
public SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListener<SearchPhaseResult> listener) {
rewriteShardRequest(request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest request) {
try {
listener.onResponse(executeQueryPhase(request, task));
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
final SearchContext context = createAndPutContext(request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
@ -519,6 +559,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, @Nullable Engine.Searcher searcher)
throws IOException {
return createSearchContext(request, timeout, searcher, true);
}
private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, @Nullable Engine.Searcher searcher,
boolean assertAsyncActions)
throws IOException {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
@ -533,7 +578,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
// during rewrite and normalized / evaluate templates etc.
request.rewrite(new QueryShardContext(searchContext.getQueryShardContext()));
QueryShardContext context = new QueryShardContext(searchContext.getQueryShardContext());
Rewriteable.rewrite(request.getRewriteable(), context, assertAsyncActions);
assert searchContext.getQueryShardContext().isCachable();
success = true;
} finally {
@ -855,7 +901,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
*/
public boolean canMatch(ShardSearchRequest request) throws IOException {
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
try (DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout, null)) {
try (DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout, null, false)) {
SearchSourceBuilder source = context.request().source();
if (canRewriteToMatchNone(source)) {
QueryBuilder queryBuilder = source.query();
@ -883,4 +929,24 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
return true;
}
/*
* Rewrites the search request with a light weight rewrite context in order to fetch resources asynchronously
* The action listener is guaranteed to be executed on the search thread-pool
*/
private void rewriteShardRequest(ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) {
Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis),
ActionListener.wrap(r ->
threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
listener.onResponse(request);
}
}), listener::onFailure));
}
}

View File

@ -196,7 +196,7 @@ public class AdjacencyMatrixAggregationBuilder extends AbstractAggregationBuilde
List<KeyedFilter> rewrittenFilters = new ArrayList<>(filters.size());
for (KeyedFilter kf : filters) {
rewrittenFilters.add(new KeyedFilter(kf.key(), Rewriteable.rewrite(kf.filter(), context.getQueryShardContext())));
rewrittenFilters.add(new KeyedFilter(kf.key(), Rewriteable.rewrite(kf.filter(), context.getQueryShardContext(), true)));
}
return new AdjacencyMatrixAggregatorFactory(name, rewrittenFilters, separator, context, parent,

View File

@ -37,7 +37,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchExtBuilder;
@ -874,7 +873,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
*/
@Override
public SearchSourceBuilder rewrite(QueryRewriteContext context) throws IOException {
assert (this.equals(shallowCopy(queryBuilder, postQueryBuilder, aggregations, sliceBuilder)));
assert (this.equals(shallowCopy(queryBuilder, postQueryBuilder, aggregations, sliceBuilder, sorts, rescoreBuilders,
highlightBuilder)));
QueryBuilder queryBuilder = null;
if (this.queryBuilder != null) {
queryBuilder = this.queryBuilder.rewrite(context);
@ -887,10 +887,19 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
if (this.aggregations != null) {
aggregations = this.aggregations.rewrite(context);
}
List<SortBuilder<?>> sorts = Rewriteable.rewrite(this.sorts, context);
List<RescoreBuilder> rescoreBuilders = Rewriteable.rewrite(this.rescoreBuilders, context);
HighlightBuilder highlightBuilder = this.highlightBuilder;
if (highlightBuilder != null) {
highlightBuilder = this.highlightBuilder.rewrite(context);
}
boolean rewritten = queryBuilder != this.queryBuilder || postQueryBuilder != this.postQueryBuilder
|| aggregations != this.aggregations;
|| aggregations != this.aggregations || rescoreBuilders != this.rescoreBuilders || sorts != this.sorts ||
this.highlightBuilder != highlightBuilder;
if (rewritten) {
return shallowCopy(queryBuilder, postQueryBuilder, aggregations, sliceBuilder);
return shallowCopy(queryBuilder, postQueryBuilder, aggregations, this.sliceBuilder, sorts, rescoreBuilders, highlightBuilder);
}
return this;
}
@ -899,7 +908,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
* Create a shallow copy of this builder with a new slice configuration.
*/
public SearchSourceBuilder copyWithNewSlice(SliceBuilder slice) {
return shallowCopy(queryBuilder, postQueryBuilder, aggregations, slice);
return shallowCopy(queryBuilder, postQueryBuilder, aggregations, slice, sorts, rescoreBuilders, highlightBuilder);
}
/**
@ -907,7 +916,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
* {@link #rewrite(QueryRewriteContext)} and {@link #copyWithNewSlice(SliceBuilder)}.
*/
private SearchSourceBuilder shallowCopy(QueryBuilder queryBuilder, QueryBuilder postQueryBuilder,
AggregatorFactories.Builder aggregations, SliceBuilder slice) {
AggregatorFactories.Builder aggregations, SliceBuilder slice, List<SortBuilder<?>> sorts,
List<RescoreBuilder> rescoreBuilders, HighlightBuilder highlightBuilder) {
SearchSourceBuilder rewrittenBuilder = new SearchSourceBuilder();
rewrittenBuilder.aggregations = aggregations;
rewrittenBuilder.explain = explain;

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder.BoundaryScannerType;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder.Order;
@ -49,7 +50,8 @@ import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQuery
* This abstract class holds parameters shared by {@link HighlightBuilder} and {@link HighlightBuilder.Field}
* and provides the common setters, equality, hashCode calculation and common serialization
*/
public abstract class AbstractHighlighterBuilder<HB extends AbstractHighlighterBuilder<?>> extends ToXContentToBytes implements Writeable {
public abstract class AbstractHighlighterBuilder<HB extends AbstractHighlighterBuilder<?>> extends ToXContentToBytes implements
Writeable, Rewriteable<HB> {
public static final ParseField PRE_TAGS_FIELD = new ParseField("pre_tags");
public static final ParseField POST_TAGS_FIELD = new ParseField("post_tags");
public static final ParseField FIELDS_FIELD = new ParseField("fields");
@ -112,6 +114,27 @@ public abstract class AbstractHighlighterBuilder<HB extends AbstractHighlighterB
public AbstractHighlighterBuilder() {
}
protected AbstractHighlighterBuilder(AbstractHighlighterBuilder template, QueryBuilder queryBuilder) {
preTags = template.preTags;
postTags = template.postTags;
fragmentSize = template.fragmentSize;
numOfFragments = template.numOfFragments;
highlighterType = template.highlighterType;
fragmenter = template.fragmenter;
highlightQuery = queryBuilder;
order = template.order;
highlightFilter = template.highlightFilter;
forceSource = template.forceSource;
boundaryScannerType = template.boundaryScannerType;
boundaryMaxScan = template.boundaryMaxScan;
boundaryChars = template.boundaryChars;
boundaryScannerLocale = template.boundaryScannerLocale;
noMatchSize = template.noMatchSize;
phraseLimit = template.phraseLimit;
options = template.options;
requireFieldMatch = template.requireFieldMatch;
}
/**
* Read from a stream.
*/

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.ObjectParser.NamedObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.fetch.subphase.highlight.SearchContextHighlight.FieldOptions;
@ -99,13 +100,21 @@ public class HighlightBuilder extends AbstractHighlighterBuilder<HighlightBuilde
.boundaryMaxScan(SimpleBoundaryScanner.DEFAULT_MAX_SCAN).boundaryChars(SimpleBoundaryScanner.DEFAULT_BOUNDARY_CHARS)
.boundaryScannerLocale(Locale.ROOT).noMatchSize(DEFAULT_NO_MATCH_SIZE).phraseLimit(DEFAULT_PHRASE_LIMIT).build();
private final List<Field> fields = new ArrayList<>();
private final List<Field> fields;
private String encoder;
private boolean useExplicitFieldOrder = false;
public HighlightBuilder() {
fields = new ArrayList<>();
}
private HighlightBuilder(HighlightBuilder template, QueryBuilder highlightQuery, List<Field> fields) {
super(template, highlightQuery);
this.encoder = template.encoder;
this.useExplicitFieldOrder = template.useExplicitFieldOrder;
this.fields = fields;
}
/**
@ -115,20 +124,15 @@ public class HighlightBuilder extends AbstractHighlighterBuilder<HighlightBuilde
super(in);
encoder(in.readOptionalString());
useExplicitFieldOrder(in.readBoolean());
int fields = in.readVInt();
for (int i = 0; i < fields; i++) {
field(new Field(in));
}
this.fields = in.readList(Field::new);
assert this.equals(new HighlightBuilder(this, highlightQuery, fields)) : "copy constructor is broken";
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeOptionalString(encoder);
out.writeBoolean(useExplicitFieldOrder);
out.writeVInt(fields.size());
for (int i = 0; i < fields.size(); i++) {
fields.get(i).writeTo(out);
}
out.writeList(fields);
}
/**
@ -358,7 +362,7 @@ public class HighlightBuilder extends AbstractHighlighterBuilder<HighlightBuilde
targetOptionsBuilder.options(highlighterBuilder.options);
}
if (highlighterBuilder.highlightQuery != null) {
targetOptionsBuilder.highlightQuery(Rewriteable.rewrite(highlighterBuilder.highlightQuery, context).toQuery(context));
targetOptionsBuilder.highlightQuery(highlighterBuilder.highlightQuery.toQuery(context));
}
}
@ -416,6 +420,20 @@ public class HighlightBuilder extends AbstractHighlighterBuilder<HighlightBuilde
Objects.equals(fields, other.fields);
}
@Override
public HighlightBuilder rewrite(QueryRewriteContext ctx) throws IOException {
QueryBuilder highlightQuery = this.highlightQuery;
if (highlightQuery != null) {
highlightQuery = this.highlightQuery.rewrite(ctx);
}
List<Field> fields = Rewriteable.rewrite(this.fields, ctx);
if (highlightQuery == this.highlightQuery && fields == this.fields) {
return this;
}
return new HighlightBuilder(this, highlightQuery, fields);
}
public static class Field extends AbstractHighlighterBuilder<Field> {
static final NamedObjectParser<Field, Void> PARSER;
static {
@ -436,6 +454,13 @@ public class HighlightBuilder extends AbstractHighlighterBuilder<HighlightBuilde
this.name = name;
}
private Field(Field template, QueryBuilder builder) {
super(template, builder);
name = template.name;
fragmentOffset = template.fragmentOffset;
matchedFields = template.matchedFields;
}
/**
* Read from a stream.
*/
@ -444,6 +469,7 @@ public class HighlightBuilder extends AbstractHighlighterBuilder<HighlightBuilde
name = in.readString();
fragmentOffset(in.readVInt());
matchedFields(in.readOptionalStringArray());
assert this.equals(new Field(this, highlightQuery)) : "copy constructor is broken";
}
@Override
@ -498,6 +524,17 @@ public class HighlightBuilder extends AbstractHighlighterBuilder<HighlightBuilde
Objects.equals(fragmentOffset, other.fragmentOffset) &&
Arrays.equals(matchedFields, other.matchedFields);
}
@Override
public Field rewrite(QueryRewriteContext ctx) throws IOException {
if (highlightQuery != null) {
QueryBuilder rewrite = highlightQuery.rewrite(ctx);
if (rewrite != highlightQuery) {
return new Field(this, rewrite);
}
}
return this;
}
}
public enum Order implements Writeable {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
@ -123,6 +124,16 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
return source;
}
@Override
public AliasFilter getAliasFilter() {
return aliasFilter;
}
@Override
public void setAliasFilter(AliasFilter aliasFilter) {
this.aliasFilter = aliasFilter;
}
@Override
public void source(SearchSourceBuilder source) {
this.source = source;
@ -138,11 +149,6 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
return searchType;
}
@Override
public QueryBuilder filteringAliases() {
return aliasFilter.getQueryBuilder();
}
@Override
public float indexBoost() {
return indexBoost;
@ -237,14 +243,36 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
return new BytesArray(out.bytes().toBytesRef(), true);// do a deep copy
}
@Override
public void rewrite(QueryRewriteContext context) throws IOException {
aliasFilter = Rewriteable.rewrite(aliasFilter, context);
source = source == null ? null : Rewriteable.rewrite(source, context);
}
@Override
public String getClusterAlias() {
return clusterAlias;
}
@Override
public Rewriteable<Rewriteable> getRewriteable() {
return new RequestRewritable(this);
}
static class RequestRewritable implements Rewriteable<Rewriteable> {
final ShardSearchRequest request;
RequestRewritable(ShardSearchRequest request) {
this.request = request;
}
@Override
public Rewriteable rewrite(QueryRewriteContext ctx) throws IOException {
SearchSourceBuilder newSource = request.source() == null ? null : Rewriteable.rewrite(request.source(), ctx);
AliasFilter newAliasFilter = Rewriteable.rewrite(request.getAliasFilter(), ctx);
if (newSource == request.source() && newAliasFilter == request.getAliasFilter()) {
return this;
} else {
request.source(newSource);
request.setAliasFilter(newAliasFilter);
return new RequestRewritable(request);
}
}
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.AliasFilterParsingException;
import org.elasticsearch.indices.InvalidAliasNameException;
@ -52,14 +53,16 @@ public interface ShardSearchRequest {
SearchSourceBuilder source();
AliasFilter getAliasFilter();
void setAliasFilter(AliasFilter filter);
void source(SearchSourceBuilder source);
int numberOfShards();
SearchType searchType();
QueryBuilder filteringAliases();
float indexBoost();
long nowInMillis();
@ -84,12 +87,6 @@ public interface ShardSearchRequest {
*/
BytesReference cacheKey() throws IOException;
/**
* Rewrites this request into its primitive form. e.g. by rewriting the
* QueryBuilder.
*/
void rewrite(QueryRewriteContext context) throws IOException;
/**
* Returns the filter associated with listed filtering aliases.
* <p>
@ -148,4 +145,6 @@ public interface ShardSearchRequest {
*/
String getClusterAlias();
Rewriteable<Rewriteable> getRewriteable();
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -96,6 +97,16 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
return shardSearchLocalRequest.source();
}
@Override
public AliasFilter getAliasFilter() {
return shardSearchLocalRequest.getAliasFilter();
}
@Override
public void setAliasFilter(AliasFilter filter) {
shardSearchLocalRequest.setAliasFilter(filter);
}
@Override
public void source(SearchSourceBuilder source) {
shardSearchLocalRequest.source(source);
@ -111,11 +122,6 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
return shardSearchLocalRequest.searchType();
}
@Override
public QueryBuilder filteringAliases() {
return shardSearchLocalRequest.filteringAliases();
}
@Override
public float indexBoost() {
return shardSearchLocalRequest.indexBoost();
@ -167,11 +173,6 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
return shardSearchLocalRequest.isProfile();
}
@Override
public void rewrite(QueryRewriteContext context) throws IOException {
shardSearchLocalRequest.rewrite(context);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
@ -187,4 +188,9 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
public String getClusterAlias() {
return shardSearchLocalRequest.getClusterAlias();
}
@Override
public Rewriteable<Rewriteable> getRewriteable() {
return shardSearchLocalRequest.getRewriteable();
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.rescore.QueryRescorer.QueryRescoreContext;
@ -171,7 +172,8 @@ public class QueryRescorerBuilder extends RescoreBuilder<QueryRescorerBuilder> {
public QueryRescoreContext build(QueryShardContext context) throws IOException {
org.elasticsearch.search.rescore.QueryRescorer rescorer = new org.elasticsearch.search.rescore.QueryRescorer();
QueryRescoreContext queryRescoreContext = new QueryRescoreContext(rescorer);
queryRescoreContext.setQuery(Rewriteable.rewrite(this.queryBuilder, context).toQuery(context));
// query is rewritten at this point already
queryRescoreContext.setQuery(queryBuilder.toQuery(context));
queryRescoreContext.setQueryWeight(this.queryWeight);
queryRescoreContext.setRescoreQueryWeight(this.rescoreQueryWeight);
queryRescoreContext.setScoreMode(this.scoreMode);
@ -244,4 +246,13 @@ public class QueryRescorerBuilder extends RescoreBuilder<QueryRescorerBuilder> {
this.scoreMode = scoreMode;
}
}
@Override
public RescoreBuilder rewrite(QueryRewriteContext ctx) throws IOException {
QueryBuilder rewrite = queryBuilder.rewrite(ctx);
if (rewrite == queryBuilder) {
return this;
}
return new QueryRescorerBuilder(rewrite);
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.rescore.QueryRescorer.QueryRescoreContext;
import java.io.IOException;
@ -37,7 +38,8 @@ import java.util.Objects;
/**
* The abstract base builder for instances of {@link RescoreBuilder}.
*/
public abstract class RescoreBuilder<RB extends RescoreBuilder<RB>> extends ToXContentToBytes implements NamedWriteable {
public abstract class RescoreBuilder<RB extends RescoreBuilder<RB>> extends ToXContentToBytes implements NamedWriteable,
Rewriteable<RescoreBuilder<RB>> {
protected Integer windowSize;

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.N
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.search.DocValueFormat;
@ -334,4 +335,16 @@ public class FieldSortBuilder extends SortBuilder<FieldSortBuilder> {
PARSER.declareString((b, v) -> b.sortMode(SortMode.fromString(v)), SORT_MODE);
PARSER.declareObject(FieldSortBuilder::setNestedFilter, (p, c) -> SortBuilder.parseNestedFilter(p), NESTED_FILTER_FIELD);
}
@Override
public SortBuilder rewrite(QueryRewriteContext ctx) throws IOException {
if (nestedFilter == null) {
return this;
}
QueryBuilder rewrite = nestedFilter.rewrite(ctx);
if (nestedFilter == rewrite) {
return this;
}
return new FieldSortBuilder(this).setNestedFilter(rewrite);
}
}

View File

@ -48,6 +48,7 @@ import org.elasticsearch.index.fielddata.plain.AbstractLatLonPointDVIndexFieldDa
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.GeoValidationMethod;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MultiValueMode;
@ -604,4 +605,16 @@ public class GeoDistanceSortBuilder extends SortBuilder<GeoDistanceSortBuilder>
}
}
@Override
public SortBuilder rewrite(QueryRewriteContext ctx) throws IOException {
if (nestedFilter == null) {
return this;
}
QueryBuilder rewrite = nestedFilter.rewrite(ctx);
if (nestedFilter == rewrite) {
return this;
}
return new GeoDistanceSortBuilder(this).setNestedFilter(rewrite);
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
@ -121,4 +122,9 @@ public class ScoreSortBuilder extends SortBuilder<ScoreSortBuilder> {
public String getWriteableName() {
return NAME;
}
@Override
public SortBuilder rewrite(QueryRewriteContext ctx) throws IOException {
return this;
}
}

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource;
import org.elasticsearch.index.fielddata.fieldcomparator.DoubleValuesComparatorSource;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.script.Script;
@ -376,4 +377,16 @@ public class ScriptSortBuilder extends SortBuilder<ScriptSortBuilder> {
return name().toLowerCase(Locale.ROOT);
}
}
@Override
public SortBuilder rewrite(QueryRewriteContext ctx) throws IOException {
if (nestedFilter == null) {
return this;
}
QueryBuilder rewrite = nestedFilter.rewrite(ctx);
if (nestedFilter == rewrite) {
return this;
}
return new ScriptSortBuilder(this).setNestedFilter(rewrite);
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.Nested;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.index.query.Rewriteable;
@ -48,7 +49,8 @@ import java.util.Optional;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
public abstract class SortBuilder<T extends SortBuilder<T>> extends ToXContentToBytes implements NamedWriteable {
public abstract class SortBuilder<T extends SortBuilder<T>> extends ToXContentToBytes implements NamedWriteable,
Rewriteable<SortBuilder<?>>{
protected SortOrder order = SortOrder.ASC;
@ -190,7 +192,8 @@ public abstract class SortBuilder<T extends SortBuilder<T>> extends ToXContentTo
Query innerDocumentsQuery;
if (nestedFilter != null) {
context.nestedScope().nextLevel(nestedObjectMapper);
innerDocumentsQuery = Rewriteable.rewrite(nestedFilter, context).toFilter(context);
assert nestedFilter == Rewriteable.rewrite(nestedFilter, context) : "nested filter is not rewritten";
innerDocumentsQuery = nestedFilter.toFilter(context);
context.nestedScope().previousLevel();
} else {
innerDocumentsQuery = nestedObjectMapper.nestedTypeFilter();

View File

@ -112,7 +112,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
ShardSearchTransportRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator);
assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions());
assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices());
assertEquals(new MatchAllQueryBuilder(), shardSearchTransportRequest.filteringAliases());
assertEquals(new MatchAllQueryBuilder(), shardSearchTransportRequest.getAliasFilter().getQueryBuilder());
assertEquals(2.0f, shardSearchTransportRequest.indexBoost(), 0.0f);
}
}

View File

@ -30,9 +30,11 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.test.ESSingleNodeTestCase;
@ -70,6 +72,16 @@ public class SearchSlowLogTests extends ESSingleNodeTestCase {
return searchSourceBuilder;
}
@Override
public AliasFilter getAliasFilter() {
return new AliasFilter(QueryBuilders.matchAllQuery(), "foo");
}
@Override
public void setAliasFilter(AliasFilter filter) {
}
@Override
public void source(SearchSourceBuilder source) {
searchSourceBuilder = source;
@ -85,11 +97,6 @@ public class SearchSlowLogTests extends ESSingleNodeTestCase {
return null;
}
@Override
public QueryBuilder filteringAliases() {
return null;
}
@Override
public float indexBoost() {
return 1.0f;
@ -126,7 +133,8 @@ public class SearchSlowLogTests extends ESSingleNodeTestCase {
}
@Override
public void rewrite(QueryRewriteContext context) throws IOException {
public Rewriteable getRewriteable() {
return null;
}
@Override

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.geo.SpatialStrategy;
import org.elasticsearch.common.geo.builders.EnvelopeBuilder;
import org.elasticsearch.common.geo.builders.ShapeBuilder;
import org.elasticsearch.common.geo.builders.ShapeBuilders;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -61,12 +62,14 @@ public class GeoShapeQueryBuilderTests extends AbstractQueryTestCase<GeoShapeQue
@Override
protected GeoShapeQueryBuilder doCreateTestQueryBuilder() {
return doCreateTestQueryBuilder(randomBoolean());
}
private GeoShapeQueryBuilder doCreateTestQueryBuilder(boolean indexedShape) {
ShapeType shapeType = ShapeType.randomType(random());
ShapeBuilder shape = RandomShapeGenerator.createShapeWithin(random(), null, shapeType);
GeoShapeQueryBuilder builder;
clearShapeFields();
if (randomBoolean()) {
if (indexedShape == false) {
builder = new GeoShapeQueryBuilder(GEO_SHAPE_FIELD_NAME, shape);
} else {
indexedShapeToReturn = shape;
@ -234,7 +237,7 @@ public class GeoShapeQueryBuilderTests extends AbstractQueryTestCase<GeoShapeQue
UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> query.toQuery(createShardContext()));
assertEquals("query must be rewritten first", e.getMessage());
QueryBuilder rewrite = query.rewrite(createShardContext());
QueryBuilder rewrite = rewriteAndFetch(query, createShardContext());
GeoShapeQueryBuilder geoShapeQueryBuilder = new GeoShapeQueryBuilder(GEO_SHAPE_FIELD_NAME, indexedShapeToReturn);
geoShapeQueryBuilder.strategy(query.strategy());
geoShapeQueryBuilder.relation(query.relation());
@ -255,4 +258,13 @@ public class GeoShapeQueryBuilderTests extends AbstractQueryTestCase<GeoShapeQue
QueryShardException e = expectThrows(QueryShardException.class, () -> failingQueryBuilder.toQuery(createShardContext()));
assertThat(e.getMessage(), containsString("failed to find geo_shape field [unmapped]"));
}
public void testSerializationFailsUnlessFetched() throws IOException {
QueryBuilder builder = doCreateTestQueryBuilder(true);
QueryBuilder queryBuilder = Rewriteable.rewrite(builder, createShardContext());
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> queryBuilder.writeTo(new BytesStreamOutput(10)));
assertEquals(ise.getMessage(), "supplier must be null, can't serialize suppliers, missing a rewriteAndFetch?");
builder = rewriteAndFetch(builder, createShardContext());
builder.writeTo(new BytesStreamOutput(10));
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.query;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
public class RewriteableTests extends ESTestCase {
public void testRewrite() throws IOException {
QueryRewriteContext context = new QueryRewriteContext(null, null, null);
TestRewriteable rewrite = Rewriteable.rewrite(new TestRewriteable(randomIntBetween(0, Rewriteable.MAX_REWRITE_ROUNDS)), context,
randomBoolean());
assertEquals(rewrite.numRewrites, 0);
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> Rewriteable.rewrite(new TestRewriteable(Rewriteable
.MAX_REWRITE_ROUNDS+1),
context));
assertEquals(ise.getMessage(), "too many rewrite rounds, rewriteable might return new objects even if they are not rewritten");
ise = expectThrows(IllegalStateException.class, () -> Rewriteable.rewrite(new TestRewriteable(Rewriteable
.MAX_REWRITE_ROUNDS + 1, true),
context, true));
assertEquals(ise.getMessage(), "async actions are left after rewrite");
}
public void testRewriteAndFetch() throws ExecutionException, InterruptedException {
QueryRewriteContext context = new QueryRewriteContext(null, null, null);
PlainActionFuture<TestRewriteable> future = new PlainActionFuture<>();
Rewriteable.rewriteAndFetch(new TestRewriteable(randomIntBetween(0, Rewriteable.MAX_REWRITE_ROUNDS), true), context, future);
TestRewriteable rewrite = future.get();
assertEquals(rewrite.numRewrites, 0);
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> {
PlainActionFuture<TestRewriteable> f = new PlainActionFuture<>();
Rewriteable.rewriteAndFetch(new TestRewriteable(Rewriteable.MAX_REWRITE_ROUNDS + 1, true), context, f);
try {
f.get();
} catch (ExecutionException e) {
throw e.getCause(); // we expect the underlying exception here
}
});
assertEquals(ise.getMessage(), "too many rewrite rounds, rewriteable might return new objects even if they are not rewritten");
}
public void testRewriteList() throws IOException {
QueryRewriteContext context = new QueryRewriteContext(null, null, null);
List<TestRewriteable> rewriteableList = new ArrayList();
int numInstances = randomIntBetween(1, 10);
rewriteableList.add(new TestRewriteable(randomIntBetween(1, Rewriteable.MAX_REWRITE_ROUNDS)));
for (int i = 0; i < numInstances; i++) {
rewriteableList.add(new TestRewriteable(randomIntBetween(0, Rewriteable.MAX_REWRITE_ROUNDS)));
}
List<TestRewriteable> rewrittenList = Rewriteable.rewrite(rewriteableList, context);
assertNotSame(rewrittenList, rewriteableList);
for (TestRewriteable instance : rewrittenList) {
assertEquals(0, instance.numRewrites);
}
rewriteableList = Collections.emptyList();
assertSame(rewriteableList, Rewriteable.rewrite(rewriteableList, context));
rewriteableList = null;
assertNull(Rewriteable.rewrite(rewriteableList, context));
rewriteableList = new ArrayList<>();
for (int i = 0; i < numInstances; i++) {
rewriteableList.add(new TestRewriteable(0));
}
assertSame(rewriteableList, Rewriteable.rewrite(rewriteableList, context));
}
private static final class TestRewriteable implements Rewriteable<TestRewriteable> {
final int numRewrites;
final boolean fetch;
final Supplier<Boolean> supplier;
TestRewriteable(int numRewrites) {
this(numRewrites, false, null);
}
TestRewriteable(int numRewrites, boolean fetch) {
this(numRewrites, fetch, null);
}
TestRewriteable(int numRewrites, boolean fetch, Supplier supplier) {
this.numRewrites = numRewrites;
this.fetch = fetch;
this.supplier = supplier;
}
@Override
public TestRewriteable rewrite(QueryRewriteContext ctx) throws IOException {
if (numRewrites == 0) {
return this;
}
if (supplier != null && supplier.get() == null) {
return this;
}
if (supplier != null) {
assertTrue(supplier.get());
}
if (fetch) {
SetOnce<Boolean> setOnce = new SetOnce<>();
ctx.registerAsyncAction((c, l) -> {
Runnable r = () -> {
setOnce.set(Boolean.TRUE);
l.onResponse(null);
};
if (randomBoolean()) {
new Thread(r).start();
} else {
r.run();
}
});
return new TestRewriteable(numRewrites-1, fetch, setOnce::get);
}
return new TestRewriteable(numRewrites-1, fetch, null);
}
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.get.GetResult;
@ -254,7 +255,7 @@ public class TermsQueryBuilderTests extends AbstractQueryTestCase<TermsQueryBuil
UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class,
() -> termsQueryBuilder.toQuery(createShardContext()));
assertEquals("query must be rewritten first", e.getMessage());
assertEquals(termsQueryBuilder.rewrite(createShardContext()), new TermsQueryBuilder(STRING_FIELD_NAME,
assertEquals(rewriteAndFetch(termsQueryBuilder, createShardContext()), new TermsQueryBuilder(STRING_FIELD_NAME,
randomTerms.stream().filter(x -> x != null).collect(Collectors.toList()))); // terms lookup removes null values
}
@ -275,6 +276,15 @@ public class TermsQueryBuilderTests extends AbstractQueryTestCase<TermsQueryBuil
return super.isCachable(queryBuilder);
}
public void testSerializationFailsUnlessFetched() throws IOException {
QueryBuilder builder = new TermsQueryBuilder(STRING_FIELD_NAME, randomTermsLookup());
QueryBuilder termsQueryBuilder = Rewriteable.rewrite(builder, createShardContext());
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> termsQueryBuilder.writeTo(new BytesStreamOutput(10)));
assertEquals(ise.getMessage(), "supplier must be null, can't serialize suppliers, missing a rewriteAndFetch?");
builder = rewriteAndFetch(builder, createShardContext());
builder.writeTo(new BytesStreamOutput(10));
}
public void testConversion() {
List<Object> list = Arrays.asList();
assertSame(Collections.emptyList(), TermsQueryBuilder.convert(list));

View File

@ -33,8 +33,12 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.RandomQueryBuilder;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.AbstractSearchTestCase;
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
@ -47,6 +51,7 @@ import java.io.IOException;
import java.util.Map;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
@ -135,11 +140,73 @@ public class SearchSourceBuilderTests extends AbstractSearchTestCase {
}
}
public void testParseAndRewrite() throws IOException {
String restContent = "{\n" +
" \"query\": {\n" +
" \"bool\": {\n" +
" \"must\": {\n" +
" \"match_none\": {}\n" +
" }\n" +
" }\n" +
" },\n" +
" \"rescore\": {\n" +
" \"window_size\": 50,\n" +
" \"query\": {\n" +
" \"rescore_query\": {\n" +
" \"bool\": {\n" +
" \"must\": {\n" +
" \"match_none\": {}\n" +
" }\n" +
" }\n" +
" },\n" +
" \"rescore_query_weight\": 10\n" +
" }\n" +
" },\n" +
" \"highlight\": {\n" +
" \"order\": \"score\",\n" +
" \"fields\": {\n" +
" \"content\": {\n" +
" \"fragment_size\": 150,\n" +
" \"number_of_fragments\": 3,\n" +
" \"highlight_query\": {\n" +
" \"bool\": {\n" +
" \"must\": {\n" +
" \"match_none\": {}\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser);
assertThat(searchSourceBuilder.query(), instanceOf(BoolQueryBuilder.class));
assertThat(searchSourceBuilder.rescores().get(0), instanceOf(QueryRescorerBuilder.class));
assertThat(((QueryRescorerBuilder)searchSourceBuilder.rescores().get(0)).getRescoreQuery(),
instanceOf(BoolQueryBuilder.class));
assertThat(searchSourceBuilder.highlighter().fields().get(0).highlightQuery(), instanceOf(BoolQueryBuilder.class));
searchSourceBuilder = rewrite(searchSourceBuilder);
assertThat(searchSourceBuilder.query(), instanceOf(MatchNoneQueryBuilder.class));
assertThat(searchSourceBuilder.rescores().get(0), instanceOf(QueryRescorerBuilder.class));
assertThat(((QueryRescorerBuilder)searchSourceBuilder.rescores().get(0)).getRescoreQuery(),
instanceOf(MatchNoneQueryBuilder.class));
assertThat(searchSourceBuilder.highlighter().fields().get(0).highlightQuery(), instanceOf(MatchNoneQueryBuilder.class));
assertEquals(searchSourceBuilder.highlighter().fields().get(0).fragmentSize().intValue(), 150);
assertEquals(searchSourceBuilder.highlighter().fields().get(0).numOfFragments().intValue(), 3);
}
}
public void testParseSort() throws IOException {
{
String restContent = " { \"sort\": \"foo\"}";
try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser);
searchSourceBuilder = rewrite(searchSourceBuilder);
assertEquals(1, searchSourceBuilder.sorts().size());
assertEquals(new FieldSortBuilder("foo"), searchSourceBuilder.sorts().get(0));
}
@ -155,6 +222,7 @@ public class SearchSourceBuilderTests extends AbstractSearchTestCase {
" ]}";
try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser);
searchSourceBuilder = rewrite(searchSourceBuilder);
assertEquals(5, searchSourceBuilder.sorts().size());
assertEquals(new FieldSortBuilder("post_date"), searchSourceBuilder.sorts().get(0));
assertEquals(new FieldSortBuilder("user"), searchSourceBuilder.sorts().get(1));
@ -178,6 +246,7 @@ public class SearchSourceBuilderTests extends AbstractSearchTestCase {
"}\n";
try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser);
searchSourceBuilder = rewrite(searchSourceBuilder);
assertEquals(1, searchSourceBuilder.aggregations().count());
}
}
@ -193,6 +262,7 @@ public class SearchSourceBuilderTests extends AbstractSearchTestCase {
"}\n";
try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser);
searchSourceBuilder = rewrite(searchSourceBuilder);
assertEquals(1, searchSourceBuilder.aggregations().count());
}
}
@ -218,6 +288,7 @@ public class SearchSourceBuilderTests extends AbstractSearchTestCase {
"}\n";
try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser);
searchSourceBuilder = rewrite(searchSourceBuilder);
assertEquals(1, searchSourceBuilder.rescores().size());
assertEquals(new QueryRescorerBuilder(QueryBuilders.matchQuery("content", "baz")).windowSize(50),
searchSourceBuilder.rescores().get(0));
@ -240,6 +311,7 @@ public class SearchSourceBuilderTests extends AbstractSearchTestCase {
"}\n";
try (XContentParser parser = createParser(JsonXContent.jsonXContent, restContent)) {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(parser);
searchSourceBuilder = rewrite(searchSourceBuilder);
assertEquals(1, searchSourceBuilder.rescores().size());
assertEquals(new QueryRescorerBuilder(QueryBuilders.matchQuery("content", "baz")).windowSize(50),
searchSourceBuilder.rescores().get(0));
@ -374,4 +446,9 @@ public class SearchSourceBuilderTests extends AbstractSearchTestCase {
assertEquals(expectedErrorMessage, e.getMessage());
}
}
private SearchSourceBuilder rewrite(SearchSourceBuilder searchSourceBuilder) throws IOException {
return Rewriteable.rewrite(searchSourceBuilder, new QueryRewriteContext(xContentRegistry(), null, Long
.valueOf(1)::longValue));
}
}

View File

@ -64,7 +64,7 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
ShardSearchTransportRequest deserializedRequest = new ShardSearchTransportRequest();
deserializedRequest.readFrom(in);
assertEquals(deserializedRequest.scroll(), shardSearchTransportRequest.scroll());
assertEquals(deserializedRequest.filteringAliases(), shardSearchTransportRequest.filteringAliases());
assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter());
assertArrayEquals(deserializedRequest.indices(), shardSearchTransportRequest.indices());
assertArrayEquals(deserializedRequest.types(), shardSearchTransportRequest.types());
assertEquals(deserializedRequest.indicesOptions(), shardSearchTransportRequest.indicesOptions());
@ -76,7 +76,7 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
assertEquals(deserializedRequest.numberOfShards(), shardSearchTransportRequest.numberOfShards());
assertEquals(deserializedRequest.cacheKey(), shardSearchTransportRequest.cacheKey());
assertNotSame(deserializedRequest, shardSearchTransportRequest);
assertEquals(deserializedRequest.filteringAliases(), shardSearchTransportRequest.filteringAliases());
assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter());
assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f);
}
}

View File

@ -40,11 +40,12 @@ import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
@ -77,6 +78,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;
import static org.elasticsearch.index.mapper.SourceToParse.source;
import static org.elasticsearch.percolator.PercolatorFieldMapper.parseQuery;
@ -108,6 +110,7 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
private final String indexedDocumentRouting;
private final String indexedDocumentPreference;
private final Long indexedDocumentVersion;
private final Supplier<BytesReference> documentSupplier;
/**
* @deprecated use {@link #PercolateQueryBuilder(String, BytesReference, XContentType)} with the document content
@ -147,6 +150,24 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
indexedDocumentRouting = null;
indexedDocumentPreference = null;
indexedDocumentVersion = null;
this.documentSupplier = null;
}
private PercolateQueryBuilder(String field, String documentType, Supplier<BytesReference> documentSupplier) {
if (field == null) {
throw new IllegalArgumentException("[field] is a required argument");
}
this.field = field;
this.documentType = documentType;
this.document = null;
this.documentXContentType = null;
this.documentSupplier = documentSupplier;
indexedDocumentIndex = null;
indexedDocumentType = null;
indexedDocumentId = null;
indexedDocumentRouting = null;
indexedDocumentPreference = null;
indexedDocumentVersion = null;
}
/**
@ -192,6 +213,7 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
this.indexedDocumentVersion = indexedDocumentVersion;
this.document = null;
this.documentXContentType = null;
this.documentSupplier = null;
}
/**
@ -225,10 +247,14 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
} else {
documentXContentType = null;
}
documentSupplier = null;
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
if (documentSupplier != null) {
throw new IllegalStateException("supplier must be null, can't serialize suppliers, missing a rewriteAndFetch?");
}
out.writeString(field);
if (out.getVersion().before(Version.V_6_0_0_beta1)) {
out.writeString(documentType);
@ -369,12 +395,14 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
&& Objects.equals(document, other.document)
&& Objects.equals(indexedDocumentIndex, other.indexedDocumentIndex)
&& Objects.equals(indexedDocumentType, other.indexedDocumentType)
&& Objects.equals(documentSupplier, other.documentSupplier)
&& Objects.equals(indexedDocumentId, other.indexedDocumentId);
}
@Override
protected int doHashCode() {
return Objects.hash(field, documentType, document, indexedDocumentIndex, indexedDocumentType, indexedDocumentId);
return Objects.hash(field, documentType, document, indexedDocumentIndex, indexedDocumentType, indexedDocumentId, documentSupplier);
}
@Override
@ -386,8 +414,14 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
protected QueryBuilder doRewrite(QueryRewriteContext queryShardContext) {
if (document != null) {
return this;
} else if (documentSupplier != null) {
final BytesReference source = documentSupplier.get();
if (source == null) {
return this; // not executed yet
} else {
return new PercolateQueryBuilder(field, documentType, source, XContentFactory.xContentType(source));
}
}
GetRequest getRequest = new GetRequest(indexedDocumentIndex, indexedDocumentType, indexedDocumentId);
getRequest.preference("_local");
getRequest.routing(indexedDocumentRouting);
@ -395,7 +429,9 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
if (indexedDocumentVersion != null) {
getRequest.version(indexedDocumentVersion);
}
GetResponse getResponse = queryShardContext.getClient().get(getRequest).actionGet();
SetOnce<BytesReference> documentSupplier = new SetOnce<>();
queryShardContext.registerAsyncAction((client, listener) -> {
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse.isExists() == false) {
throw new ResourceNotFoundException(
"indexed document [{}/{}/{}] couldn't be found", indexedDocumentIndex, indexedDocumentType, indexedDocumentId
@ -403,11 +439,15 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
}
if(getResponse.isSourceEmpty()) {
throw new IllegalArgumentException(
"indexed document [" + indexedDocumentIndex + "/" + indexedDocumentType + "/" + indexedDocumentId + "] source disabled"
"indexed document [" + indexedDocumentIndex + "/" + indexedDocumentType + "/" + indexedDocumentId
+ "] source disabled"
);
}
final BytesReference source = getResponse.getSourceAsBytesRef();
return new PercolateQueryBuilder(field, documentType, source, XContentFactory.xContentType(source));
documentSupplier.set(getResponse.getSourceAsBytesRef());
listener.onResponse(null);
}, listener::onFailure));
});
return new PercolateQueryBuilder(field, documentType, documentSupplier::get);
}
@Override
@ -415,7 +455,7 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
// Call nowInMillis() so that this query becomes un-cacheable since we
// can't be sure that it doesn't use now or scripts
context.nowInMillis();
if (indexedDocumentIndex != null || indexedDocumentType != null || indexedDocumentId != null) {
if (indexedDocumentIndex != null || indexedDocumentType != null || indexedDocumentId != null || documentSupplier != null) {
throw new IllegalStateException("query builder must be rewritten first");
}

View File

@ -37,6 +37,7 @@ import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Setting;
@ -283,7 +284,9 @@ public class PercolatorFieldMapper extends FieldMapper {
);
verifyQuery(queryBuilder);
// Fetching of terms, shapes and indexed scripts happen during this rewrite:
queryBuilder = Rewriteable.rewrite(queryBuilder, queryShardContext);
PlainActionFuture<QueryBuilder> future = new PlainActionFuture<>();
Rewriteable.rewriteAndFetch(queryBuilder, queryShardContext, future);
queryBuilder = future.actionGet();
try (XContentBuilder builder = XContentFactory.contentBuilder(QUERY_BUILDER_CONTENT_TYPE)) {
queryBuilder.toXContent(builder, new MapParams(Collections.emptyMap()));

View File

@ -45,6 +45,7 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.internal.SearchContext;
@ -163,7 +164,7 @@ public class PercolateQueryBuilderTests extends AbstractQueryTestCase<PercolateQ
PercolateQueryBuilder pqb = doCreateTestQueryBuilder(true);
IllegalStateException e = expectThrows(IllegalStateException.class, () -> pqb.toQuery(createShardContext()));
assertThat(e.getMessage(), equalTo("query builder must be rewritten first"));
QueryBuilder rewrite = pqb.rewrite(createShardContext());
QueryBuilder rewrite = rewriteAndFetch(pqb, createShardContext());
PercolateQueryBuilder geoShapeQueryBuilder =
new PercolateQueryBuilder(pqb.getField(), pqb.getDocumentType(), documentSource, XContentType.JSON);
assertEquals(geoShapeQueryBuilder, rewrite);
@ -172,7 +173,8 @@ public class PercolateQueryBuilderTests extends AbstractQueryTestCase<PercolateQ
public void testIndexedDocumentDoesNotExist() throws IOException {
indexedDocumentExists = false;
PercolateQueryBuilder pqb = doCreateTestQueryBuilder(true);
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> pqb.rewrite(createShardContext()));
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> rewriteAndFetch(pqb,
createShardContext()));
String expectedString = "indexed document [" + indexedDocumentIndex + "/" + indexedDocumentType + "/" +
indexedDocumentId + "] couldn't be found";
assertThat(e.getMessage() , equalTo(expectedString));
@ -287,4 +289,13 @@ public class PercolateQueryBuilderTests extends AbstractQueryTestCase<PercolateQ
protected boolean builderGeneratesCacheableQueries() {
return false;
}
public void testSerializationFailsUnlessFetched() throws IOException {
QueryBuilder builder = doCreateTestQueryBuilder(true);
QueryBuilder queryBuilder = Rewriteable.rewrite(builder, createShardContext());
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> queryBuilder.writeTo(new BytesStreamOutput(10)));
assertEquals(ise.getMessage(), "supplier must be null, can't serialize suppliers, missing a rewriteAndFetch?");
builder = rewriteAndFetch(builder, createShardContext());
builder.writeTo(new BytesStreamOutput(10));
}
}

View File

@ -35,6 +35,7 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TermRangeQuery;
import org.apache.lucene.search.join.ScoreMode;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
@ -58,8 +59,10 @@ import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.DisMaxQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.RandomScoreFunctionBuilder;
import org.elasticsearch.indices.TermsLookup;
@ -308,8 +311,13 @@ public class PercolatorFieldMapperTests extends ESSingleNodeTestCase {
.endObject().bytes(),
XContentType.JSON));
BytesRef qbSource = doc.rootDoc().getFields(fieldType.queryBuilderField.name())[0].binaryValue();
assertQueryBuilder(qbSource, queryBuilder.rewrite(indexService.newQueryShardContext(
randomInt(20), null, () -> { throw new UnsupportedOperationException(); })));
QueryShardContext shardContext = indexService.newQueryShardContext(
randomInt(20), null, () -> {
throw new UnsupportedOperationException();
});
PlainActionFuture<QueryBuilder> future = new PlainActionFuture<>();
Rewriteable.rewriteAndFetch(queryBuilder, shardContext, future);
assertQueryBuilder(qbSource, future.get());
}

View File

@ -29,6 +29,7 @@ import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
@ -116,6 +117,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Stream;
@ -619,7 +621,7 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
}
private QueryBuilder rewriteQuery(QB queryBuilder, QueryRewriteContext rewriteContext) throws IOException {
QueryBuilder rewritten = Rewriteable.rewrite(queryBuilder, rewriteContext);
QueryBuilder rewritten = rewriteAndFetch(queryBuilder, rewriteContext);
// extra safety to fail fast - serialize the rewritten version to ensure it's serializable.
assertSerialization(rewritten);
return rewritten;
@ -873,14 +875,17 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.equals(Client.class.getMethod("get", GetRequest.class))) {
return new PlainActionFuture<GetResponse>() {
@Override
public GetResponse get() throws InterruptedException, ExecutionException {
return delegate.executeGet((GetRequest) args[0]);
if (method.equals(Client.class.getMethod("get", GetRequest.class, ActionListener.class))){
GetResponse getResponse = delegate.executeGet((GetRequest) args[0]);
ActionListener<GetResponse> listener = (ActionListener<GetResponse>) args[1];
if (randomBoolean()) {
listener.onResponse(getResponse);
} else {
new Thread(() -> listener.onResponse(getResponse)).start();
}
};
} else if (method.equals(Client.class.getMethod("multiTermVectors", MultiTermVectorsRequest.class))) {
return null;
} else if (method.equals(Client.class.getMethod
("multiTermVectors", MultiTermVectorsRequest.class))) {
return new PlainActionFuture<MultiTermVectorsResponse>() {
@Override
public MultiTermVectorsResponse get() throws InterruptedException, ExecutionException {
@ -1081,4 +1086,10 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
return new ScriptModule(Settings.EMPTY, scriptPlugins);
}
}
protected QueryBuilder rewriteAndFetch(QueryBuilder builder, QueryRewriteContext context) throws IOException {
PlainActionFuture<QueryBuilder> future = new PlainActionFuture<>();
Rewriteable.rewriteAndFetch(builder, context, future);
return future.actionGet();
}
}