This commit is contained in:
Shay Banon 2012-06-27 21:50:36 +02:00
parent d6bc17fee5
commit 0aa0b9ef22
3 changed files with 40 additions and 66 deletions

View File

@ -216,7 +216,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
return; return;
} }
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef().bytes(), getResult.internalSourceRef().offset(), getResult.internalSourceRef().length(), true); Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef().bytes(), getResult.internalSourceRef().offset(), getResult.internalSourceRef().length(), true);
String operation = null; String operation = null;
String timestamp = null; String timestamp = null;
Long ttl = null; Long ttl = null;
@ -225,7 +225,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
final XContentType updateSourceContentType = sourceAndContent.v1(); final XContentType updateSourceContentType = sourceAndContent.v1();
String routing = getResult.fields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).value().toString() : null; String routing = getResult.fields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).value().toString() : null;
String parent = getResult.fields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).value().toString() : null; String parent = getResult.fields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).value().toString() : null;
if (request.script() == null && request.doc() != null) { if (request.script() == null && request.doc() != null) {
IndexRequest indexRequest = request.doc(); IndexRequest indexRequest = request.doc();
updatedSourceAsMap = sourceAndContent.v2(); updatedSourceAsMap = sourceAndContent.v2();
@ -239,11 +239,11 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
if (indexRequest.parent() != null) { if (indexRequest.parent() != null) {
parent = indexRequest.parent(); parent = indexRequest.parent();
} }
updateSource(updatedSourceAsMap, indexRequest.underlyingSourceAsMap()); XContentHelper.update(updatedSourceAsMap, indexRequest.underlyingSourceAsMap());
} else { } else {
Map<String, Object> ctx = new HashMap<String, Object>(2); Map<String, Object> ctx = new HashMap<String, Object>(2);
ctx.put("_source", sourceAndContent.v2()); ctx.put("_source", sourceAndContent.v2());
try { try {
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams); ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams);
script.setNextVar("ctx", ctx); script.setNextVar("ctx", ctx);
@ -264,10 +264,10 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis(); ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
} }
} }
updatedSourceAsMap = (Map<String, Object>) ctx.get("_source"); updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
} }
// apply script to update the source // apply script to update the source
// No TTL has been given in the update script so we keep previous TTL value if there is one // No TTL has been given in the update script so we keep previous TTL value if there is one
if (ttl == null) { if (ttl == null) {
@ -388,24 +388,4 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
return new GetResult(request.index(), request.type(), request.id(), version, true, sourceRequested ? sourceAsBytes : null, fields); return new GetResult(request.index(), request.type(), request.id(), version, true, sourceRequested ? sourceAsBytes : null, fields);
} }
/**
* Updates the source with the specified changes. Maps are updated recursively.
*/
private void updateSource(Map<String, Object> source, Map<String, Object> changes) {
for (Map.Entry<String, Object> changesEntry : changes.entrySet()) {
if (!source.containsKey(changesEntry.getKey())) {
// safe to copy, change does not exist in source
source.put(changesEntry.getKey(), changesEntry.getValue());
} else {
if (source.get(changesEntry.getKey()) instanceof Map && changesEntry.getValue() instanceof Map) {
// recursive merge maps
updateSource((Map<String, Object>) source.get(changesEntry.getKey()), (Map<String, Object>) changesEntry.getValue());
} else {
// update the field
source.put(changesEntry.getKey(), changesEntry.getValue());
}
}
}
}
} }

View File

