JSR-356 cleaning up MessageInputStream and MessageReader

This commit is contained in:
Joakim Erdfelt 2013-06-05 10:34:38 -07:00
parent b01e6432a5
commit ffceb642c5
7 changed files with 263 additions and 499 deletions

View File

@ -132,6 +132,12 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect"); notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
} }
public void dispatch(Runnable runnable)
{
// TODO Auto-generated method stub
runnable.run();
}
@Override @Override
public void dump(Appendable out, String indent) throws IOException public void dump(Appendable out, String indent) throws IOException
{ {

View File

@ -65,6 +65,11 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
} }
} }
private void dispatch(Runnable runnable)
{
session.dispatch(runnable);
}
@Override @Override
public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
{ {
@ -78,7 +83,15 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
{ {
if (events.onBinary.isStreaming()) if (events.onBinary.isStreaming())
{ {
activeMessage = new MessageInputStream(this); activeMessage = new MessageInputStream(session.getConnection());
dispatch(new Runnable()
{
@Override
public void run()
{
events.onBinary.call(websocket,session,activeMessage);
}
});
} }
else else
{ {
@ -171,7 +184,15 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver
{ {
if (events.onText.isStreaming()) if (events.onText.isStreaming())
{ {
activeMessage = new MessageReader(this); activeMessage = new MessageReader(new MessageInputStream(session.getConnection()));
dispatch(new Runnable()
{
@Override
public void run()
{
events.onText.call(websocket,session,activeMessage);
}
});
} }
else else
{ {

View File

@ -21,97 +21,135 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.events.EventDriver;
/** /**
* Support class for reading binary message data as an InputStream. * Support class for reading a (single) WebSocket BINARY message via a InputStream.
* <p>
* An InputStream that can access a queue of ByteBuffer payloads, along with expected InputStream blocking behavior.
*/ */
public class MessageInputStream extends InputStream implements MessageAppender public class MessageInputStream extends InputStream implements MessageAppender
{ {
/** /**
* Threshold (of bytes) to perform compaction at * Used for controlling read suspend/resume behavior if the queue is full, but the read operations haven't caught up yet.
*/ */
private static final int COMPACT_THRESHOLD = 5; @SuppressWarnings("unused")
private final EventDriver driver; private final LogicalConnection connection;
private final ByteBuffer buf; private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
private int size; private AtomicBoolean closed = new AtomicBoolean(false);
private boolean finished; // EOB / End of Buffers
private boolean needsNotification; private AtomicBoolean buffersExhausted = new AtomicBoolean(false);
private int readPosition; private ByteBuffer activeBuffer = null;
public MessageInputStream(EventDriver driver) public MessageInputStream(LogicalConnection connection)
{ {
this.driver = driver; this.connection = connection;
this.buf = ByteBuffer.allocate(driver.getPolicy().getMaxBinaryMessageBufferSize());
BufferUtil.clearToFill(this.buf);
size = 0;
readPosition = this.buf.position();
finished = false;
needsNotification = true;
} }
@Override @Override
public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
{ {
if (finished) if (buffersExhausted.get())
{ {
throw new IOException("Cannot append to finished buffer"); // This indicates a programming mistake/error and must be bug fixed
throw new RuntimeException("Last frame already received");
} }
if (payload == null) // if closed, we should just toss incoming payloads into the bit bucket.
if (closed.get())
{ {
// empty payload is valid
return; return;
} }
driver.getPolicy().assertValidBinaryMessageSize(size + payload.remaining()); // Put the payload into the queue
size += payload.remaining(); try
synchronized (buf)
{ {
// TODO: grow buffer till max binary message size? buffers.put(payload);
// TODO: compact this buffer to fit incoming buffer? if (isLast)
// TODO: tell connection to suspend if buffer too full? {
BufferUtil.put(payload,buf); buffersExhausted.set(true);
}
} }
catch (InterruptedException e)
if (needsNotification)
{ {
needsNotification = true; throw new IOException(e);
this.driver.onInputStream(this);
} }
} }
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
finished = true; closed.set(true);
super.close(); super.close();
} }
@Override
public synchronized void mark(int readlimit)
{
/* do nothing */
}
@Override
public boolean markSupported()
{
return false;
}
@Override @Override
public void messageComplete() public void messageComplete()
{ {
finished = true; buffersExhausted.set(true);
// toss an empty ByteBuffer into queue to let it drain
try
{
buffers.put(ByteBuffer.wrap(new byte[0]));
}
catch (InterruptedException ignore)
{
/* ignore */
}
} }
@Override @Override
public int read() throws IOException public int read() throws IOException
{ {
synchronized (buf) try
{ {
byte b = buf.get(readPosition); if (closed.get())
readPosition++;
if (readPosition <= (buf.limit() - COMPACT_THRESHOLD))
{ {
int curPos = buf.position(); return -1;
buf.compact();
int offsetPos = buf.position() - curPos;
readPosition += offsetPos;
} }
return b;
if (activeBuffer == null)
{
activeBuffer = buffers.take();
}
while (activeBuffer.remaining() <= 0)
{
if (buffersExhausted.get())
{
closed.set(true);
return -1;
}
activeBuffer = buffers.take();
}
return activeBuffer.get();
}
catch (InterruptedException e)
{
throw new IOException(e);
} }
} }
@Override
public synchronized void reset() throws IOException
{
throw new IOException("reset() not supported");
}
} }

View File

@ -19,79 +19,36 @@
package org.eclipse.jetty.websocket.common.message; package org.eclipse.jetty.websocket.common.message;
import java.io.IOException; import java.io.IOException;
import java.io.Reader; import java.io.InputStreamReader;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.common.events.EventDriver;
/** /**
* Support class for reading text message data as an Reader. * Support class for reading a (single) WebSocket TEXT message via a Reader.
* <p> * <p>
* Due to the spec, this reader is forced to use the UTF8 charset. * In compliance to the WebSocket spec, this reader always uses the UTF8 {@link Charset}.
*/ */
public class MessageReader extends Reader implements MessageAppender public class MessageReader extends InputStreamReader implements MessageAppender
{ {
private final EventDriver driver; private final MessageInputStream stream;
private final Utf8StringBuilder utf;
private int size;
private boolean finished;
private boolean needsNotification;
public MessageReader(EventDriver driver) public MessageReader(MessageInputStream stream)
{ {
this.driver = driver; super(stream,StringUtil.__UTF8_CHARSET);
this.utf = new Utf8StringBuilder(); this.stream = stream;
size = 0;
finished = false;
needsNotification = true;
} }
@Override @Override
public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
{ {
if (finished) this.stream.appendMessage(payload,isLast);
{
throw new IOException("Cannot append to finished buffer");
}
if (payload == null)
{
// empty payload is valid
return;
}
driver.getPolicy().assertValidTextMessageSize(size + payload.remaining());
size += payload.remaining();
synchronized (utf)
{
utf.append(payload);
}
if (needsNotification)
{
needsNotification = true;
this.driver.onReader(this);
}
}
@Override
public void close() throws IOException
{
finished = true;
} }
@Override @Override
public void messageComplete() public void messageComplete()
{ {
finished = true; this.stream.messageComplete();
}
@Override
public int read(char[] cbuf, int off, int len) throws IOException
{
// TODO Auto-generated method stub
return 0;
} }
} }

View File

@ -1,153 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.websocket.common.LogicalConnection;
/**
* An InputStream that can access a queue of ByteBuffer payloads, along with expected InputStream blocking behavior.
*/
public class PayloadInputStream extends InputStream implements MessageAppender
{
/**
* Used for controlling read suspend/resume behavior if the queue is full, but the read operations haven't caught up yet.
*/
@SuppressWarnings("unused")
private final LogicalConnection connection;
private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
private AtomicBoolean closed = new AtomicBoolean(false);
// EOB / End of Buffers
private AtomicBoolean buffersExhausted = new AtomicBoolean(false);
private ByteBuffer activeBuffer = null;
public PayloadInputStream(LogicalConnection connection)
{
this.connection = connection;
}
@Override
public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
{
if (buffersExhausted.get())
{
// This indicates a programming mistake/error and must be bug fixed
throw new RuntimeException("Last frame already received");
}
// if closed, we should just toss incoming payloads into the bit bucket.
if (closed.get())
{
return;
}
// Put the payload into the queue
try
{
buffers.put(payload);
if (isLast)
{
buffersExhausted.set(true);
}
}
catch (InterruptedException e)
{
throw new IOException(e);
}
}
@Override
public void close() throws IOException
{
closed.set(true);
super.close();
}
@Override
public synchronized void mark(int readlimit)
{
/* do nothing */
}
@Override
public boolean markSupported()
{
return false;
}
@Override
public void messageComplete()
{
buffersExhausted.set(true);
// toss an empty ByteBuffer into queue to let it drain
try
{
buffers.put(ByteBuffer.wrap(new byte[0]));
}
catch (InterruptedException ignore)
{
/* ignore */
}
}
@Override
public int read() throws IOException
{
try
{
if (closed.get())
{
return -1;
}
if (activeBuffer == null)
{
activeBuffer = buffers.take();
}
while (activeBuffer.remaining() <= 0)
{
if (buffersExhausted.get())
{
closed.set(true);
return -1;
}
activeBuffer = buffers.take();
}
return activeBuffer.get();
}
catch (InterruptedException e)
{
throw new IOException(e);
}
}
@Override
public synchronized void reset() throws IOException
{
throw new IOException("reset() not supported");
}
}

View File

@ -22,84 +22,166 @@ import static org.hamcrest.Matchers.*;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.io.FramePipes;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketSession;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestName; import org.junit.rules.TestName;
public class MessageInputStreamTest public class MessageInputStreamTest
{ {
@Rule private static final Charset UTF8 = StringUtil.__UTF8_CHARSET;
public TestTracker testtracker = new TestTracker();
@Rule @Rule
public TestName testname = new TestName(); public TestName testname = new TestName();
private WebSocketPolicy policy; @Test
private TrackingInputStreamSocket socket; public void testBasicAppendRead() throws IOException
private LocalWebSocketSession session;
private LocalWebSocketSession remoteSession;
@After
public void closeSession()
{ {
session.close(); LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
remoteSession.close();
}
@Before try (MessageInputStream stream = new MessageInputStream(conn))
public void setupSession() {
{ // Append a message (simple, short)
policy = WebSocketPolicy.newServerPolicy(); ByteBuffer payload = BufferUtil.toBuffer("Hello World",UTF8);
policy.setInputBufferSize(1024); System.out.printf("payload = %s%n",BufferUtil.toDetailString(payload));
policy.setMaxBinaryMessageBufferSize(1024); boolean fin = true;
policy.setMaxTextMessageBufferSize(1024); stream.appendMessage(payload,fin);
// Event Driver factory // Read it from the stream.
EventDriverFactory factory = new EventDriverFactory(policy); byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,UTF8);
// Local Socket // Test it
EventDriver localDriver = factory.wrap(new DummySocket()); Assert.assertThat("Message",message,is("Hello World"));
}
// Remote socket & Session
socket = new TrackingInputStreamSocket("remote");
EventDriver remoteDriver = factory.wrap(socket);
remoteSession = new LocalWebSocketSession(testname,remoteDriver);
remoteSession.open();
OutgoingFrames socketPipe = FramePipes.to(remoteDriver);
// Local Session
session = new LocalWebSocketSession(testname,localDriver);
session.setPolicy(policy);
// talk to our remote socket
session.setOutgoingHandler(socketPipe);
// open connection
session.open();
} }
@Test @Test
@Ignore public void testBlockOnRead() throws IOException
public void testSimpleMessage() throws IOException
{ {
ByteBuffer data = BufferUtil.toBuffer("Hello World",StringUtil.__UTF8_CHARSET); LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
session.getRemote().sendBytes(data);
Assert.assertThat("Socket.messageQueue.size",socket.messageQueue.size(),is(1)); try (MessageInputStream stream = new MessageInputStream(conn))
String msg = socket.messageQueue.poll(); {
Assert.assertThat("Message",msg,is("Hello World")); final AtomicBoolean hadError = new AtomicBoolean(false);
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
boolean fin = false;
TimeUnit.MILLISECONDS.sleep(200);
stream.appendMessage(BufferUtil.toBuffer("Saved",UTF8),fin);
TimeUnit.MILLISECONDS.sleep(200);
stream.appendMessage(BufferUtil.toBuffer(" by ",UTF8),fin);
fin = true;
TimeUnit.MILLISECONDS.sleep(200);
stream.appendMessage(BufferUtil.toBuffer("Zero",UTF8),fin);
}
catch (IOException | InterruptedException e)
{
hadError.set(true);
e.printStackTrace(System.err);
}
}
}).start();
// Read it from the stream.
byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,UTF8);
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
Assert.assertThat("Message",message,is("Saved by Zero"));
}
}
@Test
public void testBlockOnReadInitial() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
try (MessageInputStream stream = new MessageInputStream(conn))
{
final AtomicBoolean hadError = new AtomicBoolean(false);
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
boolean fin = true;
// wait for a little bit before populating buffers
TimeUnit.MILLISECONDS.sleep(400);
stream.appendMessage(BufferUtil.toBuffer("I will conquer",UTF8),fin);
}
catch (IOException | InterruptedException e)
{
hadError.set(true);
e.printStackTrace(System.err);
}
}
}).start();
// Read byte from stream.
int b = stream.read();
// Should be a byte, blocking till byte received.
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
Assert.assertThat("Initial byte",b,is((int)'I'));
}
}
@Test
public void testReadByteNoBuffersClosed() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
try (MessageInputStream stream = new MessageInputStream(conn))
{
final AtomicBoolean hadError = new AtomicBoolean(false);
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
// wait for a little bit before sending input closed
TimeUnit.MILLISECONDS.sleep(400);
stream.messageComplete();
}
catch (InterruptedException e)
{
hadError.set(true);
e.printStackTrace(System.err);
}
}
}).start();
// Read byte from stream.
int b = stream.read();
// Should be a -1, indicating the end of the stream.
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
Assert.assertThat("Initial byte",b,is(-1));
}
} }
} }

View File

@ -1,187 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class PayloadInputStreamTest
{
private static final Charset UTF8 = StringUtil.__UTF8_CHARSET;
@Rule
public TestName testname = new TestName();
@Test
public void testBasicAppendRead() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
try (PayloadInputStream stream = new PayloadInputStream(conn))
{
// Append a message (simple, short)
ByteBuffer payload = BufferUtil.toBuffer("Hello World",UTF8);
System.out.printf("payload = %s%n",BufferUtil.toDetailString(payload));
boolean fin = true;
stream.appendMessage(payload,fin);
// Read it from the stream.
byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,UTF8);
// Test it
Assert.assertThat("Message",message,is("Hello World"));
}
}
@Test
public void testBlockOnRead() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
try (PayloadInputStream stream = new PayloadInputStream(conn))
{
final AtomicBoolean hadError = new AtomicBoolean(false);
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
boolean fin = false;
TimeUnit.MILLISECONDS.sleep(200);
stream.appendMessage(BufferUtil.toBuffer("Saved",UTF8),fin);
TimeUnit.MILLISECONDS.sleep(200);
stream.appendMessage(BufferUtil.toBuffer(" by ",UTF8),fin);
fin = true;
TimeUnit.MILLISECONDS.sleep(200);
stream.appendMessage(BufferUtil.toBuffer("Zero",UTF8),fin);
}
catch (IOException | InterruptedException e)
{
hadError.set(true);
e.printStackTrace(System.err);
}
}
}).start();
// Read it from the stream.
byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,UTF8);
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
Assert.assertThat("Message",message,is("Saved by Zero"));
}
}
@Test
public void testBlockOnReadInitial() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
try (PayloadInputStream stream = new PayloadInputStream(conn))
{
final AtomicBoolean hadError = new AtomicBoolean(false);
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
boolean fin = true;
// wait for a little bit before populating buffers
TimeUnit.MILLISECONDS.sleep(400);
stream.appendMessage(BufferUtil.toBuffer("I will conquer",UTF8),fin);
}
catch (IOException | InterruptedException e)
{
hadError.set(true);
e.printStackTrace(System.err);
}
}
}).start();
// Read byte from stream.
int b = stream.read();
// Should be a byte, blocking till byte received.
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
Assert.assertThat("Initial byte",b,is((int)'I'));
}
}
@Test
public void testReadByteNoBuffersClosed() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
try (PayloadInputStream stream = new PayloadInputStream(conn))
{
final AtomicBoolean hadError = new AtomicBoolean(false);
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
// wait for a little bit before sending input closed
TimeUnit.MILLISECONDS.sleep(400);
stream.messageComplete();
}
catch (InterruptedException e)
{
hadError.set(true);
e.printStackTrace(System.err);
}
}
}).start();
// Read byte from stream.
int b = stream.read();
// Should be a -1, indicating the end of the stream.
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
Assert.assertThat("Initial byte",b,is(-1));
}
}
}