From 7d6c1548b18df0fe6d5e817289e2cab4e4bb78ef Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 20 Feb 2014 22:17:16 +0100 Subject: [PATCH] 428232 - Rework batch mode / buffering in websocket. Fixed missing notification of the FLUSH_FRAME. --- .../websocket/common/io/FrameFlusher.java | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java index e65f539628a..051b4ac8f1e 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java @@ -227,11 +227,7 @@ public class FrameFlusher { // Nothing more to do, release the aggregate buffer if we need to. // Releasing it here rather than in succeeded() allows for its reuse. - if (aggregate!=null && BufferUtil.isEmpty(aggregate)) - { - bufferPool.release(aggregate); - aggregate = null; - } + releaseAggregate(); return Action.IDLE; } @@ -244,6 +240,7 @@ public class FrameFlusher return currentBatchMode == BatchMode.OFF ? flush() : batch(); } + @SuppressWarnings("ForLoopReplaceableByForEach") private Action flush() { if (!BufferUtil.isEmpty(aggregate)) @@ -257,10 +254,10 @@ public class FrameFlusher for (int i = 0; i < entries.size(); ++i) { FrameEntry entry = entries.get(i); - // Skip "synthetic" frames used for flushing. + // Skip the "synthetic" frame used for flushing. if (entry.frame == FLUSH_FRAME) continue; - buffers.add(entry.getHeaderBytes()); + buffers.add(entry.generateHeaderBytes()); ByteBuffer payload = entry.frame.getPayload(); if (BufferUtil.hasContent(payload)) buffers.add(payload); @@ -271,11 +268,9 @@ public class FrameFlusher if (buffers.isEmpty()) { - if (aggregate!=null && BufferUtil.isEmpty(aggregate)) - { - bufferPool.release(aggregate); - aggregate = null; - } + releaseAggregate(); + // We may have the FLUSH_FRAME to notify. + succeedEntries(); return Action.IDLE; } @@ -284,6 +279,7 @@ public class FrameFlusher return Action.SCHEDULED; } + @SuppressWarnings("ForLoopReplaceableByForEach") private Action batch() { if (aggregate == null) @@ -297,12 +293,12 @@ public class FrameFlusher for (int i = 0; i < entries.size(); ++i) { FrameEntry entry = entries.get(i); - - entry.genHeaderBytes(aggregate); + + entry.generateHeaderBytes(aggregate); ByteBuffer payload = entry.frame.getPayload(); if (BufferUtil.hasContent(payload)) - BufferUtil.append(aggregate,payload); + BufferUtil.append(aggregate, payload); } if (LOG.isDebugEnabled()) LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries); @@ -310,8 +306,24 @@ public class FrameFlusher return Action.SCHEDULED; } + private void releaseAggregate() + { + if (aggregate != null && BufferUtil.isEmpty(aggregate)) + { + bufferPool.release(aggregate); + aggregate = null; + } + } + @Override public void succeeded() + { + succeedEntries(); + super.succeeded(); + } + + @SuppressWarnings("ForLoopReplaceableByForEach") + private void succeedEntries() { // Do not allocate the iterator here. for (int i = 0; i < entries.size(); ++i) @@ -321,8 +333,6 @@ public class FrameFlusher entry.release(); } entries.clear(); - - super.succeeded(); } @Override @@ -360,14 +370,14 @@ public class FrameFlusher this.batchMode = batchMode; } - private ByteBuffer getHeaderBytes() + private ByteBuffer generateHeaderBytes() { return headerBuffer = generator.generateHeaderBytes(frame); } - - private void genHeaderBytes(ByteBuffer buffer) + + private void generateHeaderBytes(ByteBuffer buffer) { - generator.generateHeaderBytes(frame,buffer); + generator.generateHeaderBytes(frame, buffer); } private void release()