Update API enhancement - add support for scripted upserts.

In the case of inserts the UpdateHelper class will now allow the script used to apply updates to run on the upsert doc provided by clients. This allows the logic for managing the internal state of the data item to be managed by the script and is not reliant on clients performing the initialisation of data structures managed by the script.

Closes #7143
This commit is contained in:
markharwood 2014-08-04 09:13:10 +01:00
parent 418ce50ec4
commit e6b459cb9f
7 changed files with 192 additions and 17 deletions

View File

@ -126,6 +126,7 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
If `name` was `new_name` before the request was sent then the entire update
request is ignored.
=== Upserts
There is also support for `upsert`. If the document does
not already exists, the content of the `upsert` element will be used to
index the fresh doc:
@ -142,8 +143,38 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
}
}'
--------------------------------------------------
added[1.4.0]
Last it also supports `doc_as_upsert`. So that the
If the document does not exist you may want your update script to
run anyway in order to initialize the document contents using
business logic unknown to the client. In this case pass the
new `scripted_upsert` parameter with the value `true`.
[source,js]
--------------------------------------------------
curl -XPOST 'localhost:9200/sessions/session/dh3sgudg8gsrgl/_update' -d '{
"script_id" : "my_web_session_summariser",
"scripted_upsert":true,
"params" : {
"pageViewEvent" : {
"url":"foo.com/bar",
"response":404,
"time":"2014-01-01 12:32"
}
},
"upsert" : {
}
}'
--------------------------------------------------
The default `scripted_upsert` setting is `false` meaning the script is not executed for inserts.
However, in scenarios like the one above we may be using a non-trivial script stored
using the new "indexed scripts" feature. The script may be deriving properties
like the duration of our web session based on observing multiple page view events so the
client can supply a blank "upsert" document and allow the script to fill in most of the details
using the events passed in the `params` element.
Last, the upsert facility also supports `doc_as_upsert`. So that the
provided document will be inserted if the document does not already
exist. This will reduce the amount of data that needs to be sent to
elasticsearch.
@ -158,6 +189,9 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
}'
--------------------------------------------------
=== Parameters
The update operation supports similar parameters as the index API,
including:

View File

@ -61,6 +61,13 @@
"script": {
"description": "The URL-encoded script definition (instead of using request body)"
},
"script_id": {
"description": "The id of a stored script"
},
"scripted_upsert": {
"type": "boolean",
"description": "True if the script referenced in script or script_id should be called to perform inserts - defaults to false"
},
"timeout": {
"type": "time",
"description": "Explicit operation timeout"

View File

@ -37,5 +37,24 @@
id: 1
- match: { _source.foo: xxx }
- do:
update:
index: test_1
type: test
id: 2
body:
script: "ctx._source.foo = bar"
params: { bar: 'xxx' }
upsert: { foo: baz }
scripted_upsert: true
- do:
get:
index: test_1
type: test
id: 2
- match: { _source.foo: xxx }

View File

@ -90,11 +90,49 @@ public class UpdateHelper extends AbstractComponent {
if (request.upsertRequest() == null && !request.docAsUpsert()) {
throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
}
Long ttl = null;
IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
if (request.scriptedUpsert() && (request.script() != null)) {
// Run the script to perform the create logic
IndexRequest upsert = request.upsertRequest();
Map<String, Object> upsertDoc = upsert.sourceAsMap();
Map<String, Object> ctx = new HashMap<>(2);
// Tell the script that this is a create and not an update
ctx.put("op", "create");
ctx.put("_source", upsertDoc);
try {
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptType, request.scriptParams);
script.setNextVar("ctx", ctx);
script.run();
// we need to unwrap the ctx...
ctx = (Map<String, Object>) script.unwrap(ctx);
} catch (Exception e) {
throw new ElasticsearchIllegalArgumentException("failed to execute script", e);
}
//Allow the script to set TTL using ctx._ttl
ttl = getTTLFromScriptContext(ctx);
//Allow the script to abort the create by setting "op" to "none"
String scriptOpChoice = (String) ctx.get("op");
// Only valid options for an upsert script are "create"
// (the default) or "none", meaning abort upsert
if (!"create".equals(scriptOpChoice)) {
if (!"none".equals(scriptOpChoice)) {
logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice, request.script);
}
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(),
getResult.getVersion(), false);
update.setGetResult(getResult);
return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON);
}
indexRequest.source((Map)ctx.get("_source"));
}
indexRequest.index(request.index()).type(request.type()).id(request.id())
// it has to be a "create!"
.create(true)
.create(true)
.routing(request.routing())
.ttl(ttl)
.refresh(request.refresh())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
@ -121,7 +159,6 @@ public class UpdateHelper extends AbstractComponent {
String operation = null;
String timestamp = null;
Long ttl = null;
Object fetchedTTL = null;
final Map<String, Object> updatedSourceAsMap;
final XContentType updateSourceContentType = sourceAndContent.v1();
String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null;
@ -164,15 +201,8 @@ public class UpdateHelper extends AbstractComponent {
operation = (String) ctx.get("op");
timestamp = (String) ctx.get("_timestamp");
fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}
ttl = getTTLFromScriptContext(ctx);
updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
}
@ -211,6 +241,19 @@ public class UpdateHelper extends AbstractComponent {
}
}
private Long getTTLFromScriptContext(Map<String, Object> ctx) {
Long ttl = null;
Object fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}
return ttl;
}
/**
* Extracts the fields from the updated document to be returned in a update response
*/

