From e6b459cb9f4256a6c26fde5177ee88a1296cf8d8 Mon Sep 17 00:00:00 2001 From: markharwood Date: Mon, 4 Aug 2014 09:13:10 +0100 Subject: [PATCH] Update API enhancement - add support for scripted upserts. In the case of inserts the UpdateHelper class will now allow the script used to apply updates to run on the upsert doc provided by clients. This allows the logic for managing the internal state of the data item to be managed by the script and is not reliant on clients performing the initialisation of data structures managed by the script. Closes #7143 --- docs/reference/docs/update.asciidoc | 36 +++++++++- rest-api-spec/api/update.json | 7 ++ .../test/update/25_script_upsert.yaml | 19 ++++++ .../action/update/UpdateHelper.java | 65 +++++++++++++++---- .../action/update/UpdateRequest.java | 18 +++++ .../action/update/UpdateRequestBuilder.java | 9 +++ .../org/elasticsearch/update/UpdateTests.java | 55 ++++++++++++++-- 7 files changed, 192 insertions(+), 17 deletions(-) diff --git a/docs/reference/docs/update.asciidoc b/docs/reference/docs/update.asciidoc index 8cb0f86f1d8..f9416c069f3 100644 --- a/docs/reference/docs/update.asciidoc +++ b/docs/reference/docs/update.asciidoc @@ -126,6 +126,7 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{ If `name` was `new_name` before the request was sent then the entire update request is ignored. +=== Upserts There is also support for `upsert`. If the document does not already exists, the content of the `upsert` element will be used to index the fresh doc: @@ -142,8 +143,38 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{ } }' -------------------------------------------------- +added[1.4.0] -Last it also supports `doc_as_upsert`. So that the +If the document does not exist you may want your update script to +run anyway in order to initialize the document contents using +business logic unknown to the client. In this case pass the +new `scripted_upsert` parameter with the value `true`. 
+ +[source,js] +-------------------------------------------------- +curl -XPOST 'localhost:9200/sessions/session/dh3sgudg8gsrgl/_update' -d '{ + "script_id" : "my_web_session_summariser", + "scripted_upsert":true, + "params" : { + "pageViewEvent" : { + "url":"foo.com/bar", + "response":404, + "time":"2014-01-01 12:32" + } + }, + "upsert" : { + } +}' +-------------------------------------------------- +The default `scripted_upsert` setting is `false`, meaning the script is not executed for inserts. +However, in scenarios like the one above we may be using a non-trivial script stored +using the new "indexed scripts" feature. The script may be deriving properties +like the duration of our web session based on observing multiple page view events, so the +client can supply a blank "upsert" document and allow the script to fill in most of the details +using the events passed in the `params` element. + + +Last, the upsert facility also supports `doc_as_upsert`, so that the provided document will be inserted if the document does not already exist. This will reduce the amount of data that needs to be sent to elasticsearch. 
@@ -158,6 +189,9 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{ }' -------------------------------------------------- + +=== Parameters + The update operation supports similar parameters as the index API, including: diff --git a/rest-api-spec/api/update.json b/rest-api-spec/api/update.json index 85ce81ea1f5..6196067f63c 100644 --- a/rest-api-spec/api/update.json +++ b/rest-api-spec/api/update.json @@ -61,6 +61,13 @@ "script": { "description": "The URL-encoded script definition (instead of using request body)" }, + "script_id": { + "description": "The id of a stored script" + }, + "scripted_upsert": { + "type": "boolean", + "description": "True if the script referenced in script or script_id should be called to perform inserts - defaults to false" + }, "timeout": { "type": "time", "description": "Explicit operation timeout" diff --git a/rest-api-spec/test/update/25_script_upsert.yaml b/rest-api-spec/test/update/25_script_upsert.yaml index 64226b7e899..c91992affaa 100644 --- a/rest-api-spec/test/update/25_script_upsert.yaml +++ b/rest-api-spec/test/update/25_script_upsert.yaml @@ -37,5 +37,24 @@ id: 1 - match: { _source.foo: xxx } + + - do: + update: + index: test_1 + type: test + id: 2 + body: + script: "ctx._source.foo = bar" + params: { bar: 'xxx' } + upsert: { foo: baz } + scripted_upsert: true + + - do: + get: + index: test_1 + type: test + id: 2 + + - match: { _source.foo: xxx } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 0b177e2d7ff..a1bad51557d 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -90,11 +90,49 @@ public class UpdateHelper extends AbstractComponent { if (request.upsertRequest() == null && !request.docAsUpsert()) { throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()); } + 
Long ttl = null; IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest(); + if (request.scriptedUpsert() && (request.script() != null)) { + // Run the script to perform the create logic + IndexRequest upsert = request.upsertRequest(); + Map upsertDoc = upsert.sourceAsMap(); + Map ctx = new HashMap<>(2); + // Tell the script that this is a create and not an update + ctx.put("op", "create"); + ctx.put("_source", upsertDoc); + try { + ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptType, request.scriptParams); + script.setNextVar("ctx", ctx); + script.run(); + // we need to unwrap the ctx... + ctx = (Map) script.unwrap(ctx); + } catch (Exception e) { + throw new ElasticsearchIllegalArgumentException("failed to execute script", e); + } + //Allow the script to set TTL using ctx._ttl + ttl = getTTLFromScriptContext(ctx); + //Allow the script to abort the create by setting "op" to "none" + String scriptOpChoice = (String) ctx.get("op"); + + // Only valid options for an upsert script are "create" + // (the default) or "none", meaning abort upsert + if (!"create".equals(scriptOpChoice)) { + if (!"none".equals(scriptOpChoice)) { + logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice, request.script); + } + UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), + getResult.getVersion(), false); + update.setGetResult(getResult); + return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON); + } + indexRequest.source((Map)ctx.get("_source")); + } + indexRequest.index(request.index()).type(request.type()).id(request.id()) // it has to be a "create!" 
- .create(true) + .create(true) .routing(request.routing()) + .ttl(ttl) .refresh(request.refresh()) .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); indexRequest.operationThreaded(false); @@ -121,7 +159,6 @@ public class UpdateHelper extends AbstractComponent { String operation = null; String timestamp = null; Long ttl = null; - Object fetchedTTL = null; final Map updatedSourceAsMap; final XContentType updateSourceContentType = sourceAndContent.v1(); String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null; @@ -164,15 +201,8 @@ public class UpdateHelper extends AbstractComponent { operation = (String) ctx.get("op"); timestamp = (String) ctx.get("_timestamp"); - fetchedTTL = ctx.get("_ttl"); - if (fetchedTTL != null) { - if (fetchedTTL instanceof Number) { - ttl = ((Number) fetchedTTL).longValue(); - } else { - ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis(); - } - } - + ttl = getTTLFromScriptContext(ctx); + updatedSourceAsMap = (Map) ctx.get("_source"); } @@ -211,6 +241,19 @@ public class UpdateHelper extends AbstractComponent { } } + private Long getTTLFromScriptContext(Map ctx) { + Long ttl = null; + Object fetchedTTL = ctx.get("_ttl"); + if (fetchedTTL != null) { + if (fetchedTTL instanceof Number) { + ttl = ((Number) fetchedTTL).longValue(); + } else { + ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis(); + } + } + return ttl; + } + /** * Extracts the fields from the updated document to be returned in a update response */ diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 529b27742c7..94a19755073 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -76,6 +76,7 @@ public class UpdateRequest extends 
InstanceShardOperationRequest private IndexRequest upsertRequest; + private boolean scriptedUpsert = false; private boolean docAsUpsert = false; private boolean detectNoop = false; @@ -596,6 +597,8 @@ public class UpdateRequest extends InstanceShardOperationRequest scriptParams = parser.map(); } else if ("lang".equals(currentFieldName)) { scriptLang = parser.text(); + } else if ("scripted_upsert".equals(currentFieldName)) { + scriptedUpsert = parser.booleanValue(); } else if ("upsert".equals(currentFieldName)) { XContentBuilder builder = XContentFactory.contentBuilder(xContentType); builder.copyCurrentStructure(parser); @@ -621,6 +624,15 @@ public class UpdateRequest extends InstanceShardOperationRequest public void docAsUpsert(boolean shouldUpsertDoc) { this.docAsUpsert = shouldUpsertDoc; } + + public boolean scriptedUpsert(){ + return this.scriptedUpsert; + } + + public void scriptedUpsert(boolean scriptedUpsert) { + this.scriptedUpsert = scriptedUpsert; + } + @Override public void readFrom(StreamInput in) throws IOException { @@ -663,6 +675,9 @@ public class UpdateRequest extends InstanceShardOperationRequest if (in.getVersion().onOrAfter(Version.V_1_3_0)) { detectNoop = in.readBoolean(); } + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + scriptedUpsert = in.readBoolean(); + } } @Override @@ -715,6 +730,9 @@ public class UpdateRequest extends InstanceShardOperationRequest if (out.getVersion().onOrAfter(Version.V_1_3_0)) { out.writeBoolean(detectNoop); } + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + out.writeBoolean(scriptedUpsert); + } } } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index 1235439cd95..640369288ea 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -353,6 +353,15 @@ public class UpdateRequestBuilder extends 
InstanceShardOperationRequestBuilder listener) { diff --git a/src/test/java/org/elasticsearch/update/UpdateTests.java b/src/test/java/org/elasticsearch/update/UpdateTests.java index e60db0afc12..9fcd5149a78 100644 --- a/src/test/java/org/elasticsearch/update/UpdateTests.java +++ b/src/test/java/org/elasticsearch/update/UpdateTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.update; -import org.apache.lucene.document.Field; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.util.LuceneTestCase.Slow; @@ -27,10 +26,10 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.NoNodeAvailableException; @@ -44,14 +43,15 @@ import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyModule; -import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider; -import org.elasticsearch.index.store.CorruptedFileTest; import org.elasticsearch.index.store.Store; import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import 
java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; @@ -179,6 +179,51 @@ public class UpdateTests extends ElasticsearchIntegrationTest { assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2")); } } + + @Test + public void testScriptedUpsert() throws Exception { + createIndex(); + ensureGreen(); + + // Script logic is + // 1) New accounts take balance from "balance" in upsert doc and first payment is charged at 50% + // 2) Existing accounts subtract full payment from balance stored in elasticsearch + + String script="int oldBalance=ctx._source.balance;"+ + "int deduction=ctx.op == \"create\" ? (payment/2) : payment;"+ + "ctx._source.balance=oldBalance-deduction;"; + int openingBalance=10; + + // Pay money from what will be a new account and opening balance comes from upsert doc + // provided by client + UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1") + .setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject()) + .setScriptedUpsert(true) + .addScriptParam("payment", 2) + .setScript(script, ScriptService.ScriptType.INLINE) + .execute().actionGet(); + assertTrue(updateResponse.isCreated()); + + for (int i = 0; i < 5; i++) { + GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("9")); + } + + // Now pay money for an existing account where balance is stored in es + updateResponse = client().prepareUpdate("test", "type1", "1") + .setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject()) + .setScriptedUpsert(true) + .addScriptParam("payment", 2) + .setScript(script, ScriptService.ScriptType.INLINE) + .execute().actionGet(); + assertFalse(updateResponse.isCreated()); + + + for (int i = 0; i < 5; i++) { + GetResponse getResponse = 
client().prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("7")); + } + } @Test public void testUpsertDoc() throws Exception {