Adding beginnings of Streaming support in WebSocketEventDriver

This commit is contained in:
Joakim Erdfelt 2012-07-05 16:11:11 -07:00
parent d163fb4414
commit 46452b9c7a
9 changed files with 365 additions and 97 deletions

View File

@ -1,6 +1,7 @@
package org.eclipse.jetty.websocket.api;
import java.lang.annotation.Annotation;
import java.io.InputStream;
import java.io.Reader;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@ -9,7 +10,6 @@ import org.eclipse.jetty.util.log.Logger;
public class EventMethod
{
public static final EventMethod NOOP = new EventMethod();
private static final Logger LOG = Log.getLogger(EventMethod.class);
private static Object[] dropFirstArg(Object[] args)
@ -23,37 +23,18 @@ public class EventMethod
return ret;
}
public static EventMethod findAnnotatedMethod(Object pojo, Class<? extends Annotation> annoClass, Class<?>... paramTypes)
{
Class<?>[] possibleParams = new Class<?>[paramTypes.length];
System.arraycopy(paramTypes,0,possibleParams,0,possibleParams.length);
for (Method method : pojo.getClass().getDeclaredMethods())
{
if (method.getAnnotation(annoClass) == null)
{
// skip, not interested
continue;
}
}
return NOOP;
}
protected Class<?> pojo;
protected Method method;
private boolean hasConnection = false;
private boolean isStreaming = false;
private Class<?>[] paramTypes;
private EventMethod()
{
this.method = null;
}
public EventMethod(Class<?> pojo, Method method)
{
this.pojo = pojo;
this.paramTypes = method.getParameterTypes();
this.method = method;
identifyPresentParamTypes();
}
public EventMethod(Class<?> pojo, String methodName, Class<?>... paramTypes)
@ -63,6 +44,7 @@ public class EventMethod
this.pojo = pojo;
this.paramTypes = paramTypes;
this.method = pojo.getMethod(methodName,paramTypes);
identifyPresentParamTypes();
}
catch (NoSuchMethodException | SecurityException e)
{
@ -114,4 +96,35 @@ public class EventMethod
{
return this.paramTypes;
}
private void identifyPresentParamTypes()
{
this.hasConnection = false;
this.isStreaming = false;
if (paramTypes == null)
{
return;
}
for(Class<?> paramType: paramTypes)
{
if(WebSocketConnection.class.isAssignableFrom(paramType)) {
this.hasConnection = true;
}
if(Reader.class.isAssignableFrom(paramType)||
InputStream.class.isAssignableFrom(paramType)) {
this.isStreaming = true;
}
}
}
public boolean isHasConnection()
{
return hasConnection;
}
public boolean isStreaming()
{
return isStreaming;
}
}

View File

