Update API: Allow to specify fields in the request to return updated fields, closes #1838.

This commit is contained in:
Shay Banon 2012-04-03 14:11:22 +03:00
parent 0cf0703a7b
commit cdfa87827a
6 changed files with 106 additions and 133 deletions

View File

@ -329,6 +329,10 @@ public class IndexRequest extends ShardReplicationOperationRequest {
return source;
}
public BytesHolder underlyingSourceBytes() {
return new BytesHolder(underlyingSource(), underlyingSourceOffset(), underlyingSourceLength());
}
public byte[] underlyingSource() {
if (sourceUnsafe) {
source();

View File

@ -39,6 +39,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -153,41 +155,12 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
shardOperation(request, listener, 0);
}
protected Map<String, GetField> extractFieldsFromSource(final UpdateRequest request, final Map<String, Object> source) {
Map<String, GetField> fields = null;
if (request.fields() != null && request.fields().length > 0) {
SourceLookup sourceLookup = new SourceLookup();
sourceLookup.setNextSource(source);
for (String field : request.fields()) {
Object value = null;
if (field.equals("_source")) {
value = source;
} else {
value = sourceLookup.extractValue(field);
}
if (value != null) {
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.values().add(value);
}
}
}
return fields;
}
protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) throws ElasticSearchException {
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(request.shardId());
long getDate = System.currentTimeMillis();
GetResult getResult = indexShard.getService().get(request.type(), request.id(),
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME}, true);
// no doc, what to do, what to do...
@ -203,9 +176,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef().bytes(), getResult.internalSourceRef().offset(), getResult.internalSourceRef().length(), true);
Map<String, Object> source = sourceAndContent.v2();
Map<String, Object> ctx = new HashMap<String, Object>(2);
ctx.put("_source", source);
ctx.put("_source", sourceAndContent.v2());
try {
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams);
@ -228,7 +200,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}
source = (Map<String, Object>) ctx.get("_source");
final Map<String, Object> updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
final XContentType updateSourceContentType = sourceAndContent.v1();
// apply script to update the source
String routing = getResult.fields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).value().toString() : null;
@ -241,25 +214,24 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
}
// Extract fields from updated source if necessary
final Map<String, GetField> fields = extractFieldsFromSource(request, source);
// TODO: external version type, does it make sense here? does not seem like it...
if (operation == null || "index".equals(operation)) {
IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(source, sourceAndContent.v1())
final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(updatedSourceAsMap, updateSourceContentType)
.version(getResult.version()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel())
.timestamp(timestamp).ttl(ttl)
.percolate(request.percolate())
.refresh(request.refresh());
indexRequest.operationThreaded(false);
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesHolder updateSourceBytes = indexRequest.underlyingSourceBytes();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version());
update.matches(response.matches());
update.fields(fields);
update.getResult(extractGetResult(request, response.version(), updatedSourceAsMap, updateSourceContentType, updateSourceBytes));
listener.onResponse(update);
}
@ -288,7 +260,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
@Override
public void onResponse(DeleteResponse response) {
UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version());
update.fields(fields);
update.getResult(extractGetResult(request, response.version(), updatedSourceAsMap, updateSourceContentType, null));
listener.onResponse(update);
}
@ -311,11 +283,46 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
});
} else if ("none".equals(operation)) {
UpdateResponse update = new UpdateResponse(getResult.index(), getResult.type(), getResult.id(), getResult.version());
update.fields(fields);
update.getResult(extractGetResult(request, getResult.version(), updatedSourceAsMap, updateSourceContentType, null));
listener.onResponse(update);
} else {
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script);
listener.onResponse(new UpdateResponse(getResult.index(), getResult.type(), getResult.id(), getResult.version()));
}
}
@Nullable
protected GetResult extractGetResult(final UpdateRequest request, long version, final Map<String, Object> source, XContentType sourceContentType, @Nullable final BytesHolder sourceAsBytes) {
if (request.fields() == null || request.fields().length == 0) {
return null;
}
boolean sourceRequested = false;
Map<String, GetField> fields = null;
if (request.fields() != null && request.fields().length > 0) {
SourceLookup sourceLookup = new SourceLookup();
sourceLookup.setNextSource(source);
for (String field : request.fields()) {
if (field.equals("_source")) {
sourceRequested = true;
continue;
}
Object value = sourceLookup.extractValue(field);
if (value != null) {
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.values().add(value);
}
}
}
// TODO when using delete/none, we can still return the source as bytes by generating it (using the sourceContentType)
return new GetResult(request.index(), request.type(), request.id(), version, true, sourceRequested ? sourceAsBytes : null, fields);
}
}

View File

@ -20,19 +20,14 @@
package org.elasticsearch.action.update;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.get.GetResult;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Maps.newHashMapWithExpectedSize;
import static org.elasticsearch.index.get.GetField.readGetField;
/**
*/
@ -48,7 +43,7 @@ public class UpdateResponse implements ActionResponse {
private List<String> matches;
private Map<String, GetField> fields;
private GetResult getResult;
public UpdateResponse() {
@ -131,25 +126,16 @@ public class UpdateResponse implements ActionResponse {
return this.matches;
}
/**
* Internal.
*/
public void fields(Map<String, GetField> fields) {
this.fields = fields;
void getResult(GetResult getResult) {
this.getResult = getResult;
}
/**
* Returns extracted fields from updated source. <tt>null</tt> if no field was requested.
*/
public Map<String, GetField> fields() {
return this.fields;
public GetResult getResult() {
return this.getResult;
}
/**
* Returns extracted fields from updated source. <tt>null</tt> if no field was requested.
*/
public Map<String, GetField> getFields() {
return this.fields;
public GetResult getGetResult() {
return this.getResult;
}
/**
@ -186,15 +172,8 @@ public class UpdateResponse implements ActionResponse {
}
}
}
int size = in.readVInt();
if (size == 0) {
fields = ImmutableMap.of();
} else {
fields = newHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
GetField field = readGetField(in);
fields.put(field.name(), field);
}
if (in.readBoolean()) {
getResult = GetResult.readGetResult(in);
}
}
@ -213,13 +192,11 @@ public class UpdateResponse implements ActionResponse {
out.writeUTF(match);
}
}
if (fields == null) {
out.writeVInt(0);
if (getResult == null) {
out.writeBoolean(false);
} else {
out.writeVInt(fields.size());
for (GetField field : fields.values()) {
field.writeTo(out);
}
out.writeBoolean(true);
getResult.writeTo(out);
}
}
}

View File

@ -67,7 +67,7 @@ public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
GetResult() {
}
GetResult(String index, String type, String id, long version, boolean exists, BytesHolder source, Map<String, GetField> fields) {
public GetResult(String index, String type, String id, long version, boolean exists, BytesHolder source, Map<String, GetField> fields) {
this.index = index;
this.type = type;
this.id = id;
@ -252,6 +252,35 @@ public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
static final XContentBuilderString FIELDS = new XContentBuilderString("fields");
}
public XContentBuilder toXContentEmbedded(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.EXISTS, exists);
if (source != null) {
RestXContentBuilder.restDocumentSource(source.bytes(), source.offset(), source.length(), builder, params);
}
if (fields != null && !fields.isEmpty()) {
builder.startObject(Fields.FIELDS);
for (GetField field : fields.values()) {
if (field.values().isEmpty()) {
continue;
}
if (field.values().size() == 1) {
builder.field(field.name(), field.values().get(0));
} else {
builder.field(field.name());
builder.startArray();
for (Object value : field.values()) {
builder.value(value);
}
builder.endArray();
}
}
builder.endObject();
}
return builder;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (!exists()) {
@ -270,30 +299,7 @@ public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
builder.field(Fields._VERSION, version);
}
builder.field(Fields.EXISTS, true);
if (source != null) {
RestXContentBuilder.restDocumentSource(source.bytes(), source.offset(), source.length(), builder, params);
}
if (fields != null && !fields.isEmpty()) {
builder.startObject(Fields.FIELDS);
for (GetField field : fields.values()) {
if (field.values().isEmpty()) {
continue;
}
if (field.values().size() == 1) {
builder.field(field.name(), field.values().get(0));
} else {
builder.field(field.name());
builder.startArray();
for (Object value : field.values()) {
builder.value(value);
}
builder.endArray();
}
}
builder.endObject();
}
toXContentEmbedded(builder, params);
builder.endObject();
}

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
@ -126,33 +125,12 @@ public class RestUpdateAction extends BaseRestHandler {
.field(Fields._ID, response.id())
.field(Fields._VERSION, response.version());
if (response.fields() != null) {
Map<String, GetField> fields = response.fields();
GetField sourceField = fields.get("_source");
if (sourceField != null) {
builder.field(Fields._SOURCE, sourceField.values().get(0));
fields.remove("_source");
}
if (fields.size() > 0) {
builder.startObject(Fields.FIELDS);
for (GetField field : fields.values()) {
if (field.values().isEmpty()) {
continue;
}
if (field.values().size() == 1) {
builder.field(field.name(), field.values().get(0));
} else {
builder.field(field.name());
builder.startArray();
for (Object value : field.values()) {
builder.value(value);
}
builder.endArray();
}
}
builder.endObject();
}
if (response.getResult() != null) {
builder.startObject(Fields.GET);
response.getResult().toXContentEmbedded(builder, request);
builder.endObject();
}
if (response.matches() != null) {
builder.startArray(Fields.MATCHES);
for (String match : response.matches()) {
@ -188,8 +166,7 @@ public class RestUpdateAction extends BaseRestHandler {
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString _SOURCE = new XContentBuilderString("_source");
static final XContentBuilderString MATCHES = new XContentBuilderString("matches");
static final XContentBuilderString FIELDS = new XContentBuilderString("fields");
static final XContentBuilderString GET = new XContentBuilderString("get");
}
}

View File

@ -173,6 +173,8 @@ public class UpdateTests extends AbstractNodesTests {
// check fields parameter
client.prepareIndex("test", "type1", "1").setSource("field", 1).execute().actionGet();
updateResponse = client.prepareUpdate("test", "type1", "1").setScript("ctx._source.field += 1").setFields("_source", "field").execute().actionGet();
assertThat(updateResponse.fields().size(), equalTo(2));
assertThat(updateResponse.getResult(), notNullValue());
assertThat(updateResponse.getResult().sourceRef(), notNullValue());
assertThat(updateResponse.getResult().field("field").value(), notNullValue());
}
}