@ -70,7 +70,7 @@ public class UpdateRequest extends InstanceShardOperationRequest {
@Nullable @Nullable
private IndexRequest doc; private IndexRequest doc;
UpdateRequest() { UpdateRequest() {
} }
@ -377,8 +377,8 @@ public class UpdateRequest extends InstanceShardOperationRequest {
* Sets the doc to use for updates when a script is not specified. * Sets the doc to use for updates when a script is not specified.
*/ */
public UpdateRequest doc(Map source, XContentType contentType) { public UpdateRequest doc(Map source, XContentType contentType) {
safeDoc().source(source, contentType); safeDoc().source(source, contentType);
return this; return this;
} }
/** /**
@ -415,7 +415,7 @@ public class UpdateRequest extends InstanceShardOperationRequest {
} }
return doc; return doc;
} }
/** /**
* Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException} * Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException}
* is thrown. * is thrown.
@ -529,20 +529,12 @@ public class UpdateRequest extends InstanceShardOperationRequest {
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte()); consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
type = in.readUTF(); type = in.readUTF();
id = in.readUTF(); id = in.readUTF();
if (in.readBoolean()) { routing = in.readOptionalUTF();
routing = in.readUTF(); script = in.readOptionalUTF();
} scriptLang = in.readOptionalUTF();
if (in.readBoolean()) {
script = in.readUTF();
}
if (in.readBoolean()) {
scriptLang = in.readUTF();
}
scriptParams = in.readMap(); scriptParams = in.readMap();
retryOnConflict = in.readVInt(); retryOnConflict = in.readVInt();
if (in.readBoolean()) { percolate = in.readOptionalUTF();
percolate = in.readUTF();
}
refresh = in.readBoolean(); refresh = in.readBoolean();
if (in.readBoolean()) { if (in.readBoolean()) {
doc = new IndexRequest(); doc = new IndexRequest();
@ -568,32 +560,12 @@ public class UpdateRequest extends InstanceShardOperationRequest {
out.writeByte(consistencyLevel.id()); out.writeByte(consistencyLevel.id());
out.writeUTF(type); out.writeUTF(type);
out.writeUTF(id); out.writeUTF(id);
if (routing == null) { out.writeOptionalUTF(routing);
out.writeBoolean(false); out.writeOptionalUTF(script);
} else { out.writeOptionalUTF(scriptLang);
out.writeBoolean(true);
out.writeUTF(routing);
}
if (script == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(script);
}
if (scriptLang == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(scriptLang);
}
out.writeMap(scriptParams); out.writeMap(scriptParams);
out.writeVInt(retryOnConflict); out.writeVInt(retryOnConflict);
if (percolate == null) { out.writeOptionalUTF(percolate);
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(percolate);
}
out.writeBoolean(refresh); out.writeBoolean(refresh);
if (doc == null) { if (doc == null) {
out.writeBoolean(false); out.writeBoolean(false);

View File

@ -37,6 +37,7 @@ import java.util.Map;
/** /**
* *
*/ */
@SuppressWarnings("unchecked")
public class XContentHelper { public class XContentHelper {
public static XContentParser createParser(byte[] data, int offset, int length) throws IOException { public static XContentParser createParser(byte[] data, int offset, int length) throws IOException {
@ -105,6 +106,27 @@ public class XContentHelper {
} }
} }
/**
* Updates the provided changes into the source. If the key exists in the changes, it overrides the one in source
* unless both are Maps, in which case it recuersively updated it.
*/
public static void update(Map<String, Object> source, Map<String, Object> changes) {
for (Map.Entry<String, Object> changesEntry : changes.entrySet()) {
if (!source.containsKey(changesEntry.getKey())) {
// safe to copy, change does not exist in source
source.put(changesEntry.getKey(), changesEntry.getValue());
} else {
if (source.get(changesEntry.getKey()) instanceof Map && changesEntry.getValue() instanceof Map) {
// recursive merge maps
update((Map<String, Object>) source.get(changesEntry.getKey()), (Map<String, Object>) changesEntry.getValue());
} else {
// update the field
source.put(changesEntry.getKey(), changesEntry.getValue());
}
}
}
}
/** /**
* Merges the defaults provided as the second parameter into the content of the first. Only does recursive merge * Merges the defaults provided as the second parameter into the content of the first. Only does recursive merge
* for inner maps. * for inner maps.