Versioning: Better handling of deletes - time based eviction, closes #862.

This commit is contained in:
kimchy 2011-04-16 17:26:31 +03:00
parent 4eddaec8ba
commit 763f986a30
3 changed files with 53 additions and 15 deletions

View File

@ -307,7 +307,7 @@ public class SimpleEngineBenchmark {
ThreadPool threadPool = new ThreadPool();
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings));
Engine engine = new RobinEngine(shardId, settings, new IndexSettingsService(shardId.index(), settings), store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)),
Engine engine = new RobinEngine(shardId, settings, new ThreadPool(), new IndexSettingsService(shardId.index(), settings), store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)),
new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index()));
engine.start();

View File

@ -53,8 +53,10 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -82,6 +84,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private final AtomicBoolean optimizeMutex = new AtomicBoolean();
private final long gcDeletesInMillis;
private final ThreadPool threadPool;
private final IndexSettingsService indexSettingsService;
private final Store store;
@ -133,7 +139,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private final Object failedEngineMutex = new Object();
private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<FailedEngineListener>();
@Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
@Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
IndexSettingsService indexSettingsService,
Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
AnalysisService analysisService, SimilarityService similarityService,
@ -143,11 +150,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the engine");
Preconditions.checkNotNull(translog, "Translog must be provided to the engine");
this.gcDeletesInMillis = indexSettings.getAsTime("index.gc_deletes", TimeValue.timeValueSeconds(60)).millis();
this.indexingBufferSize = componentSettings.getAsBytesSize("index_buffer_size", new ByteSizeValue(64, ByteSizeUnit.MB)); // not really important, as it is set by the IndexingMemory manager
this.termIndexInterval = indexSettings.getAsInt("index.term_index_interval", IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL);
this.termIndexDivisor = indexSettings.getAsInt("index.term_index_divisor", 1); // IndexReader#DEFAULT_TERMS_INDEX_DIVISOR
this.asyncLoadBloomFilter = componentSettings.getAsBoolean("async_load_bloom", true); // Here for testing, should always be true
this.threadPool = threadPool;
this.indexSettingsService = indexSettingsService;
this.store = store;
this.deletionPolicy = deletionPolicy;
@ -270,7 +279,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (create.origin() == Operation.Origin.RECOVERY) {
// on recovery, we get the actual version we want to use
if (create.version() != 0) {
versionMap.put(create.uid().text(), new VersionValue(create.version(), false));
versionMap.put(create.uid().text(), new VersionValue(create.version(), false, threadPool.estimatedTimeInMillis()));
}
uidField.version(create.version());
writer.addDocument(create.doc(), create.analyzer());
@ -281,7 +290,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(create.uid());
} else {
currentVersion = versionValue.version();
if (versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
currentVersion = -1; // deleted, and GC
} else {
currentVersion = versionValue.version();
}
}
// same logic as index
@ -337,7 +350,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
throw new DocumentAlreadyExistsEngineException(shardId, create.type(), create.id());
}
versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false));
versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis()));
uidField.version(updatedVersion);
create.version(updatedVersion);
@ -374,7 +387,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (index.origin() == Operation.Origin.RECOVERY) {
// on recovery, we get the actual version we want to use
if (index.version() != 0) {
versionMap.put(index.uid().text(), new VersionValue(index.version(), false));
versionMap.put(index.uid().text(), new VersionValue(index.version(), false, threadPool.estimatedTimeInMillis()));
}
uidField.version(index.version());
writer.updateDocument(index.uid(), index.doc(), index.analyzer());
@ -385,7 +398,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
} else {
currentVersion = versionValue.version();
if (versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
currentVersion = -1; // deleted, and GC
} else {
currentVersion = versionValue.version();
}
}
long updatedVersion;
@ -430,7 +447,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
updatedVersion = index.version();
}
versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false));
versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis()));
uidField.version(updatedVersion);
index.version(updatedVersion);
@ -470,7 +487,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (delete.origin() == Operation.Origin.RECOVERY) {
// update the version with the exact version from recovery, assuming we have it
if (delete.version() != 0) {
versionMap.put(delete.uid().text(), new VersionValue(delete.version(), true));
versionMap.put(delete.uid().text(), new VersionValue(delete.version(), true, threadPool.estimatedTimeInMillis()));
}
writer.deleteDocuments(delete.uid());
@ -481,7 +498,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(delete.uid());
} else {
currentVersion = versionValue.version();
if (versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
currentVersion = -1; // deleted, and GC
} else {
currentVersion = versionValue.version();
}
}
long updatedVersion;
@ -528,7 +549,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// if its a delete on delete and we have the current delete version, return it
delete.version(versionValue.version()).notFound(true);
} else {
versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true));
versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis()));
delete.version(updatedVersion);
writer.deleteDocuments(delete.uid());
translog.add(new Translog.Delete(delete));
@ -681,7 +702,17 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
throw new FlushFailedEngineException(shardId, e);
}
}
versionMap.clear();
// remove all version except for deletes, which we expire based on GC value
long time = threadPool.estimatedTimeInMillis();
for (Map.Entry<String, VersionValue> entry : versionMap.entrySet()) {
if (entry.getValue().delete()) {
if ((time - entry.getValue().time()) > gcDeletesInMillis) {
versionMap.remove(entry.getKey());
}
} else {
versionMap.remove(entry.getKey());
}
}
dirty = true; // force a refresh
// we need to do a refresh here so we sync versioning support
refresh(new Refresh(true).force(true));
@ -1044,12 +1075,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
static class VersionValue {
private long version;
private final long version;
private final boolean delete;
private final long time;
VersionValue(long version, boolean delete) {
VersionValue(long version, boolean delete, long time) {
this.version = version;
this.delete = delete;
this.time = time;
}
public long time() {
return this.time;
}
public long version() {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
@ -36,7 +37,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {
protected Engine createEngine(Store store, Translog translog) {
return new RobinEngine(shardId, EMPTY_SETTINGS, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index()));
}
}