398467 Servlet 3.1 Non Blocking IO

Cleaned up exceptions and HttpOutput.write
This commit is contained in:
Greg Wilkins 2013-05-16 18:56:42 +10:00
parent 67a1b2a18f
commit b22d280e2a
12 changed files with 257 additions and 220 deletions

View File

@ -636,16 +636,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
{
boolean committing=sendResponse(info,content,complete,_writeblock);
try
{
_writeblock.block();
}
catch (InterruptedException | TimeoutException e)
{
throw new IOException(e);
}
_writeblock.block();
return committing;
}
@ -664,14 +655,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
protected void write(ByteBuffer content, boolean complete) throws IOException
{
sendResponse(null,content,complete,_writeblock);
try
{
_writeblock.block();
}
catch (InterruptedException | TimeoutException e)
{
throw new IOException(e);
}
_writeblock.block();
}
/**

View File

@ -322,10 +322,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
_writeBlocker.block();
}
catch (InterruptedException x)
{
throw (IOException)new InterruptedIOException().initCause(x);
}
catch (ClosedChannelException e)
{
throw new EofException(e);
@ -334,10 +330,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
throw e;
}
catch (TimeoutException e)
{
throw new IOException(e);
}
}
@Override
@ -435,68 +427,57 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
default implementation of a blocking queue with an implementation
that uses the calling thread to block on a readable callback and
then to do the parsing before before attempting the read.
*/
try
*/
while (true)
{
while (true)
// Can the parser progress (even with an empty buffer)
boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
// If there is more content to parse, loop so we can queue all content from this buffer now without the
// need to call blockForContent again
while (event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState())
_parser.parseNext(_requestBuffer);
// If we have an event, return
if (event)
return;
// Do we have content ready to parse?
if (BufferUtil.isEmpty(_requestBuffer))
{
// Can the parser progress (even with an empty buffer)
boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
// If there is more content to parse, loop so we can queue all content from this buffer now without the
// need to call blockForContent again
while (event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState())
_parser.parseNext(_requestBuffer);
// If we have an event, return
if (event)
return;
// Do we have content ready to parse?
if (BufferUtil.isEmpty(_requestBuffer))
// If no more input
if (getEndPoint().isInputShutdown())
{
// If no more input
if (getEndPoint().isInputShutdown())
{
_parser.shutdownInput();
shutdown();
return;
}
_parser.shutdownInput();
shutdown();
return;
}
// Wait until we can read
block(_readBlocker);
LOG.debug("{} block readable on {}",this,_readBlocker);
_readBlocker.block();
// Wait until we can read
block(_readBlocker);
LOG.debug("{} block readable on {}",this,_readBlocker);
_readBlocker.block();
// We will need a buffer to read into
if (_requestBuffer==null)
{
long content_length=_channel.getRequest().getContentLength();
int size=getInputBufferSize();
if (size<content_length)
size=size*4; // TODO tune this
_requestBuffer=_bufferPool.acquire(size,REQUEST_BUFFER_DIRECT);
}
// We will need a buffer to read into
if (_requestBuffer==null)
{
long content_length=_channel.getRequest().getContentLength();
int size=getInputBufferSize();
if (size<content_length)
size=size*4; // TODO tune this
_requestBuffer=_bufferPool.acquire(size,REQUEST_BUFFER_DIRECT);
}
// read some data
int filled=getEndPoint().fill(_requestBuffer);
LOG.debug("{} block filled {}",this,filled);
if (filled<0)
{
_parser.shutdownInput();
return;
}
// read some data
int filled=getEndPoint().fill(_requestBuffer);
LOG.debug("{} block filled {}",this,filled);
if (filled<0)
{
_parser.shutdownInput();
return;
}
}
}
catch (TimeoutException e)
{
throw new EofException(e);
}
catch (final InterruptedException x)
{
throw new InterruptedIOException(getEndPoint().toString()){{initCause(x);}};
}
}
@Override

View File

@ -144,7 +144,13 @@ public class HttpOutput extends ServletOutputStream
public boolean closeIfAllContentWritten() throws IOException
{
return _channel.getResponse().closeIfAllContentWritten(_written);
Response response=_channel.getResponse();
if (response.isAllContentWritten(_written))
{
response.closeOutput();
return true;
}
return false;
}
@Override
@ -153,53 +159,55 @@ public class HttpOutput extends ServletOutputStream
if (isClosed())
throw new EOFException("Closed");
// Do we have an aggregate buffer already ?
// Do we have an aggregate buffer ?
if (_aggregate == null)
{
// What size should the aggregate be ?
// NO - should we have an aggregate buffer? yes if this write will easily fit in it
int size = getBufferSize();
// If this write would fill more than half the aggregate, just write it directly
if (len > size / 2)
if (len<=size/2)
{
_channel.write(ByteBuffer.wrap(b, off, len), false);
_aggregate = _channel.getByteBufferPool().acquire(size, false);
BufferUtil.append(_aggregate, b, off, len);
_written += len;
closeIfAllContentWritten();
return;
}
}
else
{
// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
_written += filled;
// Allocate an aggregate buffer.
// Never direct as it is slow to do little writes to a direct buffer.
_aggregate = _channel.getByteBufferPool().acquire(size, false);
// if closed or there is no content left over and we are not full, then we are done
if (closeIfAllContentWritten() || filled==len && !BufferUtil.isFull(_aggregate))
return;
off+=filled;
len-=filled;
}
// Do we have space to aggregate ?
int space = BufferUtil.space(_aggregate);
if (len > space)
// flush the aggregate
if (BufferUtil.hasContent(_aggregate))
{
// No space so write the aggregate out if it is not empty
if (BufferUtil.hasContent(_aggregate))
_channel.write(_aggregate, false);
// should we fill aggregate again from the buffer?
if (len<_aggregate.capacity()/2)
{
_channel.write(_aggregate, false);
space = BufferUtil.space(_aggregate);
BufferUtil.append(_aggregate, b, off, len);
_written += len;
closeIfAllContentWritten();
return;
}
}
// Do we have space to aggregate now ?
if (len > space)
{
// No space so write the content directly
_channel.write(ByteBuffer.wrap(b, off, len), false);
_written += len;
return;
}
// Aggregate the content
BufferUtil.append(_aggregate, b, off, len);
// write any remaining content in the buffer directly
_written += len;
// Check if all written or full
if (!closeIfAllContentWritten() && BufferUtil.isFull(_aggregate))
_channel.write(_aggregate, false);
boolean complete=_channel.getResponse().isAllContentWritten(_written);
_channel.write(ByteBuffer.wrap(b, off, len), complete);
if (complete)
_channel.getResponse().closeOutput();
}
@ -219,7 +227,11 @@ public class HttpOutput extends ServletOutputStream
// Check if all written or full
if (!closeIfAllContentWritten() && BufferUtil.isFull(_aggregate))
_channel.write(_aggregate, false);
{
BlockingCallback callback = _channel.getWriteBlockingCallback();
_channel.write(_aggregate, false, callback);
callback.block();
}
}
@Override
@ -273,14 +285,7 @@ public class HttpOutput extends ServletOutputStream
else
callback.failed(new IllegalArgumentException("unknown content type "+content.getClass()));
try
{
callback.block();
}
catch (InterruptedException | TimeoutException e)
{
throw new IOException(e);
}
callback.block();
}
/* ------------------------------------------------------------ */
@ -290,16 +295,9 @@ public class HttpOutput extends ServletOutputStream
*/
public void sendContent(ByteBuffer content) throws IOException
{
try
{
final BlockingCallback callback =_channel.getWriteBlockingCallback();
_channel.write(content,true,callback);
callback.block();
}
catch (InterruptedException | TimeoutException e)
{
throw new IOException(e);
}
final BlockingCallback callback =_channel.getWriteBlockingCallback();
_channel.write(content,true,callback);
callback.block();
}
/* ------------------------------------------------------------ */
@ -309,16 +307,9 @@ public class HttpOutput extends ServletOutputStream
*/
public void sendContent(InputStream in) throws IOException
{
try
{
final BlockingCallback callback =_channel.getWriteBlockingCallback();
new InputStreamWritingCB(in,callback).iterate();
callback.block();
}
catch (InterruptedException | TimeoutException e)
{
throw new IOException(e);
}
final BlockingCallback callback =_channel.getWriteBlockingCallback();
new InputStreamWritingCB(in,callback).iterate();
callback.block();
}
/* ------------------------------------------------------------ */
@ -328,16 +319,9 @@ public class HttpOutput extends ServletOutputStream
*/
public void sendContent(ReadableByteChannel in) throws IOException
{
try
{
final BlockingCallback callback =_channel.getWriteBlockingCallback();
new ReadableByteChannelWritingCB(in,callback).iterate();
callback.block();
}
catch (InterruptedException | TimeoutException e)
{
throw new IOException(e);
}
final BlockingCallback callback =_channel.getWriteBlockingCallback();
new ReadableByteChannelWritingCB(in,callback).iterate();
callback.block();
}
@ -348,16 +332,9 @@ public class HttpOutput extends ServletOutputStream
*/
public void sendContent(HttpContent content) throws IOException
{
try
{
final BlockingCallback callback =_channel.getWriteBlockingCallback();
sendContent(content,callback);
callback.block();
}
catch (InterruptedException | TimeoutException e)
{
throw new IOException(e);
}
final BlockingCallback callback =_channel.getWriteBlockingCallback();
sendContent(content,callback);
callback.block();
}

View File

@ -38,6 +38,13 @@ public class Iso88591HttpWriter extends HttpWriter
if (length==0)
out.closeIfAllContentWritten();
if (length==1)
{
int c=s[offset];
out.write(c<256?c:'?');
return;
}
while (length > 0)
{
_bytes.reset();

View File

@ -757,34 +757,37 @@ public class Response implements HttpServletResponse
if (_contentLength > 0)
{
try
if (isAllContentWritten(written))
{
closeIfAllContentWritten(written);
}
catch(IOException e)
{
throw new RuntimeIOException(e);
try
{
closeOutput();
}
catch(IOException e)
{
throw new RuntimeIOException(e);
}
}
}
}
public boolean closeIfAllContentWritten(long written) throws IOException
public boolean isAllContentWritten(long written)
{
if (_contentLength >= 0 && written >= _contentLength)
return (_contentLength >= 0 && written >= _contentLength);
}
public void closeOutput() throws IOException
{
switch (_outputType)
{
switch (_outputType)
{
case WRITER:
_writer.close();
break;
case STREAM:
getOutputStream().close();
break;
default:
}
return true;
case WRITER:
_writer.close();
break;
case STREAM:
getOutputStream().close();
break;
default:
}
return false;
}
public long getLongContentLength()

View File

@ -305,7 +305,7 @@ public class HttpManyWaysToCommitTest extends AbstractHttpTest
}
@Test
public void testHandledBufferOverflow() throws Exception
public void testHandledOverflow() throws Exception
{
server.setHandler(new OverflowHandler(false));
server.start();
@ -318,6 +318,34 @@ public class HttpManyWaysToCommitTest extends AbstractHttpTest
assertHeader(response, "transfer-encoding", "chunked");
}
@Test
public void testHandledOverflow2() throws Exception
{
server.setHandler(new Overflow2Handler(false));
server.start();
SimpleHttpResponse response = executeRequest();
assertThat("response code is 200", response.getCode(), is("200"));
assertResponseBody(response, "foobar");
if (!"HTTP/1.0".equals(httpVersion))
assertHeader(response, "transfer-encoding", "chunked");
}
@Test
public void testHandledOverflow3() throws Exception
{
server.setHandler(new Overflow3Handler(false));
server.start();
SimpleHttpResponse response = executeRequest();
assertThat("response code is 200", response.getCode(), is("200"));
assertResponseBody(response, "foobar");
if (!"HTTP/1.0".equals(httpVersion))
assertHeader(response, "transfer-encoding", "chunked");
}
@Test
public void testHandledBufferOverflowAndThrow() throws Exception
{
@ -350,6 +378,43 @@ public class HttpManyWaysToCommitTest extends AbstractHttpTest
}
}
private class Overflow2Handler extends ThrowExceptionOnDemandHandler
{
private Overflow2Handler(boolean throwException)
{
super(throwException);
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setBufferSize(3);
response.getWriter().write("f");
response.getWriter().write("oobar");
super.handle(target, baseRequest, request, response);
}
}
private class Overflow3Handler extends ThrowExceptionOnDemandHandler
{
private Overflow3Handler(boolean throwException)
{
super(throwException);
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setBufferSize(5);
response.getWriter().write("fo");
response.getWriter().write("ob");
response.getWriter().write("ar");
super.handle(target, baseRequest, request, response);
}
}
@Test
public void testSetContentLengthAndWriteExactlyThatAmountOfBytes() throws Exception
{

View File

@ -89,14 +89,7 @@ public class ResponseTest
{
BlockingCallback cb = new BlockingCallback();
send(info,content,lastContent,cb);
try
{
cb.block();
}
catch (InterruptedException | TimeoutException e)
{
throw new IOException(e);
}
cb.block();
}
@Override

View File

@ -193,17 +193,10 @@ public class HttpTransportOverSPDY implements HttpTransport
}
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws EofException
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
{
send(info, content, lastContent, streamBlocker);
try
{
streamBlocker.block();
}
catch (InterruptedException | TimeoutException | IOException e)
{
LOG.debug(e);
}
streamBlocker.block();
}
@Override

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
@ -87,25 +88,25 @@ public class BlockingCallback implements Callback
* This is useful for code that wants to repeatable use a FutureCallback to convert
* an asynchronous API to a blocking API.
* @return
* @throws InterruptedException
* @throws IOException
* @throws TimeoutException
*/
public void block() throws InterruptedException, IOException, TimeoutException
public void block() throws IOException
{
_semaphone.acquire();
try
{
_semaphone.acquire();
if (_cause==COMPLETED)
return;
if (_cause instanceof IOException)
throw (IOException) _cause;
if (_cause instanceof CancellationException)
throw (CancellationException) _cause;
if (_cause instanceof TimeoutException)
throw (TimeoutException) _cause;
throw new IOException(_cause);
}
catch (final InterruptedException e)
{
throw new InterruptedIOException(){{initCause(e);}};
}
finally
{
_done.set(false);

View File

@ -24,6 +24,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.Buffer;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@ -326,7 +327,7 @@ public class BufferUtil
/* ------------------------------------------------------------ */
/**
*/
public static void append(ByteBuffer to, byte[] b,int off,int len)
public static void append(ByteBuffer to, byte[] b,int off,int len) throws BufferOverflowException
{
int pos= flipToFill(to);
try
@ -339,6 +340,25 @@ public class BufferUtil
}
}
/* ------------------------------------------------------------ */
/** Like append, but does not throw {@link BufferOverflowException}
*/
public static int fill(ByteBuffer to, byte[] b,int off,int len)
{
int pos= flipToFill(to);
try
{
int remaining=to.remaining();
int take=remaining<len?remaining:len;
to.put(b,off,take);
return take;
}
finally
{
flipToFlush(to,pos);
}
}
/* ------------------------------------------------------------ */
/**
*/

View File

@ -22,6 +22,7 @@ package org.eclipse.jetty.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -144,6 +145,29 @@ public class BufferUtilTest
assertEquals("1234567890",BufferUtil.toString(to));
}
@Test
public void testAppend() throws Exception
{
ByteBuffer to = BufferUtil.allocate(8);
ByteBuffer from=BufferUtil.toBuffer("12345");
BufferUtil.append(to,from.array(),0,3);
assertEquals("123",BufferUtil.toString(to));
BufferUtil.append(to,from.array(),3,2);
assertEquals("12345",BufferUtil.toString(to));
try
{
BufferUtil.append(to,from.array(),0,5);
Assert.fail();
}
catch(BufferOverflowException e)
{}
}
@Test
public void testPutDirect() throws Exception
{

View File

@ -57,18 +57,7 @@ public class HttpTransportOverMux implements HttpTransport
public void send(ResponseInfo info, ByteBuffer responseBodyContent, boolean lastContent) throws IOException
{
send(info,responseBodyContent,lastContent,streamBlocker);
try
{
streamBlocker.block();
}
catch (IOException e)
{
throw e;
}
catch (Exception e)
{
throw new EofException(e);
}
streamBlocker.block();
}
@Override