Partial update without script

Allow the use of "doc" as the update source when a script is not
specified.  New fields are added, existing fields are overwritten, and
maps are merged recursively.
This commit is contained in:
Matt Weber 2012-06-16 19:51:44 -07:00 committed by Shay Banon
parent a4ad84b5e4
commit d6bc17fee5
6 changed files with 295 additions and 31 deletions

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
@ -333,6 +334,10 @@ public class IndexRequest extends ShardReplicationOperationRequest {
return new BytesHolder(underlyingSource(), underlyingSourceOffset(), underlyingSourceLength()); return new BytesHolder(underlyingSource(), underlyingSourceOffset(), underlyingSourceLength());
} }
public Map<String, Object> underlyingSourceAsMap() {
return XContentHelper.convertToMap(underlyingSource(), underlyingSourceOffset(), underlyingSourceLength(), false).v2();
}
public byte[] underlyingSource() { public byte[] underlyingSource() {
if (sourceUnsafe) { if (sourceUnsafe) {
source(); source();

View File

@ -217,36 +217,58 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
} }
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef().bytes(), getResult.internalSourceRef().offset(), getResult.internalSourceRef().length(), true); Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef().bytes(), getResult.internalSourceRef().offset(), getResult.internalSourceRef().length(), true);
Map<String, Object> ctx = new HashMap<String, Object>(2); String operation = null;
ctx.put("_source", sourceAndContent.v2()); String timestamp = null;
try {
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams);
script.setNextVar("ctx", ctx);
script.run();
// we need to unwrap the ctx...
ctx = (Map<String, Object>) script.unwrap(ctx);
} catch (Exception e) {
throw new ElasticSearchIllegalArgumentException("failed to execute script", e);
}
String operation = (String) ctx.get("op");
String timestamp = (String) ctx.get("_timestamp");
Long ttl = null; Long ttl = null;
Object fetchedTTL = ctx.get("_ttl"); Object fetchedTTL = null;
if (fetchedTTL != null) { final Map<String, Object> updatedSourceAsMap;
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}
final Map<String, Object> updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
final XContentType updateSourceContentType = sourceAndContent.v1(); final XContentType updateSourceContentType = sourceAndContent.v1();
// apply script to update the source
String routing = getResult.fields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).value().toString() : null; String routing = getResult.fields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).value().toString() : null;
String parent = getResult.fields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).value().toString() : null; String parent = getResult.fields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).value().toString() : null;
if (request.script() == null && request.doc() != null) {
IndexRequest indexRequest = request.doc();
updatedSourceAsMap = sourceAndContent.v2();
if (indexRequest.ttl() > 0) {
ttl = indexRequest.ttl();
}
timestamp = indexRequest.timestamp();
if (indexRequest.routing() != null) {
routing = indexRequest.routing();
}
if (indexRequest.parent() != null) {
parent = indexRequest.parent();
}
updateSource(updatedSourceAsMap, indexRequest.underlyingSourceAsMap());
} else {
Map<String, Object> ctx = new HashMap<String, Object>(2);
ctx.put("_source", sourceAndContent.v2());
try {
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams);
script.setNextVar("ctx", ctx);
script.run();
// we need to unwrap the ctx...
ctx = (Map<String, Object>) script.unwrap(ctx);
} catch (Exception e) {
throw new ElasticSearchIllegalArgumentException("failed to execute script", e);
}
operation = (String) ctx.get("op");
timestamp = (String) ctx.get("_timestamp");
fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}
updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
}
// apply script to update the source
// No TTL has been given in the update script so we keep previous TTL value if there is one // No TTL has been given in the update script so we keep previous TTL value if there is one
if (ttl == null) { if (ttl == null) {
ttl = getResult.fields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).value() : null; ttl = getResult.fields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).value() : null;
@ -366,4 +388,24 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
return new GetResult(request.index(), request.type(), request.id(), version, true, sourceRequested ? sourceAsBytes : null, fields); return new GetResult(request.index(), request.type(), request.id(), version, true, sourceRequested ? sourceAsBytes : null, fields);
} }
/**
* Updates the source with the specified changes. Maps are updated recursively.
*/
private void updateSource(Map<String, Object> source, Map<String, Object> changes) {
for (Map.Entry<String, Object> changesEntry : changes.entrySet()) {
if (!source.containsKey(changesEntry.getKey())) {
// safe to copy, change does not exist in source
source.put(changesEntry.getKey(), changesEntry.getValue());
} else {
if (source.get(changesEntry.getKey()) instanceof Map && changesEntry.getValue() instanceof Map) {
// recursive merge maps
updateSource((Map<String, Object>) source.get(changesEntry.getKey()), (Map<String, Object>) changesEntry.getValue());
} else {
// update the field
source.put(changesEntry.getKey(), changesEntry.getValue());
}
}
}
}
} }

View File

