use more aggressive concurrency levels for CHM

- long running ones with high update rates
- also expose a *system* property of es.useConcurrentHashMapV8 to use the new non blocking Java8 CHM impl
This commit is contained in:
Shay Banon 2013-04-17 14:28:38 -07:00
parent 271305d5eb
commit 0eb298fe64
5 changed files with 35 additions and 17 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.util.concurrent;
import jsr166e.ConcurrentHashMapV8;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.collect.MapBackedSet;
@ -34,28 +35,45 @@ import java.util.concurrent.ConcurrentMap;
*/
public abstract class ConcurrentCollections {
private final static boolean useNonBlockingMap = Boolean.parseBoolean(System.getProperty("es.useNonBlockingMap", "false"));
private final static boolean useConcurrentHashMapV8 = Boolean.parseBoolean(System.getProperty("es.useConcurrentHashMapV8", "false"));
private final static boolean useLinkedTransferQueue = Boolean.parseBoolean(System.getProperty("es.useLinkedTransferQueue", "false"));
static final int aggressiveConcurrencyLevel;
static {
aggressiveConcurrencyLevel = Math.max(Runtime.getRuntime().availableProcessors() * 2, 16);
}
/**
* Creates a new CHM with an aggressive concurrency level, aimed at high concurrent update rate long living maps.
*/
public static <K, V> ConcurrentMap<K, V> newConcurrentMapWithAggressiveConcurrency() {
if (useConcurrentHashMapV8) {
return new ConcurrentHashMapV8<K, V>(16, 0.75f, aggressiveConcurrencyLevel);
}
return new ConcurrentHashMap<K, V>(16, 0.75f, aggressiveConcurrencyLevel);
}
public static <K, V> ConcurrentMap<K, V> newConcurrentMap() {
// if (useNonBlockingMap) {
// return new NonBlockingHashMap<K, V>();
// }
if (useConcurrentHashMapV8) {
return new ConcurrentHashMapV8<K, V>();
}
return new ConcurrentHashMap<K, V>();
}
/**
* Creates a new CHM with an aggressive concurrency level, aimed at highly updateable long living maps.
*/
public static <V> ConcurrentMapLong<V> newConcurrentMapLongWithAggressiveConcurrency() {
return new ConcurrentHashMapLong<V>(ConcurrentCollections.<Long, V>newConcurrentMapWithAggressiveConcurrency());
}
public static <V> ConcurrentMapLong<V> newConcurrentMapLong() {
// if (useNonBlockingMap) {
// return new NonBlockingHashMapLong<V>();
// }
return new ConcurrentHashMapLong<V>();
return new ConcurrentHashMapLong<V>(ConcurrentCollections.<Long, V>newConcurrentMap());
}
public static <V> Set<V> newConcurrentSet() {
// if (useNonBlockingMap) {
// return new NonBlockingHashSet<V>();
// }
return new MapBackedSet<V>(new ConcurrentHashMap<V, Boolean>());
return new MapBackedSet<V>(ConcurrentCollections.<V, Boolean>newConcurrentMap());
}
public static <T> Queue<T> newQueue() {

View File

@ -31,8 +31,8 @@ public class ConcurrentHashMapLong<T> implements ConcurrentMapLong<T> {
private final ConcurrentMap<Long, T> map;
public ConcurrentHashMapLong() {
this.map = ConcurrentCollections.newConcurrentMap();
public ConcurrentHashMapLong(ConcurrentMap<Long, T> map) {
this.map = map;
}
@Override

View File

@ -179,7 +179,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
this.codecService = codecService;
this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
this.versionMap = ConcurrentCollections.newConcurrentMap();
this.versionMap = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();

View File

@ -96,7 +96,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final CleanContextOnIndicesLifecycleListener indicesLifecycleListener = new CleanContextOnIndicesLifecycleListener();
private final ConcurrentMapLong<SearchContext> activeContexts = ConcurrentCollections.newConcurrentMapLong();
private final ConcurrentMapLong<SearchContext> activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
private final ImmutableMap<String, SearchParseElement> elementParsers;

View File

@ -54,7 +54,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
volatile ImmutableMap<String, TransportRequestHandler> serverHandlers = ImmutableMap.of();
final Object serverHandlersMutex = new Object();
final ConcurrentMapLong<RequestHolder> clientHandlers = ConcurrentCollections.newConcurrentMapLong();
final ConcurrentMapLong<RequestHolder> clientHandlers = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
final AtomicLong requestIds = new AtomicLong();