From 67cd1e9c5f8957c28c954a407fd0f9a3401c869e Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 30 Nov 2017 20:44:05 +0100 Subject: [PATCH] Reset LiveVersionMap on sync commit (#27534) Today we carry on the size of the live version map to ensure that we minimze rehashing. Yet, once we are idle or we can issue a sync-commit we can resize it to defaults to free up memory. Relates to #27516 --- .../index/engine/DeleteVersionValue.java | 18 ++ .../index/engine/InternalEngine.java | 10 +- .../index/engine/LiveVersionMap.java | 29 +++- .../index/engine/VersionValue.java | 21 +++ .../index/engine/LiveVersionMapTests.java | 160 ++++++++++++++++++ 5 files changed, 227 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java index 899c06eb196..d2b2e24c616 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java @@ -44,6 +44,24 @@ class DeleteVersionValue extends VersionValue { return BASE_RAM_BYTES_USED; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + + DeleteVersionValue that = (DeleteVersionValue) o; + + return time == that.time; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (int) (time ^ (time >>> 32)); + return result; + } + @Override public String toString() { return "DeleteVersionValue{" + diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6449c979de4..82461ca4a99 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -560,7 +560,7 @@ public class InternalEngine extends Engine { ensureOpen(); SearcherScope scope; if (get.realtime()) { - VersionValue versionValue = versionMap.getUnderLock(get.uid()); + VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes()); if (versionValue != null) { if (versionValue.isDelete()) { return GetResult.NOT_EXISTS; @@ -598,7 +598,7 @@ public class InternalEngine extends Engine { private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final OpVsLuceneDocStatus status; - final VersionValue versionValue = versionMap.getUnderLock(op.uid()); + final VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { if (op.seqNo() > versionValue.seqNo || @@ -635,7 +635,7 @@ public class InternalEngine extends Engine { /** resolves the current version of the document, returning null if not found */ private VersionValue resolveDocVersion(final Operation op) throws IOException { assert incrementVersionLookup(); // used for asserting in tests - VersionValue versionValue = versionMap.getUnderLock(op.uid()); + VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes()); if (versionValue == null) { assert incrementIndexVersionLookup(); // used for asserting in tests final long currentVersion = loadCurrentVersionFromIndex(op.uid()); @@ -1048,7 +1048,7 @@ public class InternalEngine extends Engine { * Asserts that the doc in the index operation really doesn't exist */ private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException { - final VersionValue versionValue = versionMap.getUnderLock(index.uid()); + final VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); if (versionValue != null) { if (versionValue.isDelete() == false || allowDeleted == false) { throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")"); @@ -1376,6 +1376,8 @@ public class InternalEngine extends Engine { commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. sync id [{}].", syncId); lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + // we are guaranteed to have no operations in the version map here! + versionMap.adjustMapSizeUnderLock(); return SyncedFlushResult.SUCCESS; } catch (IOException ex) { maybeFailEngine("sync commit", ex); diff --git a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index aef41d9d162..48d57ee7eec 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.engine; -import org.apache.lucene.index.Term; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.BytesRef; @@ -35,6 +34,18 @@ import java.util.concurrent.atomic.AtomicLong; /** Maps _uid value to its version information. */ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { + /** + * Resets the internal map and adjusts it's capacity as if there were no indexing operations. + * This must be called under write lock in the engine + */ + void adjustMapSizeUnderLock() { + if (maps.current.isEmpty() == false || maps.old.isEmpty() == false) { + assert false : "map must be empty"; // fail hard if not empty and fail with assertion in tests to ensure we never swallow it + throw new IllegalStateException("map must be empty"); + } + maps = new Maps(); + } + private static class Maps { // All writes (adds and deletes) go into here: @@ -50,7 +61,7 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { Maps() { this(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(), - ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()); + Collections.emptyMap()); } } @@ -121,21 +132,21 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { } /** Returns the live version (add or delete) for this uid. */ - VersionValue getUnderLock(final Term uid) { + VersionValue getUnderLock(final BytesRef uid) { Maps currentMaps = maps; // First try to get the "live" value: - VersionValue value = currentMaps.current.get(uid.bytes()); + VersionValue value = currentMaps.current.get(uid); if (value != null) { return value; } - value = currentMaps.old.get(uid.bytes()); + value = currentMaps.old.get(uid); if (value != null) { return value; } - return tombstones.get(uid.bytes()); + return tombstones.get(uid); } /** Adds this uid/version to the pending adds map. */ @@ -250,4 +261,8 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { // TODO: useful to break down RAM usage here? return Collections.emptyList(); } -} + + /** Returns the current internal versions as a point in time snapshot*/ + Map getAllCurrent() { + return maps.current; + }} diff --git a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java index f3d9618838f..e2a2614d6c1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java @@ -57,10 +57,31 @@ class VersionValue implements Accountable { return Collections.emptyList(); } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + VersionValue that = (VersionValue) o; + + if (version != that.version) return false; + if (seqNo != that.seqNo) return false; + return term == that.term; + } + + @Override + public int hashCode() { + int result = (int) (version ^ (version >>> 32)); + result = 31 * result + (int) (seqNo ^ (seqNo >>> 32)); + result = 31 * result + (int) (term ^ (term >>> 32)); + return result; + } + @Override public String toString() { return "VersionValue{" + "version=" + version + + ", seqNo=" + seqNo + ", term=" + term + '}'; diff --git a/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java b/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java index 97799f8c46a..77e5b55ac57 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java @@ -19,12 +19,25 @@ package org.elasticsearch.index.engine; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.RamUsageTester; import org.apache.lucene.util.TestUtil; +import org.elasticsearch.Assertions; import org.elasticsearch.bootstrap.JavaVersion; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + public class LiveVersionMapTests extends ESTestCase { public void testRamBytesUsed() throws Exception { @@ -57,4 +70,151 @@ public class LiveVersionMapTests extends ESTestCase { assertEquals(actualRamBytesUsed, estimatedRamBytesUsed, actualRamBytesUsed / 4); } + private BytesRef uid(String string) { + BytesRefBuilder builder = new BytesRefBuilder(); + builder.copyChars(string); + // length of the array must be the same as the len of the ref... there is an assertion in LiveVersionMap#putUnderLock + return BytesRef.deepCopyOf(builder.get()); + } + + public void testBasics() throws IOException { + LiveVersionMap map = new LiveVersionMap(); + map.putUnderLock(uid("test"), new VersionValue(1,1,1)); + assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); + map.beforeRefresh(); + assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); + map.afterRefresh(randomBoolean()); + assertNull(map.getUnderLock(uid("test"))); + + + map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1, Long.MAX_VALUE)); + assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test"))); + map.beforeRefresh(); + assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test"))); + map.afterRefresh(randomBoolean()); + assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test"))); + map.removeTombstoneUnderLock(uid("test")); + assertNull(map.getUnderLock(uid("test"))); + } + + + public void testAdjustMapSizeUnderLock() throws IOException { + LiveVersionMap map = new LiveVersionMap(); + map.putUnderLock(uid("test"), new VersionValue(1,1,1)); + boolean withinRefresh = randomBoolean(); + if (withinRefresh) { + map.beforeRefresh(); + } + assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); + final String msg; + if (Assertions.ENABLED) { + msg = expectThrows(AssertionError.class, map::adjustMapSizeUnderLock).getMessage(); + } else { + msg = expectThrows(IllegalStateException.class, map::adjustMapSizeUnderLock).getMessage(); + } + assertEquals("map must be empty", msg); + assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); + if (withinRefresh == false) { + map.beforeRefresh(); + } + map.afterRefresh(randomBoolean()); + Map allCurrent = map.getAllCurrent(); + map.adjustMapSizeUnderLock(); + assertNotSame(allCurrent, map.getAllCurrent()); + } + + public void testConcurrently() throws IOException, InterruptedException { + HashSet keySet = new HashSet<>(); + int numKeys = randomIntBetween(50, 200); + for (int i = 0; i < numKeys; i++) { + keySet.add(uid(TestUtil.randomSimpleString(random(), 10, 20))); + } + List keyList = new ArrayList<>(keySet); + ConcurrentHashMap values = new ConcurrentHashMap<>(); + KeyedLock keyedLock = new KeyedLock<>(); + LiveVersionMap map = new LiveVersionMap(); + int numThreads = randomIntBetween(2, 5); + + Thread[] threads = new Thread[numThreads]; + CountDownLatch startGun = new CountDownLatch(numThreads); + CountDownLatch done = new CountDownLatch(numThreads); + int randomValuesPerThread = randomIntBetween(5000, 20000); + for (int j = 0; j < threads.length; j++) { + threads[j] = new Thread(() -> { + startGun.countDown(); + try { + startGun.await(); + } catch (InterruptedException e) { + done.countDown(); + throw new AssertionError(e); + } + try { + for (int i = 0; i < randomValuesPerThread; ++i) { + BytesRef bytesRef = randomFrom(random(), keyList); + try (Releasable r = keyedLock.acquire(bytesRef)) { + VersionValue versionValue = values.computeIfAbsent(bytesRef, + v -> new VersionValue(randomLong(), randomLong(), randomLong())); + boolean isDelete = versionValue instanceof DeleteVersionValue; + if (isDelete) { + map.removeTombstoneUnderLock(bytesRef); + } + if (isDelete == false && rarely()) { + versionValue = new DeleteVersionValue(versionValue.version + 1, versionValue.seqNo + 1, + versionValue.term, Long.MAX_VALUE); + } else { + versionValue = new VersionValue(versionValue.version + 1, versionValue.seqNo + 1, versionValue.term); + } + values.put(bytesRef, versionValue); + map.putUnderLock(bytesRef, versionValue); + } + } + } finally { + done.countDown(); + } + }); + threads[j].start(); + + + } + do { + Map valueMap = new HashMap<>(map.getAllCurrent()); + map.beforeRefresh(); + valueMap.forEach((k, v) -> { + VersionValue actualValue = map.getUnderLock(k); + assertNotNull(actualValue); + assertTrue(v.version <= actualValue.version); + }); + map.afterRefresh(randomBoolean()); + valueMap.forEach((k, v) -> { + VersionValue actualValue = map.getUnderLock(k); + if (actualValue != null) { + if (actualValue instanceof DeleteVersionValue) { + assertTrue(v.version <= actualValue.version); // deletes can be the same version + } else { + assertTrue(v.version < actualValue.version); + } + + } + }); + if (randomBoolean()) { + Thread.yield(); + } + } while (done.getCount() != 0); + + for (int j = 0; j < threads.length; j++) { + threads[j].join(); + } + map.getAllCurrent().forEach((k, v) -> { + VersionValue versionValue = values.get(k); + assertNotNull(versionValue); + assertEquals(v, versionValue); + }); + + map.getAllTombstones().forEach(e -> { + VersionValue versionValue = values.get(e.getKey()); + assertNotNull(versionValue); + assertEquals(e.getValue(), versionValue); + assertTrue(versionValue instanceof DeleteVersionValue); + }); + } }