Avoid sliced lock contention in internal engine
Today we use a sliced-lock strategy to prevent concurrent updates to the same document: each document ID is hashed onto a fixed pool of locks whose size is a linear function of the number of logical processors. Unfortunately, the probability of two distinct documents colliding on the same slice is governed by the birthday problem and grows faster than intuition suggests: for a fixed target collision probability, the number of lock slices must grow like the square of the number of logical processors. This is less than ideal, and we can do better anyway.

This commit introduces a strategy for avoiding lock contention within the internal engine. Ideally, we would only see lock contention when there are concurrent updates to the same document. We can get close to this ideal by associating a lock with the ID of each document, with the association held in a concurrent hash map. Now, the JDK ConcurrentHashMap also uses sliced locks internally, but it has several strategies for avoiding taking them at all, and when they are taken they are held only very briefly. This implementation associates a reference count with the lock for each document ID and automatically removes the entry from the concurrent hash map when the reference count reaches zero.

Relates #18060
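To make the quadratic claim concrete, here is the standard birthday-problem approximation (the notation is mine, not from the commit): with n writers concurrently hashing onto k lock slices,

```latex
% Probability that at least two of n concurrent writers share a slice:
P(\text{collision}) \approx 1 - \exp\!\left(-\frac{n(n-1)}{2k}\right)
% Solving for k at a fixed target probability p:
k = \frac{n(n-1)}{2\ln\frac{1}{1-p}} \sim \frac{n^{2}}{2\ln\frac{1}{1-p}}
```

so pinning the collision probability at any fixed p forces the slice count k to grow quadratically in n, exactly as the message says.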
parent 6c3beaa2eb
commit c25f8ad912
KeyedLock.java:

```diff
@@ -50,7 +50,7 @@ public class KeyedLock<T> {
         this(false);
     }

-    private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMap();
+    private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

     public Releasable acquire(T key) {
         assert isHeldByCurrentThread(key) == false : "lock for " + key + " is already held by this thread";
```
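The reference-counting scheme the commit message describes can be sketched compactly on top of ConcurrentHashMap.compute. This is an illustrative approximation, not the actual KeyedLock implementation (which hands back a Releasable and is more careful about contention on the map itself); all names here are hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: one lock per live key, removed as soon as nobody references it.
final class RefCountedKeyedLock<T> {

    private static final class KeyLock extends ReentrantLock {
        int refCount = 1; // mutated only inside compute(), hence never racy
    }

    private final ConcurrentMap<T, KeyLock> map = new ConcurrentHashMap<>();

    /** Acquire the lock for {@code key}, creating an entry on first use. */
    public void acquire(T key) {
        // compute() runs atomically per key, so bumping the reference count
        // and keeping the entry alive cannot race with the removal below.
        KeyLock lock = map.compute(key, (k, existing) -> {
            if (existing == null) {
                return new KeyLock();
            }
            existing.refCount++;
            return existing;
        });
        lock.lock(); // block outside compute() so the map bin is not pinned
    }

    /** Release the lock; the entry vanishes once the last holder lets go. */
    public void release(T key) {
        map.compute(key, (k, lock) -> {
            lock.unlock();
            lock.refCount--;
            return lock.refCount == 0 ? null : lock; // null removes the entry
        });
    }
}
```

The essential point is the pairing of the reference count with map membership: a key's lock exists exactly while some thread holds or waits on it, so an idle engine carries no per-document lock state at all.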
InternalEngine.java:

```diff
@@ -41,7 +41,6 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.cluster.routing.Murmur3HashFunction;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.logging.ESLogger;
@@ -51,6 +50,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.Uid;
```
```diff
@@ -100,7 +100,7 @@ public class InternalEngine extends Engine {
     // we use the hashed variant since we iterate over it and check removal and additions on existing keys
     private final LiveVersionMap versionMap;

-    private final Object[] dirtyLocks;
+    private final KeyedLock<BytesRef> keyedLock = new KeyedLock<>();

     private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();

```
```diff
@@ -128,10 +128,6 @@ public class InternalEngine extends Engine {
         try {
             this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
             mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
-            this.dirtyLocks = new Object[Runtime.getRuntime().availableProcessors() * 10]; // we multiply it to have enough...
-            for (int i = 0; i < dirtyLocks.length; i++) {
-                dirtyLocks[i] = new Object();
-            }
             throttle = new IndexThrottle();
             this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
             try {
```
```diff
@@ -356,7 +352,7 @@ public class InternalEngine extends Engine {
     }

     private boolean innerIndex(Index index) throws IOException {
-        synchronized (dirtyLock(index.uid())) {
+        try (Releasable ignored = acquireLock(index.uid())) {
             lastWriteNanos = index.startTime();
             final long currentVersion;
             final boolean deleted;
@@ -451,7 +447,7 @@ public class InternalEngine extends Engine {
     }

     private void innerDelete(Delete delete) throws IOException {
-        synchronized (dirtyLock(delete.uid())) {
+        try (Releasable ignored = acquireLock(delete.uid())) {
             lastWriteNanos = delete.startTime();
             final long currentVersion;
             final boolean deleted;
```
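A hypothetical driver for the sketch above mirrors the behavior the engine now wants: the two threads below contend only because they target the same document ID, while writers to different IDs would never touch the same lock:

```java
// Hypothetical demo class; "doc-42" stands in for a document uid.
public final class KeyedLockDemo {
    public static void main(String[] args) throws InterruptedException {
        RefCountedKeyedLock<String> locks = new RefCountedKeyedLock<>();
        Runnable writer = () -> {
            locks.acquire("doc-42");
            try {
                System.out.println(Thread.currentThread().getName() + " updates doc-42");
            } finally {
                locks.release("doc-42"); // last release drops the map entry
            }
        };
        Thread a = new Thread(writer);
        Thread b = new Thread(writer);
        a.start();
        b.start();
        a.join();
        b.join();
    }
}
```

In the engine itself the same shape appears as try (Releasable ignored = acquireLock(uid)) { ... }: Releasable is closeable, so try-with-resources releases the per-document lock even when indexing throws.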
```diff
@@ -708,7 +704,7 @@ public class InternalEngine extends Engine {
         // we only need to prune the deletes map; the current/old version maps are cleared on refresh:
         for (Map.Entry<BytesRef, VersionValue> entry : versionMap.getAllTombstones()) {
             BytesRef uid = entry.getKey();
-            synchronized (dirtyLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
+            try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?

                 // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
                 VersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
```
```diff
@@ -908,13 +904,12 @@ public class InternalEngine extends Engine {
         return searcherManager;
     }

-    private Object dirtyLock(BytesRef uid) {
-        int hash = Murmur3HashFunction.hash(uid.bytes, uid.offset, uid.length);
-        return dirtyLocks[Math.floorMod(hash, dirtyLocks.length)];
+    private Releasable acquireLock(BytesRef uid) {
+        return keyedLock.acquire(uid);
     }

-    private Object dirtyLock(Term uid) {
-        return dirtyLock(uid.bytes());
+    private Releasable acquireLock(Term uid) {
+        return acquireLock(uid.bytes());
     }

     private long loadCurrentVersionFromIndex(Term uid) throws IOException {
```
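As a back-of-the-envelope check on the retired scheme (the sizing is taken from the removed constructor code above; the program itself is mine), the exact birthday computation shows how quickly collisions among unrelated documents become likely with processors * 10 slices:

```java
// Exact birthday computation: probability that at least two of n
// concurrent writers hash onto the same one of `slices` dirty locks.
public final class SliceCollision {
    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        int slices = processors * 10; // the removed dirtyLocks sizing
        for (int n = 2; n <= 2 * processors; n += 2) {
            // P(no collision) = product over i of (1 - i / slices)
            double noCollision = 1.0;
            for (int i = 0; i < n; i++) {
                noCollision *= 1.0 - (double) i / slices;
            }
            System.out.printf("writers=%2d  P(collision)=%.3f%n", n, 1.0 - noCollision);
        }
    }
}
```

Even with the number of concurrent writers equal to the processor count, the collision probability is already non-trivial, which is why the per-document-ID locks above replace the fixed slice array.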