414913 - WebSocket / Performance - reduce ByteBuffer allocation/copying during generation/writing

+ Splitting Header bytes from payload bytes.
+ returning List<ByteBuffer> from WriteBytesProvider now
+ returning possibly more than 1 frame per
  WriteBytesProvider.getByteBuffers() for submission to
  Endpoint.write()'s gathered write of multiple bytebuffers
This commit is contained in:
Joakim Erdfelt 2013-08-12 13:28:06 -07:00
parent 6a831dc2ca
commit 46b3dc10c8
27 changed files with 960 additions and 464 deletions

View File

@ -81,10 +81,13 @@ public interface Frame
public ByteBuffer getPayload();
/**
* The original payload length ({@link ByteBuffer#remaining()})
*
* @return the original payload length ({@link ByteBuffer#remaining()})
*/
public int getPayloadLength();
public int getPayloadStart();
public Type getType();
public boolean hasPayload();
@ -108,5 +111,10 @@ public interface Frame
public boolean isRsv3();
/**
* The current number of bytes left to read from the payload ByteBuffer.
*
* @return the current number of bytes left to read from the payload ByteBuffer
*/
public int remaining();
}

View File

@ -230,15 +230,16 @@ public class BlockheadServer
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
{
ByteBuffer buf = generator.generate(frame);
ByteBuffer headerBuf = generator.generateHeaderBytes(frame);
if (LOG.isDebugEnabled())
{
LOG.debug("writing out: {}",BufferUtil.toDetailString(buf));
LOG.debug("writing out: {}",BufferUtil.toDetailString(headerBuf));
}
try
{
BufferUtil.writeTo(buf,out);
BufferUtil.writeTo(headerBuf,out);
BufferUtil.writeTo(generator.getPayloadWindow(frame.getPayloadLength(),frame),out);
out.flush();
if (callback != null)
{

View File

@ -3,8 +3,12 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=DEBUG
org.eclipse.jetty.websocket.client.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.WriteBytesProvider.LEVEL=DEBUG
org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.client.TrackingSocket.LEVEL=DEBUG
# Hide the stacktraces during testing
### Hide the stacktraces during testing
org.eclipse.jetty.websocket.client.internal.io.UpgradeConnection.STACKS=false
# org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection$DataFrameBytes.LEVEL=WARN

View File

@ -23,8 +23,6 @@ import java.util.List;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -57,7 +55,6 @@ import org.eclipse.jetty.websocket.api.extensions.Frame;
*/
public class Generator
{
private static final Logger LOG = Log.getLogger(Generator.class);
/**
* The overhead (maximum) for a framing header. Assuming a maximum sized payload with masking key.
*/
@ -65,7 +62,7 @@ public class Generator
private final WebSocketBehavior behavior;
private final ByteBufferPool bufferPool;
private boolean validating;
private final boolean validating;
/** Is there an extension using RSV1 */
private boolean rsv1InUse = false;
@ -175,7 +172,7 @@ public class Generator
this.rsv3InUse = false;
// configure from list of extensions in use
for(Extension ext: exts)
for (Extension ext : exts)
{
if (ext.isRsv1User())
{
@ -192,193 +189,178 @@ public class Generator
}
}
/**
* generate a byte buffer based on the frame being passed in
*
* bufferSize is determined by the length of the payload + 28 for frame overhead
*
* @param frame
* @return
*/
public synchronized ByteBuffer generate(Frame frame)
public ByteBuffer generateHeaderBytes(Frame frame)
{
int bufferSize = frame.getPayloadLength() + OVERHEAD;
return generate(bufferSize,frame);
}
// we need a framing header
assertFrameValid(frame);
/**
* Generate, into a ByteBuffer, no more than bufferSize of contents from the frame. If the frame exceeds the bufferSize, then multiple calls to
* {@link #generate(int, WebSocketFrame)} are required to obtain each window of ByteBuffer to complete the frame.
*/
public synchronized ByteBuffer generate(int windowSize, Frame frame)
{
if (windowSize < OVERHEAD)
{
throw new IllegalArgumentException("Cannot have windowSize less than " + OVERHEAD);
}
LOG.debug("{} Generate: {} (windowSize {})",behavior,frame,windowSize);
ByteBuffer buffer = bufferPool.acquire(OVERHEAD,true);
BufferUtil.flipToFill(buffer);
/*
* prepare the byte buffer to put frame into
* start the generation process
*/
ByteBuffer buffer = bufferPool.acquire(windowSize,false);
BufferUtil.clearToFill(buffer);
if (LOG.isDebugEnabled())
byte b;
// Setup fin thru opcode
b = 0x00;
if (frame.isFin())
{
LOG.debug("Acquired Buffer (windowSize={}): {}",windowSize,BufferUtil.toDetailString(buffer));
b |= 0x80; // 1000_0000
}
// since the buffer from the pool can exceed the window size, artificially
// limit the buffer to the window size.
int newlimit = Math.min(buffer.position() + windowSize,buffer.limit());
buffer.limit(newlimit);
LOG.debug("Buffer limited: {}",buffer);
if (frame.remaining() == frame.getPayloadLength())
if (frame.isRsv1())
{
// we need a framing header
assertFrameValid(frame);
b |= 0x40; // 0100_0000
}
if (frame.isRsv2())
{
b |= 0x20; // 0010_0000
}
if (frame.isRsv3())
{
b |= 0x10;
}
/*
* start the generation process
*/
byte b;
// NOTE: using .getOpCode() here, not .getType().getOpCode() for testing reasons
byte opcode = frame.getOpCode();
// Setup fin thru opcode
b = 0x00;
if (frame.isFin())
{
b |= 0x80; // 1000_0000
}
if (frame.isRsv1())
{
b |= 0x40; // 0100_0000
}
if (frame.isRsv2())
{
b |= 0x20; // 0010_0000
}
if (frame.isRsv3())
{
b |= 0x10;
}
if (frame.isContinuation())
{
// Continuations are not the same OPCODE
opcode = OpCode.CONTINUATION;
}
// NOTE: using .getOpCode() here, not .getType().getOpCode() for testing reasons
byte opcode = frame.getOpCode();
b |= opcode & 0x0F;
if (frame.isContinuation())
{
// Continuations are not the same OPCODE
opcode = OpCode.CONTINUATION;
}
buffer.put(b);
b |= opcode & 0x0F;
// is masked
b = 0x00;
b |= (frame.isMasked()?0x80:0x00);
// payload lengths
int payloadLength = frame.getPayloadLength();
/*
* if length is over 65535 then its a 7 + 64 bit length
*/
if (payloadLength > 0xFF_FF)
{
// we have a 64 bit length
b |= 0x7F;
buffer.put(b); // indicate 8 byte length
buffer.put((byte)0); //
buffer.put((byte)0); // anything over an
buffer.put((byte)0); // int is just
buffer.put((byte)0); // intsane!
buffer.put((byte)((payloadLength >> 24) & 0xFF));
buffer.put((byte)((payloadLength >> 16) & 0xFF));
buffer.put((byte)((payloadLength >> 8) & 0xFF));
buffer.put((byte)(payloadLength & 0xFF));
}
/*
* if payload is greater 126 we have a 7 + 16 bit length
*/
else if (payloadLength >= 0x7E)
{
b |= 0x7E;
buffer.put(b); // indicate 2 byte length
buffer.put((byte)(payloadLength >> 8));
buffer.put((byte)(payloadLength & 0xFF));
}
/*
* we have a 7 bit length
*/
else
{
b |= (payloadLength & 0x7F);
buffer.put(b);
// is masked
b = 0x00;
b |= (frame.isMasked()?0x80:0x00);
// payload lengths
int payloadLength = frame.getPayloadLength();
/*
* if length is over 65535 then its a 7 + 64 bit length
*/
if (payloadLength > 0xFF_FF)
{
// we have a 64 bit length
b |= 0x7F;
buffer.put(b); // indicate 8 byte length
buffer.put((byte)0); //
buffer.put((byte)0); // anything over an
buffer.put((byte)0); // int is just
buffer.put((byte)0); // intsane!
buffer.put((byte)((payloadLength >> 24) & 0xFF));
buffer.put((byte)((payloadLength >> 16) & 0xFF));
buffer.put((byte)((payloadLength >> 8) & 0xFF));
buffer.put((byte)(payloadLength & 0xFF));
}
/*
* if payload is greater 126 we have a 7 + 16 bit length
*/
else if (payloadLength >= 0x7E)
{
b |= 0x7E;
buffer.put(b); // indicate 2 byte length
buffer.put((byte)(payloadLength >> 8));
buffer.put((byte)(payloadLength & 0xFF));
}
/*
* we have a 7 bit length
*/
else
{
b |= (payloadLength & 0x7F);
buffer.put(b);
}
// masking key
if (frame.isMasked())
{
buffer.put(frame.getMask());
}
}
// copy payload
if (frame.hasPayload())
// masking key
if (frame.isMasked())
{
// remember the position
int maskingStartPosition = buffer.position();
byte[] mask = frame.getMask();
buffer.put(mask);
// remember the offset within the frame payload (for working with
// masked windowed frames that don't split on 4 byte barriers)
int payloadOffset = frame.getPayload().position();
int payloadStart = frame.getPayloadStart();
// put as much as possible into the buffer
BufferUtil.put(frame.getPayload(),buffer);
// mask it if needed
if (frame.isMasked())
// perform data masking here
byte mb;
ByteBuffer payload = frame.getPayload();
if ((payload != null) && (payload.remaining() > 0))
{
// move back to remembered position.
int size = buffer.position() - maskingStartPosition;
byte[] mask = frame.getMask();
byte b;
int posBuf;
int posFrame;
for (int i = 0; i < size; i++)
int pos = payload.position();
int limit = payload.limit();
for (int i = pos; i < limit; i++)
{
posBuf = i + maskingStartPosition;
posFrame = i + (payloadOffset - payloadStart);
// get raw byte from buffer.
b = buffer.get(posBuf);
mb = payload.get(i);
// mask, using offset information from frame windowing.
b ^= mask[posFrame % 4];
mb ^= mask[(i - pos) % 4];
// Mask each byte by its absolute position in the payload bytebuffer
buffer.put(posBuf,b);
payload.put(i,mb);
}
}
}
BufferUtil.flipToFlush(buffer,0);
if (LOG.isDebugEnabled())
{
LOG.debug("Generated Buffer: {}",BufferUtil.toDetailString(buffer));
}
return buffer;
}
/**
* Generate the whole frame (header + payload copy) into a single ByteBuffer.
* <p>
* Note: THIS IS SLOW. Only use this if you must.
*
* @param frame
* the frame to generate
*/
public void generateWholeFrame(Frame frame, ByteBuffer buf)
{
buf.put(generateHeaderBytes(frame));
if (frame.hasPayload())
{
buf.put(getPayloadWindow(frame.getPayloadLength(),frame));
}
}
public ByteBufferPool getBufferPool()
{
return bufferPool;
}
public ByteBuffer getPayloadWindow(int windowSize, Frame frame)
{
if (!frame.hasPayload())
{
return BufferUtil.EMPTY_BUFFER;
}
ByteBuffer buffer;
// We will create a slice representing the windowSize of this payload
if (frame.getPayload().remaining() <= windowSize)
{
// remaining will fit within window
buffer = frame.getPayload().slice();
// adjust the frame payload position (mark as read)
frame.getPayload().position(frame.getPayload().limit());
}
else
{
// remaining is over the window size limit, slice it
buffer = frame.getPayload().slice();
buffer.limit(windowSize);
int offset = frame.getPayload().position(); // offset within frame payload
// adjust the frame payload position
int newpos = Math.min(offset + windowSize,frame.getPayload().limit());
frame.getPayload().position(newpos);
}
return buffer;
}
public boolean isRsv1InUse()
{
return rsv1InUse;

View File

@ -85,6 +85,7 @@ public class WebSocketFrame implements Frame
return new WebSocketFrame(OpCode.TEXT).setPayload(msg);
}
// FIXME: make each boolean/bit part of 1 byte (instead of multiple booleans) to save memory
private boolean fin = true;
private boolean rsv1 = false;
private boolean rsv2 = false;
@ -351,14 +352,7 @@ public class WebSocketFrame implements Frame
@Override
public ByteBuffer getPayload()
{
if (data != null)
{
return data;
}
else
{
return null;
}
return data;
}
public String getPayloadAsUTF8()
@ -380,16 +374,6 @@ public class WebSocketFrame implements Frame
return payloadLength;
}
@Override
public int getPayloadStart()
{
if (data == null)
{
return -1;
}
return payloadStart;
}
@Override
public Type getType()
{
@ -593,7 +577,7 @@ public class WebSocketFrame implements Frame
}
}
data = BufferUtil.toBuffer(buf);
data = ByteBuffer.wrap(buf);
payloadStart = data.position();
payloadLength = data.remaining();
return this;
@ -697,7 +681,6 @@ public class WebSocketFrame implements Frame
b.append(rsv3?'1':'.');
b.append(",masked=").append(masked);
b.append(",continuation=").append(continuation);
b.append(",payloadStart=").append(getPayloadStart());
b.append(",remaining=").append(remaining());
b.append(",position=").append(position());
b.append(']');

View File

@ -141,7 +141,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return String.format("%s@%x",FlushInvoker.class.getSimpleName(),hashCode());
}
}
public class OnDisconnectCallback implements WriteCallback
{
@Override
@ -216,13 +216,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.writeBytes = new WriteBytesProvider(generator,new FlushCallback());
this.setInputBufferSize(policy.getInputBufferSize());
}
@Override
public Executor getExecutor()
{
return super.getExecutor();
}
@Override
public void close()
{
@ -254,7 +254,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void disconnect()
{
LOG.debug("{} disconnect()", policy.getBehavior());
LOG.debug("{} disconnect()",policy.getBehavior());
synchronized (writeBytes)
{
if (!writeBytes.isClosed())
@ -315,7 +315,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void flush()
{
ByteBuffer buffer = null;
List<ByteBuffer> buffers = null;
synchronized (writeBytes)
{
@ -336,22 +336,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return;
}
buffer = writeBytes.getByteBuffer();
if (buffer == null)
{
return;
}
buffers = writeBytes.getByteBuffers();
flushing = true;
if (LOG.isDebugEnabled())
{
LOG.debug("Flushing {} - {}",BufferUtil.toDetailString(buffer),writeBytes);
}
}
write(buffer);
write(buffers);
}
@Override
@ -653,7 +643,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
}
private <C> void write(ByteBuffer buffer)
private <C> void write(List<ByteBuffer> buffer)
{
EndPoint endpoint = getEndPoint();
@ -665,7 +655,18 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
try
{
endpoint.write(writeBytes,buffer);
int bufsize = buffer.size();
if (bufsize == 1)
{
// simple case
endpoint.write(writeBytes,buffer.get(0));
}
else
{
// gathered writes case
ByteBuffer bbarr[] = buffer.toArray(new ByteBuffer[bufsize]);
endpoint.write(writeBytes,bbarr);
}
}
catch (Throwable t)
{

View File

@ -21,13 +21,14 @@ package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -51,14 +52,14 @@ public class WriteBytesProvider implements Callback
this.callback = callback;
}
public ByteBuffer getByteBuffer()
public ByteBuffer getHeaderBytes()
{
ByteBuffer buffer = generator.generate(bufferSize,frame);
if (LOG.isDebugEnabled())
{
LOG.debug("getByteBuffer() - {}",BufferUtil.toDetailString(buffer));
}
return buffer;
return generator.generateHeaderBytes(frame);
}
public ByteBuffer getPayloadWindow()
{
return generator.getPayloadWindow(bufferSize,frame);
}
public void notifyFailure(Throwable t)
@ -68,6 +69,16 @@ public class WriteBytesProvider implements Callback
notifySafeFailure(callback,t);
}
}
/**
* Indicate that the frame entry is done generating
*
* @return
*/
public boolean isDone()
{
return frame.remaining() <= 0;
}
}
private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
@ -80,6 +91,10 @@ public class WriteBytesProvider implements Callback
private LinkedList<FrameEntry> queue;
/** the buffer input size */
private int bufferSize = 2048;
/** the gathered write bytebuffer array limit */
private int gatheredBufferLimit = 10;
/** Past Frames, not yet notified (from gathered generation/write) */
private LinkedList<FrameEntry> past;
/** Currently active frame */
private FrameEntry active;
/** Tracking for failure */
@ -104,6 +119,7 @@ public class WriteBytesProvider implements Callback
this.generator = Objects.requireNonNull(generator);
this.flushCallback = Objects.requireNonNull(flushCallback);
this.queue = new LinkedList<>();
this.past = new LinkedList<>();
this.closed = new AtomicBoolean(false);
}
@ -176,6 +192,15 @@ public class WriteBytesProvider implements Callback
failure = t;
// fail past
for (FrameEntry fe : past)
{
fe.notifyFailure(t);
notified = true;
}
past.clear();
// fail others
for (FrameEntry fe : queue)
{
@ -213,34 +238,46 @@ public class WriteBytesProvider implements Callback
}
/**
* Get the next ByteBuffer to write.
* Get the next set of ByteBuffers to write.
*
* @return the next ByteBuffer (or null if nothing to write)
* @return the next set of ByteBuffers to write
*/
public ByteBuffer getByteBuffer()
public List<ByteBuffer> getByteBuffers()
{
List<ByteBuffer> bufs = new ArrayList<>();
int count = 0;
synchronized (this)
{
if (active == null)
for (; count < gatheredBufferLimit; count++)
{
if (queue.isEmpty())
if (active == null)
{
// nothing in queue
return null;
if (queue.isEmpty())
{
// nothing in queue
return bufs;
}
// get current topmost entry
active = queue.pop();
// generate header
bufs.add(active.getHeaderBytes());
count++;
}
// get current topmost entry
active = queue.pop();
}
if (active == null)
{
// no active frame available, even in queue.
return null;
// collect payload window
bufs.add(active.getPayloadWindow());
if (active.isDone())
{
past.add(active);
active = null;
}
}
buffer = active.getByteBuffer();
}
return buffer;
LOG.debug("Collected {} ByteBuffers",bufs.size());
return bufs;
}
/**
@ -291,32 +328,36 @@ public class WriteBytesProvider implements Callback
@Override
public void succeeded()
{
Callback successCallback = null;
List<Callback> successNotifiers = new ArrayList<>();
synchronized (this)
{
// Release the active byte buffer first
generator.getBufferPool().release(buffer);
if (active == null)
if (active != null)
{
return;
if (active.frame.remaining() <= 0)
{
// All done with active FrameEntry
successNotifiers.add(active.callback);
// Forget active
active = null;
}
}
if (active.frame.remaining() <= 0)
for (FrameEntry entry : past)
{
// All done with active FrameEntry
successCallback = active.callback;
// Forget active
active = null;
successNotifiers.add(entry.callback);
}
past.clear();
// notify flush callback
flushCallback.succeeded();
}
// Notify success (outside of synchronize lock)
if (successCallback != null)
for (Callback successCallback : successNotifiers)
{
try
{
@ -345,6 +386,7 @@ public class WriteBytesProvider implements Callback
{
b.append(",active=").append(active);
b.append(",queue.size=").append(queue.size());
b.append(",past.size=").append(past.size());
}
b.append(']');
return b.toString();

View File

@ -50,9 +50,13 @@ public class GeneratorParserRoundtripTest
// Generate Buffer
BufferUtil.flipToFill(out);
WebSocketFrame frame = WebSocketFrame.text(message);
out = gen.generate(frame);
ByteBuffer header = gen.generateHeaderBytes(frame);
ByteBuffer payload = gen.getPayloadWindow(frame.getPayloadLength(),frame);
out.put(header);
out.put(payload);
// Parse Buffer
BufferUtil.flipToFlush(out,0);
parser.parse(out);
}
finally
@ -81,6 +85,7 @@ public class GeneratorParserRoundtripTest
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
ByteBuffer out = bufferPool.acquire(8192,false);
BufferUtil.flipToFill(out);
try
{
// Setup Frame
@ -92,9 +97,13 @@ public class GeneratorParserRoundtripTest
frame.setMask(mask);
// Generate Buffer
out = gen.generate(8192,frame);
ByteBuffer header = gen.generateHeaderBytes(frame);
ByteBuffer payload = gen.getPayloadWindow(8192,frame);
out.put(header);
out.put(payload);
// Parse Buffer
BufferUtil.flipToFlush(out,0);
parser.parse(out);
}
finally

View File

@ -21,57 +21,221 @@ package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.junit.Assert;
import org.junit.Test;
public class GeneratorTest
{
private static final Logger LOG = Log.getLogger(GeneratorTest.WindowHelper.class);
public static class WindowHelper
{
final int windowSize;
int totalParts;
int totalBytes;
public WindowHelper(int windowSize)
{
this.windowSize = windowSize;
this.totalParts = 0;
this.totalBytes = 0;
}
public ByteBuffer generateWindowed(Frame... frames)
{
// Create Buffer to hold all generated frames in a single buffer
int completeBufSize = 0;
for (Frame f : frames)
{
completeBufSize += Generator.OVERHEAD + f.getPayloadLength();
}
ByteBuffer completeBuf = ByteBuffer.allocate(completeBufSize);
BufferUtil.clearToFill(completeBuf);
// Generate from all frames
Generator generator = new UnitGenerator();
for (Frame f : frames)
{
ByteBuffer header = generator.generateHeaderBytes(f);
totalBytes += BufferUtil.put(header,completeBuf);
// Generate using windowing
boolean done = false;
while (!done)
{
ByteBuffer window = generator.getPayloadWindow(windowSize,f);
Assert.assertThat("Generated should not exceed window size",window.remaining(),lessThanOrEqualTo(windowSize));
totalBytes += window.remaining();
completeBuf.put(window);
totalParts++;
done = (f.remaining() <= 0);
}
}
// Return results
BufferUtil.flipToFlush(completeBuf,0);
return completeBuf;
}
public void assertTotalParts(int expectedParts)
{
Assert.assertThat("Generated Parts",totalParts,is(expectedParts));
}
public void assertTotalBytes(int expectedBytes)
{
Assert.assertThat("Generated Bytes",totalBytes,is(expectedBytes));
}
}
private void assertGeneratedBytes(CharSequence expectedBytes, Frame... frames)
{
// collect up all frames as single ByteBuffer
ByteBuffer allframes = UnitGenerator.generate(frames);
// Get hex String form of all frames bytebuffer.
String actual = Hex.asHex(allframes);
// Validate
Assert.assertThat("Buffer",actual,is(expectedBytes.toString()));
}
private String asMaskedHex(String str, byte[] maskingKey)
{
byte utf[] = StringUtil.getUtf8Bytes(str);
mask(utf, maskingKey);
return Hex.asHex(utf);
}
private void mask(byte[] buf, byte[] maskingKey)
{
int size = buf.length;
for (int i = 0; i < size; i++)
{
buf[i] ^= maskingKey[i % 4];
}
}
@Test
public void testClose_Empty()
{
// 0 byte payload (no status code)
assertGeneratedBytes("8800",new WebSocketFrame(OpCode.CLOSE));
}
@Test
public void testClose_CodeNoReason()
{
CloseInfo close = new CloseInfo(StatusCode.NORMAL);
// 2 byte payload (2 bytes for status code)
assertGeneratedBytes("880203E8",close.asFrame());
}
@Test
public void testClose_CodeOkReason()
{
CloseInfo close = new CloseInfo(StatusCode.NORMAL,"OK");
// 4 byte payload (2 bytes for status code, 2 more for "OK")
assertGeneratedBytes("880403E84F4B",close.asFrame());
}
@Test
public void testText_Hello()
{
WebSocketFrame frame = WebSocketFrame.text("Hello");
byte utf[] = StringUtil.getUtf8Bytes("Hello");
assertGeneratedBytes("8105" + Hex.asHex(utf),frame);
}
@Test
public void testText_Masked()
{
WebSocketFrame frame = WebSocketFrame.text("Hello");
byte maskingKey[] = Hex.asByteArray("11223344");
frame.setMask(maskingKey);
// what is expected
StringBuilder expected = new StringBuilder();
expected.append("8185").append("11223344");
expected.append(asMaskedHex("Hello",maskingKey));
// validate
assertGeneratedBytes(expected,frame);
}
@Test
public void testText_Masked_OffsetSourceByteBuffer()
{
ByteBuffer payload = ByteBuffer.allocate(100);
payload.position(5);
payload.put(StringUtil.getUtf8Bytes("Hello"));
payload.flip();
payload.position(5);
// at this point, we have a ByteBuffer of 100 bytes.
// but only a few bytes in the middle are made available for the payload.
// we are testing that masking works as intended, even if the provided
// payload does not start at position 0.
LOG.debug("Payload = {}", BufferUtil.toDetailString(payload));
WebSocketFrame frame = WebSocketFrame.text().setPayload(payload);
byte maskingKey[] = Hex.asByteArray("11223344");
frame.setMask(maskingKey);
// what is expected
StringBuilder expected = new StringBuilder();
expected.append("8185").append("11223344");
expected.append(asMaskedHex("Hello",maskingKey));
// validate
assertGeneratedBytes(expected,frame);
}
/**
* Prevent regression of masking of many packets.
*/
@Test
public void testManyMasked()
{
int pingCount = 10;
int pingCount = 2;
// Prepare frames
List<WebSocketFrame> send = new ArrayList<>();
WebSocketFrame[] frames = new WebSocketFrame[pingCount + 1];
for (int i = 0; i < pingCount; i++)
{
String payload = String.format("ping-%d[%X]",i,i);
send.add(WebSocketFrame.ping().setPayload(payload));
String payload = String.format("ping-%d",i);
frames[i] = WebSocketFrame.ping().setPayload(payload);
}
send.add(new CloseInfo(StatusCode.NORMAL).asFrame());
frames[pingCount] = new CloseInfo(StatusCode.NORMAL).asFrame();
ByteBuffer completeBuf = UnitGenerator.generate(send);
// Parse complete buffer (5 bytes at a time)
UnitParser parser = new UnitParser();
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
int segmentSize = 5;
parser.parseSlowly(completeBuf,segmentSize);
// Assert validity of frame
int frameCount = send.size();
capture.assertFrameCount(frameCount);
for (int i = 0; i < frameCount; i++)
// Mask All Frames
byte maskingKey[] = Hex.asByteArray("11223344");
for (WebSocketFrame f : frames)
{
WebSocketFrame actual = capture.getFrames().get(i);
WebSocketFrame expected = send.get(i);
String prefix = "Frame[" + i + "]";
Assert.assertThat(prefix + ".opcode",actual.getOpCode(),is(expected.getOpCode()));
Assert.assertThat(prefix + ".payloadLength",actual.getPayloadLength(),is(expected.getPayloadLength()));
ByteBufferAssert.assertEquals(prefix + ".payload",expected.getPayload(),actual.getPayload());
f.setMask(maskingKey);
}
// Validate result of generation
StringBuilder expected = new StringBuilder();
expected.append("8986").append("11223344");
expected.append(asMaskedHex("ping-0",maskingKey)); // ping 0
expected.append("8986").append("11223344");
expected.append(asMaskedHex("ping-1",maskingKey)); // ping 1
expected.append("8882").append("11223344");
byte closure[] = Hex.asByteArray("03E8");
mask(closure, maskingKey);
expected.append(Hex.asHex(closure)); // normal closure
assertGeneratedBytes(expected,frames);
}
/**
@ -86,35 +250,20 @@ public class GeneratorTest
WebSocketFrame frame = WebSocketFrame.binary(payload);
// tracking values
int totalParts = 0;
int totalBytes = 0;
// Generate
int windowSize = 1024;
WindowHelper helper = new WindowHelper(windowSize);
ByteBuffer completeBuffer = helper.generateWindowed(frame);
// Validate
int expectedHeaderSize = 4;
int expectedParts = (int)Math.ceil((double)(payload.length + expectedHeaderSize) / windowSize);
int expectedSize = payload.length + expectedHeaderSize;
int expectedParts = (int)Math.ceil((double)(payload.length) / windowSize);
// the generator
Generator generator = new UnitGenerator();
helper.assertTotalParts(expectedParts);
helper.assertTotalBytes(payload.length + expectedHeaderSize);
// lets see how many parts the generator makes
boolean done = false;
while (!done)
{
// sanity check in loop, our test should fail if this is true.
Assert.assertThat("Too many parts",totalParts,lessThanOrEqualTo(expectedParts));
ByteBuffer buf = generator.generate(windowSize,frame);
Assert.assertThat("Generated should not exceed window size",buf.remaining(),lessThanOrEqualTo(windowSize));
totalBytes += buf.remaining();
totalParts++;
done = (frame.remaining() <= 0);
}
// validate
Assert.assertThat("Created Parts",totalParts,is(expectedParts));
Assert.assertThat("Created Bytes",totalBytes,is(payload.length + expectedHeaderSize));
Assert.assertThat("Generated Buffer",completeBuffer.remaining(),is(expectedSize));
}
@Test
@ -125,45 +274,25 @@ public class GeneratorTest
Arrays.fill(payload,(byte)0x55);
byte mask[] = new byte[]
{ 0x2A, (byte)0xF0, 0x0F, 0x00 };
{ 0x2A, (byte)0xF0, 0x0F, 0x00 };
WebSocketFrame frame = WebSocketFrame.binary(payload);
frame.setMask(mask); // masking!
// Generate
int windowSize = 2929;
WindowHelper helper = new WindowHelper(windowSize);
ByteBuffer completeBuffer = helper.generateWindowed(frame);
// tracking values
int totalParts = 0;
int totalBytes = 0;
int windowSize = 2929; // important for test, use an odd # window size to test masking across window barriers
// Validate
int expectedHeaderSize = 8;
int expectedParts = (int)Math.ceil((double)(payload.length + expectedHeaderSize) / windowSize);
int expectedSize = payload.length + expectedHeaderSize;
int expectedParts = (int)Math.ceil((double)(payload.length) / windowSize);
// Buffer to capture generated bytes (we do this to validate that the masking
// is working correctly
ByteBuffer completeBuf = ByteBuffer.allocate(payload.length + expectedHeaderSize);
BufferUtil.clearToFill(completeBuf);
helper.assertTotalParts(expectedParts);
helper.assertTotalBytes(payload.length + expectedHeaderSize);
// Generate and capture generator output
Generator generator = new UnitGenerator();
boolean done = false;
while (!done)
{
// sanity check in loop, our test should fail if this is true.
Assert.assertThat("Too many parts",totalParts,lessThanOrEqualTo(expectedParts));
ByteBuffer buf = generator.generate(windowSize,frame);
Assert.assertThat("Generated should not exceed window size",buf.remaining(),lessThanOrEqualTo(windowSize));
totalBytes += buf.remaining();
totalParts++;
BufferUtil.put(buf,completeBuf);
done = (frame.remaining() <= 0);
}
Assert.assertThat("Created Parts",totalParts,is(expectedParts));
Assert.assertThat("Created Bytes",totalBytes,is(payload.length + expectedHeaderSize));
Assert.assertThat("Generated Buffer",completeBuffer.remaining(),is(expectedSize));
// Parse complete buffer.
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
@ -171,8 +300,7 @@ public class GeneratorTest
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
BufferUtil.flipToFlush(completeBuf,0);
parser.parse(completeBuf);
parser.parse(completeBuffer);
// Assert validity of frame
WebSocketFrame actual = capture.getFrames().get(0);

View File

@ -0,0 +1,75 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
public final class Hex
{
private static final char[] hexcodes = "0123456789ABCDEF".toCharArray();
public static byte[] asByteArray(String hstr)
{
if ((hstr.length() < 0) || ((hstr.length() % 2) != 0))
{
throw new IllegalArgumentException(String.format("Invalid string length of <%d>",hstr.length()));
}
int size = hstr.length() / 2;
byte buf[] = new byte[size];
byte hex;
int len = hstr.length();
int idx = (int)Math.floor(((size * 2) - (double)len) / 2);
for (int i = 0; i < len; i++)
{
hex = 0;
if (i >= 0)
{
hex = (byte)(Character.digit(hstr.charAt(i),16) << 4);
}
i++;
hex += (byte)(Character.digit(hstr.charAt(i),16));
buf[idx] = hex;
idx++;
}
return buf;
}
public static String asHex(byte buf[])
{
int len = buf.length;
char out[] = new char[len * 2];
for (int i = 0; i < len; i++)
{
out[i * 2] = hexcodes[(buf[i] & 0xF0) >> 4];
out[(i * 2) + 1] = hexcodes[(buf[i] & 0x0F)];
}
return String.valueOf(out);
}
public static String asHex(ByteBuffer buffer)
{
return asHex(BufferUtil.toArray(buffer));
}
}

View File

@ -62,8 +62,10 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
{
ByteBuffer buf = generator.generate(frame);
captured.add(buf.slice());
ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + frame.getPayloadLength());
generator.generateWholeFrame(frame,buf);
BufferUtil.flipToFlush(buf,0);
captured.add(buf);
if (callback != null)
{
callback.writeSuccess();

View File

@ -21,9 +21,6 @@ package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Test;
public class RFC6455ExamplesGeneratorTest
@ -36,10 +33,8 @@ public class RFC6455ExamplesGeneratorTest
WebSocketFrame text1 = WebSocketFrame.text("Hel").setFin(false);
WebSocketFrame text2 = new WebSocketFrame(OpCode.CONTINUATION).setPayload("lo");
Generator generator = new UnitGenerator();
ByteBuffer actual1 = generator.generate(text1);
ByteBuffer actual2 = generator.generate(text2);
ByteBuffer actual1 = UnitGenerator.generate(text1);
ByteBuffer actual2 = UnitGenerator.generate(text2);
ByteBuffer expected1 = ByteBuffer.allocate(5);
@ -66,9 +61,7 @@ public class RFC6455ExamplesGeneratorTest
pong.setMask(new byte[]
{ 0x37, (byte)0xfa, 0x21, 0x3d });
Generator gen = new UnitGenerator();
ByteBuffer actual = gen.generate(pong);
ByteBuffer actual = UnitGenerator.generate(pong);
ByteBuffer expected = ByteBuffer.allocate(11);
// Raw bytes as found in RFC 6455, Section 5.7 - Examples
@ -87,8 +80,7 @@ public class RFC6455ExamplesGeneratorTest
text.setMask(new byte[]
{ 0x37, (byte)0xfa, 0x21, 0x3d });
Generator gen = new UnitGenerator();
ByteBuffer actual = gen.generate(text);
ByteBuffer actual = UnitGenerator.generate(text);
ByteBuffer expected = ByteBuffer.allocate(11);
// Raw bytes as found in RFC 6455, Section 5.7 - Examples
@ -110,9 +102,7 @@ public class RFC6455ExamplesGeneratorTest
Arrays.fill(payload,(byte)0x44);
binary.setPayload(payload);
Generator gen = new UnitGenerator();
ByteBuffer actual = gen.generate(binary);
ByteBuffer actual = UnitGenerator.generate(binary);
ByteBuffer expected = ByteBuffer.allocate(dataSize + FUDGE);
// Raw bytes as found in RFC 6455, Section 5.7 - Examples
@ -141,9 +131,7 @@ public class RFC6455ExamplesGeneratorTest
Arrays.fill(payload,(byte)0x44);
binary.setPayload(payload);
Generator gen = new UnitGenerator();
ByteBuffer actual = gen.generate(binary);
ByteBuffer actual = UnitGenerator.generate(binary);
ByteBuffer expected = ByteBuffer.allocate(dataSize + 10);
// Raw bytes as found in RFC 6455, Section 5.7 - Examples
@ -168,8 +156,7 @@ public class RFC6455ExamplesGeneratorTest
{
WebSocketFrame ping = new WebSocketFrame(OpCode.PING).setPayload("Hello");
Generator gen = new UnitGenerator();
ByteBuffer actual = gen.generate(ping);
ByteBuffer actual = UnitGenerator.generate(ping);
ByteBuffer expected = ByteBuffer.allocate(10);
expected.put(new byte[]
@ -184,9 +171,7 @@ public class RFC6455ExamplesGeneratorTest
{
WebSocketFrame text = WebSocketFrame.text("Hello");
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(text);
ByteBuffer actual = UnitGenerator.generate(text);
ByteBuffer expected = ByteBuffer.allocate(10);

View File

@ -23,6 +23,8 @@ import java.util.List;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -31,11 +33,58 @@ import org.eclipse.jetty.websocket.api.extensions.Frame;
*/
public class UnitGenerator extends Generator
{
private static final Logger LOG = Log.getLogger(UnitGenerator.class);
public static ByteBuffer generate(Frame frame)
{
return generate(new Frame[]
{ frame });
}
/**
* Generate All Frames into a single ByteBuffer.
* <p>
* This is highly inefficient and is not used in production! (This exists to make testing of the Generator easier)
*
* @param frames
* the frames to generate from
* @return the ByteBuffer representing all of the generated frames provided.
*/
public static ByteBuffer generate(Frame[] frames)
{
Generator generator = new UnitGenerator();
// Generate into single bytebuffer
int buflen = 0;
for (Frame f : frames)
{
buflen += f.getPayloadLength() + Generator.OVERHEAD;
}
ByteBuffer completeBuf = ByteBuffer.allocate(buflen);
BufferUtil.clearToFill(completeBuf);
// Generate frames
for (Frame f : frames)
{
generator.generateWholeFrame(f,completeBuf);
}
BufferUtil.flipToFlush(completeBuf,0);
if (LOG.isDebugEnabled())
{
LOG.debug("generate({} frames) - {}",frames.length,BufferUtil.toDetailString(completeBuf));
}
return completeBuf;
}
/**
* Generate a single giant buffer of all provided frames Not appropriate for production code, but useful for testing.
*/
public static ByteBuffer generate(List<WebSocketFrame> frames)
{
// Create non-symmetrical mask (helps show mask bytes order issues)
byte[] MASK =
{ 0x11, 0x22, 0x33, 0x44 };
{ 0x11, 0x22, 0x33, 0x44 };
// the generator
Generator generator = new UnitGenerator();
@ -52,13 +101,20 @@ public class UnitGenerator extends Generator
// Generate frames
for (WebSocketFrame f : frames)
{
f.setMask(MASK); // make sure we have mask set
ByteBuffer slice = f.getPayload().slice();
BufferUtil.put(generator.generate(f),completeBuf);
f.setPayload(slice);
f.setMask(MASK); // make sure we have the test mask set
BufferUtil.put(generator.generateHeaderBytes(f),completeBuf);
ByteBuffer window = generator.getPayloadWindow(f.getPayloadLength(),f);
if (BufferUtil.hasContent(window))
{
BufferUtil.put(window,completeBuf);
}
}
BufferUtil.flipToFlush(completeBuf,0);
if (LOG.isDebugEnabled())
{
LOG.debug("generate({} frames) - {}",frames.size(),BufferUtil.toDetailString(completeBuf));
}
return completeBuf;
}

View File

@ -25,10 +25,7 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.junit.BeforeClass;
import org.junit.Test;
@ -37,6 +34,14 @@ public class WebSocketFrameTest
private static Generator strictGenerator;
private static Generator laxGenerator;
private ByteBuffer generateWholeFrame(Generator generator, Frame frame)
{
ByteBuffer buf = ByteBuffer.allocate(frame.getPayloadLength() + Generator.OVERHEAD);
generator.generateWholeFrame(frame,buf);
BufferUtil.flipToFlush(buf,0);
return buf;
}
@BeforeClass
public static void initGenerator()
{
@ -57,7 +62,7 @@ public class WebSocketFrameTest
public void testLaxInvalidClose()
{
WebSocketFrame frame = new WebSocketFrame(OpCode.CLOSE).setFin(false);
ByteBuffer actual = laxGenerator.generate(frame);
ByteBuffer actual = generateWholeFrame(laxGenerator,frame);
ByteBuffer expected = ByteBuffer.allocate(2);
expected.put((byte)0x08);
expected.put((byte)0x00);
@ -69,7 +74,7 @@ public class WebSocketFrameTest
public void testLaxInvalidPing()
{
WebSocketFrame frame = new WebSocketFrame(OpCode.PING).setFin(false);
ByteBuffer actual = laxGenerator.generate(frame);
ByteBuffer actual = generateWholeFrame(laxGenerator,frame);
ByteBuffer expected = ByteBuffer.allocate(2);
expected.put((byte)0x09);
expected.put((byte)0x00);
@ -81,7 +86,7 @@ public class WebSocketFrameTest
public void testStrictValidClose()
{
CloseInfo close = new CloseInfo(StatusCode.NORMAL);
ByteBuffer actual = strictGenerator.generate(close.asFrame());
ByteBuffer actual = generateWholeFrame(strictGenerator,close.asFrame());
ByteBuffer expected = ByteBuffer.allocate(4);
expected.put((byte)0x88);
expected.put((byte)0x02);
@ -95,7 +100,7 @@ public class WebSocketFrameTest
public void testStrictValidPing()
{
WebSocketFrame frame = new WebSocketFrame(OpCode.PING);
ByteBuffer actual = strictGenerator.generate(frame);
ByteBuffer actual = generateWholeFrame(strictGenerator,frame);
ByteBuffer expected = ByteBuffer.allocate(2);
expected.put((byte)0x89);
expected.put((byte)0x00);

View File

@ -21,7 +21,9 @@ package org.eclipse.jetty.websocket.common.ab;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -47,23 +49,18 @@ public class TestABCase1_1
public void testGenerate125ByteTextCase1_1_2()
{
int length = 125;
byte buf[] = new byte[length];
Arrays.fill(buf,(byte)'*');
String text = new String(buf,StringUtil.__UTF8_CHARSET);
StringBuilder builder = new StringBuilder();
Frame textFrame = WebSocketFrame.text(text);
for (int i = 0; i < length; ++i)
{
builder.append("*");
}
WebSocketFrame textFrame = WebSocketFrame.text(builder.toString());
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(textFrame);
ByteBuffer actual = UnitGenerator.generate(textFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 5);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= length & 0x7F;
@ -93,13 +90,12 @@ public class TestABCase1_1
WebSocketFrame textFrame = WebSocketFrame.text(builder.toString());
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(textFrame);
ByteBuffer actual = UnitGenerator.generate(textFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 5);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= length & 0x7E;
@ -133,13 +129,12 @@ public class TestABCase1_1
WebSocketFrame textFrame = WebSocketFrame.text(builder.toString());
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(textFrame);
ByteBuffer actual = UnitGenerator.generate(textFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 5);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= length & 0x7E;
@ -173,13 +168,12 @@ public class TestABCase1_1
WebSocketFrame textFrame = WebSocketFrame.text(builder.toString());
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(textFrame);
ByteBuffer actual = UnitGenerator.generate(textFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 5);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= 0x7E;
@ -213,20 +207,18 @@ public class TestABCase1_1
WebSocketFrame textFrame = WebSocketFrame.text(builder.toString());
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(textFrame);
ByteBuffer actual = UnitGenerator.generate(textFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 5);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= 0x7E;
expected.put(b);
expected.put(new byte[]
{ (byte)0xff, (byte)0xff });
{ (byte)0xff, (byte)0xff });
for (int i = 0; i < length; ++i)
{
@ -252,19 +244,18 @@ public class TestABCase1_1
WebSocketFrame textFrame = WebSocketFrame.text(builder.toString());
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(textFrame);
ByteBuffer actual = UnitGenerator.generate(textFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 11);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= 0x7F;
expected.put(b);
expected.put(new byte[]
{ 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00 });
{ 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00 });
for (int i = 0; i < length; ++i)
{
@ -281,13 +272,12 @@ public class TestABCase1_1
{
WebSocketFrame textFrame = WebSocketFrame.text("");
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(textFrame);
ByteBuffer actual = UnitGenerator.generate(textFrame);
ByteBuffer expected = ByteBuffer.allocate(5);
expected.put(new byte[]
{ (byte)0x81, (byte)0x00 });
{ (byte)0x81, (byte)0x00 });
expected.flip();
@ -302,7 +292,7 @@ public class TestABCase1_1
ByteBuffer expected = ByteBuffer.allocate(length + 5);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= length & 0x7F;
expected.put(b);
@ -335,7 +325,7 @@ public class TestABCase1_1
ByteBuffer expected = ByteBuffer.allocate(length + 5);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= length & 0x7E;
expected.put(b);
@ -369,7 +359,7 @@ public class TestABCase1_1
ByteBuffer expected = ByteBuffer.allocate(length + 5);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= length & 0x7E;
expected.put(b);
@ -403,7 +393,7 @@ public class TestABCase1_1
ByteBuffer expected = ByteBuffer.allocate(length + 5);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= 0x7E;
expected.put(b);
@ -437,12 +427,12 @@ public class TestABCase1_1
ByteBuffer expected = ByteBuffer.allocate(length + 5);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= 0x7E;
expected.put(b);
expected.put(new byte[]
{ (byte)0xff, (byte)0xff });
{ (byte)0xff, (byte)0xff });
for (int i = 0; i < length; ++i)
{
@ -473,12 +463,12 @@ public class TestABCase1_1
ByteBuffer expected = ByteBuffer.allocate(length + 11);
expected.put(new byte[]
{ (byte)0x81 });
{ (byte)0x81 });
byte b = 0x00; // no masking
b |= 0x7F;
expected.put(b);
expected.put(new byte[]
{ 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00 });
{ 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00 });
for (int i = 0; i < length; ++i)
{
@ -509,7 +499,7 @@ public class TestABCase1_1
ByteBuffer expected = ByteBuffer.allocate(5);
expected.put(new byte[]
{ (byte)0x81, (byte)0x00 });
{ (byte)0x81, (byte)0x00 });
expected.flip();

View File

@ -60,8 +60,7 @@ public class TestABCase1_2
WebSocketFrame binaryFrame = WebSocketFrame.binary().setPayload(bb);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(binaryFrame);
ByteBuffer actual = UnitGenerator.generate(binaryFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 5);
@ -99,8 +98,7 @@ public class TestABCase1_2
WebSocketFrame binaryFrame = WebSocketFrame.binary().setPayload(bb);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(binaryFrame);
ByteBuffer actual = UnitGenerator.generate(binaryFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 5);
@ -142,8 +140,7 @@ public class TestABCase1_2
WebSocketFrame binaryFrame = WebSocketFrame.binary().setPayload(bb);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(binaryFrame);
ByteBuffer actual = UnitGenerator.generate(binaryFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 5);
@ -184,8 +181,7 @@ public class TestABCase1_2
bb.flip();
WebSocketFrame binaryFrame = WebSocketFrame.binary().setPayload(bb);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(binaryFrame);
ByteBuffer actual = UnitGenerator.generate(binaryFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 5);
@ -228,8 +224,7 @@ public class TestABCase1_2
WebSocketFrame binaryFrame = WebSocketFrame.binary().setPayload(bb);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(binaryFrame);
ByteBuffer actual = UnitGenerator.generate(binaryFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 5);
@ -268,8 +263,7 @@ public class TestABCase1_2
WebSocketFrame binaryFrame = WebSocketFrame.binary().setPayload(bb);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(binaryFrame);
ByteBuffer actual = UnitGenerator.generate(binaryFrame);
ByteBuffer expected = ByteBuffer.allocate(length + 11);
@ -297,8 +291,7 @@ public class TestABCase1_2
{
WebSocketFrame binaryFrame = WebSocketFrame.binary(new byte[] {});
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(binaryFrame);
ByteBuffer actual = UnitGenerator.generate(binaryFrame);
ByteBuffer expected = ByteBuffer.allocate(5);

View File

@ -55,8 +55,7 @@ public class TestABCase2
WebSocketFrame pingFrame = WebSocketFrame.ping().setPayload(bytes);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(pingFrame);
ByteBuffer actual = UnitGenerator.generate(pingFrame);
ByteBuffer expected = ByteBuffer.allocate(bytes.length + 32);
@ -80,8 +79,7 @@ public class TestABCase2
WebSocketFrame pingFrame = WebSocketFrame.ping().setPayload(bytes);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(pingFrame);
ByteBuffer actual = UnitGenerator.generate(pingFrame);
ByteBuffer expected = ByteBuffer.allocate(32);
@ -104,9 +102,7 @@ public class TestABCase2
{
WebSocketFrame pingFrame = WebSocketFrame.ping();
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(pingFrame);
ByteBuffer actual = UnitGenerator.generate(pingFrame);
ByteBuffer expected = ByteBuffer.allocate(5);
@ -126,8 +122,7 @@ public class TestABCase2
WebSocketFrame pingFrame = WebSocketFrame.ping().setPayload(messageBytes);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(pingFrame);
ByteBuffer actual = UnitGenerator.generate(pingFrame);
ByteBuffer expected = ByteBuffer.allocate(32);
@ -169,8 +164,7 @@ public class TestABCase2
WebSocketFrame pingFrame = WebSocketFrame.ping().setPayload(bytes);
Generator generator = new UnitGenerator();
generator.generate(pingFrame);
UnitGenerator.generate(pingFrame);
}
@Test

View File

@ -25,7 +25,6 @@ import java.util.List;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Rule;
@ -86,9 +85,7 @@ public class TestABCase3
@Test(expected = ProtocolException.class)
public void testGenerateInvalidControlFrame()
{
Generator generator = new UnitGenerator();
generator.generate(invalidFrame);
UnitGenerator.generate(invalidFrame);
}

View File

@ -31,7 +31,6 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.ByteBufferAssert;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
@ -50,8 +49,7 @@ public class TestABCase7_3
{
CloseInfo close = new CloseInfo();
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(close.asFrame());
ByteBuffer actual = UnitGenerator.generate(close.asFrame());
ByteBuffer expected = ByteBuffer.allocate(5);
@ -93,8 +91,7 @@ public class TestABCase7_3
WebSocketFrame closeFrame = new WebSocketFrame(OpCode.CLOSE).setPayload(new byte[]
{ 0x00 });
Generator generator = new UnitGenerator();
generator.generate(closeFrame);
UnitGenerator.generate(closeFrame);
}
@Test
@ -124,8 +121,7 @@ public class TestABCase7_3
{
CloseInfo close = new CloseInfo(1000);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(close.asFrame());
ByteBuffer actual = UnitGenerator.generate(close.asFrame());
ByteBuffer expected = ByteBuffer.allocate(5);
@ -169,8 +165,7 @@ public class TestABCase7_3
CloseInfo close = new CloseInfo(1000,message);
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(close.asFrame());
ByteBuffer actual = UnitGenerator.generate(close.asFrame());
ByteBuffer expected = ByteBuffer.allocate(32);
@ -230,8 +225,7 @@ public class TestABCase7_3
CloseInfo close = new CloseInfo(1000,message.toString());
Generator generator = new UnitGenerator();
ByteBuffer actual = generator.generate(close.asFrame());
ByteBuffer actual = UnitGenerator.generate(close.asFrame());
ByteBuffer expected = ByteBuffer.allocate(132);
byte messageBytes[] = message.toString().getBytes(StringUtil.__UTF8_CHARSET);
@ -310,8 +304,7 @@ public class TestABCase7_3
closeFrame.setPayload(BufferUtil.toArray(bb));
Generator generator = new UnitGenerator();
generator.generate(closeFrame);
UnitGenerator.generate(closeFrame);
}
@Test

View File

@ -0,0 +1,68 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.Callback;
/**
* Tracking Callback for testing how the callbacks are used.
*/
public class TrackingCallback implements Callback
{
private AtomicInteger called = new AtomicInteger();
private boolean success = false;
private Throwable failure = null;
@Override
public void failed(Throwable x)
{
this.called.incrementAndGet();
this.success = false;
this.failure = x;
}
@Override
public void succeeded()
{
this.called.incrementAndGet();
this.success = false;
}
public Throwable getFailure()
{
return failure;
}
public boolean isSuccess()
{
return success;
}
public boolean isCalled()
{
return called.get() >= 1;
}
public int getCallCount()
{
return called.get();
}
}

View File

@ -0,0 +1,174 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Hex;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
import org.junit.Test;
public class WriteBytesProviderTest
{
private WriteBytesProvider bytesProvider;
private void assertCallbackSuccessCount(TrackingCallback callback, int expectedSuccsesCount)
{
Assert.assertThat("Callback was called",callback.isCalled(),is(true));
Assert.assertThat("No Failed Callbacks",callback.getFailure(),nullValue());
Assert.assertThat("# of Success Callbacks",callback.getCallCount(),is(expectedSuccsesCount));
}
@Test
public void testSingleFrame()
{
UnitGenerator generator = new UnitGenerator();
TrackingCallback flushCallback = new TrackingCallback();
bytesProvider = new WriteBytesProvider(generator,flushCallback);
TrackingCallback frameCallback = new TrackingCallback();
Frame frame = WebSocketFrame.text("Test");
// place in to bytes provider
bytesProvider.enqueue(frame,frameCallback);
// get bytes out
List<ByteBuffer> bytes = bytesProvider.getByteBuffers();
Assert.assertThat("Number of buffers",bytes.size(),is(2));
// Test byte values
assertExpectedBytes(bytes,"810454657374");
// Trigger success
bytesProvider.succeeded();
// Validate success
assertCallbackSuccessCount(flushCallback,1);
assertCallbackSuccessCount(frameCallback,1);
}
@Test
public void testTextClose()
{
UnitGenerator generator = new UnitGenerator();
TrackingCallback flushCallback = new TrackingCallback();
bytesProvider = new WriteBytesProvider(generator,flushCallback);
// Create frames for provider
TrackingCallback textCallback = new TrackingCallback();
TrackingCallback closeCallback = new TrackingCallback();
bytesProvider.enqueue(WebSocketFrame.text("Bye"),textCallback);
bytesProvider.enqueue(new CloseInfo().asFrame(),closeCallback);
// get bytes out
List<ByteBuffer> bytes = bytesProvider.getByteBuffers();
Assert.assertThat("Number of buffers",bytes.size(),is(4));
// Test byte values
StringBuilder expected = new StringBuilder();
expected.append("8103427965"); // text frame
expected.append("8800"); // (empty) close frame
assertExpectedBytes(bytes,expected.toString());
// Trigger success
bytesProvider.succeeded();
// Validate success
assertCallbackSuccessCount(flushCallback,1);
assertCallbackSuccessCount(textCallback,1);
assertCallbackSuccessCount(closeCallback,1);
}
@Test
public void testTinyBufferSizeFrame()
{
UnitGenerator generator = new UnitGenerator();
TrackingCallback flushCallback = new TrackingCallback();
bytesProvider = new WriteBytesProvider(generator,flushCallback);
bytesProvider.setBufferSize(30);
// Create frames for provider
TrackingCallback binCallback = new TrackingCallback();
TrackingCallback closeCallback = new TrackingCallback();
int binPayloadSize = 50;
byte bin[] = new byte[binPayloadSize];
Arrays.fill(bin,(byte)0x00);
WebSocketFrame binFrame = WebSocketFrame.binary(bin);
byte maskingKey[] = Hex.asByteArray("11223344");
binFrame.setMask(maskingKey);
bytesProvider.enqueue(binFrame,binCallback);
bytesProvider.enqueue(new CloseInfo().asFrame(),closeCallback);
// get bytes out
List<ByteBuffer> bytes = bytesProvider.getByteBuffers();
Assert.assertThat("Number of buffers",bytes.size(),is(5));
// Test byte values
StringBuilder expected = new StringBuilder();
expected.append("82B2").append("11223344"); // bin frame
// build up masked bytes
byte masked[] = new byte[binPayloadSize];
Arrays.fill(masked,(byte)0x00);
for (int i = 0; i < binPayloadSize; i++)
{
masked[i] ^= maskingKey[i % 4];
}
expected.append(Hex.asHex(masked));
expected.append("8800"); // (empty) close frame
assertExpectedBytes(bytes,expected.toString());
// Trigger success
bytesProvider.succeeded();
// Validate success
assertCallbackSuccessCount(flushCallback,1);
assertCallbackSuccessCount(binCallback,1);
assertCallbackSuccessCount(closeCallback,1);
}
private void assertExpectedBytes(List<ByteBuffer> bytes, String expected)
{
String actual = gatheredHex(bytes);
Assert.assertThat("Expected Bytes",actual,is(expected));
}
private String gatheredHex(List<ByteBuffer> bytes)
{
int len = 0;
for (ByteBuffer buf : bytes)
{
len += buf.remaining();
}
len = len * 2;
StringBuilder ret = new StringBuilder(len);
for (ByteBuffer buf : bytes)
{
ret.append(Hex.asHex(buf));
}
return ret.toString();
}
}

View File

@ -1,5 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
# org.eclipse.jetty.websocket.LEVEL=DEBUG
org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.payload.LEVEL=DEBUG

View File

@ -287,8 +287,9 @@ public class WebSocketServletRFCTest
{ (byte)0xC2, (byte)0xC3 };
WebSocketFrame txt = WebSocketFrame.text().setPayload(buf);
ByteBuffer bb = generator.generate(txt);
client.writeRaw(bb);
ByteBuffer bbHeader = generator.generateHeaderBytes(txt);
client.writeRaw(bbHeader);
client.writeRaw(txt.getPayload());
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = capture.getFrames().poll();

View File

@ -62,7 +62,7 @@ public class Fuzzer
PER_FRAME,
SLOW
}
public static enum DisconnectMode
{
/** Disconnect occurred after a proper close handshake */
@ -115,7 +115,7 @@ public class Fuzzer
for (WebSocketFrame f : send)
{
setClientMask(f);
BufferUtil.put(generator.generate(f),buf);
generator.generateWholeFrame(f,buf);
}
BufferUtil.flipToFlush(buf,0);
return buf;
@ -267,19 +267,16 @@ public class Fuzzer
buflen += f.getPayloadLength() + Generator.OVERHEAD;
}
ByteBuffer buf = ByteBuffer.allocate(buflen);
BufferUtil.clearToFill(buf);
// Generate frames
for (WebSocketFrame f : send)
{
setClientMask(f);
ByteBuffer rawbytes = generator.generate(f);
if (LOG.isDebugEnabled())
buf.put(generator.generateHeaderBytes(f));
if (f.hasPayload())
{
LOG.debug("frame: {}",f);
LOG.debug("bytes: {}",BufferUtil.toDetailString(rawbytes));
buf.put(f.getPayload());
}
BufferUtil.put(rawbytes,buf);
}
BufferUtil.flipToFlush(buf,0);
@ -302,7 +299,15 @@ public class Fuzzer
{
f.setMask(MASK); // make sure we have mask set
// Using lax generator, generate and send
client.writeRaw(generator.generate(f));
ByteBuffer fullframe = ByteBuffer.allocate(f.getPayloadLength() + Generator.OVERHEAD);
BufferUtil.flipToFill(fullframe);
fullframe.put(generator.generateHeaderBytes(f));
if (f.hasPayload())
{
fullframe.put(f.getPayload());
}
BufferUtil.flipToFlush(fullframe,0);
client.writeRaw(fullframe);
client.flush();
}
}

View File

@ -468,14 +468,15 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
{
ByteBuffer buf = generator.generate(frame);
ByteBuffer headerBuf = generator.generateHeaderBytes(frame);
if (LOG.isDebugEnabled())
{
LOG.debug("writing out: {}",BufferUtil.toDetailString(buf));
LOG.debug("writing out: {}",BufferUtil.toDetailString(headerBuf));
}
try
{
BufferUtil.writeTo(buf,out);
BufferUtil.writeTo(headerBuf,out);
BufferUtil.writeTo(frame.getPayload(),out);
out.flush();
if (callback != null)
{
@ -491,7 +492,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
}
finally
{
bufferPool.release(buf);
bufferPool.release(headerBuf);
}
if (frame.getType().getOpCode() == OpCode.CLOSE)
@ -512,7 +513,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
return BufferUtil.put(remainingBuffer,buf);
}
int len = 0;
int len = -1;
int b;
while ((in.available() > 0) && (buf.remaining() > 0))
{
@ -520,7 +521,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
if (b == (-1))
{
eof = true;
break;
return -1;
}
buf.put((byte)b);
len++;

View File

@ -19,8 +19,6 @@
package org.eclipse.jetty.websocket.server.blockhead;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

View File

@ -1,7 +1,8 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=DEBUG
org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG
org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG