more hardening of cleaning out buffers in case of failure

This commit is contained in:
kimchy 2010-05-13 16:52:59 +03:00
parent d2bc6ace83
commit 6a20ca562c
1 changed files with 19 additions and 13 deletions

View File

@ -59,6 +59,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
int size = buffer.getInt(buffer.readerIndex() - 4);
int markedReaderIndex = buffer.readerIndex();
int expectedIndexReader = markedReaderIndex + size;
StreamInput streamIn = new ChannelBufferStreamInput(buffer); StreamInput streamIn = new ChannelBufferStreamInput(buffer);
streamIn = HandlesStreamInput.Cached.cached(streamIn); streamIn = HandlesStreamInput.Cached.cached(streamIn);
@ -66,30 +70,34 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
byte status = buffer.readByte(); byte status = buffer.readByte();
boolean isRequest = isRequest(status); boolean isRequest = isRequest(status);
TransportResponseHandler handler = null;
if (isRequest) { if (isRequest) {
handleRequest(buffer, event, streamIn, requestId); handleRequest(event, streamIn, requestId);
} else { } else {
final TransportResponseHandler handler = transportServiceAdapter.remove(requestId); handler = transportServiceAdapter.remove(requestId);
// ignore if its null, the adapter logs it // ignore if its null, the adapter logs it
if (handler != null) { if (handler != null) {
if (isError(status)) { if (isError(status)) {
handlerResponseError(buffer, streamIn, handler); handlerResponseError(streamIn, handler);
} else { } else {
handleResponse(buffer, streamIn, handler); handleResponse(streamIn, handler);
} }
} else { } else {
// if its null, skip those bytes (remove 8 for the request id, and 1 for the status) // if its null, skip those bytes
buffer.skipBytes(buffer.readableBytes()); buffer.readerIndex(markedReaderIndex + size);
} }
} }
if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read for [{}] and handler {}, resetting", requestId, handler);
buffer.readerIndex(expectedIndexReader);
}
} }
private void handleResponse(ChannelBuffer channelBuffer, StreamInput buffer, final TransportResponseHandler handler) { private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {
final Streamable streamable = handler.newInstance(); final Streamable streamable = handler.newInstance();
try { try {
streamable.readFrom(buffer); streamable.readFrom(buffer);
} catch (Exception e) { } catch (Exception e) {
channelBuffer.skipBytes(channelBuffer.readableBytes());
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e)); handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e));
return; return;
} }
@ -113,13 +121,12 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
} }
} }
private void handlerResponseError(ChannelBuffer channelBuffer, StreamInput buffer, final TransportResponseHandler handler) { private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) {
Throwable error; Throwable error;
try { try {
ThrowableObjectInputStream ois = new ThrowableObjectInputStream(buffer); ThrowableObjectInputStream ois = new ThrowableObjectInputStream(buffer);
error = (Throwable) ois.readObject(); error = (Throwable) ois.readObject();
} catch (Exception e) { } catch (Exception e) {
channelBuffer.skipBytes(channelBuffer.readableBytes());
error = new TransportSerializationException("Failed to deserialize exception response from stream", e); error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
} }
handleException(handler, error); handleException(handler, error);
@ -145,14 +152,14 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
} }
} }
private void handleRequest(ChannelBuffer channelBuffer, MessageEvent event, StreamInput buffer, long requestId) throws IOException { private void handleRequest(MessageEvent event, StreamInput buffer, long requestId) throws IOException {
final String action = buffer.readUTF(); final String action = buffer.readUTF();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, event.getChannel(), requestId); final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, event.getChannel(), requestId);
try { try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action); final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) { if (handler == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found"); logger.warn("No handler found for action [{}]", action);
} }
final Streamable streamable = handler.newInstance(); final Streamable streamable = handler.newInstance();
streamable.readFrom(buffer); streamable.readFrom(buffer);
@ -176,7 +183,6 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
handler.messageReceived(streamable, transportChannel); handler.messageReceived(streamable, transportChannel);
} }
} catch (Exception e) { } catch (Exception e) {
channelBuffer.skipBytes(channelBuffer.readableBytes());
try { try {
transportChannel.sendResponse(e); transportChannel.sendResponse(e);
} catch (IOException e1) { } catch (IOException e1) {