handle timestamp and TTL in update action
This commit is contained in:
parent
ed8a46ce09
commit
682176497f
|
@ -43,6 +43,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.engine.DocumentMissingException;
|
import org.elasticsearch.index.engine.DocumentMissingException;
|
||||||
|
@ -52,7 +53,7 @@ import org.elasticsearch.index.get.GetResult;
|
||||||
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
|
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
|
||||||
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
|
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
|
||||||
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
|
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
|
||||||
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
|
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
|
||||||
import org.elasticsearch.index.service.IndexService;
|
import org.elasticsearch.index.service.IndexService;
|
||||||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
@ -152,8 +153,9 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
||||||
IndexService indexService = indicesService.indexServiceSafe(request.index());
|
IndexService indexService = indicesService.indexServiceSafe(request.index());
|
||||||
IndexShard indexShard = indexService.shardSafe(request.shardId());
|
IndexShard indexShard = indexService.shardSafe(request.shardId());
|
||||||
|
|
||||||
|
long getDate = System.currentTimeMillis();
|
||||||
GetResult getResult = indexShard.getService().get(request.type(), request.id(),
|
GetResult getResult = indexShard.getService().get(request.type(), request.id(),
|
||||||
new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TimestampFieldMapper.NAME}, true);
|
new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME}, true);
|
||||||
|
|
||||||
// no doc, what to do, what to do...
|
// no doc, what to do, what to do...
|
||||||
if (!getResult.exists()) {
|
if (!getResult.exists()) {
|
||||||
|
@ -183,12 +185,28 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
||||||
}
|
}
|
||||||
|
|
||||||
String operation = (String) ctx.get("op");
|
String operation = (String) ctx.get("op");
|
||||||
|
String timestamp = (String) ctx.get("_timestamp");
|
||||||
|
Long ttl = null;
|
||||||
|
Object fetchedTTL = ctx.get("_ttl");
|
||||||
|
if (fetchedTTL != null) {
|
||||||
|
if (fetchedTTL instanceof Number) {
|
||||||
|
ttl = ((Number) fetchedTTL).longValue();
|
||||||
|
} else {
|
||||||
|
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
|
||||||
|
}
|
||||||
|
}
|
||||||
source = (Map<String, Object>) ctx.get("_source");
|
source = (Map<String, Object>) ctx.get("_source");
|
||||||
|
|
||||||
// apply script to update the source
|
// apply script to update the source
|
||||||
String routing = getResult.fields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).value().toString() : null;
|
String routing = getResult.fields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).value().toString() : null;
|
||||||
String parent = getResult.fields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).value().toString() : null;
|
String parent = getResult.fields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).value().toString() : null;
|
||||||
// TODO ttl/timestamp
|
// No TTL has been given in the update script so we keep previous TTL value if there is one
|
||||||
|
if (ttl == null) {
|
||||||
|
ttl = getResult.fields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).value() : null;
|
||||||
|
if (ttl != null) {
|
||||||
|
ttl = ttl - (System.currentTimeMillis() - getDate); // It is an approximation of exact TTL value, could be improved
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO percolate?
|
// TODO percolate?
|
||||||
|
|
||||||
|
@ -197,7 +215,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
||||||
if (operation == null || "index".equals(operation)) {
|
if (operation == null || "index".equals(operation)) {
|
||||||
IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
|
IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
|
||||||
.source(source, sourceAndContent.v1())
|
.source(source, sourceAndContent.v1())
|
||||||
.version(getResult.version()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
|
.version(getResult.version()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel())
|
||||||
|
.timestamp(timestamp).ttl(ttl);
|
||||||
indexRequest.operationThreaded(false);
|
indexRequest.operationThreaded(false);
|
||||||
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
|
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue