diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java index 2bdb77a1981..e05ff158563 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java @@ -29,7 +29,6 @@ import javax.websocket.SendHandler; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.websocket.common.BlockingWriteCallback; import org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint; import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; import org.eclipse.jetty.websocket.common.message.MessageOutputStream; @@ -80,21 +79,19 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint @Override public void flushBatch() throws IOException { - BlockingWriteCallback callback = new BlockingWriteCallback(); - jettyRemote.sendBytes(BufferUtil.EMPTY_BUFFER, callback); - callback.block(); + jettyRemote.flush(); } @Override public boolean getBatchingAllowed() { - return session.isBatching(); + return jettyRemote.isBatching(); } @Override public void setBatchingAllowed(boolean allowed) throws IOException { - session.setBatching(allowed); + jettyRemote.setBatching(allowed); } @SuppressWarnings( diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java index e7776d19b44..23c53b5648c 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java @@ -71,11 +71,10 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess private Map pathParameters = new HashMap<>(); private JsrAsyncRemote asyncRemote; private JsrBasicRemote basicRemote; - private volatile boolean batching; public JsrSession(URI requestURI, EventDriver websocket, LogicalConnection connection, ClientContainer container, String id, SessionListener... sessionListeners) { - super(requestURI,websocket,connection,sessionListeners); + super(requestURI, websocket, connection, sessionListeners); if (!(websocket instanceof AbstractJsrEventDriver)) { throw new IllegalArgumentException("Cannot use, not a JSR WebSocket: " + websocket); @@ -90,13 +89,12 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess this.messageHandlerFactory = new MessageHandlerFactory(); this.wrappers = new MessageHandlerWrapper[MessageType.values().length]; this.messageHandlerSet = new HashSet<>(); - } @Override public void addMessageHandler(MessageHandler handler) throws IllegalStateException { - Objects.requireNonNull(handler,"MessageHandler cannot be null"); + Objects.requireNonNull(handler, "MessageHandler cannot be null"); synchronized (wrappers) { @@ -376,13 +374,8 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess } @Override - public boolean isBatching() + public boolean getBatchingDefault() { - return batching; - } - - public void setBatching(boolean batching) - { - this.batching = batching; + return false; } } diff --git a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java index 1bb8effb566..4dc25e46b50 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java +++ b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JettyEchoSocket.java @@ -18,9 +18,13 @@ package org.eclipse.jetty.websocket.jsr356; +import java.io.IOException; + +import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.WebSocketAdapter; /** @@ -33,7 +37,17 @@ public class JettyEchoSocket extends WebSocketAdapter @Override public void onWebSocketBinary(byte[] payload, int offset, int len) { - getRemote().sendBytes(BufferUtil.toBuffer(payload,offset,len),null); + try + { + RemoteEndpoint remote = getRemote(); + remote.sendBytes(BufferUtil.toBuffer(payload, offset, len), null); + if (remote.isBatching()) + remote.flush(); + } + catch (IOException x) + { + throw new RuntimeIOException(x); + } } @Override @@ -45,6 +59,16 @@ public class JettyEchoSocket extends WebSocketAdapter @Override public void onWebSocketText(String message) { - getRemote().sendString(message,null); + try + { + RemoteEndpoint remote = getRemote(); + remote.sendString(message, null); + if (remote.isBatching()) + remote.flush(); + } + catch (IOException x) + { + throw new RuntimeIOException(x); + } } } diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java index 726ed67931b..df0790d1600 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/JettyEchoSocket.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.websocket.jsr356.server; +import java.io.IOException; import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -74,10 +75,10 @@ public class JettyEchoSocket } @OnWebSocketMessage - public void onMessage(String msg) + public void onMessage(String msg) throws IOException { incomingMessages.add(msg); - remote.sendString(msg,null); + sendMessage(msg); } @OnWebSocketConnect @@ -88,8 +89,10 @@ public class JettyEchoSocket this.remote = session.getRemote(); } - public void sendMessage(String msg) + public void sendMessage(String msg) throws IOException { remote.sendStringByFuture(msg); + if (remote.isBatching()) + remote.flush(); } } diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java index 909e151721d..ad08f59e87d 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/RemoteEndpoint.java @@ -121,4 +121,18 @@ public interface RemoteEndpoint * callback to notify of success or failure of the write operation */ void sendString(String text, WriteCallback callback); + + /** + * @return whether the implementation is allowed to batch messages. + * @see #flush() + */ + boolean isBatching(); + + + /** + * Flushes messages that may have been batched by the implementation. + * @throws IOException if the flush fails + * @see #isBatching() + */ + void flush() throws IOException; } diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java index be7e03f1485..b4ccd28123f 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java @@ -175,9 +175,4 @@ public interface Session extends Closeable * @return the suspend token suitable for resuming the reading of data on the connection. */ SuspendToken suspend(); - - /** - * @return true if this session is batching network data, false if it flushes it immediately. - */ - boolean isBatching(); } diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java index 6cf9c70150e..1634c4cc3db 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java @@ -41,21 +41,21 @@ public interface OutgoingFrames void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode); /** - * The possible flush modes when invoking {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)}. + * The possible flush modes when invoking {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)}. */ public enum FlushMode { /** - * Implementers of {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)} + * Implementers of {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)} * are free to decide whether to flush or not the given frame * to the network layer. */ AUTO, /** - * Implementers of {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)} - * must flush the given frame to the network layer. + * Implementers of {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)} + * must send the given frame to the network layer. */ - FLUSH + SEND } } diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientWriteThread.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientWriteThread.java index 4b83a016695..855a52845e2 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientWriteThread.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientWriteThread.java @@ -18,10 +18,8 @@ package org.eclipse.jetty.websocket.client; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.log.Log; @@ -79,10 +77,13 @@ public class ClientWriteThread extends Thread TimeUnit.MILLISECONDS.sleep(slowness); } } + if (remote.isBatching()) + remote.flush(); // block on write of last message - lastMessage.get(2,TimeUnit.MINUTES); // block on write + if (lastMessage != null) + lastMessage.get(2,TimeUnit.MINUTES); // block on write } - catch (InterruptedException | ExecutionException | TimeoutException e) + catch (Exception e) { LOG.warn(e); } diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerReadThread.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerReadThread.java index 0c4ca8fe458..a059583ac47 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerReadThread.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerReadThread.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.client; -import static org.hamcrest.Matchers.*; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.Queue; @@ -37,6 +35,8 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.test.BlockheadServer.ServerConnection; import org.junit.Assert; +import static org.hamcrest.Matchers.is; + public class ServerReadThread extends Thread { private static final int BUFFER_SIZE = 8192; @@ -44,13 +44,13 @@ public class ServerReadThread extends Thread private final ServerConnection conn; private boolean active = true; private int slowness = -1; // disabled is default - private AtomicInteger frameCount = new AtomicInteger(); - private CountDownLatch expectedMessageCount; + private final AtomicInteger frameCount = new AtomicInteger(); + private final CountDownLatch expectedMessageCount; - public ServerReadThread(ServerConnection conn) + public ServerReadThread(ServerConnection conn, int expectedMessages) { this.conn = conn; - this.expectedMessageCount = new CountDownLatch(1); + this.expectedMessageCount = new CountDownLatch(expectedMessages); } public void cancel() @@ -75,14 +75,12 @@ public class ServerReadThread extends Thread ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false); BufferUtil.clearToFill(buf); - int len = 0; - try { while (active) { BufferUtil.clearToFill(buf); - len = conn.read(buf); + int len = conn.read(buf); if (len > 0) { @@ -108,7 +106,7 @@ public class ServerReadThread extends Thread } if (slowness > 0) { - TimeUnit.MILLISECONDS.sleep(slowness); + TimeUnit.MILLISECONDS.sleep(getSlowness()); } } } @@ -122,11 +120,6 @@ public class ServerReadThread extends Thread } } - public void setExpectedMessageCount(int expectedMessageCount) - { - this.expectedMessageCount = new CountDownLatch(expectedMessageCount); - } - public void setSlowness(int slowness) { this.slowness = slowness; diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SessionTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SessionTest.java index 5e74e1171f4..c6fab3dc59a 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SessionTest.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SessionTest.java @@ -18,13 +18,12 @@ package org.eclipse.jetty.websocket.client; -import static org.hamcrest.Matchers.*; - import java.net.URI; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.test.BlockheadServer; @@ -34,6 +33,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + public class SessionTest { private BlockheadServer server; @@ -81,7 +83,10 @@ public class SessionTest Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1)); - cliSock.getSession().getRemote().sendStringByFuture("Hello World!"); + RemoteEndpoint remote = cliSock.getSession().getRemote(); + remote.sendStringByFuture("Hello World!"); + if (remote.isBatching()) + remote.flush(); srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500); // wait for response from server cliSock.waitForMessage(500,TimeUnit.MILLISECONDS); diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowClientTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowClientTest.java index 4337d625dba..5589c831490 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowClientTest.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowClientTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.client; -import static org.hamcrest.Matchers.is; - import java.net.URI; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -36,6 +34,8 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import static org.hamcrest.Matchers.is; + public class SlowClientTest { @Rule @@ -79,24 +79,23 @@ public class SlowClientTest client.getPolicy().setIdleTimeout(60000); URI wsUri = server.getWsUri(); - Future future = client.connect(tsocket,wsUri); + Future future = client.connect(tsocket, wsUri); ServerConnection sconnection = server.accept(); sconnection.setSoTimeout(60000); sconnection.upgrade(); // Confirm connected - future.get(500,TimeUnit.MILLISECONDS); - tsocket.waitForConnected(500,TimeUnit.MILLISECONDS); + future.get(500, TimeUnit.MILLISECONDS); + tsocket.waitForConnected(500, TimeUnit.MILLISECONDS); + + int messageCount = 10; // Setup server read thread - ServerReadThread reader = new ServerReadThread(sconnection); - reader.setExpectedMessageCount(Integer.MAX_VALUE); // keep reading till I tell you to stop + ServerReadThread reader = new ServerReadThread(sconnection, messageCount); reader.start(); // Have client write slowly. - int messageCount = 1000; - ClientWriteThread writer = new ClientWriteThread(tsocket.getSession()); writer.setMessageCount(messageCount); writer.setMessage("Hello"); @@ -104,13 +103,15 @@ public class SlowClientTest writer.start(); writer.join(); + reader.waitForExpectedMessageCount(1, TimeUnit.MINUTES); + // Verify receive - Assert.assertThat("Frame Receive Count",reader.getFrameCount(),is(messageCount)); + Assert.assertThat("Frame Receive Count", reader.getFrameCount(), is(messageCount)); // Close - tsocket.getSession().close(StatusCode.NORMAL,"Done"); + tsocket.getSession().close(StatusCode.NORMAL, "Done"); - Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(3,TimeUnit.MINUTES)); + Assert.assertTrue("Client Socket Closed", tsocket.closeLatch.await(3, TimeUnit.MINUTES)); tsocket.assertCloseCode(StatusCode.NORMAL); reader.cancel(); // stop reading diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java index ddc26b622fe..80720f329f3 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.client; -import static org.hamcrest.Matchers.is; - import java.net.URI; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -37,6 +35,8 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import static org.hamcrest.Matchers.is; + public class SlowServerTest { @Rule @@ -91,11 +91,10 @@ public class SlowServerTest future.get(500,TimeUnit.MILLISECONDS); tsocket.waitForConnected(500,TimeUnit.MILLISECONDS); - int messageCount = 10; // TODO: increase to 1000 + int messageCount = 10; // Setup slow server read thread - ServerReadThread reader = new ServerReadThread(sconnection); - reader.setExpectedMessageCount(messageCount); + ServerReadThread reader = new ServerReadThread(sconnection, messageCount); reader.setSlowness(100); // slow it down reader.start(); diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java index 03856618fd0..69091bd4210 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientTest.java @@ -18,11 +18,6 @@ package org.eclipse.jetty.websocket.client; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; - import java.net.InetSocketAddress; import java.net.URI; import java.util.Arrays; @@ -33,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.common.frames.TextFrame; @@ -45,6 +41,11 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + @RunWith(AdvancedRunner.class) public class WebSocketClientTest { @@ -118,7 +119,10 @@ public class WebSocketClientTest Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1)); - cliSock.getSession().getRemote().sendStringByFuture("Hello World!"); + RemoteEndpoint remote = cliSock.getSession().getRemote(); + remote.sendStringByFuture("Hello World!"); + if (remote.isBatching()) + remote.flush(); srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500); // wait for response from server cliSock.waitForMessage(500,TimeUnit.MILLISECONDS); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java index ac62849ea05..22dec2edad5 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java @@ -58,7 +58,7 @@ public class Generator /** * The overhead (maximum) for a framing header. Assuming a maximum sized payload with masking key. */ - public static final int OVERHEAD = 28; + public static final int MAX_HEADER_LENGTH = 28; private final WebSocketBehavior behavior; private final ByteBufferPool bufferPool; @@ -196,7 +196,7 @@ public class Generator // we need a framing header assertFrameValid(frame); - ByteBuffer buffer = bufferPool.acquire(OVERHEAD,true); + ByteBuffer buffer = bufferPool.acquire(MAX_HEADER_LENGTH,true); BufferUtil.clearToFill(buffer); /* diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index 4e0a57fc851..158e3b6c48f 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -37,6 +37,7 @@ import org.eclipse.jetty.websocket.common.frames.DataFrame; import org.eclipse.jetty.websocket.common.frames.PingFrame; import org.eclipse.jetty.websocket.common.frames.PongFrame; import org.eclipse.jetty.websocket.common.frames.TextFrame; +import org.eclipse.jetty.websocket.common.io.FrameFlusher; import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; /** @@ -44,8 +45,7 @@ import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; */ public class WebSocketRemoteEndpoint implements RemoteEndpoint { - /** Message Type*/ - private enum MsgType + private enum MsgType { BLOCKING, ASYNC, @@ -53,14 +53,14 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint PARTIAL_TEXT, PARTIAL_BINARY } - + private static final WriteCallback NOOP_CALLBACK = new WriteCallback() { @Override public void writeSuccess() { } - + @Override public void writeFailed(Throwable x) { @@ -68,21 +68,25 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint }; private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class); - public final LogicalConnection connection; - public final OutgoingFrames outgoing; - /** JSR-356 blocking send behaviour message and Type sanity to support partial send properly */ - private final static int ASYNC_MASK = 0x0000FFFF; - private final static int BLOCK_MASK = 0x00010000; - private final static int STREAM_MASK = 0x00020000; - private final static int PARTIAL_TEXT_MASK= 0x00040000; - private final static int PARTIAL_BINARY_MASK= 0x00080000; - + private final static int ASYNC_MASK = 0x0000FFFF; + private final static int BLOCK_MASK = 0x00010000; + private final static int STREAM_MASK = 0x00020000; + private final static int PARTIAL_TEXT_MASK = 0x00040000; + private final static int PARTIAL_BINARY_MASK = 0x00080000; + + private final LogicalConnection connection; + private final OutgoingFrames outgoing; private final AtomicInteger msgState = new AtomicInteger(); - private final BlockingWriteCallback blocker = new BlockingWriteCallback(); + private volatile boolean batching; public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing) + { + this(connection, outgoing, true); + } + + public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing, boolean batching) { if (connection == null) { @@ -90,11 +94,12 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint } this.connection = connection; this.outgoing = outgoing; + this.batching = batching; } private void blockingWrite(WebSocketFrame frame) throws IOException { - uncheckedSendFrame(frame,blocker); + uncheckedSendFrame(frame, blocker); blocker.block(); } @@ -106,107 +111,107 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint // Blocking -> Pending!! ; Async -> STREAMING ; Partial -> Pending!! ; Stream -> STREAMING // Blocking -> Pending!! ; Async -> Pending!! ; Partial -> PARTIAL_TEXT ; Stream -> Pending!! // Blocking -> Pending!! ; Async -> Pending!! ; Partial -> PARTIAL_BIN ; Stream -> Pending!! - - while(true) + + while (true) { int state = msgState.get(); - + switch (type) { case BLOCKING: - if ((state&(PARTIAL_BINARY_MASK+PARTIAL_TEXT_MASK))!=0) - throw new IllegalStateException(String.format("Partial message pending %x for %s",state,type)); - if ((state&BLOCK_MASK)!=0) - throw new IllegalStateException(String.format("Blocking message pending %x for %s",state,type)); - if (msgState.compareAndSet(state,state|BLOCK_MASK)) - return state==0; + if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0) + throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type)); + if ((state & BLOCK_MASK) != 0) + throw new IllegalStateException(String.format("Blocking message pending %x for %s", state, type)); + if (msgState.compareAndSet(state, state | BLOCK_MASK)) + return state == 0; break; - + case ASYNC: - if ((state&(PARTIAL_BINARY_MASK+PARTIAL_TEXT_MASK))!=0) - throw new IllegalStateException(String.format("Partial message pending %x for %s",state,type)); - if ((state&ASYNC_MASK)==ASYNC_MASK) - throw new IllegalStateException(String.format("Too many async sends: %x",state)); - if (msgState.compareAndSet(state,state+1)) - return state==0; + if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0) + throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type)); + if ((state & ASYNC_MASK) == ASYNC_MASK) + throw new IllegalStateException(String.format("Too many async sends: %x", state)); + if (msgState.compareAndSet(state, state + 1)) + return state == 0; break; - + case STREAMING: - if ((state&(PARTIAL_BINARY_MASK+PARTIAL_TEXT_MASK))!=0) - throw new IllegalStateException(String.format("Partial message pending %x for %s",state,type)); - if ((state&STREAM_MASK)!=0) - throw new IllegalStateException(String.format("Already streaming %x for %s",state,type)); - if (msgState.compareAndSet(state,state|STREAM_MASK)) - return state==0; + if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0) + throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type)); + if ((state & STREAM_MASK) != 0) + throw new IllegalStateException(String.format("Already streaming %x for %s", state, type)); + if (msgState.compareAndSet(state, state | STREAM_MASK)) + return state == 0; break; - + case PARTIAL_BINARY: - if (state==PARTIAL_BINARY_MASK) + if (state == PARTIAL_BINARY_MASK) return false; - if (state==0) + if (state == 0) { - if (msgState.compareAndSet(0,state|PARTIAL_BINARY_MASK)) + if (msgState.compareAndSet(0, state | PARTIAL_BINARY_MASK)) return true; } - throw new IllegalStateException(String.format("Cannot send %s in state %x",type,state)); - + throw new IllegalStateException(String.format("Cannot send %s in state %x", type, state)); + case PARTIAL_TEXT: - if (state==PARTIAL_TEXT_MASK) + if (state == PARTIAL_TEXT_MASK) return false; - if (state==0) + if (state == 0) { - if (msgState.compareAndSet(0,state|PARTIAL_TEXT_MASK)) + if (msgState.compareAndSet(0, state | PARTIAL_TEXT_MASK)) return true; } - throw new IllegalStateException(String.format("Cannot send %s in state %x",type,state)); + throw new IllegalStateException(String.format("Cannot send %s in state %x", type, state)); } } } private void unlockMsg(MsgType type) { - while(true) + while (true) { int state = msgState.get(); - + switch (type) { case BLOCKING: - if ((state&BLOCK_MASK)==0) - throw new IllegalStateException(String.format("Not Blocking in state %x",state)); - if (msgState.compareAndSet(state,state&~BLOCK_MASK)) + if ((state & BLOCK_MASK) == 0) + throw new IllegalStateException(String.format("Not Blocking in state %x", state)); + if (msgState.compareAndSet(state, state & ~BLOCK_MASK)) return; break; - + case ASYNC: - if ((state&ASYNC_MASK)==0) - throw new IllegalStateException(String.format("Not Async in %x",state)); - if (msgState.compareAndSet(state,state-1)) + if ((state & ASYNC_MASK) == 0) + throw new IllegalStateException(String.format("Not Async in %x", state)); + if (msgState.compareAndSet(state, state - 1)) return; break; - + case STREAMING: - if ((state&STREAM_MASK)==0) - throw new IllegalStateException(String.format("Not Streaming in state %x",state)); - if (msgState.compareAndSet(state,state&~STREAM_MASK)) + if ((state & STREAM_MASK) == 0) + throw new IllegalStateException(String.format("Not Streaming in state %x", state)); + if (msgState.compareAndSet(state, state & ~STREAM_MASK)) return; break; - + case PARTIAL_BINARY: - if (msgState.compareAndSet(PARTIAL_BINARY_MASK,0)) + if (msgState.compareAndSet(PARTIAL_BINARY_MASK, 0)) return; - throw new IllegalStateException(String.format("Not Partial Binary in state %x",state)); - + throw new IllegalStateException(String.format("Not Partial Binary in state %x", state)); + case PARTIAL_TEXT: - if (msgState.compareAndSet(PARTIAL_TEXT_MASK,0)) + if (msgState.compareAndSet(PARTIAL_TEXT_MASK, 0)) return; - throw new IllegalStateException(String.format("Not Partial Text in state %x",state)); - + throw new IllegalStateException(String.format("Not Partial Text in state %x", state)); + } } } - - + + public InetSocketAddress getInetSocketAddress() { return connection.getRemoteAddress(); @@ -214,15 +219,14 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint /** * Internal - * - * @param frame - * the frame to write + * + * @param frame the frame to write * @return the future for the network write of the frame */ private Future sendAsyncFrame(WebSocketFrame frame) { FutureWriteCallback future = new FutureWriteCallback(); - uncheckedSendFrame(frame,future); + uncheckedSendFrame(frame, future); return future; } @@ -238,7 +242,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint connection.getIOState().assertOutputOpen(); if (LOG.isDebugEnabled()) { - LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data)); + LOG.debug("sendBytes with {}", BufferUtil.toDetailString(data)); } blockingWrite(new BinaryFrame().setPayload(data)); } @@ -256,7 +260,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint { if (LOG.isDebugEnabled()) { - LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data)); + LOG.debug("sendBytesByFuture with {}", BufferUtil.toDetailString(data)); } return sendAsyncFrame(new BinaryFrame().setPayload(data)); } @@ -274,9 +278,9 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint { if (LOG.isDebugEnabled()) { - LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback); + LOG.debug("sendBytes({}, {})", BufferUtil.toDetailString(data), callback); } - uncheckedSendFrame(new BinaryFrame().setPayload(data),callback==null?NOOP_CALLBACK:callback); + uncheckedSendFrame(new BinaryFrame().setPayload(data), callback == null ? NOOP_CALLBACK : callback); } finally { @@ -288,12 +292,11 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint { try { - connection.getIOState().assertOutputOpen(); - OutgoingFrames.FlushMode flushMode = OutgoingFrames.FlushMode.FLUSH; - WebSocketSession session = connection.getSession(); - if (session != null && session.isBatching()) + OutgoingFrames.FlushMode flushMode = OutgoingFrames.FlushMode.SEND; + if (frame.isDataFrame() && isBatching()) flushMode = OutgoingFrames.FlushMode.AUTO; - outgoing.outgoingFrame(frame,callback,flushMode); + connection.getIOState().assertOutputOpen(); + outgoing.outgoingFrame(frame, callback, flushMode); } catch (IOException e) { @@ -304,14 +307,14 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint @Override public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException { - boolean first=lockMsg(MsgType.PARTIAL_BINARY); + boolean first = lockMsg(MsgType.PARTIAL_BINARY); try { if (LOG.isDebugEnabled()) { - LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast); + LOG.debug("sendPartialBytes({}, {})", BufferUtil.toDetailString(fragment), isLast); } - DataFrame frame = first?new BinaryFrame():new ContinuationFrame(); + DataFrame frame = first ? new BinaryFrame() : new ContinuationFrame(); frame.setPayload(fragment); frame.setFin(isLast); blockingWrite(frame); @@ -326,15 +329,15 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint @Override public void sendPartialString(String fragment, boolean isLast) throws IOException { - boolean first=lockMsg(MsgType.PARTIAL_TEXT); + boolean first = lockMsg(MsgType.PARTIAL_TEXT); try { if (LOG.isDebugEnabled()) { - LOG.debug("sendPartialString({}, {})",fragment,isLast); + LOG.debug("sendPartialString({}, {})", fragment, isLast); } - DataFrame frame = first?new TextFrame():new ContinuationFrame(); - frame.setPayload(BufferUtil.toBuffer(fragment,StandardCharsets.UTF_8)); + DataFrame frame = first ? new TextFrame() : new ContinuationFrame(); + frame.setPayload(BufferUtil.toBuffer(fragment, StandardCharsets.UTF_8)); frame.setFin(isLast); blockingWrite(frame); } @@ -350,7 +353,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint { if (LOG.isDebugEnabled()) { - LOG.debug("sendPing with {}",BufferUtil.toDetailString(applicationData)); + LOG.debug("sendPing with {}", BufferUtil.toDetailString(applicationData)); } sendAsyncFrame(new PingFrame().setPayload(applicationData)); } @@ -360,7 +363,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint { if (LOG.isDebugEnabled()) { - LOG.debug("sendPong with {}",BufferUtil.toDetailString(applicationData)); + LOG.debug("sendPong with {}", BufferUtil.toDetailString(applicationData)); } sendAsyncFrame(new PongFrame().setPayload(applicationData)); } @@ -374,7 +377,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint WebSocketFrame frame = new TextFrame().setPayload(text); if (LOG.isDebugEnabled()) { - LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload())); + LOG.debug("sendString with {}", BufferUtil.toDetailString(frame.getPayload())); } blockingWrite(frame); } @@ -393,9 +396,9 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint TextFrame frame = new TextFrame().setPayload(text); if (LOG.isDebugEnabled()) { - LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload())); + LOG.debug("sendStringByFuture with {}", BufferUtil.toDetailString(frame.getPayload())); } - return sendAsyncFrame(frame); + return sendAsyncFrame(frame); } finally { @@ -412,9 +415,37 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint TextFrame frame = new TextFrame().setPayload(text); if (LOG.isDebugEnabled()) { - LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback); + LOG.debug("sendString({},{})", BufferUtil.toDetailString(frame.getPayload()), callback); } - uncheckedSendFrame(frame,callback==null?NOOP_CALLBACK:callback); + uncheckedSendFrame(frame, callback == null ? NOOP_CALLBACK : callback); + } + finally + { + unlockMsg(MsgType.ASYNC); + } + } + + @Override + public boolean isBatching() + { + return batching; + } + + // Only the JSR needs to have this method exposed. + // In the Jetty implementation the batching is set + // at the moment of opening the session. + public void setBatching(boolean batching) + { + this.batching = batching; + } + + public void flush() throws IOException + { + lockMsg(MsgType.ASYNC); + try + { + uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, blocker); + blocker.block(); } finally { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 889376bcca3..da2a6b2f334 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -57,7 +57,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc private final URI requestURI; private final EventDriver websocket; private final LogicalConnection connection; - private final boolean batching; private final SessionListener[] sessionListeners; private final Executor executor; private ExtensionFactory extensionFactory; @@ -71,11 +70,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc private UpgradeResponse upgradeResponse; public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, SessionListener... sessionListeners) - { - this(requestURI, websocket, connection, true, sessionListeners); - } - - public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, boolean batching, SessionListener... sessionListeners) { if (requestURI == null) { @@ -85,32 +79,30 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc this.requestURI = requestURI; this.websocket = websocket; this.connection = connection; - this.batching = batching; this.sessionListeners = sessionListeners; this.executor = connection.getExecutor(); this.outgoingHandler = connection; this.incomingHandler = websocket; - this.connection.getIOState().addListener(this); } @Override public void close() { - this.close(StatusCode.NORMAL,null); + this.close(StatusCode.NORMAL, null); } @Override public void close(CloseStatus closeStatus) { - this.close(closeStatus.getCode(),closeStatus.getPhrase()); + this.close(closeStatus.getCode(), closeStatus.getPhrase()); } @Override public void close(int statusCode, String reason) { - connection.close(statusCode,reason); - notifyClose(statusCode,reason); + connection.close(statusCode, reason); + notifyClose(statusCode, reason); } /** @@ -122,7 +114,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc connection.disconnect(); // notify of harsh disconnect - notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect"); + notifyClose(StatusCode.NO_CLOSE, "Harsh disconnect"); } public void dispatch(Runnable runnable) @@ -133,11 +125,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc @Override public void dump(Appendable out, String indent) throws IOException { - super.dump(out,indent); + super.dump(out, indent); out.append(indent).append(" +- incomingHandler : "); if (incomingHandler instanceof Dumpable) { - ((Dumpable)incomingHandler).dump(out,indent + " "); + ((Dumpable)incomingHandler).dump(out, indent + " "); } else { @@ -147,7 +139,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc out.append(indent).append(" +- outgoingHandler : "); if (outgoingHandler instanceof Dumpable) { - ((Dumpable)outgoingHandler).dump(out,indent + " "); + ((Dumpable)outgoingHandler).dump(out, indent + " "); } else { @@ -280,7 +272,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc { final int prime = 31; int result = 1; - result = (prime * result) + ((connection == null)?0:connection.hashCode()); + result = (prime * result) + ((connection == null) ? 0 : connection.hashCode()); return result; } @@ -335,14 +327,14 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc public void notifyClose(int statusCode, String reason) { - websocket.onClose(new CloseInfo(statusCode,reason)); + websocket.onClose(new CloseInfo(statusCode, reason)); } public void notifyError(Throwable cause) { incomingError(cause); } - + @SuppressWarnings("incomplete-switch") @Override public void onConnectionStateChange(ConnectionState state) @@ -370,9 +362,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc if (ioState.wasAbnormalClose()) { CloseInfo close = ioState.getCloseInfo(); - LOG.debug("Detected abnormal close: {}",close); + LOG.debug("Detected abnormal close: {}", close); // notify local endpoint - notifyClose(close.getStatusCode(),close.getReason()); + notifyClose(close.getStatusCode(), close.getReason()); } break; case OPEN: @@ -407,7 +399,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc connection.getIOState().onConnected(); // Connect remote - remote = new WebSocketRemoteEndpoint(connection,outgoingHandler); + remote = new WebSocketRemoteEndpoint(connection, outgoingHandler, getBatchingDefault()); // Open WebSocket websocket.openSession(this); @@ -417,7 +409,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc if (LOG.isDebugEnabled()) { - LOG.debug("open -> {}",dump()); + LOG.debug("open -> {}", dump()); } } @@ -457,11 +449,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc List values = entry.getValue(); if (values != null) { - this.parameterMap.put(entry.getKey(),values.toArray(new String[values.size()])); + this.parameterMap.put(entry.getKey(), values.toArray(new String[values.size()])); } else { - this.parameterMap.put(entry.getKey(),new String[0]); + this.parameterMap.put(entry.getKey(), new String[0]); } } } @@ -478,10 +470,12 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc return connection; } - @Override - public boolean isBatching() + /** + * @return the default (initial) value for the batching mode. + */ + public boolean getBatchingDefault() { - return batching; + return true; } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 1049e71d45b..41f29d7107d 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -156,6 +156,32 @@ public abstract class CompressExtension extends AbstractExtension flusher.iterate(); } + protected void notifyCallbackSuccess(WriteCallback callback) + { + try + { + if (callback != null) + callback.writeSuccess(); + } + catch (Throwable x) + { + LOG.debug("Exception while notifying success of callback " + callback, x); + } + } + + protected void notifyCallbackFailure(WriteCallback callback, Throwable failure) + { + try + { + if (callback != null) + callback.writeFailed(failure); + } + catch (Throwable x) + { + LOG.debug("Exception while notifying failure of callback " + callback, x); + } + } + @Override public String toString() { @@ -210,16 +236,8 @@ public abstract class CompressExtension extends AbstractExtension { Frame frame = entry.frame; FlushMode flushMode = entry.flushMode; - if (OpCode.isControlFrame(frame.getOpCode())) + if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload()) { - // Skip, cannot compress control frames. - nextOutgoingFrame(frame, this, flushMode); - return; - } - - if (!frame.hasPayload()) - { - // Pass through, nothing to do nextOutgoingFrame(frame, this, flushMode); return; } @@ -323,30 +341,4 @@ public abstract class CompressExtension extends AbstractExtension notifyCallbackFailure(entry.callback, x); } } - - protected void notifyCallbackSuccess(WriteCallback callback) - { - try - { - if (callback != null) - callback.writeSuccess(); - } - catch (Throwable x) - { - LOG.debug("Exception while notifying success of callback " + callback, x); - } - } - - protected void notifyCallbackFailure(WriteCallback callback, Throwable failure) - { - try - { - if (callback != null) - callback.writeFailed(failure); - } - catch (Throwable x) - { - LOG.debug("Exception while notifying failure of callback " + callback, x); - } - } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 771f2a64bb1..aadd5eda72c 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -63,9 +63,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { private class Flusher extends FrameFlusher { - private Flusher(Generator generator, EndPoint endpoint) + private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint) { - super(generator,endpoint); + super(bufferPool, generator, endpoint, getPolicy().getMaxBinaryMessageBufferSize(), 8); } @Override @@ -151,7 +151,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp /** * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload) */ - private static final int MIN_BUFFER_SIZE = Generator.OVERHEAD; + private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH; private final ByteBufferPool bufferPool; private final Scheduler scheduler; @@ -178,7 +178,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp this.suspendToken = new AtomicBoolean(false); this.ioState = new IOState(); this.ioState.addListener(this); - this.flusher = new Flusher(generator,endp); + this.flusher = new Flusher(bufferPool,generator,endp); this.setInputBufferSize(policy.getInputBufferSize()); this.setMaxIdleTimeout(policy.getIdleTimeout()); } @@ -263,11 +263,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp super.fillInterested(); } - public void flush() - { - flusher.flush(); - } - @Override public ByteBufferPool getBufferPool() { @@ -381,7 +376,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { // Fire out a close frame, indicating abnormal shutdown, then disconnect CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason()); - outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(),FlushMode.FLUSH); + outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(),FlushMode.SEND); } else { @@ -392,7 +387,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp case CLOSING: CloseInfo close = ioState.getCloseInfo(); // append close frame - outgoingFrame(close.asFrame(),new OnDisconnectCallback(),FlushMode.FLUSH); + outgoingFrame(close.asFrame(),new OnDisconnectCallback(),FlushMode.SEND); default: break; } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java index 57bbe646760..b174284a196 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java @@ -19,15 +19,16 @@ package org.eclipse.jetty.websocket.common.io; import java.io.EOFException; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.ArrayQueue; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; @@ -37,323 +38,321 @@ import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.Generator; import org.eclipse.jetty.websocket.common.OpCode; +import org.eclipse.jetty.websocket.common.frames.BinaryFrame; /** * Interface for working with bytes destined for {@link EndPoint#write(Callback, ByteBuffer...)} */ -public class FrameFlusher +public class FrameFlusher { - private static final int MAX_GATHER = Integer.getInteger("org.eclipse.jetty.websocket.common.io.FrameFlusher.MAX_GATHER",8); + public static final BinaryFrame FLUSH_FRAME = new BinaryFrame(); private static final Logger LOG = Log.getLogger(FrameFlusher.class); - /** The endpoint to flush to */ + private final ByteBufferPool bufferPool; private final EndPoint endpoint; - - /** The websocket generator */ + private final int bufferSize; private final Generator generator; - + private final int maxGather; private final Object lock = new Object(); - - /** Backlog of frames */ - private final ArrayQueue queue = new ArrayQueue<>(16,16,lock); - - private final FlusherCB flusherCB = new FlusherCB(); - - /** the buffer input size */ - private int bufferSize = 2048; + private final ArrayQueue queue = new ArrayQueue<>(16, 16, lock); + private final Flusher flusher = new Flusher(); + private final AtomicBoolean closed = new AtomicBoolean(); + private volatile Throwable failure; - /** Tracking for failure */ - private Throwable failure; - /** Is WriteBytesProvider closed to more WriteBytes being enqueued? */ - private boolean closed; - - /** - * Create a WriteBytesProvider with specified Generator and "flush" Callback. - * - * @param generator - * the generator to use for converting {@link Frame} objects to network {@link ByteBuffer}s - * @param endpoint - * the endpoint to flush to. - */ - public FrameFlusher(Generator generator, EndPoint endpoint) - { - this.endpoint=endpoint; - this.generator = Objects.requireNonNull(generator); - } - - /** - * Set the buffer size used for generating ByteBuffers from the frames. - *

- * Value usually obtained from {@link AbstractConnection#getInputBufferSize()} - * - * @param bufferSize - * the buffer size to use - */ - public void setBufferSize(int bufferSize) + public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather) { + this.bufferPool = bufferPool; + this.endpoint = endpoint; this.bufferSize = bufferSize; - } - - public int getBufferSize() - { - return bufferSize; - } - - /** - * Force closure of write bytes - */ - public void close() - { - synchronized (lock) - { - if (!closed) - { - closed=true; - - EOFException eof = new EOFException("Connection has been disconnected"); - flusherCB.failed(eof); - for (FrameEntry frame : queue) - frame.notifyFailed(eof); - queue.clear(); - } - } - - } - - /** - * Used to test for the final frame possible to be enqueued, the CLOSE frame. - * - * @return true if close frame has been enqueued already. - */ - public boolean isClosed() - { - synchronized (lock) - { - return closed; - } + this.generator = Objects.requireNonNull(generator); + this.maxGather = maxGather; } public void enqueue(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode) { - Objects.requireNonNull(frame); - FrameEntry entry = new FrameEntry(frame,callback,flushMode); - LOG.debug("enqueue({})",entry); - Throwable failure=null; + if (closed.get()) + { + notifyCallbackFailure(callback, new EOFException("Connection has been closed locally")); + return; + } + if (flusher.isFailed()) + { + notifyCallbackFailure(callback, failure); + return; + } + + FrameEntry entry = new FrameEntry(frame, callback, flushMode); + synchronized (lock) { - if (closed) - { - // Closed for more frames. - LOG.debug("Write is closed: {} {}",frame,callback); - failure=new IOException("Write is closed"); - } - else if (this.failure!=null) - { - failure=this.failure; - } - switch (frame.getOpCode()) { case OpCode.PING: - queue.add(0,entry); + { + // Prepend PINGs so they are processed first. + queue.add(0, entry); break; + } case OpCode.CLOSE: - closed=true; + { + // There may be a chance that other frames are + // added after this close frame, but we will + // fail them later to keep it simple here. + closed.set(true); queue.add(entry); break; + } default: + { queue.add(entry); + break; + } } } - if (failure != null) - { - // no changes when failed - LOG.debug("Write is in failure: {} {}",frame,callback); - entry.notifyFailed(failure); - return; - } - - flush(); + if (LOG.isDebugEnabled()) + LOG.debug("{} queued {}", this, entry); + + flusher.iterate(); } - void flush() + public void close() { - flusherCB.iterate(); + if (closed.compareAndSet(false, true)) + { + LOG.debug("{} closing {}", this); + EOFException eof = new EOFException("Connection has been closed locally"); + flusher.failed(eof); + + // Fail also queued entries. + List entries = new ArrayList<>(); + synchronized (lock) + { + entries.addAll(queue); + queue.clear(); + } + // Notify outside sync block. + for (FrameEntry entry : entries) + notifyCallbackFailure(entry.callback, eof); + } } - + protected void onFailure(Throwable x) { LOG.warn(x); } + protected void notifyCallbackSuccess(WriteCallback callback) + { + try + { + if (callback != null) + callback.writeSuccess(); + } + catch (Throwable x) + { + LOG.debug("Exception while notifying success of callback " + callback, x); + } + } + + protected void notifyCallbackFailure(WriteCallback callback, Throwable failure) + { + try + { + if (callback != null) + callback.writeFailed(failure); + } + catch (Throwable x) + { + LOG.debug("Exception while notifying failure of callback " + callback, x); + } + } + @Override public String toString() { - StringBuilder b = new StringBuilder(); - b.append("WriteBytesProvider["); - if (failure != null) - { - b.append("failure=").append(failure.getClass().getName()); - b.append(":").append(failure.getMessage()).append(','); - } - else - { - b.append("queue.size=").append(queue.size()); - } - b.append(']'); - return b.toString(); + ByteBuffer aggregate = flusher.aggregate; + return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]", + getClass().getSimpleName(), + queue.size(), + aggregate == null ? 0 : aggregate.position(), + failure); } - private class FlusherCB extends IteratingCallback + private class Flusher extends IteratingCallback { - private final ArrayQueue active = new ArrayQueue<>(lock); - private final List buffers = new ArrayList<>(MAX_GATHER*2); - private final List succeeded = new ArrayList<>(MAX_GATHER+1); - - @Override - protected void completed() - { - // will never be called as process always returns SCHEDULED or IDLE - throw new IllegalStateException(); - } + private final List entries = new ArrayList<>(maxGather); + private final List buffers = new ArrayList<>(maxGather * 2 + 1); + private ByteBuffer aggregate; + private boolean releaseAggregate; @Override protected Action process() throws Exception { + int space = aggregate == null ? bufferSize : aggregate.remaining(); + boolean batch = true; synchronized (lock) { - succeeded.clear(); - - // If we exited the loop above without hitting the gatheredBufferLimit - // then all the active frames are done, so we can add some more. - while (buffers.size() (bufferSize >> 2)) + batch = false; + + // If the aggregate buffer overflows, do not batch. + space -= approxFrameLength; + if (space <= 0) + batch = false; + + entries.add(entry); } - - if (LOG.isDebugEnabled()) - LOG.debug("process {} active={} buffers={}",FrameFlusher.this,active,buffers); } - - if (buffers.size()==0) - return Action.IDLE; - endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()])); - buffers.clear(); + if (LOG.isDebugEnabled()) + LOG.debug("{} processing {} entries: {}", FrameFlusher.this, entries.size(), entries); + + if (entries.isEmpty()) + { + if (releaseAggregate) + { + bufferPool.release(aggregate); + if (LOG.isDebugEnabled()) + LOG.debug("{} released aggregate buffer {}", FrameFlusher.this, aggregate); + aggregate = null; + } + return Action.IDLE; + } + + if (batch) + batch(); + else + flush(); + return Action.SCHEDULED; } + private void flush() + { + if (!BufferUtil.isEmpty(aggregate)) + { + BufferUtil.flipToFlush(aggregate, 0); + buffers.add(aggregate); + releaseAggregate = true; + if (LOG.isDebugEnabled()) + LOG.debug("{} flushing aggregate {}", FrameFlusher.this, aggregate); + } + + for (FrameEntry entry : entries) + { + // Skip "synthetic" frames used for flushing. + if (entry.frame == FLUSH_FRAME) + continue; + buffers.add(entry.getHeaderBytes()); + ByteBuffer payload = entry.frame.getPayload(); + if (BufferUtil.hasContent(payload)) + buffers.add(payload); + } + + if (LOG.isDebugEnabled()) + LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries); + endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()])); + buffers.clear(); + } + + private void batch() + { + if (aggregate == null) + { + aggregate = bufferPool.acquire(bufferSize, true); + BufferUtil.flipToFill(aggregate); + if (LOG.isDebugEnabled()) + LOG.debug("{} acquired aggregate buffer {}", FrameFlusher.this, aggregate); + releaseAggregate = false; + } + + for (FrameEntry entry : entries) + { + // TODO: would be better to generate the header bytes directly into the aggregate buffer. + ByteBuffer header = entry.getHeaderBytes(); + aggregate.put(header); + + ByteBuffer payload = entry.frame.getPayload(); + if (BufferUtil.hasContent(payload)) + aggregate.put(payload); + } + if (LOG.isDebugEnabled()) + LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries); + succeeded(); + } + @Override public void succeeded() - { - synchronized (lock) + { + for (FrameEntry entry : entries) { - succeeded.addAll(active); - active.clear(); + notifyCallbackSuccess(entry.callback); + entry.release(); } - - for (FrameEntry frame:succeeded) - { - frame.notifySucceeded(); - frame.freeBuffers(); - } - + entries.clear(); + + // Do not release the aggregate yet, in case there are more frames to process. + if (releaseAggregate) + BufferUtil.clearToFill(aggregate); + super.succeeded(); } - + + @Override + protected void completed() + { + // This IteratingCallback never completes. + } @Override public void failed(Throwable x) { - synchronized (lock) + for (FrameEntry entry : entries) { - succeeded.addAll(active); - active.clear(); + notifyCallbackFailure(entry.callback, x); + entry.release(); } - - for (FrameEntry frame : succeeded) - { - frame.notifyFailed(x); - frame.freeBuffers(); - } - succeeded.clear(); - + entries.clear(); super.failed(x); + failure = x; onFailure(x); } } - private class FrameEntry + private class FrameEntry { - protected final AtomicBoolean failed = new AtomicBoolean(false); - protected final Frame frame; + private final Frame frame; + private final WriteCallback callback; private final OutgoingFrames.FlushMode flushMode; - protected final WriteCallback callback; - /** holds reference to header ByteBuffer, as it needs to be released on success/failure */ private ByteBuffer headerBuffer; - public FrameEntry(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode) + private FrameEntry(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode) { - this.frame = frame; + this.frame = Objects.requireNonNull(frame); this.callback = callback; this.flushMode = flushMode; } - public ByteBuffer getHeaderBytes() + private ByteBuffer getHeaderBytes() { - ByteBuffer buf = generator.generateHeaderBytes(frame); - headerBuffer = buf; - return buf; + return headerBuffer = generator.generateHeaderBytes(frame); } - public ByteBuffer getPayload() - { - // There is no need to release this ByteBuffer, as it is just a slice of the user provided payload - return frame.getPayload(); - } - - public void notifyFailed(Throwable t) - { - freeBuffers(); - if (failed.getAndSet(true) == false) - { - try - { - if (callback!=null) - callback.writeFailed(t); - } - catch (Throwable e) - { - LOG.warn("Uncaught exception",e); - } - } - } - - public void notifySucceeded() - { - freeBuffers(); - if (callback == null) - { - return; - } - try - { - callback.writeSuccess(); - } - catch (Throwable t) - { - LOG.debug(t); - } - } - - public void freeBuffers() + private void release() { if (headerBuffer != null) { @@ -364,7 +363,7 @@ public class FrameFlusher public String toString() { - return "["+callback+","+frame+","+failure+"]"; + return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, flushMode, failure); } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java index e4b29d5cb5c..053857c3969 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java @@ -43,7 +43,7 @@ public class FramePipes @Override public void incomingFrame(Frame frame) { - this.outgoing.outgoingFrame(frame,null,OutgoingFrames.FlushMode.FLUSH); + this.outgoing.outgoingFrame(frame,null,OutgoingFrames.FlushMode.SEND); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java index 8e2b97f5f0a..baa99de0946 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java @@ -137,7 +137,7 @@ public class MessageOutputStream extends OutputStream try { - outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.FLUSH); + outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.SEND); // block on write blocker.block(); // block success diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java index 01d9748d68a..3b97c636219 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java @@ -118,7 +118,7 @@ public class MessageWriter extends Writer try { - outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.FLUSH); + outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.SEND); // block on write blocker.block(); // write success diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/GeneratorTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/GeneratorTest.java index 9557d4678f5..aee29187618 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/GeneratorTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/GeneratorTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common; -import static org.hamcrest.Matchers.*; - import java.nio.ByteBuffer; import java.util.Arrays; @@ -41,6 +39,8 @@ import org.eclipse.jetty.websocket.common.util.Hex; import org.junit.Assert; import org.junit.Test; +import static org.hamcrest.Matchers.is; + public class GeneratorTest { private static final Logger LOG = Log.getLogger(GeneratorTest.WindowHelper.class); @@ -64,7 +64,7 @@ public class GeneratorTest int completeBufSize = 0; for (Frame f : frames) { - completeBufSize += Generator.OVERHEAD + f.getPayloadLength(); + completeBufSize += Generator.MAX_HEADER_LENGTH + f.getPayloadLength(); } ByteBuffer completeBuf = ByteBuffer.allocate(completeBufSize); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketFrameTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketFrameTest.java index c359a6a3460..d8fc0ddca85 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketFrameTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketFrameTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common; -import static org.hamcrest.Matchers.*; - import java.nio.ByteBuffer; import org.eclipse.jetty.io.MappedByteBufferPool; @@ -37,6 +35,8 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import static org.hamcrest.Matchers.is; + public class WebSocketFrameTest { @Rule @@ -47,7 +47,7 @@ public class WebSocketFrameTest private ByteBuffer generateWholeFrame(Generator generator, Frame frame) { - ByteBuffer buf = ByteBuffer.allocate(frame.getPayloadLength() + Generator.OVERHEAD); + ByteBuffer buf = ByteBuffer.allocate(frame.getPayloadLength() + Generator.MAX_HEADER_LENGTH); generator.generateWholeFrame(frame,buf); BufferUtil.flipToFlush(buf,0); return buf; diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase2.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase2.java index 8dcf7ff4d5e..dc0b2ec67f1 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase2.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase2.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common.ab; -import static org.hamcrest.Matchers.is; - import java.nio.ByteBuffer; import java.util.Arrays; @@ -41,6 +39,8 @@ import org.eclipse.jetty.websocket.common.test.UnitParser; import org.junit.Assert; import org.junit.Test; +import static org.hamcrest.Matchers.is; + public class TestABCase2 { WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT); @@ -288,7 +288,7 @@ public class TestABCase2 byte[] bytes = new byte[126]; Arrays.fill(bytes,(byte)0x00); - ByteBuffer expected = ByteBuffer.allocate(bytes.length + Generator.OVERHEAD); + ByteBuffer expected = ByteBuffer.allocate(bytes.length + Generator.MAX_HEADER_LENGTH); byte b; diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java index 1f0e7aa942e..6ffc894e39e 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java @@ -165,7 +165,7 @@ public class FragmentExtensionTest for (String section : quote) { Frame frame = new TextFrame().setPayload(section); - ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND); } // Expected Frames @@ -237,7 +237,7 @@ public class FragmentExtensionTest for (String section : quote) { Frame frame = new TextFrame().setPayload(section); - ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND); } // Expected Frames @@ -294,7 +294,7 @@ public class FragmentExtensionTest String payload = "Are you there?"; Frame ping = new PingFrame().setPayload(payload); - ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.SEND); capture.assertFrameCount(1); capture.assertHasFrame(OpCode.PING, 1); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java index ed7a9c60433..182d846574b 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java @@ -81,7 +81,7 @@ public class IdentityExtensionTest ext.setNextOutgoingFrames(capture); Frame frame = new TextFrame().setPayload("hello"); - ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND); capture.assertFrameCount(1); capture.assertHasFrame(OpCode.TEXT, 1); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java index 93b8c93f5b4..1ff6bd6db86 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java @@ -127,7 +127,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest ext.setNextOutgoingFrames(capture); Frame frame = new TextFrame().setPayload(text); - ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND); capture.assertBytes(0, expectedHex); } @@ -234,9 +234,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest init(ext); ext.setNextOutgoingFrames(capture); - ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH); - ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH); - ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND); List actual = capture.getCaptured(); @@ -308,8 +308,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest OutgoingNetworkBytesCapture capture = new OutgoingNetworkBytesCapture(generator); ext.setNextOutgoingFrames(capture); - ext.outgoingFrame(new TextFrame().setPayload("Hello"), null, OutgoingFrames.FlushMode.FLUSH); - ext.outgoingFrame(new TextFrame().setPayload("There"), null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(new TextFrame().setPayload("Hello"), null, OutgoingFrames.FlushMode.SEND); + ext.outgoingFrame(new TextFrame().setPayload("There"), null, OutgoingFrames.FlushMode.SEND); capture.assertBytes(0, "c107f248cdc9c90700"); } @@ -430,7 +430,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest BinaryFrame frame = new BinaryFrame(); frame.setPayload(input); frame.setFin(true); - clientExtension.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH); + clientExtension.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND); Assert.assertArrayEquals(input, result.toByteArray()); } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java index f993260bf29..36a45e7a6bf 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java @@ -318,7 +318,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest String payload = "Are you there?"; Frame ping = new PingFrame().setPayload(payload); - ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.SEND); capture.assertFrameCount(1); capture.assertHasFrame(OpCode.PING, 1); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java index 7681905bad6..df0e9847c7f 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java @@ -712,7 +712,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti { frame.setMask(clientmask); } - extensionStack.outgoingFrame(frame,null,FlushMode.FLUSH); + extensionStack.outgoingFrame(frame,null,FlushMode.SEND); } public void writeRaw(ByteBuffer buf) throws IOException diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java index b16cd09c4e0..71087c8ca2b 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java @@ -561,7 +561,7 @@ public class BlockheadServer public void write(Frame frame) throws IOException { LOG.debug("write(Frame->{}) to {}",frame,outgoing); - outgoing.outgoingFrame(frame,null,FlushMode.FLUSH); + outgoing.outgoingFrame(frame,null,FlushMode.SEND); } public void write(int b) throws IOException diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/Fuzzer.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/Fuzzer.java index 9cd0bfec238..3d6c6414838 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/Fuzzer.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/Fuzzer.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common.test; -import static org.hamcrest.Matchers.*; - import java.io.IOException; import java.net.SocketException; import java.nio.ByteBuffer; @@ -41,6 +39,9 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.io.IOState; import org.junit.Assert; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + /** * Fuzzing utility for the AB tests. */ @@ -103,7 +104,7 @@ public class Fuzzer int buflen = 0; for (Frame f : send) { - buflen += f.getPayloadLength() + Generator.OVERHEAD; + buflen += f.getPayloadLength() + Generator.MAX_HEADER_LENGTH; } ByteBuffer buf = ByteBuffer.allocate(buflen); @@ -260,7 +261,7 @@ public class Fuzzer int buflen = 0; for (Frame f : send) { - buflen += f.getPayloadLength() + Generator.OVERHEAD; + buflen += f.getPayloadLength() + Generator.MAX_HEADER_LENGTH; } ByteBuffer buf = ByteBuffer.allocate(buflen); @@ -295,7 +296,7 @@ public class Fuzzer { f.setMask(MASK); // make sure we have mask set // Using lax generator, generate and send - ByteBuffer fullframe = ByteBuffer.allocate(f.getPayloadLength() + Generator.OVERHEAD); + ByteBuffer fullframe = ByteBuffer.allocate(f.getPayloadLength() + Generator.MAX_HEADER_LENGTH); BufferUtil.clearToFill(fullframe); generator.generateWholeFrame(f,fullframe); BufferUtil.flipToFlush(fullframe,0); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java index 873971f7c24..efeb97ce3b7 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java @@ -64,7 +64,7 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames @Override public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { - ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + frame.getPayloadLength()); + ByteBuffer buf = ByteBuffer.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength()); generator.generateWholeFrame(frame,buf); BufferUtil.flipToFlush(buf,0); captured.add(buf); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/UnitGenerator.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/UnitGenerator.java index 1dcf19d9d6b..b410f496bc6 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/UnitGenerator.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/UnitGenerator.java @@ -61,7 +61,7 @@ public class UnitGenerator extends Generator int buflen = 0; for (Frame f : frames) { - buflen += f.getPayloadLength() + Generator.OVERHEAD; + buflen += f.getPayloadLength() + Generator.MAX_HEADER_LENGTH; } ByteBuffer completeBuf = ByteBuffer.allocate(buflen); BufferUtil.clearToFill(completeBuf); @@ -96,7 +96,7 @@ public class UnitGenerator extends Generator int buflen = 0; for (Frame f : frames) { - buflen += f.getPayloadLength() + Generator.OVERHEAD; + buflen += f.getPayloadLength() + Generator.MAX_HEADER_LENGTH; } ByteBuffer completeBuf = ByteBuffer.allocate(buflen); BufferUtil.clearToFill(completeBuf); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FirefoxTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FirefoxTest.java index c79b0e5df99..46dbdd92405 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FirefoxTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FirefoxTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.server; -import static org.hamcrest.Matchers.*; - import java.util.concurrent.TimeUnit; import org.eclipse.jetty.websocket.common.WebSocketFrame; @@ -32,6 +30,8 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import static org.hamcrest.Matchers.is; + public class FirefoxTest { private static SimpleServletServer server; @@ -52,8 +52,7 @@ public class FirefoxTest @Test public void testConnectionKeepAlive() throws Exception { - BlockheadClient client = new BlockheadClient(server.getServerUri()); - try + try (BlockheadClient client = new BlockheadClient(server.getServerUri())) { // Odd Connection Header value seen in Firefox client.setConnectionValue("keep-alive, Upgrade"); @@ -66,13 +65,9 @@ public class FirefoxTest client.write(new TextFrame().setPayload(msg)); // Read frame (hopefully text frame) - IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500); + IncomingFramesCapture capture = client.readFrames(1, TimeUnit.MILLISECONDS, 500); WebSocketFrame tf = capture.getFrames().poll(); - Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg)); - } - finally - { - client.close(); + Assert.assertThat("Text Frame.status code", tf.getPayloadAsUTF8(), is(msg)); } } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java index 34b5e40ac31..878e68920e6 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketOverSSLTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.server; -import static org.hamcrest.Matchers.*; - import java.net.URI; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -27,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.toolchain.test.EventQueue; import org.eclipse.jetty.toolchain.test.TestTracker; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPool; @@ -38,6 +37,8 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import static org.hamcrest.Matchers.is; + public class WebSocketOverSSLTest { @Rule @@ -84,7 +85,10 @@ public class WebSocketOverSSLTest // Generate text frame String msg = "this is an echo ... cho ... ho ... o"; - session.getRemote().sendString(msg); + RemoteEndpoint remote = session.getRemote(); + remote.sendString(msg); + if (remote.isBatching()) + remote.flush(); // Read frame (hopefully text frame) clientSocket.messages.awaitEventCount(1,500,TimeUnit.MILLISECONDS); @@ -122,7 +126,10 @@ public class WebSocketOverSSLTest Session session = fut.get(5,TimeUnit.SECONDS); // Generate text frame - session.getRemote().sendString("session.isSecure"); + RemoteEndpoint remote = session.getRemote(); + remote.sendString("session.isSecure"); + if (remote.isBatching()) + remote.flush(); // Read frame (hopefully text frame) clientSocket.messages.awaitEventCount(1,500,TimeUnit.MILLISECONDS); @@ -160,7 +167,10 @@ public class WebSocketOverSSLTest Session session = fut.get(5,TimeUnit.SECONDS); // Generate text frame - session.getRemote().sendString("session.upgradeRequest.requestURI"); + RemoteEndpoint remote = session.getRemote(); + remote.sendString("session.upgradeRequest.requestURI"); + if (remote.isBatching()) + remote.flush(); // Read frame (hopefully text frame) clientSocket.messages.awaitEventCount(1,500,TimeUnit.MILLISECONDS); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServerSessionTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServerSessionTest.java index 28d32c08123..8b3732bfb71 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServerSessionTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketServerSessionTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.server; -import static org.hamcrest.Matchers.*; - import java.net.URI; import java.util.Queue; import java.util.concurrent.TimeUnit; @@ -36,6 +34,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; +import static org.hamcrest.Matchers.is; + /** * Testing various aspects of the server side support for WebSocket {@link Session} */ @@ -61,8 +61,7 @@ public class WebSocketServerSessionTest public void testDisconnect() throws Exception { URI uri = server.getServerUri().resolve("/test/disconnect"); - BlockheadClient client = new BlockheadClient(uri); - try + try (BlockheadClient client = new BlockheadClient(uri)) { client.connect(); client.sendStandardRequest(); @@ -70,11 +69,7 @@ public class WebSocketServerSessionTest client.write(new TextFrame().setPayload("harsh-disconnect")); - client.awaitDisconnect(1,TimeUnit.SECONDS); - } - finally - { - client.close(); + client.awaitDisconnect(1, TimeUnit.SECONDS); } } @@ -82,8 +77,7 @@ public class WebSocketServerSessionTest public void testUpgradeRequestResponse() throws Exception { URI uri = server.getServerUri().resolve("/test?snack=cashews&amount=handful&brand=off"); - BlockheadClient client = new BlockheadClient(uri); - try + try (BlockheadClient client = new BlockheadClient(uri)) { client.connect(); client.sendStandardRequest(); @@ -93,24 +87,19 @@ public class WebSocketServerSessionTest client.write(new TextFrame().setPayload("getParameterMap|snack")); client.write(new TextFrame().setPayload("getParameterMap|amount")); client.write(new TextFrame().setPayload("getParameterMap|brand")); - client.write(new TextFrame().setPayload("getParameterMap|cost")); // intentionall invalid + client.write(new TextFrame().setPayload("getParameterMap|cost")); // intentionally invalid // Read frame (hopefully text frame) - IncomingFramesCapture capture = client.readFrames(4,TimeUnit.MILLISECONDS,500); + IncomingFramesCapture capture = client.readFrames(4, TimeUnit.SECONDS, 5); Queue frames = capture.getFrames(); WebSocketFrame tf = frames.poll(); - Assert.assertThat("Parameter Map[snack]",tf.getPayloadAsUTF8(),is("[cashews]")); + Assert.assertThat("Parameter Map[snack]", tf.getPayloadAsUTF8(), is("[cashews]")); tf = frames.poll(); - Assert.assertThat("Parameter Map[amount]",tf.getPayloadAsUTF8(),is("[handful]")); + Assert.assertThat("Parameter Map[amount]", tf.getPayloadAsUTF8(), is("[handful]")); tf = frames.poll(); - Assert.assertThat("Parameter Map[brand]",tf.getPayloadAsUTF8(),is("[off]")); + Assert.assertThat("Parameter Map[brand]", tf.getPayloadAsUTF8(), is("[off]")); tf = frames.poll(); - Assert.assertThat("Parameter Map[cost]",tf.getPayloadAsUTF8(),is("")); - } - finally - { - client.close(); + Assert.assertThat("Parameter Map[cost]", tf.getPayloadAsUTF8(), is("")); } } - } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java index 5b7a4b869bc..cb1a55a8356 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/MyEchoSocket.java @@ -20,6 +20,8 @@ package org.eclipse.jetty.websocket.server.examples; import java.io.IOException; +import org.eclipse.jetty.io.RuntimeIOException; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.WebSocketAdapter; /** @@ -38,11 +40,14 @@ public class MyEchoSocket extends WebSocketAdapter try { // echo the data back - getRemote().sendString(message); + RemoteEndpoint remote = getRemote(); + remote.sendString(message); + if (remote.isBatching()) + remote.flush(); } catch (IOException e) { - e.printStackTrace(); + throw new RuntimeIOException(e); } } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java index 742f7634dca..9b0a609601e 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/examples/echo/BigEchoSocket.java @@ -18,10 +18,12 @@ package org.eclipse.jetty.websocket.server.examples.echo; +import java.io.IOException; import java.nio.ByteBuffer; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; @@ -35,24 +37,30 @@ public class BigEchoSocket private static final Logger LOG = Log.getLogger(BigEchoSocket.class); @OnWebSocketMessage - public void onBinary(Session session, byte buf[], int offset, int length) + public void onBinary(Session session, byte buf[], int offset, int length) throws IOException { if (!session.isOpen()) { LOG.warn("Session is closed"); return; } - session.getRemote().sendBytes(ByteBuffer.wrap(buf,offset,length),null); + RemoteEndpoint remote = session.getRemote(); + remote.sendBytes(ByteBuffer.wrap(buf, offset, length), null); + if (remote.isBatching()) + remote.flush(); } @OnWebSocketMessage - public void onText(Session session, String message) + public void onText(Session session, String message) throws IOException { if (!session.isOpen()) { LOG.warn("Session is closed"); return; } - session.getRemote().sendString(message,null); + RemoteEndpoint remote = session.getRemote(); + remote.sendString(message, null); + if (remote.isBatching()) + remote.flush(); } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java index c5d1e1c6516..ed84403af98 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/EchoSocket.java @@ -18,10 +18,12 @@ package org.eclipse.jetty.websocket.server.helper; +import java.io.IOException; import java.nio.ByteBuffer; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; @@ -38,13 +40,16 @@ public class EchoSocket private Session session; @OnWebSocketMessage - public void onBinary(byte buf[], int offset, int len) + public void onBinary(byte buf[], int offset, int len) throws IOException { LOG.debug("onBinary(byte[{}],{},{})",buf.length,offset,len); // echo the message back. ByteBuffer data = ByteBuffer.wrap(buf,offset,len); - this.session.getRemote().sendBytes(data,null); + RemoteEndpoint remote = this.session.getRemote(); + remote.sendBytes(data, null); + if (remote.isBatching()) + remote.flush(); } @OnWebSocketConnect @@ -54,11 +59,14 @@ public class EchoSocket } @OnWebSocketMessage - public void onText(String message) + public void onText(String message) throws IOException { LOG.debug("onText({})",message); // echo the message back. - this.session.getRemote().sendString(message,null); + RemoteEndpoint remote = session.getRemote(); + remote.sendString(message, null); + if (remote.isBatching()) + remote.flush(); } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java index e0c13fee2f5..a172f7ae237 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/RFCSocket.java @@ -18,10 +18,12 @@ package org.eclipse.jetty.websocket.server.helper; +import java.io.IOException; import java.nio.ByteBuffer; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; @@ -35,13 +37,16 @@ public class RFCSocket private Session session; @OnWebSocketMessage - public void onBinary(byte buf[], int offset, int len) + public void onBinary(byte buf[], int offset, int len) throws IOException { LOG.debug("onBinary(byte[{}],{},{})",buf.length,offset,len); // echo the message back. ByteBuffer data = ByteBuffer.wrap(buf,offset,len); - this.session.getRemote().sendBytes(data,null); + RemoteEndpoint remote = session.getRemote(); + remote.sendBytes(data, null); + if (remote.isBatching()) + remote.flush(); } @OnWebSocketConnect @@ -51,7 +56,7 @@ public class RFCSocket } @OnWebSocketMessage - public void onText(String message) + public void onText(String message) throws IOException { LOG.debug("onText({})",message); // Test the RFC 6455 close code 1011 that should close @@ -62,6 +67,9 @@ public class RFCSocket } // echo the message back. - this.session.getRemote().sendString(message,null); + RemoteEndpoint remote = session.getRemote(); + remote.sendString(message, null); + if (remote.isBatching()) + remote.flush(); } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java index 129963b33f9..c4b3b678235 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/SessionSocket.java @@ -18,11 +18,13 @@ package org.eclipse.jetty.websocket.server.helper; +import java.io.IOException; import java.util.List; import java.util.Map; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; @@ -61,7 +63,7 @@ public class SessionSocket if (values == null) { - session.getRemote().sendString("",null); + sendString(""); return; } @@ -78,21 +80,22 @@ public class SessionSocket delim = true; } valueStr.append(']'); - session.getRemote().sendString(valueStr.toString(),null); + System.err.println("valueStr = " + valueStr); + sendString(valueStr.toString()); return; } if ("session.isSecure".equals(message)) { String issecure = String.format("session.isSecure=%b",session.isSecure()); - session.getRemote().sendString(issecure,null); + sendString(issecure); return; } if ("session.upgradeRequest.requestURI".equals(message)) { String response = String.format("session.upgradeRequest.requestURI=%s",session.getUpgradeRequest().getRequestURI().toASCIIString()); - session.getRemote().sendString(response,null); + sendString(response); return; } @@ -103,11 +106,19 @@ public class SessionSocket } // echo the message back. - this.session.getRemote().sendString(message,null); + sendString(message); } catch (Throwable t) { LOG.warn(t); } } + + protected void sendString(String text) throws IOException + { + RemoteEndpoint remote = session.getRemote(); + remote.sendString(text, null); + if (remote.isBatching()) + remote.flush(); + } }