diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index 0a81cbdf8a5..710ae1fc576 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -59,7 +59,9 @@ public class MessageInputStream extends InputStream implements MessageAppender public void appendFrame(ByteBuffer framePayload, boolean fin) throws IOException { if (LOG.isDebugEnabled()) - LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(framePayload)); + { + LOG.debug("Appending {} chunk: {}",fin?"final":"non-final",BufferUtil.toDetailString(framePayload)); + } // If closed, we should just toss incoming payloads into the bit bucket. if (closed.get()) @@ -72,9 +74,20 @@ public class MessageInputStream extends InputStream implements MessageAppender // be processed after this method returns. try { + if (framePayload == null) + { + // skip if no payload + return; + } + int capacity = framePayload.remaining(); + if (capacity <= 0) + { + // skip if no payload data to copy + return; + } // TODO: the copy buffer should be pooled too, but no buffer pool available from here. - ByteBuffer copy = framePayload.isDirect() ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity); + ByteBuffer copy = framePayload.isDirect()?ByteBuffer.allocateDirect(capacity):ByteBuffer.allocate(capacity); copy.put(framePayload).flip(); buffers.put(copy); } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java index b71149e6a3f..b7850a0a329 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java @@ -18,6 +18,8 @@ package org.eclipse.jetty.websocket.common.message; +import static org.hamcrest.Matchers.*; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -27,15 +29,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection; import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPool; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import static org.hamcrest.Matchers.is; - public class MessageInputStreamTest { @Rule @@ -47,8 +46,6 @@ public class MessageInputStreamTest @Test(timeout=10000) public void testBasicAppendRead() throws IOException { - LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool); - try (MessageInputStream stream = new MessageInputStream()) { // Append a single message (simple, short) @@ -70,8 +67,6 @@ public class MessageInputStreamTest @Test(timeout=5000) public void testBlockOnRead() throws Exception { - LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool); - try (MessageInputStream stream = new MessageInputStream()) { final AtomicBoolean hadError = new AtomicBoolean(false); @@ -121,8 +116,6 @@ public class MessageInputStreamTest @Test(timeout=10000) public void testBlockOnReadInitial() throws IOException { - LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool); - try (MessageInputStream stream = new MessageInputStream()) { final AtomicBoolean hadError = new AtomicBoolean(false); @@ -160,8 +153,6 @@ public class MessageInputStreamTest @Test(timeout=10000) public void testReadByteNoBuffersClosed() throws IOException { - LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool); - try (MessageInputStream stream = new MessageInputStream()) { final AtomicBoolean hadError = new AtomicBoolean(false); @@ -194,4 +185,52 @@ public class MessageInputStreamTest Assert.assertThat("Initial byte",b,is(-1)); } } + + @Test(timeout=10000) + public void testAppendEmptyPayloadRead() throws IOException + { + try (MessageInputStream stream = new MessageInputStream()) + { + // Append parts of message + ByteBuffer msg1 = BufferUtil.toBuffer("Hello ",StandardCharsets.UTF_8); + ByteBuffer msg2 = ByteBuffer.allocate(0); // what is being tested + ByteBuffer msg3 = BufferUtil.toBuffer("World",StandardCharsets.UTF_8); + + stream.appendFrame(msg1,false); + stream.appendFrame(msg2,false); + stream.appendFrame(msg3,true); + + // Read entire message it from the stream. + byte buf[] = new byte[32]; + int len = stream.read(buf); + String message = new String(buf,0,len,StandardCharsets.UTF_8); + + // Test it + Assert.assertThat("Message",message,is("Hello World")); + } + } + + @Test(timeout=10000) + public void testAppendNullPayloadRead() throws IOException + { + try (MessageInputStream stream = new MessageInputStream()) + { + // Append parts of message + ByteBuffer msg1 = BufferUtil.toBuffer("Hello ",StandardCharsets.UTF_8); + ByteBuffer msg2 = null; // what is being tested + ByteBuffer msg3 = BufferUtil.toBuffer("World",StandardCharsets.UTF_8); + + stream.appendFrame(msg1,false); + stream.appendFrame(msg2,false); + stream.appendFrame(msg3,true); + + // Read entire message it from the stream. + byte buf[] = new byte[32]; + int len = stream.read(buf); + String message = new String(buf,0,len,StandardCharsets.UTF_8); + + // Test it + Assert.assertThat("Message",message,is("Hello World")); + } + } }