add per doc ttl support

This commit is contained in:
Benjamin Devèze 2011-08-30 17:48:06 +02:00 committed by Shay Banon
parent 5052282ab6
commit 65aad2da1e
24 changed files with 920 additions and 73 deletions

View File

@ -114,6 +114,7 @@ public class BulkRequest implements ActionRequest {
String routing = null;
String parent = null;
String timestamp = null;
long ttl = -1;
String opType = null;
long version = 0;
VersionType versionType = VersionType.INTERNAL;
@ -136,6 +137,8 @@ public class BulkRequest implements ActionRequest {
parent = parser.text();
} else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) {
timestamp = parser.text();
} else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) {
ttl = parser.longValue();
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text();
} else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) {
@ -160,17 +163,17 @@ public class BulkRequest implements ActionRequest {
// of index request. All index requests are still unsafe if applicable.
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).version(version).versionType(versionType)
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate));
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).version(version).versionType(versionType)
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create("create".equals(opType))
.source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate));
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).version(version).versionType(versionType)
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create(true)
.source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate));

View File

@ -123,7 +123,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
SourceToParse sourceToParse = SourceToParse.source(indexRequest.underlyingSource(), indexRequest.underlyingSourceOffset(), indexRequest.underlyingSourceLength()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp());
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
long version;
Engine.IndexingOperation op;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
@ -232,7 +233,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
IndexRequest indexRequest = (IndexRequest) item.request();
try {
SourceToParse sourceToParse = SourceToParse.source(indexRequest.underlyingSource(), indexRequest.underlyingSourceOffset(), indexRequest.underlyingSourceLength()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp());
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
indexShard.index(index);

View File

@ -119,6 +119,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
@Nullable private String routing;
@Nullable private String parent;
@Nullable private String timestamp;
private long ttl = -1;
private byte[] source;
private int sourceOffset;
@ -287,6 +288,19 @@ public class IndexRequest extends ShardReplicationOperationRequest {
return this.timestamp;
}
// Sets the relative ttl value. It musts be > 0 as it makes little sense otherwise.
public IndexRequest ttl(long ttl) throws ElasticSearchGenerationException {
if (ttl <= 0) {
throw new ElasticSearchIllegalArgumentException("TTL value must be > 0. Illegal value provided [" + ttl + "]");
}
this.ttl = ttl;
return this;
}
public long ttl() {
return this.ttl;
}
/**
* The source of the document to index, recopied to a new array if it has an offset or unsafe.
*/
@ -644,7 +658,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
if (in.readBoolean()) {
timestamp = in.readUTF();
}
ttl = in.readLong();
sourceUnsafe = false;
sourceOffset = 0;
sourceLength = in.readVInt();
@ -687,6 +701,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
out.writeBoolean(true);
out.writeUTF(timestamp);
}
out.writeLong(ttl);
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
out.writeByte(opType.id());

View File

@ -170,7 +170,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
IndexShard indexShard = indexShard(shardRequest);
SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp());
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version;
Engine.IndexingOperation op;
if (request.opType() == IndexRequest.OpType.INDEX) {
@ -225,7 +225,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
IndexShard indexShard = indexShard(shardRequest);
IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp());
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(request.version())

View File

@ -311,6 +311,12 @@ public class IndexRequestBuilder extends BaseRequestBuilder<IndexRequest, IndexR
return this;
}
// Sets the relative ttl value. It musts be > 0 as it makes little sense otherwise.
public IndexRequestBuilder setTTL(long ttl) {
request.ttl(ttl);
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
public class AlreadyExpiredException extends ElasticSearchException implements IgnoreOnRecoveryEngineException {
private String index;
private String type;
private String id;
private final long timestamp;
private final long ttl;
private final long now;
public AlreadyExpiredException(String index, String type, String id, long timestamp, long ttl, long now) {
super("already expired [" + index + "]/[" + type + "]/[" + id + "] due to expire at [" + (timestamp + ttl) + "] and was processed at [" + now + "]");
this.index = index;
this.type = type;
this.id = id;
this.timestamp = timestamp;
this.ttl = ttl;
this.now = now;
}
public String index() {
return index;
}
public String type() {
return type;
}
public String id() {
return id;
}
public long timestamp() {
return timestamp;
}
public long ttl() {
return ttl;
}
public long now() {
return now;
}
}

View File

@ -366,6 +366,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.timestamp();
}
public long ttl() {
return this.doc.ttl();
}
public long version() {
return this.version;
}
@ -526,6 +530,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.timestamp();
}
public long ttl() {
return this.doc.ttl();
}
public byte[] source() {
return this.doc.source();
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
/**
* Exceptions implementing this interface will be ignored during recovery.
*/
public interface IgnoreOnRecoveryEngineException {
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.selector.FieldMappersFieldSelector;
@ -254,6 +255,11 @@ public class ShardGetService extends AbstractIndexShardComponent {
value = source.parent;
} else if (field.equals(TimestampFieldMapper.NAME) && docMapper.timestampFieldMapper().stored()) {
value = source.timestamp;
} else if (field.equals(TTLFieldMapper.NAME) && docMapper.TTLFieldMapper().stored()) {
// Call value for search with timestamp + ttl here to display the live remaining ttl value and be consistent with the search result display
if (source.ttl > 0) {
value = docMapper.TTLFieldMapper().valueForSearch(source.timestamp + source.ttl);
}
} else {
String script = null;
if (field.contains("_source.")) {

View File

@ -167,6 +167,7 @@ public class DocumentMapper implements ToXContent {
this.rootMappers.put(BoostFieldMapper.class, new BoostFieldMapper());
this.rootMappers.put(RoutingFieldMapper.class, new RoutingFieldMapper());
this.rootMappers.put(TimestampFieldMapper.class, new TimestampFieldMapper());
this.rootMappers.put(TTLFieldMapper.class, new TTLFieldMapper());
this.rootMappers.put(UidFieldMapper.class, new UidFieldMapper());
// don't add parent field, by default its "null"
}
@ -368,6 +369,10 @@ public class DocumentMapper implements ToXContent {
return rootMapper(TimestampFieldMapper.class);
}
public TTLFieldMapper TTLFieldMapper() {
return rootMapper(TTLFieldMapper.class);
}
public Analyzer indexAnalyzer() {
return this.indexAnalyzer;
}
@ -486,7 +491,7 @@ public class DocumentMapper implements ToXContent {
if (context.docs().size() > 1) {
Collections.reverse(context.docs());
}
ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), source.routing(), source.timestamp(), context.docs(), context.analyzer(),
ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), source.routing(), source.timestamp(), source.ttl(), context.docs(), context.analyzer(),
context.source(), context.sourceOffset(), context.sourceLength(), context.mappersAdded()).parent(source.parent());
// reset the context to free up memory
context.reset(null, null, null, null);

View File

@ -97,6 +97,7 @@ public class DocumentMapperParser extends AbstractIndexComponent {
.put(ParentFieldMapper.NAME, new ParentFieldMapper.TypeParser())
.put(RoutingFieldMapper.NAME, new RoutingFieldMapper.TypeParser())
.put(TimestampFieldMapper.NAME, new TimestampFieldMapper.TypeParser())
.put(TTLFieldMapper.NAME, new TTLFieldMapper.TypeParser())
.put(UidFieldMapper.NAME, new UidFieldMapper.TypeParser())
.put(IdFieldMapper.NAME, new IdFieldMapper.TypeParser())
.immutableMap();

View File

@ -42,6 +42,8 @@ public class ParsedDocument {
private final long timestamp;
private final long ttl;
private final List<Document> documents;
private final Analyzer analyzer;
@ -54,16 +56,17 @@ public class ParsedDocument {
private String parent;
public ParsedDocument(String uid, String id, String type, String routing, long timestamp, Document document, Analyzer analyzer, byte[] source, boolean mappersAdded) {
this(uid, id, type, routing, timestamp, Arrays.asList(document), analyzer, source, 0, source.length, mappersAdded);
public ParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, Document document, Analyzer analyzer, byte[] source, boolean mappersAdded) {
this(uid, id, type, routing, timestamp, ttl, Arrays.asList(document), analyzer, source, 0, source.length, mappersAdded);
}
public ParsedDocument(String uid, String id, String type, String routing, long timestamp, List<Document> documents, Analyzer analyzer, byte[] source, int sourceOffset, int sourceLength, boolean mappersAdded) {
public ParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, List<Document> documents, Analyzer analyzer, byte[] source, int sourceOffset, int sourceLength, boolean mappersAdded) {
this.uid = uid;
this.id = id;
this.type = type;
this.routing = routing;
this.timestamp = timestamp;
this.ttl = ttl;
this.documents = documents;
this.source = source;
this.sourceOffset = sourceOffset;
@ -92,6 +95,10 @@ public class ParsedDocument {
return this.timestamp;
}
public long ttl() {
return this.ttl;
}
public Document rootDoc() {
return documents.get(documents.size() - 1);
}

View File

@ -56,6 +56,8 @@ public class SourceToParse {
private long timestamp;
private long ttl;
public SourceToParse(XContentParser parser) {
this.parser = parser;
this.source = null;
@ -151,4 +153,13 @@ public class SourceToParse {
this.timestamp = timestamp;
return this;
}
public long ttl() {
return this.ttl;
}
public SourceToParse ttl(long ttl) {
this.ttl = ttl;
return this;
}
}

View File

@ -74,6 +74,11 @@ public final class Uid {
return uid.substring(delimiterIndex + 1);
}
public static String typeFromUid(String uid) {
int delimiterIndex = uid.indexOf(DELIMITER); // type is not allowed to have # in it..., ids can
return uid.substring(0, delimiterIndex);
}
public static Uid createUid(String uid) {
int delimiterIndex = uid.indexOf(DELIMITER); // type is not allowed to have # in it..., ids can
return new Uid(uid.substring(0, delimiterIndex), uid.substring(delimiterIndex + 1));

View File

@ -0,0 +1,185 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper.internal;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.AlreadyExpiredException;
import org.elasticsearch.index.mapper.InternalMapper;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MergeContext;
import org.elasticsearch.index.mapper.MergeMappingException;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.RootMapper;
import org.elasticsearch.index.mapper.core.LongFieldMapper;
import org.elasticsearch.index.mapper.core.NumberFieldMapper;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.*;
import static org.elasticsearch.index.mapper.core.TypeParsers.*;
public class TTLFieldMapper extends LongFieldMapper implements InternalMapper, RootMapper {
public static final String NAME = "_ttl";
public static final String CONTENT_TYPE = "_ttl";
public static class Defaults extends LongFieldMapper.Defaults {
public static final String NAME = TTLFieldMapper.CONTENT_TYPE;
public static final Field.Store STORE = Field.Store.YES;
public static final Field.Index INDEX = Field.Index.NOT_ANALYZED;
public static final boolean ENABLED = false;
}
public static class Builder extends NumberFieldMapper.Builder<Builder, TTLFieldMapper> {
private boolean enabled = Defaults.ENABLED;
public Builder() {
super(Defaults.NAME);
store = Defaults.STORE;
index = Defaults.INDEX;
}
public Builder enabled(boolean enabled) {
this.enabled = enabled;
return builder;
}
@Override public TTLFieldMapper build(BuilderContext context) {
return new TTLFieldMapper(store, index, enabled);
}
}
public static class TypeParser implements Mapper.TypeParser {
@Override public Mapper.Builder parse(String name, Map<String, Object> node, ParserContext parserContext) throws MapperParsingException {
TTLFieldMapper.Builder builder = new TTLFieldMapper.Builder();
parseField(builder, builder.name, node, parserContext);
for (Map.Entry<String, Object> entry : node.entrySet()) {
String fieldName = Strings.toUnderscoreCase(entry.getKey());
Object fieldNode = entry.getValue();
if (fieldName.equals("enabled")) {
builder.enabled(nodeBooleanValue(fieldNode));
}
}
return builder;
}
}
private boolean enabled;
public TTLFieldMapper() {
this(Defaults.STORE, Defaults.INDEX, Defaults.ENABLED);
}
protected TTLFieldMapper(Field.Store store, Field.Index index, boolean enabled) {
super(new Names(Defaults.NAME, Defaults.NAME, Defaults.NAME, Defaults.NAME), Defaults.PRECISION_STEP,
Defaults.FUZZY_FACTOR, index, store, Defaults.BOOST, Defaults.OMIT_NORMS,
Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Defaults.NULL_VALUE);
this.enabled = enabled;
}
public boolean enabled() {
return this.enabled;
}
// Overrides valueForSearch to display live value of remaining ttl
@Override public Object valueForSearch(Fieldable field) {
long now;
SearchContext searchContext = SearchContext.current();
if (searchContext != null) {
now = searchContext.nowInMillis();
} else {
now = System.currentTimeMillis();
}
long value = value(field);
return value - now;
}
// Other implementation for realtime get display
public Object valueForSearch(long expirationTime) {
return expirationTime - System.currentTimeMillis();
}
@Override public void validate(ParseContext context) throws MapperParsingException {
}
@Override public void preParse(ParseContext context) throws IOException {
}
@Override public void postParse(ParseContext context) throws IOException {
super.parse(context);
}
@Override public void parse(ParseContext context) throws IOException, MapperParsingException {
if (context.sourceToParse().ttl() < 0) { // no ttl has been provided externally
long ttl = context.parser().longValue();
if (ttl <= 0) {
throw new MapperParsingException("TTL value must be > 0. Illegal value provided [" + ttl + "]");
}
context.sourceToParse().ttl(ttl);
}
}
@Override public boolean includeInObject() {
return true;
}
@Override protected Fieldable parseCreateField(ParseContext context) throws IOException, AlreadyExpiredException {
if (enabled) {
long timestamp = context.sourceToParse().timestamp();
long ttl = context.sourceToParse().ttl();
if (ttl > 0) { // a ttl has been provided either externally or in the _source
long expire = new Date(timestamp + ttl).getTime();
long now = System.currentTimeMillis();
// there is not point indexing already expired doc
if (now >= expire) {
throw new AlreadyExpiredException(context.index(), context.type(), context.id(), timestamp, ttl, now);
}
// the expiration timestamp (timestamp + ttl) is set as field
return new CustomLongNumericField(this, expire);
}
}
return null;
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// if all are defaults, no sense to write it at all
if (enabled == Defaults.ENABLED) {
return builder;
}
builder.startObject(CONTENT_TYPE);
if (enabled != Defaults.ENABLED) {
builder.field("enabled", enabled);
}
builder.endObject();
return builder;
}
@Override public void merge(Mapper mergeWith, MergeContext mergeContext) throws MergeMappingException {
// do nothing here, no merging, but also no exception
}
}

View File

@ -44,6 +44,7 @@ import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.engine.OptimizeFailedEngineException;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.flush.FlushStats;
@ -542,17 +543,18 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
try {
switch (operation.opType()) {
case CREATE:
Translog.Create create = (Translog.Create) operation;
engine.create(prepareCreate(source(create.source(), create.sourceOffset(), create.sourceLength()).type(create.type()).id(create.id())
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp())).version(create.version())
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl())).version(create.version())
.origin(Engine.Operation.Origin.RECOVERY));
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
engine.index(prepareIndex(source(index.source(), index.sourceOffset(), index.sourceLength()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp())).version(index.version())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl())).version(index.version())
.origin(Engine.Operation.Origin.RECOVERY));
break;
case DELETE:
@ -568,6 +570,24 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
default:
throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]");
}
} catch (ElasticSearchException e) {
boolean hasIgnoreOnRecoveryException = false;
ElasticSearchException current = e;
while (true) {
if (current instanceof IgnoreOnRecoveryEngineException) {
hasIgnoreOnRecoveryException = true;
break;
}
if (current.getCause() instanceof ElasticSearchException) {
current = (ElasticSearchException) current.getCause();
} else {
break;
}
}
if (!hasIgnoreOnRecoveryException) {
throw e;
}
}
}
/**

View File

@ -231,12 +231,14 @@ public interface Translog extends IndexShardComponent {
public final String routing;
public final String parent;
public final long timestamp;
public final long ttl;
public Source(BytesHolder source, String routing, String parent, long timestamp) {
public Source(BytesHolder source, String routing, String parent, long timestamp, long ttl) {
this.source = source;
this.routing = routing;
this.parent = parent;
this.timestamp = timestamp;
this.ttl = ttl;
}
}
@ -249,6 +251,7 @@ public interface Translog extends IndexShardComponent {
private String routing;
private String parent;
private long timestamp;
private long ttl;
private long version;
public Create() {
@ -263,6 +266,7 @@ public interface Translog extends IndexShardComponent {
this.routing = create.routing();
this.parent = create.parent();
this.timestamp = create.timestamp();
this.ttl = create.ttl();
this.version = create.version();
}
@ -314,6 +318,10 @@ public interface Translog extends IndexShardComponent {
return this.timestamp;
}
public long ttl() {
return this.ttl;
}
public long version() {
return this.version;
}
@ -343,7 +351,10 @@ public interface Translog extends IndexShardComponent {
if (version >= 4) {
this.timestamp = in.readLong();
}
return new Source(source, routing, parent, timestamp);
if (version >= 5) {
this.ttl = in.readLong();
}
return new Source(source, routing, parent, timestamp, ttl);
}
@Override public void readFrom(StreamInput in) throws IOException {
@ -370,10 +381,13 @@ public interface Translog extends IndexShardComponent {
if (version >= 4) {
this.timestamp = in.readLong();
}
if (version >= 5) {
this.ttl = in.readLong();
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(4); // version
out.writeVInt(5); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeVInt(sourceLength);
@ -392,6 +406,7 @@ public interface Translog extends IndexShardComponent {
}
out.writeLong(version);
out.writeLong(timestamp);
out.writeLong(ttl);
}
}
@ -405,6 +420,7 @@ public interface Translog extends IndexShardComponent {
private String routing;
private String parent;
private long timestamp;
private long ttl;
public Index() {
}
@ -419,6 +435,7 @@ public interface Translog extends IndexShardComponent {
this.parent = index.parent();
this.version = index.version();
this.timestamp = index.timestamp();
this.ttl = index.ttl();
}
public Index(String type, String id, byte[] source) {
@ -457,6 +474,10 @@ public interface Translog extends IndexShardComponent {
return this.timestamp;
}
public long ttl() {
return this.ttl;
}
public byte[] source() {
return this.source;
}
@ -498,7 +519,10 @@ public interface Translog extends IndexShardComponent {
if (version >= 4) {
this.timestamp = in.readLong();
}
return new Source(source, routing, parent, timestamp);
if (version >= 5) {
this.ttl = in.readLong();
}
return new Source(source, routing, parent, timestamp, ttl);
}
@Override public void readFrom(StreamInput in) throws IOException {
@ -525,10 +549,13 @@ public interface Translog extends IndexShardComponent {
if (version >= 4) {
this.timestamp = in.readLong();
}
if (version >= 5) {
this.ttl = in.readLong();
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(4); // version
out.writeVInt(5); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeVInt(sourceLength);
@ -547,6 +574,7 @@ public interface Translog extends IndexShardComponent {
}
out.writeLong(version);
out.writeLong(timestamp);
out.writeLong(ttl);
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.ttl.IndicesTTLService;
/**
* @author kimchy (shay.banon)
@ -62,5 +63,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(IndexingMemoryBufferController.class).asEagerSingleton();
bind(IndicesNodeFilterCache.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,241 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.ttl;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMappers;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.selector.UidFieldSelector;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* A node level service that delete expired docs on node primary shards.
*
*/
public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLService> {
private static final String SETTING_PURGE_INTERVAL = "purge_interval";
private static final TimeValue DEFAULT_PURGE_INTERVAL = new TimeValue(60, TimeUnit.SECONDS);
private static final String SETTINGS_BULK_SIZE = "bulk_size";
private static final int DEFAULT_BULK_SIZE = 10000;
private final IndicesService indicesService;
private final Client client;
private final TimeValue purgeInterval;
private final int bulkSize;
private BulkRequestBuilder bulkRequest;
private PurgerThread purgerThread;
@Inject public IndicesTTLService(Settings settings, IndicesService indicesService, Client client) {
super(settings);
this.indicesService = indicesService;
this.client = client;
this.purgeInterval = componentSettings.getAsTime(SETTING_PURGE_INTERVAL, DEFAULT_PURGE_INTERVAL);
this.bulkSize = componentSettings.getAsInt(SETTINGS_BULK_SIZE, DEFAULT_BULK_SIZE);
}
@Override protected void doStart() throws ElasticSearchException {
this.purgerThread = new PurgerThread(EsExecutors.threadName(settings, "[purger]"));
this.purgerThread.start();
}
@Override protected void doStop() throws ElasticSearchException {
this.purgerThread.doStop();
}
@Override protected void doClose() throws ElasticSearchException {
}
private class PurgerThread extends Thread {
volatile boolean running = true;
public PurgerThread(String name) {
super(name);
setDaemon(true);
}
public void doStop() {
running = false;
}
public void run() {
while (running) {
List<IndexShard> shardsToPurge = getShardsToPurge();
purgeShards(shardsToPurge);
try {
Thread.sleep(purgeInterval.millis());
} catch (InterruptedException e) {
running = false;
return;
}
}
}
/**
* Returns the shards to purge, i.e. the local started primary shards that have ttl enabled
*/
private List<IndexShard> getShardsToPurge() {
List<IndexShard> shardsToPurge = new ArrayList<IndexShard>();
for (IndexService indexService : indicesService) {
// should be optimized with the hasTTL flag
FieldMappers ttlFieldMappers = indexService.mapperService().name(TTLFieldMapper.NAME);
// check if ttl is enabled for at least one type of this index
boolean hasTTLEnabled = false;
for (FieldMapper ttlFieldMapper : ttlFieldMappers) {
if (((TTLFieldMapper)ttlFieldMapper).enabled()) {
hasTTLEnabled = true;
break;
}
}
if (hasTTLEnabled)
{
for (Integer shardId : indexService.shardIds()) {
IndexShard shard = indexService.shard(shardId);
if (shard.routingEntry().primary() && shard.state() == IndexShardState.STARTED && shard.routingEntry().started()) {
shardsToPurge.add(shard);
}
}
}
}
return shardsToPurge;
}
}
private void purgeShards(List<IndexShard> shardsToPurge) {
for (IndexShard shardToPurge : shardsToPurge) {
Query query = NumericRangeQuery.newLongRange(TTLFieldMapper.NAME, null, System.currentTimeMillis(), false, true);
Engine.Searcher searcher = shardToPurge.searcher();
try {
logger.debug("[{}][{}] purging shard", shardToPurge.routingEntry().index(), shardToPurge.routingEntry().id());
ExpiredDocsCollector expiredDocsCollector = new ExpiredDocsCollector();
searcher.searcher().search(query, expiredDocsCollector);
List<DocToPurge> docsToPurge = expiredDocsCollector.getDocsToPurge();
bulkRequest = client.prepareBulk();
for (DocToPurge docToPurge : docsToPurge) {
bulkRequest.add(new DeleteRequest().index(shardToPurge.routingEntry().index()).type(docToPurge.type).id(docToPurge.id).version(docToPurge.version));
processBulkIfNeeded(false);
}
processBulkIfNeeded(true);
} catch (Exception e) {
logger.warn("failed to purge", e);
} finally {
searcher.release();
}
}
}
private static class DocToPurge {
public final String type;
public final String id;
public final long version;
public DocToPurge(String type, String id, long version) {
this.type = type;
this.id = id;
this.version = version;
}
}
private class ExpiredDocsCollector extends Collector {
private IndexReader indexReader;
private List<DocToPurge> docsToPurge = new ArrayList<DocToPurge>();
public ExpiredDocsCollector() {
}
public void setScorer(Scorer scorer) {
}
public boolean acceptsDocsOutOfOrder() {
return true;
}
public void collect(int doc) {
try {
Document document = indexReader.document(doc, UidFieldSelector.INSTANCE);
String uid = document.getFieldable(UidFieldMapper.NAME).stringValue();
long version = UidField.loadVersion(indexReader, UidFieldMapper.TERM_FACTORY.createTerm(uid));
docsToPurge.add(new DocToPurge(Uid.typeFromUid(uid),Uid.idFromUid(uid), version));
} catch (Exception e) {
}
}
public void setNextReader(IndexReader reader, int docBase) {
this.indexReader = reader;
}
public List<DocToPurge> getDocsToPurge() {
return this.docsToPurge;
}
}
private void processBulkIfNeeded(boolean force) {
if ((force && bulkRequest.numberOfActions() > 0) || bulkRequest.numberOfActions() >= bulkSize) {
try {
bulkRequest.execute(new ActionListener<BulkResponse>() {
@Override public void onResponse(BulkResponse bulkResponse) {
logger.debug("bulk took " + bulkResponse.getTookInMillis() + "ms");
}
@Override public void onFailure(Throwable e) {
logger.warn("failed to execute bulk");
}
});
} catch (Exception e) {
logger.warn("failed to process bulk", e);
}
bulkRequest = client.prepareBulk();
}
}
}

View File

@ -61,6 +61,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.jmx.JmxModule;
import org.elasticsearch.jmx.JmxService;
import org.elasticsearch.monitor.MonitorModule;
@ -174,6 +175,7 @@ public final class InternalNode implements Node {
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndexingMemoryBufferController.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndicesTTLService.class).start();
injector.getInstance(RiversManager.class).start();
injector.getInstance(ClusterService.class).start();
injector.getInstance(RoutingService.class).start();

View File

@ -65,6 +65,9 @@ public class RestIndexAction extends BaseRestHandler {
indexRequest.routing(request.param("routing"));
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
indexRequest.timestamp(request.param("timestamp"));
if (request.hasParam("ttl")) {
indexRequest.ttl(request.paramAsLong("ttl", -1));
}
indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));

View File

@ -138,10 +138,10 @@ public abstract class AbstractSimpleEngineTests {
assertThat(segments.isEmpty(), equalTo(true));
// create a doc and refresh
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.refresh(new Engine.Refresh(true));
@ -162,7 +162,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(segments.get(0).deletedDocs(), equalTo(0));
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3));
engine.refresh(new Engine.Refresh(true));
@ -202,7 +202,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// create a document
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
// its not there...
@ -236,7 +236,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(getResult.docIdAndVersion(), notNullValue());
// now do an update
doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test1")).add(field(SourceFieldMapper.NAME, B_2, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_2, false);
doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test1")).add(field(SourceFieldMapper.NAME, B_2, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.index(new Engine.Index(null, newUid("1"), doc));
// its not updated yet...
@ -285,7 +285,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// add it back
doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
// its not there...
@ -317,7 +317,7 @@ public abstract class AbstractSimpleEngineTests {
// make sure we can still work with the engine
// now do an update
doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.index(new Engine.Index(null, newUid("1"), doc));
// its not updated yet...
@ -345,7 +345,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// create a document
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
// its not there...
@ -378,7 +378,7 @@ public abstract class AbstractSimpleEngineTests {
@Test public void testSimpleSnapshot() throws Exception {
// create a document
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc1));
final ExecutorService executorService = Executors.newCachedThreadPool();
@ -394,10 +394,10 @@ public abstract class AbstractSimpleEngineTests {
Future<Object> future = executorService.submit(new Callable<Object>() {
@Override public Object call() throws Exception {
engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.flush(new Engine.Flush());
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3));
return null;
}
@ -432,7 +432,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testSimpleRecover() throws Exception {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(new Engine.Flush());
@ -473,10 +473,10 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception {
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc1));
engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.recover(new Engine.RecoveryHandler() {
@ -500,10 +500,10 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception {
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc1));
engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.recover(new Engine.RecoveryHandler() {
@ -517,7 +517,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(create.source(), equalTo(B_2));
// add for phase3
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3));
}
@ -534,7 +534,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testVersioningNewCreate() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc);
engine.create(create);
assertThat(create.version(), equalTo(1l));
@ -545,7 +545,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testExternalVersioningNewCreate() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
engine.create(create);
assertThat(create.version(), equalTo(12l));
@ -556,7 +556,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testVersioningNewIndex() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1l));
@ -567,7 +567,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testExternalVersioningNewIndex() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
engine.index(index);
assertThat(index.version(), equalTo(12l));
@ -578,7 +578,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testVersioningIndexConflict() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1l));
@ -606,7 +606,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testExternalVersioningIndexConflict() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
engine.index(index);
assertThat(index.version(), equalTo(12l));
@ -625,7 +625,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testVersioningIndexConflictWithFlush() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1l));
@ -655,7 +655,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testExternalVersioningIndexConflictWithFlush() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
engine.index(index);
assertThat(index.version(), equalTo(12l));
@ -676,7 +676,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testVersioningDeleteConflict() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1l));
@ -726,7 +726,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testVersioningDeleteConflictWithFlush() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1l));
@ -782,7 +782,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testVersioningCreateExistsException() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc);
engine.create(create);
assertThat(create.version(), equalTo(1l));
@ -797,7 +797,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testVersioningCreateExistsExceptionWithFlush() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc);
engine.create(create);
assertThat(create.version(), equalTo(1l));
@ -814,7 +814,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testVersioningReplicaConflict1() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1l));
@ -848,7 +848,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testVersioningReplicaConflict2() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1l));

View File

@ -0,0 +1,86 @@
/* exception when already expired
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper.ttl;
import org.apache.lucene.document.Field;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperTests;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.testng.annotations.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
public class TTLMappingTests {
@Test public void testSimpleDisabled() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type").endObject().string();
DocumentMapper docMapper = MapperTests.newParser().parse(mapping);
byte[] source = XContentFactory.jsonBuilder()
.startObject()
.field("field", "value")
.endObject()
.copiedBytes();
ParsedDocument doc = docMapper.parse(SourceToParse.source(source).type("type").id("1").ttl(Long.MAX_VALUE));
assertThat(doc.rootDoc().getFieldable("_ttl"), equalTo(null));
}
@Test public void testEnabled() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("_ttl").field("enabled", "yes").endObject()
.endObject().endObject().string();
DocumentMapper docMapper = MapperTests.newParser().parse(mapping);
byte[] source = XContentFactory.jsonBuilder()
.startObject()
.field("field", "value")
.endObject()
.copiedBytes();
ParsedDocument doc = docMapper.parse(SourceToParse.source(source).type("type").id("1").ttl(Long.MAX_VALUE));
assertThat(doc.rootDoc().getFieldable("_ttl").isStored(), equalTo(true));
assertThat(doc.rootDoc().getFieldable("_ttl").isIndexed(), equalTo(true));
assertThat(doc.rootDoc().getFieldable("_ttl").tokenStreamValue(), notNullValue());
}
@Test public void testDefaultValues() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type").endObject().string();
DocumentMapper docMapper = MapperTests.newParser().parse(mapping);
assertThat(docMapper.TTLFieldMapper().enabled(), equalTo(TTLFieldMapper.Defaults.ENABLED));
assertThat(docMapper.TTLFieldMapper().store(), equalTo(TTLFieldMapper.Defaults.STORE));
assertThat(docMapper.TTLFieldMapper().index(), equalTo(TTLFieldMapper.Defaults.INDEX));
}
@Test public void testSetValues() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("_ttl")
.field("enabled", "yes").field("store", "no").field("index", "no")
.endObject()
.endObject().endObject().string();
DocumentMapper docMapper = MapperTests.newParser().parse(mapping);
assertThat(docMapper.TTLFieldMapper().enabled(), equalTo(true));
assertThat(docMapper.TTLFieldMapper().store(), equalTo(Field.Store.NO));
assertThat(docMapper.TTLFieldMapper().index(), equalTo(Field.Index.NO));
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.ttl;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
public class SimpleTTLTests extends AbstractNodesTests {
static private final long purgeInterval = 500;
private Client client;
@BeforeClass public void createNodes() throws Exception {
Settings settings = settingsBuilder().put("indices.ttl.purge_interval", purgeInterval).build();
startNode("node1", settings);
startNode("node2", settings);
client = getClient();
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node1");
}
@Test public void testSimpleTTL() throws Exception {
client.admin().indices().prepareDelete().execute().actionGet();
client.admin().indices().prepareCreate("test")
.addMapping("type1", XContentFactory.jsonBuilder()
.startObject()
.startObject("type1")
.startObject("_timestamp").field("enabled", true).field("store", "yes").endObject()
.startObject("_ttl").field("enabled", true).field("store", "yes").endObject()
.endObject()
.endObject())
.execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
long providedTTLValue = 3000;
logger.info("--> checking ttl");
client.prepareIndex("test", "type1", "1").setSource("field1", "value1").setTTL(providedTTLValue).setRefresh(true).execute().actionGet();
long now = System.currentTimeMillis();
// realtime get check
long now1 = System.currentTimeMillis();
GetResponse getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(true).execute().actionGet();
long ttl0 = ((Number) getResponse.field("_ttl").value()).longValue();
assertThat(ttl0, greaterThan(0L));
assertThat(ttl0, lessThan(providedTTLValue - (now1 - now)));
// verify the ttl is still decreasing when going to the replica
now1 = System.currentTimeMillis();
getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(true).execute().actionGet();
ttl0 = ((Number) getResponse.field("_ttl").value()).longValue();
assertThat(ttl0, greaterThan(0L));
assertThat(ttl0, lessThan(providedTTLValue - (now1 - now)));
// non realtime get (stored)
now1 = System.currentTimeMillis();
getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(false).execute().actionGet();
ttl0 = ((Number) getResponse.field("_ttl").value()).longValue();
assertThat(ttl0, greaterThan(0L));
assertThat(ttl0, lessThan(providedTTLValue - (now1 - now)));
// non realtime get going the replica
now1 = System.currentTimeMillis();
getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(false).execute().actionGet();
ttl0 = ((Number) getResponse.field("_ttl").value()).longValue();
assertThat(ttl0, greaterThan(0L));
assertThat(ttl0, lessThan(providedTTLValue - (now1 - now)));
logger.info("--> checking purger");
// make sure the purger has done its job
long shouldBeExpiredDate = now + providedTTLValue + purgeInterval + 2000;
now1 = System.currentTimeMillis();
if (shouldBeExpiredDate - now1 > 0) {
Thread.sleep(shouldBeExpiredDate - now1);
}
// realtime get check
getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(true).execute().actionGet();
assertThat(getResponse.exists(), equalTo(false));
// replica realtime get check
getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(true).execute().actionGet();
assertThat(getResponse.exists(), equalTo(false));
// non realtime get (stored) check
getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(false).execute().actionGet();
assertThat(getResponse.exists(), equalTo(false));
// non realtime get going the replica check
getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(false).execute().actionGet();
assertThat(getResponse.exists(), equalTo(false));
}
}