Reset LiveVersionMap on sync commit (#27534)

Today we carry on the size of the live version map to ensure that
we minimze rehashing. Yet, once we are idle or we can issue a sync-commit
we can resize it to defaults to free up memory.

Relates to #27516
This commit is contained in:
Simon Willnauer 2017-11-30 20:44:05 +01:00 committed by GitHub
parent d30c887893
commit 67cd1e9c5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 227 additions and 11 deletions

View File

@ -44,6 +44,24 @@ class DeleteVersionValue extends VersionValue {
return BASE_RAM_BYTES_USED; return BASE_RAM_BYTES_USED;
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
DeleteVersionValue that = (DeleteVersionValue) o;
return time == that.time;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (int) (time ^ (time >>> 32));
return result;
}
@Override @Override
public String toString() { public String toString() {
return "DeleteVersionValue{" + return "DeleteVersionValue{" +

View File

@ -560,7 +560,7 @@ public class InternalEngine extends Engine {
ensureOpen(); ensureOpen();
SearcherScope scope; SearcherScope scope;
if (get.realtime()) { if (get.realtime()) {
VersionValue versionValue = versionMap.getUnderLock(get.uid()); VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes());
if (versionValue != null) { if (versionValue != null) {
if (versionValue.isDelete()) { if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS; return GetResult.NOT_EXISTS;
@ -598,7 +598,7 @@ public class InternalEngine extends Engine {
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
final OpVsLuceneDocStatus status; final OpVsLuceneDocStatus status;
final VersionValue versionValue = versionMap.getUnderLock(op.uid()); final VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes());
assert incrementVersionLookup(); assert incrementVersionLookup();
if (versionValue != null) { if (versionValue != null) {
if (op.seqNo() > versionValue.seqNo || if (op.seqNo() > versionValue.seqNo ||
@ -635,7 +635,7 @@ public class InternalEngine extends Engine {
/** resolves the current version of the document, returning null if not found */ /** resolves the current version of the document, returning null if not found */
private VersionValue resolveDocVersion(final Operation op) throws IOException { private VersionValue resolveDocVersion(final Operation op) throws IOException {
assert incrementVersionLookup(); // used for asserting in tests assert incrementVersionLookup(); // used for asserting in tests
VersionValue versionValue = versionMap.getUnderLock(op.uid()); VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes());
if (versionValue == null) { if (versionValue == null) {
assert incrementIndexVersionLookup(); // used for asserting in tests assert incrementIndexVersionLookup(); // used for asserting in tests
final long currentVersion = loadCurrentVersionFromIndex(op.uid()); final long currentVersion = loadCurrentVersionFromIndex(op.uid());
@ -1048,7 +1048,7 @@ public class InternalEngine extends Engine {
* Asserts that the doc in the index operation really doesn't exist * Asserts that the doc in the index operation really doesn't exist
*/ */
private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException { private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException {
final VersionValue versionValue = versionMap.getUnderLock(index.uid()); final VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
if (versionValue != null) { if (versionValue != null) {
if (versionValue.isDelete() == false || allowDeleted == false) { if (versionValue.isDelete() == false || allowDeleted == false) {
throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")"); throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")");
@ -1376,6 +1376,8 @@ public class InternalEngine extends Engine {
commitIndexWriter(indexWriter, translog, syncId); commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId); logger.debug("successfully sync committed. sync id [{}].", syncId);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
// we are guaranteed to have no operations in the version map here!
versionMap.adjustMapSizeUnderLock();
return SyncedFlushResult.SUCCESS; return SyncedFlushResult.SUCCESS;
} catch (IOException ex) { } catch (IOException ex) {
maybeFailEngine("sync commit", ex); maybeFailEngine("sync commit", ex);

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.engine; package org.elasticsearch.index.engine;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
@ -35,6 +34,18 @@ import java.util.concurrent.atomic.AtomicLong;
/** Maps _uid value to its version information. */ /** Maps _uid value to its version information. */
class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
/**
* Resets the internal map and adjusts it's capacity as if there were no indexing operations.
* This must be called under write lock in the engine
*/
void adjustMapSizeUnderLock() {
if (maps.current.isEmpty() == false || maps.old.isEmpty() == false) {
assert false : "map must be empty"; // fail hard if not empty and fail with assertion in tests to ensure we never swallow it
throw new IllegalStateException("map must be empty");
}
maps = new Maps();
}
private static class Maps { private static class Maps {
// All writes (adds and deletes) go into here: // All writes (adds and deletes) go into here:
@ -50,7 +61,7 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
Maps() { Maps() {
this(ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency(), this(ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency(),
ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency()); Collections.emptyMap());
} }
} }
@ -121,21 +132,21 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
} }
/** Returns the live version (add or delete) for this uid. */ /** Returns the live version (add or delete) for this uid. */
VersionValue getUnderLock(final Term uid) { VersionValue getUnderLock(final BytesRef uid) {
Maps currentMaps = maps; Maps currentMaps = maps;
// First try to get the "live" value: // First try to get the "live" value:
VersionValue value = currentMaps.current.get(uid.bytes()); VersionValue value = currentMaps.current.get(uid);
if (value != null) { if (value != null) {
return value; return value;
} }
value = currentMaps.old.get(uid.bytes()); value = currentMaps.old.get(uid);
if (value != null) { if (value != null) {
return value; return value;
} }
return tombstones.get(uid.bytes()); return tombstones.get(uid);
} }
/** Adds this uid/version to the pending adds map. */ /** Adds this uid/version to the pending adds map. */
@ -250,4 +261,8 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
// TODO: useful to break down RAM usage here? // TODO: useful to break down RAM usage here?
return Collections.emptyList(); return Collections.emptyList();
} }
}
/** Returns the current internal versions as a point in time snapshot*/
Map<BytesRef, VersionValue> getAllCurrent() {
return maps.current;
}}

View File

@ -57,10 +57,31 @@ class VersionValue implements Accountable {
return Collections.emptyList(); return Collections.emptyList();
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
VersionValue that = (VersionValue) o;
if (version != that.version) return false;
if (seqNo != that.seqNo) return false;
return term == that.term;
}
@Override
public int hashCode() {
int result = (int) (version ^ (version >>> 32));
result = 31 * result + (int) (seqNo ^ (seqNo >>> 32));
result = 31 * result + (int) (term ^ (term >>> 32));
return result;
}
@Override @Override
public String toString() { public String toString() {
return "VersionValue{" + return "VersionValue{" +
"version=" + version + "version=" + version +
", seqNo=" + seqNo + ", seqNo=" + seqNo +
", term=" + term + ", term=" + term +
'}'; '}';

View File

@ -19,12 +19,25 @@
package org.elasticsearch.index.engine; package org.elasticsearch.index.engine;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.RamUsageTester; import org.apache.lucene.util.RamUsageTester;
import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TestUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.bootstrap.JavaVersion; import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
public class LiveVersionMapTests extends ESTestCase { public class LiveVersionMapTests extends ESTestCase {
public void testRamBytesUsed() throws Exception { public void testRamBytesUsed() throws Exception {
@ -57,4 +70,151 @@ public class LiveVersionMapTests extends ESTestCase {
assertEquals(actualRamBytesUsed, estimatedRamBytesUsed, actualRamBytesUsed / 4); assertEquals(actualRamBytesUsed, estimatedRamBytesUsed, actualRamBytesUsed / 4);
} }
private BytesRef uid(String string) {
BytesRefBuilder builder = new BytesRefBuilder();
builder.copyChars(string);
// length of the array must be the same as the len of the ref... there is an assertion in LiveVersionMap#putUnderLock
return BytesRef.deepCopyOf(builder.get());
}
public void testBasics() throws IOException {
LiveVersionMap map = new LiveVersionMap();
map.putUnderLock(uid("test"), new VersionValue(1,1,1));
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
map.beforeRefresh();
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
map.afterRefresh(randomBoolean());
assertNull(map.getUnderLock(uid("test")));
map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1, Long.MAX_VALUE));
assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
map.beforeRefresh();
assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
map.afterRefresh(randomBoolean());
assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
map.removeTombstoneUnderLock(uid("test"));
assertNull(map.getUnderLock(uid("test")));
}
public void testAdjustMapSizeUnderLock() throws IOException {
LiveVersionMap map = new LiveVersionMap();
map.putUnderLock(uid("test"), new VersionValue(1,1,1));
boolean withinRefresh = randomBoolean();
if (withinRefresh) {
map.beforeRefresh();
}
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
final String msg;
if (Assertions.ENABLED) {
msg = expectThrows(AssertionError.class, map::adjustMapSizeUnderLock).getMessage();
} else {
msg = expectThrows(IllegalStateException.class, map::adjustMapSizeUnderLock).getMessage();
}
assertEquals("map must be empty", msg);
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
if (withinRefresh == false) {
map.beforeRefresh();
}
map.afterRefresh(randomBoolean());
Map<BytesRef, VersionValue> allCurrent = map.getAllCurrent();
map.adjustMapSizeUnderLock();
assertNotSame(allCurrent, map.getAllCurrent());
}
public void testConcurrently() throws IOException, InterruptedException {
HashSet<BytesRef> keySet = new HashSet<>();
int numKeys = randomIntBetween(50, 200);
for (int i = 0; i < numKeys; i++) {
keySet.add(uid(TestUtil.randomSimpleString(random(), 10, 20)));
}
List<BytesRef> keyList = new ArrayList<>(keySet);
ConcurrentHashMap<BytesRef, VersionValue> values = new ConcurrentHashMap<>();
KeyedLock<BytesRef> keyedLock = new KeyedLock<>();
LiveVersionMap map = new LiveVersionMap();
int numThreads = randomIntBetween(2, 5);
Thread[] threads = new Thread[numThreads];
CountDownLatch startGun = new CountDownLatch(numThreads);
CountDownLatch done = new CountDownLatch(numThreads);
int randomValuesPerThread = randomIntBetween(5000, 20000);
for (int j = 0; j < threads.length; j++) {
threads[j] = new Thread(() -> {
startGun.countDown();
try {
startGun.await();
} catch (InterruptedException e) {
done.countDown();
throw new AssertionError(e);
}
try {
for (int i = 0; i < randomValuesPerThread; ++i) {
BytesRef bytesRef = randomFrom(random(), keyList);
try (Releasable r = keyedLock.acquire(bytesRef)) {
VersionValue versionValue = values.computeIfAbsent(bytesRef,
v -> new VersionValue(randomLong(), randomLong(), randomLong()));
boolean isDelete = versionValue instanceof DeleteVersionValue;
if (isDelete) {
map.removeTombstoneUnderLock(bytesRef);
}
if (isDelete == false && rarely()) {
versionValue = new DeleteVersionValue(versionValue.version + 1, versionValue.seqNo + 1,
versionValue.term, Long.MAX_VALUE);
} else {
versionValue = new VersionValue(versionValue.version + 1, versionValue.seqNo + 1, versionValue.term);
}
values.put(bytesRef, versionValue);
map.putUnderLock(bytesRef, versionValue);
}
}
} finally {
done.countDown();
}
});
threads[j].start();
}
do {
Map<BytesRef, VersionValue> valueMap = new HashMap<>(map.getAllCurrent());
map.beforeRefresh();
valueMap.forEach((k, v) -> {
VersionValue actualValue = map.getUnderLock(k);
assertNotNull(actualValue);
assertTrue(v.version <= actualValue.version);
});
map.afterRefresh(randomBoolean());
valueMap.forEach((k, v) -> {
VersionValue actualValue = map.getUnderLock(k);
if (actualValue != null) {
if (actualValue instanceof DeleteVersionValue) {
assertTrue(v.version <= actualValue.version); // deletes can be the same version
} else {
assertTrue(v.version < actualValue.version);
}
}
});
if (randomBoolean()) {
Thread.yield();
}
} while (done.getCount() != 0);
for (int j = 0; j < threads.length; j++) {
threads[j].join();
}
map.getAllCurrent().forEach((k, v) -> {
VersionValue versionValue = values.get(k);
assertNotNull(versionValue);
assertEquals(v, versionValue);
});
map.getAllTombstones().forEach(e -> {
VersionValue versionValue = values.get(e.getKey());
assertNotNull(versionValue);
assertEquals(e.getValue(), versionValue);
assertTrue(versionValue instanceof DeleteVersionValue);
});
}
} }