From 8e5c06b95ca000b915755362b7224168e48d0893 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 17 Feb 2014 16:57:50 +0100 Subject: [PATCH] 428232 - Rework batch mode / buffering in websocket. Refactored OutgoingFrames.outgoingFrame() to take an additional parameter, FlushMode. This is in preparation for handling this new parameter in FrameFlusher. --- .../websocket/jsr356/AbstractJsrRemote.java | 45 +++---- .../jetty/websocket/jsr356/JsrSession.java | 15 ++- .../websocket/jsr356/JsrSessionTest.java | 8 +- .../jsr356/samples/DummyConnection.java | 2 +- .../jsr356/server/DummyConnection.java | 2 +- .../jsr356/server/OnPartialTest.java | 9 +- .../eclipse/jetty/websocket/api/Session.java | 5 + .../api/extensions/OutgoingFrames.java | 44 +++++-- .../client/io/WebSocketClientConnection.java | 17 +-- .../common/WebSocketRemoteEndpoint.java | 13 +- .../websocket/common/WebSocketSession.java | 17 ++- .../common/extensions/AbstractExtension.java | 4 +- .../common/extensions/ExtensionStack.java | 10 +- .../compress/CompressExtension.java | 26 ++-- .../compress/PerMessageDeflateExtension.java | 4 +- .../fragment/FragmentExtension.java | 12 +- .../identity/IdentityExtension.java | 4 +- .../io/AbstractWebSocketConnection.java | 12 +- .../websocket/common/io/FrameFlusher.java | 10 +- .../jetty/websocket/common/io/FramePipes.java | 4 +- .../common/message/MessageOutputStream.java | 2 +- .../common/message/MessageWriter.java | 2 +- .../common/WebSocketRemoteEndpointTest.java | 4 +- .../extensions/DummyOutgoingFrames.java | 2 +- .../extensions/FragmentExtensionTest.java | 95 ++++++++------- .../extensions/IdentityExtensionTest.java | 43 +++---- .../compress/CapturedHexPayloads.java | 2 +- .../compress/DeflateFrameExtensionTest.java | 115 +++++++++--------- .../PerMessageDeflateExtensionTest.java | 99 +++++++-------- .../common/io/LocalWebSocketConnection.java | 2 +- .../common/io/LocalWebSocketSession.java | 3 +- .../common/test/BlockheadClient.java | 10 +- .../common/test/BlockheadServer.java | 9 +- .../common/test/OutgoingFramesCapture.java | 7 +- .../test/OutgoingNetworkBytesCapture.java | 7 +- 35 files changed, 358 insertions(+), 307 deletions(-) diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java index a1660158dd4..2bdb77a1981 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/AbstractJsrRemote.java @@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.jsr356; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.Future; - import javax.websocket.EncodeException; import javax.websocket.Encoder; import javax.websocket.RemoteEndpoint; @@ -30,6 +29,7 @@ import javax.websocket.SendHandler; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.common.BlockingWriteCallback; import org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint; import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; import org.eclipse.jetty.websocket.common.message.MessageOutputStream; @@ -80,24 +80,31 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint @Override public void flushBatch() throws IOException { - // TODO Auto-generated method stub + BlockingWriteCallback callback = new BlockingWriteCallback(); + jettyRemote.sendBytes(BufferUtil.EMPTY_BUFFER, callback); + callback.block(); } @Override public boolean getBatchingAllowed() { - // TODO Auto-generated method stub - return false; + return session.isBatching(); + } + + @Override + public void setBatchingAllowed(boolean allowed) throws IOException + { + session.setBatching(allowed); } @SuppressWarnings( - { "rawtypes", "unchecked" }) + {"rawtypes", "unchecked"}) public Future sendObjectViaFuture(Object data) { assertMessageNotNull(data); if (LOG.isDebugEnabled()) { - LOG.debug("sendObject({})",data); + LOG.debug("sendObject({})", data); } Encoder encoder = encoders.getEncoderFor(data.getClass()); @@ -108,15 +115,15 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint if (encoder instanceof Encoder.Text) { - Encoder.Text etxt = (Encoder.Text)encoder; + Encoder.Text text = (Encoder.Text)encoder; try { - String msg = etxt.encode(data); + String msg = text.encode(data); return jettyRemote.sendStringByFuture(msg); } catch (EncodeException e) { - return new EncodeFailedFuture(data,etxt,Encoder.Text.class,e); + return new EncodeFailedFuture(data, text, Encoder.Text.class, e); } } else if (encoder instanceof Encoder.TextStream) @@ -126,12 +133,12 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint try (MessageWriter writer = new MessageWriter(session)) { writer.setCallback(callback); - etxt.encode(data,writer); + etxt.encode(data, writer); return callback; } catch (EncodeException | IOException e) { - return new EncodeFailedFuture(data,etxt,Encoder.Text.class,e); + return new EncodeFailedFuture(data, etxt, Encoder.Text.class, e); } } else if (encoder instanceof Encoder.Binary) @@ -144,7 +151,7 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint } catch (EncodeException e) { - return new EncodeFailedFuture(data,ebin,Encoder.Binary.class,e); + return new EncodeFailedFuture(data, ebin, Encoder.Binary.class, e); } } else if (encoder instanceof Encoder.BinaryStream) @@ -154,12 +161,12 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint try (MessageOutputStream out = new MessageOutputStream(session)) { out.setCallback(callback); - ebin.encode(data,out); + ebin.encode(data, out); return callback; } catch (EncodeException | IOException e) { - return new EncodeFailedFuture(data,ebin,Encoder.Binary.class,e); + return new EncodeFailedFuture(data, ebin, Encoder.Binary.class, e); } } @@ -171,7 +178,7 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint { if (LOG.isDebugEnabled()) { - LOG.debug("sendPing({})",BufferUtil.toDetailString(data)); + LOG.debug("sendPing({})", BufferUtil.toDetailString(data)); } jettyRemote.sendPing(data); } @@ -181,14 +188,8 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint { if (LOG.isDebugEnabled()) { - LOG.debug("sendPong({})",BufferUtil.toDetailString(data)); + LOG.debug("sendPong({})", BufferUtil.toDetailString(data)); } jettyRemote.sendPong(data); } - - @Override - public void setBatchingAllowed(boolean allowed) throws IOException - { - // TODO Auto-generated method stub - } } diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java index 02faf20a48c..e7776d19b44 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/JsrSession.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; - import javax.websocket.CloseReason; import javax.websocket.EndpointConfig; import javax.websocket.Extension; @@ -72,8 +71,9 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess private Map pathParameters = new HashMap<>(); private JsrAsyncRemote asyncRemote; private JsrBasicRemote basicRemote; + private volatile boolean batching; - public JsrSession(URI requestURI, EventDriver websocket, LogicalConnection connection, ClientContainer container, String id, SessionListener[] sessionListeners) + public JsrSession(URI requestURI, EventDriver websocket, LogicalConnection connection, ClientContainer container, String id, SessionListener... sessionListeners) { super(requestURI,websocket,connection,sessionListeners); if (!(websocket instanceof AbstractJsrEventDriver)) @@ -374,4 +374,15 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess messageHandlerSet.add(wrapper.getHandler()); } } + + @Override + public boolean isBatching() + { + return batching; + } + + public void setBatching(boolean batching) + { + this.batching = batching; + } } diff --git a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JsrSessionTest.java b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JsrSessionTest.java index 9d4c5d7e40d..1de304bd229 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JsrSessionTest.java +++ b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/JsrSessionTest.java @@ -18,17 +18,13 @@ package org.eclipse.jetty.websocket.jsr356; -import static org.hamcrest.Matchers.instanceOf; - import java.net.URI; import java.nio.ByteBuffer; - import javax.websocket.ClientEndpointConfig; import javax.websocket.DeploymentException; import javax.websocket.MessageHandler; import org.eclipse.jetty.websocket.api.WebSocketPolicy; -import org.eclipse.jetty.websocket.common.SessionListener; import org.eclipse.jetty.websocket.common.events.EventDriver; import org.eclipse.jetty.websocket.jsr356.client.EmptyClientEndpointConfig; import org.eclipse.jetty.websocket.jsr356.client.SimpleEndpointMetadata; @@ -44,6 +40,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.hamcrest.Matchers.instanceOf; + public class JsrSessionTest { private ClientContainer container; @@ -65,7 +63,7 @@ public class JsrSessionTest EventDriver driver = new JsrEndpointEventDriver(policy,ei); DummyConnection connection = new DummyConnection(); - session = new JsrSession(requestURI,driver,connection,container,id,new SessionListener[0]); + session = new JsrSession(requestURI,driver,connection,container,id); } @Test diff --git a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/samples/DummyConnection.java b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/samples/DummyConnection.java index 27dae9f3544..ee1e6440181 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/samples/DummyConnection.java +++ b/jetty-websocket/javax-websocket-client-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/samples/DummyConnection.java @@ -123,7 +123,7 @@ public class DummyConnection implements LogicalConnection } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { } diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/DummyConnection.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/DummyConnection.java index c2c3e6e2a60..8941c006ddd 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/DummyConnection.java +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/DummyConnection.java @@ -126,7 +126,7 @@ public class DummyConnection implements LogicalConnection } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { callback.writeSuccess(); } diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/OnPartialTest.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/OnPartialTest.java index cf997370d57..990cf71b17b 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/OnPartialTest.java +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/OnPartialTest.java @@ -18,17 +18,13 @@ package org.eclipse.jetty.websocket.jsr356.server; -import static org.hamcrest.Matchers.*; - import java.net.URI; import java.util.ArrayList; import java.util.List; - import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpointConfig; import org.eclipse.jetty.websocket.api.WebSocketPolicy; -import org.eclipse.jetty.websocket.common.SessionListener; import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.events.EventDriver; import org.eclipse.jetty.websocket.common.events.EventDriverFactory; @@ -45,6 +41,9 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + public class OnPartialTest { @Rule @@ -80,7 +79,7 @@ public class OnPartialTest DummyConnection connection = new DummyConnection(); ClientContainer container = new ClientContainer(); @SuppressWarnings("resource") - JsrSession session = new JsrSession(requestURI,driver,connection,container,id,new SessionListener[0]); + JsrSession session = new JsrSession(requestURI,driver,connection,container,id); session.setPolicy(policy); session.open(); return driver; diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java index b4ccd28123f..be7e03f1485 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java @@ -175,4 +175,9 @@ public interface Session extends Closeable * @return the suspend token suitable for resuming the reading of data on the connection. */ SuspendToken suspend(); + + /** + * @return true if this session is batching network data, false if it flushes it immediately. + */ + boolean isBatching(); } diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java index d0f1fe869b7..6cf9c70150e 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/extensions/OutgoingFrames.java @@ -21,21 +21,41 @@ package org.eclipse.jetty.websocket.api.extensions; import org.eclipse.jetty.websocket.api.WriteCallback; /** - * Interface for dealing with frames outgoing to the network (eventually) + * Interface for dealing with frames outgoing to (eventually) the network layer. */ public interface OutgoingFrames { /** - * A frame, and optional callback, intended for the network. - *

- * Note: the frame can undergo many transformations in the various layers and extensions present in the implementation. - *

- * If you are implementing a mutation, you are obliged to handle the incoming WriteCallback appropriately. - * - * @param frame - * the frame to eventually write to the network. - * @param callback - * the optional callback to use for success/failure of the network write operation. Can be null. + * A frame, and optional callback, intended for the network layer. + *

+ * Note: the frame can undergo many transformations in the various + * layers and extensions present in the implementation. + *

+ * If you are implementing a mutation, you are obliged to handle + * the incoming WriteCallback appropriately. + * + * @param frame the frame to eventually write to the network layer. + * @param callback the callback to notify when the frame is written. + * @param flushMode the flush mode required by the sender. */ - void outgoingFrame(Frame frame, WriteCallback callback); + void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode); + + /** + * The possible flush modes when invoking {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)}. + */ + public enum FlushMode + { + /** + * Implementers of {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)} + * are free to decide whether to flush or not the given frame + * to the network layer. + */ + AUTO, + + /** + * Implementers of {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)} + * must flush the given frame to the network layer. + */ + FLUSH + } } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java index 1acee95e41f..83be9192180 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.websocket.api.ProtocolException; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; @@ -96,26 +95,16 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection } /** - * Overrride to set masker + * Override to set the masker. */ @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { if (frame instanceof WebSocketFrame) { - if (masker == null) - { - ProtocolException ex = new ProtocolException("Must set a Masker"); - LOG.warn(ex); - if (callback != null) - { - callback.writeFailed(ex); - } - return; - } masker.setMask((WebSocketFrame)frame); } - super.outgoingFrame(frame,callback); + super.outgoingFrame(frame,callback, flushMode); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index 48ee92eb81b..4e0a57fc851 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -52,7 +52,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint STREAMING, PARTIAL_TEXT, PARTIAL_BINARY - }; + } private static final WriteCallback NOOP_CALLBACK = new WriteCallback() { @@ -284,17 +284,16 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint } } - /* ------------------------------------------------------------ */ - /** unchecked send - * @param frame - * @param callback - */ public void uncheckedSendFrame(WebSocketFrame frame, WriteCallback callback) { try { connection.getIOState().assertOutputOpen(); - outgoing.outgoingFrame(frame,callback); + OutgoingFrames.FlushMode flushMode = OutgoingFrames.FlushMode.FLUSH; + WebSocketSession session = connection.getSession(); + if (session != null && session.isBatching()) + flushMode = OutgoingFrames.FlushMode.AUTO; + outgoing.outgoingFrame(frame,callback,flushMode); } catch (IOException e) { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 60fc1969a3d..889376bcca3 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -57,8 +57,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc private final URI requestURI; private final EventDriver websocket; private final LogicalConnection connection; - private final Executor executor; + private final boolean batching; private final SessionListener[] sessionListeners; + private final Executor executor; private ExtensionFactory extensionFactory; private String protocolVersion; private Map parameterMap = new HashMap<>(); @@ -69,7 +70,12 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc private UpgradeRequest upgradeRequest; private UpgradeResponse upgradeResponse; - public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, SessionListener[] sessionListeners) + public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, SessionListener... sessionListeners) + { + this(requestURI, websocket, connection, true, sessionListeners); + } + + public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, boolean batching, SessionListener... sessionListeners) { if (requestURI == null) { @@ -79,6 +85,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc this.requestURI = requestURI; this.websocket = websocket; this.connection = connection; + this.batching = batching; this.sessionListeners = sessionListeners; this.executor = connection.getExecutor(); this.outgoingHandler = connection; @@ -471,6 +478,12 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc return connection; } + @Override + public boolean isBatching() + { + return batching; + } + @Override public String toString() { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java index 6e1f1e92838..d7ce31bcdb9 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java @@ -162,10 +162,10 @@ public abstract class AbstractExtension extends ContainerLifeCycle implements Ex this.nextIncoming.incomingFrame(frame); } - protected void nextOutgoingFrame(Frame frame, WriteCallback callback) + protected void nextOutgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { log.debug("nextOutgoingFrame({})",frame); - this.nextOutgoing.outgoingFrame(frame,callback); + this.nextOutgoing.outgoingFrame(frame,callback,flushMode); } public void setBufferPool(ByteBufferPool bufferPool) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java index 55aea29974e..6e778729c3e 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java @@ -273,9 +273,9 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { - FrameEntry entry = new FrameEntry(frame, callback); + FrameEntry entry = new FrameEntry(frame, callback, flushMode); LOG.debug("Queuing {}", entry); entries.offer(entry); flusher.iterate(); @@ -344,11 +344,13 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames { private final Frame frame; private final WriteCallback callback; + private final FlushMode flushMode; - private FrameEntry(Frame frame, WriteCallback callback) + private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode) { this.frame = frame; this.callback = callback; + this.flushMode = flushMode; } @Override @@ -369,7 +371,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames LOG.debug("Processing {}", current); if (current == null) return Action.IDLE; - nextOutgoing.outgoingFrame(current.frame, this); + nextOutgoing.outgoingFrame(current.frame, this, current.flushMode); return Action.SCHEDULED; } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index dc491f7fffd..1049e71d45b 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -138,7 +138,7 @@ public abstract class CompressExtension extends AbstractExtension } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { // We use a queue and an IteratingCallback to handle concurrency. // We must compress and write atomically, otherwise the compression @@ -150,7 +150,7 @@ public abstract class CompressExtension extends AbstractExtension return; } - FrameEntry entry = new FrameEntry(frame, callback); + FrameEntry entry = new FrameEntry(frame, callback, flushMode); LOG.debug("Queuing {}", entry); entries.offer(entry); flusher.iterate(); @@ -166,11 +166,13 @@ public abstract class CompressExtension extends AbstractExtension { private final Frame frame; private final WriteCallback callback; + private final FlushMode flushMode; - private FrameEntry(Frame frame, WriteCallback callback) + private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode) { this.frame = frame; this.callback = callback; + this.flushMode = flushMode; } @Override @@ -199,7 +201,7 @@ public abstract class CompressExtension extends AbstractExtension } else { - compress(current.frame, false); + compress(current, false); } return Action.SCHEDULED; } @@ -207,31 +209,33 @@ public abstract class CompressExtension extends AbstractExtension private void deflate(FrameEntry entry) { Frame frame = entry.frame; + FlushMode flushMode = entry.flushMode; if (OpCode.isControlFrame(frame.getOpCode())) { // Skip, cannot compress control frames. - nextOutgoingFrame(frame, this); + nextOutgoingFrame(frame, this, flushMode); return; } if (!frame.hasPayload()) { // Pass through, nothing to do - nextOutgoingFrame(frame, this); + nextOutgoingFrame(frame, this, flushMode); return; } - compress(frame, true); + compress(entry, true); } - private void compress(Frame frame, boolean first) + private void compress(FrameEntry entry, boolean first) { // Get a chunk of the payload to avoid to blow // the heap if the payload is a huge mapped file. + Frame frame = entry.frame; ByteBuffer data = frame.getPayload(); int remaining = data.remaining(); int inputLength = Math.min(remaining, 32 * 1024); - LOG.debug("Compressing {}: {} bytes in {} bytes chunk", frame, remaining, inputLength); + LOG.debug("Compressing {}: {} bytes in {} bytes chunk", entry, remaining, inputLength); // Avoid to copy the bytes if the ByteBuffer // is backed by an array. @@ -280,7 +284,7 @@ public abstract class CompressExtension extends AbstractExtension // Skip the last tail bytes bytes generated by SYNC_FLUSH. payload = ByteBuffer.wrap(output, 0, outputLength - TAIL_BYTES.length); - LOG.debug("Compressed {}: {}->{} chunk bytes", frame, inputLength, outputLength); + LOG.debug("Compressed {}: {}->{} chunk bytes", entry, inputLength, outputLength); boolean continuation = frame.getType().isContinuation() || !first; DataFrame chunk = new DataFrame(frame, continuation); @@ -289,7 +293,7 @@ public abstract class CompressExtension extends AbstractExtension boolean fin = frame.isFin() && finished; chunk.setFin(fin); - nextOutgoingFrame(chunk, this); + nextOutgoingFrame(chunk, this, entry.flushMode); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java index a72537bfb88..3c66249aa2b 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java @@ -92,14 +92,14 @@ public class PerMessageDeflateExtension extends CompressExtension } @Override - protected void nextOutgoingFrame(Frame frame, WriteCallback callback) + protected void nextOutgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { if (frame.isFin() && !outgoingContextTakeover) { LOG.debug("Outgoing Context Reset"); getDeflater().reset(); } - super.nextOutgoingFrame(frame, callback); + super.nextOutgoingFrame(frame, callback, flushMode); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java index 6184c8419cb..8f6c6c7dc13 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java @@ -57,17 +57,17 @@ public class FragmentExtension extends AbstractExtension } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { ByteBuffer payload = frame.getPayload(); int length = payload != null ? payload.remaining() : 0; if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength) { - nextOutgoingFrame(frame, callback); + nextOutgoingFrame(frame, callback, flushMode); return; } - FrameEntry entry = new FrameEntry(frame, callback); + FrameEntry entry = new FrameEntry(frame, callback, flushMode); LOG.debug("Queuing {}", entry); entries.offer(entry); flusher.iterate(); @@ -84,11 +84,13 @@ public class FragmentExtension extends AbstractExtension { private final Frame frame; private final WriteCallback callback; + private final FlushMode flushMode; - private FrameEntry(Frame frame, WriteCallback callback) + private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode) { this.frame = frame; this.callback = callback; + this.flushMode = flushMode; } @Override @@ -143,7 +145,7 @@ public class FragmentExtension extends AbstractExtension LOG.debug("Fragmented {}->{}", frame, fragment); payload.position(newLimit); - nextOutgoingFrame(fragment, this); + nextOutgoingFrame(fragment, this, entry.flushMode); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/identity/IdentityExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/identity/IdentityExtension.java index 8b7df98e2c7..fb5c97175dd 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/identity/IdentityExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/identity/IdentityExtension.java @@ -56,10 +56,10 @@ public class IdentityExtension extends AbstractExtension } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { // pass through - nextOutgoingFrame(frame,callback); + nextOutgoingFrame(frame,callback, flushMode); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index bdb0dd3e4a8..771f2a64bb1 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; @@ -55,7 +56,8 @@ import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener; /** - * Provides the implementation of {@link LogicalConnection} within the framework of the new {@link Connection} framework of jetty-io + * Provides the implementation of {@link LogicalConnection} within the + * framework of the new {@link Connection} framework of {@code jetty-io}. */ public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener { @@ -379,7 +381,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { // Fire out a close frame, indicating abnormal shutdown, then disconnect CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason()); - outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback()); + outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(),FlushMode.FLUSH); } else { @@ -390,7 +392,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp case CLOSING: CloseInfo close = ioState.getCloseInfo(); // append close frame - outgoingFrame(close.asFrame(),new OnDisconnectCallback()); + outgoingFrame(close.asFrame(),new OnDisconnectCallback(),FlushMode.FLUSH); default: break; } @@ -463,14 +465,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp * Frame from API, User, or Internal implementation destined for network. */ @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { if (LOG.isDebugEnabled()) { LOG.debug("outgoingFrame({}, {})",frame,callback); } - flusher.enqueue(frame,callback); + flusher.enqueue(frame,callback,flushMode); } private int read(ByteBuffer buffer) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java index 73fde819220..57bbe646760 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java @@ -28,11 +28,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.ArrayQueue; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.Generator; import org.eclipse.jetty.websocket.common.OpCode; @@ -131,10 +133,10 @@ public class FrameFlusher } } - public void enqueue(Frame frame, WriteCallback callback) + public void enqueue(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode) { Objects.requireNonNull(frame); - FrameEntry entry = new FrameEntry(frame,callback); + FrameEntry entry = new FrameEntry(frame,callback,flushMode); LOG.debug("enqueue({})",entry); Throwable failure=null; synchronized (lock) @@ -292,14 +294,16 @@ public class FrameFlusher { protected final AtomicBoolean failed = new AtomicBoolean(false); protected final Frame frame; + private final OutgoingFrames.FlushMode flushMode; protected final WriteCallback callback; /** holds reference to header ByteBuffer, as it needs to be released on success/failure */ private ByteBuffer headerBuffer; - public FrameEntry(Frame frame, WriteCallback callback) + public FrameEntry(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode) { this.frame = frame; this.callback = callback; + this.flushMode = flushMode; } public ByteBuffer getHeaderBytes() diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java index 47771970eae..e4b29d5cb5c 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FramePipes.java @@ -43,7 +43,7 @@ public class FramePipes @Override public void incomingFrame(Frame frame) { - this.outgoing.outgoingFrame(frame,null); + this.outgoing.outgoingFrame(frame,null,OutgoingFrames.FlushMode.FLUSH); } } @@ -57,7 +57,7 @@ public class FramePipes } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { try { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java index f01d759865d..8e2b97f5f0a 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java @@ -137,7 +137,7 @@ public class MessageOutputStream extends OutputStream try { - outgoing.outgoingFrame(frame,blocker); + outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.FLUSH); // block on write blocker.block(); // block success diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java index a077dfef685..01d9748d68a 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java @@ -118,7 +118,7 @@ public class MessageWriter extends Writer try { - outgoing.outgoingFrame(frame,blocker); + outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.FLUSH); // block on write blocker.block(); // write success diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java index a78b7e68703..86e14a61f9a 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common; -import static org.hamcrest.Matchers.containsString; - import java.io.IOException; import java.nio.ByteBuffer; @@ -32,6 +30,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import static org.hamcrest.Matchers.containsString; + public class WebSocketRemoteEndpointTest { @Rule diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/DummyOutgoingFrames.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/DummyOutgoingFrames.java index a42f32ebef4..f24a5aec487 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/DummyOutgoingFrames.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/DummyOutgoingFrames.java @@ -44,7 +44,7 @@ public class DummyOutgoingFrames implements OutgoingFrames } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { LOG.debug("outgoingFrame({},{})",frame,callback); if (callback != null) diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java index b022c38e4aa..1f0e7aa942e 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common.extensions; -import static org.hamcrest.Matchers.*; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -32,6 +30,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension; @@ -46,10 +45,12 @@ import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import static org.hamcrest.Matchers.is; + public class FragmentExtensionTest { @Rule - public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test",new MappedByteBufferPool()); + public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test", new MappedByteBufferPool()); /** * Verify that incoming frames are passed thru without modification @@ -82,7 +83,7 @@ public class FragmentExtensionTest int len = quote.size(); capture.assertFrameCount(len); - capture.assertHasFrame(OpCode.TEXT,len); + capture.assertHasFrame(OpCode.TEXT, len); String prefix; int i = 0; @@ -90,15 +91,15 @@ public class FragmentExtensionTest { prefix = "Frame[" + i + "]"; - Assert.assertThat(prefix + ".opcode",actual.getOpCode(),is(OpCode.TEXT)); - Assert.assertThat(prefix + ".fin",actual.isFin(),is(true)); - Assert.assertThat(prefix + ".rsv1",actual.isRsv1(),is(false)); - Assert.assertThat(prefix + ".rsv2",actual.isRsv2(),is(false)); - Assert.assertThat(prefix + ".rsv3",actual.isRsv3(),is(false)); + Assert.assertThat(prefix + ".opcode", actual.getOpCode(), is(OpCode.TEXT)); + Assert.assertThat(prefix + ".fin", actual.isFin(), is(true)); + Assert.assertThat(prefix + ".rsv1", actual.isRsv1(), is(false)); + Assert.assertThat(prefix + ".rsv2", actual.isRsv2(), is(false)); + Assert.assertThat(prefix + ".rsv3", actual.isRsv3(), is(false)); - ByteBuffer expected = BufferUtil.toBuffer(quote.get(i),StandardCharsets.UTF_8); - Assert.assertThat(prefix + ".payloadLength",actual.getPayloadLength(),is(expected.remaining())); - ByteBufferAssert.assertEquals(prefix + ".payload",expected,actual.getPayload().slice()); + ByteBuffer expected = BufferUtil.toBuffer(quote.get(i), StandardCharsets.UTF_8); + Assert.assertThat(prefix + ".payloadLength", actual.getPayloadLength(), is(expected.remaining())); + ByteBufferAssert.assertEquals(prefix + ".payload", expected, actual.getPayload().slice()); i++; } } @@ -124,18 +125,18 @@ public class FragmentExtensionTest ext.incomingFrame(ping); capture.assertFrameCount(1); - capture.assertHasFrame(OpCode.PING,1); + capture.assertHasFrame(OpCode.PING, 1); WebSocketFrame actual = capture.getFrames().poll(); - Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.PING)); - Assert.assertThat("Frame.fin",actual.isFin(),is(true)); - Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false)); - Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false)); - Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false)); + Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.PING)); + Assert.assertThat("Frame.fin", actual.isFin(), is(true)); + Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false)); + Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false)); + Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false)); - ByteBuffer expected = BufferUtil.toBuffer(payload,StandardCharsets.UTF_8); - Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining())); - ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice()); + ByteBuffer expected = BufferUtil.toBuffer(payload, StandardCharsets.UTF_8); + Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining())); + ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice()); } /** @@ -164,7 +165,7 @@ public class FragmentExtensionTest for (String section : quote) { Frame frame = new TextFrame().setPayload(section); - ext.outgoingFrame(frame,null); + ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH); } // Expected Frames @@ -195,18 +196,18 @@ public class FragmentExtensionTest // System.out.printf("expect: %s%n",expectedFrame); // Validate Frame - Assert.assertThat(prefix + ".opcode",actualFrame.getOpCode(),is(expectedFrame.getOpCode())); - Assert.assertThat(prefix + ".fin",actualFrame.isFin(),is(expectedFrame.isFin())); - Assert.assertThat(prefix + ".rsv1",actualFrame.isRsv1(),is(expectedFrame.isRsv1())); - Assert.assertThat(prefix + ".rsv2",actualFrame.isRsv2(),is(expectedFrame.isRsv2())); - Assert.assertThat(prefix + ".rsv3",actualFrame.isRsv3(),is(expectedFrame.isRsv3())); + Assert.assertThat(prefix + ".opcode", actualFrame.getOpCode(), is(expectedFrame.getOpCode())); + Assert.assertThat(prefix + ".fin", actualFrame.isFin(), is(expectedFrame.isFin())); + Assert.assertThat(prefix + ".rsv1", actualFrame.isRsv1(), is(expectedFrame.isRsv1())); + Assert.assertThat(prefix + ".rsv2", actualFrame.isRsv2(), is(expectedFrame.isRsv2())); + Assert.assertThat(prefix + ".rsv3", actualFrame.isRsv3(), is(expectedFrame.isRsv3())); // Validate Payload ByteBuffer expectedData = expectedFrame.getPayload().slice(); ByteBuffer actualData = actualFrame.getPayload().slice(); - Assert.assertThat(prefix + ".payloadLength",actualData.remaining(),is(expectedData.remaining())); - ByteBufferAssert.assertEquals(prefix + ".payload",expectedData,actualData); + Assert.assertThat(prefix + ".payloadLength", actualData.remaining(), is(expectedData.remaining())); + ByteBufferAssert.assertEquals(prefix + ".payload", expectedData, actualData); } } @@ -236,7 +237,7 @@ public class FragmentExtensionTest for (String section : quote) { Frame frame = new TextFrame().setPayload(section); - ext.outgoingFrame(frame,null); + ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH); } // Expected Frames @@ -259,18 +260,18 @@ public class FragmentExtensionTest WebSocketFrame expectedFrame = expectedFrames.get(i); // Validate Frame - Assert.assertThat(prefix + ".opcode",actualFrame.getOpCode(),is(expectedFrame.getOpCode())); - Assert.assertThat(prefix + ".fin",actualFrame.isFin(),is(expectedFrame.isFin())); - Assert.assertThat(prefix + ".rsv1",actualFrame.isRsv1(),is(expectedFrame.isRsv1())); - Assert.assertThat(prefix + ".rsv2",actualFrame.isRsv2(),is(expectedFrame.isRsv2())); - Assert.assertThat(prefix + ".rsv3",actualFrame.isRsv3(),is(expectedFrame.isRsv3())); + Assert.assertThat(prefix + ".opcode", actualFrame.getOpCode(), is(expectedFrame.getOpCode())); + Assert.assertThat(prefix + ".fin", actualFrame.isFin(), is(expectedFrame.isFin())); + Assert.assertThat(prefix + ".rsv1", actualFrame.isRsv1(), is(expectedFrame.isRsv1())); + Assert.assertThat(prefix + ".rsv2", actualFrame.isRsv2(), is(expectedFrame.isRsv2())); + Assert.assertThat(prefix + ".rsv3", actualFrame.isRsv3(), is(expectedFrame.isRsv3())); // Validate Payload ByteBuffer expectedData = expectedFrame.getPayload().slice(); ByteBuffer actualData = actualFrame.getPayload().slice(); - Assert.assertThat(prefix + ".payloadLength",actualData.remaining(),is(expectedData.remaining())); - ByteBufferAssert.assertEquals(prefix + ".payload",expectedData,actualData); + Assert.assertThat(prefix + ".payloadLength", actualData.remaining(), is(expectedData.remaining())); + ByteBufferAssert.assertEquals(prefix + ".payload", expectedData, actualData); } } @@ -293,21 +294,21 @@ public class FragmentExtensionTest String payload = "Are you there?"; Frame ping = new PingFrame().setPayload(payload); - ext.outgoingFrame(ping,null); + ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.FLUSH); capture.assertFrameCount(1); - capture.assertHasFrame(OpCode.PING,1); + capture.assertHasFrame(OpCode.PING, 1); WebSocketFrame actual = capture.getFrames().getFirst(); - Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.PING)); - Assert.assertThat("Frame.fin",actual.isFin(),is(true)); - Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false)); - Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false)); - Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false)); + Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.PING)); + Assert.assertThat("Frame.fin", actual.isFin(), is(true)); + Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false)); + Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false)); + Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false)); - ByteBuffer expected = BufferUtil.toBuffer(payload,StandardCharsets.UTF_8); - Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining())); - ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice()); + ByteBuffer expected = BufferUtil.toBuffer(payload, StandardCharsets.UTF_8); + Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining())); + ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice()); } } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java index 47052ce16a7..ed7a9c60433 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/IdentityExtensionTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common.extensions; -import static org.hamcrest.Matchers.is; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -27,6 +25,7 @@ import java.nio.charset.StandardCharsets; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.extensions.Extension; import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.extensions.identity.IdentityExtension; @@ -37,6 +36,8 @@ import org.eclipse.jetty.websocket.common.test.OutgoingFramesCapture; import org.junit.Assert; import org.junit.Test; +import static org.hamcrest.Matchers.is; + public class IdentityExtensionTest { /** @@ -54,18 +55,18 @@ public class IdentityExtensionTest ext.incomingFrame(frame); capture.assertFrameCount(1); - capture.assertHasFrame(OpCode.TEXT,1); + capture.assertHasFrame(OpCode.TEXT, 1); WebSocketFrame actual = capture.getFrames().poll(); - Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.TEXT)); - Assert.assertThat("Frame.fin",actual.isFin(),is(true)); - Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false)); - Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false)); - Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false)); + Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.TEXT)); + Assert.assertThat("Frame.fin", actual.isFin(), is(true)); + Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false)); + Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false)); + Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false)); - ByteBuffer expected = BufferUtil.toBuffer("hello",StandardCharsets.UTF_8); - Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining())); - ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice()); + ByteBuffer expected = BufferUtil.toBuffer("hello", StandardCharsets.UTF_8); + Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining())); + ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice()); } /** @@ -80,21 +81,21 @@ public class IdentityExtensionTest ext.setNextOutgoingFrames(capture); Frame frame = new TextFrame().setPayload("hello"); - ext.outgoingFrame(frame,null); + ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH); capture.assertFrameCount(1); - capture.assertHasFrame(OpCode.TEXT,1); + capture.assertHasFrame(OpCode.TEXT, 1); WebSocketFrame actual = capture.getFrames().getFirst(); - Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.TEXT)); - Assert.assertThat("Frame.fin",actual.isFin(),is(true)); - Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false)); - Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false)); - Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false)); + Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.TEXT)); + Assert.assertThat("Frame.fin", actual.isFin(), is(true)); + Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false)); + Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false)); + Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false)); - ByteBuffer expected = BufferUtil.toBuffer("hello",StandardCharsets.UTF_8); - Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining())); - ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice()); + ByteBuffer expected = BufferUtil.toBuffer("hello", StandardCharsets.UTF_8); + Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining())); + ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice()); } } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/CapturedHexPayloads.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/CapturedHexPayloads.java index 04d7d41f97a..a7f851d0365 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/CapturedHexPayloads.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/CapturedHexPayloads.java @@ -31,7 +31,7 @@ public class CapturedHexPayloads implements OutgoingFrames private List captured = new ArrayList<>(); @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { String hexPayload = Hex.asHex(frame.getPayload()); captured.add(hexPayload); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java index 02f312a12f0..93b8c93f5b4 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java @@ -63,8 +63,8 @@ import static org.hamcrest.Matchers.is; public class DeflateFrameExtensionTest extends AbstractExtensionTest { @Rule - public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test",new MappedByteBufferPool()); - + public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test", new MappedByteBufferPool()); + private void assertIncoming(byte[] raw, String... expectedTextDatas) { WebSocketPolicy policy = WebSocketPolicy.newClientPolicy(); @@ -90,21 +90,21 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest int len = expectedTextDatas.length; capture.assertFrameCount(len); - capture.assertHasFrame(OpCode.TEXT,len); + capture.assertHasFrame(OpCode.TEXT, len); - int i=0; - for (WebSocketFrame actual: capture.getFrames()) + int i = 0; + for (WebSocketFrame actual : capture.getFrames()) { String prefix = "Frame[" + i + "]"; - Assert.assertThat(prefix + ".opcode",actual.getOpCode(),is(OpCode.TEXT)); - Assert.assertThat(prefix + ".fin",actual.isFin(),is(true)); - Assert.assertThat(prefix + ".rsv1",actual.isRsv1(),is(false)); // RSV1 should be unset at this point - Assert.assertThat(prefix + ".rsv2",actual.isRsv2(),is(false)); - Assert.assertThat(prefix + ".rsv3",actual.isRsv3(),is(false)); + Assert.assertThat(prefix + ".opcode", actual.getOpCode(), is(OpCode.TEXT)); + Assert.assertThat(prefix + ".fin", actual.isFin(), is(true)); + Assert.assertThat(prefix + ".rsv1", actual.isRsv1(), is(false)); // RSV1 should be unset at this point + Assert.assertThat(prefix + ".rsv2", actual.isRsv2(), is(false)); + Assert.assertThat(prefix + ".rsv3", actual.isRsv3(), is(false)); - ByteBuffer expected = BufferUtil.toBuffer(expectedTextDatas[i],StandardCharsets.UTF_8); - Assert.assertThat(prefix + ".payloadLength",actual.getPayloadLength(),is(expected.remaining())); - ByteBufferAssert.assertEquals(prefix + ".payload",expected,actual.getPayload().slice()); + ByteBuffer expected = BufferUtil.toBuffer(expectedTextDatas[i], StandardCharsets.UTF_8); + Assert.assertThat(prefix + ".payloadLength", actual.getPayloadLength(), is(expected.remaining())); + ByteBufferAssert.assertEquals(prefix + ".payload", expected, actual.getPayload().slice()); i++; } } @@ -120,15 +120,14 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest ExtensionConfig config = ExtensionConfig.parse("deflate-frame"); ext.setConfig(config); - boolean validating = true; - Generator generator = new Generator(policy,bufferPool,validating); + Generator generator = new Generator(policy, bufferPool, true); generator.configureFromExtensions(Collections.singletonList(ext)); OutgoingNetworkBytesCapture capture = new OutgoingNetworkBytesCapture(generator); ext.setNextOutgoingFrames(capture); Frame frame = new TextFrame().setPayload(text); - ext.outgoingFrame(frame, null); + ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH); capture.assertBytes(0, expectedHex); } @@ -143,9 +142,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest tester.parseIncomingHex(// Captured from Blockhead Client - "Hello" then "There" via unit test "c18700000000f248cdc9c90700", // "Hello" "c187000000000ac9482d4a0500" // "There" - ); + ); - tester.assertHasFrames("Hello","There"); + tester.assertHasFrames("Hello", "There"); } @Test @@ -157,7 +156,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest tester.parseIncomingHex(// Captured from Chrome 20.x - "Hello" (sent from browser) "c187832b5c11716391d84a2c5c" // "Hello" - ); + ); tester.assertHasFrames("Hello"); } @@ -172,9 +171,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest tester.parseIncomingHex(// Captured from Chrome 20.x - "Hello" then "There" (sent from browser) "c1877b1971db8951bc12b21e71", // "Hello" "c18759edc8f4532480d913e8c8" // There - ); + ); - tester.assertHasFrames("Hello","There"); + tester.assertHasFrames("Hello", "There"); } @Test @@ -186,7 +185,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest tester.parseIncomingHex(// Captured from Chrome 20.x - "info:" (sent from browser) "c187ca4def7f0081a4b47d4fef" // example payload - ); + ); tester.assertHasFrames("info:"); } @@ -201,9 +200,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest tester.parseIncomingHex(// Captured from Chrome 20.x - "time:" then "time:" once more (sent from browser) "c18782467424a88fb869374474", // "time:" "c1853cfda17f16fcb07f3c" // "time:" - ); + ); - tester.assertHasFrames("time:","time:"); + tester.assertHasFrames("time:", "time:"); } @Test @@ -217,9 +216,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest "c1876b100104" + "41d9cd49de1201", // "time:" "c1852ae3ff01" + "00e2ee012a", // "time:" "c18435558caa" + "37468caa" // "time:" - ); + ); - tester.assertHasFrames("time:","time:","time:"); + tester.assertHasFrames("time:", "time:", "time:"); } @Test @@ -227,7 +226,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest { // What pywebsocket produces for "time:", "time:", "time:" String expected[] = new String[] - { "2AC9CC4DB50200", "2A01110000", "02130000" }; + {"2AC9CC4DB50200", "2A01110000", "02130000"}; // Lets see what we produce CapturedHexPayloads capture = new CapturedHexPayloads(); @@ -235,13 +234,13 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest init(ext); ext.setNextOutgoingFrames(capture); - ext.outgoingFrame(new TextFrame().setPayload("time:"),null); - ext.outgoingFrame(new TextFrame().setPayload("time:"),null); - ext.outgoingFrame(new TextFrame().setPayload("time:"),null); + ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH); List actual = capture.getCaptured(); - - Assert.assertThat("Compressed Payloads",actual,contains(expected)); + + Assert.assertThat("Compressed Payloads", actual, contains(expected)); } private void init(DeflateFrameExtension ext) @@ -254,8 +253,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest public void testDeflateBasics() throws Exception { // Setup deflater basics - boolean nowrap = true; - Deflater compressor = new Deflater(Deflater.BEST_COMPRESSION,nowrap); + Deflater compressor = new Deflater(Deflater.BEST_COMPRESSION, true); compressor.setStrategy(Deflater.DEFAULT_STRATEGY); // Text to compress @@ -264,7 +262,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest // Prime the compressor compressor.reset(); - compressor.setInput(uncompressed,0,uncompressed.length); + compressor.setInput(uncompressed, 0, uncompressed.length); compressor.finish(); // Perform compression @@ -274,26 +272,24 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest while (!compressor.finished()) { byte out[] = new byte[64]; - int len = compressor.deflate(out,0,out.length,Deflater.SYNC_FLUSH); + int len = compressor.deflate(out, 0, out.length, Deflater.SYNC_FLUSH); if (len > 0) { - outbuf.put(out,0,len); + outbuf.put(out, 0, len); } } compressor.end(); - BufferUtil.flipToFlush(outbuf,0); - byte b0 = outbuf.get(0); - if ((b0 & 1) != 0) - { - outbuf.put(0,(b0 ^= 1)); - } + BufferUtil.flipToFlush(outbuf, 0); byte compressed[] = BufferUtil.toArray(outbuf); + // Clear the BFINAL bit that has been set by the compressor.end() call. + // In the real implementation we never end() the compressor. + compressed[0] &= 0xFE; String actual = TypeUtil.toHexString(compressed); String expected = "CaCc4bCbB70200"; // what pywebsocket produces - Assert.assertThat("Compressed data",actual,is(expected)); + Assert.assertThat("Compressed data", actual, is(expected)); } @Test @@ -306,17 +302,16 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest ext.setPolicy(policy); ext.setConfig(new ExtensionConfig(ext.getName())); - boolean validating = true; - Generator generator = new Generator(policy,bufferPool,validating); + Generator generator = new Generator(policy, bufferPool, true); generator.configureFromExtensions(Collections.singletonList(ext)); OutgoingNetworkBytesCapture capture = new OutgoingNetworkBytesCapture(generator); ext.setNextOutgoingFrames(capture); - ext.outgoingFrame(new TextFrame().setPayload("Hello"),null); - ext.outgoingFrame(new TextFrame().setPayload("There"),null); + ext.outgoingFrame(new TextFrame().setPayload("Hello"), null, OutgoingFrames.FlushMode.FLUSH); + ext.outgoingFrame(new TextFrame().setPayload("There"), null, OutgoingFrames.FlushMode.FLUSH); - capture.assertBytes(0,"c107f248cdc9c90700"); + capture.assertBytes(0, "c107f248cdc9c90700"); } @Test @@ -328,15 +323,15 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest Inflater inflater = new Inflater(true); inflater.reset(); - inflater.setInput(rawbuf,0,rawbuf.length); + inflater.setInput(rawbuf, 0, rawbuf.length); byte outbuf[] = new byte[64]; int len = inflater.inflate(outbuf); inflater.end(); - Assert.assertThat("Inflated length",len,greaterThan(4)); + Assert.assertThat("Inflated length", len, greaterThan(4)); - String actual = StringUtil.toUTF8String(outbuf,0,len); - Assert.assertThat("Inflated text",actual,is("info:")); + String actual = StringUtil.toUTF8String(outbuf, 0, len); + Assert.assertThat("Inflated text", actual, is("info:")); } @Test @@ -344,7 +339,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest { // Captured from PyWebSocket - "Hello" (echo from server) byte rawbuf[] = TypeUtil.fromHexString("c107f248cdc9c90700"); - assertIncoming(rawbuf,"Hello"); + assertIncoming(rawbuf, "Hello"); } @Test @@ -353,7 +348,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest // Captured from PyWebSocket - Long Text (echo from server) byte rawbuf[] = TypeUtil.fromHexString("c1421cca410a80300c44d1abccce9df7" + "f018298634d05631138ab7b7b8fdef1f" + "dc0282e2061d575a45f6f2686bab25e1" + "3fb7296fa02b5885eb3b0379c394f461" + "98cafd03"); - assertIncoming(rawbuf,"It's a big enough umbrella but it's always me that ends up getting wet."); + assertIncoming(rawbuf, "It's a big enough umbrella but it's always me that ends up getting wet."); } @Test @@ -361,7 +356,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest { // Captured from PyWebSocket - "stackoverflow" (echo from server) byte rawbuf[] = TypeUtil.fromHexString("c10f2a2e494ccece2f4b2d4acbc92f0700"); - assertIncoming(rawbuf,"stackoverflow"); + assertIncoming(rawbuf, "stackoverflow"); } /** @@ -370,7 +365,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest @Test public void testServerGeneratedHello() throws IOException { - assertOutgoing("Hello","c107f248cdc9c90700"); + assertOutgoing("Hello", "c107f248cdc9c90700"); } /** @@ -379,7 +374,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest @Test public void testServerGeneratedThere() throws IOException { - assertOutgoing("There","c1070ac9482d4a0500"); + assertOutgoing("There", "c1070ac9482d4a0500"); } @Test @@ -403,7 +398,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest clientExtension.setNextOutgoingFrames(new OutgoingFrames() { @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { serverExtension.incomingFrame(frame); callback.writeSuccess(); @@ -435,7 +430,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest BinaryFrame frame = new BinaryFrame(); frame.setPayload(input); frame.setFin(true); - clientExtension.outgoingFrame(frame, null); + clientExtension.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH); Assert.assertArrayEquals(input, result.toByteArray()); } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java index 2d8700de1bb..f993260bf29 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtensionTest.java @@ -29,6 +29,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.extensions.AbstractExtensionTest; @@ -48,17 +49,17 @@ import static org.hamcrest.Matchers.is; /** * Client side behavioral tests for permessage-deflate extension. - *

+ *

* See: http://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-15 */ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest { @Rule - public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test",new MappedByteBufferPool()); - + public LeakTrackingBufferPool bufferPool = new LeakTrackingBufferPool("Test", new MappedByteBufferPool()); + /** * Decode payload example as seen in draft-ietf-hybi-permessage-compression-15. - *

+ *

* Section 8.2.3.4: Using a DEFLATE Block with BFINAL Set to 1 */ @Test @@ -71,14 +72,14 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest tester.parseIncomingHex(// 1 message "0xc1 0x08", // header "0xf3 0x48 0xcd 0xc9 0xc9 0x07 0x00 0x00" // example payload - ); + ); tester.assertHasFrames("Hello"); } /** * Decode payload example as seen in draft-ietf-hybi-permessage-compression-15. - *

+ *

* Section 8.2.3.3: Using a DEFLATE Block with No Compression */ @Test @@ -90,14 +91,14 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest tester.parseIncomingHex(// 1 message / no compression "0xc1 0x0b 0x00 0x05 0x00 0xfa 0xff 0x48 0x65 0x6c 0x6c 0x6f 0x00" // example frame - ); + ); tester.assertHasFrames("Hello"); } /** * Decode payload example as seen in draft-ietf-hybi-permessage-compression-15. - *

+ *

* Section 8.2.3.1: A message compressed using 1 compressed DEFLATE block */ @Test @@ -109,14 +110,14 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest tester.parseIncomingHex(//basic, 1 block, compressed with 0 compression level (aka, uncompressed). "0xc1 0x07 0xf2 0x48 0xcd 0xc9 0xc9 0x07 0x00" // example frame - ); + ); tester.assertHasFrames("Hello"); } /** * Decode payload example as seen in draft-ietf-hybi-permessage-compression-15. - *

+ *

* Section 8.2.3.1: A message compressed using 1 compressed DEFLATE block (with fragmentation) */ @Test @@ -139,7 +140,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest /** * Decode payload example as seen in draft-ietf-hybi-permessage-compression-15. - *

+ *

* Section 8.2.3.2: Sharing LZ77 Sliding Window */ @Test @@ -157,12 +158,12 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest "0xc1 0x07", // (HEADER added for this test) "0xf2 0x48 0xcd 0xc9 0xc9 0x07 0x00"); - tester.assertHasFrames("Hello","Hello"); + tester.assertHasFrames("Hello", "Hello"); } /** * Decode payload example as seen in draft-ietf-hybi-permessage-compression-15. - *

+ *

* Section 8.2.3.2: Sharing LZ77 Sliding Window */ @Test @@ -179,14 +180,14 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest // message 2 "0xc1 0x05", // (HEADER added for this test) "0xf2 0x00 0x11 0x00 0x00" - ); + ); - tester.assertHasFrames("Hello","Hello"); + tester.assertHasFrames("Hello", "Hello"); } /** * Decode payload example as seen in draft-ietf-hybi-permessage-compression-15. - *

+ *

* Section 8.2.3.5: Two DEFLATE Blocks in 1 Message */ @Test @@ -199,7 +200,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest tester.parseIncomingHex(// 1 message, 1 frame, 2 deflate blocks "0xc1 0x0d", // (HEADER added for this test) "0xf2 0x48 0x05 0x00 0x00 0x00 0xff 0xff 0xca 0xc9 0xc9 0x07 0x00" - ); + ); tester.assertHasFrames("Hello"); } @@ -227,18 +228,18 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest ext.incomingFrame(ping); capture.assertFrameCount(1); - capture.assertHasFrame(OpCode.PING,1); + capture.assertHasFrame(OpCode.PING, 1); WebSocketFrame actual = capture.getFrames().poll(); - Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.PING)); - Assert.assertThat("Frame.fin",actual.isFin(),is(true)); - Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false)); - Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false)); - Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false)); + Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.PING)); + Assert.assertThat("Frame.fin", actual.isFin(), is(true)); + Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false)); + Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false)); + Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false)); - ByteBuffer expected = BufferUtil.toBuffer(payload,StandardCharsets.UTF_8); - Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining())); - ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice()); + ByteBuffer expected = BufferUtil.toBuffer(payload, StandardCharsets.UTF_8); + Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining())); + ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice()); } /** @@ -275,7 +276,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest int len = quote.size(); capture.assertFrameCount(len); - capture.assertHasFrame(OpCode.TEXT,len); + capture.assertHasFrame(OpCode.TEXT, len); String prefix; int i = 0; @@ -283,15 +284,15 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest { prefix = "Frame[" + i + "]"; - Assert.assertThat(prefix + ".opcode",actual.getOpCode(),is(OpCode.TEXT)); - Assert.assertThat(prefix + ".fin",actual.isFin(),is(true)); - Assert.assertThat(prefix + ".rsv1",actual.isRsv1(),is(false)); - Assert.assertThat(prefix + ".rsv2",actual.isRsv2(),is(false)); - Assert.assertThat(prefix + ".rsv3",actual.isRsv3(),is(false)); + Assert.assertThat(prefix + ".opcode", actual.getOpCode(), is(OpCode.TEXT)); + Assert.assertThat(prefix + ".fin", actual.isFin(), is(true)); + Assert.assertThat(prefix + ".rsv1", actual.isRsv1(), is(false)); + Assert.assertThat(prefix + ".rsv2", actual.isRsv2(), is(false)); + Assert.assertThat(prefix + ".rsv3", actual.isRsv3(), is(false)); - ByteBuffer expected = BufferUtil.toBuffer(quote.get(i),StandardCharsets.UTF_8); - Assert.assertThat(prefix + ".payloadLength",actual.getPayloadLength(),is(expected.remaining())); - ByteBufferAssert.assertEquals(prefix + ".payload",expected,actual.getPayload().slice()); + ByteBuffer expected = BufferUtil.toBuffer(quote.get(i), StandardCharsets.UTF_8); + Assert.assertThat(prefix + ".payloadLength", actual.getPayloadLength(), is(expected.remaining())); + ByteBufferAssert.assertEquals(prefix + ".payload", expected, actual.getPayload().slice()); i++; } } @@ -317,22 +318,22 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest String payload = "Are you there?"; Frame ping = new PingFrame().setPayload(payload); - ext.outgoingFrame(ping,null); + ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.FLUSH); capture.assertFrameCount(1); - capture.assertHasFrame(OpCode.PING,1); + capture.assertHasFrame(OpCode.PING, 1); WebSocketFrame actual = capture.getFrames().getFirst(); - Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.PING)); - Assert.assertThat("Frame.fin",actual.isFin(),is(true)); - Assert.assertThat("Frame.rsv1",actual.isRsv1(),is(false)); - Assert.assertThat("Frame.rsv2",actual.isRsv2(),is(false)); - Assert.assertThat("Frame.rsv3",actual.isRsv3(),is(false)); + Assert.assertThat("Frame.opcode", actual.getOpCode(), is(OpCode.PING)); + Assert.assertThat("Frame.fin", actual.isFin(), is(true)); + Assert.assertThat("Frame.rsv1", actual.isRsv1(), is(false)); + Assert.assertThat("Frame.rsv2", actual.isRsv2(), is(false)); + Assert.assertThat("Frame.rsv3", actual.isRsv3(), is(false)); - ByteBuffer expected = BufferUtil.toBuffer(payload,StandardCharsets.UTF_8); - Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(expected.remaining())); - ByteBufferAssert.assertEquals("Frame.payload",expected,actual.getPayload().slice()); + ByteBuffer expected = BufferUtil.toBuffer(payload, StandardCharsets.UTF_8); + Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining())); + ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice()); } @Test @@ -350,7 +351,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest "c1 0b 0a c8 c8 c9 2f 4a 0c 01 62 00 00" // PhloraTora ); - tester.assertHasFrames("ToraTora","AtoraFlora","PhloraTora"); + tester.assertHasFrames("ToraTora", "AtoraFlora", "PhloraTora"); } @Test @@ -368,7 +369,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest "c1 04 02 61 00 00" // tora 3 ); - tester.assertHasFrames("tora","tora","tora"); + tester.assertHasFrames("tora", "tora", "tora"); } @Test @@ -386,7 +387,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest "c1 8b e2 3e 05 53 e8 f6 cd 9a cd 74 09 52 80 3e 05" // PhloraTora ); - tester.assertHasFrames("ToraTora","AtoraFlora","PhloraTora"); + tester.assertHasFrames("ToraTora", "AtoraFlora", "PhloraTora"); } @Test @@ -404,6 +405,6 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest "c1 84 53 ad a5 34 51 cc a5 34" // tora 3 ); - tester.assertHasFrames("tora","tora","tora"); + tester.assertHasFrames("tora", "tora", "tora"); } } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java index 8d7b094ef69..a33199f3384 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java @@ -204,7 +204,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketSession.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketSession.java index 6bf9224d1f6..afb97f97381 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketSession.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketSession.java @@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.common.io; import java.net.URI; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.websocket.common.SessionListener; import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.events.EventDriver; import org.eclipse.jetty.websocket.common.test.OutgoingFramesCapture; @@ -34,7 +33,7 @@ public class LocalWebSocketSession extends WebSocketSession public LocalWebSocketSession(TestName testname, EventDriver driver, ByteBufferPool bufferPool) { - super(URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),driver,new LocalWebSocketConnection(testname,bufferPool), new SessionListener[0]); + super(URI.create("ws://localhost/LocalWebSocketSesssion/" + testname.getMethodName()),driver,new LocalWebSocketConnection(testname,bufferPool)); this.id = testname.getMethodName(); outgoingCapture = new OutgoingFramesCapture(); setOutgoingHandler(outgoingCapture); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java index 349b7b49e20..7681905bad6 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common.test; -import static org.hamcrest.Matchers.*; - import java.io.Closeable; import java.io.EOFException; import java.io.IOException; @@ -69,6 +67,10 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener; import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser; import org.junit.Assert; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + /** * A simple websocket client for performing unit tests with. *

@@ -465,7 +467,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { ByteBuffer headerBuf = generator.generateHeaderBytes(frame); if (LOG.isDebugEnabled()) @@ -710,7 +712,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti { frame.setMask(clientmask); } - extensionStack.outgoingFrame(frame,null); + extensionStack.outgoingFrame(frame,null,FlushMode.FLUSH); } public void writeRaw(ByteBuffer buf) throws IOException diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java index aa23cbde085..b16cd09c4e0 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common.test; -import static org.hamcrest.Matchers.*; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -66,6 +64,9 @@ import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory; import org.eclipse.jetty.websocket.common.frames.CloseFrame; import org.junit.Assert; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + /** * A overly simplistic websocket server used during testing. *

@@ -230,7 +231,7 @@ public class BlockheadServer } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { ByteBuffer headerBuf = generator.generateHeaderBytes(frame); if (LOG.isDebugEnabled()) @@ -560,7 +561,7 @@ public class BlockheadServer public void write(Frame frame) throws IOException { LOG.debug("write(Frame->{}) to {}",frame,outgoing); - outgoing.outgoingFrame(frame,null); + outgoing.outgoingFrame(frame,null,FlushMode.FLUSH); } public void write(int b) throws IOException diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingFramesCapture.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingFramesCapture.java index dfa2fe97c65..c4cf5051c20 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingFramesCapture.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingFramesCapture.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common.test; -import static org.hamcrest.Matchers.*; - import java.util.LinkedList; import org.eclipse.jetty.util.BufferUtil; @@ -30,6 +28,9 @@ import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.junit.Assert; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; + public class OutgoingFramesCapture implements OutgoingFrames { private LinkedList frames = new LinkedList<>(); @@ -84,7 +85,7 @@ public class OutgoingFramesCapture implements OutgoingFrames } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { frames.add(WebSocketFrame.copy(frame)); if (callback != null) diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java index fb0f17c7b56..873971f7c24 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/OutgoingNetworkBytesCapture.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.common.test; -import static org.hamcrest.Matchers.*; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -33,6 +31,9 @@ import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.Generator; import org.junit.Assert; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; + /** * Capture outgoing network bytes. */ @@ -61,7 +62,7 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames } @Override - public void outgoingFrame(Frame frame, WriteCallback callback) + public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode) { ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + frame.getPayloadLength()); generator.generateWholeFrame(frame,buf);