JSR-356 more work on supporting Streams

This commit is contained in:
Joakim Erdfelt 2013-05-30 16:25:38 -07:00
parent 4c455e4f9b
commit c872b95c04
12 changed files with 546 additions and 45 deletions

View File

@ -23,6 +23,8 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
@ -36,9 +38,18 @@ import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
*/
public class WebSocketRemoteEndpoint implements RemoteEndpoint
{
private static final String PRIORMSG_ERROR = "Prior message pending, cannot start new message yet.";
/** Type of Message */
private static final int NONE = 0;
private static final int TEXT = 1;
private static final int BINARY = 2;
private static final int CONTROL = 3;
private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class);
public final LogicalConnection connection;
public final OutgoingFrames outgoing;
private final ReentrantLock msgLock = new ReentrantLock();
private final AtomicInteger msgType = new AtomicInteger(NONE);
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
{
@ -100,89 +111,234 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public void sendBytes(ByteBuffer data) throws IOException
{
connection.getIOState().assertOutputOpen();
if (LOG.isDebugEnabled())
if (msgLock.tryLock())
{
LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data));
try
{
msgType.set(BINARY);
connection.getIOState().assertOutputOpen();
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data));
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(data);
blockingWrite(frame);
}
finally
{
msgType.set(NONE);
msgLock.unlock();
}
}
else
{
throw new IllegalStateException(PRIORMSG_ERROR);
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(data);
blockingWrite(frame);
}
@Override
public Future<Void> sendBytesByFuture(ByteBuffer data)
{
if (LOG.isDebugEnabled())
if (msgLock.tryLock())
{
LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data));
try
{
msgType.set(BINARY);
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data));
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(data);
return sendAsyncFrame(frame);
}
finally
{
msgType.set(NONE);
msgLock.unlock();
}
}
else
{
throw new IllegalStateException(PRIORMSG_ERROR);
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(data);
return sendAsyncFrame(frame);
}
@Override
public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
{
if (LOG.isDebugEnabled())
if (msgLock.tryLock())
{
LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast);
try
{
if (msgType.get() == TEXT)
{
throw new IllegalStateException("Prior TEXT message pending, cannot start new BINARY message yet.");
}
msgType.set(BINARY);
if (LOG.isDebugEnabled())
{
LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast);
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(fragment).setFin(isLast);
blockingWrite(frame);
}
finally
{
if (isLast)
{
msgType.set(NONE);
}
msgLock.unlock();
}
}
else
{
throw new IllegalStateException(PRIORMSG_ERROR);
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(fragment).setFin(isLast);
blockingWrite(frame);
}
@Override
public void sendPartialString(String fragment, boolean isLast) throws IOException
{
if (LOG.isDebugEnabled())
if (msgLock.tryLock())
{
LOG.debug("sendPartialString({}, {})",fragment,isLast);
try
{
if (msgType.get() == BINARY)
{
throw new IllegalStateException("Prior BINARY message pending, cannot start new TEXT message yet.");
}
msgType.set(TEXT);
if (LOG.isDebugEnabled())
{
LOG.debug("sendPartialString({}, {})",fragment,isLast);
}
WebSocketFrame frame = WebSocketFrame.text(fragment).setFin(isLast);
blockingWrite(frame);
}
finally
{
if (isLast)
{
msgType.set(NONE);
}
msgLock.unlock();
}
}
else
{
throw new IllegalStateException(PRIORMSG_ERROR);
}
WebSocketFrame frame = WebSocketFrame.text(fragment).setFin(isLast);
blockingWrite(frame);
}
@Override
public void sendPing(ByteBuffer applicationData) throws IOException
{
if (LOG.isDebugEnabled())
if (msgLock.tryLock())
{
LOG.debug("sendPing with {}",BufferUtil.toDetailString(applicationData));
try
{
msgType.set(CONTROL);
if (LOG.isDebugEnabled())
{
LOG.debug("sendPing with {}",BufferUtil.toDetailString(applicationData));
}
WebSocketFrame frame = WebSocketFrame.ping().setPayload(applicationData);
blockingWrite(frame);
}
finally
{
msgType.set(NONE);
msgLock.unlock();
}
}
else
{
throw new IllegalStateException(PRIORMSG_ERROR);
}
WebSocketFrame frame = WebSocketFrame.ping().setPayload(applicationData);
blockingWrite(frame);
}
@Override
public void sendPong(ByteBuffer applicationData) throws IOException
{
if (LOG.isDebugEnabled())
if (msgLock.tryLock())
{
LOG.debug("sendPong with {}",BufferUtil.toDetailString(applicationData));
try
{
msgType.set(CONTROL);
if (LOG.isDebugEnabled())
{
LOG.debug("sendPong with {}",BufferUtil.toDetailString(applicationData));
}
WebSocketFrame frame = WebSocketFrame.pong().setPayload(applicationData);
blockingWrite(frame);
}
finally
{
msgType.set(NONE);
msgLock.unlock();
}
}
else
{
throw new IllegalStateException(PRIORMSG_ERROR);
}
WebSocketFrame frame = WebSocketFrame.pong().setPayload(applicationData);
blockingWrite(frame);
}
@Override
public void sendString(String text) throws IOException
{
WebSocketFrame frame = WebSocketFrame.text(text);
if (LOG.isDebugEnabled())
if (msgLock.tryLock())
{
LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload()));
try
{
msgType.set(TEXT);
WebSocketFrame frame = WebSocketFrame.text(text);
if (LOG.isDebugEnabled())
{
LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload()));
}
blockingWrite(WebSocketFrame.text(text));
}
finally
{
msgType.set(NONE);
msgLock.unlock();
}
}
else
{
throw new IllegalStateException(PRIORMSG_ERROR);
}
blockingWrite(WebSocketFrame.text(text));
}
@Override
public Future<Void> sendStringByFuture(String text)
{
WebSocketFrame frame = WebSocketFrame.text(text);
if (LOG.isDebugEnabled())
if (msgLock.tryLock())
{
LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload()));
try
{
msgType.set(BINARY);
WebSocketFrame frame = WebSocketFrame.text(text);
if (LOG.isDebugEnabled())
{
LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload()));
}
return sendAsyncFrame(frame);
}
finally
{
msgType.set(NONE);
msgLock.unlock();
}
}
else
{
throw new IllegalStateException(PRIORMSG_ERROR);
}
return sendAsyncFrame(frame);
}
}

