417133 - WebSocket / deflate-frame should accumulate decompress byte buffers properly
+ Adding ByteAccumulator to make job easier and more sane (with a minimum of byte buffer copying.
This commit is contained in:
parent
eecc17ee67
commit
5445c42ffe
|
@ -0,0 +1,83 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common.extensions.compress;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.websocket.api.MessageTooLargeException;
|
||||
|
||||
public class ByteAccumulator
|
||||
{
|
||||
private static class Buf
|
||||
{
|
||||
public Buf(byte[] buffer, int offset, int length)
|
||||
{
|
||||
this.buffer = buffer;
|
||||
this.offset = offset;
|
||||
this.length = length;
|
||||
}
|
||||
|
||||
byte[] buffer;
|
||||
int offset;
|
||||
int length;
|
||||
}
|
||||
|
||||
private final int maxSize;
|
||||
private int length = 0;
|
||||
private List<Buf> buffers;
|
||||
|
||||
public ByteAccumulator(int maxOverallBufferSize)
|
||||
{
|
||||
this.maxSize = maxOverallBufferSize;
|
||||
this.buffers = new ArrayList<>();
|
||||
}
|
||||
|
||||
public void addBuffer(byte buf[], int offset, int length)
|
||||
{
|
||||
if (buf.length + length > maxSize)
|
||||
{
|
||||
throw new MessageTooLargeException("Frame is too large");
|
||||
}
|
||||
buffers.add(new Buf(buf,offset,length));
|
||||
this.length += length;
|
||||
}
|
||||
|
||||
public int getLength()
|
||||
{
|
||||
return length;
|
||||
}
|
||||
|
||||
public ByteBuffer getByteBuffer(ByteBufferPool pool)
|
||||
{
|
||||
ByteBuffer ret = pool.acquire(length,false);
|
||||
BufferUtil.clearToFill(ret);
|
||||
|
||||
for (Buf buf : buffers)
|
||||
{
|
||||
ret.put(buf.buffer, buf.offset, buf.length);
|
||||
}
|
||||
|
||||
BufferUtil.flipToFlush(ret,0);
|
||||
return ret;
|
||||
}
|
||||
}
|
|
@ -82,11 +82,16 @@ public class DeflateFrameExtension extends AbstractExtension
|
|||
System.arraycopy(TAIL,0,compressed,inlen,TAIL.length);
|
||||
decompressor.setInput(compressed,0,compressed.length);
|
||||
|
||||
// Since we don't track text vs binary vs continuation state, just grab whatever is the greater value.
|
||||
int maxSize = Math.max(getPolicy().getMaxTextMessageSize(),getPolicy().getMaxBinaryMessageBufferSize());
|
||||
ByteAccumulator accumulator = new ByteAccumulator(maxSize);
|
||||
|
||||
DataFrame out = new DataFrame(frame);
|
||||
out.setRsv1(false); // Unset RSV1
|
||||
|
||||
// Perform decompression
|
||||
while (decompressor.getRemaining() > 0 && !decompressor.finished())
|
||||
{
|
||||
DataFrame out = new DataFrame(frame);
|
||||
out.setRsv1(false); // Unset RSV1
|
||||
byte outbuf[] = new byte[Math.min(inlen * 2,bufferSize)];
|
||||
try
|
||||
{
|
||||
|
@ -104,9 +109,8 @@ public class DeflateFrameExtension extends AbstractExtension
|
|||
}
|
||||
if (len > 0)
|
||||
{
|
||||
out.setPayload(ByteBuffer.wrap(outbuf,0,len));
|
||||
accumulator.addBuffer(outbuf,0,len);
|
||||
}
|
||||
nextIncomingFrame(out);
|
||||
}
|
||||
catch (DataFormatException e)
|
||||
{
|
||||
|
@ -114,6 +118,10 @@ public class DeflateFrameExtension extends AbstractExtension
|
|||
throw new BadPayloadException(e);
|
||||
}
|
||||
}
|
||||
|
||||
// Forward on the frame
|
||||
out.setPayload(accumulator.getByteBuffer(getBufferPool()));
|
||||
nextIncomingFrame(out);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -83,11 +83,16 @@ public class PerMessageDeflateExtension extends AbstractExtension
|
|||
System.arraycopy(TAIL,0,compressed,inlen,TAIL.length);
|
||||
decompressor.setInput(compressed,0,compressed.length);
|
||||
|
||||
// Since we don't track text vs binary vs continuation state, just grab whatever is the greater value.
|
||||
int maxSize = Math.max(getPolicy().getMaxTextMessageSize(),getPolicy().getMaxBinaryMessageBufferSize());
|
||||
ByteAccumulator accumulator = new ByteAccumulator(maxSize);
|
||||
|
||||
DataFrame out = new DataFrame(frame);
|
||||
out.setRsv1(false); // Unset RSV1
|
||||
|
||||
// Perform decompression
|
||||
while (decompressor.getRemaining() > 0 && !decompressor.finished())
|
||||
{
|
||||
DataFrame out = new DataFrame(frame);
|
||||
out.setRsv1(false); // Unset RSV1
|
||||
byte outbuf[] = new byte[Math.min(inlen * 2,bufferSize)];
|
||||
try
|
||||
{
|
||||
|
@ -105,9 +110,8 @@ public class PerMessageDeflateExtension extends AbstractExtension
|
|||
}
|
||||
if (len > 0)
|
||||
{
|
||||
out.setPayload(ByteBuffer.wrap(outbuf,0,len));
|
||||
accumulator.addBuffer(outbuf,0,len);
|
||||
}
|
||||
nextIncomingFrame(out);
|
||||
}
|
||||
catch (DataFormatException e)
|
||||
{
|
||||
|
@ -115,6 +119,10 @@ public class PerMessageDeflateExtension extends AbstractExtension
|
|||
throw new BadPayloadException(e);
|
||||
}
|
||||
}
|
||||
|
||||
// Forward on the frame
|
||||
out.setPayload(accumulator.getByteBuffer(getBufferPool()));
|
||||
nextIncomingFrame(out);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -222,7 +230,8 @@ public class PerMessageDeflateExtension extends AbstractExtension
|
|||
{
|
||||
key = key.trim();
|
||||
String value = config.getParameter(key,null);
|
||||
switch(key) {
|
||||
switch (key)
|
||||
{
|
||||
case "c2s_max_window_bits":
|
||||
negotiated.setParameter("s2c_max_window_bits",value);
|
||||
break;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
<input id="manythreads" class="button" type="submit" name="many" value="manythreads" disabled="disabled"/>
|
||||
<input id="hello" class="button" type="submit" name="hello" value="hello" disabled="disabled"/>
|
||||
<input id="there" class="button" type="submit" name="there" value="there" disabled="disabled"/>
|
||||
<input id="json" class="button" type="submit" name="json" value="json" disabled="disabled"/>
|
||||
</div>
|
||||
<script type="text/javascript">
|
||||
$("connect").onclick = function(event) { wstool.connect(); return false; }
|
||||
|
@ -26,6 +27,11 @@
|
|||
$("manythreads").onclick = function(event) {wstool.write("manythreads:20,25,60"); return false; }
|
||||
$("hello").onclick = function(event) {wstool.write("Hello"); return false; }
|
||||
$("there").onclick = function(event) {wstool.write("There"); return false; }
|
||||
$("json").onclick = function(event) {wstool.write("[{\"channel\":\"/meta/subscribe\",\"subscription\":\"/chat/demo\",\"id\":\"2\",\"clientId\":\"81dwnxwbgs0h0bq8968b0a0gyl\",\"timestamp\":\"Thu,"
|
||||
+ " 12 Sep 2013 19:42:30 GMT\"},{\"channel\":\"/meta/subscribe\",\"subscription\":\"/members/demo\",\"id\":\"3\",\"clientId\":\"81dwnxwbgs0h0bq8968b0a0gyl\",\"timestamp\":\"Thu,"
|
||||
+ " 12 Sep 2013 19:42:30 GMT\"},{\"channel\":\"/chat/demo\",\"data\":{\"user\":\"ch\",\"membership\":\"join\",\"chat\":\"ch"
|
||||
+ " has joined\"},\"id\":\"4\",\"clientId\":\"81dwnxwbgs0h0bq8968b0a0gyl\",\"timestamp\":\"Thu,"
|
||||
+ " 12 Sep 2013 19:42:30 GMT\"}]"); return false; }
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
|
@ -76,6 +76,7 @@ var wstool = {
|
|||
$('manythreads').disabled = !enabled;
|
||||
$('hello').disabled = !enabled;
|
||||
$('there').disabled = !enabled;
|
||||
$('json').disabled = !enabled;
|
||||
},
|
||||
|
||||
_onopen : function() {
|
||||
|
|
Loading…
Reference in New Issue