add shard specific failure reason to search operations

This commit is contained in:
kimchy 2010-02-20 17:35:26 +02:00
parent f94ff19f33
commit a828106553
29 changed files with 354 additions and 77 deletions

View File

@ -32,6 +32,10 @@ public final class ExceptionsHelper {
return result;
}
public static String detailedMessage(Throwable t) {
return detailedMessage(t, false, 0);
}
public static String detailedMessage(Throwable t, boolean newLines, int initialCounter) {
int counter = initialCounter + 1;
if (t.getCause() != null) {

View File

@ -30,6 +30,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import static org.elasticsearch.action.search.ShardSearchFailure.*;
import static org.elasticsearch.search.internal.InternalSearchResponse.*;
/**
@ -45,14 +46,17 @@ public class SearchResponse implements ActionResponse, ToJson {
private int successfulShards;
private ShardSearchFailure[] shardFailures;
public SearchResponse() {
}
public SearchResponse(InternalSearchResponse internalResponse, String scrollId, int totalShards, int successfulShards) {
public SearchResponse(InternalSearchResponse internalResponse, String scrollId, int totalShards, int successfulShards, ShardSearchFailure[] shardFailures) {
this.internalResponse = internalResponse;
this.scrollId = scrollId;
this.totalShards = totalShards;
this.successfulShards = successfulShards;
this.shardFailures = shardFailures;
}
public SearchHits hits() {
@ -75,6 +79,10 @@ public class SearchResponse implements ActionResponse, ToJson {
return totalShards - successfulShards;
}
public ShardSearchFailure[] shardFailures() {
return this.shardFailures;
}
public String scrollId() {
return scrollId;
}
@ -93,6 +101,21 @@ public class SearchResponse implements ActionResponse, ToJson {
builder.field("total", totalShards());
builder.field("successful", successfulShards());
builder.field("failed", failedShards());
if (shardFailures.length > 0) {
builder.startArray("failures");
for (ShardSearchFailure shardFailure : shardFailures) {
builder.startObject();
if (shardFailure.shard() != null) {
builder.field("index", shardFailure.shard().index());
builder.field("shardId", shardFailure.shard().shardId());
}
builder.field("reason", shardFailure.reason());
builder.endObject();
}
builder.endArray();
}
builder.endObject();
internalResponse.toJson(builder, params);
}
@ -101,6 +124,15 @@ public class SearchResponse implements ActionResponse, ToJson {
internalResponse = readInternalSearchResponse(in);
totalShards = in.readInt();
successfulShards = in.readInt();
int size = in.readInt();
if (size == 0) {
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
} else {
shardFailures = new ShardSearchFailure[size];
for (int i = 0; i < shardFailures.length; i++) {
shardFailures[i] = readShardSearchFailure(in);
}
}
if (in.readBoolean()) {
scrollId = in.readUTF();
}
@ -110,6 +142,12 @@ public class SearchResponse implements ActionResponse, ToJson {
internalResponse.writeTo(out);
out.writeInt(totalShards);
out.writeInt(successfulShards);
out.writeInt(shardFailures.length);
for (ShardSearchFailure shardSearchFailure : shardFailures) {
shardSearchFailure.writeTo(out);
}
if (scrollId == null) {
out.writeBoolean(false);
} else {

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.search.SearchException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.io.Streamable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import static org.elasticsearch.search.SearchShardTarget.*;
/**
* Represents a failure to search on a specific shard.
*
* @author kimchy (shay.banon)
*/
public class ShardSearchFailure implements Streamable {
public static final ShardSearchFailure[] EMPTY_ARRAY = new ShardSearchFailure[0];
private SearchShardTarget shardTarget;
private String reason;
private ShardSearchFailure() {
}
public ShardSearchFailure(Throwable t) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
if (actual != null && actual instanceof SearchException) {
this.shardTarget = ((SearchException) actual).shard();
}
this.reason = ExceptionsHelper.detailedMessage(t);
}
public ShardSearchFailure(String reason, SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
this.reason = reason;
}
@Nullable public SearchShardTarget shard() {
return this.shardTarget;
}
public String reason() {
return this.reason;
}
public static ShardSearchFailure readShardSearchFailure(DataInput in) throws IOException, ClassNotFoundException {
ShardSearchFailure shardSearchFailure = new ShardSearchFailure();
shardSearchFailure.readFrom(in);
return shardSearchFailure;
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
if (in.readBoolean()) {
shardTarget = readSearchShardTarget(in);
}
reason = in.readUTF();
}
@Override public void writeTo(DataOutput out) throws IOException {
if (shardTarget == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
shardTarget.writeTo(out);
}
out.writeUTF(reason);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.search.type;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
@ -36,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public class TransportSearchCache {
private final Queue<Collection<ShardSearchFailure>> cacheShardFailures = new ConcurrentLinkedQueue<Collection<ShardSearchFailure>>();
private final Queue<Collection<DfsSearchResult>> cacheDfsResults = new ConcurrentLinkedQueue<Collection<DfsSearchResult>>();
private final Queue<Map<SearchShardTarget, QuerySearchResultProvider>> cacheQueryResults = new ConcurrentLinkedQueue<Map<SearchShardTarget, QuerySearchResultProvider>>();
@ -44,6 +47,21 @@ public class TransportSearchCache {
private final Queue<Map<SearchShardTarget, QueryFetchSearchResult>> cacheQueryFetchResults = new ConcurrentLinkedQueue<Map<SearchShardTarget, QueryFetchSearchResult>>();
public Collection<ShardSearchFailure> obtainShardFailures() {
Collection<ShardSearchFailure> shardFailures;
while ((shardFailures = cacheShardFailures.poll()) == null) {
cacheShardFailures.offer(new ConcurrentLinkedQueue<ShardSearchFailure>());
}
shardFailures.clear();
return shardFailures;
}
public void releaseShardFailures(Collection<ShardSearchFailure> shardFailures) {
shardFailures.clear();
cacheShardFailures.offer(shardFailures);
}
public Collection<DfsSearchResult> obtainDfsResults() {
Collection<DfsSearchResult> dfsSearchResults;
while ((dfsSearchResults = cacheDfsResults.poll()) == null) {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -63,9 +64,9 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
private class AsyncAction extends BaseAsyncAction<DfsSearchResult> {
private final Collection<DfsSearchResult> dfsResults = transportSearchCache.obtainDfsResults();
private final Collection<DfsSearchResult> dfsResults = searchCache.obtainDfsResults();
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = transportSearchCache.obtainQueryFetchResults();
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
@ -105,7 +106,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
executeSecondPhase(counter, node, querySearchRequest);
}
}
transportSearchCache.releaseDfsResults(dfsResults);
searchCache.releaseDfsResults(dfsResults);
}
});
} else {
@ -125,7 +126,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
}
}
}
transportSearchCache.releaseDfsResults(dfsResults);
searchCache.releaseDfsResults(dfsResults);
}
}
}
@ -143,6 +144,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute query phase", t);
}
AsyncAction.this.shardFailures.add(new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -159,15 +161,15 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
scrollIdX = buildScrollId(request.searchType(), queryFetchResults.values());
}
final String scrollId = scrollIdX;
transportSearchCache.releaseQueryFetchResults(queryFetchResults);
searchCache.releaseQueryFetchResults(queryFetchResults);
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
});
} else {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -65,11 +66,11 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
private class AsyncAction extends BaseAsyncAction<DfsSearchResult> {
private final Collection<DfsSearchResult> dfsResults = transportSearchCache.obtainDfsResults();
private final Collection<DfsSearchResult> dfsResults = searchCache.obtainDfsResults();
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = transportSearchCache.obtainQueryResults();
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults();
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = transportSearchCache.obtainFetchResults();
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults();
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
@ -111,7 +112,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
executeQuery(counter, querySearchRequest, node);
}
}
transportSearchCache.releaseDfsResults(dfsResults);
searchCache.releaseDfsResults(dfsResults);
}
});
} else {
@ -131,7 +132,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
}
}
transportSearchCache.releaseDfsResults(dfsResults);
searchCache.releaseDfsResults(dfsResults);
}
}
}
@ -149,6 +150,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute query phase", t);
}
AsyncAction.this.shardFailures.add(new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
@ -224,6 +226,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute fetch phase", t);
}
AsyncAction.this.shardFailures.add(new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -239,16 +242,16 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
scrollIdX = TransportSearchHelper.buildScrollId(request.searchType(), fetchResults.values());
}
final String scrollId = scrollIdX;
transportSearchCache.releaseQueryResults(queryResults);
transportSearchCache.releaseFetchResults(fetchResults);
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
});
} else {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
}
}

