Failure exception while executing a valid query after an invalid query, closes #1617.

This commit is contained in:
Shay Banon 2012-01-18 15:08:37 +02:00
parent 534f487de3
commit 64358948ef
9 changed files with 91 additions and 64 deletions

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.search.type; package org.elasticsearch.action.search.type;
import jsr166y.LinkedTransferQueue; import jsr166y.LinkedTransferQueue;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.dfs.DfsSearchResult;
@ -37,8 +36,6 @@ import java.util.Queue;
*/ */
public class TransportSearchCache { public class TransportSearchCache {
private final Queue<Collection<ShardSearchFailure>> cacheShardFailures = new LinkedTransferQueue<Collection<ShardSearchFailure>>();
private final Queue<Collection<DfsSearchResult>> cacheDfsResults = new LinkedTransferQueue<Collection<DfsSearchResult>>(); private final Queue<Collection<DfsSearchResult>> cacheDfsResults = new LinkedTransferQueue<Collection<DfsSearchResult>>();
private final Queue<Map<SearchShardTarget, QuerySearchResultProvider>> cacheQueryResults = new LinkedTransferQueue<Map<SearchShardTarget, QuerySearchResultProvider>>(); private final Queue<Map<SearchShardTarget, QuerySearchResultProvider>> cacheQueryResults = new LinkedTransferQueue<Map<SearchShardTarget, QuerySearchResultProvider>>();
@ -48,19 +45,6 @@ public class TransportSearchCache {
private final Queue<Map<SearchShardTarget, QueryFetchSearchResult>> cacheQueryFetchResults = new LinkedTransferQueue<Map<SearchShardTarget, QueryFetchSearchResult>>(); private final Queue<Map<SearchShardTarget, QueryFetchSearchResult>> cacheQueryFetchResults = new LinkedTransferQueue<Map<SearchShardTarget, QueryFetchSearchResult>>();
public Collection<ShardSearchFailure> obtainShardFailures() {
Collection<ShardSearchFailure> shardFailures;
while ((shardFailures = cacheShardFailures.poll()) == null) {
cacheShardFailures.offer(new LinkedTransferQueue<ShardSearchFailure>());
}
return shardFailures;
}
public void releaseShardFailures(Collection<ShardSearchFailure> shardFailures) {
shardFailures.clear();
cacheShardFailures.offer(shardFailures);
}
public Collection<DfsSearchResult> obtainDfsResults() { public Collection<DfsSearchResult> obtainDfsResults() {
Collection<DfsSearchResult> dfsSearchResults; Collection<DfsSearchResult> dfsSearchResults;
while ((dfsSearchResults = cacheDfsResults.poll()) == null) { while ((dfsSearchResults = cacheDfsResults.poll()) == null) {

View File

@ -153,7 +153,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
} }
AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
successulOps.decrementAndGet(); successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();

View File

@ -160,7 +160,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
} }
AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
successulOps.decrementAndGet(); successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
executeFetchPhase(); executeFetchPhase();
@ -251,7 +251,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
} }
AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
successulOps.decrementAndGet(); successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();

View File

@ -26,7 +26,6 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Base64; import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
@ -47,20 +46,6 @@ import java.util.Map;
public abstract class TransportSearchHelper { public abstract class TransportSearchHelper {
/**
* Builds the shard failures, and releases the cache (meaning this should only be called once!).
*/
public static ShardSearchFailure[] buildShardFailures(Collection<ShardSearchFailure> shardFailures, TransportSearchCache searchCache) {
ShardSearchFailure[] ret;
if (shardFailures.isEmpty()) {
ret = ShardSearchFailure.EMPTY_ARRAY;
} else {
ret = shardFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
}
searchCache.releaseShardFailures(shardFailures);
return ret;
}
public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis) { public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis) {
InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, numberOfShards, request.searchType()); InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, numberOfShards, request.searchType());
internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength()); internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength());

View File

@ -161,7 +161,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
} }
AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
successulOps.decrementAndGet(); successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.search.type; package org.elasticsearch.action.search.type;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*; import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -37,11 +38,9 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.buildShardFailures;
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
/** /**
@ -85,7 +84,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
private final DiscoveryNodes nodes; private final DiscoveryNodes nodes;
protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures(); private volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults(); private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
@ -104,6 +103,23 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
this.counter = new AtomicInteger(scrollId.context().length); this.counter = new AtomicInteger(scrollId.context().length);
} }
protected final ShardSearchFailure[] buildShardFailures() {
LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
if (localFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
}
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
}
shardFailures.add(failure);
}
public void start() { public void start() {
if (scrollId.context().length == 0) { if (scrollId.context().length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null));
@ -193,7 +209,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId); logger.debug("[{}] Failed to execute query phase", t, searchId);
} }
shardFailures.add(new ShardSearchFailure(t)); addShardFailure(new ShardSearchFailure(t));
successfulOps.decrementAndGet(); successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();
@ -206,7 +222,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
try { try {
innerFinishHim(); innerFinishHim();
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache))); listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
} }
} }
@ -219,7 +235,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
} }
searchCache.releaseQueryFetchResults(queryFetchResults); searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); System.currentTimeMillis() - startTime, buildShardFailures()));
} }
} }
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.search.type; package org.elasticsearch.action.search.type;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*; import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -41,11 +42,9 @@ import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.buildShardFailures;
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
/** /**
@ -89,7 +88,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
private final DiscoveryNodes nodes; private final DiscoveryNodes nodes;
protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures(); protected volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults(); private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults();
@ -109,6 +108,23 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
this.successfulOps = new AtomicInteger(scrollId.context().length); this.successfulOps = new AtomicInteger(scrollId.context().length);
} }
protected final ShardSearchFailure[] buildShardFailures() {
LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
if (localFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
}
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
}
shardFailures.add(failure);
}
public void start() { public void start() {
if (scrollId.context().length == 0) { if (scrollId.context().length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null));
@ -185,7 +201,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId); logger.debug("[{}] Failed to execute query phase", t, searchId);
} }
shardFailures.add(new ShardSearchFailure(t)); addShardFailure(new ShardSearchFailure(t));
successfulOps.decrementAndGet(); successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
executeFetchPhase(); executeFetchPhase();
@ -237,7 +253,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
try { try {
innerFinishHim(); innerFinishHim();
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache))); listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
} }
} }
@ -248,7 +264,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
scrollId = request.scrollId(); scrollId = request.scrollId();
} }
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); System.currentTimeMillis() - startTime, buildShardFailures()));
searchCache.releaseQueryResults(queryResults); searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults); searchCache.releaseFetchResults(fetchResults);
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.search.type; package org.elasticsearch.action.search.type;
import jsr166y.LinkedTransferQueue;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*; import org.elasticsearch.action.search.*;
@ -41,11 +42,9 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.buildShardFailures;
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
/** /**
@ -89,7 +88,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
private final DiscoveryNodes nodes; private final DiscoveryNodes nodes;
protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures(); protected volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults(); private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
@ -108,11 +107,28 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
this.counter = new AtomicInteger(scrollId.context().length); this.counter = new AtomicInteger(scrollId.context().length);
} }
protected final ShardSearchFailure[] buildShardFailures() {
LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
if (localFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
}
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
}
shardFailures.add(failure);
}
public void start() { public void start() {
if (scrollId.context().length == 0) { if (scrollId.context().length == 0) {
final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.attributes().get("total_hits")), 0.0f), null, false); final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.attributes().get("total_hits")), 0.0f), null, false);
searchCache.releaseQueryFetchResults(queryFetchResults); searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, TransportSearchHelper.buildShardFailures(shardFailures, searchCache))); listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, buildShardFailures()));
return; return;
} }
@ -199,7 +215,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId); logger.debug("[{}] Failed to execute query phase", t, searchId);
} }
shardFailures.add(new ShardSearchFailure(t)); addShardFailure(new ShardSearchFailure(t));
successfulOps.decrementAndGet(); successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();
@ -212,7 +228,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
try { try {
innerFinishHim(); innerFinishHim();
} catch (Exception e) { } catch (Exception e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)); ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure); logger.debug("failed to reduce search", failure);
} }
@ -252,7 +268,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), this.scrollId.attributes()); // continue moving the total_hits scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), this.scrollId.attributes()); // continue moving the total_hits
} }
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); System.currentTimeMillis() - startTime, buildShardFailures()));
} }
} }
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.search.type; package org.elasticsearch.action.search.type;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*; import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
@ -44,7 +45,6 @@ import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -92,7 +92,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
private final AtomicInteger totalOps = new AtomicInteger(); private final AtomicInteger totalOps = new AtomicInteger();
protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures(); private volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
protected volatile ShardDoc[] sortedShardList; protected volatile ShardDoc[] sortedShardList;
@ -248,9 +248,9 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
// no more shards, add a failure // no more shards, add a failure
if (t == null) { if (t == null) {
// no active shards // no active shards
shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()))); addShardFailure(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id())));
} else { } else {
shardFailures.add(new ShardSearchFailure(t)); addShardFailure(new ShardSearchFailure(t));
} }
if (successulOps.get() == 0) { if (successulOps.get() == 0) {
// no successful ops, raise an exception // no successful ops, raise an exception
@ -290,9 +290,9 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
} }
if (t == null) { if (t == null) {
// no active shards // no active shards
shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()))); addShardFailure(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id())));
} else { } else {
shardFailures.add(new ShardSearchFailure(t)); addShardFailure(new ShardSearchFailure(t));
} }
} }
} }
@ -305,11 +305,21 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
return System.currentTimeMillis() - startTime; return System.currentTimeMillis() - startTime;
} }
/**
* Builds the shard failures, and releases the cache (meaning this should only be called once!).
*/
protected final ShardSearchFailure[] buildShardFailures() { protected final ShardSearchFailure[] buildShardFailures() {
return TransportSearchHelper.buildShardFailures(shardFailures, searchCache); LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
if (localFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
}
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
}
shardFailures.add(failure);
} }
/** /**