From 9fdfa464d765e7f999c28efd7b89b944f6ac59d2 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 2 Jan 2024 19:37:35 +0100 Subject: [PATCH] Fixes #11081 - Dropped WebSocket messages due to race condition in WebSocket frame handling. (#11090) Now the reset of the MessageSink internal accumulators happens before the demand. This avoids the race, since as soon as there is demand another thread could enter the MessageSink, but the accumulator has already been reset. Signed-off-by: Simone Bordet --- .../messages/ByteArrayMessageSink.java | 27 +++++++----------- .../messages/ByteBufferMessageSink.java | 28 ++++++------------- .../internal/messages/StringMessageSink.java | 24 ++++++++-------- 3 files changed, 31 insertions(+), 48 deletions(-) diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java index 466e9a4106d..e368e060d41 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java @@ -33,14 +33,11 @@ public class ByteArrayMessageSink extends AbstractMessageSink public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle) { super(session, methodHandle); - // This uses the offset length byte array signature not supported by javax websocket. // The javax layer instead uses decoders for whole byte array messages instead of this message sink. MethodType onMessageType = MethodType.methodType(Void.TYPE, byte[].class, int.class, int.class); if (methodHandle.type().changeReturnType(void.class) != onMessageType.changeReturnType(void.class)) - { throw InvalidSignatureException.build(onMessageType, methodHandle.type()); - } } @Override @@ -56,8 +53,9 @@ public class ByteArrayMessageSink extends AbstractMessageSink String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d", size, maxBinaryMessageSize)); } - // If we are fin and no OutputStream has been created we don't need to aggregate. - if (frame.isFin() && (out == null)) + // If the frame is fin and no accumulator has been + // created or used, then we don't need to aggregate. + if (frame.isFin() && (out == null || out.getLength() == 0)) { if (frame.hasPayload()) { @@ -65,7 +63,9 @@ public class ByteArrayMessageSink extends AbstractMessageSink methodHandle.invoke(buf, 0, buf.length); } else + { methodHandle.invoke(EMPTY_BUFFER, 0, 0); + } callback.succeeded(); session.demand(1); @@ -79,32 +79,25 @@ public class ByteArrayMessageSink extends AbstractMessageSink if (out == null) out = new ByteBufferCallbackAccumulator(); out.addEntry(payload, callback); + // The callback is now stored in the accumulator, so if + // the methodHandle throws, don't fail the callback twice. + callback = Callback.NOOP; } - // If the methodHandle throws we don't want to fail callback twice. - callback = Callback.NOOP; if (frame.isFin()) { byte[] buf = out.takeByteArray(); methodHandle.invoke(buf, 0, buf.length); } + callback.succeeded(); session.demand(1); } catch (Throwable t) { - if (out != null) - out.fail(t); + fail(t); callback.failed(t); } - finally - { - if (frame.isFin()) - { - // reset - out = null; - } - } } @Override diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java index 92768c02925..e070b600f2a 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java @@ -16,7 +16,6 @@ package org.eclipse.jetty.websocket.core.internal.messages; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodType; import java.nio.ByteBuffer; -import java.util.Objects; import org.eclipse.jetty.io.ByteBufferCallbackAccumulator; import org.eclipse.jetty.io.ByteBufferPool; @@ -34,14 +33,10 @@ public class ByteBufferMessageSink extends AbstractMessageSink public ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle) { super(session, methodHandle); - - // Validate onMessageMethod - Objects.requireNonNull(methodHandle, "MethodHandle"); + // Validate onMessage method signature. MethodType onMessageType = MethodType.methodType(Void.TYPE, ByteBuffer.class); if (methodHandle.type() != onMessageType) - { throw InvalidSignatureException.build(onMessageType, methodHandle.type()); - } } @Override @@ -57,8 +52,9 @@ public class ByteBufferMessageSink extends AbstractMessageSink size, maxBinaryMessageSize)); } - // If we are fin and no OutputStream has been created we don't need to aggregate. - if (frame.isFin() && (out == null)) + // If the frame is fin and no accumulator has been + // created or used, then we don't need to aggregate. + if (frame.isFin() && (out == null || out.getLength() == 0)) { if (frame.hasPayload()) methodHandle.invoke(frame.getPayload()); @@ -77,10 +73,11 @@ public class ByteBufferMessageSink extends AbstractMessageSink if (out == null) out = new ByteBufferCallbackAccumulator(); out.addEntry(payload, callback); + // The callback is now stored in the accumulator, so if + // the methodHandle throws, don't fail the callback twice. + callback = Callback.NOOP; } - // If the methodHandle throws we don't want to fail callback twice. - callback = Callback.NOOP; if (frame.isFin()) { ByteBufferPool bufferPool = session.getByteBufferPool(); @@ -97,21 +94,14 @@ public class ByteBufferMessageSink extends AbstractMessageSink } } + callback.succeeded(); session.demand(1); } catch (Throwable t) { - if (out != null) - out.fail(t); + fail(t); callback.failed(t); } - finally - { - if (frame.isFin()) - { - out = null; - } - } } @Override diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java index c79b69d660d..fc2b3085629 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java @@ -47,33 +47,33 @@ public class StringMessageSink extends AbstractMessageSink if (out == null) out = new Utf8StringBuilder(session.getInputBufferSize()); - out.append(frame.getPayload()); + if (frame.isFin()) + { methodHandle.invoke(out.toString()); + reset(); + } callback.succeeded(); session.demand(1); } catch (Throwable t) { + reset(); callback.failed(t); } - finally - { - if (frame.isFin()) - { - // reset - size = 0; - out = null; - } - } } @Override public void fail(Throwable failure) { - if (out != null) - out.reset(); + reset(); + } + + private void reset() + { + out = null; + size = 0; } }