Move work on ByteAccumulator to jetty-util

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-11-05 13:42:47 +11:00
parent 1f5b446462
commit 05dafb89ab
8 changed files with 309 additions and 176 deletions

View File

@ -0,0 +1,94 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
public class ByteBufferAccumulator implements AutoCloseable
{
private static final int MIN_SPACE = 3;
private static final int DEFAULT_BUFFER_SIZE = 1024;
private final List<ByteBuffer> _buffers = new ArrayList<>();
private final ByteBufferPool _bufferPool;
public ByteBufferAccumulator(ByteBufferPool bufferPool)
{
this._bufferPool = bufferPool;
}
public int getLength()
{
int length = 0;
for (ByteBuffer buffer : _buffers)
length += buffer.remaining();
return length;
}
public ByteBuffer getBuffer()
{
return getBuffer(DEFAULT_BUFFER_SIZE);
}
public ByteBuffer getBuffer(int minAllocationSize)
{
ByteBuffer buffer = _buffers.isEmpty() ? BufferUtil.EMPTY_BUFFER : _buffers.get(_buffers.size() - 1);
if (BufferUtil.space(buffer) <= MIN_SPACE)
{
buffer = _bufferPool.acquire(minAllocationSize, false);
_buffers.add(buffer);
}
return buffer;
}
public void writeTo(ByteBuffer buffer)
{
int pos = BufferUtil.flipToFill(buffer);
for (ByteBuffer bb : _buffers)
{
buffer.put(bb);
}
BufferUtil.flipToFlush(buffer, pos);
}
public void writeTo(OutputStream out) throws IOException
{
for (ByteBuffer bb : _buffers)
{
BufferUtil.writeTo(bb, out);
}
}
@Override
public void close()
{
for (ByteBuffer buffer : _buffers)
{
_bufferPool.release(buffer);
}
_buffers.clear();
}
}

View File

@ -0,0 +1,144 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
/**
* This class implements an output stream in which the data is written into a list of ByteBuffer,
* the buffer list automatically grows as data is written to it, the buffers are taken from the
* supplied {@link ByteBufferPool} or freshly allocated if one is not supplied.
*
* Designed to mimic {@link java.io.ByteArrayOutputStream} but with better memory usage, and less copying.
*/
public class ByteBufferOutputStream2 extends OutputStream
{
private final ByteBufferAccumulator _accumulator;
private final ByteBufferPool _bufferPool;
private ByteBuffer _combinedByteBuffer;
private int _size = 0;
public ByteBufferOutputStream2()
{
this(null);
}
public ByteBufferOutputStream2(ByteBufferPool bufferPool)
{
_bufferPool = (bufferPool == null) ? new NullByteBufferPool() : bufferPool;
_accumulator = new ByteBufferAccumulator(bufferPool);
}
/**
* Get an aggregated content written to the OutputStream in a ByteBuffer.
* @return the content in a ByteBuffer.
*/
public ByteBuffer toByteBuffer()
{
int length = _accumulator.getLength();
if (length == 0)
return BufferUtil.EMPTY_BUFFER;
if (_combinedByteBuffer != null && length == _combinedByteBuffer.remaining())
return _combinedByteBuffer;
ByteBuffer buffer = _bufferPool.acquire(_size, false);
_accumulator.writeTo(buffer);
if (_combinedByteBuffer != null)
{
_bufferPool.release(_combinedByteBuffer);
_combinedByteBuffer = buffer;
}
return buffer;
}
/**
* Get an aggregated content written to the OutputStream in a byte array.
* @return the content in a byte array.
*/
public byte[] toByteArray()
{
int length = _accumulator.getLength();
if (length == 0)
return new byte[0];
byte[] bytes = new byte[_size];
ByteBuffer buffer = BufferUtil.toBuffer(bytes);
_accumulator.writeTo(buffer);
return bytes;
}
public int size()
{
return _accumulator.getLength();
}
@Override
public void write(int b)
{
write(new byte[]{(byte)b}, 0, 1);
}
@Override
public void write(byte[] b, int off, int len)
{
write(BufferUtil.toBuffer(b, off, len));
}
public void write(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
ByteBuffer lastBuffer = _accumulator.getBuffer(buffer.remaining());
int pos = BufferUtil.flipToFill(lastBuffer);
_size += BufferUtil.put(buffer, lastBuffer);
BufferUtil.flipToFlush(lastBuffer, pos);
}
}
public void writeTo(OutputStream out) throws IOException
{
_accumulator.writeTo(out);
}
@Override
public void close()
{
if (_combinedByteBuffer != null)
{
_bufferPool.release(_combinedByteBuffer);
_combinedByteBuffer = null;
}
_accumulator.close();
_size = 0;
}
@Override
public synchronized String toString()
{
return String.format("%s@%x{size=%d, bufferPool=%s, byteAccumulator=%s}", getClass().getSimpleName(),
hashCode(), _size, _bufferPool, _accumulator);
}
}

View File

@ -0,0 +1,41 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
public class NullByteBufferPool implements ByteBufferPool
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
if (direct)
return BufferUtil.allocateDirect(size);
else
return BufferUtil.allocate(size);
}
@Override
public void release(ByteBuffer buffer)
{
BufferUtil.clear(buffer);
}
}

View File

@ -19,11 +19,6 @@
package org.eclipse.jetty.websocket.common.extensions; package org.eclipse.jetty.websocket.common.extensions;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.zip.Deflater; import java.util.zip.Deflater;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
@ -42,10 +37,8 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
public class WebSocketExtensionFactory extends ExtensionFactory implements LifeCycle, Dumpable public class WebSocketExtensionFactory extends ExtensionFactory implements LifeCycle, Dumpable
{ {
private ContainerLifeCycle containerLifeCycle; private final ContainerLifeCycle containerLifeCycle;
private WebSocketContainerScope container; private final WebSocketContainerScope container;
private ServiceLoader<Extension> extensionLoader = ServiceLoader.load(Extension.class);
private Map<String, Class<? extends Extension>> availableExtensions;
private final InflaterPool inflaterPool = new InflaterPool(CompressionPool.INFINITE_CAPACITY, true); private final InflaterPool inflaterPool = new InflaterPool(CompressionPool.INFINITE_CAPACITY, true);
private final DeflaterPool deflaterPool = new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true); private final DeflaterPool deflaterPool = new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true);
@ -59,42 +52,12 @@ public class WebSocketExtensionFactory extends ExtensionFactory implements LifeC
return String.format("%s@%x{%s}", WebSocketExtensionFactory.class.getSimpleName(), hashCode(), containerLifeCycle.getState()); return String.format("%s@%x{%s}", WebSocketExtensionFactory.class.getSimpleName(), hashCode(), containerLifeCycle.getState());
} }
}; };
availableExtensions = new HashMap<>();
for (Extension ext : extensionLoader)
{
if (ext != null)
availableExtensions.put(ext.getName(), ext.getClass());
}
this.container = container; this.container = container;
containerLifeCycle.addBean(inflaterPool); containerLifeCycle.addBean(inflaterPool);
containerLifeCycle.addBean(deflaterPool); containerLifeCycle.addBean(deflaterPool);
} }
@Override
public Map<String, Class<? extends Extension>> getAvailableExtensions()
{
return availableExtensions;
}
@Override
public Class<? extends Extension> getExtension(String name)
{
return availableExtensions.get(name);
}
@Override
public Set<String> getExtensionNames()
{
return availableExtensions.keySet();
}
@Override
public boolean isAvailable(String name)
{
return availableExtensions.containsKey(name);
}
@Override @Override
public Extension newInstance(ExtensionConfig config) public Extension newInstance(ExtensionConfig config)
{ {
@ -139,24 +102,6 @@ public class WebSocketExtensionFactory extends ExtensionFactory implements LifeC
} }
} }
@Override
public void register(String name, Class<? extends Extension> extension)
{
availableExtensions.put(name, extension);
}
@Override
public void unregister(String name)
{
availableExtensions.remove(name);
}
@Override
public Iterator<Class<? extends Extension>> iterator()
{
return availableExtensions.values().iterator();
}
/* --- All of the below ugliness due to not being able to break API compatibility with ExtensionFactory --- */ /* --- All of the below ugliness due to not being able to break API compatibility with ExtensionFactory --- */
@Override @Override

View File

@ -22,48 +22,18 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.MessageTooLargeException; import org.eclipse.jetty.websocket.api.MessageTooLargeException;
public class ByteAccumulator public class ByteAccumulator
{ {
private final List<ByteBuffer> chunks = new ArrayList<>(); private final List<byte[]> chunks = new ArrayList<>();
private final int maxSize; private final int maxSize;
private int length = 0; private int length = 0;
private final ByteBufferPool bufferPool;
public ByteAccumulator(int maxOverallBufferSize) public ByteAccumulator(int maxOverallBufferSize)
{
this(maxOverallBufferSize, null);
}
public ByteAccumulator(int maxOverallBufferSize, ByteBufferPool bufferPool)
{ {
this.maxSize = maxOverallBufferSize; this.maxSize = maxOverallBufferSize;
this.bufferPool = bufferPool;
}
public void copyChunk(ByteBuffer buf)
{
int length = buf.remaining();
if (this.length + length > maxSize)
{
release(buf);
String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize);
throw new MessageTooLargeException(err);
}
if (buf.hasRemaining())
{
chunks.add(buf);
this.length += length;
}
else
{
// release 0 length buffer directly
release(buf);
}
} }
public void copyChunk(byte[] buf, int offset, int length) public void copyChunk(byte[] buf, int offset, int length)
@ -73,7 +43,11 @@ public class ByteAccumulator
String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize); String err = String.format("Resulting message size [%,d] is too large for configured max of [%,d]", this.length + length, maxSize);
throw new MessageTooLargeException(err); throw new MessageTooLargeException(err);
} }
chunks.add(ByteBuffer.wrap(buf, offset, length));
byte[] copy = new byte[length - offset];
System.arraycopy(buf, offset, copy, 0, length);
chunks.add(copy);
this.length += length; this.length += length;
} }
@ -82,20 +56,6 @@ public class ByteAccumulator
return length; return length;
} }
int getMaxSize()
{
return maxSize;
}
ByteBuffer newByteBuffer(int size)
{
if (bufferPool == null)
{
return ByteBuffer.allocate(size);
}
return (ByteBuffer)bufferPool.acquire(size, false).clear();
}
public void transferTo(ByteBuffer buffer) public void transferTo(ByteBuffer buffer)
{ {
if (buffer.remaining() < length) if (buffer.remaining() < length)
@ -105,30 +65,10 @@ public class ByteAccumulator
} }
int position = buffer.position(); int position = buffer.position();
for (ByteBuffer chunk : chunks) for (byte[] chunk : chunks)
{ {
buffer.put(chunk); buffer.put(chunk, 0, chunk.length);
} }
BufferUtil.flipToFlush(buffer, position); BufferUtil.flipToFlush(buffer, position);
} }
void recycle()
{
length = 0;
for (ByteBuffer chunk : chunks)
{
release(chunk);
}
chunks.clear();
}
void release(ByteBuffer buffer)
{
if (bufferPool != null)
{
bufferPool.release(buffer);
}
}
} }

View File

