API: Multi Search, closes #1722.
This commit is contained in:
parent
94c526925b
commit
4a9cb6408c
|
@ -98,10 +98,7 @@ import org.elasticsearch.action.mlt.MoreLikeThisAction;
|
|||
import org.elasticsearch.action.mlt.TransportMoreLikeThisAction;
|
||||
import org.elasticsearch.action.percolate.PercolateAction;
|
||||
import org.elasticsearch.action.percolate.TransportPercolateAction;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchScrollAction;
|
||||
import org.elasticsearch.action.search.TransportSearchAction;
|
||||
import org.elasticsearch.action.search.TransportSearchScrollAction;
|
||||
import org.elasticsearch.action.search.*;
|
||||
import org.elasticsearch.action.search.type.*;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.action.update.TransportUpdateAction;
|
||||
|
@ -211,6 +208,7 @@ public class ActionModule extends AbstractModule {
|
|||
TransportSearchScrollQueryThenFetchAction.class,
|
||||
TransportSearchScrollQueryAndFetchAction.class
|
||||
);
|
||||
registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
|
||||
registerAction(MoreLikeThisAction.INSTANCE, TransportMoreLikeThisAction.class);
|
||||
registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
|
||||
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class MultiSearchAction extends Action<MultiSearchRequest, MultiSearchResponse, MultiSearchRequestBuilder> {
|
||||
|
||||
public static final MultiSearchAction INSTANCE = new MultiSearchAction();
|
||||
public static final String NAME = "msearch";
|
||||
|
||||
private MultiSearchAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiSearchResponse newResponse() {
|
||||
return new MultiSearchResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiSearchRequestBuilder newRequestBuilder(Client client) {
|
||||
return new MultiSearchRequestBuilder(client);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,188 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContent;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
/**
|
||||
* A multi search API request.
|
||||
*/
|
||||
public class MultiSearchRequest implements ActionRequest {
|
||||
|
||||
private List<SearchRequest> requests = Lists.newArrayList();
|
||||
|
||||
private boolean listenerThreaded = false;
|
||||
|
||||
/**
|
||||
* Add a search request to execute. Note, the order is important, the search response will be returned in the
|
||||
* same order as the search requests.
|
||||
*/
|
||||
public MultiSearchRequest add(SearchRequestBuilder request) {
|
||||
requests.add(request.request());
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a search request to execute. Note, the order is important, the search response will be returned in the
|
||||
* same order as the search requests.
|
||||
*/
|
||||
public MultiSearchRequest add(SearchRequest request) {
|
||||
requests.add(request);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MultiSearchRequest add(byte[] data, int from, int length, boolean contentUnsafe, @Nullable String[] indices, @Nullable String[] types) throws Exception {
|
||||
XContent xContent = XContentFactory.xContent(data, from, length);
|
||||
byte marker = xContent.streamSeparator();
|
||||
while (true) {
|
||||
int nextMarker = findNextMarker(marker, from, data, length);
|
||||
if (nextMarker == -1) {
|
||||
break;
|
||||
}
|
||||
// now parse the action
|
||||
XContentParser parser = xContent.createParser(data, from, nextMarker - from);
|
||||
|
||||
// move pointers
|
||||
from = nextMarker + 1;
|
||||
|
||||
// Move to START_OBJECT
|
||||
XContentParser.Token token = parser.nextToken();
|
||||
if (token == null) {
|
||||
continue;
|
||||
}
|
||||
assert token == XContentParser.Token.START_OBJECT;
|
||||
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(indices);
|
||||
if (types != null && types.length > 0) {
|
||||
searchRequest.types(types);
|
||||
}
|
||||
String currentFieldName = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token.isValue()) {
|
||||
if ("index".equals(currentFieldName) || "indices".equals(currentFieldName)) {
|
||||
searchRequest.indices(Strings.splitStringByCommaToArray(parser.text()));
|
||||
} else if ("type".equals(currentFieldName) || "types".equals(currentFieldName)) {
|
||||
searchRequest.types(Strings.splitStringByCommaToArray(parser.text()));
|
||||
} else if ("search_type".equals(currentFieldName) || "searchType".equals(currentFieldName)) {
|
||||
searchRequest.searchType(parser.text());
|
||||
} else if ("preference".equals(currentFieldName)) {
|
||||
searchRequest.preference(parser.text());
|
||||
} else if ("routing".equals(currentFieldName)) {
|
||||
searchRequest.routing(parser.text());
|
||||
} else if ("query_hint".equals(currentFieldName) || "queryHint".equals(currentFieldName)) {
|
||||
searchRequest.queryHint(parser.text());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// now for the body
|
||||
nextMarker = findNextMarker(marker, from, data, length);
|
||||
if (nextMarker == -1) {
|
||||
break;
|
||||
}
|
||||
|
||||
searchRequest.source(data, from, nextMarker - from, contentUnsafe);
|
||||
// move pointers
|
||||
from = nextMarker + 1;
|
||||
|
||||
add(searchRequest);
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private int findNextMarker(byte marker, int from, byte[] data, int length) {
|
||||
for (int i = from; i < length; i++) {
|
||||
if (data[i] == marker) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
List<SearchRequest> requests() {
|
||||
return this.requests;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if (requests.isEmpty()) {
|
||||
validationException = addValidationError("no requests added", validationException);
|
||||
}
|
||||
for (int i = 0; i < requests.size(); i++) {
|
||||
ActionRequestValidationException ex = requests.get(i).validate();
|
||||
if (ex != null) {
|
||||
if (validationException == null) {
|
||||
validationException = new ActionRequestValidationException();
|
||||
}
|
||||
validationException.addValidationErrors(ex.validationErrors());
|
||||
}
|
||||
}
|
||||
|
||||
return validationException;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean listenerThreaded() {
|
||||
return listenerThreaded;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiSearchRequest listenerThreaded(boolean listenerThreaded) {
|
||||
this.listenerThreaded = listenerThreaded;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
int size = in.readVInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
SearchRequest request = new SearchRequest();
|
||||
request.readFrom(in);
|
||||
requests.add(request);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(requests.size());
|
||||
for (SearchRequest request : requests) {
|
||||
request.writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.BaseRequestBuilder;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
||||
/**
|
||||
* A request builder for multiple search requests.
|
||||
*/
|
||||
public class MultiSearchRequestBuilder extends BaseRequestBuilder<MultiSearchRequest, MultiSearchResponse> {
|
||||
|
||||
public MultiSearchRequestBuilder(Client client) {
|
||||
super(client, new MultiSearchRequest());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a search request to execute. Note, the order is important, the search response will be returned in the
|
||||
* same order as the search requests.
|
||||
*/
|
||||
public MultiSearchRequestBuilder add(SearchRequest request) {
|
||||
super.request.add(request);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a search request to execute. Note, the order is important, the search response will be returned in the
|
||||
* same order as the search requests.
|
||||
*/
|
||||
public MultiSearchRequestBuilder add(SearchRequestBuilder request) {
|
||||
super.request.add(request);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(ActionListener<MultiSearchResponse> listener) {
|
||||
client.multiSearch(request, listener);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,169 @@
|
|||
package org.elasticsearch.action.search;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* A multi search response.
|
||||
*/
|
||||
public class MultiSearchResponse implements ActionResponse, Iterable<MultiSearchResponse.Item>, ToXContent {
|
||||
|
||||
/**
|
||||
* A search response item, holding the actual search response, or an error message if it failed.
|
||||
*/
|
||||
public static class Item implements Streamable {
|
||||
private SearchResponse response;
|
||||
private String failureMessage;
|
||||
|
||||
Item() {
|
||||
|
||||
}
|
||||
|
||||
public Item(SearchResponse response, String failureMessage) {
|
||||
this.response = response;
|
||||
this.failureMessage = failureMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Is it a failed search?
|
||||
*/
|
||||
public boolean isFailure() {
|
||||
return failureMessage != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* The actual failure message, null if its not a failure.
|
||||
*/
|
||||
@Nullable
|
||||
public String failureMessage() {
|
||||
return failureMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* The actual failure message, null if its not a failure.
|
||||
*/
|
||||
@Nullable
|
||||
public String getFailureMessage() {
|
||||
return failureMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* The actual search response, null if its a failure.
|
||||
*/
|
||||
@Nullable
|
||||
public SearchResponse response() {
|
||||
return this.response;
|
||||
}
|
||||
|
||||
/**
|
||||
* The actual search response, null if its a failure.
|
||||
*/
|
||||
@Nullable
|
||||
public SearchResponse getResponse() {
|
||||
return this.response;
|
||||
}
|
||||
|
||||
public static Item readItem(StreamInput in) throws IOException {
|
||||
Item item = new Item();
|
||||
item.readFrom(in);
|
||||
return item;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
if (in.readBoolean()) {
|
||||
this.response = new SearchResponse();
|
||||
response.readFrom(in);
|
||||
} else {
|
||||
failureMessage = in.readUTF();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
if (response != null) {
|
||||
out.writeBoolean(true);
|
||||
response.writeTo(out);
|
||||
} else {
|
||||
out.writeUTF(failureMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Item[] items;
|
||||
|
||||
MultiSearchResponse() {
|
||||
}
|
||||
|
||||
public MultiSearchResponse(Item[] items) {
|
||||
this.items = items;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Item> iterator() {
|
||||
return Iterators.forArray(items);
|
||||
}
|
||||
|
||||
/**
|
||||
* The list of responses, the order is the same as the one provided in the request.
|
||||
*/
|
||||
public Item[] responses() {
|
||||
return this.items;
|
||||
}
|
||||
|
||||
/**
|
||||
* The list of responses, the order is the same as the one provided in the request.
|
||||
*/
|
||||
public Item[] getResponses() {
|
||||
return this.items;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
items = new Item[in.readVInt()];
|
||||
for (int i = 0; i < items.length; i++) {
|
||||
items[i] = Item.readItem(in);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(items.length);
|
||||
for (Item item : items) {
|
||||
item.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startArray(Fields.RESPONSES);
|
||||
for (Item item : items) {
|
||||
if (item.isFailure()) {
|
||||
builder.startObject();
|
||||
builder.field(Fields.ERROR, item.failureMessage());
|
||||
builder.endObject();
|
||||
} else {
|
||||
builder.startObject();
|
||||
item.response().toXContent(builder, params);
|
||||
builder.endObject();
|
||||
}
|
||||
}
|
||||
builder.endArray();
|
||||
return builder;
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString RESPONSES = new XContentBuilderString("responses");
|
||||
static final XContentBuilderString ERROR = new XContentBuilderString("error");
|
||||
}
|
||||
}
|
|
@ -714,6 +714,14 @@ public class SearchRequestBuilder extends BaseRequestBuilder<SearchRequest, Sear
|
|||
return internalBuilder().toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchRequest request() {
|
||||
if (sourceBuilder != null) {
|
||||
request.source(sourceBuilder());
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(ActionListener<SearchResponse> listener) {
|
||||
if (sourceBuilder != null) {
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TransportMultiSearchAction extends TransportAction<MultiSearchRequest, MultiSearchResponse> {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private final TransportSearchAction searchAction;
|
||||
|
||||
@Inject
|
||||
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, TransportSearchAction searchAction) {
|
||||
super(settings, threadPool);
|
||||
this.clusterService = clusterService;
|
||||
this.searchAction = searchAction;
|
||||
|
||||
transportService.registerHandler(MultiSearchAction.NAME, new TransportHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(final MultiSearchRequest request, final ActionListener<MultiSearchResponse> listener) {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
|
||||
|
||||
final MultiSearchResponse.Item[] responses = new MultiSearchResponse.Item[request.requests().size()];
|
||||
final AtomicInteger counter = new AtomicInteger(responses.length);
|
||||
for (int i = 0; i < responses.length; i++) {
|
||||
final int index = i;
|
||||
searchAction.execute(request.requests().get(i), new ActionListener<SearchResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchResponse searchResponse) {
|
||||
synchronized (responses) {
|
||||
responses[index] = new MultiSearchResponse.Item(searchResponse, null);
|
||||
}
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
finishHim();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
synchronized (responses) {
|
||||
responses[index] = new MultiSearchResponse.Item(null, ExceptionsHelper.detailedMessage(e));
|
||||
}
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
finishHim();
|
||||
}
|
||||
}
|
||||
|
||||
private void finishHim() {
|
||||
listener.onResponse(new MultiSearchResponse(responses));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class TransportHandler extends BaseTransportRequestHandler<MultiSearchRequest> {
|
||||
|
||||
@Override
|
||||
public MultiSearchRequest newInstance() {
|
||||
return new MultiSearchRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(final MultiSearchRequest request, final TransportChannel channel) throws Exception {
|
||||
// no need to use threaded listener, since we just send a response
|
||||
request.listenerThreaded(false);
|
||||
execute(request, new ActionListener<MultiSearchResponse>() {
|
||||
@Override
|
||||
public void onResponse(MultiSearchResponse response) {
|
||||
try {
|
||||
channel.sendResponse(response);
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (Exception e1) {
|
||||
logger.warn("Failed to send error response for action [msearch] and request [" + request + "]", e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -374,6 +374,21 @@ public interface Client {
|
|||
*/
|
||||
SearchScrollRequestBuilder prepareSearchScroll(String scrollId);
|
||||
|
||||
/**
|
||||
* Performs multiple search requests.
|
||||
*/
|
||||
ActionFuture<MultiSearchResponse> multiSearch(MultiSearchRequest request);
|
||||
|
||||
/**
|
||||
* Performs multiple search requests.
|
||||
*/
|
||||
void multiSearch(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener);
|
||||
|
||||
/**
|
||||
* Performs multiple search requests.
|
||||
*/
|
||||
MultiSearchRequestBuilder prepareMultiSearch();
|
||||
|
||||
/**
|
||||
* A more like this action to search for documents that are "like" a specific document.
|
||||
*
|
||||
|
|
|
@ -226,6 +226,21 @@ public abstract class AbstractClient implements InternalClient {
|
|||
return new SearchScrollRequestBuilder(this, scrollId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionFuture<MultiSearchResponse> multiSearch(MultiSearchRequest request) {
|
||||
return execute(MultiSearchAction.INSTANCE, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void multiSearch(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
|
||||
execute(MultiSearchAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiSearchRequestBuilder prepareMultiSearch() {
|
||||
return new MultiSearchRequestBuilder(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionFuture<CountResponse> count(final CountRequest request) {
|
||||
return execute(CountAction.INSTANCE, request);
|
||||
|
|
|
@ -39,9 +39,7 @@ import org.elasticsearch.action.index.IndexResponse;
|
|||
import org.elasticsearch.action.mlt.MoreLikeThisRequest;
|
||||
import org.elasticsearch.action.percolate.PercolateRequest;
|
||||
import org.elasticsearch.action.percolate.PercolateResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchScrollRequest;
|
||||
import org.elasticsearch.action.search.*;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
|
@ -357,6 +355,16 @@ public class TransportClient extends AbstractClient {
|
|||
internalClient.searchScroll(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionFuture<MultiSearchResponse> multiSearch(MultiSearchRequest request) {
|
||||
return internalClient.multiSearch(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void multiSearch(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
|
||||
internalClient.multiSearch(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionFuture<SearchResponse> moreLikeThis(MoreLikeThisRequest request) {
|
||||
return internalClient.moreLikeThis(request);
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.elasticsearch.rest.action.index.RestIndexAction;
|
|||
import org.elasticsearch.rest.action.main.RestMainAction;
|
||||
import org.elasticsearch.rest.action.mlt.RestMoreLikeThisAction;
|
||||
import org.elasticsearch.rest.action.percolate.RestPercolateAction;
|
||||
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
|
||||
import org.elasticsearch.rest.action.search.RestSearchAction;
|
||||
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
|
||||
import org.elasticsearch.rest.action.update.RestUpdateAction;
|
||||
|
@ -142,6 +143,7 @@ public class RestActionModule extends AbstractModule {
|
|||
|
||||
bind(RestSearchAction.class).asEagerSingleton();
|
||||
bind(RestSearchScrollAction.class).asEagerSingleton();
|
||||
bind(RestMultiSearchAction.class).asEagerSingleton();
|
||||
|
||||
bind(RestValidateQueryAction.class).asEagerSingleton();
|
||||
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.rest.action.search;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.search.MultiSearchRequest;
|
||||
import org.elasticsearch.action.search.MultiSearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestActions;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.rest.RestRequest.Method.GET;
|
||||
import static org.elasticsearch.rest.RestRequest.Method.POST;
|
||||
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
|
||||
import static org.elasticsearch.rest.RestStatus.OK;
|
||||
import static org.elasticsearch.rest.action.support.RestXContentBuilder.restContentBuilder;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class RestMultiSearchAction extends BaseRestHandler {
|
||||
|
||||
@Inject
|
||||
public RestMultiSearchAction(Settings settings, Client client, RestController controller) {
|
||||
super(settings, client);
|
||||
|
||||
controller.registerHandler(GET, "/_msearch", this);
|
||||
controller.registerHandler(POST, "/_msearch", this);
|
||||
controller.registerHandler(GET, "/{index}/_msearch", this);
|
||||
controller.registerHandler(POST, "/{index}/_msearch", this);
|
||||
controller.registerHandler(GET, "/{index}/{type}/_msearch", this);
|
||||
controller.registerHandler(POST, "/{index}/{type}/_msearch", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel) {
|
||||
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
|
||||
|
||||
String[] indices = RestActions.splitIndices(request.param("index"));
|
||||
String[] types = RestActions.splitTypes(request.param("type"));
|
||||
|
||||
try {
|
||||
multiSearchRequest.add(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe(), indices, types);
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
XContentBuilder builder = restContentBuilder(request);
|
||||
channel.sendResponse(new XContentRestResponse(request, BAD_REQUEST, builder.startObject().field("error", e.getMessage()).endObject()));
|
||||
} catch (IOException e1) {
|
||||
logger.error("Failed to send failure response", e1);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
client.multiSearch(multiSearchRequest, new ActionListener<MultiSearchResponse>() {
|
||||
@Override
|
||||
public void onResponse(MultiSearchResponse response) {
|
||||
try {
|
||||
XContentBuilder builder = restContentBuilder(request);
|
||||
builder.startObject();
|
||||
response.toXContent(builder, request);
|
||||
builder.endObject();
|
||||
channel.sendResponse(new XContentRestResponse(request, OK, builder));
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
try {
|
||||
channel.sendResponse(new XContentThrowableRestResponse(request, e));
|
||||
} catch (IOException e1) {
|
||||
logger.error("Failed to send failure response", e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
package org.elasticsearch.test.integration.search.msearch;
|
||||
|
||||
import org.elasticsearch.action.search.MultiSearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.test.integration.AbstractNodesTests;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
*/
|
||||
@Test
|
||||
public class SimpleMultiSearchTests extends AbstractNodesTests {
|
||||
|
||||
private Client client;
|
||||
|
||||
@BeforeClass
|
||||
public void createNodes() throws Exception {
|
||||
Settings settings = settingsBuilder().put("number_of_shards", 3).put("number_of_replicas", 0).build();
|
||||
startNode("node1", settings);
|
||||
startNode("node2", settings);
|
||||
client = getClient();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void closeNodes() {
|
||||
client.close();
|
||||
closeAllNodes();
|
||||
}
|
||||
|
||||
protected Client getClient() {
|
||||
return client("node1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void simpleMultiSearch() {
|
||||
client.admin().indices().prepareDelete().execute().actionGet();
|
||||
|
||||
client.prepareIndex("test", "type", "1").setSource("field", "xxx").execute().actionGet();
|
||||
client.prepareIndex("test", "type", "2").setSource("field", "yyy").execute().actionGet();
|
||||
|
||||
client.admin().indices().prepareRefresh().execute().actionGet();
|
||||
|
||||
MultiSearchResponse response = client.prepareMultiSearch()
|
||||
.add(client.prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "xxx")))
|
||||
.add(client.prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "yyy")))
|
||||
.add(client.prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()))
|
||||
.execute().actionGet();
|
||||
|
||||
assertThat(response.responses().length, equalTo(3));
|
||||
assertThat(response.responses()[0].response().hits().totalHits(), equalTo(1l));
|
||||
assertThat(response.responses()[1].response().hits().totalHits(), equalTo(1l));
|
||||
assertThat(response.responses()[2].response().hits().totalHits(), equalTo(2l));
|
||||
|
||||
assertThat(response.responses()[0].response().hits().getAt(0).id(), equalTo("1"));
|
||||
assertThat(response.responses()[1].response().hits().getAt(0).id(), equalTo("2"));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue