diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java index 28158eb6e9f..5221f96285a 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java @@ -307,7 +307,7 @@ public class SimpleEngineBenchmark { ThreadPool threadPool = new ThreadPool(); SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings)); - Engine engine = new RobinEngine(shardId, settings, new IndexSettingsService(shardId.index(), settings), store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)), + Engine engine = new RobinEngine(shardId, settings, /* reuse the pool created above instead of constructing a second ThreadPool */ threadPool, new IndexSettingsService(shardId.index(), settings), store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)), new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index())); engine.start(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 0962fbb716c..2fb92d87c66 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -53,8 +53,10 @@ import 
org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -82,6 +84,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private final AtomicBoolean optimizeMutex = new AtomicBoolean(); + private final long gcDeletesInMillis; + + private final ThreadPool threadPool; + private final IndexSettingsService indexSettingsService; private final Store store; @@ -133,7 +139,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private final Object failedEngineMutex = new Object(); private final CopyOnWriteArrayList failedEngineListeners = new CopyOnWriteArrayList(); - @Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, + @Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, + IndexSettingsService indexSettingsService, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService, @@ -143,11 +150,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the engine"); Preconditions.checkNotNull(translog, "Translog must be provided to the engine"); + this.gcDeletesInMillis = indexSettings.getAsTime("index.gc_deletes", TimeValue.timeValueSeconds(60)).millis(); this.indexingBufferSize = componentSettings.getAsBytesSize("index_buffer_size", new 
ByteSizeValue(64, ByteSizeUnit.MB)); // not really important, as it is set by the IndexingMemory manager this.termIndexInterval = indexSettings.getAsInt("index.term_index_interval", IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL); this.termIndexDivisor = indexSettings.getAsInt("index.term_index_divisor", 1); // IndexReader#DEFAULT_TERMS_INDEX_DIVISOR this.asyncLoadBloomFilter = componentSettings.getAsBoolean("async_load_bloom", true); // Here for testing, should always be true + this.threadPool = threadPool; this.indexSettingsService = indexSettingsService; this.store = store; this.deletionPolicy = deletionPolicy; @@ -270,7 +279,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { if (create.origin() == Operation.Origin.RECOVERY) { // on recovery, we get the actual version we want to use if (create.version() != 0) { - versionMap.put(create.uid().text(), new VersionValue(create.version(), false)); + versionMap.put(create.uid().text(), new VersionValue(create.version(), false, threadPool.estimatedTimeInMillis())); } uidField.version(create.version()); writer.addDocument(create.doc(), create.analyzer()); @@ -281,7 +290,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { if (versionValue == null) { currentVersion = loadCurrentVersionFromIndex(create.uid()); } else { - currentVersion = versionValue.version(); + if (versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { + currentVersion = -1; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } } // same logic as index @@ -337,7 +350,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { throw new DocumentAlreadyExistsEngineException(shardId, create.type(), create.id()); } - versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false)); + versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, 
threadPool.estimatedTimeInMillis())); uidField.version(updatedVersion); create.version(updatedVersion); @@ -374,7 +387,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { if (index.origin() == Operation.Origin.RECOVERY) { // on recovery, we get the actual version we want to use if (index.version() != 0) { - versionMap.put(index.uid().text(), new VersionValue(index.version(), false)); + versionMap.put(index.uid().text(), new VersionValue(index.version(), false, threadPool.estimatedTimeInMillis())); } uidField.version(index.version()); writer.updateDocument(index.uid(), index.doc(), index.analyzer()); @@ -385,7 +398,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { if (versionValue == null) { currentVersion = loadCurrentVersionFromIndex(index.uid()); } else { - currentVersion = versionValue.version(); + if (versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { + currentVersion = -1; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } } long updatedVersion; @@ -430,7 +447,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { updatedVersion = index.version(); } - versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false)); + versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis())); uidField.version(updatedVersion); index.version(updatedVersion); @@ -470,7 +487,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { if (delete.origin() == Operation.Origin.RECOVERY) { // update the version with the exact version from recovery, assuming we have it if (delete.version() != 0) { - versionMap.put(delete.uid().text(), new VersionValue(delete.version(), true)); + versionMap.put(delete.uid().text(), new VersionValue(delete.version(), true, threadPool.estimatedTimeInMillis())); } 
writer.deleteDocuments(delete.uid()); @@ -481,7 +498,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { if (versionValue == null) { currentVersion = loadCurrentVersionFromIndex(delete.uid()); } else { - currentVersion = versionValue.version(); + if (versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { + currentVersion = -1; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } } long updatedVersion; @@ -528,7 +549,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { // if its a delete on delete and we have the current delete version, return it delete.version(versionValue.version()).notFound(true); } else { - versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true)); + versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis())); delete.version(updatedVersion); writer.deleteDocuments(delete.uid()); translog.add(new Translog.Delete(delete)); @@ -681,7 +702,17 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { throw new FlushFailedEngineException(shardId, e); } } - versionMap.clear(); + // remove all versions except for deletes, which we expire based on the GC value + long time = threadPool.estimatedTimeInMillis(); + for (Map.Entry<String, VersionValue> entry : versionMap.entrySet()) { + if (entry.getValue().delete()) { + if ((time - entry.getValue().time()) > gcDeletesInMillis) { + versionMap.remove(entry.getKey()); + } + } else { + versionMap.remove(entry.getKey()); + } + } dirty = true; // force a refresh // we need to do a refresh here so we sync versioning support refresh(new Refresh(true).force(true)); @@ -1044,12 +1075,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } static class VersionValue { - private long version; + private final long version; private final boolean delete; + private final long time; - 
VersionValue(long version, boolean delete) { + VersionValue(long version, boolean delete, long time) { this.version = version; this.delete = delete; + this.time = time; + } + + public long time() { + return this.time; } public long version() { diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/robin/SimpleRobinEngineTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/robin/SimpleRobinEngineTests.java index de7745c1179..b51b944caff 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/robin/SimpleRobinEngineTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/robin/SimpleRobinEngineTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; @@ -36,7 +37,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; public class SimpleRobinEngineTests extends AbstractSimpleEngineTests { protected Engine createEngine(Store store, Translog translog) { - return new RobinEngine(shardId, EMPTY_SETTINGS, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), + return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index())); } }