Upsert does not use ttl value
When running an upsert which defines a ttl value, the ttl value is set to null and ignored. Related to #3256#issuecomment-64963409
This commit is contained in:
parent
177180ac94
commit
1a3eb4d83a
|
@ -73,39 +73,41 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public Result prepare(UpdateRequest request, IndexShard indexShard) {
|
public Result prepare(UpdateRequest request, IndexShard indexShard) {
|
||||||
long getDateNS = System.nanoTime();
|
|
||||||
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
|
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
|
||||||
new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME, TimestampFieldMapper.NAME},
|
new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME, TimestampFieldMapper.NAME},
|
||||||
true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE, false);
|
true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE, false);
|
||||||
|
return prepare(request, getResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepares an update request by converting it into an index or delete request or an update response (no action).
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
protected Result prepare(UpdateRequest request, final GetResult getResult) {
|
||||||
|
long getDateNS = System.nanoTime();
|
||||||
if (!getResult.isExists()) {
|
if (!getResult.isExists()) {
|
||||||
if (request.upsertRequest() == null && !request.docAsUpsert()) {
|
if (request.upsertRequest() == null && !request.docAsUpsert()) {
|
||||||
throw new DocumentMissingException(new ShardId(indexShard.indexService().index().name(), request.shardId()), request.type(), request.id());
|
throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
|
||||||
}
|
}
|
||||||
Long ttl = null;
|
|
||||||
IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
|
IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
|
||||||
if (request.scriptedUpsert() && (request.script() != null)) {
|
Long ttl = indexRequest.ttl();
|
||||||
|
if (request.scriptedUpsert() && request.script() != null) {
|
||||||
// Run the script to perform the create logic
|
// Run the script to perform the create logic
|
||||||
IndexRequest upsert = request.upsertRequest();
|
IndexRequest upsert = request.upsertRequest();
|
||||||
Map<String, Object> upsertDoc = upsert.sourceAsMap();
|
Map<String, Object> upsertDoc = upsert.sourceAsMap();
|
||||||
Map<String, Object> ctx = new HashMap<>(2);
|
Map<String, Object> ctx = new HashMap<>(2);
|
||||||
// Tell the script that this is a create and not an update
|
// Tell the script that this is a create and not an update
|
||||||
ctx.put("op", "create");
|
ctx.put("op", "create");
|
||||||
ctx.put("_source", upsertDoc);
|
ctx.put("_source", upsertDoc);
|
||||||
try {
|
ctx = executeScript(request, ctx);
|
||||||
ExecutableScript script = scriptService.executable(request.script, ScriptContext.Standard.UPDATE);
|
|
||||||
script.setNextVar("ctx", ctx);
|
|
||||||
script.run();
|
|
||||||
// we need to unwrap the ctx...
|
|
||||||
ctx = (Map<String, Object>) script.unwrap(ctx);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IllegalArgumentException("failed to execute script", e);
|
|
||||||
}
|
|
||||||
//Allow the script to set TTL using ctx._ttl
|
//Allow the script to set TTL using ctx._ttl
|
||||||
ttl = getTTLFromScriptContext(ctx);
|
if (ttl < 0) {
|
||||||
|
ttl = getTTLFromScriptContext(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
//Allow the script to abort the create by setting "op" to "none"
|
//Allow the script to abort the create by setting "op" to "none"
|
||||||
String scriptOpChoice = (String) ctx.get("op");
|
String scriptOpChoice = (String) ctx.get("op");
|
||||||
|
|
||||||
// Only valid options for an upsert script are "create"
|
// Only valid options for an upsert script are "create"
|
||||||
// (the default) or "none", meaning abort upsert
|
// (the default) or "none", meaning abort upsert
|
||||||
if (!"create".equals(scriptOpChoice)) {
|
if (!"create".equals(scriptOpChoice)) {
|
||||||
|
@ -123,8 +125,8 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
|
|
||||||
indexRequest.index(request.index()).type(request.type()).id(request.id())
|
indexRequest.index(request.index()).type(request.type()).id(request.id())
|
||||||
// it has to be a "create!"
|
// it has to be a "create!"
|
||||||
.create(true)
|
.create(true)
|
||||||
.ttl(ttl)
|
.ttl(ttl == null || ttl < 0 ? null : ttl)
|
||||||
.refresh(request.refresh())
|
.refresh(request.refresh())
|
||||||
.routing(request.routing())
|
.routing(request.routing())
|
||||||
.parent(request.parent())
|
.parent(request.parent())
|
||||||
|
@ -146,7 +148,7 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
|
|
||||||
if (getResult.internalSourceRef() == null) {
|
if (getResult.internalSourceRef() == null) {
|
||||||
// no source, we can't do nothing, through a failure...
|
// no source, we can't do nothing, through a failure...
|
||||||
throw new DocumentSourceMissingException(new ShardId(indexShard.indexService().index().name(), request.shardId()), request.type(), request.id());
|
throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
|
||||||
}
|
}
|
||||||
|
|
||||||
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
|
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
|
||||||
|
@ -192,15 +194,7 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
ctx.put("_ttl", originalTtl);
|
ctx.put("_ttl", originalTtl);
|
||||||
ctx.put("_source", sourceAndContent.v2());
|
ctx.put("_source", sourceAndContent.v2());
|
||||||
|
|
||||||
try {
|
ctx = executeScript(request, ctx);
|
||||||
ExecutableScript script = scriptService.executable(request.script, ScriptContext.Standard.UPDATE);
|
|
||||||
script.setNextVar("ctx", ctx);
|
|
||||||
script.run();
|
|
||||||
// we need to unwrap the ctx...
|
|
||||||
ctx = (Map<String, Object>) script.unwrap(ctx);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IllegalArgumentException("failed to execute script", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
operation = (String) ctx.get("op");
|
operation = (String) ctx.get("op");
|
||||||
|
|
||||||
|
@ -213,7 +207,7 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
ttl = getTTLFromScriptContext(ctx);
|
ttl = getTTLFromScriptContext(ctx);
|
||||||
|
|
||||||
updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
|
updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,7 +237,7 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
|
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
|
||||||
} else if ("none".equals(operation)) {
|
} else if ("none".equals(operation)) {
|
||||||
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
|
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
|
||||||
update.setGetResult(extractGetResult(request, indexShard.indexService().index().name(), getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
|
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
|
||||||
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
|
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script.getScript());
|
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script.getScript());
|
||||||
|
@ -252,6 +246,21 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> executeScript(UpdateRequest request, Map<String, Object> ctx) {
|
||||||
|
try {
|
||||||
|
if (scriptService != null) {
|
||||||
|
ExecutableScript script = scriptService.executable(request.script, ScriptContext.Standard.UPDATE);
|
||||||
|
script.setNextVar("ctx", ctx);
|
||||||
|
script.run();
|
||||||
|
// we need to unwrap the ctx...
|
||||||
|
ctx = (Map<String, Object>) script.unwrap(ctx);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IllegalArgumentException("failed to execute script", e);
|
||||||
|
}
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
private Long getTTLFromScriptContext(Map<String, Object> ctx) {
|
private Long getTTLFromScriptContext(Map<String, Object> ctx) {
|
||||||
Long ttl = null;
|
Long ttl = null;
|
||||||
Object fetchedTTL = ctx.get("_ttl");
|
Object fetchedTTL = ctx.get("_ttl");
|
||||||
|
|
|
@ -690,16 +690,18 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
|
||||||
return this.docAsUpsert;
|
return this.docAsUpsert;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void docAsUpsert(boolean shouldUpsertDoc) {
|
public UpdateRequest docAsUpsert(boolean shouldUpsertDoc) {
|
||||||
this.docAsUpsert = shouldUpsertDoc;
|
this.docAsUpsert = shouldUpsertDoc;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean scriptedUpsert(){
|
public boolean scriptedUpsert(){
|
||||||
return this.scriptedUpsert;
|
return this.scriptedUpsert;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void scriptedUpsert(boolean scriptedUpsert) {
|
public UpdateRequest scriptedUpsert(boolean scriptedUpsert) {
|
||||||
this.scriptedUpsert = scriptedUpsert;
|
this.scriptedUpsert = scriptedUpsert;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -19,8 +19,13 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.update;
|
package org.elasticsearch.action.update;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
|
import org.elasticsearch.common.io.stream.Streamable;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
|
import org.elasticsearch.index.get.GetResult;
|
||||||
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.Script;
|
||||||
import org.elasticsearch.script.ScriptService.ScriptType;
|
import org.elasticsearch.script.ScriptService.ScriptType;
|
||||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||||
|
@ -28,9 +33,10 @@ import org.junit.Test;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.*;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
public class UpdateRequestTests extends ElasticsearchTestCase {
|
public class UpdateRequestTests extends ElasticsearchTestCase {
|
||||||
|
|
||||||
|
@ -119,4 +125,45 @@ public class UpdateRequestTests extends ElasticsearchTestCase {
|
||||||
assertThat(doc.get("field1").toString(), equalTo("value1"));
|
assertThat(doc.get("field1").toString(), equalTo("value1"));
|
||||||
assertThat(((Map) doc.get("compound")).get("field2").toString(), equalTo("value2"));
|
assertThat(((Map) doc.get("compound")).get("field2").toString(), equalTo("value2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test // Related to issue 3256
|
||||||
|
public void testUpdateRequestWithTTL() throws Exception {
|
||||||
|
long providedTTLValue = randomIntBetween(500, 1000);
|
||||||
|
Settings settings = settings(Version.CURRENT).build();
|
||||||
|
|
||||||
|
UpdateHelper updateHelper = new UpdateHelper(settings, null);
|
||||||
|
|
||||||
|
// We just upsert one document with ttl
|
||||||
|
IndexRequest indexRequest = new IndexRequest("test", "type1", "1")
|
||||||
|
.source(jsonBuilder().startObject().field("foo", "bar").endObject())
|
||||||
|
.ttl(providedTTLValue);
|
||||||
|
UpdateRequest updateRequest = new UpdateRequest("test", "type1", "1")
|
||||||
|
.doc(jsonBuilder().startObject().field("fooz", "baz").endObject())
|
||||||
|
.upsert(indexRequest);
|
||||||
|
|
||||||
|
// We simulate that the document is not existing yet
|
||||||
|
GetResult getResult = new GetResult("test", "type1", "1", 0, false, null, null);
|
||||||
|
UpdateHelper.Result result = updateHelper.prepare(updateRequest, getResult);
|
||||||
|
Streamable action = result.action();
|
||||||
|
assertThat(action, instanceOf(IndexRequest.class));
|
||||||
|
IndexRequest indexAction = (IndexRequest) action;
|
||||||
|
assertThat(indexAction.ttl(), is(providedTTLValue));
|
||||||
|
|
||||||
|
// We just upsert one document with ttl using a script
|
||||||
|
indexRequest = new IndexRequest("test", "type1", "2")
|
||||||
|
.source(jsonBuilder().startObject().field("foo", "bar").endObject())
|
||||||
|
.ttl(providedTTLValue);
|
||||||
|
updateRequest = new UpdateRequest("test", "type1", "2")
|
||||||
|
.upsert(indexRequest)
|
||||||
|
.script(new Script(";"))
|
||||||
|
.scriptedUpsert(true);
|
||||||
|
|
||||||
|
// We simulate that the document is not existing yet
|
||||||
|
getResult = new GetResult("test", "type1", "2", 0, false, null, null);
|
||||||
|
result = updateHelper.prepare(updateRequest, getResult);
|
||||||
|
action = result.action();
|
||||||
|
assertThat(action, instanceOf(IndexRequest.class));
|
||||||
|
indexAction = (IndexRequest) action;
|
||||||
|
assertThat(indexAction.ttl(), is(providedTTLValue));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
|
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
import static org.hamcrest.Matchers.*;
|
import static org.hamcrest.Matchers.*;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue