Search service can now fully populate SearchContext from SearchSourceBuilder

This commit is contained in:
Colin Goodheart-Smithe 2015-09-23 16:40:58 +01:00
parent a53cf896dc
commit d7429a6c63
5 changed files with 201 additions and 111 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search; package org.elasticsearch.search;
import com.carrotsearch.hppc.ObjectFloatHashMap;
import com.carrotsearch.hppc.ObjectHashSet; import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet; import com.carrotsearch.hppc.ObjectSet;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
@ -677,23 +678,31 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
context.from(source.from()); context.from(source.from());
context.size(source.size()); context.size(source.size());
Float indexBoost = source.indexBoost().get(context.shardTarget().index()); ObjectFloatHashMap<String> indexBoostMap = source.indexBoost();
if (indexBoostMap != null) {
Float indexBoost = indexBoostMap.get(context.shardTarget().index());
if (indexBoost != null) { if (indexBoost != null) {
context.queryBoost(indexBoost); context.queryBoost(indexBoost);
} }
}
context.parsedQuery(context.queryParserService().parse(source.query())); context.parsedQuery(context.queryParserService().parse(source.query()));
if (source.postFilter() != null) {
context.parsedPostFilter(context.queryParserService().parse(source.postFilter())); context.parsedPostFilter(context.queryParserService().parse(source.postFilter()));
}
if (source.sorts() != null) {
XContentParser completeSortParser = null; XContentParser completeSortParser = null;
try { try {
XContentBuilder completeSortBuilder = XContentFactory.jsonBuilder(); XContentBuilder completeSortBuilder = XContentFactory.jsonBuilder();
completeSortBuilder.startArray(); completeSortBuilder.startArray();
for (BytesReference sort : source.sorts()) { for (BytesReference sort : source.sorts()) {
XContentParser parser = XContentFactory.xContent(sort).createParser(sort); XContentParser parser = XContentFactory.xContent(sort).createParser(sort);
parser.nextToken();
completeSortBuilder.copyCurrentStructure(parser); completeSortBuilder.copyCurrentStructure(parser);
} }
completeSortBuilder.endArray(); completeSortBuilder.endArray();
BytesReference completeSortBytes = completeSortBuilder.bytes(); BytesReference completeSortBytes = completeSortBuilder.bytes();
completeSortParser = XContentFactory.xContent(completeSortBytes).createParser(completeSortBytes); completeSortParser = XContentFactory.xContent(completeSortBytes).createParser(completeSortBytes);
completeSortParser.nextToken();
this.elementParsers.get("sort").parse(completeSortParser, context); this.elementParsers.get("sort").parse(completeSortParser, context);
} catch (Exception e) { } catch (Exception e) {
String sSource = "_na_"; String sSource = "_na_";
@ -705,13 +714,43 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
XContentLocation location = completeSortParser != null ? completeSortParser.getTokenLocation() : null; XContentLocation location = completeSortParser != null ? completeSortParser.getTokenLocation() : null;
throw new SearchParseException(context, "failed to parse sort source [" + sSource + "]", location, e); throw new SearchParseException(context, "failed to parse sort source [" + sSource + "]", location, e);
} // NORELEASE fix this to be more elegant } // NORELEASE fix this to be more elegant
}
context.trackScores(source.trackScores()); context.trackScores(source.trackScores());
if (source.minScore() != null) {
context.minimumScore(source.minScore()); context.minimumScore(source.minScore());
}
context.timeoutInMillis(source.timeoutInMillis()); context.timeoutInMillis(source.timeoutInMillis());
context.terminateAfter(source.terminateAfter()); context.terminateAfter(source.terminateAfter());
context.aggregations(null); // NOCOMMIT parse source.aggregations() if (source.aggregations() != null) {
// ByteReference into XContentParser completeAggregationsParser = null;
// SearchContextAggregations object try {
XContentBuilder completeAggregationsBuilder = XContentFactory.jsonBuilder();
completeAggregationsBuilder.startObject();
for (BytesReference agg : source.aggregations()) {
XContentParser parser = XContentFactory.xContent(agg).createParser(agg);
parser.nextToken();
parser.nextToken();
completeAggregationsBuilder.field(parser.currentName());
parser.nextToken();
completeAggregationsBuilder.copyCurrentStructure(parser);
}
completeAggregationsBuilder.endObject();
BytesReference completeAggregationsBytes = completeAggregationsBuilder.bytes();
completeAggregationsParser = XContentFactory.xContent(completeAggregationsBytes).createParser(completeAggregationsBytes);
completeAggregationsParser.nextToken();
this.elementParsers.get("aggregations").parse(completeAggregationsParser, context);
} catch (Exception e) {
String sSource = "_na_";
try {
sSource = source.toString();
} catch (Throwable e1) {
// ignore
}
XContentLocation location = completeAggregationsParser != null ? completeAggregationsParser.getTokenLocation() : null;
throw new SearchParseException(context, "failed to parse rescore source [" + sSource + "]", location, e);
} // NORELEASE fix this to be more elegant
}
if (source.suggest() != null) {
XContentParser suggestParser = null; XContentParser suggestParser = null;
try { try {
suggestParser = XContentFactory.xContent(source.suggest()).createParser(source.suggest()); suggestParser = XContentFactory.xContent(source.suggest()).createParser(source.suggest());
@ -726,17 +765,21 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
XContentLocation location = suggestParser != null ? suggestParser.getTokenLocation() : null; XContentLocation location = suggestParser != null ? suggestParser.getTokenLocation() : null;
throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e); throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e);
} }
}
if (source.rescores() != null) {
XContentParser completeRescoreParser = null; XContentParser completeRescoreParser = null;
try { try {
XContentBuilder completeRescoreBuilder = XContentFactory.jsonBuilder(); XContentBuilder completeRescoreBuilder = XContentFactory.jsonBuilder();
completeRescoreBuilder.startArray(); completeRescoreBuilder.startArray();
for (BytesReference rescore : source.rescores()) { for (BytesReference rescore : source.rescores()) {
XContentParser parser = XContentFactory.xContent(rescore).createParser(rescore); XContentParser parser = XContentFactory.xContent(rescore).createParser(rescore);
parser.nextToken();
completeRescoreBuilder.copyCurrentStructure(parser); completeRescoreBuilder.copyCurrentStructure(parser);
} }
completeRescoreBuilder.endArray(); completeRescoreBuilder.endArray();
BytesReference completeRescoreBytes = completeRescoreBuilder.bytes(); BytesReference completeRescoreBytes = completeRescoreBuilder.bytes();
completeRescoreParser = XContentFactory.xContent(completeRescoreBytes).createParser(completeRescoreBytes); completeRescoreParser = XContentFactory.xContent(completeRescoreBytes).createParser(completeRescoreBytes);
completeRescoreParser.nextToken();
this.elementParsers.get("rescore").parse(completeRescoreParser, context); this.elementParsers.get("rescore").parse(completeRescoreParser, context);
} catch (Exception e) { } catch (Exception e) {
String sSource = "_na_"; String sSource = "_na_";
@ -748,13 +791,23 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
XContentLocation location = completeRescoreParser != null ? completeRescoreParser.getTokenLocation() : null; XContentLocation location = completeRescoreParser != null ? completeRescoreParser.getTokenLocation() : null;
throw new SearchParseException(context, "failed to parse rescore source [" + sSource + "]", location, e); throw new SearchParseException(context, "failed to parse rescore source [" + sSource + "]", location, e);
} // NORELEASE fix this to be more elegant } // NORELEASE fix this to be more elegant
}
if (source.fields() != null) {
context.fieldNames().addAll(source.fields()); context.fieldNames().addAll(source.fields());
}
if (source.explain() != null) {
context.explain(source.explain()); context.explain(source.explain());
}
if (source.fetchSource() != null) {
context.fetchSourceContext(source.fetchSource()); context.fetchSourceContext(source.fetchSource());
}
if (source.fieldDataFields() != null) {
FieldDataFieldsContext fieldDataFieldsContext = context.getFetchSubPhaseContext(FieldDataFieldsFetchSubPhase.CONTEXT_FACTORY); FieldDataFieldsContext fieldDataFieldsContext = context.getFetchSubPhaseContext(FieldDataFieldsFetchSubPhase.CONTEXT_FACTORY);
for (String field : source.fieldDataFields()) { for (String field : source.fieldDataFields()) {
fieldDataFieldsContext.add(new FieldDataField(field)); fieldDataFieldsContext.add(new FieldDataField(field));
} }
}
if (source.highlighter() != null) {
XContentParser highlighterParser = null; XContentParser highlighterParser = null;
try { try {
highlighterParser = XContentFactory.xContent(source.highlighter()).createParser(source.highlighter()); highlighterParser = XContentFactory.xContent(source.highlighter()).createParser(source.highlighter());
@ -769,6 +822,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
XContentLocation location = highlighterParser != null ? highlighterParser.getTokenLocation() : null; XContentLocation location = highlighterParser != null ? highlighterParser.getTokenLocation() : null;
throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e); throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e);
} }
}
if (source.innerHits() != null) {
XContentParser innerHitsParser = null; XContentParser innerHitsParser = null;
try { try {
innerHitsParser = XContentFactory.xContent(source.innerHits()).createParser(source.innerHits()); innerHitsParser = XContentFactory.xContent(source.innerHits()).createParser(source.innerHits());
@ -783,14 +838,21 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
XContentLocation location = innerHitsParser != null ? innerHitsParser.getTokenLocation() : null; XContentLocation location = innerHitsParser != null ? innerHitsParser.getTokenLocation() : null;
throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e); throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e);
} }
}
if (source.scriptFields() != null) {
for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) { for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) {
SearchScript searchScript = context.scriptService().search(context.lookup(), field.script(), ScriptContext.Standard.SEARCH); SearchScript searchScript = context.scriptService().search(context.lookup(), field.script(), ScriptContext.Standard.SEARCH);
context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, false)); // NORELEASE need to have ignore_exception parsed somewhere context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, false)); // NORELEASE need to have ignore_exception parsed somewhere
} }
}
// NOCOMMIT need to work out what to do about term_vectors_fetch (previously handled by TermVectorsFetchParseElement) as this is not available as an option in SearchSourceBuilder // NOCOMMIT need to work out what to do about term_vectors_fetch (previously handled by TermVectorsFetchParseElement) as this is not available as an option in SearchSourceBuilder
if (source.version() != null) {
context.version(source.version()); context.version(source.version());
}
if (source.stats() != null) {
context.groupStats(Arrays.asList(source.stats())); // NORELEASE stats should be a list in SearchSourceBuilder context.groupStats(Arrays.asList(source.stats())); // NORELEASE stats should be a list in SearchSourceBuilder
} }
}
private static final int[] EMPTY_DOC_IDS = new int[0]; private static final int[] EMPTY_DOC_IDS = new int[0];

