diff --git a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 5a296cdf925..ae9eea79b9f 100644 --- a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -43,6 +43,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.DocumentMissingException; @@ -52,7 +53,7 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.mapper.internal.RoutingFieldMapper; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; -import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; +import org.elasticsearch.index.mapper.internal.TTLFieldMapper; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.ShardId; @@ -152,8 +153,9 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio IndexService indexService = indicesService.indexServiceSafe(request.index()); IndexShard indexShard = indexService.shardSafe(request.shardId()); + long getDate = System.currentTimeMillis(); GetResult getResult = indexShard.getService().get(request.type(), request.id(), - new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TimestampFieldMapper.NAME}, true); + new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME}, true); // no doc, what to do, what to do... if (!getResult.exists()) { @@ -183,12 +185,28 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio } String operation = (String) ctx.get("op"); + String timestamp = (String) ctx.get("_timestamp"); + Long ttl = null; + Object fetchedTTL = ctx.get("_ttl"); + if (fetchedTTL != null) { + if (fetchedTTL instanceof Number) { + ttl = ((Number) fetchedTTL).longValue(); + } else { + ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis(); + } + } source = (Map) ctx.get("_source"); // apply script to update the source String routing = getResult.fields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).value().toString() : null; String parent = getResult.fields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).value().toString() : null; - // TODO ttl/timestamp + // No TTL has been given in the update script so we keep previous TTL value if there is one + if (ttl == null) { + ttl = getResult.fields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).value() : null; + if (ttl != null) { + ttl = ttl - (System.currentTimeMillis() - getDate); // It is an approximation of exact TTL value, could be improved + } + } // TODO percolate? @@ -197,7 +215,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio if (operation == null || "index".equals(operation)) { IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) .source(source, sourceAndContent.v1()) - .version(getResult.version()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); + .version(getResult.version()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()) + .timestamp(timestamp).ttl(ttl); indexRequest.operationThreaded(false); indexAction.execute(indexRequest, new ActionListener() { @Override