From cd46dc3cee05256ac9f005a6dba841acd79c245a Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Thu, 5 Jul 2012 17:07:56 -0700 Subject: [PATCH] More cleanup of websocket-server --- .../jetty/websocket/api/WebSocketAdapter.java | 9 +++ .../websocket/api/WebSocketConnection.java | 16 ++---- .../jetty/websocket/api/io/WebSocketPing.java | 19 +++++++ .../io/WebSocketAsyncConnection.java | 33 +++++------ .../io/LocalWebSocketConnection.java | 3 +- .../server/WebSocketServerFactory.java | 14 ++++- .../server/WebSocketMessageRFC6455Test.java | 55 +++++++------------ .../server/WebSocketOverSSLTest.java | 15 ++++- .../server/WebSocketServletRFCTest.java | 3 +- .../websocket/server/ab/TestABCase7_9.java | 7 ++- .../server/examples/BasicEchoSocket.java | 4 +- .../server/examples/MyEchoSocket.java | 2 +- .../server/examples/echo/BigEchoSocket.java | 16 +++--- .../echo/EchoBroadcastPingSocket.java | 29 +++++----- .../examples/echo/EchoBroadcastSocket.java | 12 ++-- .../examples/echo/EchoFragmentSocket.java | 41 +++++++------- .../server/examples/echo/EchoFrameSocket.java | 30 ---------- .../server/helper/MessageSender.java | 2 +- 18 files changed, 150 insertions(+), 160 deletions(-) create mode 100644 jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketPing.java delete mode 100644 jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFrameSocket.java diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketAdapter.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketAdapter.java index 927da3a50e1..5a2982b28c4 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketAdapter.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketAdapter.java @@ -1,5 +1,7 @@ package org.eclipse.jetty.websocket.api; +import org.eclipse.jetty.websocket.api.io.WebSocketBlockingConnection; + /** * Default implementation of the {@link WebSocketListener}. *

@@ -8,6 +10,12 @@ package org.eclipse.jetty.websocket.api; public class WebSocketAdapter implements WebSocketListener { private WebSocketConnection connection; + private WebSocketBlockingConnection blocking; + + public WebSocketBlockingConnection getBlockingConnection() + { + return blocking; + } public WebSocketConnection getConnection() { @@ -40,6 +48,7 @@ public class WebSocketAdapter implements WebSocketListener public void onWebSocketConnect(WebSocketConnection connection) { this.connection = connection; + this.blocking = new WebSocketBlockingConnection(this.connection); } @Override diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java index e16048bde71..7713c142840 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java @@ -2,7 +2,6 @@ package org.eclipse.jetty.websocket.api; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import org.eclipse.jetty.util.Callback; @@ -67,22 +66,17 @@ public interface WebSocketConnection boolean isOpen(); /** - * Send a series of binary messages. + * Send a a binary message. *

- * Note: each buffer results in its own binary message frame. - *

- * Advanced usage, with callbacks, allows for concurrent NIO style results of the entire write operation. (Callback is only called once at the end of - * processing all of the buffers) + * NIO style with callbacks, allows for concurrent results of the write operation. */ - void write(C context, Callback callback, ByteBuffer... buffers) throws IOException; + void write(C context, Callback callback, byte buf[], int offset, int len) throws IOException; /** * Send a series of text messages. *

- * Note: each messages results in its own text message frame. - *

- * Advanced usage, with callbacks, allows for concurrent NIO style results of the entire write operation. (Callback is only called once at the end of - * processing all of the messages) + * NIO style with callbacks, allows for concurrent results of the entire write operation. (Callback is only called once at the end of processing all of the + * messages) */ void write(C context, Callback callback, String... messages) throws IOException; } \ No newline at end of file diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketPing.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketPing.java new file mode 100644 index 00000000000..b85ffaecad0 --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/io/WebSocketPing.java @@ -0,0 +1,19 @@ +package org.eclipse.jetty.websocket.api.io; + +import org.eclipse.jetty.websocket.api.WebSocketConnection; + +public class WebSocketPing +{ + private WebSocketConnection conn; + + public WebSocketPing(WebSocketConnection conn) + { + this.conn = conn; + } + + public void sendPing(byte buf[], int offset, int len) + { + // TODO: implement + // TODO: should this block and wait for a pong? (how?) + } +} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java index 060f57f6c3f..eb22f797d38 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java @@ -241,9 +241,8 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements * {@inheritDoc} */ @Override - public void write(C context, Callback callback, ByteBuffer... buffers) throws IOException + public void write(C context, Callback callback, byte buf[], int offset, int len) throws IOException { - int len = buffers.length; if (len == 0) { // nothing to write @@ -251,18 +250,15 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements } if (LOG.isDebugEnabled()) { - LOG.debug("write(context,{},ByteBuffers->{})",callback,buffers.length); + LOG.debug("write(context,{},byte[],{},{})",callback,offset,len); } - ByteBuffer raw[] = new ByteBuffer[len]; - for (int i = 0; i < len; i++) - { - raw[i] = bufferPool.acquire(buffers[i].remaining() + FrameGenerator.OVERHEAD,false); - BufferUtil.clearToFill(raw[i]); - BinaryFrame frame = new BinaryFrame(buffers[i]); - generator.generate(raw[i],frame); - BufferUtil.flipToFlush(raw[i],0); - } - getEndPoint().write(context,callback,raw); + ByteBuffer raw = bufferPool.acquire(len + FrameGenerator.OVERHEAD,false); + BufferUtil.clearToFill(raw); + BinaryFrame frame = new BinaryFrame(buf,offset,len); + frame.setFin(true); + generator.generate(raw,frame); + BufferUtil.flipToFlush(raw,0); + writeRaw(context,callback,raw); } /** @@ -281,12 +277,17 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements { LOG.debug("write(context,{},Strings->{})",callback,messages.length); } - TextFrame frames[] = new TextFrame[len]; + ByteBuffer raw[] = new ByteBuffer[messages.length]; for (int i = 0; i < len; i++) { - frames[i] = new TextFrame(messages[i]); + TextFrame frame = new TextFrame(messages[i]); + frame.setFin(true); + raw[i] = bufferPool.acquire(policy.getBufferSize(),false); + BufferUtil.clear(raw[i]); + generator.generate(raw[i],frame); + BufferUtil.flipToFlush(raw[i],0); } - // TODO write(context,callback,frames); + writeRaw(context,callback,raw); } @Override diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java index 41bc3b3a3be..e74ff0b10e4 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java @@ -2,7 +2,6 @@ package org.eclipse.jetty.websocket.io; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.api.WebSocketConnection; @@ -69,7 +68,7 @@ public class LocalWebSocketConnection implements WebSocketConnection } @Override - public void write(C context, Callback callback, ByteBuffer... buffers) throws IOException + public void write(C context, Callback callback, byte[] buf, int offset, int len) throws IOException { } diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java index 4f4e8026f54..c35a7b9b951 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java @@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.StandardByteBufferPool; import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.log.Log; @@ -70,15 +71,22 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock } private final String supportedVersions; - private WebSocketPolicy basePolicy; + private final WebSocketPolicy basePolicy; + private final EventMethodsCache methodsCache; + private final ByteBufferPool bufferPool; private WebSocketCreator creator; - private EventMethodsCache methodsCache; private Class firstRegisteredClass; public WebSocketServerFactory(WebSocketPolicy policy) + { + this(policy,new StandardByteBufferPool()); + } + + public WebSocketServerFactory(WebSocketPolicy policy, ByteBufferPool bufferPool) { this.basePolicy = policy; this.methodsCache = new EventMethodsCache(); + this.bufferPool = bufferPool; this.creator = this; // Create supportedVersions @@ -121,7 +129,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock // Send the upgrade WebSocketPolicy objPolicy = this.basePolicy.clonePolicy(); - WebSocketEventDriver websocket = new WebSocketEventDriver(methodsCache,objPolicy,websocketPojo); + WebSocketEventDriver websocket = new WebSocketEventDriver(websocketPojo,methodsCache,objPolicy,bufferPool); return upgrade(sockreq,sockresp,websocket); } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketMessageRFC6455Test.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketMessageRFC6455Test.java index 15f7601ebf0..8792e09f33d 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketMessageRFC6455Test.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketMessageRFC6455Test.java @@ -34,21 +34,17 @@ import java.util.zip.Inflater; import org.eclipse.jetty.server.SelectChannelConnector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.Utf8StringBuilder; -import org.eclipse.jetty.websocket.annotations.OnWebSocketBinary; import org.eclipse.jetty.websocket.annotations.OnWebSocketClose; import org.eclipse.jetty.websocket.annotations.OnWebSocketConnect; -import org.eclipse.jetty.websocket.annotations.OnWebSocketFrame; -import org.eclipse.jetty.websocket.annotations.OnWebSocketText; +import org.eclipse.jetty.websocket.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketConnection; -import org.eclipse.jetty.websocket.frames.BaseFrame; -import org.eclipse.jetty.websocket.frames.CloseFrame; -import org.eclipse.jetty.websocket.frames.PingFrame; -import org.eclipse.jetty.websocket.frames.PongFrame; import org.eclipse.jetty.websocket.protocol.AcceptHash; import org.eclipse.jetty.websocket.protocol.OpCode; import org.junit.AfterClass; @@ -85,33 +81,14 @@ public class WebSocketMessageRFC6455Test disconnected.countDown(); } - @OnWebSocketFrame - public void onFrame(BaseFrame frame) - { - if (_echo) - { - if (!(frame instanceof PingFrame) && !(frame instanceof PongFrame) && !(frame instanceof CloseFrame)) - { - try - { - connection.write(frame); - } - catch (IOException e) - { - e.printStackTrace(); - } - } - } - } - - @OnWebSocketBinary + @OnWebSocketMessage public void onMessage(byte[] data, int offset, int length) { if (_aggregate) { try { - connection.write(data,offset,length); + connection.write(null,fnf(),data,offset,length); } catch (IOException e) { @@ -120,7 +97,7 @@ public class WebSocketMessageRFC6455Test } } - @OnWebSocketText + @OnWebSocketMessage public void onMessage(String data) { __textCount.incrementAndGet(); @@ -140,7 +117,7 @@ public class WebSocketMessageRFC6455Test { try { - connection.write(data); + connection.write(null,fnf(),data); } catch (IOException e) { @@ -157,7 +134,7 @@ public class WebSocketMessageRFC6455Test { try { - connection.write("sent on connect"); + connection.write(null,fnf(),"sent on connect"); } catch (IOException e) { @@ -170,13 +147,19 @@ public class WebSocketMessageRFC6455Test } private static final int WSVERSION = 13; // RFC-6455 version + private static Server __server; private static SelectChannelConnector __connector; private static TestWebSocket __serverWebSocket; private static CountDownLatch __latch; - private static AtomicInteger __textCount = new AtomicInteger(0); + // Fire and Forget callback + public static Callback fnf() + { + return new FutureCallback(); + } + @BeforeClass public static void startServer() throws Exception { @@ -489,7 +472,7 @@ public class WebSocketMessageRFC6455Test String mesg = "How Now Brown Cow"; for (int i = 0; i < count; i++) { - __serverWebSocket.connection.write(mesg); + __serverWebSocket.connection.write(null,fnf(),mesg); if ((i % 100) == 0) { output.flush(); @@ -911,7 +894,7 @@ public class WebSocketMessageRFC6455Test assertTrue(__serverWebSocket.awaitDisconnected(5000)); try { - __serverWebSocket.connection.write("Don't send"); + __serverWebSocket.connection.write(null,fnf(),"Don't send"); Assert.fail("Should have thrown IOException"); } catch (IOException e) @@ -1292,7 +1275,7 @@ public class WebSocketMessageRFC6455Test message.append(text); } String data = message.toString(); - __serverWebSocket.connection.write(data); + __serverWebSocket.connection.write(null,fnf(),data); assertEquals(OpCode.TEXT.getCode(),input.read()); assertEquals(0x7e,input.read()); @@ -1364,7 +1347,7 @@ public class WebSocketMessageRFC6455Test try { - __serverWebSocket.connection.write("Don't send"); + __serverWebSocket.connection.write(null,fnf(),"Don't send"); Assert.fail("Should have thrown IOException"); } catch (IOException e) diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java index 5a748b32e5a..18f3d86f50e 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java @@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ssl.SslSelectChannelConnector; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.websocket.api.WebSocketAdapter; @@ -35,9 +37,16 @@ import org.junit.Test; public class WebSocketOverSSLTest { + // Fire and Forget callback + public static Callback fnf() + { + return new FutureCallback(); + } + private Server _server; private int _port; private QueuedThreadPool _threadPool; + // private WebSocketClientFactory _wsFactory; private WebSocketConnection _connection; @@ -118,7 +127,7 @@ public class WebSocketOverSSLTest String message = new String(chars); for (int i = 0; i < count; ++i) { - _connection.write(message); + _connection.write(null,fnf(),message); } Assert.assertTrue(clientLatch.await(20,TimeUnit.SECONDS)); @@ -150,7 +159,7 @@ public class WebSocketOverSSLTest try { Assert.assertEquals(message,message); - connection.write(message); + connection.write(null,fnf(),message); serverLatch.countDown(); } catch (IOException x) @@ -169,7 +178,7 @@ public class WebSocketOverSSLTest clientLatch.countDown(); } }); - _connection.write(message); + _connection.write(null,fnf(),message); Assert.assertTrue(serverLatch.await(5,TimeUnit.SECONDS)); Assert.assertTrue(clientLatch.await(5,TimeUnit.SECONDS)); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java index 9cc5eead238..26a73c5af38 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServletRFCTest.java @@ -6,6 +6,7 @@ import java.io.IOException; import java.util.Queue; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.StatusCode; @@ -54,7 +55,7 @@ public class WebSocketServletRFCTest // echo the message back. try { - getConnection().write(message); + getConnection().write(null,new FutureCallback(),message); } catch (IOException e) { diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase7_9.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase7_9.java index 4e867b827a4..219567aee3b 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase7_9.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase7_9.java @@ -11,6 +11,7 @@ import java.util.Queue; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.WebSocketAdapter; @@ -64,7 +65,7 @@ public class TestABCase7_9 // echo the message back. try { - getConnection().write(message); + getConnection().write(null,new FutureCallback(),message); } catch (IOException e) { @@ -184,7 +185,7 @@ public class TestABCase7_9 ByteBuffer frame = FrameBuilder.closeFrame().withMask(new byte[] { 0x44, 0x44, 0x44, 0x44 }).asByteBuffer(); - + ByteBuffer buf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); BufferUtil.clearToFill(buf); @@ -210,5 +211,5 @@ public class TestABCase7_9 client.close(); } } - + } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/BasicEchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/BasicEchoSocket.java index 01966516286..0edadf1c8e9 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/BasicEchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/BasicEchoSocket.java @@ -15,7 +15,7 @@ public class BasicEchoSocket extends WebSocketAdapter } try { - getConnection().write(payload,offset,len); + getBlockingConnection().write(payload,offset,len); } catch (IOException e) { @@ -32,7 +32,7 @@ public class BasicEchoSocket extends WebSocketAdapter } try { - getConnection().write(message); + getBlockingConnection().write(message); } catch (IOException e) { diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java index 11bad1e4127..b7644857693 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java @@ -17,7 +17,7 @@ public class MyEchoSocket extends WebSocketAdapter try { // echo the data back - getConnection().write(message); + getBlockingConnection().write(message); } catch (IOException e) { diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java index 01f99096aca..334f7a32c16 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java @@ -1,10 +1,9 @@ package org.eclipse.jetty.websocket.server.examples.echo; import java.io.IOException; -import java.nio.ByteBuffer; -import org.eclipse.jetty.websocket.annotations.OnWebSocketBinary; -import org.eclipse.jetty.websocket.annotations.OnWebSocketText; +import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.websocket.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.WebSocketConnection; @@ -14,8 +13,8 @@ import org.eclipse.jetty.websocket.api.WebSocketConnection; @WebSocket(maxTextSize = 64 * 1024, maxBinarySize = 64 * 1024) public class BigEchoSocket { - @OnWebSocketBinary - public void onBinary(WebSocketConnection conn, ByteBuffer buffer) + @OnWebSocketMessage + public void onBinary(WebSocketConnection conn, byte buf[], int offset, int length) { if (conn.isOpen()) { @@ -23,8 +22,7 @@ public class BigEchoSocket } try { - buffer.flip(); // flip the incoming buffer to write mode - conn.write(buffer); + conn.write(null,new FutureCallback(),buf,offset,length); } catch (IOException e) { @@ -32,7 +30,7 @@ public class BigEchoSocket } } - @OnWebSocketText + @OnWebSocketMessage public void onText(WebSocketConnection conn, String message) { if (conn.isOpen()) @@ -41,7 +39,7 @@ public class BigEchoSocket } try { - conn.write(message); + conn.write(null,new FutureCallback(),message); } catch (IOException e) { diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastPingSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastPingSocket.java index 91ad02dbdf7..231f518935f 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastPingSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastPingSocket.java @@ -1,23 +1,23 @@ package org.eclipse.jetty.websocket.server.examples.echo; -import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.WebSocketConnection; -import org.eclipse.jetty.websocket.frames.PingFrame; +import org.eclipse.jetty.websocket.api.io.WebSocketPing; @WebSocket public class EchoBroadcastPingSocket extends EchoBroadcastSocket { - private class KeepAlive extends Thread + private static class KeepAlive extends Thread { private CountDownLatch latch; + private WebSocketPing pinger; - private WebSocketConnection getConnection() + public KeepAlive(WebSocketConnection conn) { - return EchoBroadcastPingSocket.this.conn; + this.pinger = new WebSocketPing(conn); } @Override @@ -27,14 +27,10 @@ public class EchoBroadcastPingSocket extends EchoBroadcastSocket { while (!latch.await(10,TimeUnit.SECONDS)) { - System.err.println("Ping " + getConnection()); - PingFrame ping = new PingFrame(); - ByteBuffer payload = ByteBuffer.allocate(3); - payload.put((byte)1); - payload.put((byte)2); - payload.put((byte)3); - ping.setPayload(payload); - getConnection().write(ping); + System.err.println("Ping " + pinger); + byte data[] = new byte[] + { (byte)1, (byte)2, (byte)3 }; + pinger.sendPing(data,0,3); } } catch (Exception e) @@ -59,11 +55,10 @@ public class EchoBroadcastPingSocket extends EchoBroadcastSocket } } - private final KeepAlive keepAlive; // A dedicated thread is not a good way to do this + private KeepAlive keepAlive; // A dedicated thread is not a good way to do this public EchoBroadcastPingSocket() { - keepAlive = new KeepAlive(); } @Override @@ -76,6 +71,10 @@ public class EchoBroadcastPingSocket extends EchoBroadcastSocket @Override public void onOpen(WebSocketConnection conn) { + if (keepAlive == null) + { + keepAlive = new KeepAlive(conn); + } keepAlive.start(); super.onOpen(conn); } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastSocket.java index 652fa793ab6..84effa5d4a7 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastSocket.java @@ -3,10 +3,10 @@ package org.eclipse.jetty.websocket.server.examples.echo; import java.io.IOException; import java.util.concurrent.ConcurrentLinkedQueue; -import org.eclipse.jetty.websocket.annotations.OnWebSocketBinary; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.websocket.annotations.OnWebSocketClose; import org.eclipse.jetty.websocket.annotations.OnWebSocketConnect; -import org.eclipse.jetty.websocket.annotations.OnWebSocketText; +import org.eclipse.jetty.websocket.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.WebSocketConnection; @@ -17,14 +17,14 @@ public class EchoBroadcastSocket protected WebSocketConnection conn; - @OnWebSocketBinary + @OnWebSocketMessage public void onBinary(byte buf[], int offset, int len) { for (EchoBroadcastSocket sock : BROADCAST) { try { - sock.conn.write(buf,offset,len); + sock.conn.write(null,new FutureCallback(),buf,offset,len); } catch (IOException e) { @@ -47,14 +47,14 @@ public class EchoBroadcastSocket BROADCAST.add(this); } - @OnWebSocketText + @OnWebSocketMessage public void onText(String text) { for (EchoBroadcastSocket sock : BROADCAST) { try { - sock.conn.write(text); + sock.conn.write(null,new FutureCallback(),text); } catch (IOException e) { diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java index 8a626cadb5f..40ad4fb928f 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java @@ -1,15 +1,14 @@ package org.eclipse.jetty.websocket.server.examples.echo; import java.io.IOException; -import java.nio.ByteBuffer; -import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.websocket.annotations.OnWebSocketFrame; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.WebSocketConnection; -import org.eclipse.jetty.websocket.frames.DataFrame; +import org.eclipse.jetty.websocket.protocol.Frame; /** * Echo back the incoming text or binary as 2 frames of (roughly) equal size. @@ -18,30 +17,30 @@ import org.eclipse.jetty.websocket.frames.DataFrame; public class EchoFragmentSocket { @OnWebSocketFrame - public void onFrame(WebSocketConnection conn, DataFrame data) + public void onFrame(WebSocketConnection conn, Frame frame) { - ByteBuffer payload = data.getPayload(); - BufferUtil.flipToFlush(payload,0); - int half = payload.remaining() / 2; + if (!frame.getOpCode().isDataFrame()) + { + return; + } - ByteBuffer buf1 = payload.slice(); - ByteBuffer buf2 = payload.slice(); - - buf1.limit(half); - buf2.position(half); - - DataFrame d1 = new DataFrame(data.getOpCode()); - d1.setFin(false); - d1.setPayload(buf1); - - DataFrame d2 = new DataFrame(data.getOpCode()); - d2.setFin(true); - d2.setPayload(buf2); + byte data[] = frame.getPayloadData(); + int half = data.length / 2; Callback nop = new FutureCallback<>(); try { - conn.write(null,nop,d1,d2); + switch (frame.getOpCode()) + { + case BINARY: + conn.write(null,nop,data,0,half); + conn.write(null,nop,data,half,data.length - half); + break; + case TEXT: + conn.write(null,nop,new String(data,0,half,StringUtil.__UTF8_CHARSET)); + conn.write(null,nop,new String(data,half,data.length - half,StringUtil.__UTF8_CHARSET)); + break; + } } catch (IOException e) { diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFrameSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFrameSocket.java deleted file mode 100644 index af52981e8e1..00000000000 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFrameSocket.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.eclipse.jetty.websocket.server.examples.echo; - -import java.io.IOException; - -import org.eclipse.jetty.websocket.annotations.OnWebSocketFrame; -import org.eclipse.jetty.websocket.annotations.WebSocket; -import org.eclipse.jetty.websocket.api.WebSocketConnection; -import org.eclipse.jetty.websocket.frames.BaseFrame; - -@WebSocket -public class EchoFrameSocket -{ - @OnWebSocketFrame - public void onFrame(WebSocketConnection conn, BaseFrame frame) - { - if (!conn.isOpen()) - { - return; - } - - try - { - conn.write(frame); - } - catch (IOException e) - { - e.printStackTrace(System.err); - } - } -} diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/MessageSender.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/MessageSender.java index f3ea152ae98..1c9f5ccbcbd 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/MessageSender.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/MessageSender.java @@ -72,6 +72,6 @@ public class MessageSender extends WebSocketAdapter public void sendMessage(String format, Object... args) throws IOException { - getConnection().write(String.format(format,args)); + getBlockingConnection().write(String.format(format,args)); } } \ No newline at end of file