fix search hanging because of query parsing exception

This commit is contained in:
kimchy 2010-03-20 18:45:02 +02:00
parent bad8643978
commit 35bd7f0086
7 changed files with 86 additions and 17 deletions

View File

@ -21,12 +21,14 @@ package org.elasticsearch.action;
import org.elasticsearch.util.io.stream.Streamable;
import java.io.Serializable;
/**
* An exception indicating that a failure occurred performing an operation on the shard.
*
* @author kimchy (Shay Banon)
*/
public interface ShardOperationFailedException extends Streamable {
public interface ShardOperationFailedException extends Streamable, Serializable {
/**
* The index the operation failed on. Might return <tt>null</tt> if it can't be derived.

View File

@ -19,21 +19,19 @@
package org.elasticsearch.action.search;
import org.elasticsearch.ElasticSearchException;
/**
* A failure during a reduce phase (when receiving results from several shards, and reducing them
* into one or more results and possible actions).
*
* @author kimchy (shay.banon)
*/
public class ReduceSearchPhaseException extends ElasticSearchException {
public class ReduceSearchPhaseException extends SearchPhaseExecutionException {
public ReduceSearchPhaseException(String phaseName, String msg) {
super("Failed to reduce [" + phaseName + "] " + msg);
public ReduceSearchPhaseException(String phaseName, String msg, ShardSearchFailure[] shardFailures) {
super(phaseName, "[reduce] " + msg, shardFailures);
}
public ReduceSearchPhaseException(String phaseName, String msg, Throwable cause) {
super("Failed to reduce [" + phaseName + "]" + msg, cause);
public ReduceSearchPhaseException(String phaseName, String msg, Throwable cause, ShardSearchFailure[] shardFailures) {
super(phaseName, "[reduce] " + msg, cause, shardFailures);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.ElasticSearchException;
/**
* @author kimchy (shay.banon)
*/
public class SearchPhaseExecutionException extends ElasticSearchException {
private final String phaseName;
private ShardSearchFailure[] shardFailures;
public SearchPhaseExecutionException(String phaseName, String msg, ShardSearchFailure[] shardFailures) {
super(buildMessage(phaseName, msg, shardFailures));
this.phaseName = phaseName;
this.shardFailures = shardFailures;
}
public SearchPhaseExecutionException(String phaseName, String msg, Throwable cause, ShardSearchFailure[] shardFailures) {
super(buildMessage(phaseName, msg, shardFailures), cause);
this.phaseName = phaseName;
this.shardFailures = shardFailures;
}
public String phaseName() {
return phaseName;
}
public ShardSearchFailure[] shardFailures() {
return shardFailures;
}
private static final String buildMessage(String phaseName, String msg, ShardSearchFailure[] shardFailures) {
StringBuilder sb = new StringBuilder();
sb.append("Failed to execute [").append(phaseName).append("] ").append(msg);
if (shardFailures != null && shardFailures.length > 0) {
sb.append("; shardFailures ");
for (ShardSearchFailure shardFailure : shardFailures) {
sb.append("{").append(shardFailure.shard()).append(": ").append(shardFailure.reason()).append("}");
}
}
return sb.toString();
}
}

View File

@ -158,7 +158,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
try {
innerFinishHim();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("query_fetch", "", e));
listener.onFailure(new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures()));
}
}

View File

@ -164,7 +164,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
try {
innerExecuteFetchPhase();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("query", "", e));
listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
}
}
@ -248,7 +248,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
try {
innerFinishHim();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e));
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
}

View File

@ -161,7 +161,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
try {
innerFinishHim();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e));
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
}

View File

@ -194,7 +194,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
try {
moveToSecondPhase();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e));
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
}
}
}
@ -208,10 +208,15 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
if (totalOps.incrementAndGet() == expectedTotalOps) {
// no more shards, add a failure
shardFailures.add(new ShardSearchFailure(t));
try {
moveToSecondPhase();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e));
if (successulOps.get() == 0) {
// no successful ops, raise an exception
listener.onFailure(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures()));
} else {
try {
moveToSecondPhase();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
}
}
} else {
if (shardIt.hasNext()) {