diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java
index a5d5ca4f905..0d15f5bbcf9 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java
@@ -132,6 +132,12 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
}
+ public void dispatch(Runnable runnable)
+ {
+ // TODO Auto-generated method stub
+ runnable.run();
+ }
+
@Override
public void dump(Appendable out, String indent) throws IOException
{
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java
index 210e7c340c3..539e4e8bb14 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java
@@ -65,6 +65,11 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
}
}
+ private void dispatch(Runnable runnable)
+ {
+ session.dispatch(runnable);
+ }
+
@Override
public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
{
@@ -78,7 +83,15 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
{
if (events.onBinary.isStreaming())
{
- activeMessage = new MessageInputStream(this);
+ activeMessage = new MessageInputStream(session.getConnection());
+ dispatch(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ events.onBinary.call(websocket,session,activeMessage);
+ }
+ });
}
else
{
@@ -171,7 +184,15 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
{
if (events.onText.isStreaming())
{
- activeMessage = new MessageReader(this);
+ activeMessage = new MessageReader(new MessageInputStream(session.getConnection()));
+ dispatch(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ events.onText.call(websocket,session,activeMessage);
+ }
+ });
}
else
{
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java
index 2629770991b..d6cd8ecb2e2 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageInputStream.java
@@ -21,97 +21,135 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
-import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.websocket.common.events.EventDriver;
+import org.eclipse.jetty.websocket.common.LogicalConnection;
/**
- * Support class for reading binary message data as an InputStream.
+ * Support class for reading a (single) WebSocket BINARY message via a InputStream.
+ *
+ * An InputStream that can access a queue of ByteBuffer payloads, along with expected InputStream blocking behavior.
*/
public class MessageInputStream extends InputStream implements MessageAppender
{
/**
- * Threshold (of bytes) to perform compaction at
+ * Used for controlling read suspend/resume behavior if the queue is full, but the read operations haven't caught up yet.
*/
- private static final int COMPACT_THRESHOLD = 5;
- private final EventDriver driver;
- private final ByteBuffer buf;
- private int size;
- private boolean finished;
- private boolean needsNotification;
- private int readPosition;
+ @SuppressWarnings("unused")
+ private final LogicalConnection connection;
+ private final BlockingDeque buffers = new LinkedBlockingDeque<>();
+ private AtomicBoolean closed = new AtomicBoolean(false);
+ // EOB / End of Buffers
+ private AtomicBoolean buffersExhausted = new AtomicBoolean(false);
+ private ByteBuffer activeBuffer = null;
- public MessageInputStream(EventDriver driver)
+ public MessageInputStream(LogicalConnection connection)
{
- this.driver = driver;
- this.buf = ByteBuffer.allocate(driver.getPolicy().getMaxBinaryMessageBufferSize());
- BufferUtil.clearToFill(this.buf);
- size = 0;
- readPosition = this.buf.position();
- finished = false;
- needsNotification = true;
+ this.connection = connection;
}
@Override
public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
{
- if (finished)
+ if (buffersExhausted.get())
{
- throw new IOException("Cannot append to finished buffer");
+ // This indicates a programming mistake/error and must be bug fixed
+ throw new RuntimeException("Last frame already received");
}
- if (payload == null)
+ // if closed, we should just toss incoming payloads into the bit bucket.
+ if (closed.get())
{
- // empty payload is valid
return;
}
- driver.getPolicy().assertValidBinaryMessageSize(size + payload.remaining());
- size += payload.remaining();
-
- synchronized (buf)
+ // Put the payload into the queue
+ try
{
- // TODO: grow buffer till max binary message size?
- // TODO: compact this buffer to fit incoming buffer?
- // TODO: tell connection to suspend if buffer too full?
- BufferUtil.put(payload,buf);
+ buffers.put(payload);
+ if (isLast)
+ {
+ buffersExhausted.set(true);
+ }
}
-
- if (needsNotification)
+ catch (InterruptedException e)
{
- needsNotification = true;
- this.driver.onInputStream(this);
+ throw new IOException(e);
}
}
@Override
public void close() throws IOException
{
- finished = true;
+ closed.set(true);
super.close();
}
+ @Override
+ public synchronized void mark(int readlimit)
+ {
+ /* do nothing */
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return false;
+ }
+
@Override
public void messageComplete()
{
- finished = true;
+ buffersExhausted.set(true);
+ // toss an empty ByteBuffer into queue to let it drain
+ try
+ {
+ buffers.put(ByteBuffer.wrap(new byte[0]));
+ }
+ catch (InterruptedException ignore)
+ {
+ /* ignore */
+ }
}
@Override
public int read() throws IOException
{
- synchronized (buf)
+ try
{
- byte b = buf.get(readPosition);
- readPosition++;
- if (readPosition <= (buf.limit() - COMPACT_THRESHOLD))
+ if (closed.get())
{
- int curPos = buf.position();
- buf.compact();
- int offsetPos = buf.position() - curPos;
- readPosition += offsetPos;
+ return -1;
}
- return b;
+
+ if (activeBuffer == null)
+ {
+ activeBuffer = buffers.take();
+ }
+
+ while (activeBuffer.remaining() <= 0)
+ {
+ if (buffersExhausted.get())
+ {
+ closed.set(true);
+ return -1;
+ }
+ activeBuffer = buffers.take();
+ }
+
+ return activeBuffer.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
}
}
+
+ @Override
+ public synchronized void reset() throws IOException
+ {
+ throw new IOException("reset() not supported");
+ }
}
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java
index adbdc22160b..37c065afb86 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageReader.java
@@ -19,79 +19,36 @@
package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
-import java.io.Reader;
+import java.io.InputStreamReader;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
-import org.eclipse.jetty.util.Utf8StringBuilder;
-import org.eclipse.jetty.websocket.common.events.EventDriver;
+import org.eclipse.jetty.util.StringUtil;
/**
- * Support class for reading text message data as an Reader.
+ * Support class for reading a (single) WebSocket TEXT message via a Reader.
*
- * Due to the spec, this reader is forced to use the UTF8 charset.
+ * In compliance to the WebSocket spec, this reader always uses the UTF8 {@link Charset}.
*/
-public class MessageReader extends Reader implements MessageAppender
+public class MessageReader extends InputStreamReader implements MessageAppender
{
- private final EventDriver driver;
- private final Utf8StringBuilder utf;
- private int size;
- private boolean finished;
- private boolean needsNotification;
+ private final MessageInputStream stream;
- public MessageReader(EventDriver driver)
+ public MessageReader(MessageInputStream stream)
{
- this.driver = driver;
- this.utf = new Utf8StringBuilder();
- size = 0;
- finished = false;
- needsNotification = true;
+ super(stream,StringUtil.__UTF8_CHARSET);
+ this.stream = stream;
}
@Override
public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
{
- if (finished)
- {
- throw new IOException("Cannot append to finished buffer");
- }
-
- if (payload == null)
- {
- // empty payload is valid
- return;
- }
-
- driver.getPolicy().assertValidTextMessageSize(size + payload.remaining());
- size += payload.remaining();
-
- synchronized (utf)
- {
- utf.append(payload);
- }
-
- if (needsNotification)
- {
- needsNotification = true;
- this.driver.onReader(this);
- }
- }
-
- @Override
- public void close() throws IOException
- {
- finished = true;
+ this.stream.appendMessage(payload,isLast);
}
@Override
public void messageComplete()
{
- finished = true;
- }
-
- @Override
- public int read(char[] cbuf, int off, int len) throws IOException
- {
- // TODO Auto-generated method stub
- return 0;
+ this.stream.messageComplete();
}
}
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/PayloadInputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/PayloadInputStream.java
deleted file mode 100644
index 2c98e7c111d..00000000000
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/PayloadInputStream.java
+++ /dev/null
@@ -1,153 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
-// ------------------------------------------------------------------------
-// All rights reserved. This program and the accompanying materials
-// are made available under the terms of the Eclipse Public License v1.0
-// and Apache License v2.0 which accompanies this distribution.
-//
-// The Eclipse Public License is available at
-// http://www.eclipse.org/legal/epl-v10.html
-//
-// The Apache License v2.0 is available at
-// http://www.opensource.org/licenses/apache2.0.php
-//
-// You may elect to redistribute this code under either of these licenses.
-// ========================================================================
-//
-
-package org.eclipse.jetty.websocket.common.message;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.eclipse.jetty.websocket.common.LogicalConnection;
-
-/**
- * An InputStream that can access a queue of ByteBuffer payloads, along with expected InputStream blocking behavior.
- */
-public class PayloadInputStream extends InputStream implements MessageAppender
-{
- /**
- * Used for controlling read suspend/resume behavior if the queue is full, but the read operations haven't caught up yet.
- */
- @SuppressWarnings("unused")
- private final LogicalConnection connection;
- private final BlockingDeque buffers = new LinkedBlockingDeque<>();
- private AtomicBoolean closed = new AtomicBoolean(false);
- // EOB / End of Buffers
- private AtomicBoolean buffersExhausted = new AtomicBoolean(false);
- private ByteBuffer activeBuffer = null;
-
- public PayloadInputStream(LogicalConnection connection)
- {
- this.connection = connection;
- }
-
- @Override
- public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
- {
- if (buffersExhausted.get())
- {
- // This indicates a programming mistake/error and must be bug fixed
- throw new RuntimeException("Last frame already received");
- }
-
- // if closed, we should just toss incoming payloads into the bit bucket.
- if (closed.get())
- {
- return;
- }
-
- // Put the payload into the queue
- try
- {
- buffers.put(payload);
- if (isLast)
- {
- buffersExhausted.set(true);
- }
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- }
-
- @Override
- public void close() throws IOException
- {
- closed.set(true);
- super.close();
- }
-
- @Override
- public synchronized void mark(int readlimit)
- {
- /* do nothing */
- }
-
- @Override
- public boolean markSupported()
- {
- return false;
- }
-
- @Override
- public void messageComplete()
- {
- buffersExhausted.set(true);
- // toss an empty ByteBuffer into queue to let it drain
- try
- {
- buffers.put(ByteBuffer.wrap(new byte[0]));
- }
- catch (InterruptedException ignore)
- {
- /* ignore */
- }
- }
-
- @Override
- public int read() throws IOException
- {
- try
- {
- if (closed.get())
- {
- return -1;
- }
-
- if (activeBuffer == null)
- {
- activeBuffer = buffers.take();
- }
-
- while (activeBuffer.remaining() <= 0)
- {
- if (buffersExhausted.get())
- {
- closed.set(true);
- return -1;
- }
- activeBuffer = buffers.take();
- }
-
- return activeBuffer.get();
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- }
-
- @Override
- public synchronized void reset() throws IOException
- {
- throw new IOException("reset() not supported");
- }
-}
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java
index a8ab20aeda2..ab6de563248 100644
--- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/MessageInputStreamTest.java
@@ -22,84 +22,166 @@ import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
-import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
-import org.eclipse.jetty.websocket.api.WebSocketPolicy;
-import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
-import org.eclipse.jetty.websocket.common.events.EventDriver;
-import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
-import org.eclipse.jetty.websocket.common.io.FramePipes;
-import org.eclipse.jetty.websocket.common.io.LocalWebSocketSession;
-import org.junit.After;
+import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class MessageInputStreamTest
{
- @Rule
- public TestTracker testtracker = new TestTracker();
+ private static final Charset UTF8 = StringUtil.__UTF8_CHARSET;
@Rule
public TestName testname = new TestName();
- private WebSocketPolicy policy;
- private TrackingInputStreamSocket socket;
- private LocalWebSocketSession session;
- private LocalWebSocketSession remoteSession;
-
- @After
- public void closeSession()
+ @Test
+ public void testBasicAppendRead() throws IOException
{
- session.close();
- remoteSession.close();
- }
+ LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
- @Before
- public void setupSession()
- {
- policy = WebSocketPolicy.newServerPolicy();
- policy.setInputBufferSize(1024);
- policy.setMaxBinaryMessageBufferSize(1024);
- policy.setMaxTextMessageBufferSize(1024);
+ try (MessageInputStream stream = new MessageInputStream(conn))
+ {
+ // Append a message (simple, short)
+ ByteBuffer payload = BufferUtil.toBuffer("Hello World",UTF8);
+ System.out.printf("payload = %s%n",BufferUtil.toDetailString(payload));
+ boolean fin = true;
+ stream.appendMessage(payload,fin);
- // Event Driver factory
- EventDriverFactory factory = new EventDriverFactory(policy);
+ // Read it from the stream.
+ byte buf[] = new byte[32];
+ int len = stream.read(buf);
+ String message = new String(buf,0,len,UTF8);
- // Local Socket
- EventDriver localDriver = factory.wrap(new DummySocket());
-
- // Remote socket & Session
- socket = new TrackingInputStreamSocket("remote");
- EventDriver remoteDriver = factory.wrap(socket);
- remoteSession = new LocalWebSocketSession(testname,remoteDriver);
- remoteSession.open();
- OutgoingFrames socketPipe = FramePipes.to(remoteDriver);
-
- // Local Session
- session = new LocalWebSocketSession(testname,localDriver);
-
- session.setPolicy(policy);
- // talk to our remote socket
- session.setOutgoingHandler(socketPipe);
- // open connection
- session.open();
+ // Test it
+ Assert.assertThat("Message",message,is("Hello World"));
+ }
}
@Test
- @Ignore
- public void testSimpleMessage() throws IOException
+ public void testBlockOnRead() throws IOException
{
- ByteBuffer data = BufferUtil.toBuffer("Hello World",StringUtil.__UTF8_CHARSET);
- session.getRemote().sendBytes(data);
+ LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
- Assert.assertThat("Socket.messageQueue.size",socket.messageQueue.size(),is(1));
- String msg = socket.messageQueue.poll();
- Assert.assertThat("Message",msg,is("Hello World"));
+ try (MessageInputStream stream = new MessageInputStream(conn))
+ {
+ final AtomicBoolean hadError = new AtomicBoolean(false);
+
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ boolean fin = false;
+ TimeUnit.MILLISECONDS.sleep(200);
+ stream.appendMessage(BufferUtil.toBuffer("Saved",UTF8),fin);
+ TimeUnit.MILLISECONDS.sleep(200);
+ stream.appendMessage(BufferUtil.toBuffer(" by ",UTF8),fin);
+ fin = true;
+ TimeUnit.MILLISECONDS.sleep(200);
+ stream.appendMessage(BufferUtil.toBuffer("Zero",UTF8),fin);
+ }
+ catch (IOException | InterruptedException e)
+ {
+ hadError.set(true);
+ e.printStackTrace(System.err);
+ }
+ }
+ }).start();
+
+ // Read it from the stream.
+ byte buf[] = new byte[32];
+ int len = stream.read(buf);
+ String message = new String(buf,0,len,UTF8);
+
+ // Test it
+ Assert.assertThat("Error when appending",hadError.get(),is(false));
+ Assert.assertThat("Message",message,is("Saved by Zero"));
+ }
+ }
+
+ @Test
+ public void testBlockOnReadInitial() throws IOException
+ {
+ LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
+
+ try (MessageInputStream stream = new MessageInputStream(conn))
+ {
+ final AtomicBoolean hadError = new AtomicBoolean(false);
+
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ boolean fin = true;
+ // wait for a little bit before populating buffers
+ TimeUnit.MILLISECONDS.sleep(400);
+ stream.appendMessage(BufferUtil.toBuffer("I will conquer",UTF8),fin);
+ }
+ catch (IOException | InterruptedException e)
+ {
+ hadError.set(true);
+ e.printStackTrace(System.err);
+ }
+ }
+ }).start();
+
+ // Read byte from stream.
+ int b = stream.read();
+ // Should be a byte, blocking till byte received.
+
+ // Test it
+ Assert.assertThat("Error when appending",hadError.get(),is(false));
+ Assert.assertThat("Initial byte",b,is((int)'I'));
+ }
+ }
+
+ @Test
+ public void testReadByteNoBuffersClosed() throws IOException
+ {
+ LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
+
+ try (MessageInputStream stream = new MessageInputStream(conn))
+ {
+ final AtomicBoolean hadError = new AtomicBoolean(false);
+
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ // wait for a little bit before sending input closed
+ TimeUnit.MILLISECONDS.sleep(400);
+ stream.messageComplete();
+ }
+ catch (InterruptedException e)
+ {
+ hadError.set(true);
+ e.printStackTrace(System.err);
+ }
+ }
+ }).start();
+
+ // Read byte from stream.
+ int b = stream.read();
+ // Should be a -1, indicating the end of the stream.
+
+ // Test it
+ Assert.assertThat("Error when appending",hadError.get(),is(false));
+ Assert.assertThat("Initial byte",b,is(-1));
+ }
}
}
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/PayloadInputStreamTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/PayloadInputStreamTest.java
deleted file mode 100644
index d9483a96715..00000000000
--- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/message/PayloadInputStreamTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
-// ------------------------------------------------------------------------
-// All rights reserved. This program and the accompanying materials
-// are made available under the terms of the Eclipse Public License v1.0
-// and Apache License v2.0 which accompanies this distribution.
-//
-// The Eclipse Public License is available at
-// http://www.eclipse.org/legal/epl-v10.html
-//
-// The Apache License v2.0 is available at
-// http://www.opensource.org/licenses/apache2.0.php
-//
-// You may elect to redistribute this code under either of these licenses.
-// ========================================================================
-//
-
-package org.eclipse.jetty.websocket.common.message;
-
-import static org.hamcrest.Matchers.*;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.util.StringUtil;
-import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-public class PayloadInputStreamTest
-{
- private static final Charset UTF8 = StringUtil.__UTF8_CHARSET;
-
- @Rule
- public TestName testname = new TestName();
-
- @Test
- public void testBasicAppendRead() throws IOException
- {
- LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
-
- try (PayloadInputStream stream = new PayloadInputStream(conn))
- {
- // Append a message (simple, short)
- ByteBuffer payload = BufferUtil.toBuffer("Hello World",UTF8);
- System.out.printf("payload = %s%n",BufferUtil.toDetailString(payload));
- boolean fin = true;
- stream.appendMessage(payload,fin);
-
- // Read it from the stream.
- byte buf[] = new byte[32];
- int len = stream.read(buf);
- String message = new String(buf,0,len,UTF8);
-
- // Test it
- Assert.assertThat("Message",message,is("Hello World"));
- }
- }
-
- @Test
- public void testBlockOnRead() throws IOException
- {
- LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
-
- try (PayloadInputStream stream = new PayloadInputStream(conn))
- {
- final AtomicBoolean hadError = new AtomicBoolean(false);
-
- new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- boolean fin = false;
- TimeUnit.MILLISECONDS.sleep(200);
- stream.appendMessage(BufferUtil.toBuffer("Saved",UTF8),fin);
- TimeUnit.MILLISECONDS.sleep(200);
- stream.appendMessage(BufferUtil.toBuffer(" by ",UTF8),fin);
- fin = true;
- TimeUnit.MILLISECONDS.sleep(200);
- stream.appendMessage(BufferUtil.toBuffer("Zero",UTF8),fin);
- }
- catch (IOException | InterruptedException e)
- {
- hadError.set(true);
- e.printStackTrace(System.err);
- }
- }
- }).start();
-
- // Read it from the stream.
- byte buf[] = new byte[32];
- int len = stream.read(buf);
- String message = new String(buf,0,len,UTF8);
-
- // Test it
- Assert.assertThat("Error when appending",hadError.get(),is(false));
- Assert.assertThat("Message",message,is("Saved by Zero"));
- }
- }
-
- @Test
- public void testBlockOnReadInitial() throws IOException
- {
- LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
-
- try (PayloadInputStream stream = new PayloadInputStream(conn))
- {
- final AtomicBoolean hadError = new AtomicBoolean(false);
-
- new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- boolean fin = true;
- // wait for a little bit before populating buffers
- TimeUnit.MILLISECONDS.sleep(400);
- stream.appendMessage(BufferUtil.toBuffer("I will conquer",UTF8),fin);
- }
- catch (IOException | InterruptedException e)
- {
- hadError.set(true);
- e.printStackTrace(System.err);
- }
- }
- }).start();
-
- // Read byte from stream.
- int b = stream.read();
- // Should be a byte, blocking till byte received.
-
- // Test it
- Assert.assertThat("Error when appending",hadError.get(),is(false));
- Assert.assertThat("Initial byte",b,is((int)'I'));
- }
- }
-
- @Test
- public void testReadByteNoBuffersClosed() throws IOException
- {
- LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
-
- try (PayloadInputStream stream = new PayloadInputStream(conn))
- {
- final AtomicBoolean hadError = new AtomicBoolean(false);
-
- new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- // wait for a little bit before sending input closed
- TimeUnit.MILLISECONDS.sleep(400);
- stream.messageComplete();
- }
- catch (InterruptedException e)
- {
- hadError.set(true);
- e.printStackTrace(System.err);
- }
- }
- }).start();
-
- // Read byte from stream.
- int b = stream.read();
- // Should be a -1, indicating the end of the stream.
-
- // Test it
- Assert.assertThat("Error when appending",hadError.get(),is(false));
- Assert.assertThat("Initial byte",b,is(-1));
- }
- }
-}