Issue #300 - shared Inflater/Deflater pools for WebSocket
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
e8115268d4
commit
c30c335df3
|
@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public abstract class CompressionPool<T>
|
public abstract class CompressionPool<T>
|
||||||
{
|
{
|
||||||
|
public static final int INFINITE_CAPACITY = -1;
|
||||||
|
|
||||||
private final Queue<T> _pool;
|
private final Queue<T> _pool;
|
||||||
private final AtomicInteger _numObjects = new AtomicInteger(0);
|
private final AtomicInteger _numObjects = new AtomicInteger(0);
|
||||||
private final int _capacity;
|
private final int _capacity;
|
||||||
|
@ -77,7 +79,7 @@ public abstract class CompressionPool<T>
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param object returns this Object to the pool or calls {@link #end(T)} if the pool is full.
|
* @param object returns this Object to the pool or calls {@link #end(Object)} if the pool is full.
|
||||||
*/
|
*/
|
||||||
public void release(T object)
|
public void release(T object)
|
||||||
{
|
{
|
||||||
|
|
|
@ -18,16 +18,24 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.websocket.common.extensions;
|
package org.eclipse.jetty.websocket.common.extensions;
|
||||||
|
|
||||||
|
import java.util.zip.Deflater;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.StringUtil;
|
import org.eclipse.jetty.util.StringUtil;
|
||||||
|
import org.eclipse.jetty.util.compression.CompressionPool;
|
||||||
|
import org.eclipse.jetty.util.compression.DeflaterPool;
|
||||||
|
import org.eclipse.jetty.util.compression.InflaterPool;
|
||||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||||
import org.eclipse.jetty.websocket.api.extensions.Extension;
|
import org.eclipse.jetty.websocket.api.extensions.Extension;
|
||||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
|
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
|
||||||
|
import org.eclipse.jetty.websocket.common.extensions.compress.CompressExtension;
|
||||||
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
|
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
|
||||||
|
|
||||||
public class WebSocketExtensionFactory extends ExtensionFactory
|
public class WebSocketExtensionFactory extends ExtensionFactory
|
||||||
{
|
{
|
||||||
private WebSocketContainerScope container;
|
private WebSocketContainerScope container;
|
||||||
|
private final InflaterPool inflaterPool = new InflaterPool(CompressionPool.INFINITE_CAPACITY, true);
|
||||||
|
private final DeflaterPool deflaterPool = new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true);
|
||||||
|
|
||||||
public WebSocketExtensionFactory(WebSocketContainerScope container)
|
public WebSocketExtensionFactory(WebSocketContainerScope container)
|
||||||
{
|
{
|
||||||
|
@ -64,6 +72,13 @@ public class WebSocketExtensionFactory extends ExtensionFactory
|
||||||
aext.init(container);
|
aext.init(container);
|
||||||
aext.setConfig(config);
|
aext.setConfig(config);
|
||||||
}
|
}
|
||||||
|
if (ext instanceof CompressExtension)
|
||||||
|
{
|
||||||
|
CompressExtension cext = (CompressExtension)ext;
|
||||||
|
cext.setInflaterPool(inflaterPool);
|
||||||
|
cext.setDeflaterPool(deflaterPool);
|
||||||
|
}
|
||||||
|
|
||||||
return ext;
|
return ext;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
|
|
|
@ -71,16 +71,10 @@ public abstract class CompressExtension extends AbstractExtension
|
||||||
|
|
||||||
/** Inflater : Output Buffer Size */
|
/** Inflater : Output Buffer Size */
|
||||||
private static final int DECOMPRESS_BUF_SIZE = 8 * 1024;
|
private static final int DECOMPRESS_BUF_SIZE = 8 * 1024;
|
||||||
|
|
||||||
private final static boolean NOWRAP = true;
|
|
||||||
private final static int POOL_CAPACITY = -1;
|
|
||||||
|
|
||||||
private static final DeflaterPool deflaterPool = new DeflaterPool(POOL_CAPACITY,
|
|
||||||
Deflater.DEFAULT_COMPRESSION, NOWRAP);
|
|
||||||
private static final InflaterPool inflaterPool = new InflaterPool(POOL_CAPACITY, NOWRAP);
|
|
||||||
|
|
||||||
private final Queue<FrameEntry> entries = new ArrayDeque<>();
|
private final Queue<FrameEntry> entries = new ArrayDeque<>();
|
||||||
private final IteratingCallback flusher = new Flusher();
|
private final IteratingCallback flusher = new Flusher();
|
||||||
|
private DeflaterPool deflaterPool;
|
||||||
|
private InflaterPool inflaterPool;
|
||||||
private Deflater deflaterImpl;
|
private Deflater deflaterImpl;
|
||||||
private Inflater inflaterImpl;
|
private Inflater inflaterImpl;
|
||||||
protected AtomicInteger decompressCount = new AtomicInteger(0);
|
protected AtomicInteger decompressCount = new AtomicInteger(0);
|
||||||
|
@ -93,6 +87,16 @@ public abstract class CompressExtension extends AbstractExtension
|
||||||
rsvUse = getRsvUseMode();
|
rsvUse = getRsvUseMode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setInflaterPool(InflaterPool inflaterPool)
|
||||||
|
{
|
||||||
|
this.inflaterPool = inflaterPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDeflaterPool(DeflaterPool deflaterPool)
|
||||||
|
{
|
||||||
|
this.deflaterPool = deflaterPool;
|
||||||
|
}
|
||||||
|
|
||||||
public Deflater getDeflater()
|
public Deflater getDeflater()
|
||||||
{
|
{
|
||||||
if (deflaterImpl == null)
|
if (deflaterImpl == null)
|
||||||
|
|
|
@ -34,6 +34,9 @@ import org.eclipse.jetty.io.RuntimeIOException;
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.StringUtil;
|
import org.eclipse.jetty.util.StringUtil;
|
||||||
import org.eclipse.jetty.util.TypeUtil;
|
import org.eclipse.jetty.util.TypeUtil;
|
||||||
|
import org.eclipse.jetty.util.compression.CompressionPool;
|
||||||
|
import org.eclipse.jetty.util.compression.DeflaterPool;
|
||||||
|
import org.eclipse.jetty.util.compression.InflaterPool;
|
||||||
import org.eclipse.jetty.util.log.Log;
|
import org.eclipse.jetty.util.log.Log;
|
||||||
import org.eclipse.jetty.util.log.Logger;
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||||
|
@ -68,6 +71,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
||||||
private static final Logger LOG = Log.getLogger(DeflateFrameExtensionTest.class);
|
private static final Logger LOG = Log.getLogger(DeflateFrameExtensionTest.class);
|
||||||
|
|
||||||
public ByteBufferPool bufferPool = new MappedByteBufferPool();
|
public ByteBufferPool bufferPool = new MappedByteBufferPool();
|
||||||
|
public DeflaterPool deflaterPool = new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true);
|
||||||
|
public InflaterPool inflaterPool = new InflaterPool(CompressionPool.INFINITE_CAPACITY, true);
|
||||||
|
|
||||||
private void assertIncoming(byte[] raw, String... expectedTextDatas)
|
private void assertIncoming(byte[] raw, String... expectedTextDatas)
|
||||||
{
|
{
|
||||||
|
@ -75,6 +80,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
||||||
|
|
||||||
DeflateFrameExtension ext = new DeflateFrameExtension();
|
DeflateFrameExtension ext = new DeflateFrameExtension();
|
||||||
ext.setBufferPool(bufferPool);
|
ext.setBufferPool(bufferPool);
|
||||||
|
ext.setDeflaterPool(deflaterPool);
|
||||||
|
ext.setInflaterPool(inflaterPool);
|
||||||
ext.setPolicy(policy);
|
ext.setPolicy(policy);
|
||||||
|
|
||||||
ExtensionConfig config = ExtensionConfig.parse("deflate-frame");
|
ExtensionConfig config = ExtensionConfig.parse("deflate-frame");
|
||||||
|
@ -119,6 +126,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
||||||
|
|
||||||
DeflateFrameExtension ext = new DeflateFrameExtension();
|
DeflateFrameExtension ext = new DeflateFrameExtension();
|
||||||
ext.setBufferPool(bufferPool);
|
ext.setBufferPool(bufferPool);
|
||||||
|
ext.setDeflaterPool(deflaterPool);
|
||||||
|
ext.setInflaterPool(inflaterPool);
|
||||||
ext.setPolicy(policy);
|
ext.setPolicy(policy);
|
||||||
|
|
||||||
ExtensionConfig config = ExtensionConfig.parse("deflate-frame");
|
ExtensionConfig config = ExtensionConfig.parse("deflate-frame");
|
||||||
|
@ -251,6 +260,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
||||||
{
|
{
|
||||||
ext.setConfig(new ExtensionConfig(ext.getName()));
|
ext.setConfig(new ExtensionConfig(ext.getName()));
|
||||||
ext.setBufferPool(bufferPool);
|
ext.setBufferPool(bufferPool);
|
||||||
|
ext.setDeflaterPool(deflaterPool);
|
||||||
|
ext.setInflaterPool(inflaterPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -303,6 +314,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
||||||
|
|
||||||
DeflateFrameExtension ext = new DeflateFrameExtension();
|
DeflateFrameExtension ext = new DeflateFrameExtension();
|
||||||
ext.setBufferPool(bufferPool);
|
ext.setBufferPool(bufferPool);
|
||||||
|
ext.setDeflaterPool(deflaterPool);
|
||||||
|
ext.setInflaterPool(inflaterPool);
|
||||||
ext.setPolicy(policy);
|
ext.setPolicy(policy);
|
||||||
ext.setConfig(new ExtensionConfig(ext.getName()));
|
ext.setConfig(new ExtensionConfig(ext.getName()));
|
||||||
|
|
||||||
|
@ -395,6 +408,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
||||||
|
|
||||||
DeflateFrameExtension clientExtension = new DeflateFrameExtension();
|
DeflateFrameExtension clientExtension = new DeflateFrameExtension();
|
||||||
clientExtension.setBufferPool(bufferPool);
|
clientExtension.setBufferPool(bufferPool);
|
||||||
|
clientExtension.setDeflaterPool(deflaterPool);
|
||||||
|
clientExtension.setInflaterPool(inflaterPool);
|
||||||
clientExtension.setPolicy(WebSocketPolicy.newClientPolicy());
|
clientExtension.setPolicy(WebSocketPolicy.newClientPolicy());
|
||||||
clientExtension.getPolicy().setMaxBinaryMessageSize(maxMessageSize);
|
clientExtension.getPolicy().setMaxBinaryMessageSize(maxMessageSize);
|
||||||
clientExtension.getPolicy().setMaxBinaryMessageBufferSize(maxMessageSize);
|
clientExtension.getPolicy().setMaxBinaryMessageBufferSize(maxMessageSize);
|
||||||
|
@ -402,6 +417,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
||||||
|
|
||||||
final DeflateFrameExtension serverExtension = new DeflateFrameExtension();
|
final DeflateFrameExtension serverExtension = new DeflateFrameExtension();
|
||||||
serverExtension.setBufferPool(bufferPool);
|
serverExtension.setBufferPool(bufferPool);
|
||||||
|
serverExtension.setDeflaterPool(deflaterPool);
|
||||||
|
serverExtension.setInflaterPool(inflaterPool);
|
||||||
serverExtension.setPolicy(WebSocketPolicy.newServerPolicy());
|
serverExtension.setPolicy(WebSocketPolicy.newServerPolicy());
|
||||||
serverExtension.getPolicy().setMaxBinaryMessageSize(maxMessageSize);
|
serverExtension.getPolicy().setMaxBinaryMessageSize(maxMessageSize);
|
||||||
serverExtension.getPolicy().setMaxBinaryMessageBufferSize(maxMessageSize);
|
serverExtension.getPolicy().setMaxBinaryMessageBufferSize(maxMessageSize);
|
||||||
|
|
|
@ -24,12 +24,16 @@ import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.zip.Deflater;
|
||||||
|
|
||||||
import org.eclipse.jetty.io.ByteBufferPool;
|
import org.eclipse.jetty.io.ByteBufferPool;
|
||||||
import org.eclipse.jetty.io.MappedByteBufferPool;
|
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||||
import org.eclipse.jetty.toolchain.test.ByteBufferAssert;
|
import org.eclipse.jetty.toolchain.test.ByteBufferAssert;
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.TypeUtil;
|
import org.eclipse.jetty.util.TypeUtil;
|
||||||
|
import org.eclipse.jetty.util.compression.CompressionPool;
|
||||||
|
import org.eclipse.jetty.util.compression.DeflaterPool;
|
||||||
|
import org.eclipse.jetty.util.compression.InflaterPool;
|
||||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||||
import org.eclipse.jetty.websocket.api.ProtocolException;
|
import org.eclipse.jetty.websocket.api.ProtocolException;
|
||||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||||
|
@ -58,6 +62,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
|
public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
|
||||||
{
|
{
|
||||||
public ByteBufferPool bufferPool = new MappedByteBufferPool();
|
public ByteBufferPool bufferPool = new MappedByteBufferPool();
|
||||||
|
public DeflaterPool deflaterPool = new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true);
|
||||||
|
public InflaterPool inflaterPool = new InflaterPool(CompressionPool.INFINITE_CAPACITY, true);
|
||||||
|
|
||||||
private void assertEndsWithTail(String hexStr, boolean expectedResult)
|
private void assertEndsWithTail(String hexStr, boolean expectedResult)
|
||||||
{
|
{
|
||||||
|
@ -284,6 +290,8 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
|
||||||
{
|
{
|
||||||
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
|
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
|
||||||
ext.setBufferPool(bufferPool);
|
ext.setBufferPool(bufferPool);
|
||||||
|
ext.setDeflaterPool(deflaterPool);
|
||||||
|
ext.setInflaterPool(inflaterPool);
|
||||||
ext.setPolicy(WebSocketPolicy.newServerPolicy());
|
ext.setPolicy(WebSocketPolicy.newServerPolicy());
|
||||||
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
|
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
|
||||||
ext.setConfig(config);
|
ext.setConfig(config);
|
||||||
|
@ -321,6 +329,8 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
|
||||||
{
|
{
|
||||||
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
|
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
|
||||||
ext.setBufferPool(bufferPool);
|
ext.setBufferPool(bufferPool);
|
||||||
|
ext.setDeflaterPool(deflaterPool);
|
||||||
|
ext.setInflaterPool(inflaterPool);
|
||||||
ext.setPolicy(WebSocketPolicy.newServerPolicy());
|
ext.setPolicy(WebSocketPolicy.newServerPolicy());
|
||||||
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
|
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
|
||||||
ext.setConfig(config);
|
ext.setConfig(config);
|
||||||
|
@ -359,6 +369,8 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
|
||||||
{
|
{
|
||||||
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
|
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
|
||||||
ext.setBufferPool(bufferPool);
|
ext.setBufferPool(bufferPool);
|
||||||
|
ext.setDeflaterPool(deflaterPool);
|
||||||
|
ext.setInflaterPool(inflaterPool);
|
||||||
ext.setPolicy(WebSocketPolicy.newServerPolicy());
|
ext.setPolicy(WebSocketPolicy.newServerPolicy());
|
||||||
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
|
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
|
||||||
ext.setConfig(config);
|
ext.setConfig(config);
|
||||||
|
@ -415,6 +427,8 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
|
||||||
{
|
{
|
||||||
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
|
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
|
||||||
ext.setBufferPool(bufferPool);
|
ext.setBufferPool(bufferPool);
|
||||||
|
ext.setDeflaterPool(deflaterPool);
|
||||||
|
ext.setInflaterPool(inflaterPool);
|
||||||
ext.setPolicy(WebSocketPolicy.newServerPolicy());
|
ext.setPolicy(WebSocketPolicy.newServerPolicy());
|
||||||
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
|
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
|
||||||
ext.setConfig(config);
|
ext.setConfig(config);
|
||||||
|
@ -455,6 +469,8 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
|
||||||
{
|
{
|
||||||
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
|
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
|
||||||
ext.setBufferPool(bufferPool);
|
ext.setBufferPool(bufferPool);
|
||||||
|
ext.setDeflaterPool(deflaterPool);
|
||||||
|
ext.setInflaterPool(inflaterPool);
|
||||||
ext.setPolicy(WebSocketPolicy.newServerPolicy());
|
ext.setPolicy(WebSocketPolicy.newServerPolicy());
|
||||||
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
|
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
|
||||||
ext.setConfig(config);
|
ext.setConfig(config);
|
||||||
|
|
Loading…
Reference in New Issue