From 9ebf890d5112ebe74ee6f01f234abf525fa52c53 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Tue, 30 Jul 2013 15:25:41 -0700 Subject: [PATCH] WebSocket - supporting WriteCallback in jetty RemoteEndpoint + new RemoteEndpoint.sendBytes(ByteBuffer, WriteCallback) + new RemoteEndpoint.sendString(String, WriteCallback) --- .../jetty/websocket/api/RemoteEndpoint.java | 36 ++++++++++++----- .../websocket/client/WebSocketClientTest.java | 39 +++++++++++++++++++ .../common/WebSocketRemoteEndpoint.java | 29 +++++++++++++- .../websocket/server/IdleTimeoutTest.java | 1 - .../server/WebSocketOverSSLTest.java | 2 - .../server/helper/CaptureSocket.java | 1 - .../websocket/server/helper/SafariD00.java | 5 +-- 7 files changed, 95 insertions(+), 18 deletions(-) diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java index dfdefecd7c1..c1d000c37b9 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java @@ -35,18 +35,26 @@ public interface RemoteEndpoint void sendBytes(ByteBuffer data) throws IOException; /** - * Initiates the asynchronous transmission of a binary message. This method returns before the message is transmitted. Developers may provide a callback to - * be notified when the message has been transmitted, or may use the returned Future object to track progress of the transmission. Errors in transmission - * are given to the developer in the WriteResult object in either case. + * Initiates the asynchronous transmission of a binary message. This method returns before the message is transmitted. Developers may use the returned + * Future object to track progress of the transmission. * * @param data * the data being sent - * @param completion - * handler that will be notified of progress * @return the Future object representing the send operation. */ Future sendBytesByFuture(ByteBuffer data); + /** + * Initiates the asynchronous transmission of a binary message. This method returns before the message is transmitted. Developers may provide a callback to + * be notified when the message has been transmitted or resulted in an error. + * + * @param data + * the data being sent + * @param callback + * callback to notify of success or failure of the write operation + */ + void sendBytes(ByteBuffer data, WriteCallback callback); + /** * Send a binary message in pieces, blocking until all of the message has been transmitted. The runtime reads the message in order. Non-final pieces are * sent with isLast set to false. The final piece must be sent with isLast set to true. @@ -94,15 +102,23 @@ public interface RemoteEndpoint void sendString(String text) throws IOException; /** - * Initiates the asynchronous transmission of a text message. This method returns before the message is transmitted. Developers may provide a callback to be - * notified when the message has been transmitted, or may use the returned Future object to track progress of the transmission. Errors in transmission are - * given to the developer in the WriteResult object in either case. + * Initiates the asynchronous transmission of a text message. This method may return before the message is transmitted. Developers may use the returned + * Future object to track progress of the transmission. * * @param text * the text being sent - * @param completion - * the handler which will be notified of progress * @return the Future object representing the send operation. */ Future sendStringByFuture(String text); + + /** + * Initiates the asynchronous transmission of a text message. This method may return before the message is transmitted. Developers may provide a callback to + * be notified when the message has been transmitted or resulted in an error. + * + * @param text + * the text being sent + * @param callback + * callback to notify of success or failure of the write operation + */ + void sendString(String text, WriteCallback callback); } diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java index 52daa5ffb0c..69ef01ec208 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java @@ -34,6 +34,7 @@ import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer; import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection; import org.eclipse.jetty.websocket.common.WebSocketFrame; +import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -121,6 +122,44 @@ public class WebSocketClientTest cliSock.assertMessage("Hello World!"); } + + @Test + public void testBasicEcho_UsingCallback() throws Exception + { + JettyTrackingSocket cliSock = new JettyTrackingSocket(); + + client.getPolicy().setIdleTimeout(10000); + + URI wsUri = server.getWsUri(); + ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.setSubProtocols("echo"); + Future future = client.connect(cliSock,wsUri,request); + + final ServerConnection srvSock = server.accept(); + srvSock.upgrade(); + + Session sess = future.get(500,TimeUnit.MILLISECONDS); + Assert.assertThat("Session",sess,notNullValue()); + Assert.assertThat("Session.open",sess.isOpen(),is(true)); + Assert.assertThat("Session.upgradeRequest",sess.getUpgradeRequest(),notNullValue()); + Assert.assertThat("Session.upgradeResponse",sess.getUpgradeResponse(),notNullValue()); + + cliSock.assertWasOpened(); + cliSock.assertNotClosed(); + + Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1)); + + FutureWriteCallback callback = new FutureWriteCallback(); + + cliSock.getSession().getRemote().sendString("Hello World!",callback); + callback.get(1,TimeUnit.SECONDS); + + srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500); + // wait for response from server + cliSock.waitForMessage(500,TimeUnit.MILLISECONDS); + + cliSock.assertMessage("Hello World!"); + } @Test public void testBasicEcho_FromServer() throws Exception diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index f21404722c4..1179c1ffca6 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -141,6 +142,19 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint WebSocketFrame frame = WebSocketFrame.binary().setPayload(data); return sendAsyncFrame(frame); } + + @Override + public void sendBytes(ByteBuffer data, WriteCallback callback) + { + Objects.requireNonNull(callback,"WriteCallback cannot be null"); + msgType.set(BINARY); + if (LOG.isDebugEnabled()) + { + LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback); + } + WebSocketFrame frame = WebSocketFrame.binary().setPayload(data); + sendFrame(frame,callback); + } public void sendFrame(WebSocketFrame frame, WriteCallback callback) { @@ -317,7 +331,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint @Override public Future sendStringByFuture(String text) { - msgType.set(BINARY); + msgType.set(TEXT); WebSocketFrame frame = WebSocketFrame.text(text); if (LOG.isDebugEnabled()) { @@ -325,4 +339,17 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint } return sendAsyncFrame(frame); } + + @Override + public void sendString(String text, WriteCallback callback) + { + Objects.requireNonNull(callback,"WriteCallback cannot be null"); + msgType.set(TEXT); + WebSocketFrame frame = WebSocketFrame.text(text); + if (LOG.isDebugEnabled()) + { + LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback); + } + sendFrame(frame,callback); + } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/IdleTimeoutTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/IdleTimeoutTest.java index 97e34c6625d..3909933bc30 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/IdleTimeoutTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/IdleTimeoutTest.java @@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.server; import static org.hamcrest.Matchers.*; -import java.io.IOException; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.websocket.api.StatusCode; diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java index 3502924b123..9a326d98cc2 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java @@ -26,8 +26,6 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.toolchain.test.EventQueue; import org.eclipse.jetty.toolchain.test.TestTracker; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.server.helper.CaptureSocket; diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/CaptureSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/CaptureSocket.java index 52f901a74b6..b983ba98feb 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/CaptureSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/CaptureSocket.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.websocket.server.helper; -import java.sql.Connection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SafariD00.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SafariD00.java index 79e42784c32..e5357e98296 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SafariD00.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SafariD00.java @@ -18,6 +18,8 @@ package org.eclipse.jetty.websocket.server.helper; +import static org.hamcrest.Matchers.*; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -34,9 +36,6 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.TypeUtil; import org.junit.Assert; -import static org.hamcrest.Matchers.*; -import static org.hamcrest.Matchers.is; - public class SafariD00 { private URI uri;