View File

@ -23,12 +23,14 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.util.Tuple;
import java.util.Collection;
import java.util.regex.Pattern;
/**
@ -43,6 +45,20 @@ public abstract class TransportSearchHelper {
scrollIdPattern = Pattern.compile(";");
}
/**
* Builds the shard failures, and releases the cache (meaning this should only be called once!).
*/
public static ShardSearchFailure[] buildShardFailures(Collection<ShardSearchFailure> shardFailures, TransportSearchCache searchCache) {
ShardSearchFailure[] ret;
if (shardFailures.isEmpty()) {
ret = ShardSearchFailure.EMPTY_ARRAY;
} else {
ret = shardFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
}
searchCache.releaseShardFailures(shardFailures);
return ret;
}
public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, SearchRequest request) {
InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, request.source());
internalRequest.from(request.from()).size(request.size());

View File

@ -57,7 +57,7 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
private class AsyncAction extends BaseAsyncAction<QueryFetchSearchResult> {
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = transportSearchCache.obtainQueryFetchResults();
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
@ -80,15 +80,15 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
scrollIdX = buildScrollId(request.searchType(), queryFetchResults.values());
}
final String scrollId = scrollIdX;
transportSearchCache.releaseQueryFetchResults(queryFetchResults);
searchCache.releaseQueryFetchResults(queryFetchResults);
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
});
} else {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -61,9 +62,9 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = transportSearchCache.obtainQueryResults();
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults();
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = transportSearchCache.obtainFetchResults();
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults();
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
@ -146,6 +147,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute fetch phase", t);
}
AsyncAction.this.shardFailures.add(new ShardSearchFailure(t));
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -160,9 +162,9 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), fetchResults.values());
}
transportSearchCache.releaseQueryResults(queryResults);
transportSearchCache.releaseFetchResults(fetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get()));
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
}
}

View File

@ -23,6 +23,7 @@ import com.google.inject.Inject;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.cluster.node.Nodes;
@ -41,9 +42,12 @@ import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.trove.ExtTIntArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
/**
* @author kimchy (Shay Banon)
*/
@ -55,14 +59,14 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
private final SearchPhaseController searchPhaseController;
private final TransportSearchCache transportSearchCache;
private final TransportSearchCache searchCache;
@Inject public TransportSearchScrollQueryThenFetchAction(Settings settings, ClusterService clusterService,
TransportSearchCache transportSearchCache,
TransportSearchCache searchCache,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.clusterService = clusterService;
this.transportSearchCache = transportSearchCache;
this.searchCache = searchCache;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
@ -81,9 +85,11 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
private final Nodes nodes;
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = transportSearchCache.obtainQueryResults();
protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures();
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = transportSearchCache.obtainFetchResults();
private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults();
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults();
private volatile ShardDoc[] sortedShardList;
@ -122,6 +128,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute query phase", t);
}
shardFailures.add(new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
@ -174,9 +181,9 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), fetchResults.values());
}
transportSearchCache.releaseQueryResults(queryResults);
transportSearchCache.releaseFetchResults(fetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get()));
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(), buildShardFailures(shardFailures, searchCache)));
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -40,6 +41,7 @@ import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.settings.Settings;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
@ -61,14 +63,14 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
protected final SearchPhaseController searchPhaseController;
protected final TransportSearchCache transportSearchCache;
protected final TransportSearchCache searchCache;
public TransportSearchTypeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService,
TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
TransportSearchCache searchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportSearchCache = transportSearchCache;
this.searchCache = searchCache;
this.indicesService = indicesService;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
@ -92,6 +94,8 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
protected final AtomicInteger totalOps = new AtomicInteger();
protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures();
protected volatile ShardDoc[] sortedShardList;
protected BaseAsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
@ -162,9 +166,6 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
private void performFirstPhase(final Iterator<ShardRouting> shardIt) {
if (!shardIt.hasNext()) {
return;
}
final ShardRouting shard = shardIt.next();
if (!shard.active()) {
// as if we have a "problem", so we iterate to the next one and maintain counts
@ -204,11 +205,25 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
}
if (totalOps.incrementAndGet() == expectedTotalOps) {
// no more shards, add a failure
shardFailures.add(new ShardSearchFailure(t));
moveToSecondPhase();
} else {
if (shardIt.hasNext()) {
performFirstPhase(shardIt);
} else {
// no more shards, add a failure
shardFailures.add(new ShardSearchFailure(t));
}
}
}
/**
* Builds the shard failures, and releases the cache (meaning this should only be called once!).
*/
protected ShardSearchFailure[] buildShardFailures() {
return TransportSearchHelper.buildShardFailures(shardFailures, searchCache);
}
protected abstract void sendExecuteFirstPhase(Node node, InternalSearchRequest request, SearchServiceListener<FirstResult> listener);

