Making Generator and WebSocketFrame collaborate to allow for windowed buffer creation

This commit is contained in:
Joakim Erdfelt 2012-07-18 11:36:30 -07:00
parent 85d7f9712a
commit 58e181f463
5 changed files with 196 additions and 121 deletions

View File

@ -159,10 +159,13 @@ public class Generator
public ByteBuffer generate(int bufferSize, WebSocketFrame frame) public ByteBuffer generate(int bufferSize, WebSocketFrame frame)
{ {
LOG.debug(String.format("Generate.Frame[opcode=%s,fin=%b,cont=%b,rsv1=%b,rsv2=%b,rsv3=%b,mask=%b,plength=%d]",frame.getOpCode().toString(), if (LOG.isDebugEnabled())
frame.isFin(),frame.isContinuation(),frame.isRsv1(),frame.isRsv2(),frame.isRsv3(),frame.isMasked(),frame.getPayloadLength())); {
LOG.debug(String.format(
assertFrameValid(frame); "Generate.Frame[opcode=%s,fin=%b,cont=%b,rsv1=%b,rsv2=%b,rsv3=%b,mask=%b,plength=%d,payloadStart=%s,remaining=%d,position=%s]",frame
.getOpCode().toString(),frame.isFin(),frame.isContinuation(),frame.isRsv1(),frame.isRsv2(),frame.isRsv3(),frame.isMasked(),frame
.getPayloadLength(),frame.getPayloadStart(),frame.remaining(),frame.position()));
}
/* /*
* prepare the byte buffer to put frame into * prepare the byte buffer to put frame into
@ -170,117 +173,138 @@ public class Generator
ByteBuffer buffer = bufferPool.acquire(bufferSize,true); ByteBuffer buffer = bufferPool.acquire(bufferSize,true);
BufferUtil.clearToFill(buffer); BufferUtil.clearToFill(buffer);
/* if (frame.remaining() == frame.getPayloadLength())
* start the generation process
*/
byte b;
// Setup fin thru opcode
b = 0x00;
if (frame.isFin())
{ {
b |= 0x80; // 1000_0000 // we need a framing header
} assertFrameValid(frame);
if (frame.isRsv1())
{
b |= 0x40; // 0100_0000
}
if (frame.isRsv2())
{
b |= 0x20; // 0010_0000
}
if (frame.isRsv3())
{
b |= 0x10;
}
byte opcode = frame.getOpCode().getCode(); /*
* start the generation process
*/
byte b;
if (frame.isContinuation()) // Setup fin thru opcode
{ b = 0x00;
// Continuations are not the same OPCODE if (frame.isFin())
opcode = OpCode.CONTINUATION.getCode(); {
} b |= 0x80; // 1000_0000
}
if (frame.isRsv1())
{
b |= 0x40; // 0100_0000
}
if (frame.isRsv2())
{
b |= 0x20; // 0010_0000
}
if (frame.isRsv3())
{
b |= 0x10;
}
b |= opcode & 0x0F; byte opcode = frame.getOpCode().getCode();
buffer.put(b); if (frame.isContinuation())
{
// Continuations are not the same OPCODE
opcode = OpCode.CONTINUATION.getCode();
}
// is masked b |= opcode & 0x0F;
b = 0x00;
b |= (frame.isMasked()?0x80:0x00);
// payload lengths
int payloadLength = frame.getPayloadLength();
/*
* if length is over 65535 then its a 7 + 64 bit length
*/
if (payloadLength > 0xFF_FF)
{
// we have a 64 bit length
b |= 0x7F;
buffer.put(b); // indicate 8 byte length
buffer.put((byte)0); //
buffer.put((byte)0); // anything over an
buffer.put((byte)0); // int is just
buffer.put((byte)0); // intsane!
buffer.put((byte)((payloadLength >> 24) & 0xFF));
buffer.put((byte)((payloadLength >> 16) & 0xFF));
buffer.put((byte)((payloadLength >> 8) & 0xFF));
buffer.put((byte)(payloadLength & 0xFF));
}
/*
* if payload is ge 126 we have a 7 + 16 bit length
*/
else if (payloadLength >= 0x7E)
{
b |= 0x7E;
buffer.put(b); // indicate 2 byte length
buffer.put((byte)(payloadLength >> 8));
buffer.put((byte)(payloadLength & 0xFF));
}
/*
* we have a 7 bit length
*/
else
{
b |= (payloadLength & 0x7F);
buffer.put(b); buffer.put(b);
}
// masking key // is masked
if (frame.isMasked()) b = 0x00;
{ b |= (frame.isMasked()?0x80:0x00);
buffer.put(frame.getMask());
}
// remember the position // payload lengths
int positionPrePayload = buffer.position(); int payloadLength = frame.getPayloadLength();
/*
* if length is over 65535 then its a 7 + 64 bit length
*/
if (payloadLength > 0xFF_FF)
{
// we have a 64 bit length
b |= 0x7F;
buffer.put(b); // indicate 8 byte length
buffer.put((byte)0); //
buffer.put((byte)0); // anything over an
buffer.put((byte)0); // int is just
buffer.put((byte)0); // intsane!
buffer.put((byte)((payloadLength >> 24) & 0xFF));
buffer.put((byte)((payloadLength >> 16) & 0xFF));
buffer.put((byte)((payloadLength >> 8) & 0xFF));
buffer.put((byte)(payloadLength & 0xFF));
}
/*
* if payload is ge 126 we have a 7 + 16 bit length
*/
else if (payloadLength >= 0x7E)
{
b |= 0x7E;
buffer.put(b); // indicate 2 byte length
buffer.put((byte)(payloadLength >> 8));
buffer.put((byte)(payloadLength & 0xFF));
}
/*
* we have a 7 bit length
*/
else
{
b |= (payloadLength & 0x7F);
buffer.put(b);
}
// masking key
if (frame.isMasked())
{
buffer.put(frame.getMask());
}
}
// copy payload // copy payload
if (frame.hasPayload()) if (frame.hasPayload())
{ {
buffer.put(frame.getPayload()); // remember the position
} int maskingStartPosition = buffer.position();
int positionPostPayload = buffer.position(); // remember the offset within the frame payload (for working with
// windowed frames that don't split on 4 byte barriers)
int payloadOffset = frame.getPayload().position();
int payloadStart = frame.getPayloadStart();
// mask it if needed // put as much as possible into the buffer
if (frame.isMasked()) BufferUtil.put(frame.getPayload(),buffer);
{
// move back to remembered position. // mask it if needed
int size = positionPostPayload - positionPrePayload; if (frame.isMasked())
byte[] mask = frame.getMask();
int pos;
for (int i = 0; i < size; i++)
{ {
pos = positionPrePayload + i; // move back to remembered position.
// Mask each byte by its absolute position in the bytebuffer int size = buffer.position() - maskingStartPosition;
buffer.put(pos,(byte)(buffer.get(pos) ^ mask[i % 4])); byte[] mask = frame.getMask();
byte b;
int posBuf;
int posFrame;
for (int i = 0; i < size; i++)
{
posBuf = i + maskingStartPosition;
posFrame = i + (payloadOffset - payloadStart);
// get raw byte from buffer.
b = buffer.get(posBuf);
// mask, using offset information from frame windowing.
b ^= mask[posFrame % 4];
// Mask each byte by its absolute position in the bytebuffer
buffer.put(posBuf,b);
}
} }
} }
BufferUtil.flipToFlush(buffer,0);
return buffer; return buffer;
} }

View File

@ -93,6 +93,10 @@ public class WebSocketFrame implements Frame
* It is assumed to always be in FLUSH mode (ready to read) in this object. * It is assumed to always be in FLUSH mode (ready to read) in this object.
*/ */
private ByteBuffer data; private ByteBuffer data;
private int payloadLength = 0;
/** position of start of data within a fresh payload */
private int payloadStart = -1;
private boolean continuation = false; private boolean continuation = false;
private int continuationIndex = 0; private int continuationIndex = 0;
@ -207,12 +211,20 @@ public class WebSocketFrame implements Frame
return opcode; return opcode;
} }
/**
* Get the payload ByteBuffer. possible null.
* <p>
*
* @return A {@link ByteBuffer#slice()} of the payload buffer (to prevent modification of the buffer state). Possibly null if no payload present.
* <p>
* Note: this method is exposed via the immutable {@link Frame#getPayload()} method.
*/
@Override @Override
public ByteBuffer getPayload() public ByteBuffer getPayload()
{ {
if (data != null) if (data != null)
{ {
return data.slice(); return data;
} }
else else
{ {
@ -236,12 +248,21 @@ public class WebSocketFrame implements Frame
{ {
return 0; return 0;
} }
return data.remaining(); return payloadLength;
}
public int getPayloadStart()
{
if (data == null)
{
return -1;
}
return payloadStart;
} }
public boolean hasPayload() public boolean hasPayload()
{ {
return ((data != null) && (data.remaining() > 0)); return ((data != null) && (payloadLength > 0));
} }
public boolean isContinuation() public boolean isContinuation()
@ -284,6 +305,29 @@ public class WebSocketFrame implements Frame
return rsv3; return rsv3;
} }
/**
* Get the position currently within the payload data.
* <p>
* Used by flow control, generator and window sizing.
*
* @return the number of bytes remaining in the payload data that has not yet been written out to Network ByteBuffers.
*/
public int position()
{
if (data == null)
{
return -1;
}
return data.position();
}
/**
* Get the number of bytes remaining to write out to the Network ByteBuffer.
* <p>
* Used by flow control, generator and window sizing.
*
* @return the number of bytes remaining in the payload data that has not yet been written out to Network ByteBuffers.
*/
public int remaining() public int remaining()
{ {
if (data == null) if (data == null)
@ -302,6 +346,7 @@ public class WebSocketFrame implements Frame
opcode = null; opcode = null;
masked = false; masked = false;
data = null; data = null;
payloadLength = 0;
mask = null; mask = null;
continuationIndex = 0; continuationIndex = 0;
continuation = false; continuation = false;
@ -366,11 +411,9 @@ public class WebSocketFrame implements Frame
} }
} }
int len = buf.length; data = BufferUtil.toBuffer(buf);
data = ByteBuffer.allocate(len); payloadStart = data.position();
BufferUtil.clearToFill(data); payloadLength = data.limit();
data.put(buf,0,len);
BufferUtil.flipToFlush(data,0);
return this; return this;
} }
@ -396,10 +439,9 @@ public class WebSocketFrame implements Frame
} }
} }
data = ByteBuffer.allocate(len); data = BufferUtil.toBuffer(buf,offset,len);
BufferUtil.clearToFill(data); payloadStart = data.position();
data.put(buf,0,len); payloadLength = data.limit();
BufferUtil.flipToFlush(data,0);
return this; return this;
} }
@ -430,6 +472,8 @@ public class WebSocketFrame implements Frame
} }
data = buf.slice(); data = buf.slice();
payloadStart = data.position();
payloadLength = data.limit();
return this; return this;
} }
@ -470,7 +514,7 @@ public class WebSocketFrame implements Frame
b.append("NO-OP"); b.append("NO-OP");
} }
b.append('['); b.append('[');
b.append("len=").append(getPayloadLength()); b.append("len=").append(payloadLength);
b.append(",fin=").append(fin); b.append(",fin=").append(fin);
b.append(",masked=").append(masked); b.append(",masked=").append(masked);
b.append(",continuation=").append(continuation); b.append(",continuation=").append(continuation);

View File

@ -2,22 +2,13 @@ package org.eclipse.jetty.websocket;
import org.eclipse.jetty.websocket.driver.EventMethodsCacheTest; import org.eclipse.jetty.websocket.driver.EventMethodsCacheTest;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriverTest; import org.eclipse.jetty.websocket.driver.WebSocketEventDriverTest;
import org.eclipse.jetty.websocket.protocol.AcceptHashTest;
import org.eclipse.jetty.websocket.protocol.ClosePayloadParserTest;
import org.eclipse.jetty.websocket.protocol.ParserTest;
import org.eclipse.jetty.websocket.protocol.PingPayloadParserTest;
import org.eclipse.jetty.websocket.protocol.RFC6455ExamplesGeneratorTest;
import org.eclipse.jetty.websocket.protocol.RFC6455ExamplesParserTest;
import org.eclipse.jetty.websocket.protocol.TextPayloadParserTest;
import org.eclipse.jetty.websocket.protocol.WebSocketFrameTest;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Suite; import org.junit.runners.Suite;
@RunWith(Suite.class) @RunWith(Suite.class)
@Suite.SuiteClasses( @Suite.SuiteClasses(
{ org.eclipse.jetty.websocket.ab.AllTests.class, EventMethodsCacheTest.class, WebSocketEventDriverTest.class, AcceptHashTest.class, { org.eclipse.jetty.websocket.ab.AllTests.class, EventMethodsCacheTest.class, WebSocketEventDriverTest.class,
ClosePayloadParserTest.class, ParserTest.class, PingPayloadParserTest.class, RFC6455ExamplesGeneratorTest.class, RFC6455ExamplesParserTest.class, org.eclipse.jetty.websocket.protocol.AllTests.class, GeneratorParserRoundtripTest.class })
TextPayloadParserTest.class, WebSocketFrameTest.class, GeneratorParserRoundtripTest.class })
public class AllTests public class AllTests
{ {
/* nothing to do here */ /* nothing to do here */

View File

@ -0,0 +1,13 @@
package org.eclipse.jetty.websocket.protocol;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses(
{ AcceptHashTest.class, ClosePayloadParserTest.class, GeneratorTest.class, ParserTest.class, PingPayloadParserTest.class, RFC6455ExamplesGeneratorTest.class,
RFC6455ExamplesParserTest.class, TextPayloadParserTest.class, WebSocketFrameTest.class })
public class AllTests
{
/* allow junit annotations to do the heavy lifting */
}

View File

@ -21,8 +21,8 @@ public class GeneratorTest
int totalParts = 0; int totalParts = 0;
int totalBytes = 0; int totalBytes = 0;
int windowSize = 1024; int windowSize = 1024;
int expectedHeaderSize = 4; // TODO: correct size int expectedHeaderSize = 4;
int expectedParts = (payload.length + expectedHeaderSize) / windowSize; int expectedParts = (int)Math.ceil((double)(payload.length + expectedHeaderSize) / windowSize);
Generator generator = new UnitGenerator(); Generator generator = new UnitGenerator();
@ -32,9 +32,12 @@ public class GeneratorTest
Assert.assertThat("Too many parts",totalParts,lessThan(20)); Assert.assertThat("Too many parts",totalParts,lessThan(20));
ByteBuffer buf = generator.generate(windowSize,frame); ByteBuffer buf = generator.generate(windowSize,frame);
// System.out.printf("Generated buf.limit() = %,d%n",buf.limit());
totalBytes += buf.remaining(); totalBytes += buf.remaining();
totalParts++; totalParts++;
done = (frame.remaining() <= 0);
} }
Assert.assertThat("Created Parts",totalParts,is(expectedParts)); Assert.assertThat("Created Parts",totalParts,is(expectedParts));