diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/EventMethod.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/EventMethod.java index 84177f07197..b07698a6a6c 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/EventMethod.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/EventMethod.java @@ -1,6 +1,7 @@ package org.eclipse.jetty.websocket.api; -import java.lang.annotation.Annotation; +import java.io.InputStream; +import java.io.Reader; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -9,7 +10,6 @@ import org.eclipse.jetty.util.log.Logger; public class EventMethod { - public static final EventMethod NOOP = new EventMethod(); private static final Logger LOG = Log.getLogger(EventMethod.class); private static Object[] dropFirstArg(Object[] args) @@ -23,37 +23,18 @@ public class EventMethod return ret; } - public static EventMethod findAnnotatedMethod(Object pojo, Class annoClass, Class... paramTypes) - { - Class[] possibleParams = new Class[paramTypes.length]; - System.arraycopy(paramTypes,0,possibleParams,0,possibleParams.length); - - for (Method method : pojo.getClass().getDeclaredMethods()) - { - if (method.getAnnotation(annoClass) == null) - { - // skip, not interested - continue; - } - - } - return NOOP; - } - protected Class pojo; protected Method method; + private boolean hasConnection = false; + private boolean isStreaming = false; private Class[] paramTypes; - private EventMethod() - { - this.method = null; - } - public EventMethod(Class pojo, Method method) { this.pojo = pojo; this.paramTypes = method.getParameterTypes(); this.method = method; + identifyPresentParamTypes(); } public EventMethod(Class pojo, String methodName, Class... paramTypes) @@ -63,6 +44,7 @@ public class EventMethod this.pojo = pojo; this.paramTypes = paramTypes; this.method = pojo.getMethod(methodName,paramTypes); + identifyPresentParamTypes(); } catch (NoSuchMethodException | SecurityException e) { @@ -114,4 +96,35 @@ public class EventMethod { return this.paramTypes; } + + private void identifyPresentParamTypes() + { + this.hasConnection = false; + this.isStreaming = false; + + if (paramTypes == null) + { + return; + } + for(Class paramType: paramTypes) + { + if(WebSocketConnection.class.isAssignableFrom(paramType)) { + this.hasConnection = true; + } + if(Reader.class.isAssignableFrom(paramType)|| + InputStream.class.isAssignableFrom(paramType)) { + this.isStreaming = true; + } + } + } + + public boolean isHasConnection() + { + return hasConnection; + } + + public boolean isStreaming() + { + return isStreaming; + } } \ No newline at end of file diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketEventDriver.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketEventDriver.java index 476838900a7..5bb7c57ed9c 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketEventDriver.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketEventDriver.java @@ -1,7 +1,9 @@ package org.eclipse.jetty.websocket.api; import java.io.IOException; +import java.nio.ByteBuffer; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; @@ -10,7 +12,10 @@ import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.frames.BaseFrame; import org.eclipse.jetty.websocket.frames.BinaryFrame; import org.eclipse.jetty.websocket.frames.CloseFrame; -import org.eclipse.jetty.websocket.frames.TextFrame; +import org.eclipse.jetty.websocket.frames.DataFrame; +import org.eclipse.jetty.websocket.io.MessageInputStream; +import org.eclipse.jetty.websocket.io.MessageReader; +import org.eclipse.jetty.websocket.io.StreamAppender; import org.eclipse.jetty.websocket.parser.Parser; /** @@ -24,21 +29,25 @@ import org.eclipse.jetty.websocket.parser.Parser; public class WebSocketEventDriver implements Parser.Listener { private static final Logger LOG = Log.getLogger(WebSocketEventDriver.class); - private Object websocket; - private WebSocketPolicy policy; + private final Object websocket; + private final WebSocketPolicy policy; + private final EventMethods events; + private final ByteBufferPool bufferPool; private WebSocketConnection connection; - private EventMethods events; + private ByteBuffer activeMessage; + private StreamAppender activeStream; /** * Establish the driver for the Websocket POJO * * @param websocket */ - public WebSocketEventDriver(EventMethodsCache methodsCache, WebSocketPolicy policy, Object websocket) + public WebSocketEventDriver(Object websocket, EventMethodsCache methodsCache, WebSocketPolicy policy, ByteBufferPool bufferPool) { this.policy = policy; this.websocket = websocket; this.events = methodsCache.getMethods(websocket.getClass()); + this.bufferPool = bufferPool; if (events.isAnnotated()) { @@ -51,6 +60,15 @@ public class WebSocketEventDriver implements Parser.Listener } } + private void appendBuffer(ByteBuffer msgBuf, ByteBuffer payloadBuf) + { + if (msgBuf.remaining() < payloadBuf.remaining()) + { + throw new CloseException(StatusCode.MESSAGE_TOO_LARGE,"Message exceeded maximum buffer of " + msgBuf.limit()); + } + BufferUtil.put(payloadBuf,msgBuf); + } + public WebSocketPolicy getPolicy() { return policy; @@ -84,7 +102,6 @@ public class WebSocketEventDriver implements Parser.Listener * @param frame * the frame that appeared */ - @SuppressWarnings("unchecked") @Override public void onFrame(BaseFrame frame) { @@ -110,42 +127,167 @@ public class WebSocketEventDriver implements Parser.Listener try { - // Specified Text Case - if (frame instanceof TextFrame) + // Work a Data Frame + if (frame instanceof DataFrame) { - if (events.onText != null) + DataFrame data = (DataFrame)frame; + + if ((events.onText == null) && (events.onBinary == null)) { - TextFrame text = (TextFrame)frame; - events.onText.call(websocket,connection,text.getPayloadUTF8()); + // skip return; } - // TODO - // if (events.onTextStream != null) - // { - // } + switch (data.getOpCode()) + { + case BINARY: + { + if (events.onBinary.isStreaming()) + { + boolean needsNotification = false; - return; + // Streaming Approach + if (activeStream == null) + { + // Allocate directly, not via ByteBufferPool, as this buffer + // is ultimately controlled by the end user, and we can't know + // when they are done using the stream in order to release any + // buffer allocated from the ByteBufferPool. + ByteBuffer buf = ByteBuffer.allocate(policy.getBufferSize()); + this.activeStream = new MessageInputStream(buf); + needsNotification = true; + } + + activeStream.appendBuffer(data.getPayload()); + + if (needsNotification) + { + events.onBinary.call(websocket,connection,activeStream); + } + + if (data.isFin()) + { + // close the stream. + activeStream.bufferComplete(); + activeStream = null; // work with a new one + } + } + else + { + if (activeMessage == null) + { + // Acquire from ByteBufferPool is safe here, as the return + // from the notification is a good place to release the + // buffer. + activeMessage = bufferPool.acquire(policy.getBufferSize(),false); + BufferUtil.clearToFill(activeMessage); + } + + appendBuffer(activeMessage,data.getPayload()); + + // normal case + if (frame.isFin()) + { + // Notify using simple message approach. + try + { + BufferUtil.flipToFlush(activeMessage,0); + byte buf[] = BufferUtil.toArray(activeMessage); + events.onBinary.call(websocket,connection,buf,0,buf.length); + } + finally + { + bufferPool.release(activeMessage); + activeMessage = null; + } + } + + } + return; + } + case TEXT: + { + if (events.onText.isStreaming()) + { + boolean needsNotification = false; + + // Streaming Approach + if (activeStream == null) + { + // Allocate directly, not via ByteBufferPool, as this buffer + // is ultimately controlled by the end user, and we can't know + // when they are done using the stream in order to release any + // buffer allocated from the ByteBufferPool. + ByteBuffer buf = ByteBuffer.allocate(policy.getBufferSize()); + this.activeStream = new MessageReader(buf); + needsNotification = true; + } + + activeStream.appendBuffer(data.getPayload()); + + if (needsNotification) + { + events.onText.call(websocket,connection,activeStream); + } + + if (data.isFin()) + { + // close the stream. + activeStream.bufferComplete(); + activeStream = null; // work with a new one + } + } + else + { + if (activeMessage == null) + { + // Acquire from ByteBufferPool is safe here, as the return + // from the notification is a good place to release the + // buffer. + activeMessage = bufferPool.acquire(policy.getBufferSize(),false); + BufferUtil.clearToFill(activeMessage); + } + + appendBuffer(activeMessage,data.getPayload()); + + // normal case + if (frame.isFin()) + { + // Notify using simple message approach. + try + { + BufferUtil.flipToFlush(activeMessage,0); + events.onText.call(websocket,connection,BufferUtil.toUTF8String(activeMessage)); + } + finally + { + bufferPool.release(activeMessage); + activeMessage = null; + } + } + } + return; + } + } } // Specified Binary Case - if (frame instanceof BinaryFrame) + if ((frame instanceof BinaryFrame) && (events.onBinary != null)) { - if (events.onBinary != null) + BinaryFrame bin = (BinaryFrame)frame; + if (events.onBinary.isStreaming()) + { + // Streaming Approach + } + else { - BinaryFrame bin = (BinaryFrame)frame; // Byte array approach byte buf[] = BufferUtil.toArray(bin.getPayload()); events.onBinary.call(websocket,connection,buf,0,buf.length); } - - // TODO - // if (events.onBinaryStream != null) - // { - // } - return; } + } catch (Throwable t) { diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageInputStream.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageInputStream.java new file mode 100644 index 00000000000..7666cbb5353 --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageInputStream.java @@ -0,0 +1,41 @@ +package org.eclipse.jetty.websocket.io; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class MessageInputStream extends InputStream implements StreamAppender +{ + private final ByteBuffer buffer; + + public MessageInputStream(ByteBuffer buf) + { + this.buffer = buf; + } + + @Override + public void appendBuffer(ByteBuffer buf) + { + // TODO Auto-generated method stub + } + + @Override + public void bufferComplete() throws IOException + { + // TODO Auto-generated method stub + + } + + @Override + public ByteBuffer getBuffer() + { + return buffer; + } + + @Override + public int read() throws IOException + { + // TODO Auto-generated method stub + return 0; + } +} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageReader.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageReader.java new file mode 100644 index 00000000000..5327418be60 --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/MessageReader.java @@ -0,0 +1,48 @@ +package org.eclipse.jetty.websocket.io; + +import java.io.IOException; +import java.io.Reader; +import java.nio.ByteBuffer; + +public class MessageReader extends Reader implements StreamAppender +{ + private ByteBuffer buffer; + + public MessageReader(ByteBuffer buf) + { + this.buffer = buf; + } + + @Override + public void appendBuffer(ByteBuffer buf) + { + // TODO Auto-generated method stub + + } + + @Override + public void bufferComplete() throws IOException + { + // TODO Auto-generated method stub + + } + + @Override + public void close() throws IOException + { + // TODO Auto-generated method stub + } + + @Override + public ByteBuffer getBuffer() + { + return buffer; + } + + @Override + public int read(char[] cbuf, int off, int len) throws IOException + { + // TODO Auto-generated method stub + return 0; + } +} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/StreamAppender.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/StreamAppender.java new file mode 100644 index 00000000000..096050486ba --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/StreamAppender.java @@ -0,0 +1,13 @@ +package org.eclipse.jetty.websocket.io; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface StreamAppender +{ + void appendBuffer(ByteBuffer buf); + + void bufferComplete() throws IOException; + + ByteBuffer getBuffer(); +} diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/EventCapture.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/EventCapture.java index 08cbe365f6c..95c705cd7fb 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/EventCapture.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/EventCapture.java @@ -3,6 +3,7 @@ package org.eclipse.jetty.websocket.api; import static org.hamcrest.Matchers.*; import java.util.ArrayList; +import java.util.regex.Pattern; import org.junit.Assert; @@ -29,6 +30,12 @@ public class EventCapture extends ArrayList Assert.assertThat("Event Count",size(),is(expectedCount)); } + public void assertEventRegex(int eventNum, String regex) + { + String event = get(eventNum); + Assert.assertTrue("Event[" + eventNum + "]: regex:[" + regex + "] in [" + event + "]",Pattern.matches(regex,event)); + } + public void assertEventStartsWith(int eventNum, String expected) { Assert.assertThat("Event[" + eventNum + "]",get(eventNum),startsWith(expected)); diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/EventMethodsCacheTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/EventMethodsCacheTest.java index ed285717f5b..91c55a1f3f4 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/EventMethodsCacheTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/EventMethodsCacheTest.java @@ -162,6 +162,9 @@ public class EventMethodsCacheTest assertNoEventMethod(classId + ".onException",methods.onException); assertNoEventMethod(classId + ".onText",methods.onText); assertNoEventMethod(classId + ".onFrame",methods.onFrame); + + Assert.assertFalse(classId + ".onBinary.hasConnection",methods.onBinary.isHasConnection()); + Assert.assertFalse(classId + ".onBinary.isStreaming",methods.onBinary.isStreaming()); } /** @@ -183,6 +186,9 @@ public class EventMethodsCacheTest assertNoEventMethod(classId + ".onException",methods.onException); assertNoEventMethod(classId + ".onText",methods.onText); assertNoEventMethod(classId + ".onFrame",methods.onFrame); + + Assert.assertFalse(classId + ".onBinary.hasConnection",methods.onBinary.isHasConnection()); + Assert.assertTrue(classId + ".onBinary.isStreaming",methods.onBinary.isStreaming()); } /** @@ -228,7 +234,7 @@ public class EventMethodsCacheTest } /** - * Test Case for no exceptions and 1 method + * Test Case for annotated for text messages w/connection param */ @Test public void testAnnotatedMyStatelessEchoSocket() @@ -246,6 +252,9 @@ public class EventMethodsCacheTest assertNoEventMethod(classId + ".onException",methods.onException); assertHasEventMethod(classId + ".onText",methods.onText); assertNoEventMethod(classId + ".onFrame",methods.onFrame); + + Assert.assertTrue(classId + ".onText.hasConnection",methods.onText.isHasConnection()); + Assert.assertFalse(classId + ".onText.isStreaming",methods.onText.isStreaming()); } /** @@ -309,6 +318,9 @@ public class EventMethodsCacheTest assertNoEventMethod(classId + ".onException",methods.onException); assertHasEventMethod(classId + ".onText",methods.onText); assertNoEventMethod(classId + ".onFrame",methods.onFrame); + + Assert.assertFalse(classId + ".onText.hasConnection",methods.onText.isHasConnection()); + Assert.assertFalse(classId + ".onText.isStreaming",methods.onText.isStreaming()); } /** @@ -330,6 +342,9 @@ public class EventMethodsCacheTest assertNoEventMethod(classId + ".onException",methods.onException); assertHasEventMethod(classId + ".onText",methods.onText); assertNoEventMethod(classId + ".onFrame",methods.onFrame); + + Assert.assertFalse(classId + ".onText.hasConnection",methods.onText.isHasConnection()); + Assert.assertTrue(classId + ".onText.isStreaming",methods.onText.isStreaming()); } /** diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/WebSocketEventDriverTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/WebSocketEventDriverTest.java index 1992c31bceb..217e04ad38a 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/WebSocketEventDriverTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/WebSocketEventDriverTest.java @@ -1,5 +1,7 @@ package org.eclipse.jetty.websocket.api; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.StandardByteBufferPool; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.websocket.api.samples.AdapterConnectCloseSocket; import org.eclipse.jetty.websocket.api.samples.AnnotatedBinaryArraySocket; @@ -19,12 +21,20 @@ public class WebSocketEventDriverTest @Rule public TestName testname = new TestName(); + private BinaryFrame makeBinaryFrame(String content, boolean fin) + { + BinaryFrame bin = new BinaryFrame(content.getBytes(StringUtil.__UTF8_CHARSET)); + bin.setFin(fin); + return bin; + } + private WebSocketEventDriver newDriver(Object websocket) { EventMethodsCache methodsCache = new EventMethodsCache(); methodsCache.register(websocket.getClass()); WebSocketPolicy policy = WebSocketPolicy.newServerPolicy(); - return new WebSocketEventDriver(methodsCache,policy,websocket); + ByteBufferPool bufferPool = new StandardByteBufferPool(); + return new WebSocketEventDriver(websocket,methodsCache,policy,bufferPool); } @Test @@ -52,7 +62,7 @@ public class WebSocketEventDriverTest LocalWebSocketConnection conn = new LocalWebSocketConnection(testname); driver.setConnection(conn); driver.onConnect(); - driver.onFrame(new BinaryFrame("Hello World".getBytes(StringUtil.__UTF8_CHARSET))); + driver.onFrame(makeBinaryFrame("Hello World",true)); driver.onFrame(new CloseFrame(StatusCode.NORMAL)); socket.capture.assertEventCount(3); @@ -61,24 +71,6 @@ public class WebSocketEventDriverTest socket.capture.assertEventStartsWith(2,"onClose(1000,"); } - @Test - public void testAnnotated_ByteBuffer() - { - AnnotatedBinaryStreamSocket socket = new AnnotatedBinaryStreamSocket(); - WebSocketEventDriver driver = newDriver(socket); - - LocalWebSocketConnection conn = new LocalWebSocketConnection(testname); - driver.setConnection(conn); - driver.onConnect(); - driver.onFrame(new BinaryFrame("Hello World".getBytes(StringUtil.__UTF8_CHARSET))); - driver.onFrame(new CloseFrame(StatusCode.NORMAL)); - - socket.capture.assertEventCount(3); - socket.capture.assertEventStartsWith(0,"onConnect"); - socket.capture.assertEvent(1,"onBinary(java.nio.HeapByteBuffer[pos=0 lim=11 cap=11])"); - socket.capture.assertEventStartsWith(2,"onClose(1000,"); - } - @Test public void testAnnotated_Frames() { @@ -95,12 +87,30 @@ public class WebSocketEventDriverTest socket.capture.assertEventCount(5); socket.capture.assertEventStartsWith(0,"onConnect("); - socket.capture.assertEventStartsWith(1,"onPingFrame("); - socket.capture.assertEventStartsWith(2,"onTextFrame("); - socket.capture.assertEventStartsWith(3,"onBaseFrame(BinaryFrame"); + socket.capture.assertEventStartsWith(1,"onFrame(Ping"); + socket.capture.assertEventStartsWith(2,"onFrame(Text"); + socket.capture.assertEventStartsWith(3,"onFrame(Binary"); socket.capture.assertEventStartsWith(4,"onClose(1001,"); } + @Test + public void testAnnotated_InputStream() + { + AnnotatedBinaryStreamSocket socket = new AnnotatedBinaryStreamSocket(); + WebSocketEventDriver driver = newDriver(socket); + + LocalWebSocketConnection conn = new LocalWebSocketConnection(testname); + driver.setConnection(conn); + driver.onConnect(); + driver.onFrame(makeBinaryFrame("Hello World",true)); + driver.onFrame(new CloseFrame(StatusCode.NORMAL)); + + socket.capture.assertEventCount(3); + socket.capture.assertEventStartsWith(0,"onConnect"); + socket.capture.assertEventRegex(1,"^onBinary\\(.*InputStream.*"); + socket.capture.assertEventStartsWith(2,"onClose(1000,"); + } + @Test public void testListener_Text() { diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/samples/AnnotatedFramesSocket.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/samples/AnnotatedFramesSocket.java index d4fbe08b41f..75854fbba58 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/samples/AnnotatedFramesSocket.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/api/samples/AnnotatedFramesSocket.java @@ -5,23 +5,14 @@ import org.eclipse.jetty.websocket.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.annotations.OnWebSocketFrame; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.EventCapture; +import org.eclipse.jetty.websocket.api.Frame; import org.eclipse.jetty.websocket.api.WebSocketConnection; -import org.eclipse.jetty.websocket.frames.BaseFrame; -import org.eclipse.jetty.websocket.frames.ControlFrame; -import org.eclipse.jetty.websocket.frames.PingFrame; -import org.eclipse.jetty.websocket.frames.TextFrame; @WebSocket public class AnnotatedFramesSocket { public EventCapture capture = new EventCapture(); - @OnWebSocketFrame - public void onBaseFrame(BaseFrame frame) - { - capture.add("onBaseFrame(%s)",frame); - } - @OnWebSocketClose public void onClose(int statusCode, String reason) { @@ -35,20 +26,8 @@ public class AnnotatedFramesSocket } @OnWebSocketFrame - public void onControlFrame(ControlFrame ping) + public void onFrame(Frame frame) { - capture.add("onControlFrame(%s)",ping); - } - - @OnWebSocketFrame - public void onPing(PingFrame ping) - { - capture.add("onPingFrame(%s)",ping); - } - - @OnWebSocketFrame - public void onTextFrame(TextFrame text) - { - capture.add("onTextFrame(%s)",text); + capture.add("onFrame(%s)",frame); } }