View File

@ -19,13 +19,13 @@
package org.elasticsearch.rest;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.util.io.FastCharArrayWriter;
import org.elasticsearch.util.json.JsonBuilder;
import java.io.IOException;
import java.io.PrintWriter;
import static org.elasticsearch.ExceptionsHelper.*;
import static org.elasticsearch.util.json.JsonBuilder.*;
/**
@ -60,7 +60,7 @@ public class JsonThrowableRestResponse extends JsonRestResponse {
holder.writer.reset();
t.printStackTrace(holder.printWriter);
JsonBuilder builder = jsonBuilder().prettyPrint()
.startObject().field("error", ExceptionsHelper.detailedMessage(t, false, 0));
.startObject().field("error", detailedMessage(t));
if (request.paramAsBoolean("errorTrace", false)) {
builder.startObject("errorTrace");
boolean first = true;

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search;
import org.elasticsearch.search.internal.SearchContext;
/**
* @author kimchy (shay.banon)
*/
public class SearchContextException extends SearchException {
public SearchContextException(SearchContext context, String msg) {
super(context.shardTarget(), buildMessage(context, msg));
}
public SearchContextException(SearchContext context, String msg, Throwable t) {
super(context.shardTarget(), buildMessage(context, msg), t);
}
private static String buildMessage(SearchContext context, String msg) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(context.shardTarget().index()).append("][").append(context.shardTarget().shardId()).append("]: ");
sb.append("query[").append(context.query()).append("],from[").append(context.from()).append("],size[").append(context.size()).append("]");
if (context.sort() != null) {
sb.append(",sort[").append(context.sort()).append("]");
}
return sb.append(": ").append(msg).toString();
}
}

View File

@ -26,11 +26,19 @@ import org.elasticsearch.ElasticSearchException;
*/
public class SearchException extends ElasticSearchException {
public SearchException(String msg) {
private final SearchShardTarget shardTarget;
public SearchException(SearchShardTarget shardTarget, String msg) {
super(msg);
this.shardTarget = shardTarget;
}
public SearchException(String msg, Throwable cause) {
public SearchException(SearchShardTarget shardTarget, String msg, Throwable cause) {
super(msg, cause);
this.shardTarget = shardTarget;
}
public SearchShardTarget shard() {
return this.shardTarget;
}
}

View File

@ -19,16 +19,18 @@
package org.elasticsearch.search;
import org.elasticsearch.search.internal.SearchContext;
/**
* @author kimchy (Shay Banon)
*/
public class SearchParseException extends SearchException {
public class SearchParseException extends SearchContextException {
public SearchParseException(String msg) {
super(msg);
public SearchParseException(SearchContext context, String msg) {
super(context, "Parse Failure [" + msg + "]");
}
public SearchParseException(String msg, Throwable cause) {
super(msg, cause);
public SearchParseException(SearchContext context, String msg, Throwable cause) {
super(context, "Parse Failure [" + msg + "]", cause);
}
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QueryPhaseExecutionException;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.timer.TimerService;
@ -172,7 +173,7 @@ public class SearchService extends AbstractComponent implements LifecycleCompone
try {
context.searcher().dfSource(new CachedDfSource(request.dfs(), context.similarityService().defaultSearchSimilarity()));
} catch (IOException e) {
throw new SearchException("Failed to set aggregated df", e);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
queryPhase.execute(context);
return context.queryResult();
@ -194,7 +195,7 @@ public class SearchService extends AbstractComponent implements LifecycleCompone
try {
context.searcher().dfSource(new CachedDfSource(request.dfs(), context.similarityService().defaultSearchSimilarity()));
} catch (IOException e) {
throw new SearchException("Failed to set aggregated df", e);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
queryPhase.execute(context);
shortcutDocIdsToLoad(context);
@ -297,7 +298,7 @@ public class SearchService extends AbstractComponent implements LifecycleCompone
jp.nextToken();
SearchParseElement element = elementParsers.get(fieldName);
if (element == null) {
throw new SearchParseException("No parser for element [" + fieldName + "]");
throw new SearchParseException(context, "No parser for element [" + fieldName + "]");
}
element.parse(jp, context);
} else if (token == null) {
@ -305,7 +306,7 @@ public class SearchService extends AbstractComponent implements LifecycleCompone
}
}
} catch (Exception e) {
throw new SearchParseException("Failed to parse [" + context.source() + "]", e);
throw new SearchParseException(context, "Failed to parse [" + context.source() + "]", e);
}
}

View File

@ -24,13 +24,14 @@ import org.elasticsearch.util.io.Streamable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
/**
* The target that the search request was executed on.
*
* @author kimchy (Shay Banon)
*/
public class SearchShardTarget implements Streamable {
public class SearchShardTarget implements Streamable, Serializable {
private String nodeId;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search.builder;
import org.elasticsearch.index.query.json.JsonQueryBuilder;
import org.elasticsearch.search.SearchException;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.json.ToJson;
@ -123,7 +124,7 @@ public class SearchSourceBuilder {
return this;
}
public String build() {
public String build() throws SearchException {
try {
JsonBuilder builder = JsonBuilder.jsonBuilder();
builder.startObject();

View File

@ -19,12 +19,12 @@
package org.elasticsearch.search.builder;
import org.elasticsearch.search.SearchException;
import org.elasticsearch.ElasticSearchException;
/**
* @author kimchy (Shay Banon)
*/
public class SearchSourceBuilderException extends SearchException {
public class SearchSourceBuilderException extends ElasticSearchException {
public SearchSourceBuilderException(String msg) {
super(msg);

View File

@ -49,7 +49,7 @@ public class DfsPhase implements SearchPhase {
context.dfsResult().termsAndFreqs(terms, freqs);
context.dfsResult().maxDoc(context.searcher().getIndexReader().maxDoc());
} catch (Exception e) {
throw new DfsPhaseExecutionException(context);
throw new DfsPhaseExecutionException(context, "", e);
}
}
}

View File

@ -19,15 +19,15 @@
package org.elasticsearch.search.dfs;
import org.elasticsearch.search.SearchException;
import org.elasticsearch.search.SearchContextException;
import org.elasticsearch.search.internal.SearchContext;
/**
* @author kimchy (Shay Banon)
*/
public class DfsPhaseExecutionException extends SearchException {
public class DfsPhaseExecutionException extends SearchContextException {
public DfsPhaseExecutionException(SearchContext context) {
super("Failed to execute dfs [" + context.query() + "]");
public DfsPhaseExecutionException(SearchContext context, String msg, Throwable t) {
super(context, "Dfs Failed [" + msg + "]", t);
}
}

View File

@ -61,7 +61,7 @@ public class FacetsParseElement implements SearchParseElement {
} else if ("idset".equals(text)) {
queryExecutionType = SearchContextFacets.QueryExecutionType.IDSET;
} else {
throw new SearchParseException("Unsupported query type [" + text + "]");
throw new SearchParseException(context, "Unsupported query type [" + text + "]");
}
} else {
@ -79,7 +79,7 @@ public class FacetsParseElement implements SearchParseElement {
}
queryFacets.add(new SearchContextFacets.QueryFacet(topLevelFieldName, facetQuery));
} else {
throw new SearchParseException("Unsupported facet type [" + facetType + "] for facet name [" + topLevelFieldName + "]");
throw new SearchParseException(context, "Unsupported facet type [" + facetType + "] for facet name [" + topLevelFieldName + "]");
}
jp.nextToken();
}

View File

@ -19,19 +19,19 @@
package org.elasticsearch.search.fetch;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.search.SearchContextException;
import org.elasticsearch.search.internal.SearchContext;
/**
* @author kimchy (Shay Banon)
*/
public class FetchPhaseExecutionException extends ElasticSearchException {
public class FetchPhaseExecutionException extends SearchContextException {
public FetchPhaseExecutionException(SearchContext context, String msg) {
this(context, msg, null);
super(context, "Fetch Failed [" + msg + "]");
}
public FetchPhaseExecutionException(SearchContext context, String msg, Throwable t) {
super("Failed to fetch query [" + context.query() + "], sort [" + context.sort() + "], from [" + context.from() + "], size [" + context.size() + "], reason [" + msg + "]", t);
super(context, "Fetch Failed [" + msg + "]", t);
}
}

View File

@ -48,6 +48,8 @@ public class SearchContext implements Releasable {
private final long id;
private final SearchShardTarget shardTarget;
private final String source;
private final Engine.Searcher engineSearcher;
@ -101,6 +103,7 @@ public class SearchContext implements Releasable {
public SearchContext(long id, SearchShardTarget shardTarget, TimeValue timeout, float queryBoost, String source,
String[] types, Engine.Searcher engineSearcher, IndexService indexService) {
this.id = id;
this.shardTarget = shardTarget;
this.timeout = timeout;
this.queryBoost = queryBoost;
this.source = source;
@ -131,6 +134,10 @@ public class SearchContext implements Releasable {
return this.id;
}
public SearchShardTarget shardTarget() {
return this.shardTarget;
}
public String source() {
return source;
}

View File

@ -85,7 +85,7 @@ public class QueryPhase implements SearchPhase {
}
searchContext.queryResult().topDocs(topDocs);
} catch (Exception e) {
throw new QueryPhaseExecutionException(searchContext, e);
throw new QueryPhaseExecutionException(searchContext, "", e);
}
facetsPhase.execute(searchContext);

View File

@ -19,15 +19,15 @@
package org.elasticsearch.search.query;
import org.elasticsearch.search.SearchException;
import org.elasticsearch.search.SearchContextException;
import org.elasticsearch.search.internal.SearchContext;
/**
* @author kimchy (Shay Banon)
*/
public class QueryPhaseExecutionException extends SearchException {
public class QueryPhaseExecutionException extends SearchContextException {
public QueryPhaseExecutionException(SearchContext context, Throwable cause) {
super("Failed to execute query [" + context.query() + "], sort [" + context.sort() + "], from [" + context.from() + "], size [" + context.size() + "]", cause);
public QueryPhaseExecutionException(SearchContext context, String msg, Throwable cause) {
super(context, "Query Failed [" + msg + "]", cause);
}
}

View File

@ -99,7 +99,7 @@ public class SortParseElement implements SearchParseElement {
if ("type".equals(innerJsonName)) {
type = sortFieldTypesMapper.get(jp.getText());
if (type == -1) {
throw new SearchParseException("No sort type for [" + jp.getText() + "] with field [" + fieldName + "]");
throw new SearchParseException(context, "No sort type for [" + jp.getText() + "] with field [" + fieldName + "]");
}
}
}
@ -126,7 +126,7 @@ public class SortParseElement implements SearchParseElement {
FieldMappers fieldMappers = context.mapperService().smartNameFieldMappers(fieldName);
if (fieldMappers == null || fieldMappers.mappers().isEmpty()) {
if (type == -1) {
throw new SearchParseException("No built in mapping found for [" + fieldName + "], and no explicit type defined");
throw new SearchParseException(context, "No built in mapping found for [" + fieldName + "], and no explicit type defined");
}
} else {
fieldName = fieldMappers.mappers().get(0).indexName();

View File

@ -30,6 +30,6 @@ import java.lang.annotation.*;
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER, ElementType.FIELD})
@Target({ElementType.PARAMETER, ElementType.FIELD, ElementType.METHOD})
public @interface Nullable {
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.test.integration.search;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.search.Scroll;
@ -234,6 +235,17 @@ public class TransportTwoServersSearchTests extends AbstractServersTests {
testSimpleFacets();
}
@Test public void testFailedSearch() throws Exception {
logger.info("Start Testing failed search");
SearchResponse searchResponse = client.search(searchRequest("test").source("{ xxx }")).actionGet();
assertThat(searchResponse.successfulShards(), equalTo(0));
logger.info("Failures:");
for (ShardSearchFailure searchFailure : searchResponse.shardFailures()) {
logger.info("Reason : " + searchFailure.reason() + ", shard " + searchFailure.shard());
}
logger.info("Done Testing failed search");
}
private void index(Client client, String id, String nameValue, int age) {
client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age))).actionGet();