428232 - Rework batch mode / buffering in websocket.

Fixed missing notification of the FLUSH_FRAME.
This commit is contained in:
Simone Bordet 2014-02-20 22:17:16 +01:00
parent c3c4d059a6
commit 7d6c1548b1
1 changed files with 31 additions and 21 deletions

View File

@ -227,11 +227,7 @@ public class FrameFlusher
{ {
// Nothing more to do, release the aggregate buffer if we need to. // Nothing more to do, release the aggregate buffer if we need to.
// Releasing it here rather than in succeeded() allows for its reuse. // Releasing it here rather than in succeeded() allows for its reuse.
if (aggregate!=null && BufferUtil.isEmpty(aggregate)) releaseAggregate();
{
bufferPool.release(aggregate);
aggregate = null;
}
return Action.IDLE; return Action.IDLE;
} }
@ -244,6 +240,7 @@ public class FrameFlusher
return currentBatchMode == BatchMode.OFF ? flush() : batch(); return currentBatchMode == BatchMode.OFF ? flush() : batch();
} }
@SuppressWarnings("ForLoopReplaceableByForEach")
private Action flush() private Action flush()
{ {
if (!BufferUtil.isEmpty(aggregate)) if (!BufferUtil.isEmpty(aggregate))
@ -257,10 +254,10 @@ public class FrameFlusher
for (int i = 0; i < entries.size(); ++i) for (int i = 0; i < entries.size(); ++i)
{ {
FrameEntry entry = entries.get(i); FrameEntry entry = entries.get(i);
// Skip "synthetic" frames used for flushing. // Skip the "synthetic" frame used for flushing.
if (entry.frame == FLUSH_FRAME) if (entry.frame == FLUSH_FRAME)
continue; continue;
buffers.add(entry.getHeaderBytes()); buffers.add(entry.generateHeaderBytes());
ByteBuffer payload = entry.frame.getPayload(); ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload)) if (BufferUtil.hasContent(payload))
buffers.add(payload); buffers.add(payload);
@ -271,11 +268,9 @@ public class FrameFlusher
if (buffers.isEmpty()) if (buffers.isEmpty())
{ {
if (aggregate!=null && BufferUtil.isEmpty(aggregate)) releaseAggregate();
{ // We may have the FLUSH_FRAME to notify.
bufferPool.release(aggregate); succeedEntries();
aggregate = null;
}
return Action.IDLE; return Action.IDLE;
} }
@ -284,6 +279,7 @@ public class FrameFlusher
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@SuppressWarnings("ForLoopReplaceableByForEach")
private Action batch() private Action batch()
{ {
if (aggregate == null) if (aggregate == null)
@ -297,12 +293,12 @@ public class FrameFlusher
for (int i = 0; i < entries.size(); ++i) for (int i = 0; i < entries.size(); ++i)
{ {
FrameEntry entry = entries.get(i); FrameEntry entry = entries.get(i);
entry.genHeaderBytes(aggregate); entry.generateHeaderBytes(aggregate);
ByteBuffer payload = entry.frame.getPayload(); ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload)) if (BufferUtil.hasContent(payload))
BufferUtil.append(aggregate,payload); BufferUtil.append(aggregate, payload);
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries); LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries);
@ -310,8 +306,24 @@ public class FrameFlusher
return Action.SCHEDULED; return Action.SCHEDULED;
} }
private void releaseAggregate()
{
if (aggregate != null && BufferUtil.isEmpty(aggregate))
{
bufferPool.release(aggregate);
aggregate = null;
}
}
@Override @Override
public void succeeded() public void succeeded()
{
succeedEntries();
super.succeeded();
}
@SuppressWarnings("ForLoopReplaceableByForEach")
private void succeedEntries()
{ {
// Do not allocate the iterator here. // Do not allocate the iterator here.
for (int i = 0; i < entries.size(); ++i) for (int i = 0; i < entries.size(); ++i)
@ -321,8 +333,6 @@ public class FrameFlusher
entry.release(); entry.release();
} }
entries.clear(); entries.clear();
super.succeeded();
} }
@Override @Override
@ -360,14 +370,14 @@ public class FrameFlusher
this.batchMode = batchMode; this.batchMode = batchMode;
} }
private ByteBuffer getHeaderBytes() private ByteBuffer generateHeaderBytes()
{ {
return headerBuffer = generator.generateHeaderBytes(frame); return headerBuffer = generator.generateHeaderBytes(frame);
} }
private void genHeaderBytes(ByteBuffer buffer) private void generateHeaderBytes(ByteBuffer buffer)
{ {
generator.generateHeaderBytes(frame,buffer); generator.generateHeaderBytes(frame, buffer);
} }
private void release() private void release()