Added support for percolating an existing document.
The percolate an existing document feature executes an internal get operation to get the source of the document to percolate. All options for percolating an existing document: * `id` - The id of the document to percolate. * `type` - The type of the document to percolate. * `index` - The index to fetch the document to percolate from. * `routing` - The routing value to use to retrieve the document to percolate. * `preference` - Which shard to prefer (defaults to `_local`). * `version` - Enables a version check. If the fetched document's version isn't equal to the specified version then the request fails with a version conflict and the percolation request is aborted. All the option can be specified inside the `get` body part or via query string arguments. Internally the percolate api will issue a get request for fetching the`_source` of the document to percolate. For this feature to work the `_source` for documents to percolate need to be stored. Closes #3380
This commit is contained in:
parent
cc5998bf6d
commit
ebad9e57d4
|
@ -22,7 +22,7 @@ package org.elasticsearch.action.percolate;
|
|||
import org.elasticsearch.ElasticSearchGenerationException;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
|
||||
import org.elasticsearch.common.Required;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -41,12 +41,16 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
|
|||
*/
|
||||
public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest> {
|
||||
|
||||
public static final XContentType contentType = Requests.CONTENT_TYPE;
|
||||
|
||||
private String documentType;
|
||||
private String routing;
|
||||
private String preference;
|
||||
|
||||
private BytesReference documentSource;
|
||||
private boolean documentUnsafe;
|
||||
private BytesReference source;
|
||||
private boolean unsafe;
|
||||
|
||||
private BytesReference fetchedDoc;
|
||||
|
||||
// Used internally in order to compute tookInMillis, TransportBroadcastOperationAction itself doesn't allow
|
||||
// to hold it temporarily in an easy way
|
||||
|
@ -60,6 +64,15 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
this.documentType = documentType;
|
||||
}
|
||||
|
||||
public PercolateRequest(PercolateRequest request, BytesReference fetchedDoc) {
|
||||
super(request.indices());
|
||||
this.documentType = request.documentType();
|
||||
this.routing = request.routing();
|
||||
this.preference = request.preference();
|
||||
this.source = request.source;
|
||||
this.fetchedDoc = fetchedDoc;
|
||||
}
|
||||
|
||||
public String documentType() {
|
||||
return documentType;
|
||||
}
|
||||
|
@ -91,67 +104,70 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
*/
|
||||
@Override
|
||||
public void beforeLocalFork() {
|
||||
if (documentUnsafe) {
|
||||
documentSource = documentSource.copyBytesArray();
|
||||
documentUnsafe = false;
|
||||
if (unsafe) {
|
||||
source = source.copyBytesArray();
|
||||
unsafe = false;
|
||||
}
|
||||
}
|
||||
|
||||
public BytesReference documentSource() {
|
||||
return documentSource;
|
||||
public BytesReference source() {
|
||||
return source;
|
||||
}
|
||||
|
||||
@Required
|
||||
public PercolateRequest documentSource(Map document) throws ElasticSearchGenerationException {
|
||||
return documentSource(document, XContentType.SMILE);
|
||||
public PercolateRequest source(Map document) throws ElasticSearchGenerationException {
|
||||
return source(document, contentType);
|
||||
}
|
||||
|
||||
@Required
|
||||
public PercolateRequest documentSource(Map document, XContentType contentType) throws ElasticSearchGenerationException {
|
||||
public PercolateRequest source(Map document, XContentType contentType) throws ElasticSearchGenerationException {
|
||||
try {
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
|
||||
builder.map(document);
|
||||
return documentSource(builder);
|
||||
return source(builder);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchGenerationException("Failed to generate [" + document + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Required
|
||||
public PercolateRequest documentSource(String document) {
|
||||
this.documentSource = new BytesArray(document);
|
||||
this.documentUnsafe = false;
|
||||
public PercolateRequest source(String document) {
|
||||
this.source = new BytesArray(document);
|
||||
this.unsafe = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Required
|
||||
public PercolateRequest documentSource(XContentBuilder documentBuilder) {
|
||||
documentSource = documentBuilder.bytes();
|
||||
documentUnsafe = false;
|
||||
public PercolateRequest source(XContentBuilder documentBuilder) {
|
||||
source = documentBuilder.bytes();
|
||||
unsafe = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequest documentSource(byte[] document) {
|
||||
return documentSource(document, 0, document.length);
|
||||
public PercolateRequest source(byte[] document) {
|
||||
return source(document, 0, document.length);
|
||||
}
|
||||
|
||||
@Required
|
||||
public PercolateRequest documentSource(byte[] source, int offset, int length) {
|
||||
return documentSource(source, offset, length, false);
|
||||
public PercolateRequest source(byte[] source, int offset, int length) {
|
||||
return source(source, offset, length, false);
|
||||
}
|
||||
|
||||
@Required
|
||||
public PercolateRequest documentSource(byte[] source, int offset, int length, boolean unsafe) {
|
||||
return documentSource(new BytesArray(source, offset, length), unsafe);
|
||||
public PercolateRequest source(byte[] source, int offset, int length, boolean unsafe) {
|
||||
return source(new BytesArray(source, offset, length), unsafe);
|
||||
}
|
||||
|
||||
@Required
|
||||
public PercolateRequest documentSource(BytesReference source, boolean unsafe) {
|
||||
this.documentSource = source;
|
||||
this.documentUnsafe = unsafe;
|
||||
public PercolateRequest source(BytesReference source, boolean unsafe) {
|
||||
this.source = source;
|
||||
this.unsafe = unsafe;
|
||||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequest source(PercolateSourceBuilder sourceBuilder) {
|
||||
this.source = sourceBuilder.buildAsBytes(contentType);
|
||||
this.unsafe = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
BytesReference fetchedDoc() {
|
||||
return fetchedDoc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = super.validate();
|
||||
|
@ -161,8 +177,8 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
if (documentType == null) {
|
||||
validationException = addValidationError("type is missing", validationException);
|
||||
}
|
||||
if (documentSource == null) {
|
||||
validationException = addValidationError("documentSource is missing", validationException);
|
||||
if (source == null) {
|
||||
validationException = addValidationError("source is missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
@ -173,8 +189,8 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
documentType = in.readString();
|
||||
routing = in.readOptionalString();
|
||||
preference = in.readOptionalString();
|
||||
documentUnsafe = false;
|
||||
documentSource = in.readBytesReference();
|
||||
unsafe = false;
|
||||
source = in.readBytesReference();
|
||||
startTime = in.readVLong();
|
||||
}
|
||||
|
||||
|
@ -184,7 +200,7 @@ public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest
|
|||
out.writeString(documentType);
|
||||
out.writeOptionalString(routing);
|
||||
out.writeOptionalString(preference);
|
||||
out.writeBytesReference(documentSource);
|
||||
out.writeBytesReference(source);
|
||||
out.writeVLong(startTime);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.query.FilterBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -35,6 +37,8 @@ import java.util.Map;
|
|||
*/
|
||||
public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder<PercolateRequest, PercolateResponse, PercolateRequestBuilder> {
|
||||
|
||||
private PercolateSourceBuilder sourceBuilder;
|
||||
|
||||
public PercolateRequestBuilder(Client client, String index, String type) {
|
||||
super((InternalClient) client, new PercolateRequest(index, type));
|
||||
}
|
||||
|
@ -85,97 +89,88 @@ public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder<Pe
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Index the Map as a JSON.
|
||||
*
|
||||
* @param source The map to index
|
||||
*/
|
||||
public PercolateRequestBuilder setSource(PercolateSourceBuilder source) {
|
||||
sourceBuilder = source;
|
||||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequestBuilder setSource(Map<String, Object> source) {
|
||||
request.documentSource(source);
|
||||
request.source(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Index the Map as the provided content type.
|
||||
*
|
||||
* @param source The map to index
|
||||
*/
|
||||
public PercolateRequestBuilder setSource(Map<String, Object> source, XContentType contentType) {
|
||||
request.documentSource(source, contentType);
|
||||
request.source(source, contentType);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the document source to index.
|
||||
* <p/>
|
||||
* <p>Note, its preferable to either set it using {@link #setSource(org.elasticsearch.common.xcontent.XContentBuilder)}
|
||||
* or using the {@link #setSource(byte[])}.
|
||||
*/
|
||||
public PercolateRequestBuilder setSource(String source) {
|
||||
request.documentSource(source);
|
||||
request.source(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the content source to index.
|
||||
*/
|
||||
public PercolateRequestBuilder setSource(XContentBuilder sourceBuilder) {
|
||||
request.documentSource(sourceBuilder);
|
||||
request.source(sourceBuilder);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the document to index in bytes form.
|
||||
*/
|
||||
public PercolateRequestBuilder setSource(BytesReference source) {
|
||||
request.documentSource(source, false);
|
||||
request.source(source, false);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the document to index in bytes form.
|
||||
*/
|
||||
public PercolateRequestBuilder setSource(BytesReference source, boolean unsafe) {
|
||||
request.documentSource(source, unsafe);
|
||||
request.source(source, unsafe);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the document to index in bytes form.
|
||||
*/
|
||||
public PercolateRequestBuilder setSource(byte[] source) {
|
||||
request.documentSource(source);
|
||||
request.source(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the document to index in bytes form (assumed to be safe to be used from different
|
||||
* threads).
|
||||
*
|
||||
* @param source The source to index
|
||||
* @param offset The offset in the byte array
|
||||
* @param length The length of the data
|
||||
*/
|
||||
public PercolateRequestBuilder setSource(byte[] source, int offset, int length) {
|
||||
request.documentSource(source, offset, length);
|
||||
request.source(source, offset, length);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the document to index in bytes form.
|
||||
*
|
||||
* @param source The source to index
|
||||
* @param offset The offset in the byte array
|
||||
* @param length The length of the data
|
||||
* @param unsafe Is the byte array safe to be used form a different thread
|
||||
*/
|
||||
public PercolateRequestBuilder setSource(byte[] source, int offset, int length, boolean unsafe) {
|
||||
request.documentSource(source, offset, length, unsafe);
|
||||
request.source(source, offset, length, unsafe);
|
||||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequestBuilder setPercolateDoc(PercolateSourceBuilder.DocBuilder docBuilder) {
|
||||
sourceBuilder().setDoc(docBuilder);
|
||||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequestBuilder setPercolateGet(PercolateSourceBuilder.GetBuilder getBuilder) {
|
||||
sourceBuilder().setGet(getBuilder);
|
||||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequestBuilder setPercolateQuery(QueryBuilder queryBuilder) {
|
||||
sourceBuilder().setQueryBuilder(queryBuilder);
|
||||
return this;
|
||||
}
|
||||
|
||||
public PercolateRequestBuilder setPercolateFilter(FilterBuilder filterBuilder) {
|
||||
sourceBuilder().setFilterBuilder(filterBuilder);
|
||||
return this;
|
||||
}
|
||||
|
||||
private PercolateSourceBuilder sourceBuilder() {
|
||||
if (sourceBuilder == null) {
|
||||
sourceBuilder = new PercolateSourceBuilder();
|
||||
}
|
||||
return sourceBuilder;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(ActionListener<PercolateResponse> listener) {
|
||||
if (sourceBuilder != null) {
|
||||
request.source(sourceBuilder);
|
||||
}
|
||||
((Client) client).percolate(request, listener);
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,8 @@ import java.io.IOException;
|
|||
public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
||||
|
||||
private String documentType;
|
||||
private BytesReference documentSource;
|
||||
private BytesReference source;
|
||||
private BytesReference fetchedDoc;
|
||||
|
||||
public PercolateShardRequest() {
|
||||
}
|
||||
|
@ -20,29 +21,43 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
|
|||
public PercolateShardRequest(String index, int shardId, PercolateRequest request) {
|
||||
super(index, shardId, request);
|
||||
this.documentType = request.documentType();
|
||||
this.documentSource = request.documentSource();
|
||||
this.source = request.source();
|
||||
this.fetchedDoc = request.fetchedDoc();
|
||||
}
|
||||
|
||||
public String documentType() {
|
||||
return documentType;
|
||||
}
|
||||
|
||||
public BytesReference documentSource() {
|
||||
return documentSource;
|
||||
public BytesReference source() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public BytesReference fetchedDoc() {
|
||||
return fetchedDoc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
documentType = in.readString();
|
||||
documentSource = in.readBytesReference();
|
||||
source = in.readBytesReference();
|
||||
if (in.readBoolean()) {
|
||||
fetchedDoc = in.readBytesReference();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(documentType);
|
||||
out.writeBytesReference(documentSource);
|
||||
out.writeBytesReference(source);
|
||||
if (fetchedDoc != null) {
|
||||
out.writeBoolean(true);
|
||||
out.writeBytesReference(fetchedDoc);
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,260 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.percolate;
|
||||
|
||||
import org.elasticsearch.ElasticSearchGenerationException;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.index.query.FilterBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilderException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class PercolateSourceBuilder implements ToXContent {
|
||||
|
||||
private DocBuilder docBuilder;
|
||||
private GetBuilder getBuilder;
|
||||
private QueryBuilder queryBuilder;
|
||||
private FilterBuilder filterBuilder;
|
||||
|
||||
public DocBuilder percolateDocument() {
|
||||
if (docBuilder == null) {
|
||||
getBuilder = null;
|
||||
docBuilder = new DocBuilder();
|
||||
}
|
||||
return docBuilder;
|
||||
}
|
||||
|
||||
public DocBuilder getDoc() {
|
||||
return docBuilder;
|
||||
}
|
||||
|
||||
public void setDoc(DocBuilder docBuilder) {
|
||||
this.docBuilder = docBuilder;
|
||||
}
|
||||
|
||||
public GetBuilder percolateGet() {
|
||||
if (getBuilder == null) {
|
||||
docBuilder = null;
|
||||
getBuilder = new GetBuilder();
|
||||
}
|
||||
return getBuilder;
|
||||
}
|
||||
|
||||
public GetBuilder getGet() {
|
||||
return getBuilder;
|
||||
}
|
||||
|
||||
public void setGet(GetBuilder getBuilder) {
|
||||
this.getBuilder = getBuilder;
|
||||
}
|
||||
|
||||
public QueryBuilder getQueryBuilder() {
|
||||
return queryBuilder;
|
||||
}
|
||||
|
||||
public void setQueryBuilder(QueryBuilder queryBuilder) {
|
||||
this.queryBuilder = queryBuilder;
|
||||
}
|
||||
|
||||
public FilterBuilder getFilterBuilder() {
|
||||
return filterBuilder;
|
||||
}
|
||||
|
||||
public void setFilterBuilder(FilterBuilder filterBuilder) {
|
||||
this.filterBuilder = filterBuilder;
|
||||
}
|
||||
|
||||
public BytesReference buildAsBytes(XContentType contentType) throws SearchSourceBuilderException {
|
||||
try {
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
|
||||
toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
return builder.bytes();
|
||||
} catch (Exception e) {
|
||||
throw new SearchSourceBuilderException("Failed to build search source", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
if (docBuilder != null) {
|
||||
docBuilder.toXContent(builder, params);
|
||||
}
|
||||
if (getBuilder != null) {
|
||||
getBuilder.toXContent(builder, params);
|
||||
}
|
||||
if (queryBuilder != null) {
|
||||
builder.field("query");
|
||||
queryBuilder.toXContent(builder, params);
|
||||
}
|
||||
if (filterBuilder != null) {
|
||||
builder.field("filter");
|
||||
filterBuilder.toXContent(builder, params);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static DocBuilder docBuilder() {
|
||||
return new DocBuilder();
|
||||
}
|
||||
|
||||
public static class DocBuilder implements ToXContent {
|
||||
|
||||
private BytesReference doc;
|
||||
|
||||
public DocBuilder setDoc(BytesReference doc) {
|
||||
this.doc = doc;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DocBuilder setDoc(String doc) {
|
||||
this.doc = new BytesArray(doc);
|
||||
return this;
|
||||
}
|
||||
|
||||
public DocBuilder setDoc(XContentBuilder doc) {
|
||||
this.doc = doc.bytes();
|
||||
return this;
|
||||
}
|
||||
|
||||
public DocBuilder setDoc(Map doc) {
|
||||
return setDoc(doc, PercolateRequest.contentType);
|
||||
}
|
||||
|
||||
public DocBuilder setDoc(Map doc, XContentType contentType) {
|
||||
try {
|
||||
return setDoc(XContentFactory.contentBuilder(contentType).map(doc));
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchGenerationException("Failed to generate [" + doc + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
XContentType contentType = XContentFactory.xContentType(doc);
|
||||
if (contentType == builder.contentType()) {
|
||||
builder.rawField("doc", doc);
|
||||
} else {
|
||||
XContentParser parser = XContentFactory.xContent(contentType).createParser(doc);
|
||||
try {
|
||||
parser.nextToken();
|
||||
builder.field("doc");
|
||||
builder.copyCurrentStructure(parser);
|
||||
} finally {
|
||||
parser.close();
|
||||
}
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
||||
public static GetBuilder getBuilder(String index, String type, String id) {
|
||||
return new GetBuilder().setIndex(index).setType(type).setId(id);
|
||||
}
|
||||
|
||||
public static class GetBuilder implements ToXContent {
|
||||
|
||||
private String index;
|
||||
private String type;
|
||||
private String id;
|
||||
private Long version;
|
||||
private String routing;
|
||||
private String preference;
|
||||
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public GetBuilder setIndex(String index) {
|
||||
this.index = index;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public GetBuilder setType(String type) {
|
||||
this.type = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public GetBuilder setId(String id) {
|
||||
this.id = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Long getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public GetBuilder setVersion(Long version) {
|
||||
this.version = version;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getRouting() {
|
||||
return routing;
|
||||
}
|
||||
|
||||
public GetBuilder setRouting(String routing) {
|
||||
this.routing = routing;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getPreference() {
|
||||
return preference;
|
||||
}
|
||||
|
||||
public GetBuilder setPreference(String preference) {
|
||||
this.preference = preference;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject("get");
|
||||
builder.field("index", index);
|
||||
builder.field("type", type);
|
||||
builder.field("id", id);
|
||||
if (version != null) {
|
||||
builder.field("version", version);
|
||||
}
|
||||
if (routing != null) {
|
||||
builder.field("routing", routing);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -20,8 +20,12 @@
|
|||
package org.elasticsearch.action.percolate;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchParseException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.get.TransportGetAction;
|
||||
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
|
||||
|
@ -31,15 +35,22 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.engine.DocumentMissingException;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.percolator.PercolateException;
|
||||
import org.elasticsearch.index.percolator.PercolatorService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -53,18 +64,134 @@ import static com.google.common.collect.Lists.newArrayList;
|
|||
public class TransportPercolateAction extends TransportBroadcastOperationAction<PercolateRequest, PercolateResponse, PercolateShardRequest, PercolateShardResponse> {
|
||||
|
||||
private final PercolatorService percolatorService;
|
||||
private final TransportGetAction getAction;
|
||||
|
||||
@Inject
|
||||
public TransportPercolateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
TransportService transportService, PercolatorService percolatorService) {
|
||||
TransportService transportService, PercolatorService percolatorService,
|
||||
TransportGetAction getAction) {
|
||||
super(settings, threadPool, clusterService, transportService);
|
||||
this.percolatorService = percolatorService;
|
||||
this.getAction = getAction;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(PercolateRequest request, ActionListener<PercolateResponse> listener) {
|
||||
request.startTime = System.currentTimeMillis();
|
||||
super.doExecute(request, listener);
|
||||
resolveGet(request, listener);
|
||||
}
|
||||
|
||||
// Add redirect here if a request ends up on a non data node? In the case when percolating an existing doc this
|
||||
// could be beneficial.
|
||||
void resolveGet(PercolateRequest originalRequest, ActionListener<PercolateResponse> listener) {
|
||||
originalRequest.startTime = System.currentTimeMillis();
|
||||
BytesReference body = originalRequest.source();
|
||||
Tuple<GetRequest, Long> tuple = null;
|
||||
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
parser = XContentFactory.xContent(body).createParser(body);
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token = parser.nextToken();
|
||||
if (token != XContentParser.Token.START_OBJECT) {
|
||||
throw new ElasticSearchParseException("percolate request didn't start with start object");
|
||||
}
|
||||
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
// we need to check the "doc" here, so the next token will be START_OBJECT which is
|
||||
// the actual document starting
|
||||
if ("doc".equals(currentFieldName)) {
|
||||
parser.close();
|
||||
super.doExecute(originalRequest, listener);
|
||||
return;
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if ("get".equals(currentFieldName)) {
|
||||
tuple = createGetRequest(parser, originalRequest.indices()[0], originalRequest.documentType());
|
||||
break;
|
||||
} else {
|
||||
parser.skipChildren();
|
||||
}
|
||||
} else if (token == null) {
|
||||
break;
|
||||
} else {
|
||||
parser.skipChildren();
|
||||
}
|
||||
}
|
||||
|
||||
// docSource shouldn't be null
|
||||
assert tuple != null;
|
||||
executeGet(tuple, originalRequest, listener);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchParseException("failed to parse request", e);
|
||||
} finally {
|
||||
if (parser != null) {
|
||||
parser.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void executeGet(Tuple<GetRequest, Long> tuple, final PercolateRequest originalRequest, final ActionListener<PercolateResponse> listener) {
|
||||
final GetRequest getRequest = tuple.v1();
|
||||
final Long getVersion = tuple.v2();
|
||||
getAction.execute(tuple.v1(), new ActionListener<GetResponse>() {
|
||||
@Override
|
||||
public void onResponse(GetResponse getResponse) {
|
||||
if (!getResponse.isExists()) {
|
||||
onFailure(new DocumentMissingException(null, getRequest.type(), getRequest.id()));
|
||||
return;
|
||||
}
|
||||
|
||||
if (getVersion != null && getVersion != getResponse.getVersion()) {
|
||||
onFailure(new VersionConflictEngineException(null, getRequest.type(), getRequest.id(), getResponse.getVersion(), getVersion));
|
||||
return;
|
||||
}
|
||||
BytesReference fetchedSource = getResponse.getSourceAsBytesRef();
|
||||
TransportPercolateAction.super.doExecute(new PercolateRequest(originalRequest, fetchedSource), listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Tuple<GetRequest, Long> createGetRequest(XContentParser parser, String index, String type) throws IOException {
|
||||
String getCurrentField = null;
|
||||
String getIndex = index;
|
||||
String getType = type;
|
||||
String getId = null;
|
||||
Long getVersion = null;
|
||||
String getRouting = null;
|
||||
String getPreference = "_local";
|
||||
|
||||
XContentParser.Token token;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
getCurrentField = parser.currentName();
|
||||
} else if (token.isValue()) {
|
||||
if ("index".equals(getCurrentField)) {
|
||||
getIndex = parser.text();
|
||||
} else if ("type".equals(getCurrentField)) {
|
||||
getType = parser.text();
|
||||
} else if ("id".equals(getCurrentField)) {
|
||||
getId = parser.text();
|
||||
} else if ("version".equals(getCurrentField)) {
|
||||
getVersion = parser.longValue();
|
||||
} else if ("routing".equals(getCurrentField)) {
|
||||
getRouting = parser.text();
|
||||
} else if ("preference".equals(getCurrentField)) {
|
||||
getPreference = parser.text();
|
||||
}
|
||||
}
|
||||
}
|
||||
return new Tuple<GetRequest, Long>(
|
||||
// We are on the network thread, so operationThreaded should be true
|
||||
new GetRequest(getIndex).preference(getPreference).operationThreaded(true).type(getType).id(getId).routing(getRouting),
|
||||
getVersion
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -68,6 +68,13 @@ public class XContentFactory {
|
|||
return new XContentBuilder(SmileXContent.smileXContent, os);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a content builder using YAML format ({@link org.elasticsearch.common.xcontent.XContentType#YAML}.
|
||||
*/
|
||||
public static XContentBuilder yamlBuilder() throws IOException {
|
||||
return contentBuilder(XContentType.SMILE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new yaml builder that will output the result into the provided output stream.
|
||||
*/
|
||||
|
|
|
@ -96,9 +96,17 @@ public class PercolatorService extends AbstractComponent {
|
|||
return new PercolateShardResponse(StringText.EMPTY_ARRAY, request.index(), request.shardId());
|
||||
}
|
||||
|
||||
Tuple<ParsedDocument, Query> parseResult = parsePercolate(percolateIndexService, request.documentType(), request.documentSource());
|
||||
ParsedDocument parsedDocument = parseResult.v1();
|
||||
Query query = parseResult.v2();
|
||||
ParsedDocument parsedDocument;
|
||||
Query query;
|
||||
if (request.fetchedDoc() != null) {
|
||||
parsedDocument = parseFetchedDoc(request.fetchedDoc(), percolateIndexService, request.documentType());
|
||||
query = parseQueryOrFilter(percolateIndexService, request.source());
|
||||
} else {
|
||||
Tuple<ParsedDocument, Query> parseResult = parsePercolate(percolateIndexService, indexShard, request.documentType(), request.source());
|
||||
parsedDocument = parseResult.v1();
|
||||
query = parseResult.v2();
|
||||
}
|
||||
|
||||
|
||||
// first, parse the source doc into a MemoryIndex
|
||||
final MemoryIndex memoryIndex = cache.get();
|
||||
|
@ -169,12 +177,12 @@ public class PercolatorService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
Tuple<ParsedDocument, Query> parsePercolate(IndexService documentIndexService, String type, BytesReference docSource) throws ElasticSearchException {
|
||||
private Tuple<ParsedDocument, Query> parsePercolate(IndexService documentIndexService, IndexShard indexShard, String type, BytesReference source) throws ElasticSearchException {
|
||||
Query query = null;
|
||||
ParsedDocument doc = null;
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
parser = XContentFactory.xContent(docSource).createParser(docSource);
|
||||
parser = XContentFactory.xContent(source).createParser(source);
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
|
@ -183,6 +191,10 @@ public class PercolatorService extends AbstractComponent {
|
|||
// we need to check the "doc" here, so the next token will be START_OBJECT which is
|
||||
// the actual document starting
|
||||
if ("doc".equals(currentFieldName)) {
|
||||
if (doc != null) {
|
||||
throw new ElasticSearchParseException("Either specify doc or get, not both");
|
||||
}
|
||||
|
||||
MapperService mapperService = documentIndexService.mapperService();
|
||||
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type);
|
||||
doc = docMapper.parse(source(parser).type(type).flyweight(true));
|
||||
|
@ -219,6 +231,67 @@ public class PercolatorService extends AbstractComponent {
|
|||
return new Tuple<ParsedDocument, Query>(doc, query);
|
||||
}
|
||||
|
||||
private ParsedDocument parseFetchedDoc(BytesReference fetchedDoc, IndexService documentIndexService, String type) {
|
||||
ParsedDocument doc = null;
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
parser = XContentFactory.xContent(fetchedDoc).createParser(fetchedDoc);
|
||||
MapperService mapperService = documentIndexService.mapperService();
|
||||
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type);
|
||||
doc = docMapper.parse(source(parser).type(type).flyweight(true));
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchParseException("failed to parse request", e);
|
||||
} finally {
|
||||
if (parser != null) {
|
||||
parser.close();
|
||||
}
|
||||
}
|
||||
|
||||
if (doc == null) {
|
||||
throw new ElasticSearchParseException("No doc to percolate in the request");
|
||||
}
|
||||
|
||||
return doc;
|
||||
}
|
||||
|
||||
private Query parseQueryOrFilter(IndexService documentIndexService, BytesReference source) {
|
||||
Query query = null;
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
parser = XContentFactory.xContent(source).createParser(source);
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if ("query".equals(currentFieldName)) {
|
||||
if (query != null) {
|
||||
throw new ElasticSearchParseException("Either specify query or filter, not both");
|
||||
}
|
||||
query = documentIndexService.queryParserService().parse(parser).query();
|
||||
} else if ("filter".equals(currentFieldName)) {
|
||||
if (query != null) {
|
||||
throw new ElasticSearchParseException("Either specify query or filter, not both");
|
||||
}
|
||||
Filter filter = documentIndexService.queryParserService().parseInnerFilter(parser);
|
||||
query = new XConstantScoreQuery(filter);
|
||||
}
|
||||
} else if (token == null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchParseException("failed to parse request", e);
|
||||
} finally {
|
||||
if (parser != null) {
|
||||
parser.close();
|
||||
}
|
||||
}
|
||||
|
||||
return query;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
cache.close();
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.percolate.PercolateRequest;
|
||||
import org.elasticsearch.action.percolate.PercolateResponse;
|
||||
import org.elasticsearch.action.percolate.PercolateSourceBuilder;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -48,23 +49,27 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
super(settings, client);
|
||||
controller.registerHandler(GET, "/{index}/{type}/_percolate", this);
|
||||
controller.registerHandler(POST, "/{index}/{type}/_percolate", this);
|
||||
controller.registerHandler(GET, "/{index}/{type}/{id}/_percolate", new RestPercolatingExistingDocumentAction());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel) {
|
||||
PercolateRequest percolateRequest = new PercolateRequest(request.param("index"), request.param("type"));
|
||||
percolateRequest.routing(request.param("routing"));
|
||||
percolateRequest.preference(request.param("preference"));
|
||||
percolateRequest.listenerThreaded(false);
|
||||
percolateRequest.documentSource(request.content(), request.contentUnsafe());
|
||||
public void handleRequest(RestRequest restRequest, RestChannel restChannel) {
|
||||
PercolateRequest percolateRequest = new PercolateRequest(restRequest.param("index"), restRequest.param("type"));
|
||||
percolateRequest.routing(restRequest.param("routing"));
|
||||
percolateRequest.preference(restRequest.param("preference"));
|
||||
percolateRequest.source(restRequest.content(), restRequest.contentUnsafe());
|
||||
|
||||
executePercolateRequest(percolateRequest, restRequest, restChannel);
|
||||
}
|
||||
|
||||
void executePercolateRequest(PercolateRequest percolateRequest, final RestRequest restRequest, final RestChannel restChannel) {
|
||||
// we just send a response, no need to fork
|
||||
percolateRequest.listenerThreaded(false);
|
||||
client.percolate(percolateRequest, new ActionListener<PercolateResponse>() {
|
||||
@Override
|
||||
public void onResponse(PercolateResponse response) {
|
||||
try {
|
||||
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
|
||||
XContentBuilder builder = RestXContentBuilder.restContentBuilder(restRequest);
|
||||
builder.startObject();
|
||||
|
||||
builder.field(Fields.TOOK, response.getTookInMillis());
|
||||
|
@ -94,7 +99,7 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
|
||||
builder.endObject();
|
||||
|
||||
channel.sendResponse(new XContentRestResponse(request, OK, builder));
|
||||
restChannel.sendResponse(new XContentRestResponse(restRequest, OK, builder));
|
||||
} catch (Throwable e) {
|
||||
onFailure(e);
|
||||
}
|
||||
|
@ -103,7 +108,7 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
try {
|
||||
channel.sendResponse(new XContentThrowableRestResponse(request, e));
|
||||
restChannel.sendResponse(new XContentThrowableRestResponse(restRequest, e));
|
||||
} catch (IOException e1) {
|
||||
logger.error("Failed to send failure response", e1);
|
||||
}
|
||||
|
@ -111,6 +116,29 @@ public class RestPercolateAction extends BaseRestHandler {
|
|||
});
|
||||
}
|
||||
|
||||
class RestPercolatingExistingDocumentAction implements RestHandler {
|
||||
|
||||
@Override
|
||||
public void handleRequest(RestRequest restRequest, RestChannel restChannel) {
|
||||
String index = restRequest.param("index");
|
||||
String type = restRequest.param("type");
|
||||
PercolateRequest percolateRequest = new PercolateRequest(index, type);
|
||||
percolateRequest.routing(restRequest.param("routing"));
|
||||
percolateRequest.preference(restRequest.param("preference"));
|
||||
|
||||
PercolateSourceBuilder builder = new PercolateSourceBuilder();
|
||||
builder.percolateGet().setIndex(restRequest.param("get_index", index))
|
||||
.setType(restRequest.param("get_type", type))
|
||||
.setId(restRequest.param("id"))
|
||||
.setRouting(restRequest.param("get_routing"))
|
||||
.setPreference(restRequest.param("get_preference"));
|
||||
percolateRequest.source(builder);
|
||||
|
||||
executePercolateRequest(percolateRequest, restRequest, restChannel);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString _SHARDS = new XContentBuilderString("_shards");
|
||||
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.test.integration.cluster;
|
||||
|
||||
import org.elasticsearch.action.percolate.PercolateSourceBuilder;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -31,8 +32,9 @@ import org.elasticsearch.test.integration.AbstractNodesTests;
|
|||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
|
@ -84,7 +86,9 @@ public class NoMasterNodeTests extends AbstractNodesTests {
|
|||
}
|
||||
|
||||
try {
|
||||
node.client().preparePercolate("test", "type1").setSource(XContentFactory.jsonBuilder().startObject().endObject()).execute().actionGet();
|
||||
PercolateSourceBuilder percolateSource = new PercolateSourceBuilder();
|
||||
percolateSource.percolateDocument().setDoc(new HashMap());
|
||||
node.client().preparePercolate("test", "type1").setSource(percolateSource).execute().actionGet();
|
||||
assert false;
|
||||
} catch (ClusterBlockException e) {
|
||||
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
|
||||
|
|
|
@ -31,13 +31,16 @@ import org.elasticsearch.common.settings.ImmutableSettings.Builder;
|
|||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.query.FilterBuilders;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder;
|
||||
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.getBuilder;
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.*;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.*;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
|
@ -79,28 +82,28 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
|
||||
logger.info("--> Percolate doc with field1=b");
|
||||
PercolateResponse response = client().preparePercolate("test", "type")
|
||||
.setSource(jsonBuilder().startObject().startObject("doc").field("field1", "b").endObject().endObject())
|
||||
.setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field1", "b").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "4"));
|
||||
|
||||
logger.info("--> Percolate doc with field1=c");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setSource(jsonBuilder().startObject().startObject("doc").field("field1", "c").endObject().endObject())
|
||||
.setPercolateDoc(docBuilder().setDoc(yamlBuilder().startObject().field("field1", "c").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4"));
|
||||
|
||||
logger.info("--> Percolate doc with field1=b c");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setSource(jsonBuilder().startObject().startObject("doc").field("field1", "b c").endObject().endObject())
|
||||
.setPercolateDoc(docBuilder().setDoc(smileBuilder().startObject().field("field1", "b c").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(4));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "2", "3", "4"));
|
||||
|
||||
logger.info("--> Percolate doc with field1=d");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setSource(jsonBuilder().startObject().startObject("doc").field("field1", "d").endObject().endObject())
|
||||
.setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field1", "d").endObject()))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(1));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContaining("4"));
|
||||
|
@ -567,6 +570,186 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
|||
assertThat(percolateSumTime, greaterThan(0l));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPercolatingExistingDocs() throws Exception {
|
||||
client().admin().indices().prepareCreate("test").execute().actionGet();
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> Adding docs");
|
||||
client().prepareIndex("test", "type", "1").setSource("field1", "b").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "2").setSource("field1", "c").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "3").setSource("field1", "b c").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "4").setSource("field1", "d").execute().actionGet();
|
||||
|
||||
logger.info("--> register a queries");
|
||||
client().prepareIndex("test", "_percolator", "1")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchQuery("field1", "b")).field("a", "b").endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "2")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchQuery("field1", "c")).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "3")
|
||||
.setSource(jsonBuilder().startObject().field("query", boolQuery()
|
||||
.must(matchQuery("field1", "b"))
|
||||
.must(matchQuery("field1", "c"))
|
||||
).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "4")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject())
|
||||
.execute().actionGet();
|
||||
client().admin().indices().prepareRefresh("test").execute().actionGet();
|
||||
|
||||
logger.info("--> Percolate existing doc with id 1");
|
||||
PercolateResponse response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "1"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 2");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "2"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 3");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "3"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(4));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "2", "3", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 4");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "4"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(1));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContaining("4"));
|
||||
|
||||
logger.info("--> Search normals docs, percolate queries must not be included");
|
||||
SearchResponse searchResponse = client().prepareSearch("test").execute().actionGet();
|
||||
assertThat(searchResponse.getHits().totalHits(), equalTo(4L));
|
||||
assertThat(searchResponse.getHits().getAt(0).type(), equalTo("type"));
|
||||
assertThat(searchResponse.getHits().getAt(1).type(), equalTo("type"));
|
||||
assertThat(searchResponse.getHits().getAt(2).type(), equalTo("type"));
|
||||
assertThat(searchResponse.getHits().getAt(3).type(), equalTo("type"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPercolatingExistingDocs_routing() throws Exception {
|
||||
client().admin().indices().prepareCreate("test").execute().actionGet();
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> Adding docs");
|
||||
client().prepareIndex("test", "type", "1").setSource("field1", "b").setRouting("4").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "2").setSource("field1", "c").setRouting("3").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "3").setSource("field1", "b c").setRouting("2").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "4").setSource("field1", "d").setRouting("1").execute().actionGet();
|
||||
|
||||
logger.info("--> register a queries");
|
||||
client().prepareIndex("test", "_percolator", "1")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchQuery("field1", "b")).field("a", "b").endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "2")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchQuery("field1", "c")).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "3")
|
||||
.setSource(jsonBuilder().startObject().field("query", boolQuery()
|
||||
.must(matchQuery("field1", "b"))
|
||||
.must(matchQuery("field1", "c"))
|
||||
).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "4")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject())
|
||||
.execute().actionGet();
|
||||
client().admin().indices().prepareRefresh("test").execute().actionGet();
|
||||
|
||||
logger.info("--> Percolate existing doc with id 1");
|
||||
PercolateResponse response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "1").setRouting("4"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 2");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "2").setRouting("3"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 3");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "3").setRouting("2"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(4));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "2", "3", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 4");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "4").setRouting("1"))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(1));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContaining("4"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPercolatingExistingDocs_versionCheck() throws Exception {
|
||||
client().admin().indices().prepareCreate("test").execute().actionGet();
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> Adding docs");
|
||||
client().prepareIndex("test", "type", "1").setSource("field1", "b").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "2").setSource("field1", "c").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "3").setSource("field1", "b c").execute().actionGet();
|
||||
client().prepareIndex("test", "type", "4").setSource("field1", "d").execute().actionGet();
|
||||
|
||||
logger.info("--> register a queries");
|
||||
client().prepareIndex("test", "_percolator", "1")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchQuery("field1", "b")).field("a", "b").endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "2")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchQuery("field1", "c")).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "3")
|
||||
.setSource(jsonBuilder().startObject().field("query", boolQuery()
|
||||
.must(matchQuery("field1", "b"))
|
||||
.must(matchQuery("field1", "c"))
|
||||
).endObject())
|
||||
.execute().actionGet();
|
||||
client().prepareIndex("test", "_percolator", "4")
|
||||
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject())
|
||||
.execute().actionGet();
|
||||
client().admin().indices().prepareRefresh("test").execute().actionGet();
|
||||
|
||||
logger.info("--> Percolate existing doc with id 2 and version 1");
|
||||
PercolateResponse response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "2").setVersion(1l))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4"));
|
||||
|
||||
logger.info("--> Percolate existing doc with id 2 and version 2");
|
||||
try {
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "2").setVersion(2l))
|
||||
.execute().actionGet();
|
||||
fail("Error should have been throwed");
|
||||
} catch (VersionConflictEngineException e) {
|
||||
}
|
||||
|
||||
logger.info("--> Index doc with id for the second time");
|
||||
client().prepareIndex("test", "type", "2").setSource("field1", "c").execute().actionGet();
|
||||
|
||||
logger.info("--> Percolate existing doc with id 2 and version 2");
|
||||
response = client().preparePercolate("test", "type")
|
||||
.setPercolateGet(getBuilder("test", "type", "2").setVersion(2l))
|
||||
.execute().actionGet();
|
||||
assertThat(response.getMatches(), arrayWithSize(2));
|
||||
assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4"));
|
||||
}
|
||||
|
||||
public static String[] convertFromTextArray(Text[] texts) {
|
||||
if (texts.length == 0) {
|
||||
return Strings.EMPTY_ARRAY;
|
||||
|
|
Loading…
Reference in New Issue