diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java index 631bac8014f..2c238edfcbe 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.search.type; import jsr166y.LinkedTransferQueue; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -37,8 +36,6 @@ import java.util.Queue; */ public class TransportSearchCache { - private final Queue> cacheShardFailures = new LinkedTransferQueue>(); - private final Queue> cacheDfsResults = new LinkedTransferQueue>(); private final Queue> cacheQueryResults = new LinkedTransferQueue>(); @@ -48,19 +45,6 @@ public class TransportSearchCache { private final Queue> cacheQueryFetchResults = new LinkedTransferQueue>(); - public Collection obtainShardFailures() { - Collection shardFailures; - while ((shardFailures = cacheShardFailures.poll()) == null) { - cacheShardFailures.offer(new LinkedTransferQueue()); - } - return shardFailures; - } - - public void releaseShardFailures(Collection shardFailures) { - shardFailures.clear(); - cacheShardFailures.offer(shardFailures); - } - public Collection obtainDfsResults() { Collection dfsSearchResults; while ((dfsSearchResults = cacheDfsResults.poll()) == null) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index 20bc390d890..3f2aa17e33f 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -153,7 +153,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); } - AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 4eb0ecb0a88..53a0aee72c9 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -160,7 +160,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); } - AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { executeFetchPhase(); @@ -251,7 +251,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); } - AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java index c7425f1942b..1baa5aed7ba 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java @@ -26,7 +26,6 @@ import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Base64; import org.elasticsearch.common.Nullable; @@ -47,20 +46,6 @@ import java.util.Map; public abstract class TransportSearchHelper { - /** - * Builds the shard failures, and releases the cache (meaning this should only be called once!). - */ - public static ShardSearchFailure[] buildShardFailures(Collection shardFailures, TransportSearchCache searchCache) { - ShardSearchFailure[] ret; - if (shardFailures.isEmpty()) { - ret = ShardSearchFailure.EMPTY_ARRAY; - } else { - ret = shardFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); - } - searchCache.releaseShardFailures(shardFailures); - return ret; - } - public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis) { InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, numberOfShards, request.searchType()); internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength()); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 6ba16d447f1..b4ef8124a8a 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -161,7 +161,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); } - AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index 8ee85f779d3..b5e6afcff35 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search.type; +import jsr166y.LinkedTransferQueue; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.cluster.ClusterService; @@ -37,11 +38,9 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.action.search.type.TransportSearchHelper.buildShardFailures; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; /** @@ -85,7 +84,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent private final DiscoveryNodes nodes; - protected final Collection shardFailures = searchCache.obtainShardFailures(); + private volatile LinkedTransferQueue shardFailures; private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); @@ -104,6 +103,23 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent this.counter = new AtomicInteger(scrollId.context().length); } + protected final ShardSearchFailure[] buildShardFailures() { + LinkedTransferQueue localFailures = shardFailures; + if (localFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new LinkedTransferQueue(); + } + shardFailures.add(failure); + } + public void start() { if (scrollId.context().length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); @@ -193,7 +209,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, searchId); } - shardFailures.add(new ShardSearchFailure(t)); + addShardFailure(new ShardSearchFailure(t)); successfulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); @@ -206,7 +222,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent try { innerFinishHim(); } catch (Exception e) { - listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache))); + listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); } } @@ -219,7 +235,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent } searchCache.releaseQueryFetchResults(queryFetchResults); listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), - System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); + System.currentTimeMillis() - startTime, buildShardFailures())); } } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index 928a737f89f..4eabc61af75 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search.type; +import jsr166y.LinkedTransferQueue; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.cluster.ClusterService; @@ -41,11 +42,9 @@ import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.action.search.type.TransportSearchHelper.buildShardFailures; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; /** @@ -89,7 +88,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent private final DiscoveryNodes nodes; - protected final Collection shardFailures = searchCache.obtainShardFailures(); + protected volatile LinkedTransferQueue shardFailures; private final Map queryResults = searchCache.obtainQueryResults(); @@ -109,6 +108,23 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent this.successfulOps = new AtomicInteger(scrollId.context().length); } + protected final ShardSearchFailure[] buildShardFailures() { + LinkedTransferQueue localFailures = shardFailures; + if (localFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new LinkedTransferQueue(); + } + shardFailures.add(failure); + } + public void start() { if (scrollId.context().length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); @@ -185,7 +201,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, searchId); } - shardFailures.add(new ShardSearchFailure(t)); + addShardFailure(new ShardSearchFailure(t)); successfulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { executeFetchPhase(); @@ -237,7 +253,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent try { innerFinishHim(); } catch (Exception e) { - listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache))); + listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); } } @@ -248,7 +264,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent scrollId = request.scrollId(); } listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), - System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); + System.currentTimeMillis() - startTime, buildShardFailures())); searchCache.releaseQueryResults(queryResults); searchCache.releaseFetchResults(fetchResults); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index 4b9bfa1be0a..b417573ced9 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search.type; +import jsr166y.LinkedTransferQueue; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; @@ -41,11 +42,9 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.action.search.type.TransportSearchHelper.buildShardFailures; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; /** @@ -89,7 +88,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { private final DiscoveryNodes nodes; - protected final Collection shardFailures = searchCache.obtainShardFailures(); + protected volatile LinkedTransferQueue shardFailures; private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); @@ -108,11 +107,28 @@ public class TransportSearchScrollScanAction extends AbstractComponent { this.counter = new AtomicInteger(scrollId.context().length); } + protected final ShardSearchFailure[] buildShardFailures() { + LinkedTransferQueue localFailures = shardFailures; + if (localFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new LinkedTransferQueue(); + } + shardFailures.add(failure); + } + public void start() { if (scrollId.context().length == 0) { final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.attributes().get("total_hits")), 0.0f), null, false); searchCache.releaseQueryFetchResults(queryFetchResults); - listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, TransportSearchHelper.buildShardFailures(shardFailures, searchCache))); + listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, buildShardFailures())); return; } @@ -199,7 +215,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, searchId); } - shardFailures.add(new ShardSearchFailure(t)); + addShardFailure(new ShardSearchFailure(t)); successfulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); @@ -212,7 +228,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { try { innerFinishHim(); } catch (Exception e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)); + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()); if (logger.isDebugEnabled()) { logger.debug("failed to reduce search", failure); } @@ -252,7 +268,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), this.scrollId.attributes()); // continue moving the total_hits } listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), - System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); + System.currentTimeMillis() - startTime, buildShardFailures())); } } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 7ab9f545640..53f77710399 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search.type; +import jsr166y.LinkedTransferQueue; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.TransportAction; @@ -44,7 +45,6 @@ import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; -import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -92,7 +92,7 @@ public abstract class TransportSearchTypeAction extends TransportAction shardFailures = searchCache.obtainShardFailures(); + private volatile LinkedTransferQueue shardFailures; protected volatile ShardDoc[] sortedShardList; @@ -248,9 +248,9 @@ public abstract class TransportSearchTypeAction extends TransportAction localFailures = shardFailures; + if (localFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new LinkedTransferQueue(); + } + shardFailures.add(failure); } /**