From 0fcb4e88d45b89bee5b7aef0c49fa29788fd64c2 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 9 Sep 2011 01:21:05 +0300 Subject: [PATCH] cleanup ttl support, make sure we close the service on node lifecycle, better settings names, and allow to provide the ttl as a time value --- .../index/engine/SimpleEngineBenchmark.java | 10 ++- .../action/bulk/BulkRequest.java | 7 +- .../indices/ttl/IndicesTTLService.java | 80 +++++++++++-------- .../node/internal/InternalNode.java | 2 + .../rest/action/index/RestIndexAction.java | 11 ++- .../test/integration/ttl/SimpleTTLTests.java | 6 +- 6 files changed, 74 insertions(+), 42 deletions(-) diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java index 94fdb65fcb1..91296cd0f73 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java @@ -49,7 +49,11 @@ import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.threadpool.ThreadPool; import java.io.File; -import java.util.concurrent.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.common.lucene.DocumentBuilder.*; @@ -165,7 +169,7 @@ public class SimpleEngineBenchmark { String sId = Integer.toString(id); Document doc = doc().add(field("_id", sId)) .add(field("content", contentItem)).build(); - ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false); + ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false); if (create) { engine.create(new Engine.Create(null, new Term("_id", sId), pDoc)); } else { @@ -279,7 +283,7 @@ public class SimpleEngineBenchmark { String sId = Integer.toString(id); Document doc = doc().add(field("_id", sId)) .add(field("content", content(id))).build(); - ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false); + ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false); if (create) { engine.create(new Engine.Create(null, new Term("_id", sId), pDoc)); } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index fda67909706..c9f736a19d2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; @@ -138,7 +139,11 @@ public class BulkRequest implements ActionRequest { } else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) { timestamp = parser.text(); } else if ("_ttl".equals(currentFieldName) || "ttl".equals(currentFieldName)) { - ttl = parser.longValue(); + if (parser.currentToken() == XContentParser.Token.VALUE_STRING) { + ttl = TimeValue.parseTimeValue(parser.text(), null).millis(); + } else { + ttl = parser.longValue(); + } } else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) { opType = parser.text(); } else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java index f88e2a303ce..5e54b2a63ef 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java @@ -20,20 +20,18 @@ package org.elasticsearch.indices.ttl; import org.apache.lucene.document.Document; - import org.apache.lucene.index.IndexReader; - import org.apache.lucene.search.Collector; import org.apache.lucene.search.NumericRangeQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.Scorer; import org.elasticsearch.ElasticSearchException; - import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.action.bulk.BulkRequestBuilder; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.uid.UidField; @@ -51,46 +49,48 @@ import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.settings.NodeSettingsService; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; /** * A node level service that delete expired docs on node primary shards. - * */ public class IndicesTTLService extends AbstractLifecycleComponent { - private static final String SETTING_PURGE_INTERVAL = "purge_interval"; - private static final TimeValue DEFAULT_PURGE_INTERVAL = new TimeValue(60, TimeUnit.SECONDS); - private static final String SETTINGS_BULK_SIZE = "bulk_size"; - private static final int DEFAULT_BULK_SIZE = 10000; + static { + MetaData.addDynamicSettings( + "indices.ttl.interval" + ); + } private final IndicesService indicesService; private final Client client; - private final TimeValue purgeInterval; + private volatile TimeValue interval; private final int bulkSize; - private BulkRequestBuilder bulkRequest; private PurgerThread purgerThread; - @Inject public IndicesTTLService(Settings settings, IndicesService indicesService, Client client) { + @Inject public IndicesTTLService(Settings settings, IndicesService indicesService, NodeSettingsService nodeSettingsService, Client client) { super(settings); this.indicesService = indicesService; this.client = client; - this.purgeInterval = componentSettings.getAsTime(SETTING_PURGE_INTERVAL, DEFAULT_PURGE_INTERVAL); - this.bulkSize = componentSettings.getAsInt(SETTINGS_BULK_SIZE, DEFAULT_BULK_SIZE); + this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(60)); + this.bulkSize = componentSettings.getAsInt("bulk_size", 10000); + + nodeSettingsService.addListener(new ApplySettings()); } @Override protected void doStart() throws ElasticSearchException { - this.purgerThread = new PurgerThread(EsExecutors.threadName(settings, "[purger]")); + this.purgerThread = new PurgerThread(EsExecutors.threadName(settings, "[ttl_expire]")); this.purgerThread.start(); } @Override protected void doStop() throws ElasticSearchException { this.purgerThread.doStop(); + this.purgerThread.interrupt(); } @Override protected void doClose() throws ElasticSearchException { @@ -110,13 +110,18 @@ public class IndicesTTLService extends AbstractLifecycleComponent shardsToPurge = getShardsToPurge(); - purgeShards(shardsToPurge); try { - Thread.sleep(purgeInterval.millis()); + List shardsToPurge = getShardsToPurge(); + purgeShards(shardsToPurge); + } catch (Throwable e) { + if (running) { + logger.warn("failed to execute ttl purge", e); + } + } + try { + Thread.sleep(interval.millis()); } catch (InterruptedException e) { - running = false; - return; + // ignore, if we are interrupted because we are shutting down, running will be false } } @@ -133,17 +138,15 @@ public class IndicesTTLService extends AbstractLifecycleComponent docsToPurge = expiredDocsCollector.getDocsToPurge(); - bulkRequest = client.prepareBulk(); + BulkRequestBuilder bulkRequest = client.prepareBulk(); for (DocToPurge docToPurge : docsToPurge) { bulkRequest.add(new DeleteRequest().index(shardToPurge.routingEntry().index()).type(docToPurge.type).id(docToPurge.id).version(docToPurge.version)); - processBulkIfNeeded(false); + bulkRequest = processBulkIfNeeded(bulkRequest, false); } - processBulkIfNeeded(true); + processBulkIfNeeded(bulkRequest, true); } catch (Exception e) { logger.warn("failed to purge", e); } finally { @@ -206,7 +209,7 @@ public class IndicesTTLService extends AbstractLifecycleComponent 0) || bulkRequest.numberOfActions() >= bulkSize) { try { bulkRequest.execute(new ActionListener() { @Override public void onResponse(BulkResponse bulkResponse) { - logger.debug("bulk took " + bulkResponse.getTookInMillis() + "ms"); + logger.trace("bulk took " + bulkResponse.getTookInMillis() + "ms"); } @Override public void onFailure(Throwable e) { @@ -237,5 +240,16 @@ public class IndicesTTLService extends AbstractLifecycleComponent