cleanup ttl support, make sure we close the service on node lifecycle, better settings names, and allow to provide the ttl as a time value

This commit is contained in:
Shay Banon 2011-09-09 01:21:05 +03:00
parent 65aad2da1e
commit 0fcb4e88d4
6 changed files with 74 additions and 42 deletions

View File

@ -49,7 +49,11 @@ import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.File; import java.io.File;
import java.util.concurrent.*; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.lucene.DocumentBuilder.*; import static org.elasticsearch.common.lucene.DocumentBuilder.*;
@ -165,7 +169,7 @@ public class SimpleEngineBenchmark {
String sId = Integer.toString(id); String sId = Integer.toString(id);
Document doc = doc().add(field("_id", sId)) Document doc = doc().add(field("_id", sId))
.add(field("content", contentItem)).build(); .add(field("content", contentItem)).build();
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false); ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
if (create) { if (create) {
engine.create(new Engine.Create(null, new Term("_id", sId), pDoc)); engine.create(new Engine.Create(null, new Term("_id", sId), pDoc));
} else { } else {
@ -279,7 +283,7 @@ public class SimpleEngineBenchmark {
String sId = Integer.toString(id); String sId = Integer.toString(id);
Document doc = doc().add(field("_id", sId)) Document doc = doc().add(field("_id", sId))
.add(field("content", content(id))).build(); .add(field("content", content(id))).build();
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false); ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
if (create) { if (create) {
engine.create(new Engine.Create(null, new Term("_id", sId), pDoc)); engine.create(new Engine.Create(null, new Term("_id", sId), pDoc));
} else { } else {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -138,7 +139,11 @@ public class BulkRequest implements ActionRequest {
} else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) { } else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) {
timestamp = parser.text(); timestamp = parser.text();
} else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) { } else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) {
ttl = parser.longValue(); if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
ttl = TimeValue.parseTimeValue(parser.text(), null).millis();
} else {
ttl = parser.longValue();
}
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) { } else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text(); opType = parser.text();
} else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) { } else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) {

View File

@ -20,20 +20,18 @@
package org.elasticsearch.indices.ttl; package org.elasticsearch.indices.ttl;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Collector; import org.apache.lucene.search.Collector;
import org.apache.lucene.search.NumericRangeQuery; import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder; import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.UidField; import org.elasticsearch.common.lucene.uid.UidField;
@ -51,46 +49,48 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
/** /**
* A node level service that delete expired docs on node primary shards. * A node level service that delete expired docs on node primary shards.
*
*/ */
public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLService> { public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLService> {
private static final String SETTING_PURGE_INTERVAL = "purge_interval"; static {
private static final TimeValue DEFAULT_PURGE_INTERVAL = new TimeValue(60, TimeUnit.SECONDS); MetaData.addDynamicSettings(
private static final String SETTINGS_BULK_SIZE = "bulk_size"; "indices.ttl.interval"
private static final int DEFAULT_BULK_SIZE = 10000; );
}
private final IndicesService indicesService; private final IndicesService indicesService;
private final Client client; private final Client client;
private final TimeValue purgeInterval; private volatile TimeValue interval;
private final int bulkSize; private final int bulkSize;
private BulkRequestBuilder bulkRequest;
private PurgerThread purgerThread; private PurgerThread purgerThread;
@Inject public IndicesTTLService(Settings settings, IndicesService indicesService, Client client) { @Inject public IndicesTTLService(Settings settings, IndicesService indicesService, NodeSettingsService nodeSettingsService, Client client) {
super(settings); super(settings);
this.indicesService = indicesService; this.indicesService = indicesService;
this.client = client; this.client = client;
this.purgeInterval = componentSettings.getAsTime(SETTING_PURGE_INTERVAL, DEFAULT_PURGE_INTERVAL); this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(60));
this.bulkSize = componentSettings.getAsInt(SETTINGS_BULK_SIZE, DEFAULT_BULK_SIZE); this.bulkSize = componentSettings.getAsInt("bulk_size", 10000);
nodeSettingsService.addListener(new ApplySettings());
} }
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
this.purgerThread = new PurgerThread(EsExecutors.threadName(settings, "[purger]")); this.purgerThread = new PurgerThread(EsExecutors.threadName(settings, "[ttl_expire]"));
this.purgerThread.start(); this.purgerThread.start();
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
this.purgerThread.doStop(); this.purgerThread.doStop();
this.purgerThread.interrupt();
} }
@Override protected void doClose() throws ElasticSearchException { @Override protected void doClose() throws ElasticSearchException {
@ -110,13 +110,18 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
public void run() { public void run() {
while (running) { while (running) {
List<IndexShard> shardsToPurge = getShardsToPurge();
purgeShards(shardsToPurge);
try { try {
Thread.sleep(purgeInterval.millis()); List<IndexShard> shardsToPurge = getShardsToPurge();
purgeShards(shardsToPurge);
} catch (Throwable e) {
if (running) {
logger.warn("failed to execute ttl purge", e);
}
}
try {
Thread.sleep(interval.millis());
} catch (InterruptedException e) { } catch (InterruptedException e) {
running = false; // ignore, if we are interrupted because we are shutting down, running will be false
return;
} }
} }
@ -133,17 +138,15 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
// check if ttl is enabled for at least one type of this index // check if ttl is enabled for at least one type of this index
boolean hasTTLEnabled = false; boolean hasTTLEnabled = false;
for (FieldMapper ttlFieldMapper : ttlFieldMappers) { for (FieldMapper ttlFieldMapper : ttlFieldMappers) {
if (((TTLFieldMapper)ttlFieldMapper).enabled()) { if (((TTLFieldMapper) ttlFieldMapper).enabled()) {
hasTTLEnabled = true; hasTTLEnabled = true;
break; break;
} }
} }
if (hasTTLEnabled) if (hasTTLEnabled) {
{ for (IndexShard indexShard : indexService) {
for (Integer shardId : indexService.shardIds()) { if (indexShard.routingEntry().primary() && indexShard.state() == IndexShardState.STARTED && indexShard.routingEntry().started()) {
IndexShard shard = indexService.shard(shardId); shardsToPurge.add(indexShard);
if (shard.routingEntry().primary() && shard.state() == IndexShardState.STARTED && shard.routingEntry().started()) {
shardsToPurge.add(shard);
} }
} }
} }
@ -161,12 +164,12 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
ExpiredDocsCollector expiredDocsCollector = new ExpiredDocsCollector(); ExpiredDocsCollector expiredDocsCollector = new ExpiredDocsCollector();
searcher.searcher().search(query, expiredDocsCollector); searcher.searcher().search(query, expiredDocsCollector);
List<DocToPurge> docsToPurge = expiredDocsCollector.getDocsToPurge(); List<DocToPurge> docsToPurge = expiredDocsCollector.getDocsToPurge();
bulkRequest = client.prepareBulk(); BulkRequestBuilder bulkRequest = client.prepareBulk();
for (DocToPurge docToPurge : docsToPurge) { for (DocToPurge docToPurge : docsToPurge) {
bulkRequest.add(new DeleteRequest().index(shardToPurge.routingEntry().index()).type(docToPurge.type).id(docToPurge.id).version(docToPurge.version)); bulkRequest.add(new DeleteRequest().index(shardToPurge.routingEntry().index()).type(docToPurge.type).id(docToPurge.id).version(docToPurge.version));
processBulkIfNeeded(false); bulkRequest = processBulkIfNeeded(bulkRequest, false);
} }
processBulkIfNeeded(true); processBulkIfNeeded(bulkRequest, true);
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to purge", e); logger.warn("failed to purge", e);
} finally { } finally {
@ -206,7 +209,7 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
Document document = indexReader.document(doc, UidFieldSelector.INSTANCE); Document document = indexReader.document(doc, UidFieldSelector.INSTANCE);
String uid = document.getFieldable(UidFieldMapper.NAME).stringValue(); String uid = document.getFieldable(UidFieldMapper.NAME).stringValue();
long version = UidField.loadVersion(indexReader, UidFieldMapper.TERM_FACTORY.createTerm(uid)); long version = UidField.loadVersion(indexReader, UidFieldMapper.TERM_FACTORY.createTerm(uid));
docsToPurge.add(new DocToPurge(Uid.typeFromUid(uid),Uid.idFromUid(uid), version)); docsToPurge.add(new DocToPurge(Uid.typeFromUid(uid), Uid.idFromUid(uid), version));
} catch (Exception e) { } catch (Exception e) {
} }
} }
@ -220,12 +223,12 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
} }
} }
private void processBulkIfNeeded(boolean force) { private BulkRequestBuilder processBulkIfNeeded(BulkRequestBuilder bulkRequest, boolean force) {
if ((force && bulkRequest.numberOfActions() > 0) || bulkRequest.numberOfActions() >= bulkSize) { if ((force && bulkRequest.numberOfActions() > 0) || bulkRequest.numberOfActions() >= bulkSize) {
try { try {
bulkRequest.execute(new ActionListener<BulkResponse>() { bulkRequest.execute(new ActionListener<BulkResponse>() {
@Override public void onResponse(BulkResponse bulkResponse) { @Override public void onResponse(BulkResponse bulkResponse) {
logger.debug("bulk took " + bulkResponse.getTookInMillis() + "ms"); logger.trace("bulk took " + bulkResponse.getTookInMillis() + "ms");
} }
@Override public void onFailure(Throwable e) { @Override public void onFailure(Throwable e) {
@ -237,5 +240,16 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
} }
bulkRequest = client.prepareBulk(); bulkRequest = client.prepareBulk();
} }
return bulkRequest;
}
class ApplySettings implements NodeSettingsService.Listener {
@Override public void onRefreshSettings(Settings settings) {
TimeValue interval = settings.getAsTime("indices.ttl.interval", IndicesTTLService.this.interval);
if (!interval.equals(IndicesTTLService.this.interval)) {
logger.info("updating indices.ttl.interval from [{}] to [{}]", IndicesTTLService.this.interval, interval);
IndicesTTLService.this.interval = interval;
}
}
} }
} }

View File

@ -212,6 +212,7 @@ public final class InternalNode implements Node {
injector.getInstance(IndicesClusterStateService.class).stop(); injector.getInstance(IndicesClusterStateService.class).stop();
// we close indices first, so operations won't be allowed on it // we close indices first, so operations won't be allowed on it
injector.getInstance(IndexingMemoryBufferController.class).stop(); injector.getInstance(IndexingMemoryBufferController.class).stop();
injector.getInstance(IndicesTTLService.class).stop();
injector.getInstance(IndicesService.class).stop(); injector.getInstance(IndicesService.class).stop();
// sleep a bit to let operations finish with indices service // sleep a bit to let operations finish with indices service
// try { // try {
@ -262,6 +263,7 @@ public final class InternalNode implements Node {
stopWatch.stop().start("indices"); stopWatch.stop().start("indices");
injector.getInstance(IndicesNodeFilterCache.class).close(); injector.getInstance(IndicesNodeFilterCache.class).close();
injector.getInstance(IndexingMemoryBufferController.class).close(); injector.getInstance(IndexingMemoryBufferController.class).close();
injector.getInstance(IndicesTTLService.class).close();
injector.getInstance(IndicesService.class).close(); injector.getInstance(IndicesService.class).close();
stopWatch.stop().start("routing"); stopWatch.stop().start("routing");
injector.getInstance(RoutingService.class).close(); injector.getInstance(RoutingService.class).close();

View File

@ -30,7 +30,14 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.rest.*; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.XContentRestResponse;
import org.elasticsearch.rest.XContentThrowableRestResponse;
import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestXContentBuilder; import org.elasticsearch.rest.action.support.RestXContentBuilder;
@ -66,7 +73,7 @@ public class RestIndexAction extends BaseRestHandler {
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
indexRequest.timestamp(request.param("timestamp")); indexRequest.timestamp(request.param("timestamp"));
if (request.hasParam("ttl")) { if (request.hasParam("ttl")) {
indexRequest.ttl(request.paramAsLong("ttl", -1)); indexRequest.ttl(request.paramAsTime("ttl", null).millis());
} }
indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe()); indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT)); indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));

View File

@ -28,17 +28,17 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
public class SimpleTTLTests extends AbstractNodesTests { public class SimpleTTLTests extends AbstractNodesTests {
static private final long purgeInterval = 500; static private final long purgeInterval = 200;
private Client client; private Client client;
@BeforeClass public void createNodes() throws Exception { @BeforeClass public void createNodes() throws Exception {
Settings settings = settingsBuilder().put("indices.ttl.purge_interval", purgeInterval).build(); Settings settings = settingsBuilder().put("indices.ttl.interval", purgeInterval).build();
startNode("node1", settings); startNode("node1", settings);
startNode("node2", settings); startNode("node2", settings);
client = getClient(); client = getClient();