Added msearch api to high level client

This commit is contained in:
Martijn van Groningen 2017-11-06 11:26:04 +01:00
parent c203cff692
commit 4d78e1a9ad
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
12 changed files with 759 additions and 126 deletions

View File

@ -35,6 +35,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.ActiveShardCount;
@ -49,6 +50,7 @@ import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
@ -381,6 +383,18 @@ public final class Request {
return new Request("DELETE", "/_search/scroll", Collections.emptyMap(), entity);
}
static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Params params = Params.builder();
params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true");
if (multiSearchRequest.maxConcurrentSearchRequests() != MultiSearchRequest.MAX_CONCURRENT_SEARCH_REQUESTS_DEFAULT) {
params.putParam("max_concurrent_searches", Integer.toString(multiSearchRequest.maxConcurrentSearchRequests()));
}
XContent xContent = REQUEST_BODY_CONTENT_TYPE.xContent();
byte[] source = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, xContent);
HttpEntity entity = new ByteArrayEntity(source, createContentType(xContent.type()));
return new Request("GET", "/_msearch", params.getParams(), entity);
}
private static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) throws IOException {
BytesRef source = XContentHelper.toXContent(toXContent, xContentType, false).toBytesRef();
return new ByteArrayEntity(source.bytes, source.offset, source.length, createContentType(xContentType));

View File

@ -38,6 +38,8 @@ import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
@ -377,6 +379,28 @@ public class RestHighLevelClient implements Closeable {
performRequestAsyncAndParseEntity(searchRequest, Request::search, SearchResponse::fromXContent, listener, emptySet(), headers);
}
/**
* Executes a multi search using the msearch API
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-multi-search.html">Multi search API on
* elastic.co</a>
*/
public final MultiSearchResponse multiSearch(MultiSearchRequest multiSearchRequest, Header... headers) throws IOException {
return performRequestAndParseEntity(multiSearchRequest, Request::multiSearch, MultiSearchResponse::fromXContext,
emptySet(), headers);
}
/**
* Asynchronously executes a multi search using the msearch API
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-multi-search.html">Multi search API on
* elastic.co</a>
*/
public final void multiSearchAsync(MultiSearchRequest searchRequest, ActionListener<MultiSearchResponse> listener, Header... headers) {
performRequestAsyncAndParseEntity(searchRequest, Request::multiSearch, MultiSearchResponse::fromXContext, listener,
emptySet(), headers);
}
/**
* Executes a search using the Search Scroll API
*

View File

@ -32,6 +32,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType;
@ -42,6 +43,7 @@ import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -56,6 +58,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -72,16 +75,21 @@ import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.client.Request.REQUEST_BODY_CONTENT_TYPE;
import static org.elasticsearch.client.Request.enforceSameContentType;
import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
public class RequestTests extends ESTestCase {
@ -771,6 +779,55 @@ public class RequestTests extends ESTestCase {
}
}
public void testMultiSearch() throws IOException {
int numberOfSearchRequests = randomIntBetween(0, 32);
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
for (int i = 0; i < numberOfSearchRequests; i++) {
SearchRequest searchRequest = randomSearchRequest(() -> {
// No need to return a very complex SearchSourceBuilder here, that is tested elsewhere
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.from(randomInt(10));
searchSourceBuilder.size(randomIntBetween(20, 100));
return searchSourceBuilder;
});
// scroll is not supported in the current msearch api, so unset it:
searchRequest.scroll((Scroll) null);
// only expand_wildcards, ignore_unavailable and allow_no_indices can be specified from msearch api, so unset other options:
IndicesOptions randomlyGenerated = searchRequest.indicesOptions();
IndicesOptions msearchDefault = new MultiSearchRequest().indicesOptions();
searchRequest.indicesOptions(IndicesOptions.fromOptions(
randomlyGenerated.ignoreUnavailable(), randomlyGenerated.allowNoIndices(), randomlyGenerated.expandWildcardsOpen(),
randomlyGenerated.expandWildcardsClosed(), msearchDefault.allowAliasesToMultipleIndices(),
msearchDefault.forbidClosedIndices(), msearchDefault.ignoreAliases()
));
multiSearchRequest.add(searchRequest);
}
Map<String, String> expectedParams = new HashMap<>();
expectedParams.put(RestSearchAction.TYPED_KEYS_PARAM, "true");
if (randomBoolean()) {
multiSearchRequest.maxConcurrentSearchRequests(randomIntBetween(1, 8));
expectedParams.put("max_concurrent_searches", Integer.toString(multiSearchRequest.maxConcurrentSearchRequests()));
}
Request request = Request.multiSearch(multiSearchRequest);
assertEquals("/_msearch", request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
List<SearchRequest> requests = new ArrayList<>();
CheckedBiConsumer<SearchRequest, XContentParser, IOException> consumer = (searchRequest, p) -> {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(p);
if (searchSourceBuilder.equals(new SearchSourceBuilder()) == false) {
searchRequest.source(searchSourceBuilder);
}
requests.add(searchRequest);
};
MultiSearchRequest.readMultiLineFormat(new BytesArray(EntityUtils.toByteArray(request.getEntity())),
REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null,
null, xContentRegistry(), true);
assertEquals(requests, multiSearchRequest.requests());
}
public void testSearchScroll() throws IOException {
SearchScrollRequest searchScrollRequest = new SearchScrollRequest();
searchScrollRequest.scrollId(randomAlphaOfLengthBetween(5, 10));
@ -782,7 +839,7 @@ public class RequestTests extends ESTestCase {
assertEquals("/_search/scroll", request.getEndpoint());
assertEquals(0, request.getParameters().size());
assertToXContentBody(searchScrollRequest, request.getEntity());
assertEquals(Request.REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}
public void testClearScroll() throws IOException {
@ -796,11 +853,11 @@ public class RequestTests extends ESTestCase {
assertEquals("/_search/scroll", request.getEndpoint());
assertEquals(0, request.getParameters().size());
assertToXContentBody(clearScrollRequest, request.getEntity());
assertEquals(Request.REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}
private static void assertToXContentBody(ToXContent expectedBody, HttpEntity actualEntity) throws IOException {
BytesReference expectedBytes = XContentHelper.toXContent(expectedBody, Request.REQUEST_BODY_CONTENT_TYPE, false);
BytesReference expectedBytes = XContentHelper.toXContent(expectedBody, REQUEST_BODY_CONTENT_TYPE, false);
assertEquals(XContentType.JSON.mediaTypeWithoutParameters(), actualEntity.getContentType().getValue());
assertEquals(expectedBytes, new BytesArray(EntityUtils.toByteArray(actualEntity)));
}

View File

@ -23,20 +23,30 @@ import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.NestedQueryBuilder;
import org.elasticsearch.index.query.ScriptQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.join.aggregations.Children;
import org.elasticsearch.join.aggregations.ChildrenAggregationBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.range.Range;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@ -45,10 +55,12 @@ import org.elasticsearch.search.aggregations.matrix.stats.MatrixStats;
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder;
import org.hamcrest.Matchers;
import org.junit.Before;
import java.io.IOException;
@ -64,6 +76,7 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.nullValue;
public class SearchIT extends ESRestHighLevelClientTestCase {
@ -80,10 +93,24 @@ public class SearchIT extends ESRestHighLevelClientTestCase {
StringEntity doc5 = new StringEntity("{\"type\":\"type2\", \"num\":100, \"num2\":10}", ContentType.APPLICATION_JSON);
client().performRequest("PUT", "/index/type/5", Collections.emptyMap(), doc5);
client().performRequest("POST", "/index/_refresh");
StringEntity doc = new StringEntity("{\"field\":\"value1\"}", ContentType.APPLICATION_JSON);
client().performRequest("PUT", "/index1/doc/1", Collections.emptyMap(), doc);
doc = new StringEntity("{\"field\":\"value2\"}", ContentType.APPLICATION_JSON);
client().performRequest("PUT", "/index1/doc/2", Collections.emptyMap(), doc);
doc = new StringEntity("{\"field\":\"value1\"}", ContentType.APPLICATION_JSON);
client().performRequest("PUT", "/index2/doc/3", Collections.emptyMap(), doc);
doc = new StringEntity("{\"field\":\"value2\"}", ContentType.APPLICATION_JSON);
client().performRequest("PUT", "/index2/doc/4", Collections.emptyMap(), doc);
doc = new StringEntity("{\"field\":\"value1\"}", ContentType.APPLICATION_JSON);
client().performRequest("PUT", "/index3/doc/5", Collections.emptyMap(), doc);
doc = new StringEntity("{\"field\":\"value2\"}", ContentType.APPLICATION_JSON);
client().performRequest("PUT", "/index3/doc/6", Collections.emptyMap(), doc);
client().performRequest("POST", "/index1,index2,index3/_refresh");
}
public void testSearchNoQuery() throws IOException {
SearchRequest searchRequest = new SearchRequest();
SearchRequest searchRequest = new SearchRequest("index");
SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync);
assertSearchHeader(searchResponse);
assertNull(searchResponse.getAggregations());
@ -106,7 +133,7 @@ public class SearchIT extends ESRestHighLevelClientTestCase {
}
public void testSearchMatchQuery() throws IOException {
SearchRequest searchRequest = new SearchRequest();
SearchRequest searchRequest = new SearchRequest("index");
searchRequest.source(new SearchSourceBuilder().query(new MatchQueryBuilder("num", 10)));
SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync);
assertSearchHeader(searchResponse);
@ -164,7 +191,7 @@ public class SearchIT extends ESRestHighLevelClientTestCase {
assertEquals(RestStatus.BAD_REQUEST, exception.status());
}
SearchRequest searchRequest = new SearchRequest();
SearchRequest searchRequest = new SearchRequest("index");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new RangeAggregationBuilder("agg1").field("num")
.addRange("first", 0, 30).addRange("second", 31, 200));
@ -193,7 +220,7 @@ public class SearchIT extends ESRestHighLevelClientTestCase {
}
public void testSearchWithTermsAndRangeAgg() throws IOException {
SearchRequest searchRequest = new SearchRequest();
SearchRequest searchRequest = new SearchRequest("index");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
TermsAggregationBuilder agg = new TermsAggregationBuilder("agg1", ValueType.STRING).field("type.keyword");
agg.subAggregation(new RangeAggregationBuilder("subagg").field("num")
@ -247,7 +274,7 @@ public class SearchIT extends ESRestHighLevelClientTestCase {
}
public void testSearchWithMatrixStats() throws IOException {
SearchRequest searchRequest = new SearchRequest();
SearchRequest searchRequest = new SearchRequest("index");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new MatrixStatsAggregationBuilder("agg1").fields(Arrays.asList("num", "num2")));
searchSourceBuilder.size(0);
@ -374,7 +401,7 @@ public class SearchIT extends ESRestHighLevelClientTestCase {
}
public void testSearchWithSuggest() throws IOException {
SearchRequest searchRequest = new SearchRequest();
SearchRequest searchRequest = new SearchRequest("index");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion("sugg1", new PhraseSuggestionBuilder("type"))
.setGlobalText("type"));
@ -464,6 +491,185 @@ public class SearchIT extends ESRestHighLevelClientTestCase {
}
}
public void testMultiSearch() throws Exception {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
SearchRequest searchRequest1 = new SearchRequest("index1");
searchRequest1.source().sort("_id", SortOrder.ASC);
multiSearchRequest.add(searchRequest1);
SearchRequest searchRequest2 = new SearchRequest("index2");
searchRequest2.source().sort("_id", SortOrder.ASC);
multiSearchRequest.add(searchRequest2);
SearchRequest searchRequest3 = new SearchRequest("index3");
searchRequest3.source().sort("_id", SortOrder.ASC);
multiSearchRequest.add(searchRequest3);
MultiSearchResponse multiSearchResponse =
execute(multiSearchRequest, highLevelClient()::multiSearch, highLevelClient()::multiSearchAsync);
assertThat(multiSearchResponse.getTook().millis(), Matchers.greaterThanOrEqualTo(0L));
assertThat(multiSearchResponse.getResponses().length, Matchers.equalTo(3));
assertThat(multiSearchResponse.getResponses()[0].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[0].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse());
assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L));
assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("1"));
assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getAt(1).getId(), Matchers.equalTo("2"));
assertThat(multiSearchResponse.getResponses()[1].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[1].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[1].getResponse());
assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L));
assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("3"));
assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getAt(1).getId(), Matchers.equalTo("4"));
assertThat(multiSearchResponse.getResponses()[2].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[2].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[2].getResponse());
assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L));
assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("5"));
assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getAt(1).getId(), Matchers.equalTo("6"));
}
public void testMultiSearch_withAgg() throws Exception {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
SearchRequest searchRequest1 = new SearchRequest("index1");
searchRequest1.source().size(0).aggregation(new TermsAggregationBuilder("name", ValueType.STRING).field("field.keyword")
.order(BucketOrder.key(true)));
multiSearchRequest.add(searchRequest1);
SearchRequest searchRequest2 = new SearchRequest("index2");
searchRequest2.source().size(0).aggregation(new TermsAggregationBuilder("name", ValueType.STRING).field("field.keyword")
.order(BucketOrder.key(true)));
multiSearchRequest.add(searchRequest2);
SearchRequest searchRequest3 = new SearchRequest("index3");
searchRequest3.source().size(0).aggregation(new TermsAggregationBuilder("name", ValueType.STRING).field("field.keyword")
.order(BucketOrder.key(true)));
multiSearchRequest.add(searchRequest3);
MultiSearchResponse multiSearchResponse =
execute(multiSearchRequest, highLevelClient()::multiSearch, highLevelClient()::multiSearchAsync);
assertThat(multiSearchResponse.getTook().millis(), Matchers.greaterThanOrEqualTo(0L));
assertThat(multiSearchResponse.getResponses().length, Matchers.equalTo(3));
assertThat(multiSearchResponse.getResponses()[0].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[0].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse());
assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L));
assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getHits().length, Matchers.equalTo(0));
Terms terms = multiSearchResponse.getResponses()[0].getResponse().getAggregations().get("name");
assertThat(terms.getBuckets().size(), Matchers.equalTo(2));
assertThat(terms.getBuckets().get(0).getKeyAsString(), Matchers.equalTo("value1"));
assertThat(terms.getBuckets().get(1).getKeyAsString(), Matchers.equalTo("value2"));
assertThat(multiSearchResponse.getResponses()[1].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[1].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse());
assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L));
assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getHits().length, Matchers.equalTo(0));
terms = multiSearchResponse.getResponses()[1].getResponse().getAggregations().get("name");
assertThat(terms.getBuckets().size(), Matchers.equalTo(2));
assertThat(terms.getBuckets().get(0).getKeyAsString(), Matchers.equalTo("value1"));
assertThat(terms.getBuckets().get(1).getKeyAsString(), Matchers.equalTo("value2"));
assertThat(multiSearchResponse.getResponses()[2].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[2].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse());
assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getTotalHits(), Matchers.equalTo(2L));
assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getHits().length, Matchers.equalTo(0));
terms = multiSearchResponse.getResponses()[2].getResponse().getAggregations().get("name");
assertThat(terms.getBuckets().size(), Matchers.equalTo(2));
assertThat(terms.getBuckets().get(0).getKeyAsString(), Matchers.equalTo("value1"));
assertThat(terms.getBuckets().get(1).getKeyAsString(), Matchers.equalTo("value2"));
}
public void testMultiSearch_withQuery() throws Exception {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
SearchRequest searchRequest1 = new SearchRequest("index1");
searchRequest1.source().query(new TermsQueryBuilder("field", "value2"));
multiSearchRequest.add(searchRequest1);
SearchRequest searchRequest2 = new SearchRequest("index2");
searchRequest2.source().query(new TermsQueryBuilder("field", "value2"));
multiSearchRequest.add(searchRequest2);
SearchRequest searchRequest3 = new SearchRequest("index3");
searchRequest3.source().query(new TermsQueryBuilder("field", "value2"));
multiSearchRequest.add(searchRequest3);
MultiSearchResponse multiSearchResponse =
execute(multiSearchRequest, highLevelClient()::multiSearch, highLevelClient()::multiSearchAsync);
assertThat(multiSearchResponse.getTook().millis(), Matchers.greaterThanOrEqualTo(0L));
assertThat(multiSearchResponse.getResponses().length, Matchers.equalTo(3));
assertThat(multiSearchResponse.getResponses()[0].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[0].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse());
assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L));
assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("2"));
assertThat(multiSearchResponse.getResponses()[1].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[1].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[1].getResponse());
assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L));
assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("4"));
assertThat(multiSearchResponse.getResponses()[2].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[2].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[2].getResponse());
assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L));
assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("6"));
searchRequest1.source().highlighter(new HighlightBuilder().field("field"));
searchRequest2.source().highlighter(new HighlightBuilder().field("field"));
searchRequest3.source().highlighter(new HighlightBuilder().field("field"));
multiSearchResponse = execute(multiSearchRequest, highLevelClient()::multiSearch, highLevelClient()::multiSearchAsync);
assertThat(multiSearchResponse.getTook().millis(), Matchers.greaterThanOrEqualTo(0L));
assertThat(multiSearchResponse.getResponses().length, Matchers.equalTo(3));
assertThat(multiSearchResponse.getResponses()[0].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[0].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[0].getResponse());
assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L));
assertThat(multiSearchResponse.getResponses()[0].getResponse().getHits().getAt(0).getHighlightFields()
.get("field").fragments()[0].string(), Matchers.equalTo("<em>value2</em>"));
assertThat(multiSearchResponse.getResponses()[1].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[1].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[1].getResponse());
assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L));
assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("4"));
assertThat(multiSearchResponse.getResponses()[1].getResponse().getHits().getAt(0).getHighlightFields()
.get("field").fragments()[0].string(), Matchers.equalTo("<em>value2</em>"));
assertThat(multiSearchResponse.getResponses()[2].getFailure(), Matchers.nullValue());
assertThat(multiSearchResponse.getResponses()[2].isFailure(), Matchers.is(false));
SearchIT.assertSearchHeader(multiSearchResponse.getResponses()[2].getResponse());
assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getTotalHits(), Matchers.equalTo(1L));
assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getAt(0).getId(), Matchers.equalTo("6"));
assertThat(multiSearchResponse.getResponses()[2].getResponse().getHits().getAt(0).getHighlightFields()
.get("field").fragments()[0].string(), Matchers.equalTo("<em>value2</em>"));
}
public void testMultiSearch_failure() throws Exception {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
SearchRequest searchRequest1 = new SearchRequest("index1");
searchRequest1.source().query(new ScriptQueryBuilder(new Script(ScriptType.INLINE, "invalid", "code", Collections.emptyMap())));
multiSearchRequest.add(searchRequest1);
SearchRequest searchRequest2 = new SearchRequest("index2");
searchRequest2.source().query(new ScriptQueryBuilder(new Script(ScriptType.INLINE, "invalid", "code", Collections.emptyMap())));
multiSearchRequest.add(searchRequest2);
MultiSearchResponse multiSearchResponse =
execute(multiSearchRequest, highLevelClient()::multiSearch, highLevelClient()::multiSearchAsync);
assertThat(multiSearchResponse.getTook().millis(), Matchers.greaterThanOrEqualTo(0L));
assertThat(multiSearchResponse.getResponses().length, Matchers.equalTo(2));
assertThat(multiSearchResponse.getResponses()[0].isFailure(), Matchers.is(true));
assertThat(multiSearchResponse.getResponses()[0].getFailure().getMessage(), containsString("search_phase_execution_exception"));
assertThat(multiSearchResponse.getResponses()[0].getResponse(), nullValue());
assertThat(multiSearchResponse.getResponses()[1].isFailure(), Matchers.is(true));
assertThat(multiSearchResponse.getResponses()[1].getFailure().getMessage(), containsString("search_phase_execution_exception"));
assertThat(multiSearchResponse.getResponses()[1].getResponse(), nullValue());
}
private static void assertSearchHeader(SearchResponse searchResponse) {
assertThat(searchResponse.getTook().nanos(), greaterThanOrEqualTo(0L));
assertEquals(0, searchResponse.getFailedShards());

View File

@ -23,20 +23,36 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringArrayValue;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringValue;
/**
* A multi search API request.
*/
public class MultiSearchRequest extends ActionRequest implements CompositeIndicesRequest {
public static final int MAX_CONCURRENT_SEARCH_REQUESTS_DEFAULT = 0;
private int maxConcurrentSearchRequests = 0;
private List<SearchRequest> requests = new ArrayList<>();
@ -131,4 +147,171 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
request.writeTo(out);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MultiSearchRequest that = (MultiSearchRequest) o;
return maxConcurrentSearchRequests == that.maxConcurrentSearchRequests &&
Objects.equals(requests, that.requests) &&
Objects.equals(indicesOptions, that.indicesOptions);
}
@Override
public int hashCode() {
return Objects.hash(maxConcurrentSearchRequests, requests, indicesOptions);
}
public static void readMultiLineFormat(BytesReference data,
XContent xContent,
CheckedBiConsumer<SearchRequest, XContentParser, IOException> consumer,
String[] indices,
IndicesOptions indicesOptions,
String[] types,
String routing,
String searchType,
NamedXContentRegistry registry,
boolean allowExplicitIndex) throws IOException {
int from = 0;
int length = data.length();
byte marker = xContent.streamSeparator();
while (true) {
int nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
break;
}
// support first line with \n
if (nextMarker == 0) {
from = nextMarker + 1;
continue;
}
SearchRequest searchRequest = new SearchRequest();
if (indices != null) {
searchRequest.indices(indices);
}
if (indicesOptions != null) {
searchRequest.indicesOptions(indicesOptions);
}
if (types != null && types.length > 0) {
searchRequest.types(types);
}
if (routing != null) {
searchRequest.routing(routing);
}
if (searchType != null) {
searchRequest.searchType(searchType);
}
IndicesOptions defaultOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
// now parse the action
if (nextMarker - from > 0) {
try (XContentParser parser = xContent.createParser(registry, data.slice(from, nextMarker - from))) {
Map<String, Object> source = parser.map();
for (Map.Entry<String, Object> entry : source.entrySet()) {
Object value = entry.getValue();
if ("index".equals(entry.getKey()) || "indices".equals(entry.getKey())) {
if (!allowExplicitIndex) {
throw new IllegalArgumentException("explicit index in multi search is not allowed");
}
searchRequest.indices(nodeStringArrayValue(value));
} else if ("type".equals(entry.getKey()) || "types".equals(entry.getKey())) {
searchRequest.types(nodeStringArrayValue(value));
} else if ("search_type".equals(entry.getKey()) || "searchType".equals(entry.getKey())) {
searchRequest.searchType(nodeStringValue(value, null));
} else if ("request_cache".equals(entry.getKey()) || "requestCache".equals(entry.getKey())) {
searchRequest.requestCache(nodeBooleanValue(value, entry.getKey()));
} else if ("preference".equals(entry.getKey())) {
searchRequest.preference(nodeStringValue(value, null));
} else if ("routing".equals(entry.getKey())) {
searchRequest.routing(nodeStringValue(value, null));
}
}
defaultOptions = IndicesOptions.fromMap(source, defaultOptions);
}
}
searchRequest.indicesOptions(defaultOptions);
// move pointers
from = nextMarker + 1;
// now for the body
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
break;
}
BytesReference bytes = data.slice(from, nextMarker - from);
try (XContentParser parser = xContent.createParser(registry, bytes)) {
consumer.accept(searchRequest, parser);
}
// move pointers
from = nextMarker + 1;
}
}
private static int findNextMarker(byte marker, int from, BytesReference data, int length) {
for (int i = from; i < length; i++) {
if (data.get(i) == marker) {
return i;
}
}
if (from != length) {
throw new IllegalArgumentException("The msearch request must be terminated by a newline [\n]");
}
return -1;
}
public static byte[] writeMultiLineFormat(MultiSearchRequest multiSearchRequest, XContent xContent) throws IOException {
ByteArrayOutputStream output = new ByteArrayOutputStream();
for (SearchRequest request : multiSearchRequest.requests()) {
try (XContentBuilder xContentBuilder = XContentBuilder.builder(xContent)) {
xContentBuilder.startObject();
if (request.indices() != null) {
xContentBuilder.field("index", request.indices());
}
if (request.indicesOptions() != null && request.indicesOptions() != SearchRequest.DEFAULT_INDICES_OPTIONS) {
if (request.indicesOptions().expandWildcardsOpen() && request.indicesOptions().expandWildcardsClosed()) {
xContentBuilder.field("expand_wildcards", "all");
} else if (request.indicesOptions().expandWildcardsOpen()) {
xContentBuilder.field("expand_wildcards", "open");
} else if (request.indicesOptions().expandWildcardsClosed()) {
xContentBuilder.field("expand_wildcards", "closed");
} else {
xContentBuilder.field("expand_wildcards", "none");
}
xContentBuilder.field("ignore_unavailable", request.indicesOptions().ignoreUnavailable());
xContentBuilder.field("allow_no_indices", request.indicesOptions().allowNoIndices());
}
if (request.types() != null) {
xContentBuilder.field("types", request.types());
}
if (request.searchType() != null) {
xContentBuilder.field("search_type", request.searchType().name().toLowerCase(Locale.ROOT));
}
if (request.requestCache() != null) {
xContentBuilder.field("request_cache", request.requestCache());
}
if (request.preference() != null) {
xContentBuilder.field("preference", request.preference());
}
if (request.routing() != null) {
xContentBuilder.field("routing", request.routing());
}
xContentBuilder.endObject();
xContentBuilder.bytes().writeTo(output);
}
output.write(xContent.streamSeparator());
try (XContentBuilder xContentBuilder = XContentBuilder.builder(xContent)) {
if (request.source() != null) {
request.source().toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
} else {
xContentBuilder.startObject();
xContentBuilder.endObject();
}
xContentBuilder.bytes().writeTo(output);
}
output.write(xContent.streamSeparator());
}
return output.toByteArray();
}
}

View File

@ -24,23 +24,39 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
/**
* A multi search response.
*/
public class MultiSearchResponse extends ActionResponse implements Iterable<MultiSearchResponse.Item>, ToXContentObject {
private static final ParseField RESPONSES = new ParseField(Fields.RESPONSES);
private static final ParseField TOOK_IN_MILLIS = new ParseField("took");
private static final ConstructingObjectParser<MultiSearchResponse, Void> PARSER = new ConstructingObjectParser<>("multi_search",
true, a -> new MultiSearchResponse(((List<Item>)a[0]).toArray(new Item[0]), (long) a[1]));
static {
PARSER.declareObjectArray(constructorArg(), (p, c) -> itemFromXContent(p), RESPONSES);
PARSER.declareLong(constructorArg(), TOOK_IN_MILLIS);
}
/**
* A search response item, holding the actual search response, or an error message if it failed.
*/
@ -188,6 +204,45 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
return builder;
}
public static MultiSearchResponse fromXContext(XContentParser parser) {
return PARSER.apply(parser, null);
}
private static MultiSearchResponse.Item itemFromXContent(XContentParser parser) throws IOException {
// This parsing logic is a bit tricky here, because the multi search response itself is tricky:
// 1) The json objects inside the responses array are either a search response or a serialized exception
// 2) Each response json object gets a status field injected that ElasticsearchException.failureFromXContent(...) does not parse,
// but SearchResponse.innerFromXContent(...) parses and then ignores. The status field is not needed to parse
// the response item. However in both cases this method does need to parse the 'status' field otherwise the parsing of
// the response item in the next json array element will fail due to parsing errors.
Item item = null;
String fieldName = null;
Token token = parser.nextToken();
assert token == Token.FIELD_NAME;
outer: for (; token != Token.END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
fieldName = parser.currentName();
if ("error".equals(fieldName)) {
item = new Item(null, ElasticsearchException.failureFromXContent(parser));
} else if ("status".equals(fieldName) == false) {
item = new Item(SearchResponse.innerFromXContent(parser), null);
break outer;
}
break;
case VALUE_NUMBER:
if ("status".equals(fieldName)) {
// Ignore the status value
}
break;
}
}
assert parser.currentToken() == Token.END_OBJECT;
return item;
}
static final class Fields {
static final String RESPONSES = "responses";
static final String STATUS = "status";

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.search.SearchHits;
@ -242,9 +243,14 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
}
public static SearchResponse fromXContent(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
XContentParser.Token token;
String currentFieldName = null;
ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
parser.nextToken();
return innerFromXContent(parser);
}
static SearchResponse innerFromXContent(XContentParser parser) throws IOException {
ensureExpectedToken(Token.FIELD_NAME, parser.currentToken(), parser::getTokenLocation);
String currentFieldName = parser.currentName();
SearchHits hits = null;
Aggregations aggs = null;
Suggest suggest = null;
@ -259,8 +265,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
String scrollId = null;
List<ShardSearchFailure> failures = new ArrayList<>();
Clusters clusters = Clusters.EMPTY;
while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (SCROLL_ID.match(currentFieldName)) {
@ -276,7 +282,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
} else {
parser.skipChildren();
}
} else if (token == XContentParser.Token.START_OBJECT) {
} else if (token == Token.START_OBJECT) {
if (SearchHits.Fields.HITS.equals(currentFieldName)) {
hits = SearchHits.fromXContent(parser);
} else if (Aggregations.AGGREGATIONS_FIELD.equals(currentFieldName)) {
@ -286,8 +292,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
} else if (SearchProfileShardResults.PROFILE_FIELD.equals(currentFieldName)) {
profile = SearchProfileShardResults.fromXContent(parser);
} else if (RestActions._SHARDS_FIELD.match(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
while ((token = parser.nextToken()) != Token.END_OBJECT) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (RestActions.FAILED_FIELD.match(currentFieldName)) {
@ -301,9 +307,9 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
} else {
parser.skipChildren();
}
} else if (token == XContentParser.Token.START_ARRAY) {
} else if (token == Token.START_ARRAY) {
if (RestActions.FAILURES_FIELD.match(currentFieldName)) {
while((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
while((token = parser.nextToken()) != Token.END_ARRAY) {
failures.add(ShardSearchFailure.fromXContent(parser));
}
} else {

View File

@ -76,7 +76,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
int maxConcurrentSearches = request.maxConcurrentSearchRequests();
if (maxConcurrentSearches == 0) {
if (maxConcurrentSearches == MultiSearchRequest.MAX_CONCURRENT_SEARCH_REQUESTS_DEFAULT) {
maxConcurrentSearches = defaultMaxConcurrentSearches(availableProcessors, clusterState);
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@ -93,12 +94,8 @@ public class RestMultiSearchAction extends BaseRestHandler {
parseMultiLineRequest(restRequest, multiRequest.indicesOptions(), allowExplicitIndex, (searchRequest, parser) -> {
try {
searchRequest.source(SearchSourceBuilder.fromXContent(parser));
multiRequest.add(searchRequest);
} catch (IOException e) {
throw new ElasticsearchParseException("Exception when parsing search request", e);
}
searchRequest.source(SearchSourceBuilder.fromXContent(parser));
multiRequest.add(searchRequest);
});
List<SearchRequest> requests = multiRequest.requests();
preFilterShardSize = Math.max(1, preFilterShardSize / (requests.size()+1));
@ -113,7 +110,7 @@ public class RestMultiSearchAction extends BaseRestHandler {
* Parses a multi-line {@link RestRequest} body, instantiating a {@link SearchRequest} for each line and applying the given consumer.
*/
public static void parseMultiLineRequest(RestRequest request, IndicesOptions indicesOptions, boolean allowExplicitIndex,
BiConsumer<SearchRequest, XContentParser> consumer) throws IOException {
CheckedBiConsumer<SearchRequest, XContentParser, IOException> consumer) throws IOException {
String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
String[] types = Strings.splitStringByCommaToArray(request.param("type"));
@ -123,83 +120,8 @@ public class RestMultiSearchAction extends BaseRestHandler {
final Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
final XContent xContent = sourceTuple.v1().xContent();
final BytesReference data = sourceTuple.v2();
int from = 0;
int length = data.length();
byte marker = xContent.streamSeparator();
while (true) {
int nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
break;
}
// support first line with \n
if (nextMarker == 0) {
from = nextMarker + 1;
continue;
}
SearchRequest searchRequest = new SearchRequest();
if (indices != null) {
searchRequest.indices(indices);
}
if (indicesOptions != null) {
searchRequest.indicesOptions(indicesOptions);
}
if (types != null && types.length > 0) {
searchRequest.types(types);
}
if (routing != null) {
searchRequest.routing(routing);
}
if (searchType != null) {
searchRequest.searchType(searchType);
}
IndicesOptions defaultOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
// now parse the action
if (nextMarker - from > 0) {
try (XContentParser parser = xContent.createParser(request.getXContentRegistry(), data.slice(from, nextMarker - from))) {
Map<String, Object> source = parser.map();
for (Map.Entry<String, Object> entry : source.entrySet()) {
Object value = entry.getValue();
if ("index".equals(entry.getKey()) || "indices".equals(entry.getKey())) {
if (!allowExplicitIndex) {
throw new IllegalArgumentException("explicit index in multi search is not allowed");
}
searchRequest.indices(nodeStringArrayValue(value));
} else if ("type".equals(entry.getKey()) || "types".equals(entry.getKey())) {
searchRequest.types(nodeStringArrayValue(value));
} else if ("search_type".equals(entry.getKey()) || "searchType".equals(entry.getKey())) {
searchRequest.searchType(nodeStringValue(value, null));
} else if ("request_cache".equals(entry.getKey()) || "requestCache".equals(entry.getKey())) {
searchRequest.requestCache(nodeBooleanValue(value, entry.getKey()));
} else if ("preference".equals(entry.getKey())) {
searchRequest.preference(nodeStringValue(value, null));
} else if ("routing".equals(entry.getKey())) {
searchRequest.routing(nodeStringValue(value, null));
}
}
defaultOptions = IndicesOptions.fromMap(source, defaultOptions);
}
}
searchRequest.indicesOptions(defaultOptions);
// move pointers
from = nextMarker + 1;
// now for the body
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
break;
}
BytesReference bytes = data.slice(from, nextMarker - from);
try (XContentParser parser = xContent.createParser(request.getXContentRegistry(), bytes)) {
consumer.accept(searchRequest, parser);
}
// move pointers
from = nextMarker + 1;
}
MultiSearchRequest.readMultiLineFormat(data, xContent, consumer, indices, indicesOptions, types, routing,
searchType, request.getXContentRegistry(), allowExplicitIndex);
}
@Override
@ -207,18 +129,6 @@ public class RestMultiSearchAction extends BaseRestHandler {
return true;
}
private static int findNextMarker(byte marker, int from, BytesReference data, int length) {
for (int i = from; i < length; i++) {
if (data.get(i) == marker) {
return i;
}
}
if (from != length) {
throw new IllegalArgumentException("The msearch request must be terminated by a newline [\n]");
}
return -1;
}
@Override
protected Set<String> responseParams() {
return RESPONSE_PARAMS;

View File

@ -19,24 +19,36 @@
package org.elasticsearch.action.search;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.StreamsUtils;
import org.elasticsearch.test.rest.FakeRestRequest;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import static java.util.Collections.singletonList;
import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest;
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@ -202,4 +214,87 @@ public class MultiSearchRequestTests extends ESTestCase {
return new NamedXContentRegistry(singletonList(new NamedXContentRegistry.Entry(QueryBuilder.class,
new ParseField(MatchAllQueryBuilder.NAME), (p, c) -> MatchAllQueryBuilder.fromXContent(p))));
}
public void testMultiLineSerialization() throws IOException {
int iters = 16;
for (int i = 0; i < iters; i++) {
// The only formats that support stream separator
XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE);
MultiSearchRequest originalRequest = createMultiSearchRequest();
byte[] originalBytes = MultiSearchRequest.writeMultiLineFormat(originalRequest, xContentType.xContent());
MultiSearchRequest parsedRequest = new MultiSearchRequest();
CheckedBiConsumer<SearchRequest, XContentParser, IOException> consumer = (r, p) -> {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(p);
if (searchSourceBuilder.equals(new SearchSourceBuilder()) == false) {
r.source(searchSourceBuilder);
}
parsedRequest.add(r);
};
MultiSearchRequest.readMultiLineFormat(new BytesArray(originalBytes), xContentType.xContent(),
consumer, null, null, null, null, null, xContentRegistry(), true);
assertEquals(originalRequest, parsedRequest);
}
}
public void testEqualsAndHashcode() throws IOException {
checkEqualsAndHashCode(createMultiSearchRequest(), MultiSearchRequestTests::copyRequest, MultiSearchRequestTests::mutate);
}
private static MultiSearchRequest mutate(MultiSearchRequest searchRequest) throws IOException {
MultiSearchRequest mutation = copyRequest(searchRequest);
List<CheckedRunnable<IOException>> mutators = new ArrayList<>();
mutators.add(() -> mutation.indicesOptions(randomValueOtherThan(searchRequest.indicesOptions(),
() -> IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()))));
mutators.add(() -> mutation.maxConcurrentSearchRequests(randomIntBetween(1, 32)));
mutators.add(() -> mutation.add(createSimpleSearchRequest()));
randomFrom(mutators).run();
return mutation;
}
private static MultiSearchRequest copyRequest(MultiSearchRequest request) throws IOException {
MultiSearchRequest copy = new MultiSearchRequest();
if (request.maxConcurrentSearchRequests() > 0) {
copy.maxConcurrentSearchRequests(request.maxConcurrentSearchRequests());
}
copy.indicesOptions(request.indicesOptions());
for (SearchRequest searchRequest : request.requests()) {
copy.add(searchRequest);
}
return copy;
}
private static MultiSearchRequest createMultiSearchRequest() throws IOException {
int numSearchRequest = randomIntBetween(1, 128);
MultiSearchRequest request = new MultiSearchRequest();
for (int j = 0; j < numSearchRequest; j++) {
SearchRequest searchRequest = createSimpleSearchRequest();
// scroll is not supported in the current msearch api, so unset it:
searchRequest.scroll((Scroll) null);
// only expand_wildcards, ignore_unavailable and allow_no_indices can be specified from msearch api, so unset other options:
IndicesOptions randomlyGenerated = searchRequest.indicesOptions();
IndicesOptions msearchDefault = IndicesOptions.strictExpandOpenAndForbidClosed();
searchRequest.indicesOptions(IndicesOptions.fromOptions(
randomlyGenerated.ignoreUnavailable(), randomlyGenerated.allowNoIndices(), randomlyGenerated.expandWildcardsOpen(),
randomlyGenerated.expandWildcardsClosed(), msearchDefault.allowAliasesToMultipleIndices(),
msearchDefault.forbidClosedIndices(), msearchDefault.ignoreAliases()
));
request.add(searchRequest);
}
return request;
}
private static SearchRequest createSimpleSearchRequest() throws IOException {
return randomSearchRequest(() -> {
// No need to return a very complex SearchSourceBuilder here, that is tested elsewhere
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.from(randomInt(10));
searchSourceBuilder.size(randomIntBetween(20, 100));
return searchSourceBuilder;
});
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.test.XContentTestUtils.insertRandomFields;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class MultiSearchResponseTests extends ESTestCase {
public void testFromXContent() throws IOException {
for (int runs = 0; runs < 20; runs++) {
MultiSearchResponse expected = createTestInstance();
XContentType xContentType = randomFrom(XContentType.values());
BytesReference shuffled = toShuffledXContent(expected, xContentType, ToXContent.EMPTY_PARAMS, false);
XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled);
MultiSearchResponse actual = MultiSearchResponse.fromXContext(parser);
assertThat(parser.nextToken(), nullValue());
assertThat(actual.getTook(), equalTo(expected.getTook()));
assertThat(actual.getResponses().length, equalTo(expected.getResponses().length));
for (int i = 0; i < expected.getResponses().length; i++) {
MultiSearchResponse.Item expectedItem = expected.getResponses()[i];
MultiSearchResponse.Item actualItem = actual.getResponses()[i];
if (expectedItem.isFailure()) {
assertThat(actualItem.getResponse(), nullValue());
assertThat(actualItem.getFailureMessage(), containsString(expectedItem.getFailureMessage()));
} else {
assertThat(actualItem.getResponse().toString(), equalTo(expectedItem.getResponse().toString()));
assertThat(actualItem.getFailure(), nullValue());
}
}
}
}
private static MultiSearchResponse createTestInstance() {
int numItems = randomIntBetween(0, 128);
MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[numItems];
for (int i = 0; i < numItems; i++) {
if (randomBoolean()) {
// Creating a minimal response is OK, because SearchResponse self
// is tested elsewhere.
long tookInMillis = randomNonNegativeLong();
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
items[i] = new MultiSearchResponse.Item(searchResponse, null);
} else {
items[i] = new MultiSearchResponse.Item(null, new ElasticsearchException("an error"));
}
}
return new MultiSearchResponse(items, randomNonNegativeLong());
}
}

View File

@ -77,16 +77,12 @@ public class RestMultiSearchTemplateAction extends BaseRestHandler {
RestMultiSearchAction.parseMultiLineRequest(restRequest, multiRequest.indicesOptions(), allowExplicitIndex,
(searchRequest, bytes) -> {
try {
SearchTemplateRequest searchTemplateRequest = RestSearchTemplateAction.parse(bytes);
if (searchTemplateRequest.getScript() != null) {
searchTemplateRequest.setRequest(searchRequest);
multiRequest.add(searchTemplateRequest);
} else {
throw new IllegalArgumentException("Malformed search template");
}
} catch (IOException e) {
throw new ElasticsearchParseException("Exception when parsing search template request", e);
SearchTemplateRequest searchTemplateRequest = RestSearchTemplateAction.parse(bytes);
if (searchTemplateRequest.getScript() != null) {
searchTemplateRequest.setRequest(searchRequest);
multiRequest.add(searchTemplateRequest);
} else {
throw new IllegalArgumentException("Malformed search template");
}
});
return multiRequest;