From 40d84ff1e37d1f562a030dae002a10b418882e1b Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 10 Sep 2014 17:40:50 +1000 Subject: [PATCH] 443662 Consume buffer in write(ByteBuffer) --- .../org/eclipse/jetty/io/ChannelEndPoint.java | 6 ++++-- .../jetty/io/SelectChannelEndPointTest.java | 1 + .../java/org/eclipse/jetty/server/HttpOutput.java | 8 +++++++- .../jetty/servlets/DataRateLimitedServlet.java | 15 ++++++++------- .../servlets/DataRateLimitedServletTest.java | 9 ++++++++- .../java/org/eclipse/jetty/util/BufferUtil.java | 3 +-- 6 files changed, 29 insertions(+), 13 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index 7c80d06bcad..b7344687908 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -41,6 +41,7 @@ public class ChannelEndPoint extends AbstractEndPoint private final ByteChannel _channel; private final Socket _socket; + private final GatheringByteChannel _gathering; private volatile boolean _ishut; private volatile boolean _oshut; @@ -51,6 +52,7 @@ public class ChannelEndPoint extends AbstractEndPoint (InetSocketAddress)channel.socket().getRemoteSocketAddress()); _channel = channel; _socket=channel.socket(); + _gathering=_channel instanceof GatheringByteChannel?((GatheringByteChannel)_channel):null; } @Override @@ -168,8 +170,8 @@ public class ChannelEndPoint extends AbstractEndPoint { if (buffers.length==1) flushed=_channel.write(buffers[0]); - else if (buffers.length>1 && _channel instanceof GatheringByteChannel) - flushed= (int)((GatheringByteChannel)_channel).write(buffers,0,buffers.length); + else if (_gathering!=null && buffers.length>1) + flushed= (int)_gathering.write(buffers,0,buffers.length); else { for (ByteBuffer b : buffers) diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index bdeb332bfbc..5ba510b4300 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -145,6 +145,7 @@ public class SelectChannelEndPointTest progress = false; // Fill the input buffer with everything available + BufferUtil.compact(_in); if (BufferUtil.isFull(_in)) throw new IllegalStateException("FULL " + BufferUtil.toDetailString(_in)); int filled = _endp.fill(_in); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 4454f01e43a..4b435822d7f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -903,7 +903,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable _buffer=buffer; _len=buffer.remaining(); // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers - _slice=_buffer.isDirect()||_len cache=new ConcurrentHashMap<>(); @@ -76,7 +77,7 @@ public class DataRateLimitedServlet extends HttpServlet buffersize=Integer.parseInt(tmp); tmp = getInitParameter("pause"); if (tmp!=null) - pause=Integer.parseInt(tmp); + pauseNS=TimeUnit.MILLISECONDS.toNanos(Integer.parseInt(tmp)); tmp = getInitParameter("pool"); int pool=tmp==null?Runtime.getRuntime().availableProcessors():Integer.parseInt(tmp); @@ -205,7 +206,7 @@ public class DataRateLimitedServlet extends HttpServlet // Schedule a timer callback to pause writing. Because isReady() is not called, // a onWritePossible callback is no scheduled. - scheduler.schedule(this,pause,TimeUnit.MILLISECONDS); + scheduler.schedule(this,pauseNS,TimeUnit.NANOSECONDS); } } @@ -261,7 +262,7 @@ public class DataRateLimitedServlet extends HttpServlet { // If we are able to write if(out.isReady()) - { + { // Position our buffers limit to allow only buffersize bytes to be written int l=content.position()+buffersize; // respect the ultimate limit @@ -276,7 +277,7 @@ public class DataRateLimitedServlet extends HttpServlet async.complete(); return; } - + // write our limited buffer. This will be an asynchronous write // and will always return immediately without blocking. If a subsequent // call to out.isReady() returns false, then this onWritePossible method @@ -284,8 +285,8 @@ public class DataRateLimitedServlet extends HttpServlet out.write(content); // Schedule a timer callback to pause writing. Because isReady() is not called, - // a onWritePossible callback is no scheduled. - scheduler.schedule(this,pause,TimeUnit.MILLISECONDS); + // a onWritePossible callback is not scheduled. + scheduler.schedule(this,pauseNS,TimeUnit.NANOSECONDS); } } diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java index 08e49e053df..6a7e1de1ea7 100644 --- a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java +++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat; import java.io.File; import java.io.FileOutputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import org.eclipse.jetty.server.HttpConfiguration; @@ -85,15 +86,19 @@ public class DataRateLimitedServletTest public void testStream() throws Exception { File content = testdir.getFile("content.txt"); + String[] results=new String[10]; try(OutputStream out = new FileOutputStream(content);) { byte[] b= new byte[1024]; for (int i=1024;i-->0;) { - Arrays.fill(b,(byte)('0'+(i%10))); + int index=i%10; + Arrays.fill(b,(byte)('0'+(index))); out.write(b); out.write('\n'); + if (results[index]==null) + results[index]=new String(b,StandardCharsets.US_ASCII); } } @@ -105,5 +110,7 @@ public class DataRateLimitedServletTest assertThat(response,containsString("200 OK")); assertThat(duration,greaterThan(PAUSE*1024L*1024/BUFFER)); + for (int i=0;i<10;i++) + assertThat(response,containsString(results[i])); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java index 7f0b785a8e3..d192cb513fa 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java @@ -312,8 +312,7 @@ public class BufferUtil { to.put(from); put = remaining; - from.position(0); - from.limit(0); + from.position(from.limit()); } else if (from.hasArray()) {