Notify Listener if transport throws an exception
This commit is contained in:
parent
5e4d5e1c64
commit
44b0edd2b8
|
@ -191,37 +191,41 @@ public class SyncedFlushService extends AbstractComponent {
|
|||
* returns the number of inflight operations on primary. -1 upon error.
|
||||
*/
|
||||
protected void getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable, final ActionListener<InFlightOpsResponse> listener) {
|
||||
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
|
||||
final DiscoveryNode primaryNode = state.nodes().get(primaryShard.currentNodeId());
|
||||
if (primaryNode == null) {
|
||||
logger.trace("{} failed to resolve node for primary shard {}, skipping sync", shardId, primaryShard);
|
||||
listener.onResponse(new InFlightOpsResponse(-1));
|
||||
return;
|
||||
try {
|
||||
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
|
||||
final DiscoveryNode primaryNode = state.nodes().get(primaryShard.currentNodeId());
|
||||
if (primaryNode == null) {
|
||||
logger.trace("{} failed to resolve node for primary shard {}, skipping sync", shardId, primaryShard);
|
||||
listener.onResponse(new InFlightOpsResponse(-1));
|
||||
return;
|
||||
}
|
||||
logger.trace("{} retrieving in flight operation count", shardId);
|
||||
transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId),
|
||||
new BaseTransportResponseHandler<InFlightOpsResponse>() {
|
||||
@Override
|
||||
public InFlightOpsResponse newInstance() {
|
||||
return new InFlightOpsResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(InFlightOpsResponse response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.debug("{} unexpected error while retrieving inflight op count", shardId);
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
listener.onFailure(t);
|
||||
}
|
||||
logger.trace("{} retrieving in flight operation count", shardId);
|
||||
transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId),
|
||||
new BaseTransportResponseHandler<InFlightOpsResponse>() {
|
||||
@Override
|
||||
public InFlightOpsResponse newInstance() {
|
||||
return new InFlightOpsResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(InFlightOpsResponse response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.debug("{} unexpected error while retrieving inflight op count", shardId);
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue