From e4235ea07087bf2bfe8d5e19112f08849dbae2fa Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 23 Aug 2013 19:58:46 +1000 Subject: [PATCH] 415744 - Reduce Future usage in websocket --- .../websocket/jsr356/JettyEchoSocket.java | 4 +- .../jsr356/server/JettyEchoSocket.java | 2 +- .../websocket/client/WebSocketClientTest.java | 2 +- .../common/BlockingWriteCallback.java | 19 ++++++++++ .../common/WebSocketRemoteEndpoint.java | 37 ++++++++++--------- .../examples/echo/AnnotatedEchoSocket.java | 2 +- .../examples/echo/ListenerEchoSocket.java | 2 +- .../annotations/MyStatelessEchoSocket.java | 2 +- .../jetty/websocket/server/LoadTest.java | 2 +- .../jetty/websocket/server/ab/ABSocket.java | 4 +- .../server/browser/BrowserSocket.java | 4 +- .../server/examples/echo/BigEchoSocket.java | 4 +- .../examples/echo/EchoBroadcastSocket.java | 4 +- .../examples/echo/EchoFragmentSocket.java | 8 ++-- .../websocket/server/helper/EchoSocket.java | 4 +- .../websocket/server/helper/RFCSocket.java | 4 +- .../server/helper/SessionSocket.java | 10 ++--- .../java/examples/MyBinaryEchoSocket.java | 2 +- .../src/test/java/examples/MyEchoSocket.java | 2 +- .../java/com/acme/WebSocketChatServlet.java | 2 +- 20 files changed, 71 insertions(+), 49 deletions(-) create mode 100644 jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java diff --git a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java index b186a7869a1..2eb52476d8b 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java +++ b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java @@ -33,7 +33,7 @@ public class JettyEchoSocket extends WebSocketAdapter @Override public void onWebSocketBinary(byte[] payload, int offset, int len) { - getRemote().sendBytesByFuture(BufferUtil.toBuffer(payload,offset,len)); + getRemote().sendBytes(BufferUtil.toBuffer(payload,offset,len),null); } @Override @@ -45,6 +45,6 @@ public class JettyEchoSocket extends WebSocketAdapter @Override public void onWebSocketText(String message) { - getRemote().sendStringByFuture(message); + getRemote().sendString(message,null); } } diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java index 978bd70941a..377bdf9205f 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java @@ -70,7 +70,7 @@ public class JettyEchoSocket public void onMessage(String msg) { incomingMessages.add(msg); - remote.sendStringByFuture(msg); + remote.sendString(msg,null); } @OnWebSocketConnect diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java index bc914f60ec8..45b007f712f 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java @@ -116,7 +116,7 @@ public class WebSocketClientTest Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1)); - cliSock.getSession().getRemote().sendStringByFuture("Hello World!"); + cliSock.getSession().getRemote().sendString("Hello World!",null); srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500); // wait for response from server cliSock.waitForMessage(500,TimeUnit.MILLISECONDS); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java new file mode 100644 index 00000000000..d18c20db3ef --- /dev/null +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java @@ -0,0 +1,19 @@ +package org.eclipse.jetty.websocket.common; + +import org.eclipse.jetty.util.BlockingCallback; +import org.eclipse.jetty.websocket.api.WriteCallback; + +public class BlockingWriteCallback extends BlockingCallback implements WriteCallback +{ + @Override + public void writeFailed(Throwable x) + { + failed(x); + } + + @Override + public void writeSuccess() + { + succeeded(); + } +} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index 5291998cdb4..1ca4e4a20f0 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -52,6 +53,18 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint private static final int TEXT = 1; private static final int BINARY = 2; private static final int CONTROL = 3; + private static final WriteCallback NOOP_CALLBACK = new WriteCallback() + { + @Override + public void writeSuccess() + { + } + + @Override + public void writeFailed(Throwable x) + { + } + }; private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class); public final LogicalConnection connection; @@ -72,19 +85,11 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint private void blockingWrite(WebSocketFrame frame) throws IOException { - Future fut = sendAsyncFrame(frame); - try - { - fut.get(); // block till done - } - catch (ExecutionException e) - { - throw new IOException("Failed to write bytes",e.getCause()); - } - catch (InterruptedException e) - { - throw new IOException("Failed to write bytes",e); - } + // TODO Blocking callbacks can be recycled, but they do not handle concurrent calls, + // so if some mutual exclusion can be applied, then this callback can be reused. + BlockingWriteCallback callback = new BlockingWriteCallback(); + sendFrame(frame,callback); + callback.block(); } public InetSocketAddress getInetSocketAddress() @@ -150,13 +155,12 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint @Override public void sendBytes(ByteBuffer data, WriteCallback callback) { - Objects.requireNonNull(callback,"WriteCallback cannot be null"); msgType.set(BINARY); if (LOG.isDebugEnabled()) { LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback); } - sendFrame(new BinaryFrame().setPayload(data),callback); + sendFrame(new BinaryFrame().setPayload(data),callback==null?NOOP_CALLBACK:callback); } public void sendFrame(WebSocketFrame frame, WriteCallback callback) @@ -356,13 +360,12 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint @Override public void sendString(String text, WriteCallback callback) { - Objects.requireNonNull(callback,"WriteCallback cannot be null"); msgType.set(TEXT); TextFrame frame = new TextFrame().setPayload(text); if (LOG.isDebugEnabled()) { LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback); } - sendFrame(frame,callback); + sendFrame(frame,callback==null?NOOP_CALLBACK:callback); } } diff --git a/jetty-websocket/websocket-common/src/test/java/examples/echo/AnnotatedEchoSocket.java b/jetty-websocket/websocket-common/src/test/java/examples/echo/AnnotatedEchoSocket.java index b99945254a9..002569cb523 100644 --- a/jetty-websocket/websocket-common/src/test/java/examples/echo/AnnotatedEchoSocket.java +++ b/jetty-websocket/websocket-common/src/test/java/examples/echo/AnnotatedEchoSocket.java @@ -35,7 +35,7 @@ public class AnnotatedEchoSocket { System.out.printf("Echoing back message [%s]%n",message); // echo the message back - session.getRemote().sendStringByFuture(message); + session.getRemote().sendString(message,null); } } } diff --git a/jetty-websocket/websocket-common/src/test/java/examples/echo/ListenerEchoSocket.java b/jetty-websocket/websocket-common/src/test/java/examples/echo/ListenerEchoSocket.java index 6d76f0eb4ab..fbfa0271cb3 100644 --- a/jetty-websocket/websocket-common/src/test/java/examples/echo/ListenerEchoSocket.java +++ b/jetty-websocket/websocket-common/src/test/java/examples/echo/ListenerEchoSocket.java @@ -59,7 +59,7 @@ public class ListenerEchoSocket implements WebSocketListener { System.out.printf("Echoing back message [%s]%n",message); // echo the message back - outbound.getRemote().sendStringByFuture(message); + outbound.getRemote().sendString(message,null); } } } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/annotations/MyStatelessEchoSocket.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/annotations/MyStatelessEchoSocket.java index 1a36a940b0e..cd12edacf9c 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/annotations/MyStatelessEchoSocket.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/annotations/MyStatelessEchoSocket.java @@ -36,6 +36,6 @@ public class MyStatelessEchoSocket @OnWebSocketMessage public void onText(Session session, String text) { - session.getRemote().sendStringByFuture(text); + session.getRemote().sendString(text,null); } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/LoadTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/LoadTest.java index dcacc6c8a8c..5a7004615a3 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/LoadTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/LoadTest.java @@ -69,7 +69,7 @@ public class LoadTest @OnWebSocketMessage public void onWebSocketText(String message) { - session.getRemote().sendStringByFuture(message); + session.getRemote().sendString(message,null); long iter = count.incrementAndGet(); if ((iter % 100) == 0) { diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/ABSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/ABSocket.java index 16b0591beef..d8da5f99ebb 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/ABSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/ABSocket.java @@ -46,7 +46,7 @@ public class ABSocket // echo the message back. ByteBuffer data = ByteBuffer.wrap(buf,offset,len); - this.session.getRemote().sendBytesByFuture(data); + this.session.getRemote().sendBytes(data,null); } @OnWebSocketConnect @@ -73,7 +73,7 @@ public class ABSocket try { // echo the message back. - this.session.getRemote().sendStringByFuture(message); + this.session.getRemote().sendString(message,null); } catch (WebSocketException e) { diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/browser/BrowserSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/browser/BrowserSocket.java index 726ad52df78..a9a40115313 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/browser/BrowserSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/browser/BrowserSocket.java @@ -67,7 +67,7 @@ public class BrowserSocket randomText[i] = letters[rand.nextInt(lettersLen)]; } msg = String.format("ManyThreads [%s]",String.valueOf(randomText)); - remote.sendStringByFuture(msg); + remote.sendString(msg,null); } } } @@ -219,7 +219,7 @@ public class BrowserSocket } // Async write - remote.sendStringByFuture(message); + remote.sendString(message,null); } private void writeMessage(String format, Object... args) diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java index 3c8aec6e357..ab9f8c0d38b 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java @@ -42,7 +42,7 @@ public class BigEchoSocket LOG.warn("Session is closed"); return; } - session.getRemote().sendBytesByFuture(ByteBuffer.wrap(buf,offset,length)); + session.getRemote().sendBytes(ByteBuffer.wrap(buf,offset,length),null); } @OnWebSocketMessage @@ -53,6 +53,6 @@ public class BigEchoSocket LOG.warn("Session is closed"); return; } - session.getRemote().sendStringByFuture(message); + session.getRemote().sendString(message,null); } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastSocket.java index 6cbe5d4c89f..ab9a09cdeb1 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoBroadcastSocket.java @@ -40,7 +40,7 @@ public class EchoBroadcastSocket ByteBuffer data = ByteBuffer.wrap(buf,offset,len); for (EchoBroadcastSocket sock : BROADCAST) { - sock.session.getRemote().sendBytesByFuture(data.slice()); + sock.session.getRemote().sendBytes(data.slice(),null); } } @@ -62,7 +62,7 @@ public class EchoBroadcastSocket { for (EchoBroadcastSocket sock : BROADCAST) { - sock.session.getRemote().sendStringByFuture(text); + sock.session.getRemote().sendString(text,null); } } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java index e158b3a7603..3377d0319de 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/EchoFragmentSocket.java @@ -60,13 +60,13 @@ public class EchoFragmentSocket switch (frame.getType()) { case BINARY: - remote.sendBytesByFuture(buf1); - remote.sendBytesByFuture(buf2); + remote.sendBytes(buf1,null); + remote.sendBytes(buf2,null); break; case TEXT: // NOTE: This impl is not smart enough to split on a UTF8 boundary - remote.sendStringByFuture(BufferUtil.toUTF8String(buf1)); - remote.sendStringByFuture(BufferUtil.toUTF8String(buf2)); + remote.sendString(BufferUtil.toUTF8String(buf1),null); + remote.sendString(BufferUtil.toUTF8String(buf2),null); break; default: throw new IOException("Unexpected frame type: " + frame.getType()); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java index 89d7e0e9629..d5910e4c72d 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java @@ -44,7 +44,7 @@ public class EchoSocket // echo the message back. ByteBuffer data = ByteBuffer.wrap(buf,offset,len); - this.session.getRemote().sendBytesByFuture(data); + this.session.getRemote().sendBytes(data,null); } @OnWebSocketConnect @@ -59,6 +59,6 @@ public class EchoSocket LOG.debug("onText({})",message); // echo the message back. - this.session.getRemote().sendStringByFuture(message); + this.session.getRemote().sendString(message,null); } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java index 9d03d04c760..4adbe3f237b 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java @@ -41,7 +41,7 @@ public class RFCSocket // echo the message back. ByteBuffer data = ByteBuffer.wrap(buf,offset,len); - this.session.getRemote().sendBytesByFuture(data); + this.session.getRemote().sendBytes(data,null); } @OnWebSocketConnect @@ -62,6 +62,6 @@ public class RFCSocket } // echo the message back. - this.session.getRemote().sendStringByFuture(message); + this.session.getRemote().sendString(message,null); } } \ No newline at end of file diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java index 2e431ddc2da..8de30133687 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java @@ -61,7 +61,7 @@ public class SessionSocket if (values == null) { - session.getRemote().sendStringByFuture(""); + session.getRemote().sendString("",null); return; } @@ -78,21 +78,21 @@ public class SessionSocket delim = true; } valueStr.append(']'); - session.getRemote().sendStringByFuture(valueStr.toString()); + session.getRemote().sendString(valueStr.toString(),null); return; } if ("session.isSecure".equals(message)) { String issecure = String.format("session.isSecure=%b",session.isSecure()); - session.getRemote().sendStringByFuture(issecure); + session.getRemote().sendString(issecure,null); return; } if ("session.upgradeRequest.requestURI".equals(message)) { String response = String.format("session.upgradeRequest.requestURI=%s",session.getUpgradeRequest().getRequestURI().toASCIIString()); - session.getRemote().sendStringByFuture(response); + session.getRemote().sendString(response,null); return; } @@ -103,7 +103,7 @@ public class SessionSocket } // echo the message back. - this.session.getRemote().sendStringByFuture(message); + this.session.getRemote().sendString(message,null); } catch (Throwable t) { diff --git a/jetty-websocket/websocket-servlet/src/test/java/examples/MyBinaryEchoSocket.java b/jetty-websocket/websocket-servlet/src/test/java/examples/MyBinaryEchoSocket.java index 15aeaa95eb8..eeb300e9473 100644 --- a/jetty-websocket/websocket-servlet/src/test/java/examples/MyBinaryEchoSocket.java +++ b/jetty-websocket/websocket-servlet/src/test/java/examples/MyBinaryEchoSocket.java @@ -34,6 +34,6 @@ public class MyBinaryEchoSocket public void onWebSocketText(Session session, byte buf[], int offset, int len) { // Echo message back, asynchronously - session.getRemote().sendBytesByFuture(ByteBuffer.wrap(buf,offset,len)); + session.getRemote().sendBytes(ByteBuffer.wrap(buf,offset,len),null); } } diff --git a/jetty-websocket/websocket-servlet/src/test/java/examples/MyEchoSocket.java b/jetty-websocket/websocket-servlet/src/test/java/examples/MyEchoSocket.java index cb3e016eef9..b9d317f49f2 100644 --- a/jetty-websocket/websocket-servlet/src/test/java/examples/MyEchoSocket.java +++ b/jetty-websocket/websocket-servlet/src/test/java/examples/MyEchoSocket.java @@ -32,6 +32,6 @@ public class MyEchoSocket public void onWebSocketText(Session session, String message) { // Echo message back, asynchronously - session.getRemote().sendStringByFuture(message); + session.getRemote().sendString(message,null); } } diff --git a/tests/test-webapps/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java b/tests/test-webapps/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java index ca8f24d9aeb..1f9f3859edf 100644 --- a/tests/test-webapps/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java +++ b/tests/test-webapps/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java @@ -123,7 +123,7 @@ public class WebSocketChatServlet extends WebSocketServlet implements WebSocketC } // Async write the message back. - member.remote.sendStringByFuture(data); + member.remote.sendString(data,null); } }