From 81447c3e2c66425e1f96d204325c4df332d5aae7 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 13 Jun 2023 13:02:28 +1000 Subject: [PATCH 1/2] Issue #9895 - ensure callback is failed after error in PerMessageDeflateExtension Signed-off-by: Lachlan Roberts --- .../internal/PerMessageDeflateExtension.java | 6 +++-- .../websocket/tests/LargeDeflateTest.java | 22 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java index bc68f27ca56..bd600ba22ca 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java @@ -464,16 +464,18 @@ public class PerMessageDeflateExtension extends AbstractExtension implements Dem chunk.setPayload(payload); chunk.setFin(frame.isFin() && complete); - boolean succeedCallback = complete; + boolean completeCallback = complete; AtomicReference payloadRef = _payloadRef; Callback payloadCallback = Callback.from(() -> { getBufferPool().release(payloadRef.getAndSet(null)); - if (succeedCallback) + if (completeCallback) callback.succeeded(); }, t -> { getBufferPool().release(payloadRef.getAndSet(null)); + if (completeCallback) + callback.failed(t); failFlusher(t); }); diff --git a/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/LargeDeflateTest.java b/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/LargeDeflateTest.java index 6c3852bc1f5..56f9f0181d7 100644 --- a/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/LargeDeflateTest.java +++ b/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/LargeDeflateTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -92,6 +93,27 @@ public class LargeDeflateTest assertThat(message, is(sentMessage)); } + @Test + void testDeflateLargerThanMaxMessage() throws Exception + { + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.addExtensions("permessage-deflate"); + + EventSocket clientSocket = new EventSocket(); + ByteBuffer message = largePayloads(); + Session session = _client.connect(clientSocket, URI.create("ws://localhost:" + _connector.getLocalPort() + "/ws"), upgradeRequest).get(); + + // Set the maxBinaryMessageSize on the server to be lower than the size of the message. + assertTrue(_serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + _serverSocket.session.setMaxBinaryMessageSize(message.remaining() - 1024); + + session.getRemote().sendBytes(message); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(_serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(_serverSocket.closeCode, is(StatusCode.MESSAGE_TOO_LARGE)); + assertThat(_serverSocket.closeReason, containsString("Binary message too large")); + } + private static ByteBuffer largePayloads() { var bytes = new byte[4 * 1024 * 1024]; From 212ea5e6f840a650e377d6c51749a72d7a67ad40 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 15 Jun 2023 08:27:26 +1000 Subject: [PATCH 2/2] add comment in the PerMessageDeflateExtension IncomingFlusher Signed-off-by: Lachlan Roberts --- .../websocket/core/internal/PerMessageDeflateExtension.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java index bd600ba22ca..3430c65697a 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java @@ -464,6 +464,11 @@ public class PerMessageDeflateExtension extends AbstractExtension implements Dem chunk.setPayload(payload); chunk.setFin(frame.isFin() && complete); + // If we are complete we return true, then DemandingFlusher.process() will null out the Frame and Callback. + // The application may decide to hold onto the buffer and delay completing the callback, so we need to capture + // references to these in the payloadCallback and not rely on state of the flusher which may have moved on. + // This flusher could be failed while the application already has the payloadCallback, so we need protection against + // the flusher failing and the application completing the callback, that's why we use the payload AtomicReference. boolean completeCallback = complete; AtomicReference payloadRef = _payloadRef; Callback payloadCallback = Callback.from(() ->