Track deletes only in the tombstone map instead of maintaining as copy (#27868)

Today we maintain a copy of every delete in the live version maps. This is unnecessary
and can add considerable overhead if the maps grow large. This change tracks deletes
in the tombstone map only and relies on pruning the tombstones once deletes are
garbage collected. A minimal sketch of the idea follows the file summary below.
Simon Willnauer 2018-02-19 12:23:38 +01:00 committed by GitHub
parent 90030e008b
commit 56edb5eb3a
5 changed files with 228 additions and 117 deletions
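
To make the mechanism concrete, here is a minimal sketch of the idea, assuming a hypothetical ToyDeleteTracker rather than the real LiveVersionMap shown in the diffs below (the real class additionally keeps a current and an old lookup per refresh cycle and guards every uid with a lock):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; names and structure are simplified from the real LiveVersionMap.
final class ToyDeleteTracker {

    static final class Tombstone {
        final long version;
        final long timeMillis; // when the delete was issued
        Tombstone(long version, long timeMillis) {
            this.version = version;
            this.timeMillis = timeMillis;
        }
    }

    // Deletes are recorded here only; the live lookup maps no longer hold a copy of them.
    private final Map<String, Tombstone> tombstones = new ConcurrentHashMap<>();
    // Smallest timestamp of a delete that has not been exposed by a refresh yet; tombstones at or
    // above this timestamp must be kept so an unrefreshed reader can still observe the delete.
    private final AtomicLong minUnrefreshedDeleteTime = new AtomicLong(Long.MAX_VALUE);

    void delete(String uid, long version, long nowMillis) {
        tombstones.put(uid, new Tombstone(version, nowMillis));
        minUnrefreshedDeleteTime.updateAndGet(prev -> Math.min(prev, nowMillis));
    }

    void afterRefresh() {
        // Every delete issued before this point is now visible to readers; the real class achieves
        // this by swapping its current/old maps rather than resetting a single counter.
        minUnrefreshedDeleteTime.set(Long.MAX_VALUE);
    }

    void pruneTombstones(long nowMillis, long gcDeletesInMillis) {
        tombstones.entrySet().removeIf(entry -> {
            Tombstone tombstone = entry.getValue();
            boolean oldEnough = nowMillis - tombstone.timeMillis > gcDeletesInMillis;
            boolean notNeededByUnrefreshedReader = tombstone.timeMillis < minUnrefreshedDeleteTime.get();
            return oldEnough && notNeededByUnrefreshedReader;
        });
    }

    Tombstone getDelete(String uid) {
        return tombstones.get(uid);
    }
}

The timestamp guard is what makes it safe to stop copying deletes into the live maps: a tombstone is only dropped once it is both past the GC window and already visible through a refreshed reader.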

View File

@@ -1541,4 +1541,9 @@ public abstract class Engine implements Closeable {
     public boolean isRecovering() {
         return false;
     }

+    /**
+     * Tries to prune buffered deletes from the version map.
+     */
+    public abstract void maybePruneDeletes();
 }

View File

@@ -1182,7 +1182,7 @@ public class InternalEngine extends Engine {
             }
             throw e;
         }
-        maybePruneDeletedTombstones();
+        maybePruneDeletes();
         return deleteResult;
     }

@@ -1311,7 +1311,8 @@ public class InternalEngine extends Engine {
         }
     }

-    private void maybePruneDeletedTombstones() {
+    @Override
+    public void maybePruneDeletes() {
         // It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
         // every 1/4 of gcDeletesInMillis:
         if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {

@@ -1401,7 +1402,7 @@ public class InternalEngine extends Engine {
         // TODO: maybe we should just put a scheduled job in threadPool?
         // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
         // for a long time:
-        maybePruneDeletedTombstones();
+        maybePruneDeletes();
         mergeScheduler.refreshConfig();
     }

@@ -1621,32 +1622,15 @@ public class InternalEngine extends Engine {
         }
     }

     private void pruneDeletedTombstones() {
-        long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
-        // TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...
-        // we only need to prune the deletes map; the current/old version maps are cleared on refresh:
-        for (Map.Entry<BytesRef, DeleteVersionValue> entry : versionMap.getAllTombstones()) {
-            BytesRef uid = entry.getKey();
-            try (Releasable ignored = versionMap.acquireLock(uid)) {
-                // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
-                // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
-                DeleteVersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
-                if (versionValue != null) {
-                    if (timeMSec - versionValue.time > getGcDeletesInMillis()) {
-                        versionMap.removeTombstoneUnderLock(uid);
-                    }
-                }
-            }
-        }
+        final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
+        versionMap.pruneTombstones(timeMSec, engineConfig.getIndexSettings().getGcDeletesInMillis());
         lastDeleteVersionPruneTimeMSec = timeMSec;
     }

     // testing
     void clearDeletedTombstones() {
-        versionMap.clearTombstones();
+        // clean with current time Long.MAX_VALUE and interval 0 since we use a greater than relationship here.
+        versionMap.pruneTombstones(Long.MAX_VALUE, 0);
     }

     @Override

@@ -2181,7 +2165,7 @@ public class InternalEngine extends Engine {
     public void onSettingsChanged() {
         mergeScheduler.refreshConfig();
         // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
-        maybePruneDeletedTombstones();
+        maybePruneDeletes();
         if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
             // this is an anti-viral settings you can only opt out for the entire index
             // only if a shard starts up again due to relocation or if the index is closed

View File

@@ -32,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;

 /** Maps _uid value to its version information. */
 final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {

@@ -40,6 +41,10 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
     private static final class VersionLookup {

+        /** Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account
+         * for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM. */
+        final AtomicLong ramBytesUsed = new AtomicLong();
+
         private static final VersionLookup EMPTY = new VersionLookup(Collections.emptyMap());
         private final Map<BytesRef, VersionValue> map;

@@ -55,6 +60,10 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
         // map reference itself.
         private boolean unsafe;

+        // minimum timestamp of delete operations that were made while this map was active. this is used to make sure they are kept in
+        // the tombstone
+        private final AtomicLong minDeleteTimestamp = new AtomicLong(Long.MAX_VALUE);
+
         private VersionLookup(Map<BytesRef, VersionValue> map) {
             this.map = map;
         }

@@ -71,7 +80,6 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
             return map.isEmpty();
         }

         int size() {
             return map.size();
         }

@@ -83,6 +91,16 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
         void markAsUnsafe() {
             unsafe = true;
         }

+        public VersionValue remove(BytesRef uid) {
+            return map.remove(uid);
+        }
+
+        public void updateMinDeletedTimestamp(DeleteVersionValue delete) {
+            long time = delete.time;
+            minDeleteTimestamp.updateAndGet(prev -> Math.min(time, prev));
+        }
+
     }

     private static final class Maps {

@@ -98,6 +116,7 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
         boolean needsSafeAccess;
         final boolean previousMapsNeededSafeAccess;

         Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) {
             this.current = current;
             this.old = old;

@@ -123,8 +142,8 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
          * Builds a new map for the refresh transition this should be called in beforeRefresh()
          */
         Maps buildTransitionMap() {
-            return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())),
-                current, shouldInheritSafeAccess());
+            return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())), current,
+                shouldInheritSafeAccess());
         }

         /**
@@ -133,6 +152,39 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
         Maps invalidateOldMap() {
             return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
         }

+        void put(BytesRef uid, VersionValue version) {
+            long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
+            long ramAccounting = BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed;
+            VersionValue previousValue = current.put(uid, version);
+            ramAccounting += previousValue == null ? 0 : -(BASE_BYTES_PER_CHM_ENTRY + previousValue.ramBytesUsed() + uidRAMBytesUsed);
+            adjustRam(ramAccounting);
+        }
+
+        void adjustRam(long value) {
+            if (value != 0) {
+                long v = current.ramBytesUsed.addAndGet(value);
+                assert v >= 0 : "bytes=" + v;
+            }
+        }
+
+        void remove(BytesRef uid, DeleteVersionValue deleted) {
+            VersionValue previousValue = current.remove(uid);
+            current.updateMinDeletedTimestamp(deleted);
+            if (previousValue != null) {
+                long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
+                adjustRam(-(BASE_BYTES_PER_CHM_ENTRY + previousValue.ramBytesUsed() + uidRAMBytesUsed));
+            }
+            if (old != VersionLookup.EMPTY) {
+                // we also need to remove it from the old map here to make sure we don't read this stale value while
+                // we are in the middle of a refresh. Most of the time the old map is an empty map so we can skip it there.
+                old.remove(uid);
+            }
+        }
+
+        long getMinDeleteTimestamp() {
+            return Math.min(current.minDeleteTimestamp.get(), old.minDeleteTimestamp.get());
+        }
     }

     // All deletes also go here, and delete "tombstones" are retained after refresh:

@@ -178,12 +230,6 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
         BASE_BYTES_PER_CHM_ENTRY = chmEntryShallowSize + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
     }

-    /**
-     * Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account
-     * for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM.
-     */
-    final AtomicLong ramBytesUsedCurrent = new AtomicLong();
-
     /**
      * Tracks bytes used by tombstones (deletes)
      */

@@ -199,7 +245,6 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
         assert (unsafeKeysMap = unsafeKeysMap.buildTransitionMap()) != null;
         // This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous
         // line and this one, but that should be minor, and the error won't accumulate over time:
-        ramBytesUsedCurrent.set(0);
     }

     @Override
@@ -292,48 +337,28 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
     private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
         assert keyedLock.isHeldByCurrentThread(uid);
         assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
-        long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
-        final VersionValue prev = maps.current.put(uid, version);
-        if (prev != null) {
-            // Deduct RAM for the version we just replaced:
-            long prevBytes = BASE_BYTES_PER_CHM_ENTRY;
-            if (prev.isDelete() == false) {
-                prevBytes += prev.ramBytesUsed() + uidRAMBytesUsed;
-            }
-            ramBytesUsedCurrent.addAndGet(-prevBytes);
-        }
-        // Add RAM for the new version:
-        long newBytes = BASE_BYTES_PER_CHM_ENTRY;
         if (version.isDelete() == false) {
-            newBytes += version.ramBytesUsed() + uidRAMBytesUsed;
-        }
-        ramBytesUsedCurrent.addAndGet(newBytes);
-        final VersionValue prevTombstone;
-        if (version.isDelete()) {
-            // Also enroll the delete into tombstones, and account for its RAM too:
-            prevTombstone = tombstones.put(uid, (DeleteVersionValue) version);
-            // We initially account for BytesRef/VersionValue RAM for a delete against the tombstones, because this RAM will not be freed up
-            // on refresh. Later, in removeTombstoneUnderLock, if we clear the tombstone entry but the delete remains in current, we shift
-            // the accounting to current:
-            ramBytesUsedTombstones.addAndGet(BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed);
-            if (prevTombstone == null && prev != null && prev.isDelete()) {
-                // If prev was a delete that had already been removed from tombstones, then current was already accounting for the
-                // BytesRef/VersionValue RAM, so we now deduct that as well:
-                ramBytesUsedCurrent.addAndGet(-(prev.ramBytesUsed() + uidRAMBytesUsed));
-            }
+            maps.put(uid, version);
+            removeTombstoneUnderLock(uid);
         } else {
-            // UID came back to life so we remove the tombstone:
-            prevTombstone = tombstones.remove(uid);
+            DeleteVersionValue versionValue = (DeleteVersionValue) version;
+            putTombstone(uid, versionValue);
+            maps.remove(uid, versionValue);
         }
+    }
+
+    private void putTombstone(BytesRef uid, DeleteVersionValue version) {
+        long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
+        // Also enroll the delete into tombstones, and account for its RAM too:
+        final VersionValue prevTombstone = tombstones.put(uid, version);
+        long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed);
         // Deduct tombstones bytes used for the version we just removed or replaced:
         if (prevTombstone != null) {
-            long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed));
-            assert v >= 0 : "bytes=" + v;
+            accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed);
+        }
+        if (accountRam != 0) {
+            long v = ramBytesUsedTombstones.addAndGet(accountRam);
+            assert v >= 0: "bytes=" + v;
         }
     }
@@ -343,19 +368,34 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
     void removeTombstoneUnderLock(BytesRef uid) {
         assert keyedLock.isHeldByCurrentThread(uid);
         long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
         final VersionValue prev = tombstones.remove(uid);
         if (prev != null) {
             assert prev.isDelete();
             long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed));
             assert v >= 0 : "bytes=" + v;
         }
-        final VersionValue curVersion = maps.current.get(uid);
-        if (curVersion != null && curVersion.isDelete()) {
-            // We now shift accounting of the BytesRef from tombstones to current, because a refresh would clear this RAM. This should be
-            // uncommon, because with the default refresh=1s and gc_deletes=60s, deletes should be cleared from current long before we drop
-            // them from tombstones:
-            ramBytesUsedCurrent.addAndGet(curVersion.ramBytesUsed() + uidRAMBytesUsed);
+    }
+
+    void pruneTombstones(long currentTime, long pruneInterval) {
+        for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) {
+            BytesRef uid = entry.getKey();
+            try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get
+                // the lock once per set?
+                // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
+                DeleteVersionValue versionValue = tombstones.get(uid);
+                if (versionValue != null) {
+                    // check if the value is old enough to be removed
+                    final boolean isTooOld = currentTime - versionValue.time > pruneInterval;
+                    if (isTooOld) {
+                        // version value can't be removed it's
+                        // not yet flushed to lucene ie. it's part of this current maps object
+                        final boolean isNotTrackedByCurrentMaps = versionValue.time < maps.getMinDeleteTimestamp();
+                        if (isNotTrackedByCurrentMaps) {
+                            removeTombstoneUnderLock(uid);
+                        }
+                    }
+                }
+            }
         }
     }
@@ -367,28 +407,12 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
         return tombstones.get(uid);
     }

-    /**
-     * Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd).
-     */
-    Iterable<Map.Entry<BytesRef, DeleteVersionValue>> getAllTombstones() {
-        return tombstones.entrySet();
-    }
-
-    /**
-     * clears all tombstones ops
-     */
-    void clearTombstones() {
-        tombstones.clear();
-    }
-
     /**
      * Called when this index is closed.
      */
     synchronized void clear() {
         maps = new Maps();
         tombstones.clear();
-        ramBytesUsedCurrent.set(0);
         // NOTE: we can't zero this here, because a refresh thread could be calling InternalEngine.pruneDeletedTombstones at the same time,
         // and this will lead to an assert trip. Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index
         // is being closed:

@@ -397,7 +421,7 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
     @Override
     public long ramBytesUsed() {
-        return ramBytesUsedCurrent.get() + ramBytesUsedTombstones.get();
+        return maps.current.ramBytesUsed.get() + ramBytesUsedTombstones.get();
     }

     /**

@@ -405,7 +429,7 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
      * don't clear on refresh.
      */
     long ramBytesUsedForRefresh() {
-        return ramBytesUsedCurrent.get();
+        return maps.current.ramBytesUsed.get();
     }

     @Override

@@ -421,6 +445,11 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
         return maps.current.map;
     }

+    /** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */
+    Map<BytesRef, DeleteVersionValue> getAllTombstones() {
+        return tombstones;
+    }
+
     /**
      * Acquires a releaseable lock for the given uId. All *UnderLock methods require
      * this lock to be hold by the caller otherwise the visibility guarantees of this version

View File

@@ -2500,13 +2500,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 // lets skip this refresh since we are search idle and
                 // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
                 // cause the next schedule to refresh.
-                setRefreshPending();
+                final Engine engine = getEngine();
+                engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
+                setRefreshPending(engine);
                 return false;
             } else {
                 refresh("schedule");
                 return true;
             }
         }
+        final Engine engine = getEngine();
+        engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
         return false;
     }

@@ -2524,8 +2528,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return lastSearcherAccess.get();
     }

-    private void setRefreshPending() {
-        Engine engine = getEngine();
+    private void setRefreshPending(Engine engine) {
         Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation();
         Translog.Location location;
         do {

View File

@@ -26,7 +26,6 @@ import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.bootstrap.JavaVersion;
 import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.test.ESTestCase;

 import java.io.IOException;

@@ -37,6 +36,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.StreamSupport;

 public class LiveVersionMapTests extends ESTestCase {

@@ -84,19 +86,21 @@ public class LiveVersionMapTests extends ESTestCase {
     public void testBasics() throws IOException {
         LiveVersionMap map = new LiveVersionMap();
         try (Releasable r = map.acquireLock(uid("test"))) {
-            map.putUnderLock(uid("test"), new VersionValue(1, 1, 1));
-            assertEquals(new VersionValue(1, 1, 1), map.getUnderLock(uid("test")));
+            map.putUnderLock(uid("test"), new VersionValue(1,1,1));
+            assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
             map.beforeRefresh();
-            assertEquals(new VersionValue(1, 1, 1), map.getUnderLock(uid("test")));
+            assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
             map.afterRefresh(randomBoolean());
             assertNull(map.getUnderLock(uid("test")));

-            map.putUnderLock(uid("test"), new DeleteVersionValue(1, 1, 1, Long.MAX_VALUE));
-            assertEquals(new DeleteVersionValue(1, 1, 1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
+            map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1));
+            assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
             map.beforeRefresh();
-            assertEquals(new DeleteVersionValue(1, 1, 1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
+            assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
             map.afterRefresh(randomBoolean());
-            assertEquals(new DeleteVersionValue(1, 1, 1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
-            map.removeTombstoneUnderLock(uid("test"));
+            assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
+            map.pruneTombstones(2, 0);
             assertNull(map.getUnderLock(uid("test")));
         }
     }

@@ -109,6 +113,7 @@ public class LiveVersionMapTests extends ESTestCase {
         }
         List<BytesRef> keyList = new ArrayList<>(keySet);
         ConcurrentHashMap<BytesRef, VersionValue> values = new ConcurrentHashMap<>();
+        ConcurrentHashMap<BytesRef, DeleteVersionValue> deletes = new ConcurrentHashMap<>();
         LiveVersionMap map = new LiveVersionMap();

         int numThreads = randomIntBetween(2, 5);

@@ -116,6 +121,8 @@ public class LiveVersionMapTests extends ESTestCase {
         CountDownLatch startGun = new CountDownLatch(numThreads);
         CountDownLatch done = new CountDownLatch(numThreads);
         int randomValuesPerThread = randomIntBetween(5000, 20000);
+        AtomicLong clock = new AtomicLong(0);
+        AtomicLong lastPrunedTimestamp = new AtomicLong(-1);
         for (int j = 0; j < threads.length; j++) {
             threads[j] = new Thread(() -> {
                 startGun.countDown();
@@ -128,33 +135,39 @@ public class LiveVersionMapTests extends ESTestCase {
                 try {
                     for (int i = 0; i < randomValuesPerThread; ++i) {
                         BytesRef bytesRef = randomFrom(random(), keyList);
+                        final long clockTick = clock.get();
                         try (Releasable r = map.acquireLock(bytesRef)) {
                             VersionValue versionValue = values.computeIfAbsent(bytesRef,
                                 v -> new VersionValue(randomLong(), randomLong(), randomLong()));
                             boolean isDelete = versionValue instanceof DeleteVersionValue;
                             if (isDelete) {
                                 map.removeTombstoneUnderLock(bytesRef);
+                                deletes.remove(bytesRef);
                             }
                             if (isDelete == false && rarely()) {
                                 versionValue = new DeleteVersionValue(versionValue.version + 1, versionValue.seqNo + 1,
-                                    versionValue.term, Long.MAX_VALUE);
+                                    versionValue.term, clock.getAndIncrement());
+                                deletes.put(bytesRef, (DeleteVersionValue) versionValue);
                             } else {
                                 versionValue = new VersionValue(versionValue.version + 1, versionValue.seqNo + 1, versionValue.term);
                             }
                             values.put(bytesRef, versionValue);
                             map.putUnderLock(bytesRef, versionValue);
                         }
+                        if (rarely()) {
+                            map.pruneTombstones(clockTick, 0);
+                            // timestamp we pruned the deletes
+                            lastPrunedTimestamp.updateAndGet(prev -> Math.max(clockTick, prev)); // make sure we track the latest
+                        }
                     }
                 } finally {
                     done.countDown();
                 }
             });
             threads[j].start();
         }
         do {
-            Map<BytesRef, VersionValue> valueMap = new HashMap<>(map.getAllCurrent());
+            final Map<BytesRef, VersionValue> valueMap = new HashMap<>(map.getAllCurrent());
             map.beforeRefresh();
             valueMap.forEach((k, v) -> {
                 try (Releasable r = map.acquireLock(k)) {
@@ -190,13 +203,33 @@ public class LiveVersionMapTests extends ESTestCase {
             assertNotNull(versionValue);
             assertEquals(v, versionValue);
         });
-        map.getAllTombstones().forEach(e -> {
-            VersionValue versionValue = values.get(e.getKey());
-            assertNotNull(versionValue);
-            assertEquals(e.getValue(), versionValue);
-            assertTrue(versionValue instanceof DeleteVersionValue);
+        Runnable assertTombstones = () ->
+            map.getAllTombstones().entrySet().forEach(e -> {
+                VersionValue versionValue = values.get(e.getKey());
+                assertNotNull(versionValue);
+                assertEquals(e.getValue(), versionValue);
+                assertTrue(versionValue instanceof DeleteVersionValue);
+            });
+        assertTombstones.run();
+        map.beforeRefresh();
+        assertTombstones.run();
+        map.afterRefresh(false);
+        assertTombstones.run();
+
+        deletes.entrySet().forEach(e -> {
+            try (Releasable r = map.acquireLock(e.getKey())) {
+                VersionValue value = map.getUnderLock(e.getKey());
+                // here we keep track of the deletes and ensure that all deletes that are not visible anymore ie. not in the map
+                // have a timestamp that is smaller or equal to the maximum timestamp that we pruned on
+                if (value == null) {
+                    assertTrue(e.getValue().time + " > " + lastPrunedTimestamp.get(), e.getValue().time <= lastPrunedTimestamp.get());
+                } else {
+                    assertEquals(value, e.getValue());
+                }
+            }
         });
+        map.pruneTombstones(clock.incrementAndGet(), 0);
+        assertEquals(0, StreamSupport.stream(map.getAllTombstones().entrySet().spliterator(), false).count());
     }

     public void testCarryOnSafeAccess() throws IOException {
@@ -258,4 +291,61 @@ public class LiveVersionMapTests extends ESTestCase {
             assertTrue(map.isSafeAccessRequired());
         }
     }

+    public void testAddAndDeleteRefreshConcurrently() throws IOException, InterruptedException {
+        LiveVersionMap map = new LiveVersionMap();
+        int numIters = randomIntBetween(1000, 5000);
+        AtomicBoolean done = new AtomicBoolean(false);
+        AtomicLong version = new AtomicLong();
+        CountDownLatch start = new CountDownLatch(2);
+        BytesRef uid = uid("1");
+        VersionValue initialVersion = new VersionValue(version.incrementAndGet(), 1, 1);
+        try (Releasable ignore = map.acquireLock(uid)) {
+            map.putUnderLock(uid, initialVersion);
+        }
+        Thread t = new Thread(() -> {
+            start.countDown();
+            try {
+                start.await();
+                VersionValue nextVersionValue = initialVersion;
+                for (int i = 0; i < numIters; i++) {
+                    try (Releasable ignore = map.acquireLock(uid)) {
+                        VersionValue underLock = map.getUnderLock(uid);
+                        if (underLock != null) {
+                            assertEquals(underLock, nextVersionValue);
+                        } else {
+                            underLock = nextVersionValue;
+                        }
+                        if (underLock.isDelete()) {
+                            nextVersionValue = new VersionValue(version.incrementAndGet(), 1, 1);
+                        } else if (randomBoolean()) {
+                            nextVersionValue = new VersionValue(version.incrementAndGet(), 1, 1);
+                        } else {
+                            nextVersionValue = new DeleteVersionValue(version.incrementAndGet(), 1, 1, 0);
+                        }
+                        map.putUnderLock(uid, nextVersionValue);
+                    }
+                }
+            } catch (Exception e) {
+                throw new AssertionError(e);
+            } finally {
+                done.set(true);
+            }
+        });
+        t.start();
+        start.countDown();
+        while(done.get() == false) {
+            map.beforeRefresh();
+            Thread.yield();
+            map.afterRefresh(false);
+        }
+        t.join();
+
+        try (Releasable ignore = map.acquireLock(uid)) {
+            VersionValue underLock = map.getUnderLock(uid);
+            if (underLock != null) {
+                assertEquals(version.get(), underLock.version);
+            }
+        }
+    }
 }