@ -46,7 +46,7 @@ public abstract class CompressExtension extends AbstractExtension
protected static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF}; protected static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF};
protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES); protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES);
private static final Logger LOG = Log.getLogger(CompressExtension.class); private static final Logger LOG = Log.getLogger(CompressExtension.class);
/** /**
* Never drop tail bytes 0000FFFF, from any frame type * Never drop tail bytes 0000FFFF, from any frame type
*/ */
@ -92,7 +92,6 @@ public abstract class CompressExtension extends AbstractExtension
private InflaterPool inflaterPool; private InflaterPool inflaterPool;
private Deflater deflaterImpl; private Deflater deflaterImpl;
private Inflater inflaterImpl; private Inflater inflaterImpl;
protected ByteAccumulator accumulator;
protected AtomicInteger decompressCount = new AtomicInteger(0); protected AtomicInteger decompressCount = new AtomicInteger(0);
private int tailDrop = TAIL_DROP_NEVER; private int tailDrop = TAIL_DROP_NEVER;
private int rsvUse = RSV_USE_ALWAYS; private int rsvUse = RSV_USE_ALWAYS;
@ -177,37 +176,7 @@ public abstract class CompressExtension extends AbstractExtension
protected ByteAccumulator newByteAccumulator() protected ByteAccumulator newByteAccumulator()
{ {
int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageSize()); int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageSize());
if (accumulator == null || accumulator.getMaxSize() != maxSize) return new ByteAccumulator(maxSize);
{
accumulator = new ByteAccumulator(maxSize, getBufferPool());
}
return accumulator;
}
int copyChunk(Inflater inflater, ByteAccumulator accumulator) throws DataFormatException
{
ByteBuffer buf = accumulator.newByteBuffer(DECOMPRESS_BUF_SIZE);
while (buf.hasRemaining())
{
try
{
int read = inflater.inflate(buf.array(), buf.position(), buf.remaining());
if (read <= 0)
{
accumulator.copyChunk((ByteBuffer)buf.flip());
return read;
}
buf.position(buf.position() + read);
}
catch (DataFormatException e)
{
accumulator.release(buf);
throw e;
}
}
int position = buf.position();
accumulator.copyChunk((ByteBuffer)buf.flip());
return position;
} }
protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException
@ -216,10 +185,10 @@ public abstract class CompressExtension extends AbstractExtension
{ {
return; return;
} }
byte[] output = new byte[DECOMPRESS_BUF_SIZE];
Inflater inflater = getInflater(); Inflater inflater = getInflater();
while (buf.hasRemaining() && inflater.needsInput()) while (buf.hasRemaining() && inflater.needsInput())
{ {
if (!supplyInput(inflater, buf)) if (!supplyInput(inflater, buf))
@ -229,12 +198,22 @@ public abstract class CompressExtension extends AbstractExtension
return; return;
} }
while (true) int read;
while ((read = inflater.inflate(output)) >= 0)
{ {
if (copyChunk(inflater, accumulator) <= 0) if (read == 0)
{ {
if (LOG.isDebugEnabled())
LOG.debug("Decompress: read 0 {}", toDetail(inflater));
break; break;
} }
else
{
// do something with output
if (LOG.isDebugEnabled())
LOG.debug("Decompressed {} bytes: {}", read, toDetail(inflater));
accumulator.copyChunk(output, 0, read);
}
} }
} }

View File

@ -47,7 +47,7 @@ public class DeflateFrameExtension extends CompressExtension
{ {
return TAIL_DROP_ALWAYS; return TAIL_DROP_ALWAYS;
} }
@Override @Override
public void incomingFrame(Frame frame) public void incomingFrame(Frame frame)
{ {
@ -63,7 +63,7 @@ public class DeflateFrameExtension extends CompressExtension
try try
{ {
accumulator = newByteAccumulator(); ByteAccumulator accumulator = newByteAccumulator();
decompress(accumulator, frame.getPayload()); decompress(accumulator, frame.getPayload());
decompress(accumulator, TAIL_BYTES_BUF.slice()); decompress(accumulator, TAIL_BYTES_BUF.slice());
forwardIncoming(frame, accumulator); forwardIncoming(frame, accumulator);
@ -72,10 +72,5 @@ public class DeflateFrameExtension extends CompressExtension
{ {
throw new BadPayloadException(e); throw new BadPayloadException(e);
} }
finally
{
if (accumulator != null)
accumulator.recycle();
}
} }
} }

View File

@ -78,9 +78,10 @@ public class PerMessageDeflateExtension extends CompressExtension
throw new ProtocolException("Invalid RSV1 set on permessage-deflate CONTINUATION frame"); throw new ProtocolException("Invalid RSV1 set on permessage-deflate CONTINUATION frame");
} }
ByteAccumulator accumulator = newByteAccumulator();
try try
{ {
accumulator = newByteAccumulator();
ByteBuffer payload = frame.getPayload(); ByteBuffer payload = frame.getPayload();
decompress(accumulator, payload); decompress(accumulator, payload);
if (frame.isFin()) if (frame.isFin())
@ -89,17 +90,11 @@ public class PerMessageDeflateExtension extends CompressExtension
} }
forwardIncoming(frame, accumulator); forwardIncoming(frame, accumulator);
} }
catch (DataFormatException e) catch (DataFormatException e)
{ {
throw new BadPayloadException(e); throw new BadPayloadException(e);
} }
finally
{
if (accumulator != null)
accumulator.recycle();
}
if (frame.isFin()) if (frame.isFin())
incomingCompressed = false; incomingCompressed = false;