abstract non blocking hashset

This commit is contained in:
kimchy 2010-05-12 04:06:44 +03:00
parent f6509930c7
commit a0b25ec4c3
15 changed files with 41 additions and 25 deletions

View File

@ -36,7 +36,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)

View File

@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.node.DiscoveryNode.*; import static org.elasticsearch.cluster.node.DiscoveryNode.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*; import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;

View File

@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.*; import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.*;
import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.collect.Lists.*; import static org.elasticsearch.util.collect.Lists.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
/** /**

View File

@ -19,16 +19,18 @@
package org.elasticsearch.http.netty; package org.elasticsearch.http.netty;
import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet; import org.elasticsearch.util.concurrent.ConcurrentCollections;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import java.util.Set;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class OpenChannelsHandler implements ChannelUpstreamHandler { public class OpenChannelsHandler implements ChannelUpstreamHandler {
private NonBlockingHashSet<Channel> openChannels = new NonBlockingHashSet<Channel>(); private Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
private final ChannelFutureListener remover = new ChannelFutureListener() { private final ChannelFutureListener remover = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {

View File

@ -35,7 +35,7 @@ import java.util.Iterator;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
import static org.elasticsearch.util.lucene.docidset.DocIdSets.*; import static org.elasticsearch.util.lucene.docidset.DocIdSets.*;
/** /**

View File

@ -59,7 +59,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.*; import static java.util.concurrent.TimeUnit.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)

View File

@ -32,7 +32,7 @@ import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (Shay Banon)

View File

@ -48,8 +48,8 @@ import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.Unicode; import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.collect.ImmutableMap; import org.elasticsearch.util.collect.ImmutableMap;
import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.concurrent.ConcurrentCollections;
import org.elasticsearch.util.concurrent.ConcurrentMapLong; import org.elasticsearch.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.util.concurrent.ConcurrentMaps;
import org.elasticsearch.util.inject.Inject; import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.timer.Timeout; import org.elasticsearch.util.timer.Timeout;
@ -90,7 +90,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final CleanContextOnIndicesLifecycleListener indicesLifecycleListener = new CleanContextOnIndicesLifecycleListener(); private final CleanContextOnIndicesLifecycleListener indicesLifecycleListener = new CleanContextOnIndicesLifecycleListener();
private final ConcurrentMapLong<SearchContext> activeContexts = ConcurrentMaps.newConcurrentMapLong(); private final ConcurrentMapLong<SearchContext> activeContexts = ConcurrentCollections.newConcurrentMapLong();
private final ImmutableMap<String, SearchParseElement> elementParsers; private final ImmutableMap<String, SearchParseElement> elementParsers;

View File

@ -25,8 +25,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService; import org.elasticsearch.timer.TimerService;
import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.concurrent.ConcurrentCollections;
import org.elasticsearch.util.concurrent.ConcurrentMapLong; import org.elasticsearch.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.util.concurrent.ConcurrentMaps;
import org.elasticsearch.util.inject.Inject; import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
/** /**
@ -56,7 +56,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
final ConcurrentMap<String, TransportRequestHandler> serverHandlers = newConcurrentMap(); final ConcurrentMap<String, TransportRequestHandler> serverHandlers = newConcurrentMap();
final ConcurrentMapLong<RequestHolder> clientHandlers = ConcurrentMaps.newConcurrentMapLong(); final ConcurrentMapLong<RequestHolder> clientHandlers = ConcurrentCollections.newConcurrentMapLong();
final AtomicLong requestIds = new AtomicLong(); final AtomicLong requestIds = new AtomicLong();

View File

@ -19,12 +19,12 @@
package org.elasticsearch.transport.local; package org.elasticsearch.transport.local;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.io.ThrowableObjectInputStream; import org.elasticsearch.util.io.ThrowableObjectInputStream;
import org.elasticsearch.util.io.stream.*; import org.elasticsearch.util.io.stream.*;
import org.elasticsearch.util.settings.ImmutableSettings; import org.elasticsearch.util.settings.ImmutableSettings;
@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.transport.Transport.Helper.*; import static org.elasticsearch.transport.Transport.Helper.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -71,7 +71,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
} }
@Override public TransportAddress[] addressesFromString(String address) { @Override public TransportAddress[] addressesFromString(String address) {
return new TransportAddress[] {new LocalTransportAddress(address)}; return new TransportAddress[]{new LocalTransportAddress(address)};
} }
@Override public boolean addressSupported(Class<? extends TransportAddress> address) { @Override public boolean addressSupported(Class<? extends TransportAddress> address) {

View File

@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.transport.Transport.Helper.*; import static org.elasticsearch.transport.Transport.Helper.*;
import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.collect.Lists.*; import static org.elasticsearch.util.collect.Lists.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*; import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.util.transport.NetworkExceptionHelper.*; import static org.elasticsearch.util.transport.NetworkExceptionHelper.*;

View File

@ -19,16 +19,18 @@
package org.elasticsearch.transport.netty; package org.elasticsearch.transport.netty;
import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet; import org.elasticsearch.util.concurrent.ConcurrentCollections;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import java.util.Set;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (Shay Banon)
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class OpenChannelsHandler implements ChannelUpstreamHandler { public class OpenChannelsHandler implements ChannelUpstreamHandler {
private NonBlockingHashSet<Channel> openChannels = new NonBlockingHashSet<Channel>(); private Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
private final ChannelFutureListener remover = new ChannelFutureListener() { private final ChannelFutureListener remover = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {

View File

@ -19,16 +19,19 @@
package org.elasticsearch.util.concurrent; package org.elasticsearch.util.concurrent;
import org.elasticsearch.util.MapBackedSet;
import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMap; import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMap;
import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMapLong; import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMapLong;
import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public abstract class ConcurrentMaps { public abstract class ConcurrentCollections {
private final static boolean useNonBlockingMap = Boolean.parseBoolean(System.getProperty("elasticsearch.useNonBlockingMap", "true")); private final static boolean useNonBlockingMap = Boolean.parseBoolean(System.getProperty("elasticsearch.useNonBlockingMap", "true"));
@ -46,8 +49,15 @@ public abstract class ConcurrentMaps {
return new ConcurrentHashMapLong<V>(); return new ConcurrentHashMapLong<V>();
} }
public static <V> Set<V> newConcurrentSet() {
if (useNonBlockingMap) {
return new NonBlockingHashSet<V>();
}
return new MapBackedSet<V>(new ConcurrentHashMap<V, Boolean>());
}
private ConcurrentMaps() {
private ConcurrentCollections() {
} }
} }

View File

@ -19,8 +19,8 @@
package org.elasticsearch.util.lucene.versioned; package org.elasticsearch.util.lucene.versioned;
import org.elasticsearch.util.concurrent.ConcurrentCollections;
import org.elasticsearch.util.concurrent.ConcurrentMapLong; import org.elasticsearch.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.util.concurrent.ConcurrentMaps;
import org.elasticsearch.util.concurrent.ThreadSafe; import org.elasticsearch.util.concurrent.ThreadSafe;
/** /**
@ -31,7 +31,7 @@ import org.elasticsearch.util.concurrent.ThreadSafe;
@ThreadSafe @ThreadSafe
public class ConcurrentVersionedMapLong implements VersionedMap { public class ConcurrentVersionedMapLong implements VersionedMap {
private final ConcurrentMapLong<Integer> map = ConcurrentMaps.newConcurrentMapLong(); private final ConcurrentMapLong<Integer> map = ConcurrentCollections.newConcurrentMapLong();
@Override public boolean beforeVersion(int key, int versionToCheck) { @Override public boolean beforeVersion(int key, int versionToCheck) {
Integer result = map.get(key); Integer result = map.get(key);

View File

@ -19,16 +19,18 @@
package org.elasticsearch.memcached.netty; package org.elasticsearch.memcached.netty;
import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet; import org.elasticsearch.util.concurrent.ConcurrentCollections;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import java.util.Set;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class OpenChannelsHandler implements ChannelUpstreamHandler { public class OpenChannelsHandler implements ChannelUpstreamHandler {
private NonBlockingHashSet<Channel> openChannels = new NonBlockingHashSet<Channel>(); private Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
private final ChannelFutureListener remover = new ChannelFutureListener() { private final ChannelFutureListener remover = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {