Do not propagate errors from onResult to onFailure.

The way that SearchServiceTransportAction executes actions on a local node
today would propagate exceptions thrown in onResult to onFailure.

Close #5629
This commit is contained in:
Adrien Grand 2014-03-31 20:16:32 +02:00
parent fac4da822d
commit a34378f852
1 changed files with 81 additions and 60 deletions

View File

@ -45,6 +45,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.concurrent.Callable;
/**
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
@ -67,6 +68,26 @@ public class SearchServiceTransportAction extends AbstractComponent {
}
}
private static <T> void execute(Callable<? extends T> callable, SearchServiceListener<T> listener) {
// Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on
// these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure
T result = null;
Throwable error = null;
try {
result = callable.call();
} catch (Throwable t) {
error = t;
} finally {
if (result == null) {
assert error != null;
listener.onFailure(error);
} else {
assert error == null : error;
listener.onResult(result);
}
}
}
private final TransportService transportService;
private final ClusterService clusterService;
@ -164,12 +185,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener<DfsSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
DfsSearchResult result = searchService.executeDfsPhase(request);
listener.onResult(result);
} catch (Throwable e) {
listener.onFailure(e);
}
execute(new Callable<DfsSearchResult>() {
@Override
public DfsSearchResult call() throws Exception {
return searchService.executeDfsPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, SearchDfsTransportHandler.ACTION, request, new BaseTransportResponseHandler<DfsSearchResult>() {
@ -198,12 +219,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener<QuerySearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
QuerySearchResult result = searchService.executeQueryPhase(request);
listener.onResult(result);
} catch (Throwable e) {
listener.onFailure(e);
}
execute(new Callable<QuerySearchResult>() {
@Override
public QuerySearchResult call() throws Exception {
return searchService.executeQueryPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, SearchQueryTransportHandler.ACTION, request, new BaseTransportResponseHandler<QuerySearchResult>() {
@ -232,12 +253,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, final SearchServiceListener<QuerySearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
QuerySearchResult result = searchService.executeQueryPhase(request);
listener.onResult(result);
} catch (Throwable e) {
listener.onFailure(e);
}
execute(new Callable<QuerySearchResult>() {
@Override
public QuerySearchResult call() throws Exception {
return searchService.executeQueryPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, SearchQueryByIdTransportHandler.ACTION, request, new BaseTransportResponseHandler<QuerySearchResult>() {
@ -266,12 +287,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener<QuerySearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request);
listener.onResult(result.queryResult());
} catch (Throwable e) {
listener.onFailure(e);
}
execute(new Callable<QuerySearchResult>() {
@Override
public QuerySearchResult call() throws Exception {
return searchService.executeQueryPhase(request).queryResult();
}
}, listener);
} else {
transportService.sendRequest(node, SearchQueryScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler<ScrollQuerySearchResult>() {
@ -300,12 +321,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener<QueryFetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
listener.onResult(result);
} catch (Throwable e) {
listener.onFailure(e);
}
execute(new Callable<QueryFetchSearchResult>() {
@Override
public QueryFetchSearchResult call() throws Exception {
return searchService.executeFetchPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, SearchQueryFetchTransportHandler.ACTION, request, new BaseTransportResponseHandler<QueryFetchSearchResult>() {
@ -334,12 +355,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, final SearchServiceListener<QueryFetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
listener.onResult(result);
} catch (Throwable e) {
listener.onFailure(e);
}
execute(new Callable<QueryFetchSearchResult>() {
@Override
public QueryFetchSearchResult call() throws Exception {
return searchService.executeFetchPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, SearchQueryQueryFetchTransportHandler.ACTION, request, new BaseTransportResponseHandler<QueryFetchSearchResult>() {
@ -368,12 +389,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener<QueryFetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request);
listener.onResult(result.result());
} catch (Throwable e) {
listener.onFailure(e);
}
execute(new Callable<QueryFetchSearchResult>() {
@Override
public QueryFetchSearchResult call() throws Exception {
return searchService.executeFetchPhase(request).result();
}
}, listener);
} else {
transportService.sendRequest(node, SearchQueryFetchScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler<ScrollQueryFetchSearchResult>() {
@ -402,12 +423,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteFetch(DiscoveryNode node, final FetchSearchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
FetchSearchResult result = searchService.executeFetchPhase(request);
listener.onResult(result);
} catch (Throwable e) {
listener.onFailure(e);
}
execute(new Callable<FetchSearchResult>() {
@Override
public FetchSearchResult call() throws Exception {
return searchService.executeFetchPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, SearchFetchByIdTransportHandler.ACTION, request, new BaseTransportResponseHandler<FetchSearchResult>() {
@ -436,12 +457,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteScan(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener<QuerySearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
QuerySearchResult result = searchService.executeScan(request);
listener.onResult(result);
} catch (Throwable e) {
listener.onFailure(e);
}
execute(new Callable<QuerySearchResult>() {
@Override
public QuerySearchResult call() throws Exception {
return searchService.executeScan(request);
}
}, listener);
} else {
transportService.sendRequest(node, SearchScanTransportHandler.ACTION, request, new BaseTransportResponseHandler<QuerySearchResult>() {
@ -470,12 +491,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteScan(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener<QueryFetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
ScrollQueryFetchSearchResult result = searchService.executeScan(request);
listener.onResult(result.result());
} catch (Throwable e) {
listener.onFailure(e);
}
execute(new Callable<QueryFetchSearchResult>() {
@Override
public QueryFetchSearchResult call() throws Exception {
return searchService.executeScan(request).result();
}
}, listener);
} else {
transportService.sendRequest(node, SearchScanScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler<ScrollQueryFetchSearchResult>() {