View File

@ -40,7 +40,6 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.Template;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.fetch.innerhits.InnerHitsBuilder; import org.elasticsearch.search.fetch.innerhits.InnerHitsBuilder;
import org.elasticsearch.search.fetch.source.FetchSourceContext; import org.elasticsearch.search.fetch.source.FetchSourceContext;
@ -233,7 +232,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
/** /**
* Gets the minimum score below which docs will be filtered out. * Gets the minimum score below which docs will be filtered out.
*/ */
public float minScore() { public Float minScore() {
return minScore; return minScore;
} }

View File

@ -37,11 +37,27 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportSerializationException;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.support.TransportStatus; import org.elasticsearch.transport.support.TransportStatus;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -65,7 +81,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
private final static ConcurrentMap<LocalTransportAddress, LocalTransport> transports = newConcurrentMap(); private final static ConcurrentMap<LocalTransportAddress, LocalTransport> transports = newConcurrentMap();
private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap(); private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap();
private final NamedWriteableRegistry namedWriteableRegistry; protected final NamedWriteableRegistry namedWriteableRegistry;
public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address"; public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address";
public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers"; public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers";

View File

@ -53,6 +53,8 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
@ -82,13 +84,7 @@ import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.test.ESTestCase.assertArrayEquals; import static org.apache.lucene.util.LuceneTestCase.random;
import static org.elasticsearch.test.ESTestCase.assertEquals;
import static org.elasticsearch.test.ESTestCase.assertFalse;
import static org.elasticsearch.test.ESTestCase.assertNotNull;
import static org.elasticsearch.test.ESTestCase.assertTrue;
import static org.elasticsearch.test.ESTestCase.fail;
import static org.elasticsearch.test.ESTestCase.random;
import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@ -102,6 +98,12 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** /**
* *
@ -663,6 +665,10 @@ public class ElasticsearchAssertions {
} }
public static void assertVersionSerializable(Version version, Streamable streamable) { public static void assertVersionSerializable(Version version, Streamable streamable) {
assertVersionSerializable(version, streamable, null);
}
public static void assertVersionSerializable(Version version, Streamable streamable, NamedWriteableRegistry namedWriteableRegistry) {
try { try {
Streamable newInstance = tryCreateNewInstance(streamable); Streamable newInstance = tryCreateNewInstance(streamable);
if (newInstance == null) { if (newInstance == null) {
@ -674,10 +680,15 @@ public class ElasticsearchAssertions {
} }
BytesReference orig = serialize(version, streamable); BytesReference orig = serialize(version, streamable);
StreamInput input = StreamInput.wrap(orig); StreamInput input = StreamInput.wrap(orig);
if (namedWriteableRegistry != null) {
input = new NamedWriteableAwareStreamInput(input, namedWriteableRegistry);
}
input.setVersion(version); input.setVersion(version);
newInstance.readFrom(input); newInstance.readFrom(input);
assertThat("Stream should be fully read with version [" + version + "] for streamable [" + streamable + "]", input.available(), equalTo(0)); assertThat("Stream should be fully read with version [" + version + "] for streamable [" + streamable + "]", input.available(),
assertThat("Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable + "]", serialize(version, streamable), equalTo(orig)); equalTo(0));
assertThat("Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable + "]",
serialize(version, streamable), equalTo(orig));
} catch (Throwable ex) { } catch (Throwable ex) {
throw new RuntimeException("failed to check serialization - version [" + version + "] for streamable [" + streamable + "]", ex); throw new RuntimeException("failed to check serialization - version [" + version + "] for streamable [" + streamable + "]", ex);
} }

View File

@ -77,13 +77,15 @@ public class AssertingLocalTransport extends LocalTransport {
@Override @Override
protected void handleParsedResponse(final TransportResponse response, final TransportResponseHandler handler) { protected void handleParsedResponse(final TransportResponse response, final TransportResponseHandler handler) {
ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersionBetween(random, minVersion, maxVersion), response); ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersionBetween(random, minVersion, maxVersion), response,
namedWriteableRegistry);
super.handleParsedResponse(response, handler); super.handleParsedResponse(response, handler);
} }
@Override @Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersionBetween(random, minVersion, maxVersion), request); ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersionBetween(random, minVersion, maxVersion), request,
namedWriteableRegistry);
super.sendRequest(node, requestId, action, request, options); super.sendRequest(node, requestId, action, request, options);
} }
} }