diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 746c5991ecf..67b30321925 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -59,6 +59,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); + int size = buffer.getInt(buffer.readerIndex() - 4); + int markedReaderIndex = buffer.readerIndex(); + int expectedIndexReader = markedReaderIndex + size; + StreamInput streamIn = new ChannelBufferStreamInput(buffer); streamIn = HandlesStreamInput.Cached.cached(streamIn); @@ -66,30 +70,34 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { byte status = buffer.readByte(); boolean isRequest = isRequest(status); + TransportResponseHandler handler = null; if (isRequest) { - handleRequest(buffer, event, streamIn, requestId); + handleRequest(event, streamIn, requestId); } else { - final TransportResponseHandler handler = transportServiceAdapter.remove(requestId); + handler = transportServiceAdapter.remove(requestId); // ignore if its null, the adapter logs it if (handler != null) { if (isError(status)) { - handlerResponseError(buffer, streamIn, handler); + handlerResponseError(streamIn, handler); } else { - handleResponse(buffer, streamIn, handler); + handleResponse(streamIn, handler); } } else { - // if its null, skip those bytes (remove 8 for the request id, and 1 for the status) - buffer.skipBytes(buffer.readableBytes()); + // if its null, skip those bytes + buffer.readerIndex(markedReaderIndex + size); } } + if (buffer.readerIndex() < expectedIndexReader) { + logger.warn("Message not fully read for [{}] and handler {}, resetting", requestId, handler); + buffer.readerIndex(expectedIndexReader); + } } - private void handleResponse(ChannelBuffer channelBuffer, StreamInput buffer, final TransportResponseHandler handler) { + private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) { final Streamable streamable = handler.newInstance(); try { streamable.readFrom(buffer); } catch (Exception e) { - channelBuffer.skipBytes(channelBuffer.readableBytes()); handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e)); return; } @@ -113,13 +121,12 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } } - private void handlerResponseError(ChannelBuffer channelBuffer, StreamInput buffer, final TransportResponseHandler handler) { + private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) { Throwable error; try { ThrowableObjectInputStream ois = new ThrowableObjectInputStream(buffer); error = (Throwable) ois.readObject(); } catch (Exception e) { - channelBuffer.skipBytes(channelBuffer.readableBytes()); error = new TransportSerializationException("Failed to deserialize exception response from stream", e); } handleException(handler, error); @@ -145,14 +152,14 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } } - private void handleRequest(ChannelBuffer channelBuffer, MessageEvent event, StreamInput buffer, long requestId) throws IOException { + private void handleRequest(MessageEvent event, StreamInput buffer, long requestId) throws IOException { final String action = buffer.readUTF(); final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, event.getChannel(), requestId); try { final TransportRequestHandler handler = transportServiceAdapter.handler(action); if (handler == null) { - throw new ActionNotFoundTransportException("Action [" + action + "] not found"); + logger.warn("No handler found for action [{}]", action); } final Streamable streamable = handler.newInstance(); streamable.readFrom(buffer); @@ -176,7 +183,6 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { handler.messageReceived(streamable, transportChannel); } } catch (Exception e) { - channelBuffer.skipBytes(channelBuffer.readableBytes()); try { transportChannel.sendResponse(e); } catch (IOException e1) {