428232 - Rework batch mode / buffering in websocket.

This commit is contained in:
Greg Wilkins 2014-02-21 00:54:32 +11:00
parent 75cc42a647
commit 15952aeee4
7 changed files with 78 additions and 53 deletions

View File

@ -311,7 +311,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
shutdownInput();
if (_ishut)
return -1;
int filled=BufferUtil.flipPutFlip(_in,buffer);
int filled=BufferUtil.append(buffer,_in);
if (filled>0)
notIdle();
return filled;
@ -342,12 +342,12 @@ public class ByteArrayEndPoint extends AbstractEndPoint
if (b.remaining()>BufferUtil.space(_out))
{
ByteBuffer n = BufferUtil.allocate(_out.capacity()+b.remaining()*2);
BufferUtil.flipPutFlip(_out,n);
BufferUtil.append(n,_out);
_out=n;
}
}
if (BufferUtil.flipPutFlip(b,_out)>0)
if (BufferUtil.append(_out,b)>0)
idle=false;
if (BufferUtil.hasContent(b))

View File

@ -469,7 +469,7 @@ public class SslConnection extends AbstractConnection
{
// Do we already have some decrypted data?
if (BufferUtil.hasContent(_decryptedInput))
return BufferUtil.flipPutFlip(_decryptedInput, buffer);
return BufferUtil.append(buffer,_decryptedInput);
// We will need a network buffer
if (_encryptedInput == null)
@ -574,7 +574,7 @@ public class SslConnection extends AbstractConnection
{
if (app_in == buffer)
return unwrapResult.bytesProduced();
return BufferUtil.flipPutFlip(_decryptedInput, buffer);
return BufferUtil.append(buffer,_decryptedInput);
}
switch (handshakeStatus)

View File

@ -162,7 +162,7 @@ public class SelectChannelEndPointTest
}
// Copy to the out buffer
if (BufferUtil.hasContent(_in) && BufferUtil.flipPutFlip(_in, _out) > 0)
if (BufferUtil.hasContent(_in) && BufferUtil.append(_out, _in) > 0)
progress = true;
// Blocking writes

View File

@ -339,23 +339,20 @@ public class BufferUtil
* @param from Buffer to take bytes from in flush mode
* @param to Buffer to put bytes to in flush mode. The buffer is flipToFill before the put and flipToFlush after.
* @return number of bytes moved
* @deprecated use {@link #append(ByteBuffer, ByteBuffer)}
*/
public static int flipPutFlip(ByteBuffer from, ByteBuffer to)
{
int pos = flipToFill(to);
try
{
return put(from, to);
}
finally
{
flipToFlush(to, pos);
}
return append(to,from);
}
/* ------------------------------------------------------------ */
/** Append bytes to a buffer.
*
* @param to Buffer is flush mode
* @param b bytes to append
* @param off offset into byte
* @param len length to append
* @throws BufferOverflowException
*/
public static void append(ByteBuffer to, byte[] b, int off, int len) throws BufferOverflowException
{
@ -372,6 +369,8 @@ public class BufferUtil
/* ------------------------------------------------------------ */
/** Appends a byte to a buffer
* @param to Buffer is flush mode
* @param b byte to append
*/
public static void append(ByteBuffer to, byte b)
{
@ -386,9 +385,31 @@ public class BufferUtil
}
}
/* ------------------------------------------------------------ */
/** Appends a byte to a buffer
* @param to Buffer is flush mode
* @param b bytes to append
*/
public static int append(ByteBuffer to, ByteBuffer b)
{
int pos = flipToFill(to);
try
{
return put(b, to);
}
finally
{
flipToFlush(to, pos);
}
}
/* ------------------------------------------------------------ */
/**
* Like append, but does not throw {@link BufferOverflowException}
* @param to Buffer is flush mode
* @param b bytes to fill
* @param off offset into byte
* @param len length to fill
*/
public static int fill(ByteBuffer to, byte[] b, int off, int len)
{

View File

@ -141,14 +141,14 @@ public class BufferUtilTest
ByteBuffer from=BufferUtil.toBuffer("12345");
BufferUtil.clear(to);
assertEquals(5,BufferUtil.flipPutFlip(from,to));
assertEquals(5,BufferUtil.append(to,from));
assertTrue(BufferUtil.isEmpty(from));
assertEquals("12345",BufferUtil.toString(to));
from=BufferUtil.toBuffer("XX67890ZZ");
from.position(2);
assertEquals(5,BufferUtil.flipPutFlip(from,to));
assertEquals(5,BufferUtil.append(to,from));
assertEquals(2,from.remaining());
assertEquals("1234567890",BufferUtil.toString(to));
}
@ -183,14 +183,14 @@ public class BufferUtilTest
ByteBuffer from=BufferUtil.toBuffer("12345");
BufferUtil.clear(to);
assertEquals(5,BufferUtil.flipPutFlip(from,to));
assertEquals(5,BufferUtil.append(to,from));
assertTrue(BufferUtil.isEmpty(from));
assertEquals("12345",BufferUtil.toString(to));
from=BufferUtil.toBuffer("XX67890ZZ");
from.position(2);
assertEquals(5,BufferUtil.flipPutFlip(from,to));
assertEquals(5,BufferUtil.append(to,from));
assertEquals(2,from.remaining());
assertEquals("1234567890",BufferUtil.toString(to));
}

View File

@ -193,12 +193,18 @@ public class Generator
public ByteBuffer generateHeaderBytes(Frame frame)
{
ByteBuffer buffer = bufferPool.acquire(MAX_HEADER_LENGTH,true);
generateHeaderBytes(frame,buffer);
return buffer;
}
public void generateHeaderBytes(Frame frame, ByteBuffer buffer)
{
int p=BufferUtil.flipToFill(buffer);
// we need a framing header
assertFrameValid(frame);
ByteBuffer buffer = bufferPool.acquire(MAX_HEADER_LENGTH,true);
BufferUtil.clearToFill(buffer);
/*
* start the generation process
*/
@ -313,8 +319,7 @@ public class Generator
}
}
BufferUtil.flipToFlush(buffer,0);
return buffer;
BufferUtil.flipToFlush(buffer,p);
}
/**

View File

@ -184,13 +184,12 @@ public class FrameFlusher
private final List<FrameEntry> entries = new ArrayList<>(maxGather);
private final List<ByteBuffer> buffers = new ArrayList<>(maxGather * 2 + 1);
private ByteBuffer aggregate;
private boolean releaseAggregate;
private BatchMode batchMode;
@Override
protected Action process() throws Exception
{
int space = aggregate == null ? bufferSize : aggregate.remaining();
int space = aggregate == null ? bufferSize : BufferUtil.space(aggregate);
BatchMode currentBatchMode = BatchMode.AUTO;
synchronized (lock)
{
@ -224,18 +223,17 @@ public class FrameFlusher
if (entries.isEmpty())
{
// Nothing more to do, release the aggregate buffer if we need to.
// Releasing it here rather than in succeeded() allows for its reuse.
if (releaseAggregate)
{
bufferPool.release(aggregate);
if (LOG.isDebugEnabled())
LOG.debug("{} released aggregate buffer {}", FrameFlusher.this, aggregate);
aggregate = null;
}
if (batchMode != BatchMode.AUTO)
{
// Nothing more to do, release the aggregate buffer if we need to.
// Releasing it here rather than in succeeded() allows for its reuse.
if (aggregate!=null && BufferUtil.isEmpty(aggregate))
{
bufferPool.release(aggregate);
aggregate = null;
}
return Action.IDLE;
}
LOG.debug("{} auto flushing", FrameFlusher.this);
return flush();
@ -246,14 +244,11 @@ public class FrameFlusher
return currentBatchMode == BatchMode.OFF ? flush() : batch();
}
@SuppressWarnings("ForLoopReplaceableByForEach")
private Action flush()
{
if (!BufferUtil.isEmpty(aggregate))
{
BufferUtil.flipToFlush(aggregate, 0);
buffers.add(aggregate);
releaseAggregate = true;
if (LOG.isDebugEnabled())
LOG.debug("{} flushing aggregate {}", FrameFlusher.this, aggregate);
}
@ -275,36 +270,39 @@ public class FrameFlusher
LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries);
if (buffers.isEmpty())
{
if (aggregate!=null && BufferUtil.isEmpty(aggregate))
{
bufferPool.release(aggregate);
aggregate = null;
}
return Action.IDLE;
}
endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
return Action.SCHEDULED;
}
@SuppressWarnings("ForLoopReplaceableByForEach")
private Action batch()
{
if (aggregate == null)
{
aggregate = bufferPool.acquire(bufferSize, true);
BufferUtil.flipToFill(aggregate);
if (LOG.isDebugEnabled())
LOG.debug("{} acquired aggregate buffer {}", FrameFlusher.this, aggregate);
releaseAggregate = false;
}
// Do not allocate the iterator here.
for (int i = 0; i < entries.size(); ++i)
{
FrameEntry entry = entries.get(i);
// TODO: would be better to generate the header bytes directly into the aggregate buffer.
ByteBuffer header = entry.getHeaderBytes();
aggregate.put(header);
entry.genHeaderBytes(aggregate);
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
aggregate.put(payload);
BufferUtil.append(aggregate,payload);
}
if (LOG.isDebugEnabled())
LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries);
@ -312,7 +310,6 @@ public class FrameFlusher
return Action.SCHEDULED;
}
@SuppressWarnings("ForLoopReplaceableByForEach")
@Override
public void succeeded()
{
@ -325,10 +322,6 @@ public class FrameFlusher
}
entries.clear();
// Do not release the aggregate yet, in case there are more frames to process.
if (releaseAggregate)
BufferUtil.clearToFill(aggregate);
super.succeeded();
}
@ -371,6 +364,11 @@ public class FrameFlusher
{
return headerBuffer = generator.generateHeaderBytes(frame);
}
private void genHeaderBytes(ByteBuffer buffer)
{
generator.generateHeaderBytes(frame,buffer);
}
private void release()
{
@ -381,6 +379,7 @@ public class FrameFlusher
}
}
@Override
public String toString()
{
return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, failure);