Merged branch 'jetty-11.0.x' into 'jetty-12.0.x'.
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
commit
2a3e6f72c7
|
@ -44,7 +44,6 @@ public class ByteArrayMessageSink extends AbstractMessageSink
|
|||
public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand)
|
||||
{
|
||||
super(session, methodHandle, autoDemand);
|
||||
|
||||
// This uses the offset length byte array signature not supported by jakarta websocket.
|
||||
// The jakarta layer instead uses decoders for whole byte array messages instead of this message sink.
|
||||
MethodType onMessageType = MethodType.methodType(Void.TYPE, byte[].class, int.class, int.class);
|
||||
|
@ -65,8 +64,10 @@ public class ByteArrayMessageSink extends AbstractMessageSink
|
|||
return;
|
||||
}
|
||||
|
||||
// If the frame is fin and no accumulator has been
|
||||
// created or used, then we don't need to aggregate.
|
||||
ByteBuffer payload = frame.getPayload();
|
||||
if (frame.isFin() && accumulator == null)
|
||||
if (frame.isFin() && (accumulator == null || accumulator.getLength() == 0))
|
||||
{
|
||||
byte[] buf = BufferUtil.toArray(payload);
|
||||
getMethodHandle().invoke(buf, 0, buf.length);
|
||||
|
@ -98,19 +99,12 @@ public class ByteArrayMessageSink extends AbstractMessageSink
|
|||
{
|
||||
getCoreSession().demand();
|
||||
}
|
||||
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
if (accumulator != null)
|
||||
accumulator.fail(t);
|
||||
fail(t);
|
||||
callback.failed(t);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (frame.isFin())
|
||||
accumulator = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -72,7 +72,9 @@ public class ByteBufferMessageSink extends AbstractMessageSink
|
|||
return;
|
||||
}
|
||||
|
||||
if (frame.isFin() && accumulator == null)
|
||||
// If the frame is fin and no accumulator has been
|
||||
// created or used, then we don't need to aggregate.
|
||||
if (frame.isFin() && (accumulator == null || accumulator.getLength() == 0))
|
||||
{
|
||||
invoke(getMethodHandle(), frame.getPayload(), callback);
|
||||
autoDemand();
|
||||
|
@ -108,15 +110,9 @@ public class ByteBufferMessageSink extends AbstractMessageSink
|
|||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
if (accumulator != null)
|
||||
accumulator.fail(t);
|
||||
fail(t);
|
||||
callback.failed(t);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (frame.isFin())
|
||||
accumulator = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -60,12 +60,12 @@ public class StringMessageSink extends AbstractMessageSink
|
|||
|
||||
if (out == null)
|
||||
out = new Utf8StringBuilder(getCoreSession().getInputBufferSize());
|
||||
|
||||
out.append(frame.getPayload());
|
||||
|
||||
if (frame.isFin())
|
||||
{
|
||||
getMethodHandle().invoke(out.takeCompleteString(BadPayloadException.InvalidUtf8::new));
|
||||
reset();
|
||||
callback.succeeded();
|
||||
autoDemand();
|
||||
}
|
||||
|
@ -77,22 +77,20 @@ public class StringMessageSink extends AbstractMessageSink
|
|||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
reset();
|
||||
callback.failed(t);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (frame.isFin())
|
||||
{
|
||||
size = 0;
|
||||
out = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fail(Throwable failure)
|
||||
{
|
||||
if (out != null)
|
||||
out.reset();
|
||||
reset();
|
||||
}
|
||||
|
||||
private void reset()
|
||||
{
|
||||
out = null;
|
||||
size = 0;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue