Implemented synthetic data frames to avoid copying data bytes in case of split frames.

This commit is contained in:
Simone Bordet 2012-02-25 15:07:03 +01:00
parent d43ac786a8
commit 2ce29b740d
2 changed files with 77 additions and 40 deletions

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.frames.DataFrame;
public abstract class DataFrameParser
@ -27,9 +28,13 @@ public abstract class DataFrameParser
private int streamId;
private byte flags;
private int length;
private int remaining;
private ByteBuffer data;
/**
* <p>Parses the given {@link ByteBuffer} for a data frame.</p>
*
* @param buffer the {@link ByteBuffer} to parse
* @return true if the data frame has been fully parsed, false otherwise
*/
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
@ -73,7 +78,6 @@ public abstract class DataFrameParser
length += (currByte & 0xFF) << 8 * cursor;
if (cursor > 0)
break;
remaining = length;
state = State.DATA;
// Fall down if length == 0: we can't loop because the buffer
// may be empty but we need to invoke the application anyway
@ -88,37 +92,23 @@ public abstract class DataFrameParser
// However, TCP may further split the flow control window, so we may
// only have part of the data at this point.
// TODO: introduce synthetic frames instead of accumulating data
int length = Math.min(remaining, buffer.remaining());
int size = Math.min(length, buffer.remaining());
int limit = buffer.limit();
buffer.limit(buffer.position() + length);
buffer.limit(buffer.position() + size);
ByteBuffer bytes = buffer.slice();
buffer.limit(limit);
buffer.position(buffer.position() + length);
remaining -= length;
if (remaining == 0)
{
if (data == null)
buffer.position(buffer.position() + size);
length -= size;
if (length == 0)
{
onData(bytes);
return true;
}
else
{
accumulate(bytes);
onData(data);
return true;
}
}
else
{
// We got only part of the frame data bytes, so we need to copy
// the current data and wait for the remaining to arrive.
if (data == null)
data = bytes;
else
accumulate(bytes);
// We got only part of the frame data bytes,
// so we generate a synthetic data frame
onSyntheticData(bytes);
}
break;
}
@ -131,14 +121,6 @@ public abstract class DataFrameParser
return false;
}
private void accumulate(ByteBuffer bytes)
{
ByteBuffer local = ByteBuffer.allocate(data.remaining() + bytes.remaining());
local.put(data).put(bytes);
local.flip();
data = local;
}
private void onData(ByteBuffer bytes)
{
DataFrame frame = new DataFrame(streamId, flags, bytes.remaining());
@ -146,6 +128,13 @@ public abstract class DataFrameParser
reset();
}
private void onSyntheticData(ByteBuffer bytes)
{
DataFrame frame = new DataFrame(streamId, (byte)(flags & ~DataInfo.FLAG_CLOSE), bytes.remaining());
onDataFrame(frame, bytes);
// Do not reset, we're expecting more data
}
protected abstract void onDataFrame(DataFrame frame, ByteBuffer data);
private void reset()
@ -155,8 +144,6 @@ public abstract class DataFrameParser
streamId = 0;
flags = 0;
length = 0;
remaining = 0;
data = null;
}
private enum State

View File

@ -79,13 +79,63 @@ public class DataGenerateParseTest
Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
parser.addListener(listener);
while (buffer.hasRemaining())
{
parser.parse(ByteBuffer.wrap(new byte[]{buffer.get()}));
if (buffer.remaining() < length)
{
DataFrame frame2 = listener.getDataFrame();
Assert.assertNotNull(frame2);
Assert.assertEquals(streamId, frame2.getStreamId());
Assert.assertEquals(buffer.hasRemaining() ? 0 : DataInfo.FLAG_CLOSE, frame2.getFlags());
Assert.assertEquals(1, frame2.getLength());
Assert.assertEquals(1, listener.getData().remaining());
}
}
}
@Test
public void testGenerateParseWithSyntheticFrames() throws Exception
{
String content = "0123456789ABCDEF";
int length = content.length();
DataInfo data = new StringDataInfo(content, true);
int streamId = 13;
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.data(streamId, 2 * length, data);
Assert.assertNotNull(buffer);
TestSPDYParserListener listener = new TestSPDYParserListener();
Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
parser.addListener(listener);
// Split the buffer to simulate a split boundary in receiving the bytes
int split = 3;
ByteBuffer buffer1 = ByteBuffer.allocate(buffer.remaining() - split);
buffer.limit(buffer.limit() - split);
buffer1.put(buffer);
buffer1.flip();
ByteBuffer buffer2 = ByteBuffer.allocate(split);
buffer.limit(buffer.limit() + split);
buffer2.put(buffer);
buffer2.flip();
parser.parse(buffer1);
DataFrame frame2 = listener.getDataFrame();
Assert.assertNotNull(frame2);
Assert.assertEquals(streamId, frame2.getStreamId());
Assert.assertEquals(DataInfo.FLAG_CLOSE, frame2.getFlags());
Assert.assertEquals(length, frame2.getLength());
Assert.assertEquals(length, listener.getData().remaining());
Assert.assertEquals(0, frame2.getFlags());
Assert.assertEquals(length - split, frame2.getLength());
Assert.assertEquals(length - split, listener.getData().remaining());
parser.parse(buffer2);
DataFrame frame3 = listener.getDataFrame();
Assert.assertNotNull(frame3);
Assert.assertEquals(streamId, frame3.getStreamId());
Assert.assertEquals(DataInfo.FLAG_CLOSE, frame3.getFlags());
Assert.assertEquals(split, frame3.getLength());
Assert.assertEquals(split, listener.getData().remaining());
}
}