View File

@ -332,6 +332,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
write(buffer);
}
@Override
public ByteBufferPool getBufferPool()
{
return bufferPool;
@ -535,7 +536,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
EndPoint endPoint = getEndPoint();
try
{
while (true)
while (true) // TODO: should this honor the LogicalConnection.suspend() ?
{
int filled = endPoint.fill(buffer);
if (filled == 0)
@ -555,6 +556,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
parser.parse(buffer);
// TODO: has the end user application already consumed what it was given?
}
}
}

View File

@ -35,6 +35,8 @@ import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
/**
* Support for writing a single WebSocket TEXT message via a {@link Writer}
* <p>
* Note: Per WebSocket spec, all WebSocket TEXT messages must be encoded in UTF-8
*/
public class MessageWriter extends Writer
{

View File

@ -71,6 +71,10 @@ public class Utf8CharBuffer extends Utf8Appendable
public ByteBuffer getByteBuffer()
{
// remember settings
int limit = buffer.limit();
int position = buffer.position();
// flip to flush
buffer.limit(buffer.position());
buffer.position(0);
@ -78,6 +82,10 @@ public class Utf8CharBuffer extends Utf8Appendable
// get byte buffer
ByteBuffer bb = UTF8.encode(buffer);
// restor settings
buffer.limit(limit);
buffer.position(position);
return bb;
}

View File

@ -0,0 +1,86 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class WebSocketRemoteEndpointTest
{
@Rule
public TestName testname = new TestName();
@Test
public void testTextBinaryText() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
OutgoingFramesCapture outgoing = new OutgoingFramesCapture();
WebSocketRemoteEndpoint remote = new WebSocketRemoteEndpoint(conn,outgoing);
conn.connect();
conn.open();
// Start text message
remote.sendPartialString("Hello ",false);
try
{
// Attempt to start Binary Message
ByteBuffer bytes = ByteBuffer.wrap(new byte[]
{ 0, 1, 2 });
remote.sendPartialBytes(bytes,false);
Assert.fail("Expected " + IllegalStateException.class.getName());
}
catch (IllegalStateException e)
{
// Expected path
Assert.assertThat("Exception",e.getMessage(),containsString("message pending"));
}
// End text message
remote.sendPartialString("World!",true);
}
@Test
public void testTextPingText() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
OutgoingFramesCapture outgoing = new OutgoingFramesCapture();
WebSocketRemoteEndpoint remote = new WebSocketRemoteEndpoint(conn,outgoing);
conn.connect();
conn.open();
// Start text message
remote.sendPartialString("Hello ",false);
// Attempt to send Ping Message
remote.sendPing(ByteBuffer.wrap(new byte[]
{ 0 }));
// End text message
remote.sendPartialString("World!",true);
}
}

