Rewrote the percolate existing doc api.
The percolate existing doc feature now reuses the get request instead of having a separate request body. Relates to #3380
This commit is contained in:
parent
a9dd3c9756
commit
3f6877ec2b
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.percolate;
|
|||
|
||||
import org.elasticsearch.ElasticSearchGenerationException;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
|
@ -46,11 +47,12 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
private String documentType;
|
||||
private String routing;
|
||||
private String preference;
|
||||
private GetRequest getRequest;
|
||||
|
||||
private BytesReference source;
|
||||
private boolean unsafe;
|
||||
|
||||
private BytesReference fetchedDoc;
|
||||
private BytesReference docSource;
|
||||
|
||||
// Used internally in order to compute tookInMillis, TransportBroadcastOperationAction itself doesn't allow
|
||||
// to hold it temporarily in an easy way
|
||||
|
@ -64,13 +66,13 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
this.documentType = documentType;
|
||||
}
|
||||
|
||||
public PercolateRequest(PercolateRequest request, BytesReference fetchedDoc) {
|
||||
public PercolateRequest(PercolateRequest request, BytesReference docSource) {
|
||||
super(request.indices());
|
||||
this.documentType = request.documentType();
|
||||
this.routing = request.routing();
|
||||
this.preference = request.preference();
|
||||
this.source = request.source;
|
||||
this.fetchedDoc = fetchedDoc;
|
||||
this.docSource = docSource;
|
||||
}
|
||||
|
||||
public String documentType() {
|
||||
|
@ -99,6 +101,14 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
return this;
|
||||
}
|
||||
|
||||
public GetRequest getRequest() {
|
||||
return getRequest;
|
||||
}
|
||||
|
||||
public void getRequest(GetRequest getRequest) {
|
||||
this.getRequest = getRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* Before we fork on a local thread, make sure we copy over the bytes if they are unsafe
|
||||
*/
|
||||
|
@ -164,8 +174,8 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
return this;
|
||||
}
|
||||
|
||||
BytesReference fetchedDoc() {
|
||||
return fetchedDoc;
|
||||
BytesReference docSource() {
|
||||
return docSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -177,8 +187,11 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
if (documentType == null) {
|
||||
validationException = addValidationError("type is missing", validationException);
|
||||
}
|
||||
if (source == null) {
|
||||
validationException = addValidationError("source is missing", validationException);
|
||||
if (source == null && getRequest == null) {
|
||||
validationException = addValidationError("source or get is missing", validationException);
|
||||
}
|
||||
if (getRequest != null && getRequest.fields() != null) {
|
||||
validationException = addValidationError("get fields option isn't supported via percolate request", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
@ -186,21 +199,33 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
startTime = in.readVLong();
|
||||
documentType = in.readString();
|
||||
routing = in.readOptionalString();
|
||||
preference = in.readOptionalString();
|
||||
unsafe = false;
|
||||
source = in.readBytesReference();
|
||||
startTime = in.readVLong();
|
||||
docSource = in.readBytesReference();
|
||||
if (in.readBoolean()) {
|
||||
getRequest = new GetRequest(null);
|
||||
getRequest.readFrom(in);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeVLong(startTime);
|
||||
out.writeString(documentType);
|
||||
out.writeOptionalString(routing);
|
||||
out.writeOptionalString(preference);
|
||||
out.writeBytesReference(source);
|
||||
out.writeVLong(startTime);
|
||||
out.writeBytesReference(docSource);
|
||||
if (getRequest != null) {
|
||||
out.writeBoolean(true);
|
||||
getRequest.writeTo(out);
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.action.percolate;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.internal.InternalClient;
|
||||
|
@ -89,6 +90,15 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder<Pe
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables percolating an existing document. Instead of specifying the source of the document to percolate, define
|
||||
* a get request that will fetch a document and use its source.
|
||||
*/
|
||||
public PercolateRequestBuilder setGetRequest(GetRequest getRequest) {
|
||||
request.getRequest(getRequest);
|
||||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequestBuilder setSource(PercolateSourceBuilder source) {
|
||||
sourceBuilder = source;
|
||||
return this;
|
||||
|
@ -144,11 +154,6 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder<Pe
|
|||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequestBuilder setPercolateGet(PercolateSourceBuilder.GetBuilder getBuilder) {
|
||||
sourceBuilder().setGet(getBuilder);
|
||||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequestBuilder setPercolateQuery(QueryBuilder queryBuilder) {
|
||||
sourceBuilder().setQueryBuilder(queryBuilder);
|
||||
return this;
|
||||
|
|
|
@ -13,7 +13,7 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
|||
|
||||
private String documentType;
|
||||
private BytesReference source;
|
||||
private BytesReference fetchedDoc;
|
||||
private BytesReference docSource;
|
||||
|
||||
public PercolateShardRequest() {
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
|||
super(index, shardId, request);
|
||||
this.documentType = request.documentType();
|
||||
this.source = request.source();
|
||||
this.fetchedDoc = request.fetchedDoc();
|
||||
this.docSource = request.docSource();
|
||||
}
|
||||
|
||||
public String documentType() {
|
||||
|
@ -33,8 +33,8 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
|||
return source;
|
||||
}
|
||||
|
||||
public BytesReference fetchedDoc() {
|
||||
return fetchedDoc;
|
||||
public BytesReference docSource() {
|
||||
return docSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -42,9 +42,7 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
|||
super.readFrom(in);
|
||||
documentType = in.readString();
|
||||
source = in.readBytesReference();
|
||||
if (in.readBoolean()) {
|
||||
fetchedDoc = in.readBytesReference();
|
||||
}
|
||||
docSource = in.readBytesReference();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -52,12 +50,7 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
|||
super.writeTo(out);
|
||||
out.writeString(documentType);
|
||||
out.writeBytesReference(source);
|
||||
if (fetchedDoc != null) {
|
||||
out.writeBoolean(true);
|
||||
out.writeBytesReference(fetchedDoc);
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
out.writeBytesReference(docSource);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -35,13 +35,11 @@ import java.util.Map;
|
|||
public class PercolateSourceBuilder implements ToXContent {
|
||||
|
||||
private DocBuilder docBuilder;
|
||||
private GetBuilder getBuilder;
|
||||
private QueryBuilder queryBuilder;
|
||||
private FilterBuilder filterBuilder;
|
||||
|
||||
public DocBuilder percolateDocument() {
|
||||
if (docBuilder == null) {
|
||||
getBuilder = null;
|
||||
docBuilder = new DocBuilder();
|
||||
}
|
||||
return docBuilder;
|
||||
|
@ -55,22 +53,6 @@ public class PercolateSourceBuilder implements ToXContent {
|
|||
this.docBuilder = docBuilder;
|
||||
}
|
||||
|
||||
public GetBuilder percolateGet() {
|
||||
if (getBuilder == null) {
|
||||
docBuilder = null;
|
||||
getBuilder = new GetBuilder();
|
||||
}
|
||||
return getBuilder;
|
||||
}
|
||||
|
||||
public GetBuilder getGet() {
|
||||
return getBuilder;
|
||||
}
|
||||
|
||||
public void setGet(GetBuilder getBuilder) {
|
||||
this.getBuilder = getBuilder;
|
||||
}
|
||||
|
||||
public QueryBuilder getQueryBuilder() {
|
||||
return queryBuilder;
|
||||
}
|
||||
|
@ -103,9 +85,6 @@ public class PercolateSourceBuilder implements ToXContent {
|
|||
if (docBuilder != null) {
|
||||
docBuilder.toXContent(builder, params);
|
||||
}
|
||||
if (getBuilder != null) {
|
||||
getBuilder.toXContent(builder, params);
|
||||
}
|
||||
if (queryBuilder != null) {
|
||||
builder.field("query");
|
||||
queryBuilder.toXContent(builder, params);
|
||||
|
@ -172,89 +151,4 @@ public class PercolateSourceBuilder implements ToXContent {
|
|||
}
|
||||
}
|
||||
|
||||
public static GetBuilder getBuilder(String index, String type, String id) {
|
||||
return new GetBuilder().setIndex(index).setType(type).setId(id);
|
||||
}
|
||||
|
||||
public static class GetBuilder implements ToXContent {
|
||||
|
||||
private String index;
|
||||
private String type;
|
||||
private String id;
|
||||
private Long version;
|
||||
private String routing;
|
||||
private String preference;
|
||||
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public GetBuilder setIndex(String index) {
|
||||
this.index = index;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public GetBuilder setType(String type) {
|
||||
this.type = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public GetBuilder setId(String id) {
|
||||
this.id = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Long getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public GetBuilder setVersion(Long version) {
|
||||
this.version = version;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getRouting() {
|
||||
return routing;
|
||||
}
|
||||
|
||||
public GetBuilder setRouting(String routing) {
|
||||
this.routing = routing;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getPreference() {
|
||||
return preference;
|
||||
}
|
||||
|
||||
public GetBuilder setPreference(String preference) {
|
||||
this.preference = preference;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject("get");
|
||||
builder.field("index", index);
|
||||
builder.field("type", type);
|
||||
builder.field("id", id);
|
||||
if (version != null) {
|
||||
builder.field("version", version);
|
||||
}
|
||||
if (routing != null) {
|
||||
builder.field("routing", routing);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,10 +20,8 @@
|
|||
package org.elasticsearch.action.percolate;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchParseException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.get.TransportGetAction;
|
||||
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
|
||||
|
@ -36,21 +34,16 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
|||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.engine.DocumentMissingException;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.percolator.PercolateException;
|
||||
import org.elasticsearch.index.percolator.PercolatorService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -76,124 +69,31 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(PercolateRequest request, ActionListener<PercolateResponse> listener) {
|
||||
resolveGet(request, listener);
|
||||
}
|
||||
|
||||
// Add redirect here if a request ends up on a non data node? In the case when percolating an existing doc this
|
||||
// could be beneficial.
|
||||
void resolveGet(PercolateRequest originalRequest, ActionListener<PercolateResponse> listener) {
|
||||
originalRequest.startTime = System.currentTimeMillis();
|
||||
BytesReference body = originalRequest.source();
|
||||
Tuple<GetRequest, Long> tuple = null;
|
||||
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
parser = XContentFactory.xContent(body).createParser(body);
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token = parser.nextToken();
|
||||
if (token != XContentParser.Token.START_OBJECT) {
|
||||
throw new ElasticSearchParseException("percolate request didn't start with start object");
|
||||
}
|
||||
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
// we need to check the "doc" here, so the next token will be START_OBJECT which is
|
||||
// the actual document starting
|
||||
if ("doc".equals(currentFieldName)) {
|
||||
parser.close();
|
||||
super.doExecute(originalRequest, listener);
|
||||
protected void doExecute(final PercolateRequest request, final ActionListener<PercolateResponse> listener) {
|
||||
request.startTime = System.currentTimeMillis();
|
||||
if (request.getRequest() != null) {
|
||||
getAction.execute(request.getRequest(), new ActionListener<GetResponse>() {
|
||||
@Override
|
||||
public void onResponse(GetResponse getResponse) {
|
||||
if (!getResponse.isExists()) {
|
||||
onFailure(new DocumentMissingException(null, request.getRequest().type(), request.getRequest().id()));
|
||||
return;
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if ("get".equals(currentFieldName)) {
|
||||
tuple = createGetRequest(parser, originalRequest.indices()[0], originalRequest.documentType());
|
||||
break;
|
||||
} else {
|
||||
parser.skipChildren();
|
||||
}
|
||||
} else if (token == null) {
|
||||
break;
|
||||
} else {
|
||||
parser.skipChildren();
|
||||
}
|
||||
}
|
||||
|
||||
// docSource shouldn't be null
|
||||
assert tuple != null;
|
||||
executeGet(tuple, originalRequest, listener);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchParseException("failed to parse request", e);
|
||||
} finally {
|
||||
if (parser != null) {
|
||||
parser.close();
|
||||
}
|
||||
BytesReference docSource = getResponse.getSourceAsBytesRef();
|
||||
TransportPercolateAction.super.doExecute(new PercolateRequest(request, docSource), listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
super.doExecute(request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
void executeGet(Tuple<GetRequest, Long> tuple, final PercolateRequest originalRequest, final ActionListener<PercolateResponse> listener) {
|
||||
final GetRequest getRequest = tuple.v1();
|
||||
final Long getVersion = tuple.v2();
|
||||
getAction.execute(tuple.v1(), new ActionListener<GetResponse>() {
|
||||
@Override
|
||||
public void onResponse(GetResponse getResponse) {
|
||||
if (!getResponse.isExists()) {
|
||||
onFailure(new DocumentMissingException(null, getRequest.type(), getRequest.id()));
|
||||
return;
|
||||
}
|
||||
|
||||
if (getVersion != null && getVersion != getResponse.getVersion()) {
|
||||
onFailure(new VersionConflictEngineException(null, getRequest.type(), getRequest.id(), getResponse.getVersion(), getVersion));
|
||||
return;
|
||||
}
|
||||
BytesReference fetchedSource = getResponse.getSourceAsBytesRef();
|
||||
TransportPercolateAction.super.doExecute(new PercolateRequest(originalRequest, fetchedSource), listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Tuple<GetRequest, Long> createGetRequest(XContentParser parser, String index, String type) throws IOException {
|
||||
String getCurrentField = null;
|
||||
String getIndex = index;
|
||||
String getType = type;
|
||||
String getId = null;
|
||||
Long getVersion = null;
|
||||
String getRouting = null;
|
||||
String getPreference = "_local";
|
||||
|
||||
XContentParser.Token token;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
getCurrentField = parser.currentName();
|
||||
} else if (token.isValue()) {
|
||||
if ("index".equals(getCurrentField)) {
|
||||
getIndex = parser.text();
|
||||
} else if ("type".equals(getCurrentField)) {
|
||||
getType = parser.text();
|
||||
} else if ("id".equals(getCurrentField)) {
|
||||
getId = parser.text();
|
||||
} else if ("version".equals(getCurrentField)) {
|
||||
getVersion = parser.longValue();
|
||||
} else if ("routing".equals(getCurrentField)) {
|
||||
getRouting = parser.text();
|
||||
} else if ("preference".equals(getCurrentField)) {
|
||||
getPreference = parser.text();
|
||||
}
|
||||
}
|
||||
}
|
||||
return new Tuple<GetRequest, Long>(
|
||||
// We are on the network thread, so operationThreaded should be true
|
||||
new GetRequest(getIndex).preference(getPreference).operationThreaded(true).type(getType).id(getId).routing(getRouting),
|
||||
getVersion
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.PERCOLATE;
|
||||
|
|
|
@ -98,11 +98,11 @@ public class PercolatorService extends AbstractComponent {
|
|||
|
||||
ParsedDocument parsedDocument;
|
||||
Query query;
|
||||
if (request.fetchedDoc() != null) {
|
||||
parsedDocument = parseFetchedDoc(request.fetchedDoc(), percolateIndexService, request.documentType());
|
||||
if (request.docSource() != null && request.docSource().length() != 0) {
|
||||
parsedDocument = parseFetchedDoc(request.docSource(), percolateIndexService, request.documentType());
|
||||
query = parseQueryOrFilter(percolateIndexService, request.source());
|
||||
} else {
|
||||
Tuple<ParsedDocument, Query> parseResult = parsePercolate(percolateIndexService, indexShard, request.documentType(), request.source());
|
||||
Tuple<ParsedDocument, Query> parseResult = parsePercolate(percolateIndexService, request.documentType(), request.source());
|
||||
parsedDocument = parseResult.v1();
|
||||
query = parseResult.v2();
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ public class PercolatorService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private Tuple<ParsedDocument, Query> parsePercolate(IndexService documentIndexService, IndexShard indexShard, String type, BytesReference source) throws ElasticSearchException {
|
||||
private Tuple<ParsedDocument, Query> parsePercolate(IndexService documentIndexService, String type, BytesReference source) throws ElasticSearchException {
|
||||
Query query = null;
|
||||
ParsedDocument doc = null;
|
||||
XContentParser parser = null;
|
||||
|
@ -255,6 +255,10 @@ public class PercolatorService extends AbstractComponent {
|
|||
}
|
||||
|
||||
private Query parseQueryOrFilter(IndexService documentIndexService, BytesReference source) {
|
||||
if (source == null || source.length() == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Query query = null;
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
|
|
|
@ -21,16 +21,18 @@ package org.elasticsearch.rest.action.percolate;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.percolate.PercolateRequest;
|
||||
import org.elasticsearch.action.percolate.PercolateResponse;
|
||||
import org.elasticsearch.action.percolate.PercolateSourceBuilder;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestActions;
|
||||
import org.elasticsearch.rest.action.support.RestXContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -49,20 +51,32 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
super(settings, client);
|
||||
controller.registerHandler(GET, "/{index}/{type}/_percolate", this);
|
||||
controller.registerHandler(POST, "/{index}/{type}/_percolate", this);
|
||||
controller.registerHandler(GET, "/{index}/{type}/{id}/_percolate", new RestPercolatingExistingDocumentAction());
|
||||
controller.registerHandler(GET, "/{index}/{type}/{id}/_percolate", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleRequest(RestRequest restRequest, RestChannel restChannel) {
|
||||
PercolateRequest percolateRequest = new PercolateRequest(restRequest.param("index"), restRequest.param("type"));
|
||||
public void handleRequest(final RestRequest restRequest, final RestChannel restChannel) {
|
||||
String index = restRequest.param("index");
|
||||
String type = restRequest.param("type");
|
||||
|
||||
PercolateRequest percolateRequest = new PercolateRequest(index, type);
|
||||
percolateRequest.routing(restRequest.param("routing"));
|
||||
percolateRequest.preference(restRequest.param("preference"));
|
||||
percolateRequest.source(restRequest.content(), restRequest.contentUnsafe());
|
||||
|
||||
executePercolateRequest(percolateRequest, restRequest, restChannel);
|
||||
}
|
||||
percolateRequest.routing(restRequest.param("routing"));
|
||||
percolateRequest.preference(restRequest.param("preference"));
|
||||
|
||||
GetRequest getRequest = new GetRequest(restRequest.param("get_index", index), restRequest.param("get_type", type),
|
||||
restRequest.param("id"));
|
||||
getRequest.routing(restRequest.param("get_routing"));
|
||||
getRequest.preference(restRequest.param("get_preference"));
|
||||
getRequest.refresh(restRequest.paramAsBoolean("refresh", getRequest.refresh()));
|
||||
getRequest.realtime(restRequest.paramAsBooleanOptional("realtime", null));
|
||||
getRequest.version(RestActions.parseVersion(restRequest));
|
||||
getRequest.versionType(VersionType.fromString(restRequest.param("version_type"), getRequest.versionType()));
|
||||
percolateRequest.getRequest(getRequest);
|
||||
|
||||
void executePercolateRequest(PercolateRequest percolateRequest, final RestRequest restRequest, final RestChannel restChannel) {
|
||||
// we just send a response, no need to fork
|
||||
percolateRequest.listenerThreaded(false);
|
||||
client.percolate(percolateRequest, new ActionListener<PercolateResponse>() {
|
||||
|
@ -116,29 +130,6 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
});
|
||||
}
|
||||
|
||||
class RestPercolatingExistingDocumentAction implements RestHandler {
|
||||
|
||||
@Override
|
||||
public void handleRequest(RestRequest restRequest, RestChannel restChannel) {
|
||||
String index = restRequest.param("index");
|
||||
String type = restRequest.param("type");
|
||||
PercolateRequest percolateRequest = new PercolateRequest(index, type);
|
||||
percolateRequest.routing(restRequest.param("routing"));
|
||||
percolateRequest.preference(restRequest.param("preference"));
|
||||
|
||||
PercolateSourceBuilder builder = new PercolateSourceBuilder();
|
||||
builder.percolateGet().setIndex(restRequest.param("get_index", index))
|
||||
.setType(restRequest.param("get_type", type))
|
||||
.setId(restRequest.param("id"))
|
||||
.setRouting(restRequest.param("get_routing"))
|
||||
.setPreference(restRequest.param("get_preference"));
|
||||
percolateRequest.source(builder);
|
||||
|
||||
executePercolateRequest(percolateRequest, restRequest, restChannel);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString _SHARDS = new XContentBuilderString("_shards");
|
||||
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
|
||||
|
|
|
@ -25,12 +25,14 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
|||
import org.elasticsearch.action.count.CountResponse;
|
||||
import org.elasticsearch.action.percolate.PercolateResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.engine.DocumentMissingException;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.query.FilterBuilders;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
|
@ -38,7 +40,6 @@ import org.elasticsearch.test.integration.AbstractSharedClusterTest;
|
|||
import org.junit.Test;
|
||||
|
||||
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder;
|
||||
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.getBuilder;
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.*;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.*;
|
||||
|
@ -113,6 +114,15 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
assertThat(searchResponse.getHits().totalHits(), equalTo(1L));
|
||||
assertThat(searchResponse.getHits().getAt(0).type(), equalTo("type"));
|
||||
assertThat(searchResponse.getHits().getAt(0).id(), equalTo("1"));
|
||||
|
||||
logger.info("--> Percolate non existing doc");
|
||||
try {
|
||||
client().preparePercolate("test", "type")
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("5"))
|
||||
.execute().actionGet();
|
||||
fail("Exception should have been thrown");
|
||||
} catch (DocumentMissingException e) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -601,29 +611,37 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
|
||||
logger.info("--> Percolate existing doc with id 1");
|
||||
PercolateResponse response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "1"))
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("1"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 2");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "2"))
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("2"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 3");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "3"))
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("3"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getMatches(), arrayWithSize(4));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "2", "3", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 4");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "4"))
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("4"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getMatches(), arrayWithSize(1));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContaining("4"));
|
||||
|
||||
|
@ -667,29 +685,37 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
|
||||
logger.info("--> Percolate existing doc with id 1");
|
||||
PercolateResponse response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "1").setRouting("4"))
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("1").routing("4"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 2");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "2").setRouting("3"))
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("2").routing("3"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 3");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "3").setRouting("2"))
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("3").routing("2"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getMatches(), arrayWithSize(4));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "2", "3", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 4");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "4").setRouting("1"))
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("4").routing("1"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getMatches(), arrayWithSize(1));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContaining("4"));
|
||||
}
|
||||
|
@ -725,17 +751,19 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
|
||||
logger.info("--> Percolate existing doc with id 2 and version 1");
|
||||
PercolateResponse response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "2").setVersion(1l))
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("2").version(1l))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 2 and version 2");
|
||||
try {
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "2").setVersion(2l))
|
||||
client().preparePercolate("test", "type")
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("2").version(2l))
|
||||
.execute().actionGet();
|
||||
fail("Error should have been throwed");
|
||||
fail("Error should have been thrown");
|
||||
} catch (VersionConflictEngineException e) {
|
||||
}
|
||||
|
||||
|
@ -744,8 +772,10 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
|
||||
logger.info("--> Percolate existing doc with id 2 and version 2");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "2").setVersion(2l))
|
||||
.setGetRequest(Requests.getRequest("test").type("type").id("2").version(2l))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
assertThat(response.getSuccessfulShards(), equalTo(5));
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4"));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue