Java api: allow to specify ttl as a time value (either as a string or using TimeValue)

The ttl could be specified as a time value only via the REST layer. That is now possible via java api too, either as a string or as a proper TimeValue. The internal format in IndexRequest becomes now TimeValue, which will then still converted to a long before storing the document.

Closes #15047
This commit is contained in:
javanna 2015-11-26 15:08:49 +01:00 committed by Luca Cavanna
parent 0c2c7e7ef5
commit fae494aa84
14 changed files with 162 additions and 63 deletions

View File

@ -300,7 +300,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
String parent = null;
String[] fields = defaultFields;
String timestamp = null;
Long ttl = null;
TimeValue ttl = null;
String opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
@ -333,9 +333,9 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
timestamp = parser.text();
} else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) {
if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
ttl = TimeValue.parseTimeValue(parser.text(), null, currentFieldName).millis();
ttl = TimeValue.parseTimeValue(parser.text(), null, currentFieldName);
} else {
ttl = parser.longValue();
ttl = new TimeValue(parser.longValue());
}
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text();

View File

@ -335,7 +335,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
indexRequest.process(clusterState.metaData(), mappingMd, allowIdGeneration, request.index());
}
return executeIndexRequestOnPrimary(request, indexRequest, indexShard);
return executeIndexRequestOnPrimary(indexRequest, indexShard);
}
private WriteResult<DeleteResponse> shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) {

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.VersionType;
@ -136,7 +137,8 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
private String parent;
@Nullable
private String timestamp;
private long ttl = -1;
@Nullable
private TimeValue ttl;
private BytesReference source;
@ -229,6 +231,12 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
if (!versionType.validateVersionForWrites(version)) {
validationException = addValidationError("illegal version value [" + version + "] for version type [" + versionType.name() + "]", validationException);
}
if (ttl != null) {
if (ttl.millis() < 0) {
validationException = addValidationError("ttl must not be negative", validationException);
}
}
return validationException;
}
@ -324,22 +332,33 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
}
/**
* Sets the relative ttl value. It musts be &gt; 0 as it makes little sense otherwise. Setting it
* to <tt>null</tt> will reset to have no ttl.
* Sets the ttl value as a time value expression.
*/
public IndexRequest ttl(Long ttl) throws ElasticsearchGenerationException {
if (ttl == null) {
this.ttl = -1;
return this;
}
if (ttl <= 0) {
throw new IllegalArgumentException("TTL value must be > 0. Illegal value provided [" + ttl + "]");
}
public IndexRequest ttl(String ttl) {
this.ttl = TimeValue.parseTimeValue(ttl, null, "ttl");
return this;
}
/**
* Sets the ttl as a {@link TimeValue} instance.
*/
public IndexRequest ttl(TimeValue ttl) {
this.ttl = ttl;
return this;
}
public long ttl() {
/**
* Sets the relative ttl value in milliseconds. It musts be greater than 0 as it makes little sense otherwise.
*/
public IndexRequest ttl(long ttl) {
this.ttl = new TimeValue(ttl);
return this;
}
/**
* Returns the ttl as a {@link TimeValue}
*/
public TimeValue ttl() {
return this.ttl;
}
@ -665,7 +684,7 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
routing = in.readOptionalString();
parent = in.readOptionalString();
timestamp = in.readOptionalString();
ttl = in.readLong();
ttl = in.readBoolean() ? TimeValue.readTimeValue(in) : null;
source = in.readBytesReference();
opType = OpType.fromId(in.readByte());
@ -682,7 +701,12 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
out.writeOptionalString(routing);
out.writeOptionalString(parent);
out.writeOptionalString(timestamp);
out.writeLong(ttl);
if (ttl == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
ttl.writeTo(out);
}
out.writeBytesReference(source);
out.writeByte(opType.id());
out.writeBoolean(refresh);

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
@ -254,9 +255,27 @@ public class IndexRequestBuilder extends ReplicationRequestBuilder<IndexRequest,
return this;
}
// Sets the relative ttl value. It musts be > 0 as it makes little sense otherwise.
/**
* Sets the ttl value as a time value expression.
*/
public IndexRequestBuilder setTTL(String ttl) {
request.ttl(ttl);
return this;
}
/**
* Sets the relative ttl value in milliseconds. It musts be greater than 0 as it makes little sense otherwise.
*/
public IndexRequestBuilder setTTL(long ttl) {
request.ttl(ttl);
return this;
}
/**
* Sets the ttl as a {@link TimeValue} instance.
*/
public IndexRequestBuilder setTTL(TimeValue ttl) {
request.ttl(ttl);
return this;
}
}

View File

@ -166,7 +166,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardRequest.shardId.id());
final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(null, request, indexShard);
final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(request, indexShard);
final IndexResponse response = result.response;
final Translog.Location location = result.location;

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.index.IndexResponse;
@ -1074,23 +1073,22 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
/** Utility method to create either an index or a create operation depending
* on the {@link OpType} of the request. */
private final Engine.Index prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) {
private Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).index(request.index()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
return indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
}
/** Execute the given {@link IndexRequest} on a primary shard, throwing a
* {@link RetryOnPrimaryException} if the operation needs to be re-tried. */
protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
Engine.Index operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard) throws Throwable {
Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = indexShard.shardId();
if (update != null) {
final String indexName = shardId.getIndex();
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
operation = prepareIndexOperationOnPrimary(request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,

View File

@ -88,7 +88,7 @@ public class UpdateHelper extends AbstractComponent {
throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
}
IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
Long ttl = indexRequest.ttl();
TimeValue ttl = indexRequest.ttl();
if (request.scriptedUpsert() && request.script() != null) {
// Run the script to perform the create logic
IndexRequest upsert = request.upsertRequest();
@ -99,7 +99,7 @@ public class UpdateHelper extends AbstractComponent {
ctx.put("_source", upsertDoc);
ctx = executeScript(request, ctx);
//Allow the script to set TTL using ctx._ttl
if (ttl < 0) {
if (ttl == null) {
ttl = getTTLFromScriptContext(ctx);
}
@ -124,7 +124,7 @@ public class UpdateHelper extends AbstractComponent {
indexRequest.index(request.index()).type(request.type()).id(request.id())
// it has to be a "create!"
.create(true)
.ttl(ttl == null || ttl < 0 ? null : ttl)
.ttl(ttl)
.refresh(request.refresh())
.routing(request.routing())
.parent(request.parent())
@ -151,7 +151,7 @@ public class UpdateHelper extends AbstractComponent {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
String operation = null;
String timestamp = null;
Long ttl = null;
TimeValue ttl = null;
final Map<String, Object> updatedSourceAsMap;
final XContentType updateSourceContentType = sourceAndContent.v1();
String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null;
@ -160,7 +160,7 @@ public class UpdateHelper extends AbstractComponent {
if (request.script() == null && request.doc() != null) {
IndexRequest indexRequest = request.doc();
updatedSourceAsMap = sourceAndContent.v2();
if (indexRequest.ttl() > 0) {
if (indexRequest.ttl() != null) {
ttl = indexRequest.ttl();
}
timestamp = indexRequest.timestamp();
@ -211,9 +211,9 @@ public class UpdateHelper extends AbstractComponent {
// apply script to update the source
// No TTL has been given in the update script so we keep previous TTL value if there is one
if (ttl == null) {
ttl = getResult.getFields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).getValue() : null;
if (ttl != null) {
ttl = ttl - TimeValue.nsecToMSec(System.nanoTime() - getDateNS); // It is an approximation of exact TTL value, could be improved
Long ttlAsLong = getResult.getFields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).getValue() : null;
if (ttlAsLong != null) {
ttl = new TimeValue(ttlAsLong - TimeValue.nsecToMSec(System.nanoTime() - getDateNS));// It is an approximation of exact TTL value, could be improved
}
}
@ -256,17 +256,15 @@ public class UpdateHelper extends AbstractComponent {
return ctx;
}
private Long getTTLFromScriptContext(Map<String, Object> ctx) {
Long ttl = null;
private TimeValue getTTLFromScriptContext(Map<String, Object> ctx) {
Object fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null, "_ttl").millis();
return new TimeValue(((Number) fetchedTTL).longValue());
}
return TimeValue.parseTimeValue((String) fetchedTTL, null, "_ttl");
}
return ttl;
return null;
}
/**
@ -337,13 +335,10 @@ public class UpdateHelper extends AbstractComponent {
}
}
public static enum Operation {
public enum Operation {
UPSERT,
INDEX,
DELETE,
NONE
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
@ -325,7 +326,7 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
}
/**
* Set the new ttl of the document. Note that if detectNoop is true (the default)
* Set the new ttl of the document as a long. Note that if detectNoop is true (the default)
* and the source of the document isn't changed then the ttl update won't take
* effect.
*/
@ -333,4 +334,24 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
request.doc().ttl(ttl);
return this;
}
/**
* Set the new ttl of the document as a time value expression. Note that if detectNoop is true (the default)
* and the source of the document isn't changed then the ttl update won't take
* effect.
*/
public UpdateRequestBuilder setTtl(String ttl) {
request.doc().ttl(ttl);
return this;
}
/**
* Set the new ttl of the document as a {@link TimeValue} instance. Note that if detectNoop is true (the default)
* and the source of the document isn't changed then the ttl update won't take
* effect.
*/
public UpdateRequestBuilder setTtl(TimeValue ttl) {
request.doc().ttl(ttl);
return this;
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.mapper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
/**
@ -159,16 +160,22 @@ public class SourceToParse {
return this.ttl;
}
public SourceToParse ttl(TimeValue ttl) {
if (ttl == null) {
this.ttl = -1;
return this;
}
this.ttl = ttl.millis();
return this;
}
public SourceToParse ttl(long ttl) {
this.ttl = ttl;
return this;
}
public static enum Origin {
public enum Origin {
PRIMARY,
REPLICA
}
}

View File

@ -74,7 +74,7 @@ public class RestIndexAction extends BaseRestHandler {
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
indexRequest.timestamp(request.param("timestamp"));
if (request.hasParam("ttl")) {
indexRequest.ttl(request.paramAsTime("ttl", null).millis());
indexRequest.ttl(request.param("ttl"));
}
indexRequest.source(request.content());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));

View File

@ -105,7 +105,7 @@ public class RestUpdateAction extends BaseRestHandler {
upsertRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
upsertRequest.timestamp(request.param("timestamp"));
if (request.hasParam("ttl")) {
upsertRequest.ttl(request.paramAsTime("ttl", null).millis());
upsertRequest.ttl(request.param("ttl"));
}
upsertRequest.version(RestActions.parseVersion(request));
upsertRequest.versionType(VersionType.fromString(request.param("version_type"), upsertRequest.versionType()));
@ -116,7 +116,7 @@ public class RestUpdateAction extends BaseRestHandler {
doc.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
doc.timestamp(request.param("timestamp"));
if (request.hasParam("ttl")) {
doc.ttl(request.paramAsTime("ttl", null).millis());
doc.ttl(request.param("ttl"));
}
doc.version(RestActions.parseVersion(request));
doc.versionType(VersionType.fromString(request.param("version_type"), doc.versionType()));

View File

@ -18,6 +18,8 @@
*/
package org.elasticsearch.action.index;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.ESTestCase;
@ -25,10 +27,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.*;
/**
*/
@ -67,4 +66,43 @@ public class IndexRequestTests extends ESTestCase {
request.version(randomIntBetween(0, Integer.MAX_VALUE));
assertThat(request.validate().validationErrors(), not(empty()));
}
public void testSetTTLAsTimeValue() {
IndexRequest indexRequest = new IndexRequest();
TimeValue ttl = TimeValue.parseTimeValue(randomTimeValue(), null, "ttl");
indexRequest.ttl(ttl);
assertThat(indexRequest.ttl(), equalTo(ttl));
}
public void testSetTTLAsString() {
IndexRequest indexRequest = new IndexRequest();
String ttlAsString = randomTimeValue();
TimeValue ttl = TimeValue.parseTimeValue(ttlAsString, null, "ttl");
indexRequest.ttl(ttlAsString);
assertThat(indexRequest.ttl(), equalTo(ttl));
}
public void testSetTTLAsLong() {
IndexRequest indexRequest = new IndexRequest();
String ttlAsString = randomTimeValue();
TimeValue ttl = TimeValue.parseTimeValue(ttlAsString, null, "ttl");
indexRequest.ttl(ttl.millis());
assertThat(indexRequest.ttl(), equalTo(ttl));
}
public void testValidateTTL() {
IndexRequest indexRequest = new IndexRequest("index", "type");
if (randomBoolean()) {
indexRequest.ttl(randomIntBetween(Integer.MIN_VALUE, -1));
} else {
if (randomBoolean()) {
indexRequest.ttl(new TimeValue(randomIntBetween(Integer.MIN_VALUE, -1)));
} else {
indexRequest.ttl(randomIntBetween(Integer.MIN_VALUE, -1) + "ms");
}
}
ActionRequestValidationException validate = indexRequest.validate();
assertThat(validate, notNullValue());
assertThat(validate.getMessage(), containsString("ttl must not be negative"));
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.get.GetResult;
@ -33,11 +34,7 @@ import org.elasticsearch.test.ESTestCase;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
public class UpdateRequestTests extends ESTestCase {
public void testUpdateRequest() throws Exception {
@ -127,7 +124,7 @@ public class UpdateRequestTests extends ESTestCase {
// Related to issue 3256
public void testUpdateRequestWithTTL() throws Exception {
long providedTTLValue = randomIntBetween(500, 1000);
TimeValue providedTTLValue = TimeValue.parseTimeValue(randomTimeValue(), null, "ttl");
Settings settings = settings(Version.CURRENT).build();
UpdateHelper updateHelper = new UpdateHelper(settings, null);

View File

@ -295,7 +295,7 @@ public class TTLMappingTests extends ESSingleNodeTestCase {
request.process(MetaData.builder().build(), mappingMetaData, true, "test");
// _ttl in a document never worked, so backcompat is ignoring the field
assertEquals(-1, request.ttl());
assertNull(request.ttl());
assertNull(docMapper.parse("test", "type", "1", doc.bytes()).rootDoc().get("_ttl"));
}