add percolation support to update action

This commit is contained in:
Benjamin Devèze 2012-01-14 18:22:23 +01:00
parent 0530115f62
commit 4b21cf2993
4 changed files with 93 additions and 3 deletions

View File

@ -208,20 +208,20 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
}
// TODO percolate?
// TODO: external version type, does it make sense here? does not seem like it...
if (operation == null || "index".equals(operation)) {
IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(source, sourceAndContent.v1())
.version(getResult.version()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel())
.timestamp(timestamp).ttl(ttl);
.timestamp(timestamp).ttl(ttl)
.percolate(request.percolate());
indexRequest.operationThreaded(false);
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version());
update.matches(response.matches());
listener.onResponse(update);
}

View File

@ -51,6 +51,8 @@ public class UpdateRequest extends InstanceShardOperationRequest {
int retryOnConflict = 0;
private String percolate;
private ReplicationType replicationType = ReplicationType.DEFAULT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
@ -239,6 +241,20 @@ public class UpdateRequest extends InstanceShardOperationRequest {
return this.retryOnConflict;
}
/**
* Causes the update request document to be percolated. The parameter is the percolate query
* to use to reduce the percolated queries that are going to run against this doc. Can be
* set to <tt>*</tt> to indicate that all percolate queries should be run.
*/
public UpdateRequest percolate(String percolate) {
this.percolate = percolate;
return this;
}
public String percolate() {
return this.percolate;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
@ -297,6 +313,9 @@ public class UpdateRequest extends InstanceShardOperationRequest {
}
scriptParams = in.readMap();
retryOnConflict = in.readVInt();
if (in.readBoolean()) {
percolate = in.readUTF();
}
}
@Override
@ -321,5 +340,11 @@ public class UpdateRequest extends InstanceShardOperationRequest {
}
out.writeMap(scriptParams);
out.writeVInt(retryOnConflict);
if (percolate == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(percolate);
}
}
}

View File

@ -19,11 +19,14 @@
package org.elasticsearch.action.update;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
*/
@ -37,6 +40,8 @@ public class UpdateResponse implements ActionResponse {
private long version;
private List<String> matches;
public UpdateResponse() {
}
@ -104,12 +109,54 @@ public class UpdateResponse implements ActionResponse {
return version();
}
/**
* Returns the percolate queries matches. <tt>null</tt> if no percolation was requested.
*/
public List<String> matches() {
return this.matches;
}
/**
* Returns the percolate queries matches. <tt>null</tt> if no percolation was requested.
*/
public List<String> getMatches() {
return this.matches;
}
/**
* Internal.
*/
public void matches(List<String> matches) {
this.matches = matches;
}
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
id = in.readUTF();
type = in.readUTF();
version = in.readLong();
if (in.readBoolean()) {
int size = in.readVInt();
if (size == 0) {
matches = ImmutableList.of();
} else if (size == 1) {
matches = ImmutableList.of(in.readUTF());
} else if (size == 2) {
matches = ImmutableList.of(in.readUTF(), in.readUTF());
} else if (size == 3) {
matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF());
} else if (size == 4) {
matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
} else if (size == 5) {
matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
} else {
matches = new ArrayList<String>();
for (int i = 0; i < size; i++) {
matches.add(in.readUTF());
}
}
}
}
@Override
@ -118,5 +165,14 @@ public class UpdateResponse implements ActionResponse {
out.writeUTF(id);
out.writeUTF(type);
out.writeLong(version);
if (matches == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeVInt(matches.size());
for (String match : matches) {
out.writeUTF(match);
}
}
}
}

View File

@ -65,6 +65,7 @@ public class RestUpdateAction extends BaseRestHandler {
if (consistencyLevel != null) {
updateRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
}
updateRequest.percolate(request.param("percolate", null));
// we just send a response, no need to fork
updateRequest.listenerThreaded(false);
updateRequest.script(request.param("script"));
@ -114,6 +115,13 @@ public class RestUpdateAction extends BaseRestHandler {
.field(Fields._TYPE, response.type())
.field(Fields._ID, response.id())
.field(Fields._VERSION, response.version());
if (response.matches() != null) {
builder.startArray(Fields.MATCHES);
for (String match : response.matches()) {
builder.value(match);
}
builder.endArray();
}
builder.endObject();
RestStatus status = OK;
if (response.version() == 1) {
@ -142,5 +150,6 @@ public class RestUpdateAction extends BaseRestHandler {
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString MATCHES = new XContentBuilderString("matches");
}
}