From a0b25ec4c3fa1249275e77ae315843c7724583ed Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 12 May 2010 04:06:44 +0300 Subject: [PATCH] abstract non blocking hashset --- .../discovery/zen/fd/NodesFaultDetection.java | 2 +- .../zen/ping/multicast/MulticastZenPing.java | 2 +- .../discovery/zen/ping/unicast/UnicastZenPing.java | 2 +- .../http/netty/OpenChannelsHandler.java | 6 ++++-- .../support/AbstractConcurrentMapFilterCache.java | 2 +- .../index/shard/recovery/RecoveryAction.java | 2 +- .../index/store/memory/HeapDirectory.java | 2 +- .../org/elasticsearch/search/SearchService.java | 4 ++-- .../elasticsearch/transport/TransportService.java | 6 +++--- .../transport/local/LocalTransport.java | 6 +++--- .../transport/netty/NettyTransport.java | 2 +- .../transport/netty/OpenChannelsHandler.java | 6 ++++-- ...currentMaps.java => ConcurrentCollections.java} | 14 ++++++++++++-- .../versioned/ConcurrentVersionedMapLong.java | 4 ++-- .../memcached/netty/OpenChannelsHandler.java | 6 ++++-- 15 files changed, 41 insertions(+), 25 deletions(-) rename modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/{ConcurrentMaps.java => ConcurrentCollections.java} (79%) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index 42ea842c0b7..aa47740e400 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -36,7 +36,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.util.TimeValue.*; -import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; +import static org.elasticsearch.util.concurrent.ConcurrentCollections.*; /** * @author kimchy (shay.banon) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index b37c4cf1b0d..78a71b03187 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.node.DiscoveryNode.*; -import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; +import static org.elasticsearch.util.concurrent.ConcurrentCollections.*; import static org.elasticsearch.util.concurrent.DynamicExecutors.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index e0d11813dd8..6a36b937c3b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.*; import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.collect.Lists.*; -import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; +import static org.elasticsearch.util.concurrent.ConcurrentCollections.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/OpenChannelsHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/OpenChannelsHandler.java index 5c61d451fd9..ac25d9618b8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/OpenChannelsHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/OpenChannelsHandler.java @@ -19,16 +19,18 @@ package org.elasticsearch.http.netty; -import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet; +import org.elasticsearch.util.concurrent.ConcurrentCollections; import org.jboss.netty.channel.*; +import java.util.Set; + /** * @author kimchy (shay.banon) */ @ChannelHandler.Sharable public class OpenChannelsHandler implements ChannelUpstreamHandler { - private NonBlockingHashSet openChannels = new NonBlockingHashSet(); + private Set openChannels = ConcurrentCollections.newConcurrentSet(); private final ChannelFutureListener remover = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java index f44e63d0639..9b97b89fb70 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java @@ -35,7 +35,7 @@ import java.util.Iterator; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; -import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; +import static org.elasticsearch.util.concurrent.ConcurrentCollections.*; import static org.elasticsearch.util.lucene.docidset.DocIdSets.*; /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java index 6de58b151ca..b554ab679bc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java @@ -59,7 +59,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.*; -import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; +import static org.elasticsearch.util.concurrent.ConcurrentCollections.*; /** * @author kimchy (shay.banon) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/HeapDirectory.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/HeapDirectory.java index 4be0daa6b12..e5ce40e441a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/HeapDirectory.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/HeapDirectory.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; -import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; +import static org.elasticsearch.util.concurrent.ConcurrentCollections.*; /** * @author kimchy (Shay Banon) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java index 24f30d55843..32e5b283087 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java @@ -48,8 +48,8 @@ import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.Unicode; import org.elasticsearch.util.collect.ImmutableMap; import org.elasticsearch.util.component.AbstractLifecycleComponent; +import org.elasticsearch.util.concurrent.ConcurrentCollections; import org.elasticsearch.util.concurrent.ConcurrentMapLong; -import org.elasticsearch.util.concurrent.ConcurrentMaps; import org.elasticsearch.util.inject.Inject; import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.timer.Timeout; @@ -90,7 +90,7 @@ public class SearchService extends AbstractLifecycleComponent { private final CleanContextOnIndicesLifecycleListener indicesLifecycleListener = new CleanContextOnIndicesLifecycleListener(); - private final ConcurrentMapLong activeContexts = ConcurrentMaps.newConcurrentMapLong(); + private final ConcurrentMapLong activeContexts = ConcurrentCollections.newConcurrentMapLong(); private final ImmutableMap elementParsers; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index 43812931546..ce4bf36b6ab 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -25,8 +25,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.timer.TimerService; import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.component.AbstractLifecycleComponent; +import org.elasticsearch.util.concurrent.ConcurrentCollections; import org.elasticsearch.util.concurrent.ConcurrentMapLong; -import org.elasticsearch.util.concurrent.ConcurrentMaps; import org.elasticsearch.util.inject.Inject; import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.settings.Settings; @@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; -import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; +import static org.elasticsearch.util.concurrent.ConcurrentCollections.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; /** @@ -56,7 +56,7 @@ public class TransportService extends AbstractLifecycleComponent serverHandlers = newConcurrentMap(); - final ConcurrentMapLong clientHandlers = ConcurrentMaps.newConcurrentMapLong(); + final ConcurrentMapLong clientHandlers = ConcurrentCollections.newConcurrentMapLong(); final AtomicLong requestIds = new AtomicLong(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 5223d00f27b..17228057895 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -19,12 +19,12 @@ package org.elasticsearch.transport.local; -import org.elasticsearch.util.inject.Inject; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import org.elasticsearch.util.component.AbstractLifecycleComponent; +import org.elasticsearch.util.inject.Inject; import org.elasticsearch.util.io.ThrowableObjectInputStream; import org.elasticsearch.util.io.stream.*; import org.elasticsearch.util.settings.ImmutableSettings; @@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.transport.Transport.Helper.*; -import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; +import static org.elasticsearch.util.concurrent.ConcurrentCollections.*; /** * @author kimchy (shay.banon) @@ -71,7 +71,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem } @Override public TransportAddress[] addressesFromString(String address) { - return new TransportAddress[] {new LocalTransportAddress(address)}; + return new TransportAddress[]{new LocalTransportAddress(address)}; } @Override public boolean addressSupported(Class address) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 30df4e45111..d6ce15ab5a1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.transport.Transport.Helper.*; import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.collect.Lists.*; -import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; +import static org.elasticsearch.util.concurrent.ConcurrentCollections.*; import static org.elasticsearch.util.concurrent.DynamicExecutors.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; import static org.elasticsearch.util.transport.NetworkExceptionHelper.*; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/OpenChannelsHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/OpenChannelsHandler.java index c6586236876..5bd59ccae96 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/OpenChannelsHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/OpenChannelsHandler.java @@ -19,16 +19,18 @@ package org.elasticsearch.transport.netty; -import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet; +import org.elasticsearch.util.concurrent.ConcurrentCollections; import org.jboss.netty.channel.*; +import java.util.Set; + /** * @author kimchy (Shay Banon) */ @ChannelHandler.Sharable public class OpenChannelsHandler implements ChannelUpstreamHandler { - private NonBlockingHashSet openChannels = new NonBlockingHashSet(); + private Set openChannels = ConcurrentCollections.newConcurrentSet(); private final ChannelFutureListener remover = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ConcurrentMaps.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ConcurrentCollections.java similarity index 79% rename from modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ConcurrentMaps.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ConcurrentCollections.java index 0892c814015..36337da5700 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ConcurrentMaps.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ConcurrentCollections.java @@ -19,16 +19,19 @@ package org.elasticsearch.util.concurrent; +import org.elasticsearch.util.MapBackedSet; import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMap; import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMapLong; +import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * @author kimchy (shay.banon) */ -public abstract class ConcurrentMaps { +public abstract class ConcurrentCollections { private final static boolean useNonBlockingMap = Boolean.parseBoolean(System.getProperty("elasticsearch.useNonBlockingMap", "true")); @@ -46,8 +49,15 @@ public abstract class ConcurrentMaps { return new ConcurrentHashMapLong(); } + public static Set newConcurrentSet() { + if (useNonBlockingMap) { + return new NonBlockingHashSet(); + } + return new MapBackedSet(new ConcurrentHashMap()); + } - private ConcurrentMaps() { + + private ConcurrentCollections() { } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/versioned/ConcurrentVersionedMapLong.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/versioned/ConcurrentVersionedMapLong.java index bb5e6d286f5..ea68152418f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/versioned/ConcurrentVersionedMapLong.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/versioned/ConcurrentVersionedMapLong.java @@ -19,8 +19,8 @@ package org.elasticsearch.util.lucene.versioned; +import org.elasticsearch.util.concurrent.ConcurrentCollections; import org.elasticsearch.util.concurrent.ConcurrentMapLong; -import org.elasticsearch.util.concurrent.ConcurrentMaps; import org.elasticsearch.util.concurrent.ThreadSafe; /** @@ -31,7 +31,7 @@ import org.elasticsearch.util.concurrent.ThreadSafe; @ThreadSafe public class ConcurrentVersionedMapLong implements VersionedMap { - private final ConcurrentMapLong map = ConcurrentMaps.newConcurrentMapLong(); + private final ConcurrentMapLong map = ConcurrentCollections.newConcurrentMapLong(); @Override public boolean beforeVersion(int key, int versionToCheck) { Integer result = map.get(key); diff --git a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java index ac1c0f6892e..9c1e04a610f 100644 --- a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java +++ b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java @@ -19,16 +19,18 @@ package org.elasticsearch.memcached.netty; -import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet; +import org.elasticsearch.util.concurrent.ConcurrentCollections; import org.jboss.netty.channel.*; +import java.util.Set; + /** * @author kimchy (shay.banon) */ @ChannelHandler.Sharable public class OpenChannelsHandler implements ChannelUpstreamHandler { - private NonBlockingHashSet openChannels = new NonBlockingHashSet(); + private Set openChannels = ConcurrentCollections.newConcurrentSet(); private final ChannelFutureListener remover = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception {