From 08ecd9d772e845437b5406219dc9810eb9b5dbcc Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sun, 19 Aug 2012 18:15:48 +0200 Subject: [PATCH] refactor all Queue and BlockingQueue creations into a single factory method --- .../search/type/TransportSearchCache.java | 12 ++++---- ...nsportSearchScrollQueryAndFetchAction.java | 9 +++--- ...sportSearchScrollQueryThenFetchAction.java | 9 +++--- .../type/TransportSearchScrollScanAction.java | 9 +++--- .../type/TransportSearchTypeAction.java | 9 +++--- .../TransportClientNodesService.java | 9 ++---- .../action/shard/ShardStateAction.java | 4 +-- .../service/InternalClusterService.java | 4 +-- .../elasticsearch/common/CacheRecycler.java | 28 +++++++++---------- .../common/io/stream/CachedStreamOutput.java | 4 +-- .../concurrent/ConcurrentCollections.java | 10 +++++++ .../discovery/local/LocalDiscovery.java | 4 +-- .../zen/ping/unicast/UnicastZenPing.java | 3 +- .../elasticsearch/threadpool/ThreadPool.java | 8 ++---- 14 files changed, 63 insertions(+), 59 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java index 2c238edfcbe..7b0c459fe18 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.search.type; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -36,19 +35,20 @@ import java.util.Queue; */ public class TransportSearchCache { - private final Queue> cacheDfsResults = new LinkedTransferQueue>(); + private final Queue> cacheDfsResults = ConcurrentCollections.newQueue(); - private final Queue> cacheQueryResults = new LinkedTransferQueue>(); + private final Queue> cacheQueryResults = ConcurrentCollections.newQueue(); - private final Queue> cacheFetchResults = new LinkedTransferQueue>(); + private final Queue> cacheFetchResults = ConcurrentCollections.newQueue(); - private final Queue> cacheQueryFetchResults = new LinkedTransferQueue>(); + private final Queue> cacheQueryFetchResults = ConcurrentCollections.newQueue(); public Collection obtainDfsResults() { Collection dfsSearchResults; while ((dfsSearchResults = cacheDfsResults.poll()) == null) { - cacheDfsResults.offer(new LinkedTransferQueue()); + Queue offer = ConcurrentCollections.newQueue(); + cacheDfsResults.offer(offer); } return dfsSearchResults; } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index b5e6afcff35..23311cee4bc 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.search.type; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.cluster.ClusterService; @@ -29,6 +28,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; @@ -39,6 +39,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; +import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; @@ -84,7 +85,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent private final DiscoveryNodes nodes; - private volatile LinkedTransferQueue shardFailures; + private volatile Queue shardFailures; private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); @@ -104,7 +105,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent } protected final ShardSearchFailure[] buildShardFailures() { - LinkedTransferQueue localFailures = shardFailures; + Queue localFailures = shardFailures; if (localFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } @@ -115,7 +116,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent // we simply try and return as much as possible protected final void addShardFailure(ShardSearchFailure failure) { if (shardFailures == null) { - shardFailures = new LinkedTransferQueue(); + shardFailures = ConcurrentCollections.newQueue(); } shardFailures.add(failure); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index 4eabc61af75..3f17d79b9ef 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.search.type; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.cluster.ClusterService; @@ -30,6 +29,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; @@ -43,6 +43,7 @@ import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; +import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; @@ -88,7 +89,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent private final DiscoveryNodes nodes; - protected volatile LinkedTransferQueue shardFailures; + protected volatile Queue shardFailures; private final Map queryResults = searchCache.obtainQueryResults(); @@ -109,7 +110,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent } protected final ShardSearchFailure[] buildShardFailures() { - LinkedTransferQueue localFailures = shardFailures; + Queue localFailures = shardFailures; if (localFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } @@ -120,7 +121,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent // we simply try and return as much as possible protected final void addShardFailure(ShardSearchFailure failure) { if (shardFailures == null) { - shardFailures = new LinkedTransferQueue(); + shardFailures = ConcurrentCollections.newQueue(); } shardFailures.add(failure); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index b417573ced9..f8569e17bc0 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.search.type; -import jsr166y.LinkedTransferQueue; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; @@ -30,6 +29,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; @@ -43,6 +43,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Map; +import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; @@ -88,7 +89,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { private final DiscoveryNodes nodes; - protected volatile LinkedTransferQueue shardFailures; + protected volatile Queue shardFailures; private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); @@ -108,7 +109,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { } protected final ShardSearchFailure[] buildShardFailures() { - LinkedTransferQueue localFailures = shardFailures; + Queue localFailures = shardFailures; if (localFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } @@ -119,7 +120,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { // we simply try and return as much as possible protected final void addShardFailure(ShardSearchFailure failure) { if (shardFailures == null) { - shardFailures = new LinkedTransferQueue(); + shardFailures = ConcurrentCollections.newQueue(); } shardFailures.add(failure); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index bf08802f370..e8ef8cb443b 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.search.type; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.TransportAction; @@ -34,6 +33,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; @@ -46,6 +46,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -92,7 +93,7 @@ public abstract class TransportSearchTypeAction extends TransportAction shardFailures; + private volatile Queue shardFailures; protected volatile ShardDoc[] sortedShardList; @@ -308,7 +309,7 @@ public abstract class TransportSearchTypeAction extends TransportAction localFailures = shardFailures; + Queue localFailures = shardFailures; if (localFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } @@ -319,7 +320,7 @@ public abstract class TransportSearchTypeAction extends TransportAction(); + shardFailures = ConcurrentCollections.newQueue(); } shardFailures.add(failure); } diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 8f5081191a5..9cbd9f535f8 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -22,7 +22,6 @@ package org.elasticsearch.client.transport; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -38,13 +37,11 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -351,7 +348,7 @@ public class TransportClientNodesService extends AbstractComponent { } final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); - final LinkedTransferQueue clusterStateResponses = new LinkedTransferQueue(); + final Queue clusterStateResponses = ConcurrentCollections.newQueue(); for (final DiscoveryNode listedNode : nodesToPing) { threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() { @Override diff --git a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index bcd0d233b9c..f5b4e70f784 100644 --- a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.action.shard; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -38,6 +37,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -62,7 +62,7 @@ public class ShardStateAction extends AbstractComponent { private final ThreadPool threadPool; - private final BlockingQueue startedShardsQueue = new LinkedTransferQueue(); + private final BlockingQueue startedShardsQueue = ConcurrentCollections.newBlockingQueue(); @Inject public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 47965c9f58b..66be0e62aed 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.service; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.*; @@ -34,6 +33,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.node.settings.NodeSettingsService; @@ -76,7 +76,7 @@ public class InternalClusterService extends AbstractLifecycleComponent clusterStateListeners = new CopyOnWriteArrayList(); private final List lastClusterStateListeners = new CopyOnWriteArrayList(); - private final Queue onGoingTimeouts = new LinkedTransferQueue(); + private final Queue onGoingTimeouts = ConcurrentCollections.newQueue(); private volatile ClusterState clusterState = newClusterStateBuilder().build(); diff --git a/src/main/java/org/elasticsearch/common/CacheRecycler.java b/src/main/java/org/elasticsearch/common/CacheRecycler.java index b7afa95365b..752bfc32b6d 100644 --- a/src/main/java/org/elasticsearch/common/CacheRecycler.java +++ b/src/main/java/org/elasticsearch/common/CacheRecycler.java @@ -20,10 +20,10 @@ package org.elasticsearch.common; import gnu.trove.map.hash.*; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.common.trove.ExtTDoubleObjectHashMap; import org.elasticsearch.common.trove.ExtTHashMap; import org.elasticsearch.common.trove.ExtTLongObjectHashMap; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import java.lang.ref.SoftReference; import java.util.Arrays; @@ -84,7 +84,7 @@ public class CacheRecycler { public static void pushHashMap(ExtTHashMap map) { Queue ref = hashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); hashMap.set(ref); } map.clear(); @@ -110,7 +110,7 @@ public class CacheRecycler { public static void pushDoubleObjectMap(ExtTDoubleObjectHashMap map) { Queue ref = doubleObjectHashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); doubleObjectHashMap.set(ref); } map.clear(); @@ -136,7 +136,7 @@ public class CacheRecycler { public static void pushLongObjectMap(ExtTLongObjectHashMap map) { Queue ref = longObjectHashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); longObjectHashMap.set(ref); } map.clear(); @@ -162,7 +162,7 @@ public class CacheRecycler { public static void pushLongLongMap(TLongLongHashMap map) { Queue ref = longLongHashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); longLongHashMap.set(ref); } map.clear(); @@ -189,7 +189,7 @@ public class CacheRecycler { public static void pushIntIntMap(TIntIntHashMap map) { Queue ref = intIntHashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); intIntHashMap.set(ref); } map.clear(); @@ -217,7 +217,7 @@ public class CacheRecycler { public static void pushFloatIntMap(TFloatIntHashMap map) { Queue ref = floatIntHashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); floatIntHashMap.set(ref); } map.clear(); @@ -245,7 +245,7 @@ public class CacheRecycler { public static void pushDoubleIntMap(TDoubleIntHashMap map) { Queue ref = doubleIntHashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); doubleIntHashMap.set(ref); } map.clear(); @@ -273,7 +273,7 @@ public class CacheRecycler { public static void pushByteIntMap(TByteIntHashMap map) { Queue ref = byteIntHashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); byteIntHashMap.set(ref); } map.clear(); @@ -300,7 +300,7 @@ public class CacheRecycler { public static void pushShortIntMap(TShortIntHashMap map) { Queue ref = shortIntHashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); shortIntHashMap.set(ref); } map.clear(); @@ -328,7 +328,7 @@ public class CacheRecycler { public static void pushLongIntMap(TLongIntHashMap map) { Queue ref = longIntHashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); longIntHashMap.set(ref); } map.clear(); @@ -356,7 +356,7 @@ public class CacheRecycler { public static void pushObjectIntMap(TObjectIntHashMap map) { Queue ref = objectIntHashMap.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); objectIntHashMap.set(ref); } map.clear(); @@ -386,7 +386,7 @@ public class CacheRecycler { public static void pushObjectArray(Object[] objects) { Queue ref = objectArray.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); objectArray.set(ref); } Arrays.fill(objects, null); @@ -435,7 +435,7 @@ public class CacheRecycler { public static void pushIntArray(int[] ints, int sentinal) { Queue ref = intArray.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); intArray.set(ref); } Arrays.fill(ints, sentinal); diff --git a/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java index 9322e6b9abd..c35d8b63a1c 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java @@ -19,10 +19,10 @@ package org.elasticsearch.common.io.stream; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.io.UTF8StreamWriter; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.monitor.jvm.JvmInfo; import java.io.IOException; @@ -146,7 +146,7 @@ public class CachedStreamOutput { } Queue ref = cache.get(); if (ref == null) { - ref = new LinkedTransferQueue(); + ref = ConcurrentCollections.newQueue(); counter.set(0); cache.set(ref); } diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java b/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java index 31021421376..14bd21a9ea7 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java @@ -19,9 +19,12 @@ package org.elasticsearch.common.util.concurrent; +import jsr166y.LinkedTransferQueue; import org.elasticsearch.common.collect.MapBackedSet; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -53,6 +56,13 @@ public abstract class ConcurrentCollections { return new MapBackedSet(new ConcurrentHashMap()); } + public static Queue newQueue() { + return new LinkedTransferQueue(); + } + + public static BlockingQueue newBlockingQueue() { + return new LinkedTransferQueue(); + } private ConcurrentCollections() { diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index b0863cf6809..6d6cd1ad29a 100644 --- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -19,7 +19,6 @@ package org.elasticsearch.discovery.local; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.*; @@ -39,7 +38,6 @@ import org.elasticsearch.transport.TransportService; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -300,7 +298,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private class ClusterGroup { - private Queue members = new LinkedTransferQueue(); + private Queue members = ConcurrentCollections.newQueue(); Queue members() { return members; diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 89c8454d625..9bb4f1ae5e8 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -20,7 +20,6 @@ package org.elasticsearch.discovery.zen.ping.unicast; import com.google.common.collect.Lists; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalStateException; @@ -78,7 +77,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private final Map> receivedResponses = newConcurrentMap(); // a list of temporal responses a node will return for a request (holds requests from other nodes) - private final Queue temporalResponses = new LinkedTransferQueue(); + private final Queue temporalResponses = ConcurrentCollections.newQueue(); private final CopyOnWriteArrayList hostsProviders = new CopyOnWriteArrayList(); diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 52424c3f917..a872b34b9e9 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -22,7 +22,6 @@ package org.elasticsearch.threadpool; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; -import jsr166y.LinkedTransferQueue; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; @@ -35,10 +34,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsAbortPolicy; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; -import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; +import org.elasticsearch.common.util.concurrent.*; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; @@ -249,7 +245,7 @@ public class ThreadPool extends AbstractComponent { String queueType = settings.get("queue_type", "linked"); BlockingQueue workQueue; if (capacity == null) { - workQueue = new LinkedTransferQueue(); + workQueue = ConcurrentCollections.newBlockingQueue(); } else if ((int) capacity.singles() > 0) { if ("linked".equals(queueType)) { workQueue = new LinkedBlockingQueue((int) capacity.singles());