View File

@ -44,7 +44,7 @@ public class MuxerAddClientTest
// Client side physical socket
LocalWebSocketConnection physical = new LocalWebSocketConnection(testname);
physical.setPolicy(WebSocketPolicy.newClientPolicy());
physical.onOpen();
physical.open();
// Server Reader
MuxDecoder serverRead = new MuxDecoder();

View File

@ -48,7 +48,7 @@ public class MuxerAddServerTest
// Server side physical connection
LocalWebSocketConnection physical = new LocalWebSocketConnection(testname);
physical.setPolicy(WebSocketPolicy.newServerPolicy());
physical.onOpen();
physical.open();
// Client reader
MuxDecoder clientRead = new MuxDecoder();

View File

@ -77,6 +77,12 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
ioState.onCloseLocal(close);
}
public void connect()
{
LOG.debug("connect()");
ioState.onConnected();
}
@Override
public void disconnect()
{
@ -92,7 +98,6 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
@Override
public long getIdleTimeout()
{
// TODO Auto-generated method stub
return 0;
}
@ -116,7 +121,6 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
@Override
public long getMaxIdleTimeout()
{
// TODO Auto-generated method stub
return 0;
}
@ -184,8 +188,9 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
}
}
public void onOpen() {
LOG.debug("onOpen()");
public void open()
{
LOG.debug("open()");
ioState.onOpened();
}
@ -202,8 +207,6 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
@Override
public void setMaxIdleTimeout(long ms)
{
// TODO Auto-generated method stub
}
@Override

View File

@ -0,0 +1,29 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
/**
* Do nothing Dummy Socket, used in testing.
*/
public class DummySocket extends WebSocketAdapter
{
/* intentionally empty */
}

View File

@ -0,0 +1,105 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.io.FramePipes;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketSession;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class MessageInputStreamTest
{
@Rule
public TestTracker testtracker = new TestTracker();
@Rule
public TestName testname = new TestName();
private WebSocketPolicy policy;
private TrackingInputStreamSocket socket;
private LocalWebSocketSession session;
private LocalWebSocketSession remoteSession;
@After
public void closeSession()
{
session.close();
remoteSession.close();
}
@Before
public void setupSession()
{
policy = WebSocketPolicy.newServerPolicy();
policy.setInputBufferSize(1024);
policy.setMaxBinaryMessageBufferSize(1024);
policy.setMaxTextMessageBufferSize(1024);
// Event Driver factory
EventDriverFactory factory = new EventDriverFactory(policy);
// Local Socket
EventDriver localDriver = factory.wrap(new DummySocket());
// Remote socket & Session
socket = new TrackingInputStreamSocket("remote");
EventDriver remoteDriver = factory.wrap(socket);
remoteSession = new LocalWebSocketSession(testname,remoteDriver);
remoteSession.open();
OutgoingFrames socketPipe = FramePipes.to(remoteDriver);
// Local Session
session = new LocalWebSocketSession(testname,localDriver);
session.setPolicy(policy);
// talk to our remote socket
session.setOutgoingHandler(socketPipe);
// open connection
session.open();
}
@Test
@Ignore
public void testSimpleMessage() throws IOException
{
ByteBuffer data = BufferUtil.toBuffer("Hello World",StringUtil.__UTF8_CHARSET);
session.getRemote().sendBytes(data);
Assert.assertThat("Socket.messageQueue.size",socket.messageQueue.size(),is(1));
String msg = socket.messageQueue.poll();
Assert.assertThat("Message",msg,is("Hello World"));
}
}

View File

@ -0,0 +1,110 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.junit.Assert;
@WebSocket
public class TrackingInputStreamSocket
{
private static final Logger LOG = Log.getLogger(TrackingInputStreamSocket.class);
private final String id;
public int closeCode = -1;
public StringBuilder closeMessage = new StringBuilder();
public CountDownLatch closeLatch = new CountDownLatch(1);
public EventQueue<String> messageQueue = new EventQueue<>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public TrackingInputStreamSocket()
{
this("socket");
}
public TrackingInputStreamSocket(String id)
{
this.id = id;
}
public void assertClose(int expectedStatusCode, String expectedReason) throws InterruptedException
{
assertCloseCode(expectedStatusCode);
assertCloseReason(expectedReason);
}
public void assertCloseCode(int expectedCode) throws InterruptedException
{
Assert.assertThat("Was Closed",closeLatch.await(50,TimeUnit.MILLISECONDS),is(true));
Assert.assertThat("Close Code",closeCode,is(expectedCode));
}
private void assertCloseReason(String expectedReason)
{
Assert.assertThat("Close Reason",closeMessage.toString(),is(expectedReason));
}
@OnWebSocketClose
public void onClose(int statusCode, String reason)
{
LOG.debug("{} onClose({},{})",id,statusCode,reason);
closeCode = statusCode;
closeMessage.append(reason);
closeLatch.countDown();
}
@OnWebSocketError
public void onError(Throwable cause)
{
errorQueue.add(cause);
}
@OnWebSocketMessage
public void onInputStream(InputStream stream)
{
LOG.debug("{} onInputStream({})",id,stream);
try
{
String msg = IO.toString(stream);
messageQueue.add(msg);
}
catch (IOException e)
{
errorQueue.add(e);
}
}
public void waitForClose(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
Assert.assertThat("Client Socket Closed",closeLatch.await(timeoutDuration,timeoutUnit),is(true));
}
}

View File

@ -42,7 +42,7 @@ public class Utf8CharBufferTest
@Test
public void testAppendGetAppendGet()
{
ByteBuffer buf = ByteBuffer.allocate(64);
ByteBuffer buf = ByteBuffer.allocate(128);
Utf8CharBuffer utf = Utf8CharBuffer.wrap(buf);
byte hellobytes[] = asUTF("Hello ");
@ -60,8 +60,8 @@ public class Utf8CharBufferTest
@Test
public void testAppendGetClearAppendGet()
{
int bufsize = 64;
ByteBuffer buf = ByteBuffer.allocate(64);
int bufsize = 128;
ByteBuffer buf = ByteBuffer.allocate(bufsize);
Utf8CharBuffer utf = Utf8CharBuffer.wrap(buf);
int expectedSize = bufsize / 2;