diff --git a/src/main/java/org/elasticsearch/action/ActionModule.java b/src/main/java/org/elasticsearch/action/ActionModule.java index 6ed8a25ae89..8fc26892a48 100644 --- a/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/src/main/java/org/elasticsearch/action/ActionModule.java @@ -98,10 +98,7 @@ import org.elasticsearch.action.mlt.MoreLikeThisAction; import org.elasticsearch.action.mlt.TransportMoreLikeThisAction; import org.elasticsearch.action.percolate.PercolateAction; import org.elasticsearch.action.percolate.TransportPercolateAction; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchScrollAction; -import org.elasticsearch.action.search.TransportSearchAction; -import org.elasticsearch.action.search.TransportSearchScrollAction; +import org.elasticsearch.action.search.*; import org.elasticsearch.action.search.type.*; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.update.TransportUpdateAction; @@ -211,6 +208,7 @@ public class ActionModule extends AbstractModule { TransportSearchScrollQueryThenFetchAction.class, TransportSearchScrollQueryAndFetchAction.class ); + registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class); registerAction(MoreLikeThisAction.INSTANCE, TransportMoreLikeThisAction.class); registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class); diff --git a/src/main/java/org/elasticsearch/action/search/MultiSearchAction.java b/src/main/java/org/elasticsearch/action/search/MultiSearchAction.java new file mode 100644 index 00000000000..6a4077ecd1e --- /dev/null +++ b/src/main/java/org/elasticsearch/action/search/MultiSearchAction.java @@ -0,0 +1,45 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.Client; + +/** + */ +public class MultiSearchAction extends Action { + + public static final MultiSearchAction INSTANCE = new MultiSearchAction(); + public static final String NAME = "msearch"; + + private MultiSearchAction() { + super(NAME); + } + + @Override + public MultiSearchResponse newResponse() { + return new MultiSearchResponse(); + } + + @Override + public MultiSearchRequestBuilder newRequestBuilder(Client client) { + return new MultiSearchRequestBuilder(client); + } +} diff --git a/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java b/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java new file mode 100644 index 00000000000..cf5f1c0b20c --- /dev/null +++ b/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java @@ -0,0 +1,188 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import com.google.common.collect.Lists; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContent; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * A multi search API request. + */ +public class MultiSearchRequest implements ActionRequest { + + private List requests = Lists.newArrayList(); + + private boolean listenerThreaded = false; + + /** + * Add a search request to execute. Note, the order is important, the search response will be returned in the + * same order as the search requests. + */ + public MultiSearchRequest add(SearchRequestBuilder request) { + requests.add(request.request()); + return this; + } + + /** + * Add a search request to execute. Note, the order is important, the search response will be returned in the + * same order as the search requests. + */ + public MultiSearchRequest add(SearchRequest request) { + requests.add(request); + return this; + } + + public MultiSearchRequest add(byte[] data, int from, int length, boolean contentUnsafe, @Nullable String[] indices, @Nullable String[] types) throws Exception { + XContent xContent = XContentFactory.xContent(data, from, length); + byte marker = xContent.streamSeparator(); + while (true) { + int nextMarker = findNextMarker(marker, from, data, length); + if (nextMarker == -1) { + break; + } + // now parse the action + XContentParser parser = xContent.createParser(data, from, nextMarker - from); + + // move pointers + from = nextMarker + 1; + + // Move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token == null) { + continue; + } + assert token == XContentParser.Token.START_OBJECT; + + + SearchRequest searchRequest = new SearchRequest(indices); + if (types != null && types.length > 0) { + searchRequest.types(types); + } + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("index".equals(currentFieldName) || "indices".equals(currentFieldName)) { + searchRequest.indices(Strings.splitStringByCommaToArray(parser.text())); + } else if ("type".equals(currentFieldName) || "types".equals(currentFieldName)) { + searchRequest.types(Strings.splitStringByCommaToArray(parser.text())); + } else if ("search_type".equals(currentFieldName) || "searchType".equals(currentFieldName)) { + searchRequest.searchType(parser.text()); + } else if ("preference".equals(currentFieldName)) { + searchRequest.preference(parser.text()); + } else if ("routing".equals(currentFieldName)) { + searchRequest.routing(parser.text()); + } else if ("query_hint".equals(currentFieldName) || "queryHint".equals(currentFieldName)) { + searchRequest.queryHint(parser.text()); + } + } + } + + // now for the body + nextMarker = findNextMarker(marker, from, data, length); + if (nextMarker == -1) { + break; + } + + searchRequest.source(data, from, nextMarker - from, contentUnsafe); + // move pointers + from = nextMarker + 1; + + add(searchRequest); + } + + return this; + } + + private int findNextMarker(byte marker, int from, byte[] data, int length) { + for (int i = from; i < length; i++) { + if (data[i] == marker) { + return i; + } + } + return -1; + } + + List requests() { + return this.requests; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (requests.isEmpty()) { + validationException = addValidationError("no requests added", validationException); + } + for (int i = 0; i < requests.size(); i++) { + ActionRequestValidationException ex = requests.get(i).validate(); + if (ex != null) { + if (validationException == null) { + validationException = new ActionRequestValidationException(); + } + validationException.addValidationErrors(ex.validationErrors()); + } + } + + return validationException; + } + + @Override + public boolean listenerThreaded() { + return listenerThreaded; + } + + @Override + public MultiSearchRequest listenerThreaded(boolean listenerThreaded) { + this.listenerThreaded = listenerThreaded; + return this; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + SearchRequest request = new SearchRequest(); + request.readFrom(in); + requests.add(request); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(requests.size()); + for (SearchRequest request : requests) { + request.writeTo(out); + } + } +} diff --git a/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java b/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java new file mode 100644 index 00000000000..acd60156f37 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java @@ -0,0 +1,57 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.BaseRequestBuilder; +import org.elasticsearch.client.Client; + +/** + * A request builder for multiple search requests. + */ +public class MultiSearchRequestBuilder extends BaseRequestBuilder { + + public MultiSearchRequestBuilder(Client client) { + super(client, new MultiSearchRequest()); + } + + /** + * Add a search request to execute. Note, the order is important, the search response will be returned in the + * same order as the search requests. + */ + public MultiSearchRequestBuilder add(SearchRequest request) { + super.request.add(request); + return this; + } + + /** + * Add a search request to execute. Note, the order is important, the search response will be returned in the + * same order as the search requests. + */ + public MultiSearchRequestBuilder add(SearchRequestBuilder request) { + super.request.add(request); + return this; + } + + @Override + protected void doExecute(ActionListener listener) { + client.multiSearch(request, listener); + } +} diff --git a/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java new file mode 100644 index 00000000000..bb1abea37c0 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -0,0 +1,169 @@ +package org.elasticsearch.action.search; + +import com.google.common.collect.Iterators; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; +import java.util.Iterator; + +/** + * A multi search response. + */ +public class MultiSearchResponse implements ActionResponse, Iterable, ToXContent { + + /** + * A search response item, holding the actual search response, or an error message if it failed. + */ + public static class Item implements Streamable { + private SearchResponse response; + private String failureMessage; + + Item() { + + } + + public Item(SearchResponse response, String failureMessage) { + this.response = response; + this.failureMessage = failureMessage; + } + + /** + * Is it a failed search? + */ + public boolean isFailure() { + return failureMessage != null; + } + + /** + * The actual failure message, null if its not a failure. + */ + @Nullable + public String failureMessage() { + return failureMessage; + } + + /** + * The actual failure message, null if its not a failure. + */ + @Nullable + public String getFailureMessage() { + return failureMessage; + } + + /** + * The actual search response, null if its a failure. + */ + @Nullable + public SearchResponse response() { + return this.response; + } + + /** + * The actual search response, null if its a failure. + */ + @Nullable + public SearchResponse getResponse() { + return this.response; + } + + public static Item readItem(StreamInput in) throws IOException { + Item item = new Item(); + item.readFrom(in); + return item; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + if (in.readBoolean()) { + this.response = new SearchResponse(); + response.readFrom(in); + } else { + failureMessage = in.readUTF(); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (response != null) { + out.writeBoolean(true); + response.writeTo(out); + } else { + out.writeUTF(failureMessage); + } + } + } + + private Item[] items; + + MultiSearchResponse() { + } + + public MultiSearchResponse(Item[] items) { + this.items = items; + } + + @Override + public Iterator iterator() { + return Iterators.forArray(items); + } + + /** + * The list of responses, the order is the same as the one provided in the request. + */ + public Item[] responses() { + return this.items; + } + + /** + * The list of responses, the order is the same as the one provided in the request. + */ + public Item[] getResponses() { + return this.items; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + items = new Item[in.readVInt()]; + for (int i = 0; i < items.length; i++) { + items[i] = Item.readItem(in); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(items.length); + for (Item item : items) { + item.writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(Fields.RESPONSES); + for (Item item : items) { + if (item.isFailure()) { + builder.startObject(); + builder.field(Fields.ERROR, item.failureMessage()); + builder.endObject(); + } else { + builder.startObject(); + item.response().toXContent(builder, params); + builder.endObject(); + } + } + builder.endArray(); + return builder; + } + + static final class Fields { + static final XContentBuilderString RESPONSES = new XContentBuilderString("responses"); + static final XContentBuilderString ERROR = new XContentBuilderString("error"); + } +} diff --git a/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index a289471d3ac..0a9313363b5 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -714,6 +714,14 @@ public class SearchRequestBuilder extends BaseRequestBuilder listener) { if (sourceBuilder != null) { diff --git a/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java new file mode 100644 index 00000000000..f5e4f1ad897 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -0,0 +1,128 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BaseTransportRequestHandler; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportService; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ +public class TransportMultiSearchAction extends TransportAction { + + private final ClusterService clusterService; + + private final TransportSearchAction searchAction; + + @Inject + public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, TransportSearchAction searchAction) { + super(settings, threadPool); + this.clusterService = clusterService; + this.searchAction = searchAction; + + transportService.registerHandler(MultiSearchAction.NAME, new TransportHandler()); + } + + @Override + protected void doExecute(final MultiSearchRequest request, final ActionListener listener) { + ClusterState clusterState = clusterService.state(); + clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); + + final MultiSearchResponse.Item[] responses = new MultiSearchResponse.Item[request.requests().size()]; + final AtomicInteger counter = new AtomicInteger(responses.length); + for (int i = 0; i < responses.length; i++) { + final int index = i; + searchAction.execute(request.requests().get(i), new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + synchronized (responses) { + responses[index] = new MultiSearchResponse.Item(searchResponse, null); + } + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable e) { + synchronized (responses) { + responses[index] = new MultiSearchResponse.Item(null, ExceptionsHelper.detailedMessage(e)); + } + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + private void finishHim() { + listener.onResponse(new MultiSearchResponse(responses)); + } + }); + } + } + + class TransportHandler extends BaseTransportRequestHandler { + + @Override + public MultiSearchRequest newInstance() { + return new MultiSearchRequest(); + } + + @Override + public void messageReceived(final MultiSearchRequest request, final TransportChannel channel) throws Exception { + // no need to use threaded listener, since we just send a response + request.listenerThreaded(false); + execute(request, new ActionListener() { + @Override + public void onResponse(MultiSearchResponse response) { + try { + channel.sendResponse(response); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn("Failed to send error response for action [msearch] and request [" + request + "]", e1); + } + } + }); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } +} diff --git a/src/main/java/org/elasticsearch/client/Client.java b/src/main/java/org/elasticsearch/client/Client.java index 44d413c64a7..43462bbd761 100644 --- a/src/main/java/org/elasticsearch/client/Client.java +++ b/src/main/java/org/elasticsearch/client/Client.java @@ -374,6 +374,21 @@ public interface Client { */ SearchScrollRequestBuilder prepareSearchScroll(String scrollId); + /** + * Performs multiple search requests. + */ + ActionFuture multiSearch(MultiSearchRequest request); + + /** + * Performs multiple search requests. + */ + void multiSearch(MultiSearchRequest request, ActionListener listener); + + /** + * Performs multiple search requests. + */ + MultiSearchRequestBuilder prepareMultiSearch(); + /** * A more like this action to search for documents that are "like" a specific document. * diff --git a/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 41e06f01ce6..46a6e5653e9 100644 --- a/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -226,6 +226,21 @@ public abstract class AbstractClient implements InternalClient { return new SearchScrollRequestBuilder(this, scrollId); } + @Override + public ActionFuture multiSearch(MultiSearchRequest request) { + return execute(MultiSearchAction.INSTANCE, request); + } + + @Override + public void multiSearch(MultiSearchRequest request, ActionListener listener) { + execute(MultiSearchAction.INSTANCE, request, listener); + } + + @Override + public MultiSearchRequestBuilder prepareMultiSearch() { + return new MultiSearchRequestBuilder(this); + } + @Override public ActionFuture count(final CountRequest request) { return execute(CountAction.INSTANCE, request); diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index d11ea6f9c33..133a8f82896 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -39,9 +39,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.mlt.MoreLikeThisRequest; import org.elasticsearch.action.percolate.PercolateRequest; import org.elasticsearch.action.percolate.PercolateResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.*; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.AdminClient; @@ -357,6 +355,16 @@ public class TransportClient extends AbstractClient { internalClient.searchScroll(request, listener); } + @Override + public ActionFuture multiSearch(MultiSearchRequest request) { + return internalClient.multiSearch(request); + } + + @Override + public void multiSearch(MultiSearchRequest request, ActionListener listener) { + internalClient.multiSearch(request, listener); + } + @Override public ActionFuture moreLikeThis(MoreLikeThisRequest request) { return internalClient.moreLikeThis(request); diff --git a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index ed9e6905577..a211ee97d63 100644 --- a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -66,6 +66,7 @@ import org.elasticsearch.rest.action.index.RestIndexAction; import org.elasticsearch.rest.action.main.RestMainAction; import org.elasticsearch.rest.action.mlt.RestMoreLikeThisAction; import org.elasticsearch.rest.action.percolate.RestPercolateAction; +import org.elasticsearch.rest.action.search.RestMultiSearchAction; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchScrollAction; import org.elasticsearch.rest.action.update.RestUpdateAction; @@ -142,6 +143,7 @@ public class RestActionModule extends AbstractModule { bind(RestSearchAction.class).asEagerSingleton(); bind(RestSearchScrollAction.class).asEagerSingleton(); + bind(RestMultiSearchAction.class).asEagerSingleton(); bind(RestValidateQueryAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java b/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java new file mode 100644 index 00000000000..f9d3af10e1d --- /dev/null +++ b/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java @@ -0,0 +1,99 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestActions; + +import java.io.IOException; + +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; +import static org.elasticsearch.rest.RestStatus.OK; +import static org.elasticsearch.rest.action.support.RestXContentBuilder.restContentBuilder; + +/** + */ +public class RestMultiSearchAction extends BaseRestHandler { + + @Inject + public RestMultiSearchAction(Settings settings, Client client, RestController controller) { + super(settings, client); + + controller.registerHandler(GET, "/_msearch", this); + controller.registerHandler(POST, "/_msearch", this); + controller.registerHandler(GET, "/{index}/_msearch", this); + controller.registerHandler(POST, "/{index}/_msearch", this); + controller.registerHandler(GET, "/{index}/{type}/_msearch", this); + controller.registerHandler(POST, "/{index}/{type}/_msearch", this); + } + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel) { + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + + String[] indices = RestActions.splitIndices(request.param("index")); + String[] types = RestActions.splitTypes(request.param("type")); + + try { + multiSearchRequest.add(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe(), indices, types); + } catch (Exception e) { + try { + XContentBuilder builder = restContentBuilder(request); + channel.sendResponse(new XContentRestResponse(request, BAD_REQUEST, builder.startObject().field("error", e.getMessage()).endObject())); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + return; + } + + client.multiSearch(multiSearchRequest, new ActionListener() { + @Override + public void onResponse(MultiSearchResponse response) { + try { + XContentBuilder builder = restContentBuilder(request); + builder.startObject(); + response.toXContent(builder, request); + builder.endObject(); + channel.sendResponse(new XContentRestResponse(request, OK, builder)); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } +} diff --git a/src/test/java/org/elasticsearch/test/integration/search/msearch/SimpleMultiSearchTests.java b/src/test/java/org/elasticsearch/test/integration/search/msearch/SimpleMultiSearchTests.java new file mode 100644 index 00000000000..17b29021c9c --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/search/msearch/SimpleMultiSearchTests.java @@ -0,0 +1,64 @@ +package org.elasticsearch.test.integration.search.msearch; + +import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +/** + */ +@Test +public class SimpleMultiSearchTests extends AbstractNodesTests { + + private Client client; + + @BeforeClass + public void createNodes() throws Exception { + Settings settings = settingsBuilder().put("number_of_shards", 3).put("number_of_replicas", 0).build(); + startNode("node1", settings); + startNode("node2", settings); + client = getClient(); + } + + @AfterClass + public void closeNodes() { + client.close(); + closeAllNodes(); + } + + protected Client getClient() { + return client("node1"); + } + + @Test + public void simpleMultiSearch() { + client.admin().indices().prepareDelete().execute().actionGet(); + + client.prepareIndex("test", "type", "1").setSource("field", "xxx").execute().actionGet(); + client.prepareIndex("test", "type", "2").setSource("field", "yyy").execute().actionGet(); + + client.admin().indices().prepareRefresh().execute().actionGet(); + + MultiSearchResponse response = client.prepareMultiSearch() + .add(client.prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "xxx"))) + .add(client.prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "yyy"))) + .add(client.prepareSearch("test").setQuery(QueryBuilders.matchAllQuery())) + .execute().actionGet(); + + assertThat(response.responses().length, equalTo(3)); + assertThat(response.responses()[0].response().hits().totalHits(), equalTo(1l)); + assertThat(response.responses()[1].response().hits().totalHits(), equalTo(1l)); + assertThat(response.responses()[2].response().hits().totalHits(), equalTo(2l)); + + assertThat(response.responses()[0].response().hits().getAt(0).id(), equalTo("1")); + assertThat(response.responses()[1].response().hits().getAt(0).id(), equalTo("2")); + } +}