Core: trigger refresh when the live version map is using too much RAM

When refresh_interval is long or disabled and the indexing rate is high,
the live version map can use non-trivial amounts of RAM. With this change
we trigger a refresh in such cases to clear the version map, so its RAM
use stays bounded.

Closes #6443
mikemccand committed 2014-07-08 12:07:48 -04:00
commit a8417a7de3, parent 5ebc238a25
8 changed files with 472 additions and 80 deletions
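
The heart of the change is a simple heuristic in InternalEngine (full diff below): after each indexing operation, if the bytes the version map would free on refresh exceed 25% of the IndexWriter RAM buffer, schedule one asynchronous refresh. A minimal standalone sketch of that trigger, with illustrative names (VersionMapRefreshTrigger is not a real engine class; in the real code the pending flag is reset inside refresh() itself):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.atomic.AtomicBoolean;

    class VersionMapRefreshTrigger {
        private final AtomicBoolean refreshPending = new AtomicBoolean();

        void maybeTriggerRefresh(long versionMapBytesFreedByRefresh,
                                 double iwRamBufferSizeMB,
                                 ExecutorService refreshExecutor,
                                 Runnable refresh) {
            // Convert to MB and compare against 25% of the IndexWriter RAM buffer:
            double versionMapMB = versionMapBytesFreedByRefresh / 1024. / 1024.;
            if (versionMapMB > 0.25 * iwRamBufferSizeMB && refreshPending.getAndSet(true) == false) {
                refreshExecutor.execute(() -> {
                    try {
                        refresh.run(); // refresh cuts over to a new reader, clearing the map
                    } finally {
                        refreshPending.set(false); // sketch resets here; the engine resets in refresh()
                    }
                });
            }
        }
    }

The AtomicBoolean getAndSet makes the scheduling idempotent: many concurrent indexing threads can observe the threshold crossed, but only one refresh is queued until it completes.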

DeleteVersionValue.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.index.engine.internal;

+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.index.translog.Translog;

 /** Holds a deleted version, which just adds a timestamp to {@link VersionValue} so we know when we can expire the deletion. */
@@ -40,4 +41,9 @@ class DeleteVersionValue extends VersionValue {
     public boolean delete() {
         return true;
     }
+
+    @Override
+    public long ramBytesUsed() {
+        return super.ramBytesUsed() + RamUsageEstimator.NUM_BYTES_LONG;
+    }
 }

InternalEngine.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.index.engine.internal;

 import com.google.common.collect.Lists;
 import org.apache.lucene.index.*;
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
 import org.apache.lucene.search.IndexSearcher;
@@ -97,6 +98,10 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
     private volatile boolean compoundOnFlush = true;

     private long gcDeletesInMillis;
+
+    /** When we last pruned expired tombstones from versionMap.deletes: */
+    private volatile long lastDeleteVersionPruneTimeMSec;
+
     private volatile boolean enableGcDeletes = true;
     private volatile String codecName;
     private final boolean optimizeAutoGenerateId;
@@ -159,6 +164,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
     private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<>();

     private final AtomicLong translogIdGenerator = new AtomicLong();
+    private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();

     private SegmentInfos lastCommittedSegmentInfos;
@@ -180,6 +186,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
         this.codecName = indexSettings.get(INDEX_CODEC, "default");

         this.threadPool = threadPool;
+        this.lastDeleteVersionPruneTimeMSec = threadPool.estimatedTimeInMillis();
         this.indexSettingsService = indexSettingsService;
         this.indexingService = indexingService;
         this.warmer = (InternalIndicesWarmer) warmer;
@@ -393,6 +400,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
             maybeFailEngine(t);
             throw new CreateFailedEngineException(shardId, create, t);
         }
+        checkVersionMapRefresh();
     }

     private void maybeFailEngine(Throwable t) {
@@ -487,6 +495,20 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
             maybeFailEngine(t);
             throw new IndexFailedEngineException(shardId, index, t);
         }
+        checkVersionMapRefresh();
+    }
+
+    /** Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer). */
+    private void checkVersionMapRefresh() {
+        // TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
+        if (versionMap.ramBytesUsedForRefresh()/1024/1024. > 0.25*this.indexWriter.getConfig().getRAMBufferSizeMB() && versionMapRefreshPending.getAndSet(true) == false) {
+            // Now refresh to clear versionMap:
+            threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
+                public void run() {
+                    refresh(new Refresh("version_table_full"));
+                }
+            });
+        }
     }

     private void innerIndex(Index index, IndexWriter writer) throws IOException {
@@ -514,7 +536,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
         }
         updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
         index.updateVersion(updatedVersion);
-
         if (currentVersion == Versions.NOT_FOUND) {
             // document does not exists, we can optimize for create
@@ -557,6 +578,16 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
             maybeFailEngine(t);
             throw new DeleteFailedEngineException(shardId, delete, t);
         }
+        maybePruneDeletedTombstones();
+    }
+
+    private void maybePruneDeletedTombstones() {
+        // It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
+        // every 1/4 of gcDeletesInMillis:
+        if (enableGcDeletes && threadPool.estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > gcDeletesInMillis*0.25) {
+            pruneDeletedTombstones();
+        }
     }

     private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
@@ -583,7 +614,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
             }
         }
         updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
-
         final boolean found;
         if (currentVersion == Versions.NOT_FOUND) {
             // doc does not exist and no prior deletes
@@ -599,7 +629,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
         delete.updateVersion(updatedVersion, found);

         Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
-        versionMap.putDeleteUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, threadPool.estimatedTimeInMillis(), translogLocation));
+        versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, threadPool.estimatedTimeInMillis(), translogLocation));
         indexingService.postDeleteUnderLock(delete);
     }
@ -633,8 +663,10 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
maybeFailEngine(t); maybeFailEngine(t);
throw new DeleteByQueryFailedEngineException(shardId, delete, t); throw new DeleteByQueryFailedEngineException(shardId, delete, t);
} }
//TODO: This is heavy, since we refresh, but we really have to...
pruneDeletedVersions(System.currentTimeMillis()); // TODO: This is heavy, since we refresh, but we must do this because we don't know which documents were in fact deleted (i.e., our
// versionMap isn't updated), so we must force a cutover to a new reader to "see" the deletions:
refresh(new Refresh("delete_by_query").force(true));
} }
@Override @Override
@@ -726,6 +758,12 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
             failEngine("refresh failed", t);
             throw new RefreshFailedEngineException(shardId, t);
         }
+
+        // TODO: maybe we should just put a scheduled job in threadPool?
+        // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
+        // for a long time:
+        maybePruneDeletedTombstones();
+        versionMapRefreshPending.set(false);
     }

     @Override
@@ -780,7 +818,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
                 } catch (Throwable t) {
                     logger.warn("Failed to close current SearcherManager", t);
                 }
-                pruneDeletedVersions(threadPool.estimatedTimeInMillis());
+
+                maybePruneDeletedTombstones();
+
             } catch (Throwable t) {
                 throw new FlushFailedEngineException(shardId, t);
             }
@@ -799,17 +839,26 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
                     translog.newTransientTranslog(translogId);
                     indexWriter.setCommitData(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
                     indexWriter.commit();
-                    pruneDeletedVersions(threadPool.estimatedTimeInMillis());
+                    // we need to refresh in order to clear older version values
+                    refresh(new Refresh("version_table_flush").force(true));
                     // we need to move transient to current only after we refresh
                     // so items added to current will still be around for realtime get
                     // when trans overrides it
                     translog.makeTransientCurrent();
                 } catch (Throwable e) {
                     translog.revertTransient();
                     throw new FlushFailedEngineException(shardId, e);
                 }
             }
         }
+
+        // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
+        // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
+        if (enableGcDeletes) {
+            pruneDeletedTombstones();
+        }
+
     } else if (flush.type() == Flush.Type.COMMIT) {
         // note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
         // translog on an index that was opened on a committed point in time that is "in the future"
@@ -827,6 +876,13 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
                 throw new FlushFailedEngineException(shardId, e);
             }
         }
+
+        // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
+        // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
+        if (enableGcDeletes) {
+            pruneDeletedTombstones();
+        }
+
     } else {
         throw new ElasticsearchIllegalStateException("flush type [" + flush.type() + "] not supported");
     }
@@ -869,34 +925,29 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engine {
         return writer;
     }

-    private void pruneDeletedVersions(long time) {
-        // we need to refresh in order to clear older version values
-        refresh(new Refresh("version_table").force(true));
+    private void pruneDeletedTombstones() {
+        long timeMSec = threadPool.estimatedTimeInMillis();

         // TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...

-        // we only need to prune deletes; the adds/updates are cleared whenever reader is refreshed:
-        for (Map.Entry<BytesRef, VersionValue> entry : versionMap.getAllDeletes()) {
+        // we only need to prune the deletes map; the current/old version maps are cleared on refresh:
+        for (Map.Entry<BytesRef, VersionValue> entry : versionMap.getAllTombstones()) {
             BytesRef uid = entry.getKey();
             synchronized (dirtyLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
                 // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
-                VersionValue versionValue = versionMap.getDeleteUnderLock(uid);
-                if (versionValue == null) {
-                    // another thread has re-added this uid since we started refreshing:
-                    continue;
-                }
-                if (time - versionValue.time() <= 0) {
-                    continue; // its a newer value, from after/during we refreshed, don't clear it
-                }
-                assert versionValue.delete();
-                if (enableGcDeletes && (time - versionValue.time()) > gcDeletesInMillis) {
-                    versionMap.removeDeleteUnderLock(uid);
+                VersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
+                if (versionValue != null) {
+                    if (timeMSec - versionValue.time() > gcDeletesInMillis) {
+                        versionMap.removeTombstoneUnderLock(uid);
+                    }
                 }
             }
         }
+
+        lastDeleteVersionPruneTimeMSec = timeMSec;
     }

     @Override
     public void maybeMerge() throws EngineException {
         if (!possibleMergeNeeded()) {
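
Note the shape of maybePruneDeletedTombstones() above: pruning walks every tombstone and takes a per-uid dirty lock, so it is throttled to run at most once per gcDeletesInMillis/4, driven by the thread pool's coarse cached clock. A reduced sketch of that throttling pattern in isolation (names are illustrative, not the engine's real API):

    // Hypothetical sketch: an expensive cleanup runs at most every interval/4.
    class ThrottledPruner {
        private final long gcDeletesInMillis;
        private volatile long lastPruneTimeMSec;

        ThrottledPruner(long gcDeletesInMillis, long nowMSec) {
            this.gcDeletesInMillis = gcDeletesInMillis;
            this.lastPruneTimeMSec = nowMSec;
        }

        /** Called on every delete; only occasionally does the real work. */
        void maybePrune(long nowMSec, Runnable prune) {
            if (nowMSec - lastPruneTimeMSec > gcDeletesInMillis * 0.25) {
                prune.run(); // walks tombstones, removes entries older than gcDeletesInMillis
                lastPruneTimeMSec = nowMSec;
            }
        }
    }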

LiveVersionMap.java

@@ -21,117 +21,226 @@ package org.elasticsearch.index.engine.internal;
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;

 import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

+// TODO: use Lucene's LiveFieldValues, but we need to somehow extend it to handle SearcherManager changing, and to handle long-lasting (GC'd
+// by time) tombstones

 /** Maps _uid value to its version information. */
-class LiveVersionMap implements ReferenceManager.RefreshListener {
+class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {

-    // All writes go into here:
-    private volatile Map<BytesRef,VersionValue> addsCurrent = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
-
-    // Only used while refresh is running:
-    private volatile Map<BytesRef,VersionValue> addsOld = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
-
-    // Holds tombstones for deleted docs, expiring by their own schedule; not private so InternalEngine can prune:
-    private final Map<BytesRef,VersionValue> deletes = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
+    private static class Maps {
+
+        // All writes (adds and deletes) go into here:
+        final Map<BytesRef,VersionValue> current;
+
+        // Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
+        final Map<BytesRef,VersionValue> old;
+
+        public Maps(Map<BytesRef,VersionValue> current, Map<BytesRef,VersionValue> old) {
+            this.current = current;
+            this.old = old;
+        }
+
+        public Maps() {
+            this(ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency(),
+                 ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency());
+        }
+    }
+
+    // All deletes also go here, and delete "tombstones" are retained after refresh:
+    private final Map<BytesRef,VersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
+
+    private volatile Maps maps = new Maps();

     private ReferenceManager mgr;

+    /** Bytes consumed for each BytesRef UID:
+     *
+     *  NUM_BYTES_OBJECT_HEADER + 2*NUM_BYTES_INT + NUM_BYTES_OBJECT_REF + NUM_BYTES_ARRAY_HEADER [ + bytes.length] */
+    private static final int BASE_BYTES_PER_BYTESREF = RamUsageEstimator.NUM_BYTES_OBJECT_HEADER +
+        2*RamUsageEstimator.NUM_BYTES_INT +
+        RamUsageEstimator.NUM_BYTES_OBJECT_REF +
+        RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
+
+    /** Bytes used by having CHM point to a key/value:
+     *
+     *  CHM.Entry:
+     *     + NUM_BYTES_OBJECT_HEADER + 3*NUM_BYTES_OBJECT_REF + NUM_BYTES_INT
+     *
+     *  CHM's pointer to CHM.Entry, double for approx load factor:
+     *     + 2*NUM_BYTES_OBJECT_REF */
+    private static final int BASE_BYTES_PER_CHM_ENTRY = RamUsageEstimator.NUM_BYTES_OBJECT_HEADER +
+        RamUsageEstimator.NUM_BYTES_INT +
+        5*RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+
+    /** Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account
+     *  for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM. */
+    final AtomicLong ramBytesUsedCurrent = new AtomicLong();
+
+    /** Tracks bytes used by tombstones (deletes) */
+    final AtomicLong ramBytesUsedTombstones = new AtomicLong();

-    public void setManager(ReferenceManager newMgr) {
+    /** Sync'd because we replace old mgr. */
+    synchronized void setManager(ReferenceManager newMgr) {
         if (mgr != null) {
             mgr.removeListener(this);
         }
         mgr = newMgr;
+
+        // In case InternalEngine closes & opens a new IndexWriter/SearcherManager, all deletes are made visible, so we clear old and
+        // current here. This is safe because caller holds writeLock here (so no concurrent adds/deletes can be happening):
+        maps = new Maps();
+
         // So we are notified when reopen starts and finishes
         mgr.addListener(this);
     }

     @Override
     public void beforeRefresh() throws IOException {
-        addsOld = addsCurrent;
         // Start sending all updates after this point to the new
         // map. While reopen is running, any lookup will first
         // try this new map, then fallback to old, then to the
         // current searcher:
-        addsCurrent = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
+        maps = new Maps(ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency(), maps.current);
+
+        // This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous
+        // line and this one, but that should be minor, and the error won't accumulate over time:
+        ramBytesUsedCurrent.set(0);
     }

     @Override
     public void afterRefresh(boolean didRefresh) throws IOException {
-        // Now drop all the old values because they are now
-        // visible via the searcher that was just opened; if
-        // didRefresh is false, it's possible old has some
-        // entries in it, which is fine: it means they were
-        // actually already included in the previously opened
-        // reader. So we can safely clear old here:
-        addsOld = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
+        // We can now drop old because these operations are now visible via the newly opened searcher. Even if didRefresh is false, which
+        // means Lucene did not actually open a new reader because it detected no changes, it's possible old has some entries in it, which
+        // is fine: it means they were actually already included in the previously opened reader, so we can still safely drop them in that
+        // case. This is because we assign new maps (in beforeRefresh) slightly before Lucene actually flushes any segments for the
+        // reopen, and so any concurrent indexing requests can still sneak in a few additions to that current map that are in fact reflected
+        // in the previous reader. We don't touch tombstones here: they expire on their own index.gc_deletes timeframe:
+        maps = new Maps(maps.current, ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency());
     }

-    /** Caller has a lock, so that this uid will not be concurrently added/deleted by another thread. */
-    public VersionValue getUnderLock(BytesRef uid) {
+    /** Returns the live version (add or delete) for this uid. */
+    VersionValue getUnderLock(BytesRef uid) {
+        Maps currentMaps = maps;
+
         // First try to get the "live" value:
-        VersionValue value = addsCurrent.get(uid);
+        VersionValue value = currentMaps.current.get(uid);
         if (value != null) {
             return value;
         }

-        value = addsOld.get(uid);
+        value = currentMaps.old.get(uid);
         if (value != null) {
             return value;
         }

-        value = deletes.get(uid);
-        if (value != null) {
-            return value;
-        }
-
-        return null;
+        return tombstones.get(uid);
     }

     /** Adds this uid/version to the pending adds map. */
-    public void putUnderLock(BytesRef uid, VersionValue version) {
-        deletes.remove(uid);
-        addsCurrent.put(uid, version);
-    }
-
-    /** Adds this uid/version to the pending deletes map. */
-    public void putDeleteUnderLock(BytesRef uid, VersionValue version) {
-        addsCurrent.remove(uid);
-        addsOld.remove(uid);
-        deletes.put(uid, version);
-    }
+    void putUnderLock(BytesRef uid, VersionValue version) {
+
+        long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
+
+        final VersionValue prev = maps.current.put(uid, version);
+        if (prev != null) {
+            // Deduct RAM for the version we just replaced:
+            long prevBytes = BASE_BYTES_PER_CHM_ENTRY;
+            if (prev.delete() == false) {
+                prevBytes += prev.ramBytesUsed() + uidRAMBytesUsed;
+            }
+            ramBytesUsedCurrent.addAndGet(-prevBytes);
+        }
+
+        // Add RAM for the new version:
+        long newBytes = BASE_BYTES_PER_CHM_ENTRY;
+        if (version.delete() == false) {
+            newBytes += version.ramBytesUsed() + uidRAMBytesUsed;
+        }
+        ramBytesUsedCurrent.addAndGet(newBytes);
+
+        final VersionValue prevTombstone;
+        if (version.delete()) {
+            // Also enroll the delete into tombstones, and account for its RAM too:
+            prevTombstone = tombstones.put(uid, version);
+
+            // We initially account for BytesRef/VersionValue RAM for a delete against the tombstones, because this RAM will not be freed up
+            // on refresh. Later, in removeTombstoneUnderLock, if we clear the tombstone entry but the delete remains in current, we shift
+            // the accounting to current:
+            ramBytesUsedTombstones.addAndGet(BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed);
+
+            if (prevTombstone == null && prev != null && prev.delete()) {
+                // If prev was a delete that had already been removed from tombstones, then current was already accounting for the
+                // BytesRef/VersionValue RAM, so we now deduct that as well:
+                ramBytesUsedCurrent.addAndGet(-(prev.ramBytesUsed() + uidRAMBytesUsed));
+            }
+        } else {
+            // UID came back to life so we remove the tombstone:
+            prevTombstone = tombstones.remove(uid);
+        }

-    /** Returns the current deleted version for this uid. */
-    public VersionValue getDeleteUnderLock(BytesRef uid) {
-        return deletes.get(uid);
+        // Deduct tombstones bytes used for the version we just removed or replaced:
+        if (prevTombstone != null) {
+            long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed));
+            assert v >= 0;
+        }
     }

     /** Removes this uid from the pending deletes map. */
-    public void removeDeleteUnderLock(BytesRef uid) {
-        deletes.remove(uid);
+    void removeTombstoneUnderLock(BytesRef uid) {
+
+        long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
+
+        final VersionValue prev = tombstones.remove(uid);
+        if (prev != null) {
+            assert prev.delete();
+            long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed));
+            assert v >= 0;
+        }
+
+        final VersionValue curVersion = maps.current.get(uid);
+        if (curVersion != null && curVersion.delete()) {
+            // We now shift accounting of the BytesRef from tombstones to current, because a refresh would clear this RAM. This should be
+            // uncommon, because with the default refresh=1s and gc_deletes=60s, deletes should be cleared from current long before we drop
+            // them from tombstones:
+            ramBytesUsedCurrent.addAndGet(curVersion.ramBytesUsed() + uidRAMBytesUsed);
+        }
     }

-    /** Iterates over all pending deletions. */
-    public Iterable<Map.Entry<BytesRef,VersionValue>> getAllDeletes() {
-        return deletes.entrySet();
+    /** Caller has a lock, so that this uid will not be concurrently added/deleted by another thread. */
+    VersionValue getTombstoneUnderLock(BytesRef uid) {
+        return tombstones.get(uid);
+    }
+
+    /** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */
+    Iterable<Map.Entry<BytesRef,VersionValue>> getAllTombstones() {
+        return tombstones.entrySet();
     }

     /** Called when this index is closed. */
-    public void clear() {
-        addsCurrent.clear();
-        addsOld.clear();
-        deletes.clear();
+    synchronized void clear() {
+        maps = new Maps();
+        tombstones.clear();
+        ramBytesUsedCurrent.set(0);
+        ramBytesUsedTombstones.set(0);
         if (mgr != null) {
             mgr.removeListener(this);
             mgr = null;
         }
     }
+
+    @Override
+    public long ramBytesUsed() {
+        return ramBytesUsedCurrent.get() + ramBytesUsedTombstones.get();
+    }
+
+    /** Returns how much RAM would be freed up by refreshing. This is {@link #ramBytesUsed} except does not include tombstones because they
+     *  don't clear on refresh. */
+    long ramBytesUsedForRefresh() {
+        return ramBytesUsedCurrent.get();
+    }
 }
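
The Maps holder above is the key trick: a single volatile field carries the (current, old) pair, so writers and the refresh listener swap both maps atomically without locking readers. A reduced sketch of the pattern, assuming generic keys/values rather than the engine's BytesRef/VersionValue (RefreshScopedMap is illustrative, not a real class):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class RefreshScopedMap<K, V> {
        private static final class Maps<K, V> {
            final Map<K, V> current;
            final Map<K, V> old;
            Maps(Map<K, V> current, Map<K, V> old) {
                this.current = current;
                this.old = old;
            }
        }

        private volatile Maps<K, V> maps = new Maps<>(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());

        void put(K key, V value) {
            maps.current.put(key, value);      // all writes go to current
        }

        V get(K key) {
            Maps<K, V> m = maps;               // read one consistent pair
            V v = m.current.get(key);          // newest wins
            if (v != null) {
                return v;
            }
            return m.old.get(key);             // may cover ops still pending in the refresh
        }

        void beforeRefresh() {
            // writes-so-far become "old"; new writes land in a fresh current:
            maps = new Maps<>(new ConcurrentHashMap<>(), maps.current);
        }

        void afterRefresh() {
            // old entries are now visible via the reopened searcher, so drop them:
            maps = new Maps<>(maps.current, new ConcurrentHashMap<>());
        }
    }

Publishing a fresh Maps object on each transition is what lets getUnderLock read a consistent pair even while beforeRefresh/afterRefresh run concurrently.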

VersionValue.java

@@ -19,9 +19,13 @@
 package org.elasticsearch.index.engine.internal;

+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.Version;
 import org.elasticsearch.index.translog.Translog;

-class VersionValue {
+class VersionValue implements Accountable {
+
     private final long version;
     private final Translog.Location translogLocation;
@@ -45,4 +49,9 @@ class VersionValue {
     public Translog.Location translogLocation() {
         return this.translogLocation;
     }
+
+    @Override
+    public long ramBytesUsed() {
+        return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_LONG + RamUsageEstimator.NUM_BYTES_OBJECT_REF + translogLocation.ramBytesUsed();
+    }
 }
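
For a rough sense of the accounting, assuming Lucene's RamUsageEstimator constants on a 64-bit JVM with compressed oops (12-byte object header, 4-byte reference, 4-byte int, 8-byte long, 16-byte array header; these values are JVM-dependent):

    // Back-of-the-envelope cost per live (non-delete) version map entry under the
    // compressed-oops assumptions above; actual values vary by JVM:
    // BASE_BYTES_PER_BYTESREF   = 12 + 2*4 + 4 + 16  = 40 bytes (+ uid.bytes.length)
    // BASE_BYTES_PER_CHM_ENTRY  = 12 + 4 + 5*4       = 36 bytes
    // Translog.Location         = 12 + 2*8 + 4       = 32 bytes
    // VersionValue.ramBytesUsed = 12 + 8 + 4 + 32    = 56 bytes
    // Total per entry          ~= 36 + 40 + 56       ~= 132 bytes + uid length
    // A DeleteVersionValue adds NUM_BYTES_LONG (8) for its expiry timestamp.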

Translog.java

@@ -21,8 +21,10 @@ package org.elasticsearch.index.translog;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -39,6 +41,8 @@ import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexShardComponent;

 import java.io.IOException;
+import java.io.InputStream;

 /**
  *
@@ -133,7 +137,8 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
      */
     TranslogStats stats();

-    static class Location {
+    static class Location implements Accountable {
+
         public final long translogId;
         public final long translogLocation;
         public final int size;
@@ -143,6 +148,11 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent,
             this.translogLocation = translogLocation;
             this.size = size;
         }
+
+        @Override
+        public long ramBytesUsed() {
+            return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*RamUsageEstimator.NUM_BYTES_LONG + RamUsageEstimator.NUM_BYTES_INT;
+        }
     }

     /**

TranslogService.java

@@ -78,7 +78,6 @@ public class TranslogService extends AbstractIndexShardComponent {
         this.indexSettingsService = indexSettingsService;
         this.indexShard = indexShard;
         this.translog = translog;
-
         this.flushThresholdOperations = componentSettings.getAsInt(FLUSH_THRESHOLD_OPS_KEY, componentSettings.getAsInt("flush_threshold", 5000));
         this.flushThresholdSize = componentSettings.getAsBytesSize(FLUSH_THRESHOLD_SIZE_KEY, new ByteSizeValue(200, ByteSizeUnit.MB));
         this.flushThresholdPeriod = componentSettings.getAsTime(FLUSH_THRESHOLD_PERIOD_KEY, TimeValue.timeValueMinutes(30));

InternalEngineTests.java

@@ -30,6 +30,7 @@ import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -1182,6 +1183,77 @@ public class InternalEngineTests extends ElasticsearchTestCase {
         }
     }

+    @Slow
+    @Test
+    public void testEnableGcDeletes() throws Exception {
+
+        // Make sure enableGCDeletes == false works:
+        Settings settings = ImmutableSettings.builder()
+                .put(InternalEngine.INDEX_GC_DELETES, "0ms")
+                .build();
+
+        Engine engine = new InternalEngine(shardId, settings, threadPool,
+                                           engineSettingsService,
+                                           new ShardIndexingService(shardId, settings,
+                                                                    new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, engineSettingsService)),
+                                           null, store, createSnapshotDeletionPolicy(), createTranslog(), createMergePolicy(), createMergeScheduler(engineSettingsService),
+                                           new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new CodecService(shardId.index()));
+        engine.start();
+        engine.enableGcDeletes(false);
+
+        // Add document
+        Document document = testDocument();
+        document.add(new TextField("value", "test1", Field.Store.YES));
+
+        ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, Lucene.STANDARD_ANALYZER, B_2, false);
+        engine.index(new Engine.Index(null, newUid("1"), doc, 1, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
+
+        // Delete document we just added:
+        engine.delete(new Engine.Delete("test", "1", newUid("1"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
+
+        // Get should not find the document
+        Engine.GetResult getResult = engine.get(new Engine.Get(true, newUid("1")));
+        assertThat(getResult.exists(), equalTo(false));
+
+        // Give the gc pruning logic a chance to kick in
+        Thread.sleep(1000);
+
+        if (randomBoolean()) {
+            engine.refresh(new Engine.Refresh("test"));
+        }
+
+        // Delete non-existent document
+        engine.delete(new Engine.Delete("test", "2", newUid("2"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
+
+        // Get should not find the document (we never indexed uid=2):
+        getResult = engine.get(new Engine.Get(true, newUid("2")));
+        assertThat(getResult.exists(), equalTo(false));
+
+        // Try to index uid=1 with a too-old version, should fail:
+        try {
+            engine.index(new Engine.Index(null, newUid("1"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
+            fail("did not hit expected exception");
+        } catch (VersionConflictEngineException vcee) {
+            // expected
+        }
+
+        // Get should still not find the document
+        getResult = engine.get(new Engine.Get(true, newUid("1")));
+        assertThat(getResult.exists(), equalTo(false));
+
+        // Try to index uid=2 with a too-old version, should fail:
+        try {
+            engine.index(new Engine.Index(null, newUid("2"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
+            fail("did not hit expected exception");
+        } catch (VersionConflictEngineException vcee) {
+            // expected
+        }
+
+        // Get should not find the document
+        getResult = engine.get(new Engine.Get(true, newUid("2")));
+        assertThat(getResult.exists(), equalTo(false));
+    }
+
     protected Term newUid(String id) {
         return new Term("_uid", id);
     }

SimpleVersioningTests.java

@@ -35,6 +35,7 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.common.lucene.uid.Versions;
+import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
 import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
@@ -454,6 +455,11 @@ public class SimpleVersioningTests extends ElasticsearchIntegrationTest {
         public String id;
         public long version;
         public boolean delete;
+
+        @Override
+        public String toString() {
+            return "id=" + id + " version=" + version + " delete?=" + delete;
+        }
     }
@@ -651,4 +657,134 @@ public class SimpleVersioningTests extends ElasticsearchIntegrationTest {
         }
     }

+    @Test
+    @Slow
+    public void testDeleteNotLost() throws Exception {
+
+        // We require only one shard for this test, so that the 2nd delete provokes pruning the deletes map:
+        client()
+            .admin()
+            .indices()
+            .prepareCreate("test")
+            .setSettings(ImmutableSettings.settingsBuilder()
+                         .put("index.number_of_shards", 1))
+            .execute().
+            actionGet();
+
+        ensureGreen();
+
+        HashMap<String,Object> newSettings = new HashMap<>();
+        newSettings.put("index.gc_deletes", "10ms");
+        newSettings.put("index.refresh_interval", "-1");
+        client()
+            .admin()
+            .indices()
+            .prepareUpdateSettings("test")
+            .setSettings(newSettings)
+            .execute()
+            .actionGet();
+
+        // Index a doc:
+        client()
+            .prepareIndex("test", "type", "id")
+            .setSource("foo", "bar")
+            .setOpType(IndexRequest.OpType.INDEX)
+            .setVersion(10)
+            .setVersionType(VersionType.EXTERNAL)
+            .execute()
+            .actionGet();
+
+        if (randomBoolean()) {
+            // Force refresh so the add is sometimes visible in the searcher:
+            refresh();
+        }
+
+        // Delete it
+        client()
+            .prepareDelete("test", "type", "id")
+            .setVersion(11)
+            .setVersionType(VersionType.EXTERNAL)
+            .execute()
+            .actionGet();
+
+        // Real-time get should reflect delete:
+        assertThat("doc should have been deleted",
+                   client()
+                   .prepareGet("test", "type", "id")
+                   .execute()
+                   .actionGet()
+                   .getVersion(),
+                   equalTo(-1L));
+
+        // ThreadPool.estimatedTimeInMillis has default granularity of 200 msec, so we must sleep at least that long; sleep much longer in
+        // case system is busy:
+        Thread.sleep(1000);
+
+        // Delete an unrelated doc (provokes pruning deletes from versionMap)
+        client()
+            .prepareDelete("test", "type", "id2")
+            .setVersion(11)
+            .setVersionType(VersionType.EXTERNAL)
+            .execute()
+            .actionGet();
+
+        // Real-time get should still reflect delete:
+        assertThat("doc should have been deleted",
+                   client()
+                   .prepareGet("test", "type", "id")
+                   .execute()
+                   .actionGet()
+                   .getVersion(),
+                   equalTo(-1L));
+    }
+
+    @Test
+    public void testGCDeletesZero() throws Exception {
+
+        createIndex("test");
+        ensureGreen();
+
+        // We test deletes, but can't rely on wall-clock delete GC:
+        HashMap<String,Object> newSettings = new HashMap<>();
+        newSettings.put("index.gc_deletes", "0ms");
+        client()
+            .admin()
+            .indices()
+            .prepareUpdateSettings("test")
+            .setSettings(newSettings)
+            .execute()
+            .actionGet();
+
+        // Index a doc:
+        client()
+            .prepareIndex("test", "type", "id")
+            .setSource("foo", "bar")
+            .setOpType(IndexRequest.OpType.INDEX)
+            .setVersion(10)
+            .setVersionType(VersionType.EXTERNAL)
+            .execute()
+            .actionGet();
+
+        if (randomBoolean()) {
+            // Force refresh so the add is sometimes visible in the searcher:
+            refresh();
+        }
+
+        // Delete it
+        client()
+            .prepareDelete("test", "type", "id")
+            .setVersion(11)
+            .setVersionType(VersionType.EXTERNAL)
+            .execute()
+            .actionGet();
+
+        // Real-time get should reflect delete even though index.gc_deletes is 0:
+        assertThat("doc should have been deleted",
+                   client()
+                   .prepareGet("test", "type", "id")
+                   .execute()
+                   .actionGet()
+                   .getVersion(),
+                   equalTo(-1L));
+    }
 }