View File

@ -76,6 +76,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private IndexRequest upsertRequest;
private boolean scriptedUpsert = false;
private boolean docAsUpsert = false;
private boolean detectNoop = false;
@ -596,6 +597,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
scriptParams = parser.map();
} else if ("lang".equals(currentFieldName)) {
scriptLang = parser.text();
} else if ("scripted_upsert".equals(currentFieldName)) {
scriptedUpsert = parser.booleanValue();
} else if ("upsert".equals(currentFieldName)) {
XContentBuilder builder = XContentFactory.contentBuilder(xContentType);
builder.copyCurrentStructure(parser);
@ -621,6 +624,15 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
public void docAsUpsert(boolean shouldUpsertDoc) {
this.docAsUpsert = shouldUpsertDoc;
}
public boolean scriptedUpsert(){
return this.scriptedUpsert;
}
public void scriptedUpsert(boolean scriptedUpsert) {
this.scriptedUpsert = scriptedUpsert;
}
@Override
public void readFrom(StreamInput in) throws IOException {
@ -663,6 +675,9 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
detectNoop = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
scriptedUpsert = in.readBoolean();
}
}
@Override
@ -715,6 +730,9 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
out.writeBoolean(detectNoop);
}
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(scriptedUpsert);
}
}
}

View File

@ -353,6 +353,15 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
request.detectNoop(detectNoop);
return this;
}
/**
* Sets whether the script should be run in the case of an insert
*/
public UpdateRequestBuilder setScriptedUpsert(boolean scriptedUpsert) {
request.scriptedUpsert(scriptedUpsert);
return this;
}
@Override
protected void doExecute(ActionListener<UpdateResponse> listener) {

View File

@ -19,7 +19,6 @@
package org.elasticsearch.update;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.util.LuceneTestCase.Slow;
@ -27,10 +26,10 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
@ -44,14 +43,15 @@ import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
import org.elasticsearch.index.store.CorruptedFileTest;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@ -179,6 +179,51 @@ public class UpdateTests extends ElasticsearchIntegrationTest {
assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2"));
}
}
@Test
public void testScriptedUpsert() throws Exception {
createIndex();
ensureGreen();
// Script logic is
// 1) New accounts take balance from "balance" in upsert doc and first payment is charged at 50%
// 2) Existing accounts subtract full payment from balance stored in elasticsearch
String script="int oldBalance=ctx._source.balance;"+
"int deduction=ctx.op == \"create\" ? (payment/2) : payment;"+
"ctx._source.balance=oldBalance-deduction;";
int openingBalance=10;
// Pay money from what will be a new account and opening balance comes from upsert doc
// provided by client
UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1")
.setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject())
.setScriptedUpsert(true)
.addScriptParam("payment", 2)
.setScript(script, ScriptService.ScriptType.INLINE)
.execute().actionGet();
assertTrue(updateResponse.isCreated());
for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("9"));
}
// Now pay money for an existing account where balance is stored in es
updateResponse = client().prepareUpdate("test", "type1", "1")
.setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject())
.setScriptedUpsert(true)
.addScriptParam("payment", 2)
.setScript(script, ScriptService.ScriptType.INLINE)
.execute().actionGet();
assertFalse(updateResponse.isCreated());
for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("7"));
}
}
@Test
public void testUpsertDoc() throws Exception {