Issue #4571 - optimise aggregating text and binary MessageSinks

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-02-14 12:26:59 +11:00
parent 5fe202f29f
commit 36cccd2c88
4 changed files with 223 additions and 76 deletions

View File

@ -0,0 +1,111 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Objects;
import org.eclipse.jetty.util.BufferUtil;
public class ByteBufferPoolOutputStream extends OutputStream
{
private final ByteBufferPool bufferPool;
private final ArrayList<ByteBuffer> buffers;
private final boolean direct;
private final int acquireSize;
private ByteBuffer aggregateBuffer;
private int size = 0;
public ByteBufferPoolOutputStream(ByteBufferPool bufferPool, int acquireSize, boolean direct)
{
this.buffers = new ArrayList<>();
this.direct = direct;
this.bufferPool = Objects.requireNonNull(bufferPool);
this.acquireSize = acquireSize;
if (acquireSize <= 0)
throw new IllegalArgumentException();
this.buffers.add(bufferPool.acquire(acquireSize, direct));
}
@Override
public void write(int b) throws IOException
{
write(new byte[]{(byte)b}, 0, 1);
}
@Override
public void write(byte[] b, int off, int len) throws IOException
{
write(ByteBuffer.wrap(b, off, len));
}
public void write(ByteBuffer data)
{
while (data.hasRemaining())
{
ByteBuffer buffer = buffers.get(buffers.size() - 1);
size += BufferUtil.append(buffer, data);
if (!buffer.hasRemaining())
buffers.add(bufferPool.acquire(acquireSize, direct));
}
}
public int size()
{
return size;
}
public ByteBuffer toByteBuffer()
{
releaseAggregate();
aggregateBuffer = bufferPool.acquire(size, direct);
for (ByteBuffer data : buffers)
{
BufferUtil.append(aggregateBuffer, data);
}
return aggregateBuffer;
}
public byte[] toByteArray()
{
return BufferUtil.toArray(toByteBuffer());
}
private void releaseAggregate()
{
if (aggregateBuffer != null)
{
bufferPool.release(aggregateBuffer);
aggregateBuffer = null;
}
}
@Override
public void close()
{
releaseAggregate();
for (ByteBuffer buffer : buffers)
bufferPool.release(buffer);
}
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.util.messages; package org.eclipse.jetty.websocket.util.messages;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType; import java.lang.invoke.MethodType;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -55,34 +56,28 @@ public class ByteArrayMessageSink extends AbstractMessageSink
{ {
try try
{ {
if (frame.hasPayload()) // If we are fin and no OutputStream has been created we don't need to aggregate.
if (frame.isFin() && (out == null))
{ {
ByteBuffer payload = frame.getPayload(); if (frame.hasPayload())
size += payload.remaining();
long maxBinaryMessageSize = session.getMaxBinaryMessageSize();
if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize)
{ {
throw new MessageTooLargeException(String.format("Binary message too large: (actual) %,d > (configured max binary buffer size) %,d", byte[] buf = BufferUtil.toArray(frame.getPayload());
size, maxBinaryMessageSize));
}
if (out == null)
out = new ByteArrayOutputStream(BUFFER_SIZE);
BufferUtil.writeTo(payload, out);
}
if (frame.isFin())
{
if (out != null)
{
byte[] buf = out.toByteArray();
methodHandle.invoke(buf, 0, buf.length); methodHandle.invoke(buf, 0, buf.length);
} }
else else
methodHandle.invoke(EMPTY_BUFFER, 0, 0); methodHandle.invoke(EMPTY_BUFFER, 0, 0);
callback.succeeded();
return;
} }
aggregatePayload(frame);
if (frame.isFin())
{
byte[] buf = out.toByteArray();
methodHandle.invoke(buf, 0, buf.length);
}
callback.succeeded(); callback.succeeded();
} }
catch (Throwable t) catch (Throwable t)
@ -99,4 +94,24 @@ public class ByteArrayMessageSink extends AbstractMessageSink
} }
} }
} }
private void aggregatePayload(Frame frame) throws IOException
{
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
size += payload.remaining();
long maxBinaryMessageSize = session.getMaxBinaryMessageSize();
if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize)
{
throw new MessageTooLargeException(String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d",
size, maxBinaryMessageSize));
}
if (out == null)
out = new ByteArrayOutputStream(BUFFER_SIZE);
BufferUtil.writeTo(payload, out);
}
}
} }

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.util.messages; package org.eclipse.jetty.websocket.util.messages;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType; import java.lang.invoke.MethodType;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -56,32 +57,22 @@ public class ByteBufferMessageSink extends AbstractMessageSink
{ {
try try
{ {
if (frame.hasPayload()) // If we are fin and no OutputStream has been created we don't need to aggregate.
if (frame.isFin() && (out == null))
{ {
ByteBuffer payload = frame.getPayload(); if (frame.hasPayload())
size += payload.remaining(); methodHandle.invoke(frame.getPayload());
long maxBinaryMessageSize = session.getMaxBinaryMessageSize();
if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize)
{
throw new MessageTooLargeException(String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d",
size, maxBinaryMessageSize));
}
if (out == null)
out = new ByteArrayOutputStream(BUFFER_SIZE);
BufferUtil.writeTo(payload, out);
payload.position(payload.limit()); // consume buffer
}
if (frame.isFin())
{
if (out != null)
methodHandle.invoke(ByteBuffer.wrap(out.toByteArray()));
else else
methodHandle.invoke(BufferUtil.EMPTY_BUFFER); methodHandle.invoke(BufferUtil.EMPTY_BUFFER);
callback.succeeded();
return;
} }
aggregatePayload(frame);
if (frame.isFin())
methodHandle.invoke(ByteBuffer.wrap(out.toByteArray()));
callback.succeeded(); callback.succeeded();
} }
catch (Throwable t) catch (Throwable t)
@ -98,4 +89,25 @@ public class ByteBufferMessageSink extends AbstractMessageSink
} }
} }
} }
private void aggregatePayload(Frame frame) throws IOException
{
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
size += payload.remaining();
long maxBinaryMessageSize = session.getMaxBinaryMessageSize();
if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize)
{
throw new MessageTooLargeException(String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d",
size, maxBinaryMessageSize));
}
if (out == null)
out = new ByteArrayOutputStream(BUFFER_SIZE);
BufferUtil.writeTo(payload, out);
payload.position(payload.limit()); // consume buffer
}
}
} }

View File

@ -18,22 +18,20 @@
package org.eclipse.jetty.websocket.util.messages; package org.eclipse.jetty.websocket.util.messages;
import java.io.IOException;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession; import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException; import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
public class StringMessageSink extends AbstractMessageSink public class StringMessageSink extends AbstractMessageSink
{ {
private static final Logger LOG = Log.getLogger(StringMessageSink.class); private static final int BUFFER_SIZE = 1024;
private Utf8StringBuilder utf; private Utf8StringBuilder out;
private int size; private int size;
public StringMessageSink(CoreSession session, MethodHandle methodHandle) public StringMessageSink(CoreSession session, MethodHandle methodHandle)
@ -42,52 +40,63 @@ public class StringMessageSink extends AbstractMessageSink
this.size = 0; this.size = 0;
} }
@SuppressWarnings("Duplicates")
@Override @Override
public void accept(Frame frame, Callback callback) public void accept(Frame frame, Callback callback)
{ {
try try
{ {
if (frame.hasPayload()) // If we are fin and out has not been created we don't need to aggregate.
if (frame.isFin() && (out == null))
{ {
ByteBuffer payload = frame.getPayload(); if (frame.hasPayload())
methodHandle.invoke(frame.getPayloadAsUTF8());
size += payload.remaining();
long maxTextMessageSize = session.getMaxTextMessageSize();
if (maxTextMessageSize > 0 && size > maxTextMessageSize)
{
throw new MessageTooLargeException(String.format("Text message too large: (actual) %,d > (configured max text message size) %,d",
size, maxTextMessageSize));
}
if (utf == null)
utf = new Utf8StringBuilder(1024);
if (LOG.isDebugEnabled())
LOG.debug("Raw Payload {}", BufferUtil.toDetailString(payload));
// allow for fast fail of BAD utf (incomplete utf will trigger on messageComplete)
utf.append(payload);
}
if (frame.isFin())
{
// notify event
if (utf != null)
methodHandle.invoke(utf.toString());
else else
methodHandle.invoke(""); methodHandle.invoke("");
// reset callback.succeeded();
size = 0; return;
utf = null;
} }
aggregatePayload(frame);
if (frame.isFin())
methodHandle.invoke(out.toString());
callback.succeeded(); callback.succeeded();
} }
catch (Throwable t) catch (Throwable t)
{ {
callback.failed(t); callback.failed(t);
} }
finally
{
if (frame.isFin())
{
// reset
size = 0;
out.reset();
out = null;
}
}
}
private void aggregatePayload(Frame frame) throws IOException
{
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
size += frame.getPayloadLength();
long maxTextMessageSize = session.getMaxTextMessageSize();
if (maxTextMessageSize > 0 && size > maxTextMessageSize)
{
throw new MessageTooLargeException(String.format("Text message too large: (actual) %,d > (configured max text message size) %,d",
size, maxTextMessageSize));
}
if (out == null)
out = new Utf8StringBuilder(BUFFER_SIZE);
// allow for fast fail of BAD utf (incomplete utf will trigger on messageComplete)
out.append(payload);
}
} }
} }