use throwable in transport layer
catch throwable when processing messages in the transport layer, to report back failures even under errors
This commit is contained in:
parent
342e9cf18e
commit
ee636c2330
|
@ -78,23 +78,14 @@ import static org.elasticsearch.ExceptionsHelper.detailedMessage;
|
|||
public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> implements ClusterStateListener {
|
||||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final RecoveryTarget recoveryTarget;
|
||||
|
||||
private final ShardStateAction shardStateAction;
|
||||
|
||||
private final NodeIndexCreatedAction nodeIndexCreatedAction;
|
||||
|
||||
private final NodeIndexDeletedAction nodeIndexDeletedAction;
|
||||
|
||||
private final NodeMappingCreatedAction nodeMappingCreatedAction;
|
||||
|
||||
private final NodeMappingRefreshAction nodeMappingRefreshAction;
|
||||
|
||||
private final NodeAliasesUpdatedAction nodeAliasesUpdatedAction;
|
||||
|
||||
// a map of mappings type we have seen per index due to cluster state
|
||||
|
@ -569,24 +560,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
indexShard.engine().addFailedEngineListener(failedEngineHandler);
|
||||
} catch (IndexShardAlreadyExistsException e) {
|
||||
// ignore this, the method call can happen several times
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
logger.warn("[{}][{}] failed to create shard", e, shardRouting.index(), shardRouting.id());
|
||||
try {
|
||||
indexService.removeShard(shardId, "failed to create [" + ExceptionsHelper.detailedMessage(e) + "]");
|
||||
} catch (IndexShardMissingException e1) {
|
||||
// ignore
|
||||
} catch (Exception e1) {
|
||||
logger.warn("[{}][{}] failed to remove shard after failed creation", e1, shardRouting.index(), shardRouting.id());
|
||||
}
|
||||
shardStateAction.shardFailed(shardRouting, "Failed to create shard, message [" + detailedMessage(e) + "]");
|
||||
return;
|
||||
} catch (OutOfMemoryError e) {
|
||||
logger.warn("[{}][{}] failed to create shard", e, shardRouting.index(), shardRouting.id());
|
||||
try {
|
||||
indexService.removeShard(shardId, "failed to create [" + ExceptionsHelper.detailedMessage(e) + "]");
|
||||
} catch (IndexShardMissingException e1) {
|
||||
// ignore
|
||||
} catch (Exception e1) {
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("[{}][{}] failed to remove shard after failed creation", e1, shardRouting.index(), shardRouting.id());
|
||||
}
|
||||
shardStateAction.shardFailed(shardRouting, "Failed to create shard, message [" + detailedMessage(e) + "]");
|
||||
|
|
|
@ -48,17 +48,11 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
|
|||
public class LocalTransport extends AbstractLifecycleComponent<Transport> implements Transport {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private volatile TransportServiceAdapter transportServiceAdapter;
|
||||
|
||||
private volatile BoundTransportAddress boundAddress;
|
||||
|
||||
private volatile LocalTransportAddress localAddress;
|
||||
|
||||
private final static ConcurrentMap<TransportAddress, LocalTransport> transports = newConcurrentMap();
|
||||
|
||||
private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
|
||||
|
||||
private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap();
|
||||
|
||||
public LocalTransport(ThreadPool threadPool) {
|
||||
|
@ -241,10 +235,10 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
final TransportRequest request = handler.newInstance();
|
||||
request.readFrom(stream);
|
||||
handler.messageReceived(request, transportChannel);
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
|
||||
logger.warn("Actual Exception", e1);
|
||||
}
|
||||
|
@ -278,7 +272,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
try {
|
||||
ThrowableObjectInputStream ois = new ThrowableObjectInputStream(buffer, settings.getClassLoader());
|
||||
error = (Throwable) ois.readObject();
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
|
||||
}
|
||||
handleException(handler, error);
|
||||
|
|
|
@ -43,11 +43,8 @@ import java.io.IOException;
|
|||
public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
private final ESLogger logger;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final TransportServiceAdapter transportServiceAdapter;
|
||||
|
||||
private final NettyTransport transport;
|
||||
|
||||
public MessageChannelHandler(NettyTransport transport, ESLogger logger) {
|
||||
|
@ -157,7 +154,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
} else {
|
||||
threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new ResponseHandlerFailureTransportException(e));
|
||||
}
|
||||
}
|
||||
|
@ -167,7 +164,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
try {
|
||||
ThrowableObjectInputStream ois = new ThrowableObjectInputStream(buffer, transport.settings().getClassLoader());
|
||||
error = (Throwable) ois.readObject();
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
|
||||
}
|
||||
handleException(handler, error);
|
||||
|
@ -186,7 +183,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
public void run() {
|
||||
try {
|
||||
handler.handleException(rtx);
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
logger.error("Failed to handle exception response", e);
|
||||
}
|
||||
}
|
||||
|
@ -211,7 +208,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
} else {
|
||||
threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
|
@ -242,7 +239,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
public void run() {
|
||||
try {
|
||||
handler.handleResponse(response);
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new ResponseHandlerFailureTransportException(e));
|
||||
}
|
||||
}
|
||||
|
@ -271,7 +268,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue