Better exception handling in actions when forking to a thread pool

An execution on a thread pool might be rejected due to its settings, have better handling in those cases across the actions we have.
closes #3524
This commit is contained in:
Shay Banon 2013-08-16 21:56:24 +02:00
parent b11f81d744
commit ad0eeef859
14 changed files with 383 additions and 291 deletions

View File

@ -115,15 +115,19 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) { if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
if (localAsync) { try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { if (localAsync) {
@Override threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
public void run() { @Override
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); public void run() {
} executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}); }
} else { });
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); } else {
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
} catch (Throwable t) {
onSecondPhaseFailure(t, querySearchRequest, entry.index, dfsResult, counter);
} }
} }
} }
@ -144,18 +148,22 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) { onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
} }
}); });
} }
void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
void finishHim() { void finishHim() {
try { try {
innerFinishHim(); innerFinishHim();

View File

@ -124,15 +124,19 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) { if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
if (localAsync) { try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { if (localAsync) {
@Override threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
public void run() { @Override
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); public void run() {
} executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}); }
} else { });
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); } else {
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
} catch (Throwable t) {
onQueryFailure(t, querySearchRequest, entry.index, dfsResult, counter);
} }
} }
} }
@ -153,18 +157,22 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) { onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
} }
}); });
} }
void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
}
void executeFetchPhase() { void executeFetchPhase() {
try { try {
innerExecuteFetchPhase(); innerExecuteFetchPhase();
@ -217,15 +225,19 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) { if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value); final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
if (localAsync) { try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { if (localAsync) {
@Override threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
public void run() { @Override
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); public void run() {
} executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}); }
} else { });
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } else {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
} catch (Throwable t) {
onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter);
} }
} }
} }
@ -246,18 +258,22 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) { onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, shardTarget, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
} }
}); });
} }
void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
void finishHim() { void finishHim() {
try { try {
innerFinishHim(); innerFinishHim();

View File

@ -126,15 +126,19 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) { if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value); final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
if (localAsync) { try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { if (localAsync) {
@Override threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
public void run() { @Override
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); public void run() {
} executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}); }
} else { });
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } else {
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
} catch (Throwable t) {
onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter);
} }
} }
} }
@ -155,18 +159,22 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) { onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, shardTarget, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
} }
}); });
} }
void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
void finishHim() { void finishHim() {
try { try {
innerFinishHim(); innerFinishHim();

View File

@ -170,15 +170,19 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
final int shardIndex = i; final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1()); final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) { if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) { try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { if (localAsync) {
@Override threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
public void run() { @Override
executePhase(shardIndex, node, target.v2()); public void run() {
} executePhase(shardIndex, node, target.v2());
}); }
} else { });
executePhase(shardIndex, node, target.v2()); } else {
executePhase(shardIndex, node, target.v2());
}
} catch (Throwable t) {
onPhaseFailure(t, target.v2(), shardIndex);
} }
} }
} }
@ -200,7 +204,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
} }
} }
private void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
searchService.sendExecuteFetch(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() { searchService.sendExecuteFetch(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
@Override @Override
public void onResult(QueryFetchSearchResult result) { public void onResult(QueryFetchSearchResult result) {
@ -212,18 +216,22 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) { onPhaseFailure(t, searchId, shardIndex);
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
} }
}); });
} }
private void onPhaseFailure(Throwable t, long searchId, int shardIndex) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() { private void finishHim() {
try { try {
innerFinishHim(); innerFinishHim();

View File

@ -176,15 +176,19 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
final int shardIndex = i; final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1()); final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) { if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) { try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { if (localAsync) {
@Override threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
public void run() { @Override
executeQueryPhase(shardIndex, counter, node, target.v2()); public void run() {
} executeQueryPhase(shardIndex, counter, node, target.v2());
}); }
} else { });
executeQueryPhase(shardIndex, counter, node, target.v2()); } else {
executeQueryPhase(shardIndex, counter, node, target.v2());
}
} catch (Throwable t) {
onQueryPhaseFailure(shardIndex, counter, target.v2(), t);
} }
} }
} }
@ -204,18 +208,22 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) { onQueryPhaseFailure(shardIndex, counter, searchId, t);
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
} }
}); });
} }
void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
}
private void executeFetchPhase() { private void executeFetchPhase() {
sortedShardList = searchPhaseController.sortDocs(queryResults); sortedShardList = searchPhaseController.sortDocs(queryResults);
AtomicArray<ExtTIntArrayList> docIdsToLoad = new AtomicArray<ExtTIntArrayList>(queryResults.length()); AtomicArray<ExtTIntArrayList> docIdsToLoad = new AtomicArray<ExtTIntArrayList>(queryResults.length());

View File

@ -172,15 +172,19 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
final int shardIndex = i; final int shardIndex = i;
final DiscoveryNode node = nodes.get(target.v1()); final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) { if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) { try {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { if (localAsync) {
@Override threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
public void run() { @Override
executePhase(shardIndex, node, target.v2()); public void run() {
} executePhase(shardIndex, node, target.v2());
}); }
} else { });
executePhase(shardIndex, node, target.v2()); } else {
executePhase(shardIndex, node, target.v2());
}
} catch (Throwable t) {
onPhaseFailure(t, target.v2(), shardIndex);
} }
} }
} }
@ -202,7 +206,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
} }
} }
private void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() { searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
@Override @Override
public void onResult(QueryFetchSearchResult result) { public void onResult(QueryFetchSearchResult result) {
@ -214,18 +218,22 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) { onPhaseFailure(t, searchId, shardIndex);
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
} }
}); });
} }
void onPhaseFailure(Throwable t, long searchId, int shardIndex) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() { private void finishHim() {
try { try {
innerFinishHim(); innerFinishHim();

View File

@ -179,12 +179,16 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
if (shard != null) { if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) { if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (localAsync) { if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { try {
@Override threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
public void run() { @Override
performFirstPhase(fShardIndex, shardIt); public void run() {
} performFirstPhase(fShardIndex, shardIt);
}); }
});
} catch (Throwable t) {
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t);
}
} else { } else {
performFirstPhase(fShardIndex, shardIt); performFirstPhase(fShardIndex, shardIt);
} }

View File

@ -214,54 +214,54 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// no more active shards... (we should not really get here, just safety) // no more active shards... (we should not really get here, just safety)
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId())); onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else { } else {
final ShardRequest shardRequest = newShardRequest(shard, request); try {
if (shard.currentNodeId().equals(nodes.localNodeId())) { final ShardRequest shardRequest = newShardRequest(shard, request);
if (localAsync) { if (shard.currentNodeId().equals(nodes.localNodeId())) {
threadPool.executor(executor).execute(new Runnable() { if (localAsync) {
@Override threadPool.executor(executor).execute(new Runnable() {
public void run() { @Override
try { public void run() {
onOperation(shard, shardIndex, shardOperation(shardRequest)); try {
} catch (Exception e) { onOperation(shard, shardIndex, shardOperation(shardRequest));
} catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e);
}
}
});
} else {
onOperation(shard, shardIndex, shardOperation(shardRequest));
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
// no node connected, act as failure
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
@Override
public ShardResponse newInstance() {
return newShardResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(ShardResponse response) {
onOperation(shard, shardIndex, response);
}
@Override
public void handleException(TransportException e) {
onOperation(shard, shardIt, shardIndex, e); onOperation(shard, shardIt, shardIndex, e);
} }
} });
});
} else {
try {
onOperation(shard, shardIndex, shardOperation(shardRequest));
} catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e);
} }
} }
} else { } catch (Throwable e) {
DiscoveryNode node = nodes.get(shard.currentNodeId()); onOperation(shard, shardIt, shardIndex, e);
if (node == null) {
// no node connected, act as failure
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
@Override
public ShardResponse newInstance() {
return newShardResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(ShardResponse response) {
onOperation(shard, shardIndex, response);
}
@Override
public void handleException(TransportException e) {
onOperation(shard, shardIt, shardIndex, e);
}
});
}
} }
} }
} }

View File

@ -137,16 +137,20 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
} }
}); });
} else { } else {
threadPool.executor(executor).execute(new Runnable() { try {
@Override threadPool.executor(executor).execute(new Runnable() {
public void run() { @Override
try { public void run() {
masterOperation(request, clusterService.state(), listener); try {
} catch (Throwable e) { masterOperation(request, clusterService.state(), listener);
listener.onFailure(e); } catch (Throwable e) {
listener.onFailure(e);
}
} }
} });
}); } catch (Throwable t) {
listener.onFailure(t);
}
} }
} else { } else {
if (nodes.masterNode() == null) { if (nodes.masterNode() == null) {

View File

@ -142,55 +142,59 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
transportRequestOptions.withCompress(transportCompress()); transportRequestOptions.withCompress(transportCompress());
for (final String nodeId : nodesIds) { for (final String nodeId : nodesIds) {
final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId); final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId);
if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) { try {
threadPool.executor(executor()).execute(new Runnable() { if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) {
@Override threadPool.executor(executor()).execute(new Runnable() {
public void run() {
try {
onOperation(nodeOperation(newNodeRequest(clusterState.nodes().localNodeId(), request)));
} catch (Throwable e) {
onFailure(clusterState.nodes().localNodeId(), e);
}
}
});
} else if (nodeId.equals("_master")) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
try {
onOperation(nodeOperation(newNodeRequest(clusterState.nodes().masterNodeId(), request)));
} catch (Throwable e) {
onFailure(clusterState.nodes().masterNodeId(), e);
}
}
});
} else {
if (node == null) {
onFailure(nodeId, new NoSuchNodeException(nodeId));
} else {
NodeRequest nodeRequest = newNodeRequest(nodeId, request);
transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
@Override @Override
public NodeResponse newInstance() { public void run() {
return newNodeResponse(); try {
} onOperation(nodeOperation(newNodeRequest(clusterState.nodes().localNodeId(), request)));
} catch (Throwable e) {
@Override onFailure(clusterState.nodes().localNodeId(), e);
public void handleResponse(NodeResponse response) { }
onOperation(response);
}
@Override
public void handleException(TransportException exp) {
onFailure(node.id(), exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
} }
}); });
} else if (nodeId.equals("_master")) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
try {
onOperation(nodeOperation(newNodeRequest(clusterState.nodes().masterNodeId(), request)));
} catch (Throwable e) {
onFailure(clusterState.nodes().masterNodeId(), e);
}
}
});
} else {
if (node == null) {
onFailure(nodeId, new NoSuchNodeException(nodeId));
} else {
NodeRequest nodeRequest = newNodeRequest(nodeId, request);
transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse newInstance() {
return newNodeResponse();
}
@Override
public void handleResponse(NodeResponse response) {
onOperation(response);
}
@Override
public void handleException(TransportException exp) {
onFailure(node.id(), exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
} }
} catch (Throwable t) {
onFailure(nodeId, t);
} }
} }
} }

View File

@ -411,16 +411,20 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
foundPrimary = true; foundPrimary = true;
if (shard.currentNodeId().equals(clusterState.nodes().localNodeId())) { if (shard.currentNodeId().equals(clusterState.nodes().localNodeId())) {
if (request.operationThreaded()) { try {
request.beforeLocalFork(); if (request.operationThreaded()) {
threadPool.executor(executor).execute(new Runnable() { request.beforeLocalFork();
@Override threadPool.executor(executor).execute(new Runnable() {
public void run() { @Override
performOnPrimary(shard.id(), fromClusterEvent, shard, clusterState); public void run() {
} performOnPrimary(shard.id(), fromClusterEvent, shard, clusterState);
}); }
} else { });
performOnPrimary(shard.id(), fromClusterEvent, shard, clusterState); } else {
performOnPrimary(shard.id(), fromClusterEvent, shard, clusterState);
}
} catch (Throwable t) {
listener.onFailure(t);
} }
} else { } else {
DiscoveryNode node = clusterState.nodes().get(shard.currentNodeId()); DiscoveryNode node = clusterState.nodes().get(shard.currentNodeId());
@ -686,22 +690,34 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} else { } else {
if (request.operationThreaded()) { if (request.operationThreaded()) {
request.beforeLocalFork(); request.beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() { try {
@Override threadPool.executor(executor).execute(new Runnable() {
public void run() { @Override
try { public void run() {
shardOperationOnReplica(shardRequest); try {
} catch (Throwable e) { shardOperationOnReplica(shardRequest);
if (!ignoreReplicaException(e)) { } catch (Throwable e) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e); if (!ignoreReplicaException(e)) {
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]"); logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
} }
} }
if (counter.decrementAndGet() == 0) { });
listener.onResponse(response.response()); } catch (Throwable e) {
} if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
} }
}); // we want to decrement the counter here, in teh failure handling, cause we got rejected
// from executing on the thread pool
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
}
}
} else { } else {
try { try {
shardOperationOnReplica(shardRequest); shardOperationOnReplica(shardRequest);

View File

@ -219,26 +219,26 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
if (shard.currentNodeId().equals(nodes.localNodeId())) { if (shard.currentNodeId().equals(nodes.localNodeId())) {
// we don't prefer local shard, so try and do it here // we don't prefer local shard, so try and do it here
if (!request.preferLocalShard()) { if (!request.preferLocalShard()) {
if (request.operationThreaded()) { try {
request.beforeLocalFork(); if (request.operationThreaded()) {
threadPool.executor(executor).execute(new Runnable() { request.beforeLocalFork();
@Override threadPool.executor(executor).execute(new Runnable() {
public void run() { @Override
try { public void run() {
Response response = shardOperation(request, shard.id()); try {
listener.onResponse(response); Response response = shardOperation(request, shard.id());
} catch (Throwable e) { listener.onResponse(response);
onFailure(shard, e); } catch (Throwable e) {
onFailure(shard, e);
}
} }
} });
}); } else {
} else {
try {
final Response response = shardOperation(request, shard.id()); final Response response = shardOperation(request, shard.id());
listener.onResponse(response); listener.onResponse(response);
} catch (Throwable e) {
onFailure(shard, e);
} }
} catch (Throwable e) {
onFailure(shard, e);
} }
} else { } else {
perform(lastException); perform(lastException);

View File

@ -184,20 +184,28 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
request.shardId = shardIt.shardId().id(); request.shardId = shardIt.shardId().id();
if (shard.currentNodeId().equals(nodes.localNodeId())) { if (shard.currentNodeId().equals(nodes.localNodeId())) {
request.beforeLocalFork(); request.beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() { try {
@Override threadPool.executor(executor).execute(new Runnable() {
public void run() { @Override
try { public void run() {
shardOperation(request, listener); try {
} catch (Exception e) { shardOperation(request, listener);
if (retryOnFailure(e)) { } catch (Throwable e) {
retry(fromClusterEvent, null); if (retryOnFailure(e)) {
} else { retry(fromClusterEvent, null);
listener.onFailure(e); } else {
listener.onFailure(e);
}
} }
} }
});
} catch (Throwable e) {
if (retryOnFailure(e)) {
retry(fromClusterEvent, null);
} else {
listener.onFailure(e);
} }
}); }
} else { } else {
DiscoveryNode node = nodes.get(shard.currentNodeId()); DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, transportAction, request, transportOptions(), new BaseTransportResponseHandler<Response>() { transportService.sendRequest(node, transportAction, request, transportOptions(), new BaseTransportResponseHandler<Response>() {

View File

@ -151,26 +151,26 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
} }
if (shardRouting.currentNodeId().equals(nodes.localNodeId())) { if (shardRouting.currentNodeId().equals(nodes.localNodeId())) {
if (request.operationThreaded()) { try {
request.beforeLocalFork(); if (request.operationThreaded()) {
threadPool.executor(executor).execute(new Runnable() { request.beforeLocalFork();
@Override threadPool.executor(executor).execute(new Runnable() {
public void run() { @Override
try { public void run() {
Response response = shardOperation(request, shardRouting.id()); try {
listener.onResponse(response); Response response = shardOperation(request, shardRouting.id());
} catch (Throwable e) { listener.onResponse(response);
onFailure(shardRouting, e); } catch (Throwable e) {
onFailure(shardRouting, e);
}
} }
} });
}); } else {
} else {
try {
final Response response = shardOperation(request, shardRouting.id()); final Response response = shardOperation(request, shardRouting.id());
listener.onResponse(response); listener.onResponse(response);
} catch (Throwable e) {
onFailure(shardRouting, e);
} }
} catch (Throwable e) {
onFailure(shardRouting, e);
} }
} else { } else {
DiscoveryNode node = nodes.get(shardRouting.currentNodeId()); DiscoveryNode node = nodes.get(shardRouting.currentNodeId());