From d7429a6c63ddce038d14237d5bf3b3f0dcc8bd2b Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Wed, 23 Sep 2015 16:40:58 +0100 Subject: [PATCH] Search service can now fully populate SearchContext from SearchSourceBuilder --- .../elasticsearch/search/SearchService.java | 252 +++++++++++------- .../search/builder/SearchSourceBuilder.java | 3 +- .../transport/local/LocalTransport.java | 22 +- .../hamcrest/ElasticsearchAssertions.java | 29 +- .../transport/AssertingLocalTransport.java | 6 +- 5 files changed, 201 insertions(+), 111 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 78212052cf5..406e1b0c13c 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -19,6 +19,7 @@ package org.elasticsearch.search; +import com.carrotsearch.hppc.ObjectFloatHashMap; import com.carrotsearch.hppc.ObjectHashSet; import com.carrotsearch.hppc.ObjectSet; import com.carrotsearch.hppc.cursors.ObjectCursor; @@ -677,119 +678,180 @@ public class SearchService extends AbstractLifecycleComponent { context.from(source.from()); context.size(source.size()); - Float indexBoost = source.indexBoost().get(context.shardTarget().index()); - if (indexBoost != null) { - context.queryBoost(indexBoost); + ObjectFloatHashMap indexBoostMap = source.indexBoost(); + if (indexBoostMap != null) { + Float indexBoost = indexBoostMap.get(context.shardTarget().index()); + if (indexBoost != null) { + context.queryBoost(indexBoost); + } } context.parsedQuery(context.queryParserService().parse(source.query())); - context.parsedPostFilter(context.queryParserService().parse(source.postFilter())); - XContentParser completeSortParser = null; - try { - XContentBuilder completeSortBuilder = XContentFactory.jsonBuilder(); - completeSortBuilder.startArray(); - for (BytesReference sort : source.sorts()) { - XContentParser parser = XContentFactory.xContent(sort).createParser(sort); - completeSortBuilder.copyCurrentStructure(parser); - } - completeSortBuilder.endArray(); - BytesReference completeSortBytes = completeSortBuilder.bytes(); - completeSortParser = XContentFactory.xContent(completeSortBytes).createParser(completeSortBytes); - this.elementParsers.get("sort").parse(completeSortParser, context); - } catch (Exception e) { - String sSource = "_na_"; + if (source.postFilter() != null) { + context.parsedPostFilter(context.queryParserService().parse(source.postFilter())); + } + if (source.sorts() != null) { + XContentParser completeSortParser = null; try { - sSource = source.toString(); - } catch (Throwable e1) { - // ignore - } - XContentLocation location = completeSortParser != null ? completeSortParser.getTokenLocation() : null; - throw new SearchParseException(context, "failed to parse sort source [" + sSource + "]", location, e); - } // NORELEASE fix this to be more elegant + XContentBuilder completeSortBuilder = XContentFactory.jsonBuilder(); + completeSortBuilder.startArray(); + for (BytesReference sort : source.sorts()) { + XContentParser parser = XContentFactory.xContent(sort).createParser(sort); + parser.nextToken(); + completeSortBuilder.copyCurrentStructure(parser); + } + completeSortBuilder.endArray(); + BytesReference completeSortBytes = completeSortBuilder.bytes(); + completeSortParser = XContentFactory.xContent(completeSortBytes).createParser(completeSortBytes); + completeSortParser.nextToken(); + this.elementParsers.get("sort").parse(completeSortParser, context); + } catch (Exception e) { + String sSource = "_na_"; + try { + sSource = source.toString(); + } catch (Throwable e1) { + // ignore + } + XContentLocation location = completeSortParser != null ? completeSortParser.getTokenLocation() : null; + throw new SearchParseException(context, "failed to parse sort source [" + sSource + "]", location, e); + } // NORELEASE fix this to be more elegant + } context.trackScores(source.trackScores()); - context.minimumScore(source.minScore()); + if (source.minScore() != null) { + context.minimumScore(source.minScore()); + } context.timeoutInMillis(source.timeoutInMillis()); context.terminateAfter(source.terminateAfter()); - context.aggregations(null); // NOCOMMIT parse source.aggregations() - // ByteReference into - // SearchContextAggregations object - XContentParser suggestParser = null; - try { - suggestParser = XContentFactory.xContent(source.suggest()).createParser(source.suggest()); - this.elementParsers.get("suggest").parse(suggestParser, context); - } catch (Exception e) { - String sSource = "_na_"; + if (source.aggregations() != null) { + XContentParser completeAggregationsParser = null; try { - sSource = source.toString(); - } catch (Throwable e1) { - // ignore - } - XContentLocation location = suggestParser != null ? suggestParser.getTokenLocation() : null; - throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e); + XContentBuilder completeAggregationsBuilder = XContentFactory.jsonBuilder(); + completeAggregationsBuilder.startObject(); + for (BytesReference agg : source.aggregations()) { + XContentParser parser = XContentFactory.xContent(agg).createParser(agg); + parser.nextToken(); + parser.nextToken(); + completeAggregationsBuilder.field(parser.currentName()); + parser.nextToken(); + completeAggregationsBuilder.copyCurrentStructure(parser); + } + completeAggregationsBuilder.endObject(); + BytesReference completeAggregationsBytes = completeAggregationsBuilder.bytes(); + completeAggregationsParser = XContentFactory.xContent(completeAggregationsBytes).createParser(completeAggregationsBytes); + completeAggregationsParser.nextToken(); + this.elementParsers.get("aggregations").parse(completeAggregationsParser, context); + } catch (Exception e) { + String sSource = "_na_"; + try { + sSource = source.toString(); + } catch (Throwable e1) { + // ignore + } + XContentLocation location = completeAggregationsParser != null ? completeAggregationsParser.getTokenLocation() : null; + throw new SearchParseException(context, "failed to parse rescore source [" + sSource + "]", location, e); + } // NORELEASE fix this to be more elegant } - XContentParser completeRescoreParser = null; - try { - XContentBuilder completeRescoreBuilder = XContentFactory.jsonBuilder(); - completeRescoreBuilder.startArray(); - for (BytesReference rescore : source.rescores()) { - XContentParser parser = XContentFactory.xContent(rescore).createParser(rescore); - completeRescoreBuilder.copyCurrentStructure(parser); - } - completeRescoreBuilder.endArray(); - BytesReference completeRescoreBytes = completeRescoreBuilder.bytes(); - completeRescoreParser = XContentFactory.xContent(completeRescoreBytes).createParser(completeRescoreBytes); - this.elementParsers.get("rescore").parse(completeRescoreParser, context); - } catch (Exception e) { - String sSource = "_na_"; + if (source.suggest() != null) { + XContentParser suggestParser = null; try { - sSource = source.toString(); - } catch (Throwable e1) { - // ignore + suggestParser = XContentFactory.xContent(source.suggest()).createParser(source.suggest()); + this.elementParsers.get("suggest").parse(suggestParser, context); + } catch (Exception e) { + String sSource = "_na_"; + try { + sSource = source.toString(); + } catch (Throwable e1) { + // ignore + } + XContentLocation location = suggestParser != null ? suggestParser.getTokenLocation() : null; + throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e); } - XContentLocation location = completeRescoreParser != null ? completeRescoreParser.getTokenLocation() : null; - throw new SearchParseException(context, "failed to parse rescore source [" + sSource + "]", location, e); - } // NORELEASE fix this to be more elegant - context.fieldNames().addAll(source.fields()); - context.explain(source.explain()); - context.fetchSourceContext(source.fetchSource()); - FieldDataFieldsContext fieldDataFieldsContext = context.getFetchSubPhaseContext(FieldDataFieldsFetchSubPhase.CONTEXT_FACTORY); - for (String field : source.fieldDataFields()) { - fieldDataFieldsContext.add(new FieldDataField(field)); } - XContentParser highlighterParser = null; - try { - highlighterParser = XContentFactory.xContent(source.highlighter()).createParser(source.highlighter()); - this.elementParsers.get("highlight").parse(highlighterParser, context); - } catch (Exception e) { - String sSource = "_na_"; + if (source.rescores() != null) { + XContentParser completeRescoreParser = null; try { - sSource = source.toString(); - } catch (Throwable e1) { - // ignore - } - XContentLocation location = highlighterParser != null ? highlighterParser.getTokenLocation() : null; - throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e); + XContentBuilder completeRescoreBuilder = XContentFactory.jsonBuilder(); + completeRescoreBuilder.startArray(); + for (BytesReference rescore : source.rescores()) { + XContentParser parser = XContentFactory.xContent(rescore).createParser(rescore); + parser.nextToken(); + completeRescoreBuilder.copyCurrentStructure(parser); + } + completeRescoreBuilder.endArray(); + BytesReference completeRescoreBytes = completeRescoreBuilder.bytes(); + completeRescoreParser = XContentFactory.xContent(completeRescoreBytes).createParser(completeRescoreBytes); + completeRescoreParser.nextToken(); + this.elementParsers.get("rescore").parse(completeRescoreParser, context); + } catch (Exception e) { + String sSource = "_na_"; + try { + sSource = source.toString(); + } catch (Throwable e1) { + // ignore + } + XContentLocation location = completeRescoreParser != null ? completeRescoreParser.getTokenLocation() : null; + throw new SearchParseException(context, "failed to parse rescore source [" + sSource + "]", location, e); + } // NORELEASE fix this to be more elegant } - XContentParser innerHitsParser = null; - try { - innerHitsParser = XContentFactory.xContent(source.innerHits()).createParser(source.innerHits()); - this.elementParsers.get("highlight").parse(innerHitsParser, context); - } catch (Exception e) { - String sSource = "_na_"; + if (source.fields() != null) { + context.fieldNames().addAll(source.fields()); + } + if (source.explain() != null) { + context.explain(source.explain()); + } + if (source.fetchSource() != null) { + context.fetchSourceContext(source.fetchSource()); + } + if (source.fieldDataFields() != null) { + FieldDataFieldsContext fieldDataFieldsContext = context.getFetchSubPhaseContext(FieldDataFieldsFetchSubPhase.CONTEXT_FACTORY); + for (String field : source.fieldDataFields()) { + fieldDataFieldsContext.add(new FieldDataField(field)); + } + } + if (source.highlighter() != null) { + XContentParser highlighterParser = null; try { - sSource = source.toString(); - } catch (Throwable e1) { - // ignore + highlighterParser = XContentFactory.xContent(source.highlighter()).createParser(source.highlighter()); + this.elementParsers.get("highlight").parse(highlighterParser, context); + } catch (Exception e) { + String sSource = "_na_"; + try { + sSource = source.toString(); + } catch (Throwable e1) { + // ignore + } + XContentLocation location = highlighterParser != null ? highlighterParser.getTokenLocation() : null; + throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e); } - XContentLocation location = innerHitsParser != null ? innerHitsParser.getTokenLocation() : null; - throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e); } - for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) { - SearchScript searchScript = context.scriptService().search(context.lookup(), field.script(), ScriptContext.Standard.SEARCH); - context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, false)); // NORELEASE need to have ignore_exception parsed somewhere + if (source.innerHits() != null) { + XContentParser innerHitsParser = null; + try { + innerHitsParser = XContentFactory.xContent(source.innerHits()).createParser(source.innerHits()); + this.elementParsers.get("highlight").parse(innerHitsParser, context); + } catch (Exception e) { + String sSource = "_na_"; + try { + sSource = source.toString(); + } catch (Throwable e1) { + // ignore + } + XContentLocation location = innerHitsParser != null ? innerHitsParser.getTokenLocation() : null; + throw new SearchParseException(context, "failed to parse suggest source [" + sSource + "]", location, e); + } + } + if (source.scriptFields() != null) { + for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) { + SearchScript searchScript = context.scriptService().search(context.lookup(), field.script(), ScriptContext.Standard.SEARCH); + context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, false)); // NORELEASE need to have ignore_exception parsed somewhere + } } // NOCOMMIT need to work out what to do about term_vectors_fetch (previously handled by TermVectorsFetchParseElement) as this is not available as an option in SearchSourceBuilder - context.version(source.version()); - context.groupStats(Arrays.asList(source.stats())); // NORELEASE stats should be a list in SearchSourceBuilder + if (source.version() != null) { + context.version(source.version()); + } + if (source.stats() != null) { + context.groupStats(Arrays.asList(source.stats())); // NORELEASE stats should be a list in SearchSourceBuilder + } } private static final int[] EMPTY_DOC_IDS = new int[0]; diff --git a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index 1cee4965a46..74891a086ab 100644 --- a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -40,7 +40,6 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.script.Script; -import org.elasticsearch.script.Template; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.fetch.innerhits.InnerHitsBuilder; import org.elasticsearch.search.fetch.source.FetchSourceContext; @@ -233,7 +232,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ /** * Gets the minimum score below which docs will be filtered out. */ - public float minScore() { + public Float minScore() { return minScore; } diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index a500fb386f8..179a5db433e 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -37,11 +37,27 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.NodeNotConnectedException; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.RequestHandlerRegistry; +import org.elasticsearch.transport.ResponseHandlerFailureTransportException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportSerializationException; +import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.support.TransportStatus; import java.io.IOException; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -65,7 +81,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem private final static ConcurrentMap transports = newConcurrentMap(); private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); private final ConcurrentMap connectedNodes = newConcurrentMap(); - private final NamedWriteableRegistry namedWriteableRegistry; + protected final NamedWriteableRegistry namedWriteableRegistry; public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address"; public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers"; diff --git a/core/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/core/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 5772543dc6b..9c24a11ed5e 100644 --- a/core/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/core/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -53,6 +53,8 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -82,13 +84,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; -import static org.elasticsearch.test.ESTestCase.assertArrayEquals; -import static org.elasticsearch.test.ESTestCase.assertEquals; -import static org.elasticsearch.test.ESTestCase.assertFalse; -import static org.elasticsearch.test.ESTestCase.assertNotNull; -import static org.elasticsearch.test.ESTestCase.assertTrue; -import static org.elasticsearch.test.ESTestCase.fail; -import static org.elasticsearch.test.ESTestCase.random; +import static org.apache.lucene.util.LuceneTestCase.random; import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -102,6 +98,12 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * @@ -663,6 +665,10 @@ public class ElasticsearchAssertions { } public static void assertVersionSerializable(Version version, Streamable streamable) { + assertVersionSerializable(version, streamable, null); + } + + public static void assertVersionSerializable(Version version, Streamable streamable, NamedWriteableRegistry namedWriteableRegistry) { try { Streamable newInstance = tryCreateNewInstance(streamable); if (newInstance == null) { @@ -674,10 +680,15 @@ public class ElasticsearchAssertions { } BytesReference orig = serialize(version, streamable); StreamInput input = StreamInput.wrap(orig); + if (namedWriteableRegistry != null) { + input = new NamedWriteableAwareStreamInput(input, namedWriteableRegistry); + } input.setVersion(version); newInstance.readFrom(input); - assertThat("Stream should be fully read with version [" + version + "] for streamable [" + streamable + "]", input.available(), equalTo(0)); - assertThat("Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable + "]", serialize(version, streamable), equalTo(orig)); + assertThat("Stream should be fully read with version [" + version + "] for streamable [" + streamable + "]", input.available(), + equalTo(0)); + assertThat("Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable + "]", + serialize(version, streamable), equalTo(orig)); } catch (Throwable ex) { throw new RuntimeException("failed to check serialization - version [" + version + "] for streamable [" + streamable + "]", ex); } diff --git a/core/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransport.java b/core/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransport.java index c253a752d6b..64cc401cb5f 100644 --- a/core/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransport.java +++ b/core/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransport.java @@ -77,13 +77,15 @@ public class AssertingLocalTransport extends LocalTransport { @Override protected void handleParsedResponse(final TransportResponse response, final TransportResponseHandler handler) { - ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersionBetween(random, minVersion, maxVersion), response); + ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersionBetween(random, minVersion, maxVersion), response, + namedWriteableRegistry); super.handleParsedResponse(response, handler); } @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { - ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersionBetween(random, minVersion, maxVersion), request); + ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersionBetween(random, minVersion, maxVersion), request, + namedWriteableRegistry); super.sendRequest(node, requestId, action, request, options); } }