Expose `ctx._now` in update scripts (#20835)
Update scripts might want to update the documents `_timestamp` but need a notion of `now()`. Painless doesn't support any notion of now() since it would make scripts non-pure functions. Yet, in the update case this is a valid value and we can pass it with the context together to allow the script to record the timestamp the document was updated. Relates to #17895
This commit is contained in:
parent
e874dee3bc
commit
37ca38df3d
|
@ -402,7 +402,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||||
}
|
}
|
||||||
|
|
||||||
private UpdateResult shardUpdateOperation(IndexMetaData metaData, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
|
private UpdateResult shardUpdateOperation(IndexMetaData metaData, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
|
||||||
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
|
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard, threadPool::estimatedTimeInMillis);
|
||||||
switch (translate.getResponseResult()) {
|
switch (translate.getResponseResult()) {
|
||||||
case CREATED:
|
case CREATED:
|
||||||
case UPDATED:
|
case UPDATED:
|
||||||
|
|
|
@ -176,7 +176,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
||||||
final ShardId shardId = request.getShardId();
|
final ShardId shardId = request.getShardId();
|
||||||
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||||
final IndexShard indexShard = indexService.getShard(shardId.getId());
|
final IndexShard indexShard = indexService.getShard(shardId.getId());
|
||||||
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard);
|
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::estimatedTimeInMillis);
|
||||||
switch (result.getResponseResult()) {
|
switch (result.getResponseResult()) {
|
||||||
case CREATED:
|
case CREATED:
|
||||||
IndexRequest upsertRequest = result.action();
|
IndexRequest upsertRequest = result.action();
|
||||||
|
|
|
@ -58,6 +58,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.LongSupplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper for translating an update request to an index, delete request or update response.
|
* Helper for translating an update request to an index, delete request or update response.
|
||||||
|
@ -73,18 +74,18 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
/**
|
/**
|
||||||
* Prepares an update request by converting it into an index or delete request or an update response (no action).
|
* Prepares an update request by converting it into an index or delete request or an update response (no action).
|
||||||
*/
|
*/
|
||||||
public Result prepare(UpdateRequest request, IndexShard indexShard) {
|
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
|
||||||
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
|
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
|
||||||
new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME, TimestampFieldMapper.NAME},
|
new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME, TimestampFieldMapper.NAME},
|
||||||
true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE);
|
true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE);
|
||||||
return prepare(indexShard.shardId(), request, getResult);
|
return prepare(indexShard.shardId(), request, getResult, nowInMillis);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Prepares an update request by converting it into an index or delete request or an update response (no action).
|
* Prepares an update request by converting it into an index or delete request or an update response (no action).
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult) {
|
protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) {
|
||||||
long getDateNS = System.nanoTime();
|
long getDateNS = System.nanoTime();
|
||||||
if (!getResult.isExists()) {
|
if (!getResult.isExists()) {
|
||||||
if (request.upsertRequest() == null && !request.docAsUpsert()) {
|
if (request.upsertRequest() == null && !request.docAsUpsert()) {
|
||||||
|
@ -100,6 +101,7 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
// Tell the script that this is a create and not an update
|
// Tell the script that this is a create and not an update
|
||||||
ctx.put("op", "create");
|
ctx.put("op", "create");
|
||||||
ctx.put("_source", upsertDoc);
|
ctx.put("_source", upsertDoc);
|
||||||
|
ctx.put("_now", nowInMillis.getAsLong());
|
||||||
ctx = executeScript(request.script, ctx);
|
ctx = executeScript(request.script, ctx);
|
||||||
//Allow the script to set TTL using ctx._ttl
|
//Allow the script to set TTL using ctx._ttl
|
||||||
if (ttl == null) {
|
if (ttl == null) {
|
||||||
|
@ -188,6 +190,7 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
ctx.put("_timestamp", originalTimestamp);
|
ctx.put("_timestamp", originalTimestamp);
|
||||||
ctx.put("_ttl", originalTtl);
|
ctx.put("_ttl", originalTtl);
|
||||||
ctx.put("_source", sourceAndContent.v2());
|
ctx.put("_source", sourceAndContent.v2());
|
||||||
|
ctx.put("_now", nowInMillis.getAsLong());
|
||||||
|
|
||||||
ctx = executeScript(request.script, ctx);
|
ctx = executeScript(request.script, ctx);
|
||||||
|
|
||||||
|
|
|
@ -28,13 +28,25 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.index.get.GetResult;
|
import org.elasticsearch.index.get.GetResult;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.script.MockScriptEngine;
|
||||||
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.Script;
|
||||||
|
import org.elasticsearch.script.ScriptContextRegistry;
|
||||||
|
import org.elasticsearch.script.ScriptEngineRegistry;
|
||||||
|
import org.elasticsearch.script.ScriptService;
|
||||||
import org.elasticsearch.script.ScriptService.ScriptType;
|
import org.elasticsearch.script.ScriptService.ScriptType;
|
||||||
|
import org.elasticsearch.script.ScriptSettings;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
import static org.hamcrest.Matchers.arrayContaining;
|
import static org.hamcrest.Matchers.arrayContaining;
|
||||||
|
@ -184,9 +196,10 @@ public class UpdateRequestTests extends ESTestCase {
|
||||||
.doc(jsonBuilder().startObject().field("fooz", "baz").endObject())
|
.doc(jsonBuilder().startObject().field("fooz", "baz").endObject())
|
||||||
.upsert(indexRequest);
|
.upsert(indexRequest);
|
||||||
|
|
||||||
|
long nowInMillis = randomPositiveLong();
|
||||||
// We simulate that the document is not existing yet
|
// We simulate that the document is not existing yet
|
||||||
GetResult getResult = new GetResult("test", "type1", "1", 0, false, null, null);
|
GetResult getResult = new GetResult("test", "type1", "1", 0, false, null, null);
|
||||||
UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0),updateRequest, getResult);
|
UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0),updateRequest, getResult, () -> nowInMillis);
|
||||||
Streamable action = result.action();
|
Streamable action = result.action();
|
||||||
assertThat(action, instanceOf(IndexRequest.class));
|
assertThat(action, instanceOf(IndexRequest.class));
|
||||||
IndexRequest indexAction = (IndexRequest) action;
|
IndexRequest indexAction = (IndexRequest) action;
|
||||||
|
@ -203,7 +216,7 @@ public class UpdateRequestTests extends ESTestCase {
|
||||||
|
|
||||||
// We simulate that the document is not existing yet
|
// We simulate that the document is not existing yet
|
||||||
getResult = new GetResult("test", "type1", "2", 0, false, null, null);
|
getResult = new GetResult("test", "type1", "2", 0, false, null, null);
|
||||||
result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult);
|
result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis);
|
||||||
action = result.action();
|
action = result.action();
|
||||||
assertThat(action, instanceOf(IndexRequest.class));
|
assertThat(action, instanceOf(IndexRequest.class));
|
||||||
indexAction = (IndexRequest) action;
|
indexAction = (IndexRequest) action;
|
||||||
|
@ -276,4 +289,70 @@ public class UpdateRequestTests extends ESTestCase {
|
||||||
assertThat(request.fetchSource().includes()[0], equalTo("path.inner.*"));
|
assertThat(request.fetchSource().includes()[0], equalTo("path.inner.*"));
|
||||||
assertThat(request.fetchSource().excludes()[0], equalTo("another.inner.*"));
|
assertThat(request.fetchSource().excludes()[0], equalTo("another.inner.*"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testNowInScript() throws IOException {
|
||||||
|
Path genericConfigFolder = createTempDir();
|
||||||
|
Settings baseSettings = Settings.builder()
|
||||||
|
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||||
|
.put(Environment.PATH_CONF_SETTING.getKey(), genericConfigFolder)
|
||||||
|
.build();
|
||||||
|
Environment environment = new Environment(baseSettings);
|
||||||
|
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
|
||||||
|
scripts.put("ctx._source.update_timestamp = ctx._now",
|
||||||
|
(vars) -> {
|
||||||
|
Map<String, Object> ctx = (Map) vars.get("ctx");
|
||||||
|
Map<String, Object> source = (Map) ctx.get("_source");
|
||||||
|
source.put("update_timestamp", ctx.get("_now"));
|
||||||
|
return null;});
|
||||||
|
scripts.put("ctx._timestamp = ctx._now",
|
||||||
|
(vars) -> {
|
||||||
|
Map<String, Object> ctx = (Map) vars.get("ctx");
|
||||||
|
ctx.put("_timestamp", ctx.get("_now"));
|
||||||
|
return null;});
|
||||||
|
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList());
|
||||||
|
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(new MockScriptEngine("mock",
|
||||||
|
scripts)));
|
||||||
|
|
||||||
|
ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
|
||||||
|
ScriptService scriptService = new ScriptService(baseSettings, environment,
|
||||||
|
new ResourceWatcherService(baseSettings, null), scriptEngineRegistry, scriptContextRegistry, scriptSettings);
|
||||||
|
TimeValue providedTTLValue = TimeValue.parseTimeValue(randomTimeValue(), null, "ttl");
|
||||||
|
Settings settings = settings(Version.CURRENT).build();
|
||||||
|
|
||||||
|
UpdateHelper updateHelper = new UpdateHelper(settings, scriptService);
|
||||||
|
|
||||||
|
// We just upsert one document with now() using a script
|
||||||
|
IndexRequest indexRequest = new IndexRequest("test", "type1", "2")
|
||||||
|
.source(jsonBuilder().startObject().field("foo", "bar").endObject())
|
||||||
|
.ttl(providedTTLValue);
|
||||||
|
|
||||||
|
{
|
||||||
|
UpdateRequest updateRequest = new UpdateRequest("test", "type1", "2")
|
||||||
|
.upsert(indexRequest)
|
||||||
|
.script(new Script("ctx._source.update_timestamp = ctx._now", ScriptType.INLINE, "mock", Collections.emptyMap()))
|
||||||
|
.scriptedUpsert(true);
|
||||||
|
long nowInMillis = randomPositiveLong();
|
||||||
|
// We simulate that the document is not existing yet
|
||||||
|
GetResult getResult = new GetResult("test", "type1", "2", 0, false, null, null);
|
||||||
|
UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis);
|
||||||
|
Streamable action = result.action();
|
||||||
|
assertThat(action, instanceOf(IndexRequest.class));
|
||||||
|
IndexRequest indexAction = (IndexRequest) action;
|
||||||
|
assertEquals(indexAction.sourceAsMap().get("update_timestamp"), nowInMillis);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
UpdateRequest updateRequest = new UpdateRequest("test", "type1", "2")
|
||||||
|
.upsert(indexRequest)
|
||||||
|
.script(new Script("ctx._timestamp = ctx._now", ScriptType.INLINE, "mock", Collections.emptyMap()))
|
||||||
|
.scriptedUpsert(true);
|
||||||
|
long nowInMillis = randomPositiveLong();
|
||||||
|
// We simulate that the document is not existing yet
|
||||||
|
GetResult getResult = new GetResult("test", "type1", "2", 0, true, new BytesArray("{}"), null);
|
||||||
|
UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis);
|
||||||
|
Streamable action = result.action();
|
||||||
|
assertThat(action, instanceOf(IndexRequest.class));
|
||||||
|
IndexRequest indexAction = (IndexRequest) action;
|
||||||
|
assertEquals(indexAction.timestamp(), Long.toString(nowInMillis));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,4 +63,24 @@
|
||||||
|
|
||||||
- match: { _source.foo: xxx }
|
- match: { _source.foo: xxx }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
update:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 3
|
||||||
|
body:
|
||||||
|
script:
|
||||||
|
inline: "ctx._source.has_now = ctx._now > 0"
|
||||||
|
lang: "painless"
|
||||||
|
upsert: { has_now: false }
|
||||||
|
scripted_upsert: true
|
||||||
|
|
||||||
|
- do:
|
||||||
|
get:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 3
|
||||||
|
|
||||||
|
- match: { _source.has_now: true }
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue