better/faster parsing of update request (with upsert)

This commit is contained in:
Shay Banon 2012-06-13 13:12:37 +02:00
parent 0b4fe4add3
commit 6eb419649a
5 changed files with 149 additions and 35 deletions

View File

@ -30,6 +30,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
@ -164,6 +166,14 @@ public class UpdateRequest extends InstanceShardOperationRequest {
return this.shardId;
}
public String script() {
return this.script;
}
public Map<String, Object> scriptParams() {
return this.scriptParams;
}
/**
* The script to execute. Note, make sure not to send different script each times and instead
* use script params if possible with the same (automatically compiled) script.
@ -403,6 +413,40 @@ public class UpdateRequest extends InstanceShardOperationRequest {
return upsertRequest;
}
public UpdateRequest source(XContentBuilder source) throws Exception {
return source(source.underlyingBytes(), 0, source.underlyingBytesLength());
}
public UpdateRequest source(byte[] source) throws Exception {
return source(source, 0, source.length);
}
public UpdateRequest source(byte[] source, int offset, int length) throws Exception {
XContentType xContentType = XContentFactory.xContentType(source, offset, length);
XContentParser parser = XContentFactory.xContent(xContentType).createParser(source, offset, length);
XContentParser.Token t = parser.nextToken();
if (t == null) {
return this;
}
String currentFieldName = null;
while ((t = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (t == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if ("script".equals(currentFieldName)) {
script = parser.textOrNull();
} else if ("params".equals(currentFieldName)) {
scriptParams = parser.map();
} else if ("lang".equals(currentFieldName)) {
scriptLang = parser.text();
} else if ("upsert".equals(currentFieldName)) {
XContentBuilder builder = XContentFactory.contentBuilder(xContentType);
builder.copyCurrentStructure(parser);
safeUpsertRequest().source(builder);
}
}
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -240,6 +240,21 @@ public class UpdateRequestBuilder extends BaseRequestBuilder<UpdateRequest, Upda
return this;
}
public UpdateRequestBuilder setSource(XContentBuilder source) throws Exception {
request.source(source);
return this;
}
public UpdateRequestBuilder setSource(byte[] source) throws Exception {
request.source(source);
return this;
}
public UpdateRequestBuilder setSource(byte[] source, int offset, int length) throws Exception {
request.source(source, offset, length);
return this;
}
@Override
protected void doExecute(ActionListener<UpdateResponse> listener) {
client.update(request, listener);

View File

@ -51,6 +51,10 @@ public class XContentHelper {
}
}
public static Tuple<XContentType, Map<String, Object>> convertToMap(byte[] data, boolean ordered) throws ElasticSearchParseException {
return convertToMap(data, 0, data.length, ordered);
}
public static Tuple<XContentType, Map<String, Object>> convertToMap(byte[] data, int offset, int length, boolean ordered) throws ElasticSearchParseException {
try {
XContentParser parser;

View File

@ -31,8 +31,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
@ -90,41 +88,26 @@ public class RestUpdateAction extends BaseRestHandler {
// see if we have it in the body
if (request.hasContent()) {
XContentType xContentType = XContentFactory.xContentType(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
if (xContentType != null) {
try {
Map<String, Object> content = XContentFactory.xContent(xContentType)
.createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapOrderedAndClose();
if (content.containsKey("script")) {
updateRequest.script(content.get("script").toString());
try {
updateRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.routing(request.param("routing"));
upsertRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
upsertRequest.timestamp(request.param("timestamp"));
if (request.hasParam("ttl")) {
upsertRequest.ttl(request.paramAsTime("ttl", null).millis());
}
if (content.containsKey("lang")) {
updateRequest.scriptLang(content.get("lang").toString());
}
if (content.containsKey("params")) {
updateRequest.scriptParams((Map<String, Object>) content.get("params"));
}
if (content.containsKey("upsert")) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.source((Map) content.get("upsert"), xContentType);
indexRequest.routing(request.param("routing"));
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
indexRequest.timestamp(request.param("timestamp"));
if (request.hasParam("ttl")) {
indexRequest.ttl(request.paramAsTime("ttl", null).millis());
}
indexRequest.version(RestActions.parseVersion(request));
indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
updateRequest.upsert(indexRequest);
}
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.warn("Failed to send response", e1);
}
return;
upsertRequest.version(RestActions.parseVersion(request));
upsertRequest.versionType(VersionType.fromString(request.param("version_type"), upsertRequest.versionType()));
}
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.warn("Failed to send response", e1);
}
return;
}
}

View File

@ -22,17 +22,21 @@ package org.elasticsearch.test.integration.update;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.hamcrest.MatcherAssert.assertThat;
@ -84,6 +88,70 @@ public class UpdateTests extends AbstractNodesTests {
return client("node1");
}
@Test
public void testUpdateRequest() throws Exception {
UpdateRequest request = new UpdateRequest("test", "type", "1");
// simple script
request.source(XContentFactory.jsonBuilder().startObject()
.field("script", "script1")
.endObject());
assertThat(request.script(), equalTo("script1"));
// script with params
request = new UpdateRequest("test", "type", "1");
request.source(XContentFactory.jsonBuilder().startObject()
.field("script", "script1")
.startObject("params").field("param1", "value1").endObject()
.endObject());
assertThat(request.script(), equalTo("script1"));
assertThat(request.scriptParams().get("param1").toString(), equalTo("value1"));
request = new UpdateRequest("test", "type", "1");
request.source(XContentFactory.jsonBuilder().startObject()
.startObject("params").field("param1", "value1").endObject()
.field("script", "script1")
.endObject());
assertThat(request.script(), equalTo("script1"));
assertThat(request.scriptParams().get("param1").toString(), equalTo("value1"));
// script with params and upsert
request = new UpdateRequest("test", "type", "1");
request.source(XContentFactory.jsonBuilder().startObject()
.startObject("params").field("param1", "value1").endObject()
.field("script", "script1")
.startObject("upsert").field("field1", "value1").startObject("compound").field("field2", "value2").endObject().endObject()
.endObject());
assertThat(request.script(), equalTo("script1"));
assertThat(request.scriptParams().get("param1").toString(), equalTo("value1"));
Map<String, Object> upsertDoc = XContentHelper.convertToMap(request.upsertRequest().source(), true).v2();
assertThat(upsertDoc.get("field1").toString(), equalTo("value1"));
assertThat(((Map) upsertDoc.get("compound")).get("field2").toString(), equalTo("value2"));
request = new UpdateRequest("test", "type", "1");
request.source(XContentFactory.jsonBuilder().startObject()
.startObject("upsert").field("field1", "value1").startObject("compound").field("field2", "value2").endObject().endObject()
.startObject("params").field("param1", "value1").endObject()
.field("script", "script1")
.endObject());
assertThat(request.script(), equalTo("script1"));
assertThat(request.scriptParams().get("param1").toString(), equalTo("value1"));
upsertDoc = XContentHelper.convertToMap(request.upsertRequest().source(), true).v2();
assertThat(upsertDoc.get("field1").toString(), equalTo("value1"));
assertThat(((Map) upsertDoc.get("compound")).get("field2").toString(), equalTo("value2"));
request = new UpdateRequest("test", "type", "1");
request.source(XContentFactory.jsonBuilder().startObject()
.startObject("params").field("param1", "value1").endObject()
.startObject("upsert").field("field1", "value1").startObject("compound").field("field2", "value2").endObject().endObject()
.field("script", "script1")
.endObject());
assertThat(request.script(), equalTo("script1"));
assertThat(request.scriptParams().get("param1").toString(), equalTo("value1"));
upsertDoc = XContentHelper.convertToMap(request.upsertRequest().source(), true).v2();
assertThat(upsertDoc.get("field1").toString(), equalTo("value1"));
assertThat(((Map) upsertDoc.get("compound")).get("field2").toString(), equalTo("value2"));
}
@Test
public void testUpsert() throws Exception {
createIndex();