Improve concurrency of ShardCoreKeyMap. (#22316)
`ShardCoreKeyMap.add` is called on each segment for all search requests, which means it might become a bottleneck under a cocurrent load of cheap search requests since this method acquires a mutex. This change proposes to use a `ConcurrentHashMap` which allows to only take the mutex in the case that the `LeafReader` has never been seen before.
This commit is contained in:
parent
acd64c6ee1
commit
fd6e1a30de
|
@ -29,9 +29,9 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.IdentityHashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A map between segment core cache keys and the shard that these segments
|
* A map between segment core cache keys and the shard that these segments
|
||||||
|
@ -50,7 +50,7 @@ public final class ShardCoreKeyMap {
|
||||||
private final Map<String, Set<Object>> indexToCoreKey;
|
private final Map<String, Set<Object>> indexToCoreKey;
|
||||||
|
|
||||||
public ShardCoreKeyMap() {
|
public ShardCoreKeyMap() {
|
||||||
coreKeyToShard = new IdentityHashMap<>();
|
coreKeyToShard = new ConcurrentHashMap<>();
|
||||||
indexToCoreKey = new HashMap<>();
|
indexToCoreKey = new HashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,9 +64,17 @@ public final class ShardCoreKeyMap {
|
||||||
throw new IllegalArgumentException("Could not extract shard id from " + reader);
|
throw new IllegalArgumentException("Could not extract shard id from " + reader);
|
||||||
}
|
}
|
||||||
final Object coreKey = reader.getCoreCacheKey();
|
final Object coreKey = reader.getCoreCacheKey();
|
||||||
|
|
||||||
|
if (coreKeyToShard.containsKey(coreKey)) {
|
||||||
|
// Do this check before entering the synchronized block in order to
|
||||||
|
// avoid taking the mutex if possible (which should happen most of
|
||||||
|
// the time).
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final String index = shardId.getIndexName();
|
final String index = shardId.getIndexName();
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (coreKeyToShard.put(coreKey, shardId) == null) {
|
if (coreKeyToShard.containsKey(coreKey) == false) {
|
||||||
Set<Object> objects = indexToCoreKey.get(index);
|
Set<Object> objects = indexToCoreKey.get(index);
|
||||||
if (objects == null) {
|
if (objects == null) {
|
||||||
objects = new HashSet<>();
|
objects = new HashSet<>();
|
||||||
|
@ -90,6 +98,14 @@ public final class ShardCoreKeyMap {
|
||||||
try {
|
try {
|
||||||
reader.addCoreClosedListener(listener);
|
reader.addCoreClosedListener(listener);
|
||||||
addedListener = true;
|
addedListener = true;
|
||||||
|
|
||||||
|
// Only add the core key to the map as a last operation so that
|
||||||
|
// if another thread sees that the core key is already in the
|
||||||
|
// map (like the check just before this synchronized block),
|
||||||
|
// then it means that the closed listener has already been
|
||||||
|
// registered.
|
||||||
|
ShardId previous = coreKeyToShard.put(coreKey, shardId);
|
||||||
|
assert previous == null;
|
||||||
} finally {
|
} finally {
|
||||||
if (false == addedListener) {
|
if (false == addedListener) {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue