diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java index 95c1e91aa37..acc91c507ed 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java @@ -176,6 +176,22 @@ public class MessageInputStream extends InputStream implements MessageAppender @Override public int read() throws IOException + { + byte[] bytes = new byte[1]; + while (true) + { + int read = read(bytes, 0, 1); + if (read < 0) + return -1; + if (read == 0) + continue; + + return bytes[0] & 0xFF; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { try { @@ -215,7 +231,12 @@ public class MessageInputStream extends InputStream implements MessageAppender } } - int result = activeBuffer.get() & 0xFF; + ByteBuffer buffer = BufferUtil.toBuffer(b, off, len); + BufferUtil.clearToFill(buffer); + int written = BufferUtil.put(activeBuffer, buffer); + BufferUtil.flipToFlush(buffer, 0); + + // If we have no more content we may need to resume to get more data. if (!activeBuffer.hasRemaining()) { SuspendToken resume = null; @@ -247,7 +268,7 @@ public class MessageInputStream extends InputStream implements MessageAppender resume.resume(); } - return result; + return written; } catch (InterruptedException x) { diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java index 4266ed51542..a619fba3178 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.websocket.common.message; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -32,6 +33,7 @@ import org.eclipse.jetty.toolchain.test.jupiter.WorkDir; import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension; import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.websocket.api.SuspendToken; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -111,9 +113,10 @@ public class MessageInputStreamTest startLatch.await(); // Read it from the stream. - byte[] buf = new byte[32]; - int len = stream.read(buf); - String message = new String(buf, 0, len, StandardCharsets.UTF_8); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + IO.copy(stream, out); + byte[] bytes = out.toByteArray(); + String message = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8); // Test it assertThat("Error when appending", hadError.get(), is(false)); @@ -206,9 +209,10 @@ public class MessageInputStreamTest session.provideContent(); // Read entire message it from the stream. - byte[] buf = new byte[32]; - int len = stream.read(buf); - String message = new String(buf, 0, len, StandardCharsets.UTF_8); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + IO.copy(stream, out); + byte[] bytes = out.toByteArray(); + String message = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8); // Test it assertThat("Message", message, is("Hello World!"));