Never block on key in `LiveVersionMap#pruneTombstones` (#28736)
Pruning tombstones is best effort and should not block if a key is currently locked. Blocking there can cause a deadlock in rare situations: if we switch off the append-only optimization while heavily updating the same key in the engine while the LiveVersionMap is locked. This is very rare since this code path is only executed every 15 seconds by default, which is the interval at which we try to prune the deletes in the version map.

Closes #28714
parent 8bbb3c9ffa
commit b00870600b
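To make the failure mode concrete: the cycle is a classic lock-ordering deadlock between a map-wide lock and a per-key lock. The sketch below is a minimal, self-contained illustration with plain ReentrantLocks and illustrative names (mapLock, keyLock, pruneBestEffort, updateHoldingKeyThenMap); it is not the engine code. pruneBlocking can deadlock against updateHoldingKeyThenMap, while pruneBestEffort uses tryLock and simply skips a contended key, which is the best-effort behaviour this commit adopts for tombstone pruning.

import java.util.concurrent.locks.ReentrantLock;

// Minimal illustration of the lock-ordering deadlock and the tryLock escape hatch.
// All names are illustrative; this is not Elasticsearch code.
public class BestEffortPruneSketch {
    private static final ReentrantLock mapLock = new ReentrantLock(); // stands in for the version-map-wide lock
    private static final ReentrantLock keyLock = new ReentrantLock(); // stands in for one per-uid key lock

    // Deadlock-prone: holds the map lock, then blocks on the key lock.
    static void pruneBlocking() {
        mapLock.lock();
        try {
            keyLock.lock(); // can block forever if the key holder is waiting for mapLock
            try {
                // prune the tombstone for this key
            } finally {
                keyLock.unlock();
            }
        } finally {
            mapLock.unlock();
        }
    }

    // The other side of the cycle: holds the key lock, then needs the map lock.
    static void updateHoldingKeyThenMap() {
        keyLock.lock();
        try {
            mapLock.lock(); // waits for the pruning thread if it currently holds mapLock
            try {
                // update the entry for this key
            } finally {
                mapLock.unlock();
            }
        } finally {
            keyLock.unlock();
        }
    }

    // Best effort: never block on the key lock, so the cycle cannot form.
    static void pruneBestEffort() {
        mapLock.lock();
        try {
            if (keyLock.tryLock()) {
                try {
                    // prune the tombstone for this key
                } finally {
                    keyLock.unlock();
                }
            } // else: skip this key and let the next pruning round retry
        } finally {
            mapLock.unlock();
        }
    }
}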
@@ -63,13 +63,11 @@ public final class KeyedLock<T> {
         while (true) {
             KeyLock perNodeLock = map.get(key);
             if (perNodeLock == null) {
-                KeyLock newLock = new KeyLock(fair);
-                perNodeLock = map.putIfAbsent(key, newLock);
-                if (perNodeLock == null) {
-                    newLock.lock();
-                    return new ReleasableLock(key, newLock);
-                }
+                ReleasableLock newLock = tryCreateNewLock(key);
+                if (newLock != null) {
+                    return newLock;
+                }
             } else {
                 assert perNodeLock != null;
                 int i = perNodeLock.count.get();
                 if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
@@ -78,6 +76,40 @@ public final class KeyedLock<T> {
                 }
             }
         }
     }
 
+    /**
+     * Tries to acquire the lock for the given key and returns it. If the lock can't be acquired null is returned.
+     */
+    public Releasable tryAcquire(T key) {
+        final KeyLock perNodeLock = map.get(key);
+        if (perNodeLock == null) {
+            return tryCreateNewLock(key);
+        }
+        if (perNodeLock.tryLock()) { // ok we got it - make sure we increment it accordingly otherwise release it again
+            int i;
+            while ((i = perNodeLock.count.get()) > 0) {
+                // we have to do this in a loop here since even if the count is > 0
+                // there could be a concurrent blocking acquire that changes the count and then this CAS fails. Since we already got
+                // the lock we should retry and see if we can still get it or if the count is 0. If that is the case, we give up.
+                if (perNodeLock.count.compareAndSet(i, i + 1)) {
+                    return new ReleasableLock(key, perNodeLock);
+                }
+            }
+            perNodeLock.unlock(); // make sure we unlock and don't leave the lock in a locked state
+        }
+        return null;
+    }
+
+    private ReleasableLock tryCreateNewLock(T key) {
+        KeyLock newLock = new KeyLock(fair);
+        newLock.lock();
+        KeyLock keyLock = map.putIfAbsent(key, newLock);
+        if (keyLock == null) {
+            return new ReleasableLock(key, newLock);
+        }
+        return null;
+    }
+
     /**
      * Returns <code>true</code> iff the caller thread holds the lock for the given key
@@ -92,11 +124,12 @@ public final class KeyedLock<T> {
 
     private void release(T key, KeyLock lock) {
         assert lock == map.get(key);
+        final int decrementAndGet = lock.count.decrementAndGet();
         lock.unlock();
-        int decrementAndGet = lock.count.decrementAndGet();
         if (decrementAndGet == 0) {
             map.remove(key, lock);
         }
+        assert decrementAndGet >= 0 : decrementAndGet + " must be >= 0 but wasn't";
     }
 
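Taken together, the KeyedLock hunks above add a non-blocking tryAcquire next to the existing blocking acquire. The following is a hedged usage sketch based only on the signatures visible in this diff (tryAcquire returns null when the key is contended, and the returned Releasable unlocks on close); the wrapper class and doIfNotContended are illustrative, not part of the commit.

import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.KeyedLock;

// Sketch of a caller using the new non-blocking tryAcquire; names are illustrative.
class TryAcquireUsage {
    private final KeyedLock<String> keyedLock = new KeyedLock<>();

    boolean doIfNotContended(String key, Runnable work) {
        Releasable lock = keyedLock.tryAcquire(key);
        if (lock == null) {
            return false;              // somebody else holds the key; skip, best effort
        }
        try (Releasable ignored = lock) {
            work.run();                // the key is locked for us here
            return true;
        }
    }
}

This mirrors how pruneTombstones now treats a contended key: skip it rather than wait.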
@@ -32,7 +32,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /** Maps _uid value to its version information. */
 final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
@@ -378,11 +377,15 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
 
     void pruneTombstones(long currentTime, long pruneInterval) {
         for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) {
-            BytesRef uid = entry.getKey();
-            try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get
-                // the lock once per set?
+            final BytesRef uid = entry.getKey();
+            try (Releasable lock = keyedLock.tryAcquire(uid)) {
+                // we use tryAcquire here since this is a best effort and we try to be least disruptive
+                // this method is also called under lock in the engine under certain situations such that this can lead to deadlocks
+                // if we do use a blocking acquire. see #28714
+                if (lock != null) { // did we get the lock?
+                    // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
                     // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
-                DeleteVersionValue versionValue = tombstones.get(uid);
+                    final DeleteVersionValue versionValue = tombstones.get(uid);
                     if (versionValue != null) {
                         // check if the value is old enough to be removed
                         final boolean isTooOld = currentTime - versionValue.time > pruneInterval;
@@ -398,6 +401,7 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
                     }
+                }
             }
         }
     }
 
     /**
      * Called when this index is closed.
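Abstracted away from the engine, the pruning pattern introduced above is: iterate the tombstone map, take the per-key lock only if it is free, re-read the value under the lock, and remove it only if it is older than the prune interval. The sketch below is a simplified model under those assumptions, using a plain ConcurrentHashMap of delete timestamps in place of the real DeleteVersionValue tombstones; it is not the LiveVersionMap implementation.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.KeyedLock;

// Simplified model of best-effort tombstone pruning; not the LiveVersionMap code.
class TombstonePruner {
    private final Map<String, Long> tombstones = new ConcurrentHashMap<>(); // key -> delete timestamp
    private final KeyedLock<String> keyedLock = new KeyedLock<>();

    void pruneTombstones(long currentTime, long pruneInterval) {
        for (Map.Entry<String, Long> entry : tombstones.entrySet()) {
            final String key = entry.getKey();
            try (Releasable lock = keyedLock.tryAcquire(key)) {
                if (lock == null) {
                    continue; // key is busy; best effort, try again on the next run
                }
                // re-read under the lock in case the entry changed since the iterator was created
                final Long deleteTime = tombstones.get(key);
                if (deleteTime != null && currentTime - deleteTime > pruneInterval) {
                    tombstones.remove(key, deleteTime);
                }
            }
        }
    }
}

Because the loop never blocks, a tombstone whose key is busy simply survives until the next run, which is acceptable since pruning is best effort anyway.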
@@ -20,7 +20,6 @@
 package org.elasticsearch.common.util.concurrent;
 
 import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.test.ESTestCase;
 import org.hamcrest.Matchers;
 
@@ -31,6 +30,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -79,6 +79,34 @@ public class KeyedLockTests extends ESTestCase {
         assertFalse(lock.hasLockedKeys());
     }
 
+    public void testTryAcquire() throws InterruptedException {
+        KeyedLock<String> lock = new KeyedLock<>();
+        Releasable foo = lock.tryAcquire("foo");
+        Releasable second = lock.tryAcquire("foo");
+        assertTrue(lock.hasLockedKeys());
+        foo.close();
+        assertTrue(lock.hasLockedKeys());
+        second.close();
+        assertFalse(lock.hasLockedKeys());
+        // lock again
+        Releasable acquire = lock.tryAcquire("foo");
+        assertNotNull(acquire);
+        final AtomicBoolean check = new AtomicBoolean(false);
+        CountDownLatch latch = new CountDownLatch(1);
+        Thread thread = new Thread(() -> {
+            latch.countDown();
+            try (Releasable ignore = lock.acquire("foo")) {
+                assertTrue(check.get());
+            }
+        });
+        thread.start();
+        latch.await();
+        check.set(true);
+        acquire.close();
+        foo.close();
+        thread.join();
+    }
+
     public void testLockIsReentrant() throws InterruptedException {
         KeyedLock<String> lock = new KeyedLock<>();
         Releasable foo = lock.acquire("foo");
@@ -137,7 +165,24 @@ public class KeyedLockTests extends ESTestCase {
             for (int i = 0; i < numRuns; i++) {
                 String curName = names[randomInt(names.length - 1)];
                 assert connectionLock.isHeldByCurrentThread(curName) == false;
-                try (Releasable ignored = connectionLock.acquire(curName)) {
+                Releasable lock;
+                if (randomIntBetween(0, 10) < 4) {
+                    int tries = 0;
+                    boolean stepOut = false;
+                    while ((lock = connectionLock.tryAcquire(curName)) == null) {
+                        assertFalse(connectionLock.isHeldByCurrentThread(curName));
+                        if (tries++ == 10) {
+                            stepOut = true;
+                            break;
+                        }
+                    }
+                    if (stepOut) {
+                        break;
+                    }
+                } else {
+                    lock = connectionLock.acquire(curName);
+                }
+                try (Releasable ignore = lock) {
                     assert connectionLock.isHeldByCurrentThread(curName);
                     assert connectionLock.isHeldByCurrentThread(curName + "bla") == false;
                     if (randomBoolean()) {
@@ -4517,4 +4517,60 @@ public class InternalEngineTests extends EngineTestCase {
         assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
         assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
     }
 
+    public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException {
+        final int iters = randomIntBetween(1, 15);
+        for (int i = 0; i < iters; i++) {
+            // this is a reproduction of https://github.com/elastic/elasticsearch/issues/28714
+            try (Store store = createStore(); InternalEngine engine = createEngine(store, createTempDir())) {
+                final IndexSettings indexSettings = engine.config().getIndexSettings();
+                final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
+                    .settings(Settings.builder().put(indexSettings.getSettings())
+                        .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(1))).build();
+                engine.engineConfig.getIndexSettings().updateIndexMetaData(indexMetaData);
+                engine.onSettingsChanged();
+                ParsedDocument document = testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
+                final Engine.Index doc = new Engine.Index(newUid(document), document, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
+                    Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false);
+                // first index an append-only document and then delete it, so that we have it in the tombstones
+                engine.index(doc);
+                engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid()));
+
+                // now index more append-only docs and refresh so we re-enable the optimization for the unsafe version map
+                ParsedDocument document1 = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
+                engine.index(new Engine.Index(newUid(document1), document1, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
+                    Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false));
+                engine.refresh("test");
+                ParsedDocument document2 = testParsedDocument(Integer.toString(2), null, testDocumentWithTextField(), SOURCE, null);
+                engine.index(new Engine.Index(newUid(document2), document2, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
+                    Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false));
+                engine.refresh("test");
+                ParsedDocument document3 = testParsedDocument(Integer.toString(3), null, testDocumentWithTextField(), SOURCE, null);
+                final Engine.Index doc3 = new Engine.Index(newUid(document3), document3, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
+                    Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false);
+                engine.index(doc3);
+                engine.engineConfig.setEnableGcDeletes(true);
+                // once we are here the version map is unsafe again and we need to do a refresh inside the get calls to ensure we
+                // de-optimize. We also enabled GC deletes, which now causes pruning tombstones inside the refresh that is done internally
+                // to ensure we de-optimize. One get call will prune and the other will try to lock the version map concurrently while
+                // holding the lock that pruneTombstones needs, and we have a deadlock
+                CountDownLatch awaitStarted = new CountDownLatch(1);
+                Thread thread = new Thread(() -> {
+                    awaitStarted.countDown();
+                    try (Engine.GetResult getResult = engine.get(new Engine.Get(true, doc3.type(), doc3.id(), doc3.uid()),
+                            engine::acquireSearcher)) {
+                        assertTrue(getResult.exists());
+                    }
+                });
+                thread.start();
+                awaitStarted.await();
+                try (Engine.GetResult getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), doc.uid()),
+                        engine::acquireSearcher)) {
+                    assertFalse(getResult.exists());
+                }
+                thread.join();
+            }
+        }
+    }
 }
@@ -348,4 +348,27 @@ public class LiveVersionMapTests extends ESTestCase {
         }
     }
 
+    public void testPruneTombstonesWhileLocked() throws InterruptedException, IOException {
+        LiveVersionMap map = new LiveVersionMap();
+        BytesRef uid = uid("1");
+        try (Releasable ignore = map.acquireLock(uid)) {
+            map.putUnderLock(uid, new DeleteVersionValue(0, 0, 0, 0));
+            map.beforeRefresh(); // refresh otherwise we won't prune since it's tracked by the current map
+            map.afterRefresh(false);
+            Thread thread = new Thread(() -> {
+                map.pruneTombstones(Long.MAX_VALUE, 0);
+            });
+            thread.start();
+            thread.join();
+            assertEquals(1, map.getAllTombstones().size());
+        }
+        Thread thread = new Thread(() -> {
+            map.pruneTombstones(Long.MAX_VALUE, 0);
+        });
+        thread.start();
+        thread.join();
+        assertEquals(0, map.getAllTombstones().size());
+    }
 }