diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 694d23738f2..fda67909706 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -114,6 +114,7 @@ public class BulkRequest implements ActionRequest { String routing = null; String parent = null; String timestamp = null; + long ttl = -1; String opType = null; long version = 0; VersionType versionType = VersionType.INTERNAL; @@ -136,6 +137,8 @@ public class BulkRequest implements ActionRequest { parent = parser.text(); } else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) { timestamp = parser.text(); + } else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) { + ttl = parser.longValue(); } else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) { opType = parser.text(); } else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) { @@ -160,17 +163,17 @@ public class BulkRequest implements ActionRequest { // of index request. All index requests are still unsafe if applicable. if ("index".equals(action)) { if (opType == null) { - internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).version(version).versionType(versionType) + internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) .source(data, from, nextMarker - from, contentUnsafe) .percolate(percolate)); } else { - internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).version(version).versionType(versionType) + internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) .create("create".equals(opType)) .source(data, from, nextMarker - from, contentUnsafe) .percolate(percolate)); } } else if ("create".equals(action)) { - internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).version(version).versionType(versionType) + internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) .create(true) .source(data, from, nextMarker - from, contentUnsafe) .percolate(percolate)); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 1fc092690f6..116a288f8d7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -123,7 +123,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } SourceToParse sourceToParse = SourceToParse.source(indexRequest.underlyingSource(), indexRequest.underlyingSourceOffset(), indexRequest.underlyingSourceLength()).type(indexRequest.type()).id(indexRequest.id()) - .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()); + .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); + long version; Engine.IndexingOperation op; if (indexRequest.opType() == IndexRequest.OpType.INDEX) { @@ -232,7 +233,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation IndexRequest indexRequest = (IndexRequest) item.request(); try { SourceToParse sourceToParse = SourceToParse.source(indexRequest.underlyingSource(), indexRequest.underlyingSourceOffset(), indexRequest.underlyingSourceLength()).type(indexRequest.type()).id(indexRequest.id()) - .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()); + .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); + if (indexRequest.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA); indexShard.index(index); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java index cb2b6ed41e6..bebebf4f6cc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -119,6 +119,7 @@ public class IndexRequest extends ShardReplicationOperationRequest { @Nullable private String routing; @Nullable private String parent; @Nullable private String timestamp; + private long ttl = -1; private byte[] source; private int sourceOffset; @@ -287,6 +288,19 @@ public class IndexRequest extends ShardReplicationOperationRequest { return this.timestamp; } + // Sets the relative ttl value. It musts be > 0 as it makes little sense otherwise. + public IndexRequest ttl(long ttl) throws ElasticSearchGenerationException { + if (ttl <= 0) { + throw new ElasticSearchIllegalArgumentException("TTL value must be > 0. Illegal value provided [" + ttl + "]"); + } + this.ttl = ttl; + return this; + } + + public long ttl() { + return this.ttl; + } + /** * The source of the document to index, recopied to a new array if it has an offset or unsafe. */ @@ -644,7 +658,7 @@ public class IndexRequest extends ShardReplicationOperationRequest { if (in.readBoolean()) { timestamp = in.readUTF(); } - + ttl = in.readLong(); sourceUnsafe = false; sourceOffset = 0; sourceLength = in.readVInt(); @@ -687,6 +701,7 @@ public class IndexRequest extends ShardReplicationOperationRequest { out.writeBoolean(true); out.writeUTF(timestamp); } + out.writeLong(ttl); out.writeVInt(sourceLength); out.writeBytes(source, sourceOffset, sourceLength); out.writeByte(opType.id()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index f1f457dc866..0fc055eee76 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -170,7 +170,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi IndexShard indexShard = indexShard(shardRequest); SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id()) - .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()); + .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); long version; Engine.IndexingOperation op; if (request.opType() == IndexRequest.OpType.INDEX) { @@ -225,7 +225,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi IndexShard indexShard = indexShard(shardRequest); IndexRequest request = shardRequest.request; SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id()) - .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()); + .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); if (request.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse) .version(request.version()) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java index 8b914b77d7a..4369820b069 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java @@ -311,6 +311,12 @@ public class IndexRequestBuilder extends BaseRequestBuilder 0 as it makes little sense otherwise. + public IndexRequestBuilder setTTL(long ttl) { + request.ttl(ttl); + return this; + } + /** * Should the listener be called on a separate thread if needed. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/AlreadyExpiredException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/AlreadyExpiredException.java new file mode 100644 index 00000000000..060e9c98a12 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/AlreadyExpiredException.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException; + +public class AlreadyExpiredException extends ElasticSearchException implements IgnoreOnRecoveryEngineException { + private String index; + private String type; + private String id; + private final long timestamp; + private final long ttl; + private final long now; + + public AlreadyExpiredException(String index, String type, String id, long timestamp, long ttl, long now) { + super("already expired [" + index + "]/[" + type + "]/[" + id + "] due to expire at [" + (timestamp + ttl) + "] and was processed at [" + now + "]"); + this.index = index; + this.type = type; + this.id = id; + this.timestamp = timestamp; + this.ttl = ttl; + this.now = now; + } + + public String index() { + return index; + } + + public String type() { + return type; + } + + public String id() { + return id; + } + + public long timestamp() { + return timestamp; + } + + public long ttl() { + return ttl; + } + + public long now() { + return now; + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java index bec2b8a915a..8b3c1806eda 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -366,6 +366,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return this.doc.timestamp(); } + public long ttl() { + return this.doc.ttl(); + } + public long version() { return this.version; } @@ -526,6 +530,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return this.doc.timestamp(); } + public long ttl() { + return this.doc.ttl(); + } + public byte[] source() { return this.doc.source(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/IgnoreOnRecoveryEngineException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/IgnoreOnRecoveryEngineException.java new file mode 100644 index 00000000000..724766b602a --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/IgnoreOnRecoveryEngineException.java @@ -0,0 +1,26 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +/** + * Exceptions implementing this interface will be ignored during recovery. + */ +public interface IgnoreOnRecoveryEngineException { +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 80e11e34490..f9318b48437 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.mapper.internal.RoutingFieldMapper; +import org.elasticsearch.index.mapper.internal.TTLFieldMapper; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.selector.FieldMappersFieldSelector; @@ -254,6 +255,11 @@ public class ShardGetService extends AbstractIndexShardComponent { value = source.parent; } else if (field.equals(TimestampFieldMapper.NAME) && docMapper.timestampFieldMapper().stored()) { value = source.timestamp; + } else if (field.equals(TTLFieldMapper.NAME) && docMapper.TTLFieldMapper().stored()) { + // Call value for search with timestamp + ttl here to display the live remaining ttl value and be consistent with the search result display + if (source.ttl > 0) { + value = docMapper.TTLFieldMapper().valueForSearch(source.timestamp + source.ttl); + } } else { String script = null; if (field.contains("_source.")) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java index c64721203ba..d71e0ce7a93 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java @@ -167,6 +167,7 @@ public class DocumentMapper implements ToXContent { this.rootMappers.put(BoostFieldMapper.class, new BoostFieldMapper()); this.rootMappers.put(RoutingFieldMapper.class, new RoutingFieldMapper()); this.rootMappers.put(TimestampFieldMapper.class, new TimestampFieldMapper()); + this.rootMappers.put(TTLFieldMapper.class, new TTLFieldMapper()); this.rootMappers.put(UidFieldMapper.class, new UidFieldMapper()); // don't add parent field, by default its "null" } @@ -368,6 +369,10 @@ public class DocumentMapper implements ToXContent { return rootMapper(TimestampFieldMapper.class); } + public TTLFieldMapper TTLFieldMapper() { + return rootMapper(TTLFieldMapper.class); + } + public Analyzer indexAnalyzer() { return this.indexAnalyzer; } @@ -486,7 +491,7 @@ public class DocumentMapper implements ToXContent { if (context.docs().size() > 1) { Collections.reverse(context.docs()); } - ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), source.routing(), source.timestamp(), context.docs(), context.analyzer(), + ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), source.routing(), source.timestamp(), source.ttl(), context.docs(), context.analyzer(), context.source(), context.sourceOffset(), context.sourceLength(), context.mappersAdded()).parent(source.parent()); // reset the context to free up memory context.reset(null, null, null, null); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapperParser.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapperParser.java index 469353f7492..8a271237294 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapperParser.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapperParser.java @@ -97,6 +97,7 @@ public class DocumentMapperParser extends AbstractIndexComponent { .put(ParentFieldMapper.NAME, new ParentFieldMapper.TypeParser()) .put(RoutingFieldMapper.NAME, new RoutingFieldMapper.TypeParser()) .put(TimestampFieldMapper.NAME, new TimestampFieldMapper.TypeParser()) + .put(TTLFieldMapper.NAME, new TTLFieldMapper.TypeParser()) .put(UidFieldMapper.NAME, new UidFieldMapper.TypeParser()) .put(IdFieldMapper.NAME, new IdFieldMapper.TypeParser()) .immutableMap(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java index 00efdf8c5f0..22ba2295f87 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java @@ -42,6 +42,8 @@ public class ParsedDocument { private final long timestamp; + private final long ttl; + private final List documents; private final Analyzer analyzer; @@ -54,16 +56,17 @@ public class ParsedDocument { private String parent; - public ParsedDocument(String uid, String id, String type, String routing, long timestamp, Document document, Analyzer analyzer, byte[] source, boolean mappersAdded) { - this(uid, id, type, routing, timestamp, Arrays.asList(document), analyzer, source, 0, source.length, mappersAdded); + public ParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, Document document, Analyzer analyzer, byte[] source, boolean mappersAdded) { + this(uid, id, type, routing, timestamp, ttl, Arrays.asList(document), analyzer, source, 0, source.length, mappersAdded); } - public ParsedDocument(String uid, String id, String type, String routing, long timestamp, List documents, Analyzer analyzer, byte[] source, int sourceOffset, int sourceLength, boolean mappersAdded) { + public ParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, List documents, Analyzer analyzer, byte[] source, int sourceOffset, int sourceLength, boolean mappersAdded) { this.uid = uid; this.id = id; this.type = type; this.routing = routing; this.timestamp = timestamp; + this.ttl = ttl; this.documents = documents; this.source = source; this.sourceOffset = sourceOffset; @@ -92,6 +95,10 @@ public class ParsedDocument { return this.timestamp; } + public long ttl() { + return this.ttl; + } + public Document rootDoc() { return documents.get(documents.size() - 1); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java index 5c59c0c74b4..4863fc84ac6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java @@ -56,6 +56,8 @@ public class SourceToParse { private long timestamp; + private long ttl; + public SourceToParse(XContentParser parser) { this.parser = parser; this.source = null; @@ -151,4 +153,13 @@ public class SourceToParse { this.timestamp = timestamp; return this; } + + public long ttl() { + return this.ttl; + } + + public SourceToParse ttl(long ttl) { + this.ttl = ttl; + return this; + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/Uid.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/Uid.java index 922f2abcfbb..88e398fce0d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/Uid.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/Uid.java @@ -74,6 +74,11 @@ public final class Uid { return uid.substring(delimiterIndex + 1); } + public static String typeFromUid(String uid) { + int delimiterIndex = uid.indexOf(DELIMITER); // type is not allowed to have # in it..., ids can + return uid.substring(0, delimiterIndex); + } + public static Uid createUid(String uid) { int delimiterIndex = uid.indexOf(DELIMITER); // type is not allowed to have # in it..., ids can return new Uid(uid.substring(0, delimiterIndex), uid.substring(delimiterIndex + 1)); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/TTLFieldMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/TTLFieldMapper.java new file mode 100644 index 00000000000..3841dcb87bf --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/TTLFieldMapper.java @@ -0,0 +1,185 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.mapper.internal; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.Fieldable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.AlreadyExpiredException; +import org.elasticsearch.index.mapper.InternalMapper; +import org.elasticsearch.index.mapper.Mapper; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.MergeContext; +import org.elasticsearch.index.mapper.MergeMappingException; +import org.elasticsearch.index.mapper.ParseContext; +import org.elasticsearch.index.mapper.RootMapper; +import org.elasticsearch.index.mapper.core.LongFieldMapper; +import org.elasticsearch.index.mapper.core.NumberFieldMapper; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Date; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.support.XContentMapValues.*; +import static org.elasticsearch.index.mapper.core.TypeParsers.*; + +public class TTLFieldMapper extends LongFieldMapper implements InternalMapper, RootMapper { + + public static final String NAME = "_ttl"; + public static final String CONTENT_TYPE = "_ttl"; + + public static class Defaults extends LongFieldMapper.Defaults { + public static final String NAME = TTLFieldMapper.CONTENT_TYPE; + public static final Field.Store STORE = Field.Store.YES; + public static final Field.Index INDEX = Field.Index.NOT_ANALYZED; + public static final boolean ENABLED = false; + } + + public static class Builder extends NumberFieldMapper.Builder { + + private boolean enabled = Defaults.ENABLED; + + public Builder() { + super(Defaults.NAME); + store = Defaults.STORE; + index = Defaults.INDEX; + } + + public Builder enabled(boolean enabled) { + this.enabled = enabled; + return builder; + } + + @Override public TTLFieldMapper build(BuilderContext context) { + return new TTLFieldMapper(store, index, enabled); + } + } + + public static class TypeParser implements Mapper.TypeParser { + @Override public Mapper.Builder parse(String name, Map node, ParserContext parserContext) throws MapperParsingException { + TTLFieldMapper.Builder builder = new TTLFieldMapper.Builder(); + parseField(builder, builder.name, node, parserContext); + for (Map.Entry entry : node.entrySet()) { + String fieldName = Strings.toUnderscoreCase(entry.getKey()); + Object fieldNode = entry.getValue(); + if (fieldName.equals("enabled")) { + builder.enabled(nodeBooleanValue(fieldNode)); + } + } + return builder; + } + } + + private boolean enabled; + + public TTLFieldMapper() { + this(Defaults.STORE, Defaults.INDEX, Defaults.ENABLED); + } + + protected TTLFieldMapper(Field.Store store, Field.Index index, boolean enabled) { + super(new Names(Defaults.NAME, Defaults.NAME, Defaults.NAME, Defaults.NAME), Defaults.PRECISION_STEP, + Defaults.FUZZY_FACTOR, index, store, Defaults.BOOST, Defaults.OMIT_NORMS, + Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Defaults.NULL_VALUE); + this.enabled = enabled; + } + + public boolean enabled() { + return this.enabled; + } + + // Overrides valueForSearch to display live value of remaining ttl + @Override public Object valueForSearch(Fieldable field) { + long now; + SearchContext searchContext = SearchContext.current(); + if (searchContext != null) { + now = searchContext.nowInMillis(); + } else { + now = System.currentTimeMillis(); + } + long value = value(field); + return value - now; + } + + // Other implementation for realtime get display + public Object valueForSearch(long expirationTime) { + return expirationTime - System.currentTimeMillis(); + } + + @Override public void validate(ParseContext context) throws MapperParsingException { + } + + @Override public void preParse(ParseContext context) throws IOException { + } + + @Override public void postParse(ParseContext context) throws IOException { + super.parse(context); + } + + @Override public void parse(ParseContext context) throws IOException, MapperParsingException { + if (context.sourceToParse().ttl() < 0) { // no ttl has been provided externally + long ttl = context.parser().longValue(); + if (ttl <= 0) { + throw new MapperParsingException("TTL value must be > 0. Illegal value provided [" + ttl + "]"); + } + context.sourceToParse().ttl(ttl); + } + } + + @Override public boolean includeInObject() { + return true; + } + + @Override protected Fieldable parseCreateField(ParseContext context) throws IOException, AlreadyExpiredException { + if (enabled) { + long timestamp = context.sourceToParse().timestamp(); + long ttl = context.sourceToParse().ttl(); + if (ttl > 0) { // a ttl has been provided either externally or in the _source + long expire = new Date(timestamp + ttl).getTime(); + long now = System.currentTimeMillis(); + // there is not point indexing already expired doc + if (now >= expire) { + throw new AlreadyExpiredException(context.index(), context.type(), context.id(), timestamp, ttl, now); + } + // the expiration timestamp (timestamp + ttl) is set as field + return new CustomLongNumericField(this, expire); + } + } + return null; + } + + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + // if all are defaults, no sense to write it at all + if (enabled == Defaults.ENABLED) { + return builder; + } + builder.startObject(CONTENT_TYPE); + if (enabled != Defaults.ENABLED) { + builder.field("enabled", enabled); + } + builder.endObject(); + return builder; + } + + @Override public void merge(Mapper mergeWith, MergeContext mergeContext) throws MergeMappingException { + // do nothing here, no merging, but also no exception + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 0ead6d7276a..f71e9191340 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -44,6 +44,7 @@ import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException; import org.elasticsearch.index.engine.OptimizeFailedEngineException; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.flush.FlushStats; @@ -542,31 +543,50 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } - switch (operation.opType()) { - case CREATE: - Translog.Create create = (Translog.Create) operation; - engine.create(prepareCreate(source(create.source(), create.sourceOffset(), create.sourceLength()).type(create.type()).id(create.id()) - .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp())).version(create.version()) - .origin(Engine.Operation.Origin.RECOVERY)); - break; - case SAVE: - Translog.Index index = (Translog.Index) operation; - engine.index(prepareIndex(source(index.source(), index.sourceOffset(), index.sourceLength()).type(index.type()).id(index.id()) - .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp())).version(index.version()) - .origin(Engine.Operation.Origin.RECOVERY)); - break; - case DELETE: - Translog.Delete delete = (Translog.Delete) operation; - Uid uid = Uid.createUid(delete.uid().text()); - engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid()).version(delete.version()) - .origin(Engine.Operation.Origin.RECOVERY)); - break; - case DELETE_BY_QUERY: - Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation; - engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types())); - break; - default: - throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]"); + try { + switch (operation.opType()) { + case CREATE: + Translog.Create create = (Translog.Create) operation; + engine.create(prepareCreate(source(create.source(), create.sourceOffset(), create.sourceLength()).type(create.type()).id(create.id()) + .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl())).version(create.version()) + .origin(Engine.Operation.Origin.RECOVERY)); + break; + case SAVE: + Translog.Index index = (Translog.Index) operation; + engine.index(prepareIndex(source(index.source(), index.sourceOffset(), index.sourceLength()).type(index.type()).id(index.id()) + .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl())).version(index.version()) + .origin(Engine.Operation.Origin.RECOVERY)); + break; + case DELETE: + Translog.Delete delete = (Translog.Delete) operation; + Uid uid = Uid.createUid(delete.uid().text()); + engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid()).version(delete.version()) + .origin(Engine.Operation.Origin.RECOVERY)); + break; + case DELETE_BY_QUERY: + Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation; + engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types())); + break; + default: + throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]"); + } + } catch (ElasticSearchException e) { + boolean hasIgnoreOnRecoveryException = false; + ElasticSearchException current = e; + while (true) { + if (current instanceof IgnoreOnRecoveryEngineException) { + hasIgnoreOnRecoveryException = true; + break; + } + if (current.getCause() instanceof ElasticSearchException) { + current = (ElasticSearchException) current.getCause(); + } else { + break; + } + } + if (!hasIgnoreOnRecoveryException) { + throw e; + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index 1548e7e3d94..480676dffc7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -231,12 +231,14 @@ public interface Translog extends IndexShardComponent { public final String routing; public final String parent; public final long timestamp; + public final long ttl; - public Source(BytesHolder source, String routing, String parent, long timestamp) { + public Source(BytesHolder source, String routing, String parent, long timestamp, long ttl) { this.source = source; this.routing = routing; this.parent = parent; this.timestamp = timestamp; + this.ttl = ttl; } } @@ -249,6 +251,7 @@ public interface Translog extends IndexShardComponent { private String routing; private String parent; private long timestamp; + private long ttl; private long version; public Create() { @@ -263,6 +266,7 @@ public interface Translog extends IndexShardComponent { this.routing = create.routing(); this.parent = create.parent(); this.timestamp = create.timestamp(); + this.ttl = create.ttl(); this.version = create.version(); } @@ -314,6 +318,10 @@ public interface Translog extends IndexShardComponent { return this.timestamp; } + public long ttl() { + return this.ttl; + } + public long version() { return this.version; } @@ -343,7 +351,10 @@ public interface Translog extends IndexShardComponent { if (version >= 4) { this.timestamp = in.readLong(); } - return new Source(source, routing, parent, timestamp); + if (version >= 5) { + this.ttl = in.readLong(); + } + return new Source(source, routing, parent, timestamp, ttl); } @Override public void readFrom(StreamInput in) throws IOException { @@ -370,10 +381,13 @@ public interface Translog extends IndexShardComponent { if (version >= 4) { this.timestamp = in.readLong(); } + if (version >= 5) { + this.ttl = in.readLong(); + } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(4); // version + out.writeVInt(5); // version out.writeUTF(id); out.writeUTF(type); out.writeVInt(sourceLength); @@ -392,6 +406,7 @@ public interface Translog extends IndexShardComponent { } out.writeLong(version); out.writeLong(timestamp); + out.writeLong(ttl); } } @@ -405,6 +420,7 @@ public interface Translog extends IndexShardComponent { private String routing; private String parent; private long timestamp; + private long ttl; public Index() { } @@ -419,6 +435,7 @@ public interface Translog extends IndexShardComponent { this.parent = index.parent(); this.version = index.version(); this.timestamp = index.timestamp(); + this.ttl = index.ttl(); } public Index(String type, String id, byte[] source) { @@ -457,6 +474,10 @@ public interface Translog extends IndexShardComponent { return this.timestamp; } + public long ttl() { + return this.ttl; + } + public byte[] source() { return this.source; } @@ -498,7 +519,10 @@ public interface Translog extends IndexShardComponent { if (version >= 4) { this.timestamp = in.readLong(); } - return new Source(source, routing, parent, timestamp); + if (version >= 5) { + this.ttl = in.readLong(); + } + return new Source(source, routing, parent, timestamp, ttl); } @Override public void readFrom(StreamInput in) throws IOException { @@ -525,10 +549,13 @@ public interface Translog extends IndexShardComponent { if (version >= 4) { this.timestamp = in.readLong(); } + if (version >= 5) { + this.ttl = in.readLong(); + } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(4); // version + out.writeVInt(5); // version out.writeUTF(id); out.writeUTF(type); out.writeVInt(sourceLength); @@ -547,6 +574,7 @@ public interface Translog extends IndexShardComponent { } out.writeLong(version); out.writeLong(timestamp); + out.writeLong(ttl); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java index 8f55b4395dc..89f85c6050f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -33,6 +33,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySource; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; +import org.elasticsearch.indices.ttl.IndicesTTLService; /** * @author kimchy (shay.banon) @@ -62,5 +63,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules { bind(IndexingMemoryBufferController.class).asEagerSingleton(); bind(IndicesNodeFilterCache.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); + bind(IndicesTTLService.class).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java new file mode 100644 index 00000000000..f88e2a303ce --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java @@ -0,0 +1,241 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.ttl; + +import org.apache.lucene.document.Document; + +import org.apache.lucene.index.IndexReader; + +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.NumericRangeQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorer; +import org.elasticsearch.ElasticSearchException; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.action.bulk.BulkRequestBuilder; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lucene.uid.UidField; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.FieldMapper; +import org.elasticsearch.index.mapper.FieldMappers; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.internal.TTLFieldMapper; +import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.index.mapper.selector.UidFieldSelector; +import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.indices.IndicesService; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + + +/** + * A node level service that delete expired docs on node primary shards. + * + */ +public class IndicesTTLService extends AbstractLifecycleComponent { + + private static final String SETTING_PURGE_INTERVAL = "purge_interval"; + private static final TimeValue DEFAULT_PURGE_INTERVAL = new TimeValue(60, TimeUnit.SECONDS); + private static final String SETTINGS_BULK_SIZE = "bulk_size"; + private static final int DEFAULT_BULK_SIZE = 10000; + + private final IndicesService indicesService; + private final Client client; + + private final TimeValue purgeInterval; + private final int bulkSize; + private BulkRequestBuilder bulkRequest; + private PurgerThread purgerThread; + + @Inject public IndicesTTLService(Settings settings, IndicesService indicesService, Client client) { + super(settings); + this.indicesService = indicesService; + this.client = client; + this.purgeInterval = componentSettings.getAsTime(SETTING_PURGE_INTERVAL, DEFAULT_PURGE_INTERVAL); + this.bulkSize = componentSettings.getAsInt(SETTINGS_BULK_SIZE, DEFAULT_BULK_SIZE); + } + + @Override protected void doStart() throws ElasticSearchException { + this.purgerThread = new PurgerThread(EsExecutors.threadName(settings, "[purger]")); + this.purgerThread.start(); + } + + @Override protected void doStop() throws ElasticSearchException { + this.purgerThread.doStop(); + } + + @Override protected void doClose() throws ElasticSearchException { + } + + private class PurgerThread extends Thread { + volatile boolean running = true; + + public PurgerThread(String name) { + super(name); + setDaemon(true); + } + + public void doStop() { + running = false; + } + + public void run() { + while (running) { + List shardsToPurge = getShardsToPurge(); + purgeShards(shardsToPurge); + try { + Thread.sleep(purgeInterval.millis()); + } catch (InterruptedException e) { + running = false; + return; + } + + } + } + + /** + * Returns the shards to purge, i.e. the local started primary shards that have ttl enabled + */ + private List getShardsToPurge() { + List shardsToPurge = new ArrayList(); + for (IndexService indexService : indicesService) { + // should be optimized with the hasTTL flag + FieldMappers ttlFieldMappers = indexService.mapperService().name(TTLFieldMapper.NAME); + // check if ttl is enabled for at least one type of this index + boolean hasTTLEnabled = false; + for (FieldMapper ttlFieldMapper : ttlFieldMappers) { + if (((TTLFieldMapper)ttlFieldMapper).enabled()) { + hasTTLEnabled = true; + break; + } + } + if (hasTTLEnabled) + { + for (Integer shardId : indexService.shardIds()) { + IndexShard shard = indexService.shard(shardId); + if (shard.routingEntry().primary() && shard.state() == IndexShardState.STARTED && shard.routingEntry().started()) { + shardsToPurge.add(shard); + } + } + } + } + return shardsToPurge; + } + } + + private void purgeShards(List shardsToPurge) { + for (IndexShard shardToPurge : shardsToPurge) { + Query query = NumericRangeQuery.newLongRange(TTLFieldMapper.NAME, null, System.currentTimeMillis(), false, true); + Engine.Searcher searcher = shardToPurge.searcher(); + try { + logger.debug("[{}][{}] purging shard", shardToPurge.routingEntry().index(), shardToPurge.routingEntry().id()); + ExpiredDocsCollector expiredDocsCollector = new ExpiredDocsCollector(); + searcher.searcher().search(query, expiredDocsCollector); + List docsToPurge = expiredDocsCollector.getDocsToPurge(); + bulkRequest = client.prepareBulk(); + for (DocToPurge docToPurge : docsToPurge) { + bulkRequest.add(new DeleteRequest().index(shardToPurge.routingEntry().index()).type(docToPurge.type).id(docToPurge.id).version(docToPurge.version)); + processBulkIfNeeded(false); + } + processBulkIfNeeded(true); + } catch (Exception e) { + logger.warn("failed to purge", e); + } finally { + searcher.release(); + } + } + } + + private static class DocToPurge { + public final String type; + public final String id; + public final long version; + + public DocToPurge(String type, String id, long version) { + this.type = type; + this.id = id; + this.version = version; + } + } + + private class ExpiredDocsCollector extends Collector { + private IndexReader indexReader; + private List docsToPurge = new ArrayList(); + + public ExpiredDocsCollector() { + } + + public void setScorer(Scorer scorer) { + } + + public boolean acceptsDocsOutOfOrder() { + return true; + } + + public void collect(int doc) { + try { + Document document = indexReader.document(doc, UidFieldSelector.INSTANCE); + String uid = document.getFieldable(UidFieldMapper.NAME).stringValue(); + long version = UidField.loadVersion(indexReader, UidFieldMapper.TERM_FACTORY.createTerm(uid)); + docsToPurge.add(new DocToPurge(Uid.typeFromUid(uid),Uid.idFromUid(uid), version)); + } catch (Exception e) { + } + } + + public void setNextReader(IndexReader reader, int docBase) { + this.indexReader = reader; + } + + public List getDocsToPurge() { + return this.docsToPurge; + } + } + + private void processBulkIfNeeded(boolean force) { + if ((force && bulkRequest.numberOfActions() > 0) || bulkRequest.numberOfActions() >= bulkSize) { + try { + bulkRequest.execute(new ActionListener() { + @Override public void onResponse(BulkResponse bulkResponse) { + logger.debug("bulk took " + bulkResponse.getTookInMillis() + "ms"); + } + + @Override public void onFailure(Throwable e) { + logger.warn("failed to execute bulk"); + } + }); + } catch (Exception e) { + logger.warn("failed to process bulk", e); + } + bulkRequest = client.prepareBulk(); + } + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 0c1705e5cf7..53a8f079097 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -61,6 +61,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.memory.IndexingMemoryBufferController; +import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.jmx.JmxModule; import org.elasticsearch.jmx.JmxService; import org.elasticsearch.monitor.MonitorModule; @@ -174,6 +175,7 @@ public final class InternalNode implements Node { injector.getInstance(IndicesService.class).start(); injector.getInstance(IndexingMemoryBufferController.class).start(); injector.getInstance(IndicesClusterStateService.class).start(); + injector.getInstance(IndicesTTLService.class).start(); injector.getInstance(RiversManager.class).start(); injector.getInstance(ClusterService.class).start(); injector.getInstance(RoutingService.class).start(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index 7493773162a..2dc3c842af5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -65,6 +65,9 @@ public class RestIndexAction extends BaseRestHandler { indexRequest.routing(request.param("routing")); indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing indexRequest.timestamp(request.param("timestamp")); + if (request.hasParam("ttl")) { + indexRequest.ttl(request.paramAsLong("ttl", -1)); + } indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe()); indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT)); indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh())); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java index 04584ca608a..9c4db2add2f 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java @@ -138,10 +138,10 @@ public abstract class AbstractSimpleEngineTests { assertThat(segments.isEmpty(), equalTo(true)); // create a doc and refresh - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc)); - ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); + ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); engine.create(new Engine.Create(null, newUid("2"), doc2)); engine.refresh(new Engine.Refresh(true)); @@ -162,7 +162,7 @@ public abstract class AbstractSimpleEngineTests { assertThat(segments.get(0).deletedDocs(), equalTo(0)); - ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); + ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); engine.create(new Engine.Create(null, newUid("3"), doc3)); engine.refresh(new Engine.Refresh(true)); @@ -202,7 +202,7 @@ public abstract class AbstractSimpleEngineTests { searchResult.release(); // create a document - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc)); // its not there... @@ -236,7 +236,7 @@ public abstract class AbstractSimpleEngineTests { assertThat(getResult.docIdAndVersion(), notNullValue()); // now do an update - doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test1")).add(field(SourceFieldMapper.NAME, B_2, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_2, false); + doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test1")).add(field(SourceFieldMapper.NAME, B_2, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_2, false); engine.index(new Engine.Index(null, newUid("1"), doc)); // its not updated yet... @@ -285,7 +285,7 @@ public abstract class AbstractSimpleEngineTests { searchResult.release(); // add it back - doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); + doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc)); // its not there... @@ -317,7 +317,7 @@ public abstract class AbstractSimpleEngineTests { // make sure we can still work with the engine // now do an update - doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.index(new Engine.Index(null, newUid("1"), doc)); // its not updated yet... @@ -345,7 +345,7 @@ public abstract class AbstractSimpleEngineTests { searchResult.release(); // create a document - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc)); // its not there... @@ -378,7 +378,7 @@ public abstract class AbstractSimpleEngineTests { @Test public void testSimpleSnapshot() throws Exception { // create a document - ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc1)); final ExecutorService executorService = Executors.newCachedThreadPool(); @@ -394,10 +394,10 @@ public abstract class AbstractSimpleEngineTests { Future future = executorService.submit(new Callable() { @Override public Object call() throws Exception { engine.flush(new Engine.Flush()); - ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); + ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); engine.create(new Engine.Create(null, newUid("2"), doc2)); engine.flush(new Engine.Flush()); - ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); + ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); engine.create(new Engine.Create(null, newUid("3"), doc3)); return null; } @@ -432,7 +432,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testSimpleRecover() throws Exception { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc)); engine.flush(new Engine.Flush()); @@ -473,10 +473,10 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception { - ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc1)); engine.flush(new Engine.Flush()); - ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); + ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); engine.create(new Engine.Create(null, newUid("2"), doc2)); engine.recover(new Engine.RecoveryHandler() { @@ -500,10 +500,10 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception { - ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc1)); engine.flush(new Engine.Flush()); - ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); + ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); engine.create(new Engine.Create(null, newUid("2"), doc2)); engine.recover(new Engine.RecoveryHandler() { @@ -517,7 +517,7 @@ public abstract class AbstractSimpleEngineTests { assertThat(create.source(), equalTo(B_2)); // add for phase3 - ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); + ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); engine.create(new Engine.Create(null, newUid("3"), doc3)); } @@ -534,7 +534,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testVersioningNewCreate() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Create create = new Engine.Create(null, newUid("1"), doc); engine.create(create); assertThat(create.version(), equalTo(1l)); @@ -545,7 +545,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testExternalVersioningNewCreate() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Create create = new Engine.Create(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); engine.create(create); assertThat(create.version(), equalTo(12l)); @@ -556,7 +556,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testVersioningNewIndex() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc); engine.index(index); assertThat(index.version(), equalTo(1l)); @@ -567,7 +567,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testExternalVersioningNewIndex() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); engine.index(index); assertThat(index.version(), equalTo(12l)); @@ -578,7 +578,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testVersioningIndexConflict() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc); engine.index(index); assertThat(index.version(), equalTo(1l)); @@ -606,7 +606,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testExternalVersioningIndexConflict() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); engine.index(index); assertThat(index.version(), equalTo(12l)); @@ -625,7 +625,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testVersioningIndexConflictWithFlush() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc); engine.index(index); assertThat(index.version(), equalTo(1l)); @@ -655,7 +655,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testExternalVersioningIndexConflictWithFlush() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); engine.index(index); assertThat(index.version(), equalTo(12l)); @@ -676,7 +676,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testVersioningDeleteConflict() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc); engine.index(index); assertThat(index.version(), equalTo(1l)); @@ -726,7 +726,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testVersioningDeleteConflictWithFlush() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc); engine.index(index); assertThat(index.version(), equalTo(1l)); @@ -782,7 +782,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testVersioningCreateExistsException() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Create create = new Engine.Create(null, newUid("1"), doc); engine.create(create); assertThat(create.version(), equalTo(1l)); @@ -797,7 +797,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testVersioningCreateExistsExceptionWithFlush() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Create create = new Engine.Create(null, newUid("1"), doc); engine.create(create); assertThat(create.version(), equalTo(1l)); @@ -814,7 +814,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testVersioningReplicaConflict1() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc); engine.index(index); assertThat(index.version(), equalTo(1l)); @@ -848,7 +848,7 @@ public abstract class AbstractSimpleEngineTests { } @Test public void testVersioningReplicaConflict2() { - ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc); engine.index(index); assertThat(index.version(), equalTo(1l)); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/mapper/ttl/TTLMappingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/mapper/ttl/TTLMappingTests.java new file mode 100644 index 00000000000..1f561bad437 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/mapper/ttl/TTLMappingTests.java @@ -0,0 +1,86 @@ +/* exception when already expired + + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.mapper.ttl; + +import org.apache.lucene.document.Field; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.MapperTests; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.mapper.internal.TTLFieldMapper; +import org.testng.annotations.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +public class TTLMappingTests { + @Test public void testSimpleDisabled() throws Exception { + String mapping = XContentFactory.jsonBuilder().startObject().startObject("type").endObject().string(); + DocumentMapper docMapper = MapperTests.newParser().parse(mapping); + byte[] source = XContentFactory.jsonBuilder() + .startObject() + .field("field", "value") + .endObject() + .copiedBytes(); + ParsedDocument doc = docMapper.parse(SourceToParse.source(source).type("type").id("1").ttl(Long.MAX_VALUE)); + + assertThat(doc.rootDoc().getFieldable("_ttl"), equalTo(null)); + } + + @Test public void testEnabled() throws Exception { + String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("_ttl").field("enabled", "yes").endObject() + .endObject().endObject().string(); + DocumentMapper docMapper = MapperTests.newParser().parse(mapping); + byte[] source = XContentFactory.jsonBuilder() + .startObject() + .field("field", "value") + .endObject() + .copiedBytes(); + ParsedDocument doc = docMapper.parse(SourceToParse.source(source).type("type").id("1").ttl(Long.MAX_VALUE)); + + assertThat(doc.rootDoc().getFieldable("_ttl").isStored(), equalTo(true)); + assertThat(doc.rootDoc().getFieldable("_ttl").isIndexed(), equalTo(true)); + assertThat(doc.rootDoc().getFieldable("_ttl").tokenStreamValue(), notNullValue()); + } + + @Test public void testDefaultValues() throws Exception { + String mapping = XContentFactory.jsonBuilder().startObject().startObject("type").endObject().string(); + DocumentMapper docMapper = MapperTests.newParser().parse(mapping); + assertThat(docMapper.TTLFieldMapper().enabled(), equalTo(TTLFieldMapper.Defaults.ENABLED)); + assertThat(docMapper.TTLFieldMapper().store(), equalTo(TTLFieldMapper.Defaults.STORE)); + assertThat(docMapper.TTLFieldMapper().index(), equalTo(TTLFieldMapper.Defaults.INDEX)); + } + + + @Test public void testSetValues() throws Exception { + String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("_ttl") + .field("enabled", "yes").field("store", "no").field("index", "no") + .endObject() + .endObject().endObject().string(); + DocumentMapper docMapper = MapperTests.newParser().parse(mapping); + assertThat(docMapper.TTLFieldMapper().enabled(), equalTo(true)); + assertThat(docMapper.TTLFieldMapper().store(), equalTo(Field.Store.NO)); + assertThat(docMapper.TTLFieldMapper().index(), equalTo(Field.Index.NO)); + } +} \ No newline at end of file diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/ttl/SimpleTTLTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/ttl/SimpleTTLTests.java new file mode 100644 index 00000000000..017f5bdb4b6 --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/ttl/SimpleTTLTests.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.ttl; + +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +public class SimpleTTLTests extends AbstractNodesTests { + + static private final long purgeInterval = 500; + private Client client; + + @BeforeClass public void createNodes() throws Exception { + Settings settings = settingsBuilder().put("indices.ttl.purge_interval", purgeInterval).build(); + startNode("node1", settings); + startNode("node2", settings); + client = getClient(); + } + + @AfterClass public void closeNodes() { + client.close(); + closeAllNodes(); + } + + protected Client getClient() { + return client("node1"); + } + + @Test public void testSimpleTTL() throws Exception { + client.admin().indices().prepareDelete().execute().actionGet(); + + client.admin().indices().prepareCreate("test") + .addMapping("type1", XContentFactory.jsonBuilder() + .startObject() + .startObject("type1") + .startObject("_timestamp").field("enabled", true).field("store", "yes").endObject() + .startObject("_ttl").field("enabled", true).field("store", "yes").endObject() + .endObject() + .endObject()) + .execute().actionGet(); + client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + long providedTTLValue = 3000; + logger.info("--> checking ttl"); + client.prepareIndex("test", "type1", "1").setSource("field1", "value1").setTTL(providedTTLValue).setRefresh(true).execute().actionGet(); + long now = System.currentTimeMillis(); + + // realtime get check + long now1 = System.currentTimeMillis(); + GetResponse getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(true).execute().actionGet(); + long ttl0 = ((Number) getResponse.field("_ttl").value()).longValue(); + assertThat(ttl0, greaterThan(0L)); + assertThat(ttl0, lessThan(providedTTLValue - (now1 - now))); + // verify the ttl is still decreasing when going to the replica + now1 = System.currentTimeMillis(); + getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(true).execute().actionGet(); + ttl0 = ((Number) getResponse.field("_ttl").value()).longValue(); + assertThat(ttl0, greaterThan(0L)); + assertThat(ttl0, lessThan(providedTTLValue - (now1 - now))); + // non realtime get (stored) + now1 = System.currentTimeMillis(); + getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(false).execute().actionGet(); + ttl0 = ((Number) getResponse.field("_ttl").value()).longValue(); + assertThat(ttl0, greaterThan(0L)); + assertThat(ttl0, lessThan(providedTTLValue - (now1 - now))); + // non realtime get going the replica + now1 = System.currentTimeMillis(); + getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(false).execute().actionGet(); + ttl0 = ((Number) getResponse.field("_ttl").value()).longValue(); + assertThat(ttl0, greaterThan(0L)); + assertThat(ttl0, lessThan(providedTTLValue - (now1 - now))); + + logger.info("--> checking purger"); + // make sure the purger has done its job + long shouldBeExpiredDate = now + providedTTLValue + purgeInterval + 2000; + now1 = System.currentTimeMillis(); + if (shouldBeExpiredDate - now1 > 0) { + Thread.sleep(shouldBeExpiredDate - now1); + } + // realtime get check + getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(true).execute().actionGet(); + assertThat(getResponse.exists(), equalTo(false)); + // replica realtime get check + getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(true).execute().actionGet(); + assertThat(getResponse.exists(), equalTo(false)); + // non realtime get (stored) check + getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(false).execute().actionGet(); + assertThat(getResponse.exists(), equalTo(false)); + // non realtime get going the replica check + getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(false).execute().actionGet(); + assertThat(getResponse.exists(), equalTo(false)); + } +}