@ -48,6 +48,7 @@ public class UpdateRequest extends InstanceShardOperationRequest {
@Nullable @Nullable
private String routing; private String routing;
@Nullable
String script; String script;
@Nullable @Nullable
String scriptLang; String scriptLang;
@ -67,6 +68,9 @@ public class UpdateRequest extends InstanceShardOperationRequest {
private IndexRequest upsertRequest; private IndexRequest upsertRequest;
@Nullable
private IndexRequest doc;
UpdateRequest() { UpdateRequest() {
} }
@ -86,8 +90,8 @@ public class UpdateRequest extends InstanceShardOperationRequest {
if (id == null) { if (id == null) {
validationException = addValidationError("id is missing", validationException); validationException = addValidationError("id is missing", validationException);
} }
if (script == null) { if (script == null && doc == null) {
validationException = addValidationError("script is missing", validationException); validationException = addValidationError("script or doc is missing", validationException);
} }
return validationException; return validationException;
} }
@ -345,6 +349,73 @@ public class UpdateRequest extends InstanceShardOperationRequest {
return this; return this;
} }
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequest doc(IndexRequest doc) {
this.doc = doc;
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequest doc(XContentBuilder source) {
safeDoc().source(source);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequest doc(Map source) {
safeDoc().source(source);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequest doc(Map source, XContentType contentType) {
safeDoc().source(source, contentType);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequest doc(String source) {
safeDoc().source(source);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequest doc(byte[] source) {
safeDoc().source(source);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequest doc(byte[] source, int offset, int length) {
safeDoc().source(source, offset, length);
return this;
}
public IndexRequest doc() {
return this.doc;
}
private IndexRequest safeDoc() {
if (doc == null) {
doc = new IndexRequest();
}
return doc;
}
/** /**
* Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException} * Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException}
* is thrown. * is thrown.
@ -442,6 +513,10 @@ public class UpdateRequest extends InstanceShardOperationRequest {
XContentBuilder builder = XContentFactory.contentBuilder(xContentType); XContentBuilder builder = XContentFactory.contentBuilder(xContentType);
builder.copyCurrentStructure(parser); builder.copyCurrentStructure(parser);
safeUpsertRequest().source(builder); safeUpsertRequest().source(builder);
} else if ("doc".equals(currentFieldName)) {
XContentBuilder docBuilder = XContentFactory.contentBuilder(xContentType);
docBuilder.copyCurrentStructure(parser);
safeDoc().source(docBuilder);
} }
} }
return this; return this;
@ -457,7 +532,9 @@ public class UpdateRequest extends InstanceShardOperationRequest {
if (in.readBoolean()) { if (in.readBoolean()) {
routing = in.readUTF(); routing = in.readUTF();
} }
script = in.readUTF(); if (in.readBoolean()) {
script = in.readUTF();
}
if (in.readBoolean()) { if (in.readBoolean()) {
scriptLang = in.readUTF(); scriptLang = in.readUTF();
} }
@ -467,6 +544,10 @@ public class UpdateRequest extends InstanceShardOperationRequest {
percolate = in.readUTF(); percolate = in.readUTF();
} }
refresh = in.readBoolean(); refresh = in.readBoolean();
if (in.readBoolean()) {
doc = new IndexRequest();
doc.readFrom(in);
}
int size = in.readInt(); int size = in.readInt();
if (size >= 0) { if (size >= 0) {
fields = new String[size]; fields = new String[size];
@ -493,7 +574,12 @@ public class UpdateRequest extends InstanceShardOperationRequest {
out.writeBoolean(true); out.writeBoolean(true);
out.writeUTF(routing); out.writeUTF(routing);
} }
out.writeUTF(script); if (script == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(script);
}
if (scriptLang == null) { if (scriptLang == null) {
out.writeBoolean(false); out.writeBoolean(false);
} else { } else {
@ -509,6 +595,16 @@ public class UpdateRequest extends InstanceShardOperationRequest {
out.writeUTF(percolate); out.writeUTF(percolate);
} }
out.writeBoolean(refresh); out.writeBoolean(refresh);
if (doc == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
// make sure the basics are set
doc.index(index);
doc.type(type);
doc.id(id);
doc.writeTo(out);
}
if (fields == null) { if (fields == null) {
out.writeInt(-1); out.writeInt(-1);
} else { } else {

View File

@ -183,6 +183,62 @@ public class UpdateRequestBuilder extends BaseRequestBuilder<UpdateRequest, Upda
return this; return this;
} }
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequestBuilder setDoc(IndexRequest indexRequest) {
request.doc(indexRequest);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequestBuilder setDoc(XContentBuilder source) {
request.doc(source);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequestBuilder setDoc(Map source) {
request.doc(source);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequestBuilder setDoc(Map source, XContentType contentType) {
request.doc(source, contentType);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequestBuilder setDoc(String source) {
request.doc(source);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequestBuilder setDoc(byte[] source) {
request.doc(source);
return this;
}
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequestBuilder setDoc(byte[] source, int offset, int length) {
request.doc(source, offset, length);
return this;
}
/** /**
* Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException} * Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException}
* is thrown. * is thrown.

View File

@ -101,6 +101,17 @@ public class RestUpdateAction extends BaseRestHandler {
upsertRequest.version(RestActions.parseVersion(request)); upsertRequest.version(RestActions.parseVersion(request));
upsertRequest.versionType(VersionType.fromString(request.param("version_type"), upsertRequest.versionType())); upsertRequest.versionType(VersionType.fromString(request.param("version_type"), upsertRequest.versionType()));
} }
IndexRequest doc = updateRequest.doc();
if (doc != null) {
doc.routing(request.param("routing"));
doc.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
doc.timestamp(request.param("timestamp"));
if (request.hasParam("ttl")) {
doc.ttl(request.paramAsTime("ttl", null).millis());
}
doc.version(RestActions.parseVersion(request));
doc.versionType(VersionType.fromString(request.param("version_type"), doc.versionType()));
}
} catch (Exception e) { } catch (Exception e) {
try { try {
channel.sendResponse(new XContentThrowableRestResponse(request, e)); channel.sendResponse(new XContentThrowableRestResponse(request, e));

View File

@ -36,6 +36,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.Map; import java.util.Map;
import java.util.HashMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@ -150,6 +151,15 @@ public class UpdateTests extends AbstractNodesTests {
upsertDoc = XContentHelper.convertToMap(request.upsertRequest().source(), true).v2(); upsertDoc = XContentHelper.convertToMap(request.upsertRequest().source(), true).v2();
assertThat(upsertDoc.get("field1").toString(), equalTo("value1")); assertThat(upsertDoc.get("field1").toString(), equalTo("value1"));
assertThat(((Map) upsertDoc.get("compound")).get("field2").toString(), equalTo("value2")); assertThat(((Map) upsertDoc.get("compound")).get("field2").toString(), equalTo("value2"));
// script with doc
request = new UpdateRequest("test", "type", "1");
request.source(XContentFactory.jsonBuilder().startObject()
.startObject("doc").field("field1", "value1").startObject("compound").field("field2", "value2").endObject().endObject()
.endObject());
Map<String, Object> doc = request.doc().underlyingSourceAsMap();
assertThat(doc.get("field1").toString(), equalTo("value1"));
assertThat(((Map) doc.get("compound")).get("field2").toString(), equalTo("value2"));
} }
@Test @Test
@ -272,5 +282,49 @@ public class UpdateTests extends AbstractNodesTests {
assertThat(updateResponse.getResult(), notNullValue()); assertThat(updateResponse.getResult(), notNullValue());
assertThat(updateResponse.getResult().sourceRef(), notNullValue()); assertThat(updateResponse.getResult().sourceRef(), notNullValue());
assertThat(updateResponse.getResult().field("field").value(), notNullValue()); assertThat(updateResponse.getResult().field("field").value(), notNullValue());
// check updates without script
// add new field
client.prepareIndex("test", "type1", "1").setSource("field", 1).execute().actionGet();
updateResponse = client.prepareUpdate("test", "type1", "1").setDoc(XContentFactory.jsonBuilder().startObject().field("field2", 2).endObject()).execute().actionGet();
for (int i = 0; i < 5; i++) {
getResponse = client.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("1"));
assertThat(getResponse.sourceAsMap().get("field2").toString(), equalTo("2"));
}
// change existing field
updateResponse = client.prepareUpdate("test", "type1", "1").setDoc(XContentFactory.jsonBuilder().startObject().field("field", 3).endObject()).execute().actionGet();
for (int i = 0; i < 5; i++) {
getResponse = client.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("3"));
assertThat(getResponse.sourceAsMap().get("field2").toString(), equalTo("2"));
}
// recursive map
Map<String, Object> testMap = new HashMap<String, Object>();
Map<String, Object> testMap2 = new HashMap<String, Object>();
Map<String, Object> testMap3 = new HashMap<String, Object>();
testMap3.put("commonkey", testMap);
testMap3.put("map3", 5);
testMap2.put("map2", 6);
testMap.put("commonkey", testMap2);
testMap.put("map1", 8);
client.prepareIndex("test", "type1", "1").setSource("map", testMap).execute().actionGet();
updateResponse = client.prepareUpdate("test", "type1", "1").setDoc(XContentFactory.jsonBuilder().startObject().field("map", testMap3).endObject()).execute().actionGet();
for (int i = 0; i < 5; i++) {
getResponse = client.prepareGet("test", "type1", "1").execute().actionGet();
Map map1 = (Map) getResponse.sourceAsMap().get("map");
assertThat(map1.size(), equalTo(3));
assertThat(map1.containsKey("map1"), equalTo(true));
assertThat(map1.containsKey("map3"), equalTo(true));
assertThat(map1.containsKey("commonkey"), equalTo(true));
Map map2 = (Map) map1.get("commonkey");
assertThat(map2.size(), equalTo(3));
assertThat(map2.containsKey("map1"), equalTo(true));
assertThat(map2.containsKey("map2"), equalTo(true));
assertThat(map2.containsKey("commonkey"), equalTo(true));
}
} }
} }