diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java index e05ff158563..d35cda2e5c9 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java @@ -29,6 +29,7 @@ import javax.websocket.SendHandler; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint; import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; import org.eclipse.jetty.websocket.common.message.MessageOutputStream; @@ -85,13 +86,15 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint @Override public boolean getBatchingAllowed() { - return jettyRemote.isBatching(); + return jettyRemote.getBatchMode() == BatchMode.ON; } @Override public void setBatchingAllowed(boolean allowed) throws IOException { - jettyRemote.setBatching(allowed); + if (jettyRemote.getBatchMode() == BatchMode.ON && !allowed) + jettyRemote.flush(); + jettyRemote.setBatchMode(allowed ? BatchMode.ON : BatchMode.OFF); } @SuppressWarnings( diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java index 23c53b5648c..b9a24f23316 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java @@ -40,6 +40,7 @@ import javax.websocket.WebSocketContainer; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.common.LogicalConnection; import org.eclipse.jetty.websocket.common.SessionListener; @@ -374,8 +375,9 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess } @Override - public boolean getBatchingDefault() + public BatchMode getBatchMode() { - return false; + // JSR 356 specification mandates default batch mode to be off. + return BatchMode.OFF; } } diff --git a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java index 4dc25e46b50..bc6484e37a6 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java +++ b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java @@ -24,6 +24,7 @@ import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.WebSocketAdapter; @@ -41,7 +42,7 @@ public class JettyEchoSocket extends WebSocketAdapter { RemoteEndpoint remote = getRemote(); remote.sendBytes(BufferUtil.toBuffer(payload, offset, len), null); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } catch (IOException x) @@ -63,7 +64,7 @@ public class JettyEchoSocket extends WebSocketAdapter { RemoteEndpoint remote = getRemote(); remote.sendString(message, null); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } catch (IOException x) diff --git a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/samples/DummyConnection.java b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/samples/DummyConnection.java index ee1e6440181..8adc1b26ea1 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/samples/DummyConnection.java +++ b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/samples/DummyConnection.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.concurrent.Executor; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; @@ -123,7 +124,7 @@ public class DummyConnection implements LogicalConnection } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { } diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/DummyConnection.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/DummyConnection.java index 8941c006ddd..7111ed7ef20 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/DummyConnection.java +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/DummyConnection.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executor; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; @@ -126,7 +127,7 @@ public class DummyConnection implements LogicalConnection } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { callback.writeSuccess(); } diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java index df0790d1600..c7bb1e03a37 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException; import org.eclipse.jetty.toolchain.test.EventQueue; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; @@ -92,7 +93,7 @@ public class JettyEchoSocket public void sendMessage(String msg) throws IOException { remote.sendStringByFuture(msg); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } } diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JsrBatchModeTest.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JsrBatchModeTest.java new file mode 100644 index 00000000000..69586dd8a9e --- /dev/null +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JsrBatchModeTest.java @@ -0,0 +1,181 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.jsr356.server; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javax.websocket.ClientEndpointConfig; +import javax.websocket.ContainerProvider; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.MessageHandler; +import javax.websocket.RemoteEndpoint; +import javax.websocket.Session; +import javax.websocket.WebSocketContainer; +import javax.websocket.server.ServerEndpointConfig; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer; +import org.eclipse.jetty.websocket.jsr356.server.samples.echo.BasicEchoEndpoint; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class JsrBatchModeTest +{ + private Server server; + private ServerConnector connector; + private WebSocketContainer client; + + @Before + public void prepare() throws Exception + { + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(server, "/", true, false); + ServerContainer container = WebSocketServerContainerInitializer.configureContext(context); + ServerEndpointConfig config = ServerEndpointConfig.Builder.create(BasicEchoEndpoint.class, "/").build(); + container.addEndpoint(config); + + server.start(); + + client = ContainerProvider.getWebSocketContainer(); + server.addBean(client, true); + } + + @After + public void dispose() throws Exception + { + server.stop(); + } + + @Test + public void testBatchModeOn() throws Exception + { + ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build(); + + URI uri = URI.create("ws://localhost:" + connector.getLocalPort()); + + final CountDownLatch latch = new CountDownLatch(1); + EndpointAdapter endpoint = new EndpointAdapter() + { + @Override + public void onMessage(String message) + { + latch.countDown(); + } + }; + + try (Session session = client.connectToServer(endpoint, config, uri)) + { + RemoteEndpoint.Async remote = session.getAsyncRemote(); + remote.setBatchingAllowed(true); + + Future future = remote.sendText("batch_mode_on"); + // The write is aggregated and therefore completes immediately. + future.get(1, TimeUnit.MICROSECONDS); + + // Did not flush explicitly, so the message should not be back yet. + Assert.assertFalse(latch.await(1, TimeUnit.SECONDS)); + + // Explicitly flush. + remote.flushBatch(); + + // Wait for the echo. + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testBatchModeOff() throws Exception + { + ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build(); + + URI uri = URI.create("ws://localhost:" + connector.getLocalPort()); + + final CountDownLatch latch = new CountDownLatch(1); + EndpointAdapter endpoint = new EndpointAdapter() + { + @Override + public void onMessage(String message) + { + latch.countDown(); + } + }; + + try (Session session = client.connectToServer(endpoint, config, uri)) + { + RemoteEndpoint.Async remote = session.getAsyncRemote(); + remote.setBatchingAllowed(false); + + Future future = remote.sendText("batch_mode_off"); + // The write is immediate. + future.get(1, TimeUnit.SECONDS); + + // Wait for the echo. + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testBatchModeAuto() throws Exception + { + ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build(); + + URI uri = URI.create("ws://localhost:" + connector.getLocalPort()); + + final CountDownLatch latch = new CountDownLatch(1); + EndpointAdapter endpoint = new EndpointAdapter() + { + @Override + public void onMessage(String message) + { + latch.countDown(); + } + }; + + try (Session session = client.connectToServer(endpoint, config, uri)) + { + RemoteEndpoint.Async remote = session.getAsyncRemote(); + + Future future = remote.sendText("batch_mode_auto"); + // The write is immediate, as per the specification. + future.get(1, TimeUnit.SECONDS); + + // Wait for the echo. + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + } + + private static abstract class EndpointAdapter extends Endpoint implements MessageHandler.Whole + { + @Override + public void onOpen(Session session, EndpointConfig config) + { + session.addMessageHandler(this); + } + } +} diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/BatchMode.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/BatchMode.java new file mode 100644 index 00000000000..a3f7aab753b --- /dev/null +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/BatchMode.java @@ -0,0 +1,50 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.api; + +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; + +/** + * The possible batch modes when invoking {@link OutgoingFrames#outgoingFrame(Frame, WriteCallback, BatchMode)}. + */ +public enum BatchMode +{ + /** + * Implementers are free to decide whether to send or not frames + * to the network layer. + */ + AUTO, + + /** + * Implementers must batch frames. + */ + ON, + + /** + * Implementers must send frames to the network layer. + */ + OFF; + + public static BatchMode max(BatchMode one, BatchMode two) + { + // Return the BatchMode that has the higher priority, where AUTO < ON < OFF. + return one.ordinal() < two.ordinal() ? two : one; + } +} diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java index ad08f59e87d..c1a880a1533 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java @@ -123,16 +123,15 @@ public interface RemoteEndpoint void sendString(String text, WriteCallback callback); /** - * @return whether the implementation is allowed to batch messages. + * @return the batch mode with which messages are sent. * @see #flush() */ - boolean isBatching(); - + BatchMode getBatchMode(); /** * Flushes messages that may have been batched by the implementation. * @throws IOException if the flush fails - * @see #isBatching() + * @see #getBatchMode() */ void flush() throws IOException; } diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java index 1634c4cc3db..9a3aed27da3 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.websocket.api.extensions; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; /** @@ -36,26 +37,8 @@ public interface OutgoingFrames * * @param frame the frame to eventually write to the network layer. * @param callback the callback to notify when the frame is written. - * @param flushMode the flush mode required by the sender. + * @param batchMode the batch mode requested by the sender. */ - void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode); + void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode); - /** - * The possible flush modes when invoking {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)}. - */ - public enum FlushMode - { - /** - * Implementers of {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)} - * are free to decide whether to flush or not the given frame - * to the network layer. - */ - AUTO, - - /** - * Implementers of {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)} - * must send the given frame to the network layer. - */ - SEND - } } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java index 83be9192180..a362248a7f9 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; @@ -98,13 +99,13 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection * Override to set the masker. */ @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { if (frame instanceof WebSocketFrame) { masker.setMask((WebSocketFrame)frame); } - super.outgoingFrame(frame,callback, flushMode); + super.outgoingFrame(frame,callback, batchMode); } @Override diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientWriteThread.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientWriteThread.java index 855a52845e2..3e484ec406a 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientWriteThread.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientWriteThread.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; @@ -77,7 +78,7 @@ public class ClientWriteThread extends Thread TimeUnit.MILLISECONDS.sleep(slowness); } } - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); // block on write of last message if (lastMessage != null) diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SessionTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SessionTest.java index c6fab3dc59a..d88236fe590 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SessionTest.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SessionTest.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.common.WebSocketSession; @@ -85,7 +86,7 @@ public class SessionTest RemoteEndpoint remote = cliSock.getSession().getRemote(); remote.sendStringByFuture("Hello World!"); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500); // wait for response from server diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java index 69091bd4210..62d1c7df0e4 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeRequest; @@ -121,7 +122,7 @@ public class WebSocketClientTest RemoteEndpoint remote = cliSock.getSession().getRemote(); remote.sendStringByFuture("Hello World!"); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500); // wait for response from server diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index a9dde66bb76..b03ee02522f 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; @@ -79,14 +80,14 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint private final OutgoingFrames outgoing; private final AtomicInteger msgState = new AtomicInteger(); private final BlockingWriteCallback blocker = new BlockingWriteCallback(); - private volatile boolean batching; + private volatile BatchMode batchMode; public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing) { - this(connection, outgoing, true); + this(connection, outgoing, BatchMode.AUTO); } - public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing, boolean batching) + public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing, BatchMode batchMode) { if (connection == null) { @@ -94,7 +95,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint } this.connection = connection; this.outgoing = outgoing; - this.batching = batching; + this.batchMode = batchMode; } private void blockingWrite(WebSocketFrame frame) throws IOException @@ -292,11 +293,11 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint { try { - OutgoingFrames.FlushMode flushMode = OutgoingFrames.FlushMode.SEND; - if (frame.isDataFrame() && isBatching()) - flushMode = OutgoingFrames.FlushMode.AUTO; + BatchMode batchMode = BatchMode.OFF; + if (frame.isDataFrame()) + batchMode = getBatchMode(); connection.getIOState().assertOutputOpen(); - outgoing.outgoingFrame(frame, callback, flushMode); + outgoing.outgoingFrame(frame, callback, batchMode); } catch (IOException e) { @@ -426,17 +427,17 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint } @Override - public boolean isBatching() + public BatchMode getBatchMode() { - return batching; + return batchMode; } // Only the JSR needs to have this method exposed. // In the Jetty implementation the batching is set // at the moment of opening the session. - public void setBatching(boolean batching) + public void setBatchMode(BatchMode batchMode) { - this.batching = batching; + this.batchMode = batchMode; } public void flush() throws IOException @@ -456,6 +457,6 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint @Override public String toString() { - return String.format("%s@%x[batching=%b]", getClass().getSimpleName(), hashCode(), isBatching()); + return String.format("%s@%x[batching=%b]", getClass().getSimpleName(), hashCode(), getBatchMode()); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index da2a6b2f334..79fbf4994dc 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.CloseStatus; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; @@ -399,7 +400,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc connection.getIOState().onConnected(); // Connect remote - remote = new WebSocketRemoteEndpoint(connection, outgoingHandler, getBatchingDefault()); + remote = new WebSocketRemoteEndpoint(connection, outgoingHandler, getBatchMode()); // Open WebSocket websocket.openSession(this); @@ -473,9 +474,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc /** * @return the default (initial) value for the batching mode. */ - public boolean getBatchingDefault() + public BatchMode getBatchMode() { - return true; + return BatchMode.AUTO; } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java index d7ce31bcdb9..bed96921fe1 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Extension; @@ -162,10 +163,10 @@ public abstract class AbstractExtension extends ContainerLifeCycle implements Ex this.nextIncoming.incomingFrame(frame); } - protected void nextOutgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + protected void nextOutgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { log.debug("nextOutgoingFrame({})",frame); - this.nextOutgoing.outgoingFrame(frame,callback,flushMode); + this.nextOutgoing.outgoingFrame(frame,callback, batchMode); } public void setBufferPool(ByteBufferPool bufferPool) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java index 6e778729c3e..a4ea6a4a1c4 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java @@ -31,6 +31,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Extension; @@ -273,9 +274,9 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { - FrameEntry entry = new FrameEntry(frame, callback, flushMode); + FrameEntry entry = new FrameEntry(frame, callback, batchMode); LOG.debug("Queuing {}", entry); entries.offer(entry); flusher.iterate(); @@ -344,13 +345,13 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames { private final Frame frame; private final WriteCallback callback; - private final FlushMode flushMode; + private final BatchMode batchMode; - private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode) + private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode) { this.frame = frame; this.callback = callback; - this.flushMode = flushMode; + this.batchMode = batchMode; } @Override @@ -371,7 +372,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames LOG.debug("Processing {}", current); if (current == null) return Action.IDLE; - nextOutgoing.outgoingFrame(current.frame, this, current.flushMode); + nextOutgoing.outgoingFrame(current.frame, this, current.batchMode); return Action.SCHEDULED; } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 41f29d7107d..d4030532b33 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -31,6 +31,7 @@ import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.BadPayloadException; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.common.OpCode; @@ -138,7 +139,7 @@ public abstract class CompressExtension extends AbstractExtension } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { // We use a queue and an IteratingCallback to handle concurrency. // We must compress and write atomically, otherwise the compression @@ -150,7 +151,7 @@ public abstract class CompressExtension extends AbstractExtension return; } - FrameEntry entry = new FrameEntry(frame, callback, flushMode); + FrameEntry entry = new FrameEntry(frame, callback, batchMode); LOG.debug("Queuing {}", entry); entries.offer(entry); flusher.iterate(); @@ -192,13 +193,13 @@ public abstract class CompressExtension extends AbstractExtension { private final Frame frame; private final WriteCallback callback; - private final FlushMode flushMode; + private final BatchMode batchMode; - private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode) + private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode) { this.frame = frame; this.callback = callback; - this.flushMode = flushMode; + this.batchMode = batchMode; } @Override @@ -235,10 +236,10 @@ public abstract class CompressExtension extends AbstractExtension private void deflate(FrameEntry entry) { Frame frame = entry.frame; - FlushMode flushMode = entry.flushMode; + BatchMode batchMode = entry.batchMode; if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload()) { - nextOutgoingFrame(frame, this, flushMode); + nextOutgoingFrame(frame, this, batchMode); return; } @@ -311,7 +312,7 @@ public abstract class CompressExtension extends AbstractExtension boolean fin = frame.isFin() && finished; chunk.setFin(fin); - nextOutgoingFrame(chunk, this, entry.flushMode); + nextOutgoingFrame(chunk, this, entry.batchMode); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java index 3c66249aa2b..0b0d9e537df 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; @@ -92,14 +93,14 @@ public class PerMessageDeflateExtension extends CompressExtension } @Override - protected void nextOutgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + protected void nextOutgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { if (frame.isFin() && !outgoingContextTakeover) { LOG.debug("Outgoing Context Reset"); getDeflater().reset(); } - super.nextOutgoingFrame(frame, callback, flushMode); + super.nextOutgoingFrame(frame, callback, batchMode); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java index 8f6c6c7dc13..2684690e409 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; @@ -57,17 +58,17 @@ public class FragmentExtension extends AbstractExtension } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { ByteBuffer payload = frame.getPayload(); int length = payload != null ? payload.remaining() : 0; if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength) { - nextOutgoingFrame(frame, callback, flushMode); + nextOutgoingFrame(frame, callback, batchMode); return; } - FrameEntry entry = new FrameEntry(frame, callback, flushMode); + FrameEntry entry = new FrameEntry(frame, callback, batchMode); LOG.debug("Queuing {}", entry); entries.offer(entry); flusher.iterate(); @@ -84,13 +85,13 @@ public class FragmentExtension extends AbstractExtension { private final Frame frame; private final WriteCallback callback; - private final FlushMode flushMode; + private final BatchMode batchMode; - private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode) + private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode) { this.frame = frame; this.callback = callback; - this.flushMode = flushMode; + this.batchMode = batchMode; } @Override @@ -145,7 +146,7 @@ public class FragmentExtension extends AbstractExtension LOG.debug("Fragmented {}->{}", frame, fragment); payload.position(newLimit); - nextOutgoingFrame(fragment, this, entry.flushMode); + nextOutgoingFrame(fragment, this, entry.batchMode); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/identity/IdentityExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/identity/IdentityExtension.java index fb5c97175dd..4ceb3e79826 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/identity/IdentityExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/identity/IdentityExtension.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common.extensions.identity; import org.eclipse.jetty.util.QuotedStringTokenizer; import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; @@ -56,10 +57,10 @@ public class IdentityExtension extends AbstractExtension } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { // pass through - nextOutgoingFrame(frame,callback, flushMode); + nextOutgoingFrame(frame,callback, batchMode); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index aadd5eda72c..f1265762d47 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -39,6 +39,7 @@ import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.CloseException; import org.eclipse.jetty.websocket.api.CloseStatus; import org.eclipse.jetty.websocket.api.StatusCode; @@ -376,7 +377,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { // Fire out a close frame, indicating abnormal shutdown, then disconnect CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason()); - outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(),FlushMode.SEND); + outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(), BatchMode.OFF); } else { @@ -387,7 +388,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp case CLOSING: CloseInfo close = ioState.getCloseInfo(); // append close frame - outgoingFrame(close.asFrame(),new OnDisconnectCallback(),FlushMode.SEND); + outgoingFrame(close.asFrame(),new OnDisconnectCallback(), BatchMode.OFF); default: break; } @@ -460,14 +461,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp * Frame from API, User, or Internal implementation destined for network. */ @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { if (LOG.isDebugEnabled()) { LOG.debug("outgoingFrame({}, {})",frame,callback); } - flusher.enqueue(frame,callback,flushMode); + flusher.enqueue(frame,callback, batchMode); } private int read(ByteBuffer buffer) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java index ca9b0fe8546..5ae41e9eca9 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java @@ -33,9 +33,9 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; -import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.Generator; import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.frames.BinaryFrame; @@ -68,7 +68,7 @@ public class FrameFlusher this.maxGather = maxGather; } - public void enqueue(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode) + public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode) { if (closed.get()) { @@ -81,7 +81,7 @@ public class FrameFlusher return; } - FrameEntry entry = new FrameEntry(frame, callback, flushMode); + FrameEntry entry = new FrameEntry(frame, callback, batchMode); synchronized (lock) { @@ -185,34 +185,35 @@ public class FrameFlusher private final List buffers = new ArrayList<>(maxGather * 2 + 1); private ByteBuffer aggregate; private boolean releaseAggregate; + private BatchMode batchMode; @Override protected Action process() throws Exception { int space = aggregate == null ? bufferSize : aggregate.remaining(); - boolean batch = true; + BatchMode currentBatchMode = BatchMode.AUTO; synchronized (lock) { while (entries.size() <= maxGather && !queue.isEmpty()) { FrameEntry entry = queue.remove(0); - batch &= entry.flushMode == OutgoingFrames.FlushMode.AUTO; + currentBatchMode = BatchMode.max(currentBatchMode, entry.batchMode); // Force flush if we need to. if (entry.frame == FLUSH_FRAME) - batch = false; + currentBatchMode = BatchMode.OFF; int payloadLength = BufferUtil.length(entry.frame.getPayload()); int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength; // If it is a "big" frame, avoid copying into the aggregate buffer. if (approxFrameLength > (bufferSize >> 2)) - batch = false; + currentBatchMode = BatchMode.OFF; // If the aggregate buffer overflows, do not batch. space -= approxFrameLength; if (space <= 0) - batch = false; + currentBatchMode = BatchMode.OFF; entries.add(entry); } @@ -223,6 +224,8 @@ public class FrameFlusher if (entries.isEmpty()) { + // Nothing more to do, release the aggregate buffer if we need to. + // Releasing it here rather than in succeeded() allows for its reuse. if (releaseAggregate) { bufferPool.release(aggregate); @@ -230,19 +233,21 @@ public class FrameFlusher LOG.debug("{} released aggregate buffer {}", FrameFlusher.this, aggregate); aggregate = null; } - return Action.IDLE; + + if (batchMode != BatchMode.AUTO) + return Action.IDLE; + + LOG.debug("{} auto flushing", FrameFlusher.this); + return flush(); } - if (batch) - batch(); - else - flush(); + batchMode = currentBatchMode; - return Action.SCHEDULED; + return currentBatchMode == BatchMode.OFF ? flush() : batch(); } @SuppressWarnings("ForLoopReplaceableByForEach") - private void flush() + private Action flush() { if (!BufferUtil.isEmpty(aggregate)) { @@ -268,12 +273,17 @@ public class FrameFlusher if (LOG.isDebugEnabled()) LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries); + + if (buffers.isEmpty()) + return Action.IDLE; + endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()])); buffers.clear(); + return Action.SCHEDULED; } @SuppressWarnings("ForLoopReplaceableByForEach") - private void batch() + private Action batch() { if (aggregate == null) { @@ -299,6 +309,7 @@ public class FrameFlusher if (LOG.isDebugEnabled()) LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries); succeeded(); + return Action.SCHEDULED; } @SuppressWarnings("ForLoopReplaceableByForEach") @@ -346,14 +357,14 @@ public class FrameFlusher { private final Frame frame; private final WriteCallback callback; - private final OutgoingFrames.FlushMode flushMode; + private final BatchMode batchMode; private ByteBuffer headerBuffer; - private FrameEntry(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode) + private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode) { this.frame = Objects.requireNonNull(frame); this.callback = callback; - this.flushMode = flushMode; + this.batchMode = batchMode; } private ByteBuffer getHeaderBytes() @@ -372,7 +383,7 @@ public class FrameFlusher public String toString() { - return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, flushMode, failure); + return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, failure); } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java index 053857c3969..8189d070a35 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.websocket.common.io; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.IncomingFrames; @@ -43,7 +44,7 @@ public class FramePipes @Override public void incomingFrame(Frame frame) { - this.outgoing.outgoingFrame(frame,null,OutgoingFrames.FlushMode.SEND); + this.outgoing.outgoingFrame(frame,null, BatchMode.OFF); } } @@ -57,7 +58,7 @@ public class FramePipes } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { try { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/payload/DeMaskProcessor.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/payload/DeMaskProcessor.java index 55f67559c77..ea287f5e63e 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/payload/DeMaskProcessor.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/payload/DeMaskProcessor.java @@ -59,12 +59,15 @@ public class DeMaskProcessor implements PayloadProcessor maskOffset = offset; } - public void reset(byte mask[]) + public void reset(byte[] mask) { this.maskBytes = mask; int maskInt = 0; - for (byte maskByte : maskBytes) - maskInt = (maskInt << 8) + (maskByte & 0xFF); + if (mask != null) + { + for (byte maskByte : mask) + maskInt = (maskInt << 8) + (maskByte & 0xFF); + } this.maskInt = maskInt; this.maskOffset = 0; } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java index baa99de0946..a34301fb34b 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.BlockingWriteCallback; @@ -137,7 +138,7 @@ public class MessageOutputStream extends OutputStream try { - outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.SEND); + outgoing.outgoingFrame(frame,blocker, BatchMode.OFF); // block on write blocker.block(); // block success diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java index 3b97c636219..8fbbbc39347 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.BlockingWriteCallback; @@ -118,7 +119,7 @@ public class MessageWriter extends Writer try { - outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.SEND); + outgoing.outgoingFrame(frame,blocker, BatchMode.OFF); // block on write blocker.block(); // write success diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/DummyOutgoingFrames.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/DummyOutgoingFrames.java index f24a5aec487..48534e46214 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/DummyOutgoingFrames.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/DummyOutgoingFrames.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common.extensions; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; @@ -44,7 +45,7 @@ public class DummyOutgoingFrames implements OutgoingFrames } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { LOG.debug("outgoingFrame({},{})",frame,callback); if (callback != null) diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java index 6ffc894e39e..d7d6643e9ac 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java @@ -27,10 +27,10 @@ import java.util.List; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; -import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension; @@ -165,7 +165,7 @@ public class FragmentExtensionTest for (String section : quote) { Frame frame = new TextFrame().setPayload(section); - ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(frame, null, BatchMode.OFF); } // Expected Frames @@ -237,7 +237,7 @@ public class FragmentExtensionTest for (String section : quote) { Frame frame = new TextFrame().setPayload(section); - ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(frame, null, BatchMode.OFF); } // Expected Frames @@ -294,7 +294,7 @@ public class FragmentExtensionTest String payload = "Are you there?"; Frame ping = new PingFrame().setPayload(payload); - ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(ping, null, BatchMode.OFF); capture.assertFrameCount(1); capture.assertHasFrame(OpCode.PING, 1); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java index 182d846574b..da4dce34011 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java @@ -23,9 +23,9 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.extensions.Extension; import org.eclipse.jetty.websocket.api.extensions.Frame; -import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.extensions.identity.IdentityExtension; @@ -81,7 +81,7 @@ public class IdentityExtensionTest ext.setNextOutgoingFrames(capture); Frame frame = new TextFrame().setPayload("hello"); - ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(frame, null, BatchMode.OFF); capture.assertFrameCount(1); capture.assertHasFrame(OpCode.TEXT, 1); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/CapturedHexPayloads.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/CapturedHexPayloads.java index a7f851d0365..779cac6601f 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/CapturedHexPayloads.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/CapturedHexPayloads.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common.extensions.compress; import java.util.ArrayList; import java.util.List; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; @@ -31,7 +32,7 @@ public class CapturedHexPayloads implements OutgoingFrames private List captured = new ArrayList<>(); @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { String hexPayload = Hex.asHex(frame.getPayload()); captured.add(hexPayload); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java index 1ff6bd6db86..fbb2e3151db 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; @@ -127,7 +128,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest ext.setNextOutgoingFrames(capture); Frame frame = new TextFrame().setPayload(text); - ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(frame, null, BatchMode.OFF); capture.assertBytes(0, expectedHex); } @@ -234,9 +235,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest init(ext); ext.setNextOutgoingFrames(capture); - ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND); - ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND); - ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(new TextFrame().setPayload("time:"), null, BatchMode.OFF); + ext.outgoingFrame(new TextFrame().setPayload("time:"), null, BatchMode.OFF); + ext.outgoingFrame(new TextFrame().setPayload("time:"), null, BatchMode.OFF); List actual = capture.getCaptured(); @@ -308,8 +309,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest OutgoingNetworkBytesCapture capture = new OutgoingNetworkBytesCapture(generator); ext.setNextOutgoingFrames(capture); - ext.outgoingFrame(new TextFrame().setPayload("Hello"), null, OutgoingFrames.FlushMode.SEND); - ext.outgoingFrame(new TextFrame().setPayload("There"), null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(new TextFrame().setPayload("Hello"), null, BatchMode.OFF); + ext.outgoingFrame(new TextFrame().setPayload("There"), null, BatchMode.OFF); capture.assertBytes(0, "c107f248cdc9c90700"); } @@ -398,7 +399,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest clientExtension.setNextOutgoingFrames(new OutgoingFrames() { @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { serverExtension.incomingFrame(frame); callback.writeSuccess(); @@ -430,7 +431,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest BinaryFrame frame = new BinaryFrame(); frame.setPayload(input); frame.setFin(true); - clientExtension.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND); + clientExtension.outgoingFrame(frame, null, BatchMode.OFF); Assert.assertArrayEquals(input, result.toByteArray()); } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java index 36a45e7a6bf..e840c4c9ba3 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java @@ -26,10 +26,10 @@ import java.util.List; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; -import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.extensions.AbstractExtensionTest; @@ -318,7 +318,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest String payload = "Are you there?"; Frame ping = new PingFrame().setPayload(payload); - ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(ping, null, BatchMode.OFF); capture.assertFrameCount(1); capture.assertHasFrame(OpCode.PING, 1); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java index a33199f3384..2244c93cb5c 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java @@ -25,6 +25,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutorThreadPool; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.WebSocketPolicy; @@ -204,7 +205,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java index df0e9847c7f..bd02460c663 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java @@ -46,6 +46,7 @@ import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; @@ -467,7 +468,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { ByteBuffer headerBuf = generator.generateHeaderBytes(frame); if (LOG.isDebugEnabled()) @@ -712,7 +713,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti { frame.setMask(clientmask); } - extensionStack.outgoingFrame(frame,null,FlushMode.SEND); + extensionStack.outgoingFrame(frame,null, BatchMode.OFF); } public void writeRaw(ByteBuffer buf) throws IOException diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java index 71087c8ca2b..c3cd1ad8554 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java @@ -47,6 +47,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; @@ -231,7 +232,7 @@ public class BlockheadServer } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { ByteBuffer headerBuf = generator.generateHeaderBytes(frame); if (LOG.isDebugEnabled()) @@ -561,7 +562,7 @@ public class BlockheadServer public void write(Frame frame) throws IOException { LOG.debug("write(Frame->{}) to {}",frame,outgoing); - outgoing.outgoingFrame(frame,null,FlushMode.SEND); + outgoing.outgoingFrame(frame,null, BatchMode.OFF); } public void write(int b) throws IOException diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingFramesCapture.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingFramesCapture.java index c4cf5051c20..2510599d3cf 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingFramesCapture.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingFramesCapture.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common.test; import java.util.LinkedList; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; @@ -85,7 +86,7 @@ public class OutgoingFramesCapture implements OutgoingFrames } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { frames.add(WebSocketFrame.copy(frame)); if (callback != null) diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java index efeb97ce3b7..1bbf8ac10d2 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java @@ -25,6 +25,7 @@ import java.util.Locale; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; @@ -62,7 +63,7 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames } @Override - public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { ByteBuffer buf = ByteBuffer.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength()); generator.generateWholeFrame(frame,buf); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/BatchModeTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/BatchModeTest.java new file mode 100644 index 00000000000..835d107c096 --- /dev/null +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/BatchModeTest.java @@ -0,0 +1,103 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.server; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.server.helper.EchoSocket; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class BatchModeTest +{ + private Server server; + private ServerConnector connector; + private WebSocketClient client; + + @Before + public void prepare() throws Exception + { + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + + WebSocketHandler handler = new WebSocketHandler() + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.register(EchoSocket.class); + } + }; + + server.setHandler(handler); + + client = new WebSocketClient(); + server.addBean(client, true); + + server.start(); + } + + @After + public void dispose() throws Exception + { + server.stop(); + } + + @Test + public void testBatchModeAuto() throws Exception + { + URI uri = URI.create("ws://localhost:" + connector.getLocalPort()); + + final CountDownLatch latch = new CountDownLatch(1); + WebSocketAdapter adapter = new WebSocketAdapter() + { + @Override + public void onWebSocketText(String message) + { + latch.countDown(); + } + }; + try (Session session = client.connect(adapter, uri).get()) + { + RemoteEndpoint remote = session.getRemote(); + + Future future = remote.sendStringByFuture("batch_mode_on"); + // The write is aggregated and therefore completes immediately. + future.get(1, TimeUnit.MICROSECONDS); + + // Wait for the echo. + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + } + + // TODO: currently not possible to configure the Jetty WebSocket Session with the batch mode. +} diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java index 878e68920e6..5dcecd300c8 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.toolchain.test.EventQueue; import org.eclipse.jetty.toolchain.test.TestTracker; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.client.WebSocketClient; @@ -87,7 +88,7 @@ public class WebSocketOverSSLTest String msg = "this is an echo ... cho ... ho ... o"; RemoteEndpoint remote = session.getRemote(); remote.sendString(msg); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); // Read frame (hopefully text frame) @@ -128,7 +129,7 @@ public class WebSocketOverSSLTest // Generate text frame RemoteEndpoint remote = session.getRemote(); remote.sendString("session.isSecure"); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); // Read frame (hopefully text frame) @@ -169,7 +170,7 @@ public class WebSocketOverSSLTest // Generate text frame RemoteEndpoint remote = session.getRemote(); remote.sendString("session.upgradeRequest.requestURI"); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); // Read frame (hopefully text frame) diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java index cb1a55a8356..4a8d3b05d33 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.server.examples; import java.io.IOException; import org.eclipse.jetty.io.RuntimeIOException; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.WebSocketAdapter; @@ -42,7 +43,7 @@ public class MyEchoSocket extends WebSocketAdapter // echo the data back RemoteEndpoint remote = getRemote(); remote.sendString(message); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } catch (IOException e) diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java index 9b0a609601e..431437f4433 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; @@ -46,7 +47,7 @@ public class BigEchoSocket } RemoteEndpoint remote = session.getRemote(); remote.sendBytes(ByteBuffer.wrap(buf, offset, length), null); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } @@ -60,7 +61,7 @@ public class BigEchoSocket } RemoteEndpoint remote = session.getRemote(); remote.sendString(message, null); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java index ed84403af98..a3dc6950451 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; @@ -48,7 +49,7 @@ public class EchoSocket ByteBuffer data = ByteBuffer.wrap(buf,offset,len); RemoteEndpoint remote = this.session.getRemote(); remote.sendBytes(data, null); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } @@ -66,7 +67,7 @@ public class EchoSocket // echo the message back. RemoteEndpoint remote = session.getRemote(); remote.sendString(message, null); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java index a172f7ae237..d90a8f24989 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; @@ -45,7 +46,7 @@ public class RFCSocket ByteBuffer data = ByteBuffer.wrap(buf,offset,len); RemoteEndpoint remote = session.getRemote(); remote.sendBytes(data, null); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } @@ -69,7 +70,7 @@ public class RFCSocket // echo the message back. RemoteEndpoint remote = session.getRemote(); remote.sendString(message, null); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java index c4b3b678235..467a4b527f4 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java @@ -24,6 +24,7 @@ import java.util.Map; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; @@ -118,7 +119,7 @@ public class SessionSocket { RemoteEndpoint remote = session.getRemote(); remote.sendString(text, null); - if (remote.isBatching()) + if (remote.getBatchMode() == BatchMode.ON) remote.flush(); } }