Fixes #11081 - Dropped WebSocket messages due to race condition in WebSocket frame handling. (#11090)

Now the reset of the MessageSink internal accumulators happens before the demand.

This avoids the race, since as soon as there is demand another thread could enter the MessageSink, but the accumulator has already been reset.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2024-01-02 19:37:35 +01:00 committed by GitHub
parent c758ecdd2e
commit 9fdfa464d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 48 deletions

View File

@ -33,14 +33,11 @@ public class ByteArrayMessageSink extends AbstractMessageSink
public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle)
{
super(session, methodHandle);
// This uses the offset length byte array signature not supported by javax websocket.
// The javax layer instead uses decoders for whole byte array messages instead of this message sink.
MethodType onMessageType = MethodType.methodType(Void.TYPE, byte[].class, int.class, int.class);
if (methodHandle.type().changeReturnType(void.class) != onMessageType.changeReturnType(void.class))
{
throw InvalidSignatureException.build(onMessageType, methodHandle.type());
}
}
@Override
@ -56,8 +53,9 @@ public class ByteArrayMessageSink extends AbstractMessageSink
String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d", size, maxBinaryMessageSize));
}
// If we are fin and no OutputStream has been created we don't need to aggregate.
if (frame.isFin() && (out == null))
// If the frame is fin and no accumulator has been
// created or used, then we don't need to aggregate.
if (frame.isFin() && (out == null || out.getLength() == 0))
{
if (frame.hasPayload())
{
@ -65,7 +63,9 @@ public class ByteArrayMessageSink extends AbstractMessageSink
methodHandle.invoke(buf, 0, buf.length);
}
else
{
methodHandle.invoke(EMPTY_BUFFER, 0, 0);
}
callback.succeeded();
session.demand(1);
@ -79,32 +79,25 @@ public class ByteArrayMessageSink extends AbstractMessageSink
if (out == null)
out = new ByteBufferCallbackAccumulator();
out.addEntry(payload, callback);
// The callback is now stored in the accumulator, so if
// the methodHandle throws, don't fail the callback twice.
callback = Callback.NOOP;
}
// If the methodHandle throws we don't want to fail callback twice.
callback = Callback.NOOP;
if (frame.isFin())
{
byte[] buf = out.takeByteArray();
methodHandle.invoke(buf, 0, buf.length);
}
callback.succeeded();
session.demand(1);
}
catch (Throwable t)
{
if (out != null)
out.fail(t);
fail(t);
callback.failed(t);
}
finally
{
if (frame.isFin())
{
// reset
out = null;
}
}
}
@Override

View File

@ -16,7 +16,6 @@ package org.eclipse.jetty.websocket.core.internal.messages;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
import org.eclipse.jetty.io.ByteBufferPool;
@ -34,14 +33,10 @@ public class ByteBufferMessageSink extends AbstractMessageSink
public ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle)
{
super(session, methodHandle);
// Validate onMessageMethod
Objects.requireNonNull(methodHandle, "MethodHandle");
// Validate onMessage method signature.
MethodType onMessageType = MethodType.methodType(Void.TYPE, ByteBuffer.class);
if (methodHandle.type() != onMessageType)
{
throw InvalidSignatureException.build(onMessageType, methodHandle.type());
}
}
@Override
@ -57,8 +52,9 @@ public class ByteBufferMessageSink extends AbstractMessageSink
size, maxBinaryMessageSize));
}
// If we are fin and no OutputStream has been created we don't need to aggregate.
if (frame.isFin() && (out == null))
// If the frame is fin and no accumulator has been
// created or used, then we don't need to aggregate.
if (frame.isFin() && (out == null || out.getLength() == 0))
{
if (frame.hasPayload())
methodHandle.invoke(frame.getPayload());
@ -77,10 +73,11 @@ public class ByteBufferMessageSink extends AbstractMessageSink
if (out == null)
out = new ByteBufferCallbackAccumulator();
out.addEntry(payload, callback);
// The callback is now stored in the accumulator, so if
// the methodHandle throws, don't fail the callback twice.
callback = Callback.NOOP;
}
// If the methodHandle throws we don't want to fail callback twice.
callback = Callback.NOOP;
if (frame.isFin())
{
ByteBufferPool bufferPool = session.getByteBufferPool();
@ -97,21 +94,14 @@ public class ByteBufferMessageSink extends AbstractMessageSink
}
}
callback.succeeded();
session.demand(1);
}
catch (Throwable t)
{
if (out != null)
out.fail(t);
fail(t);
callback.failed(t);
}
finally
{
if (frame.isFin())
{
out = null;
}
}
}
@Override

View File

@ -47,33 +47,33 @@ public class StringMessageSink extends AbstractMessageSink
if (out == null)
out = new Utf8StringBuilder(session.getInputBufferSize());
out.append(frame.getPayload());
if (frame.isFin())
{
methodHandle.invoke(out.toString());
reset();
}
callback.succeeded();
session.demand(1);
}
catch (Throwable t)
{
reset();
callback.failed(t);
}
finally
{
if (frame.isFin())
{
// reset
size = 0;
out = null;
}
}
}
@Override
public void fail(Throwable failure)
{
if (out != null)
out.reset();
reset();
}
private void reset()
{
out = null;
size = 0;
}
}