From 7482e1362f11d19a9423695dba5c72c18b9ec92a Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 2 Jul 2015 09:30:28 +0200 Subject: [PATCH] Transport: Do not make the buffer skip while a stream is open. This commit changes MessageChannelHandler to not skip the underlying ChannelBuffer while a StreamInput is open on top of it. In case eg. compression is enabled, this prevents failures due to the fact that the decompressed stream input expects a certain structure that it can't verify if the position of the underlying buffer is changed. --- .../netty/MessageChannelHandler.java | 86 +++++++++---------- .../AbstractSimpleTransportTests.java | 1 - 2 files changed, 39 insertions(+), 48 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index b713398e20f..c74f0f68d01 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -85,13 +85,12 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh // buffer, or in the cumlation buffer, which is cleaned each time StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size); - - long requestId = buffer.readLong(); - byte status = buffer.readByte(); - Version version = Version.fromId(buffer.readInt()); - - StreamInput wrappedStream = null; + boolean success = false; try { + long requestId = streamIn.readLong(); + byte status = streamIn.readByte(); + Version version = Version.fromId(streamIn.readInt()); + if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) { Compressor compressor; try { @@ -106,52 +105,40 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { sb.append("]"); throw new IllegalStateException(sb.toString()); } - wrappedStream = compressor.streamInput(streamIn); - } else { - wrappedStream = streamIn; + streamIn = compressor.streamInput(streamIn); } - wrappedStream.setVersion(version); + streamIn.setVersion(version); if (TransportStatus.isRequest(status)) { - String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version); - boolean success = false; - try { - final int nextByte = wrappedStream.read(); - // calling read() is useful to make sure the message is fully read, even if there is an EOS marker - if (nextByte != -1) { - throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" - + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting"); - } - if (buffer.readerIndex() < expectedIndexReader) { - throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting"); - } - if (buffer.readerIndex() > expectedIndexReader) { - throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action [" - + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting"); - } - success = true; - } finally { - if (!success) { - buffer.readerIndex(expectedIndexReader); - } + String action = handleRequest(ctx.getChannel(), streamIn, requestId, version); + + // Chek the entire message has been read + final int nextByte = streamIn.read(); + // calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker + if (nextByte != -1) { + throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting"); } + if (buffer.readerIndex() < expectedIndexReader) { + throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting"); + } + if (buffer.readerIndex() > expectedIndexReader) { + throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action [" + + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting"); + } + } else { - TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId); + TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId); // ignore if its null, the adapter logs it if (handler != null) { if (TransportStatus.isError(status)) { - handlerResponseError(wrappedStream, handler); + handlerResponseError(streamIn, handler); } else { - handleResponse(ctx.getChannel(), wrappedStream, handler); + handleResponse(ctx.getChannel(), streamIn, handler); } - } else { - // if its null, skip those bytes - buffer.readerIndex(markedReaderIndex + size); - } - boolean success = false; - try { - final int nextByte = wrappedStream.read(); + // Chek the entire message has been read + final int nextByte = streamIn.read(); // calling read() is useful to make sure the message is fully read, even if there is an EOS marker if (nextByte != -1) { throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler [" @@ -164,15 +151,20 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + "], handler [" + handler + "], error [" + TransportStatus.isError(status) + "]; resetting"); } - success = true; - } finally { - if (!success) { - buffer.readerIndex(expectedIndexReader); - } + } } } finally { - IOUtils.close(wrappedStream); + try { + if (success) { + IOUtils.close(streamIn); + } else { + IOUtils.closeWhileHandlingException(streamIn); + } + } finally { + // Set the expected position of the buffer, no matter what happened + buffer.readerIndex(expectedIndexReader); + } } } diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java index 11123658665..cf70f6ad2f3 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java @@ -575,7 +575,6 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase @Test @TestLogging(value = "test. transport.tracer:TRACE") - @AwaitsFix(bugUrl = "@spinscale is looking into failures: http://build-us-00.elastic.co/job/es_core_master_strong/3986/") public void testTracerLog() throws InterruptedException { TransportRequestHandler handler = new TransportRequestHandler() { @Override