@ -1,7 +1,9 @@
package org.eclipse.jetty.websocket.api;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
@ -10,7 +12,10 @@ import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.frames.BaseFrame;
import org.eclipse.jetty.websocket.frames.BinaryFrame;
import org.eclipse.jetty.websocket.frames.CloseFrame;
import org.eclipse.jetty.websocket.frames.TextFrame;
import org.eclipse.jetty.websocket.frames.DataFrame;
import org.eclipse.jetty.websocket.io.MessageInputStream;
import org.eclipse.jetty.websocket.io.MessageReader;
import org.eclipse.jetty.websocket.io.StreamAppender;
import org.eclipse.jetty.websocket.parser.Parser;
/**
@ -24,21 +29,25 @@ import org.eclipse.jetty.websocket.parser.Parser;
public class WebSocketEventDriver implements Parser.Listener
{
private static final Logger LOG = Log.getLogger(WebSocketEventDriver.class);
private Object websocket;
private WebSocketPolicy policy;
private final Object websocket;
private final WebSocketPolicy policy;
private final EventMethods events;
private final ByteBufferPool bufferPool;
private WebSocketConnection connection;
private EventMethods events;
private ByteBuffer activeMessage;
private StreamAppender activeStream;
/**
* Establish the driver for the Websocket POJO
*
* @param websocket
*/
public WebSocketEventDriver(EventMethodsCache methodsCache, WebSocketPolicy policy, Object websocket)
public WebSocketEventDriver(Object websocket, EventMethodsCache methodsCache, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
this.policy = policy;
this.websocket = websocket;
this.events = methodsCache.getMethods(websocket.getClass());
this.bufferPool = bufferPool;
if (events.isAnnotated())
{
@ -51,6 +60,15 @@ public class WebSocketEventDriver implements Parser.Listener
}
}
private void appendBuffer(ByteBuffer msgBuf, ByteBuffer payloadBuf)
{
if (msgBuf.remaining() < payloadBuf.remaining())
{
throw new CloseException(StatusCode.MESSAGE_TOO_LARGE,"Message exceeded maximum buffer of " + msgBuf.limit());
}
BufferUtil.put(payloadBuf,msgBuf);
}
public WebSocketPolicy getPolicy()
{
return policy;
@ -84,7 +102,6 @@ public class WebSocketEventDriver implements Parser.Listener
* @param frame
* the frame that appeared
*/
@SuppressWarnings("unchecked")
@Override
public void onFrame(BaseFrame frame)
{
@ -110,42 +127,167 @@ public class WebSocketEventDriver implements Parser.Listener
try
{
// Specified Text Case
if (frame instanceof TextFrame)
// Work a Data Frame
if (frame instanceof DataFrame)
{
if (events.onText != null)
DataFrame data = (DataFrame)frame;
if ((events.onText == null) && (events.onBinary == null))
{
TextFrame text = (TextFrame)frame;
events.onText.call(websocket,connection,text.getPayloadUTF8());
// skip
return;
}
// TODO
// if (events.onTextStream != null)
// {
// }
switch (data.getOpCode())
{
case BINARY:
{
if (events.onBinary.isStreaming())
{
boolean needsNotification = false;
return;
// Streaming Approach
if (activeStream == null)
{
// Allocate directly, not via ByteBufferPool, as this buffer
// is ultimately controlled by the end user, and we can't know
// when they are done using the stream in order to release any
// buffer allocated from the ByteBufferPool.
ByteBuffer buf = ByteBuffer.allocate(policy.getBufferSize());
this.activeStream = new MessageInputStream(buf);
needsNotification = true;
}
activeStream.appendBuffer(data.getPayload());
if (needsNotification)
{
events.onBinary.call(websocket,connection,activeStream);
}
if (data.isFin())
{
// close the stream.
activeStream.bufferComplete();
activeStream = null; // work with a new one
}
}
else
{
if (activeMessage == null)
{
// Acquire from ByteBufferPool is safe here, as the return
// from the notification is a good place to release the
// buffer.
activeMessage = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(activeMessage);
}
appendBuffer(activeMessage,data.getPayload());
// normal case
if (frame.isFin())
{
// Notify using simple message approach.
try
{
BufferUtil.flipToFlush(activeMessage,0);
byte buf[] = BufferUtil.toArray(activeMessage);
events.onBinary.call(websocket,connection,buf,0,buf.length);
}
finally
{
bufferPool.release(activeMessage);
activeMessage = null;
}
}
}
return;
}
case TEXT:
{
if (events.onText.isStreaming())
{
boolean needsNotification = false;
// Streaming Approach
if (activeStream == null)
{
// Allocate directly, not via ByteBufferPool, as this buffer
// is ultimately controlled by the end user, and we can't know
// when they are done using the stream in order to release any
// buffer allocated from the ByteBufferPool.
ByteBuffer buf = ByteBuffer.allocate(policy.getBufferSize());
this.activeStream = new MessageReader(buf);
needsNotification = true;
}
activeStream.appendBuffer(data.getPayload());
if (needsNotification)
{
events.onText.call(websocket,connection,activeStream);
}
if (data.isFin())
{
// close the stream.
activeStream.bufferComplete();
activeStream = null; // work with a new one
}
}
else
{
if (activeMessage == null)
{
// Acquire from ByteBufferPool is safe here, as the return
// from the notification is a good place to release the
// buffer.
activeMessage = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(activeMessage);
}
appendBuffer(activeMessage,data.getPayload());
// normal case
if (frame.isFin())
{
// Notify using simple message approach.
try
{
BufferUtil.flipToFlush(activeMessage,0);
events.onText.call(websocket,connection,BufferUtil.toUTF8String(activeMessage));
}
finally
{
bufferPool.release(activeMessage);
activeMessage = null;
}
}
}
return;
}
}
}
// Specified Binary Case
if (frame instanceof BinaryFrame)
if ((frame instanceof BinaryFrame) && (events.onBinary != null))
{
if (events.onBinary != null)
BinaryFrame bin = (BinaryFrame)frame;
if (events.onBinary.isStreaming())
{
// Streaming Approach
}
else
{
BinaryFrame bin = (BinaryFrame)frame;
// Byte array approach
byte buf[] = BufferUtil.toArray(bin.getPayload());
events.onBinary.call(websocket,connection,buf,0,buf.length);
}
// TODO
// if (events.onBinaryStream != null)
// {
// }
return;
}
}
catch (Throwable t)
{

View File

@ -0,0 +1,41 @@
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
public class MessageInputStream extends InputStream implements StreamAppender
{
private final ByteBuffer buffer;
public MessageInputStream(ByteBuffer buf)
{
this.buffer = buf;
}
@Override
public void appendBuffer(ByteBuffer buf)
{
// TODO Auto-generated method stub
}
@Override
public void bufferComplete() throws IOException
{
// TODO Auto-generated method stub
}
@Override
public ByteBuffer getBuffer()
{
return buffer;
}
@Override
public int read() throws IOException
{
// TODO Auto-generated method stub
return 0;
}
}

View File

@ -0,0 +1,48 @@
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.io.Reader;
import java.nio.ByteBuffer;
public class MessageReader extends Reader implements StreamAppender
{
private ByteBuffer buffer;
public MessageReader(ByteBuffer buf)
{
this.buffer = buf;
}
@Override
public void appendBuffer(ByteBuffer buf)
{
// TODO Auto-generated method stub
}
@Override
public void bufferComplete() throws IOException
{
// TODO Auto-generated method stub
}
@Override
public void close() throws IOException
{
// TODO Auto-generated method stub
}
@Override
public ByteBuffer getBuffer()
{
return buffer;
}
@Override
public int read(char[] cbuf, int off, int len) throws IOException
{
// TODO Auto-generated method stub
return 0;
}
}

View File

@ -0,0 +1,13 @@
package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import java.nio.ByteBuffer;
public interface StreamAppender
{
void appendBuffer(ByteBuffer buf);
void bufferComplete() throws IOException;
ByteBuffer getBuffer();
}

View File

@ -3,6 +3,7 @@ package org.eclipse.jetty.websocket.api;
import static org.hamcrest.Matchers.*;
import java.util.ArrayList;
import java.util.regex.Pattern;
import org.junit.Assert;
@ -29,6 +30,12 @@ public class EventCapture extends ArrayList<String>
Assert.assertThat("Event Count",size(),is(expectedCount));
}
public void assertEventRegex(int eventNum, String regex)
{
String event = get(eventNum);
Assert.assertTrue("Event[" + eventNum + "]: regex:[" + regex + "] in [" + event + "]",Pattern.matches(regex,event));
}
public void assertEventStartsWith(int eventNum, String expected)
{
Assert.assertThat("Event[" + eventNum + "]",get(eventNum),startsWith(expected));

View File

@ -162,6 +162,9 @@ public class EventMethodsCacheTest
assertNoEventMethod(classId + ".onException",methods.onException);
assertNoEventMethod(classId + ".onText",methods.onText);
assertNoEventMethod(classId + ".onFrame",methods.onFrame);
Assert.assertFalse(classId + ".onBinary.hasConnection",methods.onBinary.isHasConnection());
Assert.assertFalse(classId + ".onBinary.isStreaming",methods.onBinary.isStreaming());
}
/**
@ -183,6 +186,9 @@ public class EventMethodsCacheTest
assertNoEventMethod(classId + ".onException",methods.onException);
assertNoEventMethod(classId + ".onText",methods.onText);
assertNoEventMethod(classId + ".onFrame",methods.onFrame);
Assert.assertFalse(classId + ".onBinary.hasConnection",methods.onBinary.isHasConnection());
Assert.assertTrue(classId + ".onBinary.isStreaming",methods.onBinary.isStreaming());
}
/**
@ -228,7 +234,7 @@ public class EventMethodsCacheTest
}
/**
* Test Case for no exceptions and 1 method
* Test Case for annotated for text messages w/connection param
*/
@Test
public void testAnnotatedMyStatelessEchoSocket()
@ -246,6 +252,9 @@ public class EventMethodsCacheTest
assertNoEventMethod(classId + ".onException",methods.onException);
assertHasEventMethod(classId + ".onText",methods.onText);
assertNoEventMethod(classId + ".onFrame",methods.onFrame);
Assert.assertTrue(classId + ".onText.hasConnection",methods.onText.isHasConnection());
Assert.assertFalse(classId + ".onText.isStreaming",methods.onText.isStreaming());
}
/**
@ -309,6 +318,9 @@ public class EventMethodsCacheTest
assertNoEventMethod(classId + ".onException",methods.onException);
assertHasEventMethod(classId + ".onText",methods.onText);
assertNoEventMethod(classId + ".onFrame",methods.onFrame);
Assert.assertFalse(classId + ".onText.hasConnection",methods.onText.isHasConnection());
Assert.assertFalse(classId + ".onText.isStreaming",methods.onText.isStreaming());
}
/**
@ -330,6 +342,9 @@ public class EventMethodsCacheTest
assertNoEventMethod(classId + ".onException",methods.onException);
assertHasEventMethod(classId + ".onText",methods.onText);
assertNoEventMethod(classId + ".onFrame",methods.onFrame);
Assert.assertFalse(classId + ".onText.hasConnection",methods.onText.isHasConnection());
Assert.assertTrue(classId + ".onText.isStreaming",methods.onText.isStreaming());
}
/**

View File

@ -1,5 +1,7 @@
package org.eclipse.jetty.websocket.api;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.samples.AdapterConnectCloseSocket;
import org.eclipse.jetty.websocket.api.samples.AnnotatedBinaryArraySocket;
@ -19,12 +21,20 @@ public class WebSocketEventDriverTest
@Rule
public TestName testname = new TestName();
private BinaryFrame makeBinaryFrame(String content, boolean fin)
{
BinaryFrame bin = new BinaryFrame(content.getBytes(StringUtil.__UTF8_CHARSET));
bin.setFin(fin);
return bin;
}
private WebSocketEventDriver newDriver(Object websocket)
{
EventMethodsCache methodsCache = new EventMethodsCache();
methodsCache.register(websocket.getClass());
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
return new WebSocketEventDriver(methodsCache,policy,websocket);
ByteBufferPool bufferPool = new StandardByteBufferPool();
return new WebSocketEventDriver(websocket,methodsCache,policy,bufferPool);
}
@Test
@ -52,7 +62,7 @@ public class WebSocketEventDriverTest
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
driver.setConnection(conn);
driver.onConnect();
driver.onFrame(new BinaryFrame("Hello World".getBytes(StringUtil.__UTF8_CHARSET)));
driver.onFrame(makeBinaryFrame("Hello World",true));
driver.onFrame(new CloseFrame(StatusCode.NORMAL));
socket.capture.assertEventCount(3);
@ -61,24 +71,6 @@ public class WebSocketEventDriverTest
socket.capture.assertEventStartsWith(2,"onClose(1000,");
}
@Test
public void testAnnotated_ByteBuffer()
{
AnnotatedBinaryStreamSocket socket = new AnnotatedBinaryStreamSocket();
WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
driver.setConnection(conn);
driver.onConnect();
driver.onFrame(new BinaryFrame("Hello World".getBytes(StringUtil.__UTF8_CHARSET)));
driver.onFrame(new CloseFrame(StatusCode.NORMAL));
socket.capture.assertEventCount(3);
socket.capture.assertEventStartsWith(0,"onConnect");
socket.capture.assertEvent(1,"onBinary(java.nio.HeapByteBuffer[pos=0 lim=11 cap=11])");
socket.capture.assertEventStartsWith(2,"onClose(1000,");
}
@Test
public void testAnnotated_Frames()
{
@ -95,12 +87,30 @@ public class WebSocketEventDriverTest
socket.capture.assertEventCount(5);
socket.capture.assertEventStartsWith(0,"onConnect(");
socket.capture.assertEventStartsWith(1,"onPingFrame(");
socket.capture.assertEventStartsWith(2,"onTextFrame(");
socket.capture.assertEventStartsWith(3,"onBaseFrame(BinaryFrame");
socket.capture.assertEventStartsWith(1,"onFrame(Ping");
socket.capture.assertEventStartsWith(2,"onFrame(Text");
socket.capture.assertEventStartsWith(3,"onFrame(Binary");
socket.capture.assertEventStartsWith(4,"onClose(1001,");
}
@Test
public void testAnnotated_InputStream()
{
AnnotatedBinaryStreamSocket socket = new AnnotatedBinaryStreamSocket();
WebSocketEventDriver driver = newDriver(socket);
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
driver.setConnection(conn);
driver.onConnect();
driver.onFrame(makeBinaryFrame("Hello World",true));
driver.onFrame(new CloseFrame(StatusCode.NORMAL));
socket.capture.assertEventCount(3);
socket.capture.assertEventStartsWith(0,"onConnect");
socket.capture.assertEventRegex(1,"^onBinary\\(.*InputStream.*");
socket.capture.assertEventStartsWith(2,"onClose(1000,");
}
@Test
public void testListener_Text()
{

View File

@ -5,23 +5,14 @@ import org.eclipse.jetty.websocket.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.EventCapture;
import org.eclipse.jetty.websocket.api.Frame;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.frames.BaseFrame;
import org.eclipse.jetty.websocket.frames.ControlFrame;
import org.eclipse.jetty.websocket.frames.PingFrame;
import org.eclipse.jetty.websocket.frames.TextFrame;
@WebSocket
public class AnnotatedFramesSocket
{
public EventCapture capture = new EventCapture();
@OnWebSocketFrame
public void onBaseFrame(BaseFrame frame)
{
capture.add("onBaseFrame(%s)",frame);
}
@OnWebSocketClose
public void onClose(int statusCode, String reason)
{
@ -35,20 +26,8 @@ public class AnnotatedFramesSocket
}
@OnWebSocketFrame
public void onControlFrame(ControlFrame ping)
public void onFrame(Frame frame)
{
capture.add("onControlFrame(%s)",ping);
}
@OnWebSocketFrame
public void onPing(PingFrame ping)
{
capture.add("onPingFrame(%s)",ping);
}
@OnWebSocketFrame
public void onTextFrame(TextFrame text)
{
capture.add("onTextFrame(%s)",text);
capture.add("onFrame(%s)",frame);
}
}