Issue #4571 - simplify partial MessageSinks reduce copying
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
36cccd2c88
commit
71b11f0887
|
@ -136,20 +136,7 @@ public abstract class DispatchedMessageSink<T> extends AbstractMessageSink
|
|||
if (frame.isFin())
|
||||
{
|
||||
CompletableFuture<Void> finComplete = new CompletableFuture<>();
|
||||
frameCallback = new Callback()
|
||||
{
|
||||
@Override
|
||||
public void failed(Throwable cause)
|
||||
{
|
||||
finComplete.completeExceptionally(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
finComplete.complete(null);
|
||||
}
|
||||
};
|
||||
frameCallback = Callback.from(() -> finComplete.complete(null), finComplete::completeExceptionally);
|
||||
CompletableFuture.allOf(dispatchComplete, finComplete).whenComplete(
|
||||
(aVoid, throwable) ->
|
||||
{
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.util.messages;
|
|||
|
||||
import java.lang.invoke.MethodHandle;
|
||||
import java.lang.invoke.MethodType;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
@ -30,6 +29,8 @@ import org.eclipse.jetty.websocket.util.InvalidSignatureException;
|
|||
|
||||
public class PartialByteArrayMessageSink extends AbstractMessageSink
|
||||
{
|
||||
private static byte[] EMPTY_BUFFER = new byte[0];
|
||||
|
||||
public PartialByteArrayMessageSink(CoreSession session, MethodHandle methodHandle)
|
||||
{
|
||||
super(session, methodHandle);
|
||||
|
@ -48,22 +49,12 @@ public class PartialByteArrayMessageSink extends AbstractMessageSink
|
|||
{
|
||||
try
|
||||
{
|
||||
byte[] buffer;
|
||||
int offset = 0;
|
||||
int length = 0;
|
||||
|
||||
if (frame.hasPayload())
|
||||
if (frame.hasPayload() || frame.isFin())
|
||||
{
|
||||
ByteBuffer payload = frame.getPayload();
|
||||
length = payload.remaining();
|
||||
buffer = BufferUtil.toArray(payload);
|
||||
}
|
||||
else
|
||||
{
|
||||
buffer = new byte[0];
|
||||
byte[] buffer = frame.hasPayload() ? BufferUtil.toArray(frame.getPayload()) : EMPTY_BUFFER;
|
||||
methodHandle.invoke(buffer, 0, buffer.length, frame.isFin());
|
||||
}
|
||||
|
||||
methodHandle.invoke(buffer, offset, length, frame.isFin());
|
||||
callback.succeeded();
|
||||
}
|
||||
catch (Throwable t)
|
||||
|
|
|
@ -47,23 +47,11 @@ public class PartialByteBufferMessageSink extends AbstractMessageSink
|
|||
{
|
||||
try
|
||||
{
|
||||
ByteBuffer buffer;
|
||||
|
||||
if (frame.hasPayload())
|
||||
if (frame.hasPayload() || frame.isFin())
|
||||
{
|
||||
ByteBuffer payload = frame.getPayload();
|
||||
// copy buffer here
|
||||
buffer = ByteBuffer.allocate(payload.remaining());
|
||||
BufferUtil.clearToFill(buffer);
|
||||
BufferUtil.put(payload, buffer);
|
||||
BufferUtil.flipToFlush(buffer, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
buffer = BufferUtil.EMPTY_BUFFER;
|
||||
}
|
||||
|
||||
ByteBuffer buffer = frame.hasPayload() ? frame.getPayload() : BufferUtil.EMPTY_BUFFER;
|
||||
methodHandle.invoke(buffer, frame.isFin());
|
||||
}
|
||||
|
||||
callback.succeeded();
|
||||
}
|
||||
|
|
|
@ -19,28 +19,18 @@
|
|||
package org.eclipse.jetty.websocket.util.messages;
|
||||
|
||||
import java.lang.invoke.MethodHandle;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Utf8StringBuilder;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.core.CoreSession;
|
||||
import org.eclipse.jetty.websocket.core.Frame;
|
||||
|
||||
public class PartialStringMessageSink extends AbstractMessageSink
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(PartialStringMessageSink.class);
|
||||
private Utf8StringBuilder utf;
|
||||
private int size;
|
||||
|
||||
public PartialStringMessageSink(CoreSession session, MethodHandle methodHandle)
|
||||
{
|
||||
super(session, methodHandle);
|
||||
Objects.requireNonNull(methodHandle, "MethodHandle");
|
||||
this.size = 0;
|
||||
}
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
|
@ -49,43 +39,9 @@ public class PartialStringMessageSink extends AbstractMessageSink
|
|||
{
|
||||
try
|
||||
{
|
||||
if (utf == null)
|
||||
utf = new Utf8StringBuilder(1024);
|
||||
|
||||
if (frame.hasPayload())
|
||||
if (frame.hasPayload() || frame.isFin())
|
||||
{
|
||||
ByteBuffer payload = frame.getPayload();
|
||||
|
||||
//TODO we should fragment on maxTextMessageBufferSize not limit
|
||||
//TODO also for PartialBinaryMessageSink
|
||||
/*
|
||||
if ((session.getMaxTextMessageBufferSize() > 0) && (size + payload.remaining() > session.getMaxTextMessageBufferSize()))
|
||||
{
|
||||
throw new MessageTooLargeException(String.format("Binary message too large: (actual) %,d > (configured max text buffer size) %,d",
|
||||
size + payload.remaining(), session.getMaxTextMessageBufferSize()));
|
||||
}
|
||||
*/
|
||||
|
||||
size += payload.remaining();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Raw Payload {}", BufferUtil.toDetailString(payload));
|
||||
|
||||
// allow for fast fail of BAD utf
|
||||
utf.append(payload);
|
||||
}
|
||||
|
||||
if (frame.isFin())
|
||||
{
|
||||
// Using toString to trigger failure on incomplete UTF-8
|
||||
methodHandle.invoke(utf.toString(), true);
|
||||
// reset
|
||||
size = 0;
|
||||
utf = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
methodHandle.invoke(utf.takePartialString(), false);
|
||||
methodHandle.invoke(frame.getPayloadAsUTF8(), frame.isFin());
|
||||
}
|
||||
|
||||
callback.succeeded();
|
||||
|
|
|
@ -73,7 +73,6 @@ public class StringMessageSink extends AbstractMessageSink
|
|||
{
|
||||
// reset
|
||||
size = 0;
|
||||
out.reset();
|
||||
out = null;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue