443662 Consume buffer in write(ByteBuffer)

This commit is contained in:
Greg Wilkins 2014-09-10 17:40:50 +10:00
parent 09e22c042c
commit 40d84ff1e3
6 changed files with 29 additions and 13 deletions

View File

@ -41,6 +41,7 @@ public class ChannelEndPoint extends AbstractEndPoint
private final ByteChannel _channel;
private final Socket _socket;
private final GatheringByteChannel _gathering;
private volatile boolean _ishut;
private volatile boolean _oshut;
@ -51,6 +52,7 @@ public class ChannelEndPoint extends AbstractEndPoint
(InetSocketAddress)channel.socket().getRemoteSocketAddress());
_channel = channel;
_socket=channel.socket();
_gathering=_channel instanceof GatheringByteChannel?((GatheringByteChannel)_channel):null;
}
@Override
@ -168,8 +170,8 @@ public class ChannelEndPoint extends AbstractEndPoint
{
if (buffers.length==1)
flushed=_channel.write(buffers[0]);
else if (buffers.length>1 && _channel instanceof GatheringByteChannel)
flushed= (int)((GatheringByteChannel)_channel).write(buffers,0,buffers.length);
else if (_gathering!=null && buffers.length>1)
flushed= (int)_gathering.write(buffers,0,buffers.length);
else
{
for (ByteBuffer b : buffers)

View File

@ -145,6 +145,7 @@ public class SelectChannelEndPointTest
progress = false;
// Fill the input buffer with everything available
BufferUtil.compact(_in);
if (BufferUtil.isFull(_in))
throw new IllegalStateException("FULL " + BufferUtil.toDetailString(_in));
int filled = _endp.fill(_in);

View File

@ -903,7 +903,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer=buffer;
_len=buffer.remaining();
// Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
_slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.duplicate();
if (_buffer.isDirect()||_len<getBufferSize())
_slice=null;
else
{
_slice=_buffer.duplicate();
_buffer.position(_buffer.limit());
}
_complete=complete;
}

View File

@ -37,6 +37,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.util.BufferUtil;
/**
* A servlet that uses the Servlet 3.1 asynchronous IO API to server
@ -63,7 +64,7 @@ public class DataRateLimitedServlet extends HttpServlet
{
private static final long serialVersionUID = -4771757707068097025L;
private int buffersize=8192;
private int pause=100;
private long pauseNS=TimeUnit.MILLISECONDS.toNanos(100);
ScheduledThreadPoolExecutor scheduler;
private final ConcurrentHashMap<String, ByteBuffer> cache=new ConcurrentHashMap<>();
@ -76,7 +77,7 @@ public class DataRateLimitedServlet extends HttpServlet
buffersize=Integer.parseInt(tmp);
tmp = getInitParameter("pause");
if (tmp!=null)
pause=Integer.parseInt(tmp);
pauseNS=TimeUnit.MILLISECONDS.toNanos(Integer.parseInt(tmp));
tmp = getInitParameter("pool");
int pool=tmp==null?Runtime.getRuntime().availableProcessors():Integer.parseInt(tmp);
@ -205,7 +206,7 @@ public class DataRateLimitedServlet extends HttpServlet
// Schedule a timer callback to pause writing. Because isReady() is not called,
// a onWritePossible callback is no scheduled.
scheduler.schedule(this,pause,TimeUnit.MILLISECONDS);
scheduler.schedule(this,pauseNS,TimeUnit.NANOSECONDS);
}
}
@ -261,7 +262,7 @@ public class DataRateLimitedServlet extends HttpServlet
{
// If we are able to write
if(out.isReady())
{
{
// Position our buffers limit to allow only buffersize bytes to be written
int l=content.position()+buffersize;
// respect the ultimate limit
@ -276,7 +277,7 @@ public class DataRateLimitedServlet extends HttpServlet
async.complete();
return;
}
// write our limited buffer. This will be an asynchronous write
// and will always return immediately without blocking. If a subsequent
// call to out.isReady() returns false, then this onWritePossible method
@ -284,8 +285,8 @@ public class DataRateLimitedServlet extends HttpServlet
out.write(content);
// Schedule a timer callback to pause writing. Because isReady() is not called,
// a onWritePossible callback is no scheduled.
scheduler.schedule(this,pause,TimeUnit.MILLISECONDS);
// a onWritePossible callback is not scheduled.
scheduler.schedule(this,pauseNS,TimeUnit.NANOSECONDS);
}
}

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.eclipse.jetty.server.HttpConfiguration;
@ -85,15 +86,19 @@ public class DataRateLimitedServletTest
public void testStream() throws Exception
{
File content = testdir.getFile("content.txt");
String[] results=new String[10];
try(OutputStream out = new FileOutputStream(content);)
{
byte[] b= new byte[1024];
for (int i=1024;i-->0;)
{
Arrays.fill(b,(byte)('0'+(i%10)));
int index=i%10;
Arrays.fill(b,(byte)('0'+(index)));
out.write(b);
out.write('\n');
if (results[index]==null)
results[index]=new String(b,StandardCharsets.US_ASCII);
}
}
@ -105,5 +110,7 @@ public class DataRateLimitedServletTest
assertThat(response,containsString("200 OK"));
assertThat(duration,greaterThan(PAUSE*1024L*1024/BUFFER));
for (int i=0;i<10;i++)
assertThat(response,containsString(results[i]));
}
}

View File

@ -312,8 +312,7 @@ public class BufferUtil
{
to.put(from);
put = remaining;
from.position(0);
from.limit(0);
from.position(from.limit());
}
else if (from.hasArray())
{