Transport: Do not make the buffer skip while a stream is open.

This commit changes MessageChannelHandler to not skip the underlying
ChannelBuffer while a StreamInput is open on top of it. In case eg. compression
is enabled, this prevents failures due to the fact that the decompressed
stream input expects a certain structure that it can't verify if the position
of the underlying buffer is changed.
This commit is contained in:
Adrien Grand 2015-07-02 09:30:28 +02:00
parent 462b5c822c
commit 7482e1362f
2 changed files with 39 additions and 48 deletions

View File

@ -85,13 +85,12 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumlation buffer, which is cleaned each time
StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
long requestId = buffer.readLong();
byte status = buffer.readByte();
Version version = Version.fromId(buffer.readInt());
StreamInput wrappedStream = null;
boolean success = false;
try {
long requestId = streamIn.readLong();
byte status = streamIn.readByte();
Version version = Version.fromId(streamIn.readInt());
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
Compressor compressor;
try {
@ -106,18 +105,16 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
sb.append("]");
throw new IllegalStateException(sb.toString());
}
wrappedStream = compressor.streamInput(streamIn);
} else {
wrappedStream = streamIn;
streamIn = compressor.streamInput(streamIn);
}
wrappedStream.setVersion(version);
streamIn.setVersion(version);
if (TransportStatus.isRequest(status)) {
String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
boolean success = false;
try {
final int nextByte = wrappedStream.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
String action = handleRequest(ctx.getChannel(), streamIn, requestId, version);
// Chek the entire message has been read
final int nextByte = streamIn.read();
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action ["
+ action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
@ -129,29 +126,19 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action ["
+ action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
}
success = true;
} finally {
if (!success) {
buffer.readerIndex(expectedIndexReader);
}
}
} else {
TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
handlerResponseError(wrappedStream, handler);
handlerResponseError(streamIn, handler);
} else {
handleResponse(ctx.getChannel(), wrappedStream, handler);
}
} else {
// if its null, skip those bytes
buffer.readerIndex(markedReaderIndex + size);
handleResponse(ctx.getChannel(), streamIn, handler);
}
boolean success = false;
try {
final int nextByte = wrappedStream.read();
// Chek the entire message has been read
final int nextByte = streamIn.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
@ -164,17 +151,22 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + "], handler ["
+ handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
}
success = true;
}
}
} finally {
if (!success) {
try {
if (success) {
IOUtils.close(streamIn);
} else {
IOUtils.closeWhileHandlingException(streamIn);
}
} finally {
// Set the expected position of the buffer, no matter what happened
buffer.readerIndex(expectedIndexReader);
}
}
}
} finally {
IOUtils.close(wrappedStream);
}
}
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();

View File

@ -575,7 +575,6 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
@Test
@TestLogging(value = "test. transport.tracer:TRACE")
@AwaitsFix(bugUrl = "@spinscale is looking into failures: http://build-us-00.elastic.co/job/es_core_master_strong/3986/")
public void testTracerLog() throws InterruptedException {
TransportRequestHandler handler = new TransportRequestHandler<StringMessageRequest>() {
@Override