WebSocket / releasing (header) buffer acquired during generation

This commit is contained in:
Joakim Erdfelt 2013-08-20 09:57:57 -07:00
parent 9bb6d91742
commit 34f03cb0e8
4 changed files with 39 additions and 12 deletions

View File

@ -1,7 +1,7 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=INFO
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG

View File

@ -45,6 +45,8 @@ public class WriteBytesProvider implements Callback
protected final AtomicBoolean failed = new AtomicBoolean(false);
protected final Frame frame;
protected final Callback callback;
/** holds reference to header ByteBuffer, as it needs to be released on success/failure */
private ByteBuffer headerBuffer;
public FrameEntry(Frame frame, Callback callback)
{
@ -54,16 +56,20 @@ public class WriteBytesProvider implements Callback
public ByteBuffer getHeaderBytes()
{
return generator.generateHeaderBytes(frame);
ByteBuffer buf = generator.generateHeaderBytes(frame);
headerBuffer = buf;
return buf;
}
public ByteBuffer getPayloadWindow()
{
// There is no need to release this ByteBuffer, as it is just a slice of the user provided payload
return generator.getPayloadWindow(bufferSize,frame);
}
public void notifyFailure(Throwable t)
{
freeBuffers();
if (failed.getAndSet(true) == false)
{
notifySafeFailure(callback,t);
@ -72,6 +78,7 @@ public class WriteBytesProvider implements Callback
public void notifySucceeded()
{
freeBuffers();
if (callback == null)
{
return;
@ -86,6 +93,15 @@ public class WriteBytesProvider implements Callback
}
}
public void freeBuffers()
{
if (headerBuffer != null)
{
generator.getBufferPool().release(headerBuffer);
headerBuffer = null;
}
}
/**
* Indicate that the frame entry is done generating
*/
@ -113,8 +129,6 @@ public class WriteBytesProvider implements Callback
private FrameEntry active;
/** Tracking for failure */
private Throwable failure;
/** The last requested buffer */
private ByteBuffer buffer;
/** Is WriteBytesProvider closed to more WriteBytes being enqueued? */
private AtomicBoolean closed;
@ -208,7 +222,7 @@ public class WriteBytesProvider implements Callback
failure = t;
// fail past
while(!past.isEmpty())
while (!past.isEmpty())
{
FrameEntry entry = past.pop();
entry.notifyFailure(t);
@ -216,7 +230,7 @@ public class WriteBytesProvider implements Callback
}
// fail others
while(!queue.isEmpty())
while (!queue.isEmpty())
{
FrameEntry entry = queue.pop();
entry.notifyFailure(t);
@ -348,9 +362,6 @@ public class WriteBytesProvider implements Callback
@Override
public void succeeded()
{
// Release the active byte buffer first
generator.getBufferPool().release(buffer);
if ((active != null) && (active.frame.remaining() <= 0))
{
// All done with active FrameEntry

View File

@ -24,16 +24,21 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Hex;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.junit.Assert;
import org.junit.Test;
public class WriteBytesProviderTest
{
private static final Logger LOG = Log.getLogger(WriteBytesProviderTest.class);
private WriteBytesProvider bytesProvider;
private void assertCallbackSuccessCount(TrackingCallback callback, int expectedSuccsesCount)
@ -109,7 +114,8 @@ public class WriteBytesProviderTest
UnitGenerator generator = new UnitGenerator();
TrackingCallback flushCallback = new TrackingCallback();
bytesProvider = new WriteBytesProvider(generator,flushCallback);
bytesProvider.setBufferSize(30);
int bufferSize = 30;
bytesProvider.setBufferSize(bufferSize);
// Create frames for provider
TrackingCallback binCallback = new TrackingCallback();
@ -117,7 +123,7 @@ public class WriteBytesProviderTest
int binPayloadSize = 50;
byte bin[] = new byte[binPayloadSize];
Arrays.fill(bin,(byte)0x00);
WebSocketFrame binFrame = WebSocketFrame.binary(bin);
BinaryFrame binFrame = new BinaryFrame(bin);
byte maskingKey[] = Hex.asByteArray("11223344");
binFrame.setMask(maskingKey);
bytesProvider.enqueue(binFrame,binCallback);
@ -126,6 +132,7 @@ public class WriteBytesProviderTest
// get bytes out
List<ByteBuffer> bytes = bytesProvider.getByteBuffers();
Assert.assertThat("Number of buffers",bytes.size(),is(5));
assertBufferLengths(bytes,6,bufferSize,binPayloadSize-bufferSize,2,0);
// Test byte values
StringBuilder expected = new StringBuilder();
@ -150,6 +157,14 @@ public class WriteBytesProviderTest
assertCallbackSuccessCount(closeCallback,1);
}
private void assertBufferLengths(List<ByteBuffer> bytes, int... expectedLengths)
{
for (int i = 0; i < expectedLengths.length; i++)
{
Assert.assertThat("Buffer[" + i + "].remaining",bytes.get(i).remaining(),is(expectedLengths[i]));
}
}
private void assertExpectedBytes(List<ByteBuffer> bytes, String expected)
{
String actual = gatheredHex(bytes);
@ -161,6 +176,7 @@ public class WriteBytesProviderTest
int len = 0;
for (ByteBuffer buf : bytes)
{
LOG.debug("buffer[] {}", BufferUtil.toDetailString(buf));
len += buf.remaining();
}
len = len * 2;

View File

@ -2,7 +2,7 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG
org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG