allow MessageInputStream to read multiple bytes at a time

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-10-07 19:48:13 +11:00
parent aa1299912d
commit 09947681fe
2 changed files with 33 additions and 8 deletions

View File

@ -176,6 +176,22 @@ public class MessageInputStream extends InputStream implements MessageAppender
@Override @Override
public int read() throws IOException public int read() throws IOException
{
byte[] bytes = new byte[1];
while (true)
{
int read = read(bytes, 0, 1);
if (read < 0)
return -1;
if (read == 0)
continue;
return bytes[0] & 0xFF;
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{ {
try try
{ {
@ -215,7 +231,12 @@ public class MessageInputStream extends InputStream implements MessageAppender
} }
} }
int result = activeBuffer.get() & 0xFF; ByteBuffer buffer = BufferUtil.toBuffer(b, off, len);
BufferUtil.clearToFill(buffer);
int written = BufferUtil.put(activeBuffer, buffer);
BufferUtil.flipToFlush(buffer, 0);
// If we have no more content we may need to resume to get more data.
if (!activeBuffer.hasRemaining()) if (!activeBuffer.hasRemaining())
{ {
SuspendToken resume = null; SuspendToken resume = null;
@ -247,7 +268,7 @@ public class MessageInputStream extends InputStream implements MessageAppender
resume.resume(); resume.resume();
} }
return result; return written;
} }
catch (InterruptedException x) catch (InterruptedException x)
{ {

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.common.message; package org.eclipse.jetty.websocket.common.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -32,6 +33,7 @@ import org.eclipse.jetty.toolchain.test.jupiter.WorkDir;
import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension; import org.eclipse.jetty.toolchain.test.jupiter.WorkDirExtension;
import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.SuspendToken;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -111,9 +113,10 @@ public class MessageInputStreamTest
startLatch.await(); startLatch.await();
// Read it from the stream. // Read it from the stream.
byte[] buf = new byte[32]; ByteArrayOutputStream out = new ByteArrayOutputStream();
int len = stream.read(buf); IO.copy(stream, out);
String message = new String(buf, 0, len, StandardCharsets.UTF_8); byte[] bytes = out.toByteArray();
String message = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8);
// Test it // Test it
assertThat("Error when appending", hadError.get(), is(false)); assertThat("Error when appending", hadError.get(), is(false));
@ -206,9 +209,10 @@ public class MessageInputStreamTest
session.provideContent(); session.provideContent();
// Read entire message it from the stream. // Read entire message it from the stream.
byte[] buf = new byte[32]; ByteArrayOutputStream out = new ByteArrayOutputStream();
int len = stream.read(buf); IO.copy(stream, out);
String message = new String(buf, 0, len, StandardCharsets.UTF_8); byte[] bytes = out.toByteArray();
String message = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8);
// Test it // Test it
assertThat("Message", message, is("Hello World!")); assertThat("Message", message, is("Hello World!"));