398467 Servlet 3.1 Non Blocking IO

Added ByteBuffer write and improved test harnesses
This commit is contained in:
Greg Wilkins 2013-07-18 12:53:07 +10:00
parent a1789c8f7c
commit 8645849272
10 changed files with 439 additions and 94 deletions

View File

@ -23,6 +23,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.resource.Resource;
/* ------------------------------------------------------------ */
@ -98,7 +99,16 @@ public interface HttpContent
@Override
public ByteBuffer getDirectBuffer()
{
return null;
if (_resource.length()<=0 || _maxBuffer<_resource.length())
return null;
try
{
return BufferUtil.toBuffer(_resource,true);
}
catch(IOException e)
{
throw new RuntimeException(e);
}
}
/* ------------------------------------------------------------ */
@ -114,24 +124,9 @@ public interface HttpContent
{
if (_resource.length()<=0 || _maxBuffer<_resource.length())
return null;
int length=(int)_resource.length();
byte[] array = new byte[length];
int offset=0;
try (InputStream in=_resource.getInputStream())
try
{
do
{
int filled=in.read(array,offset,length);
if (filled<0)
break;
length-=filled;
offset+=filled;
}
while(length>0);
ByteBuffer buffer = ByteBuffer.wrap(array);
return buffer;
return BufferUtil.toBuffer(_resource,false);
}
catch(IOException e)
{

View File

@ -456,30 +456,34 @@ public class HttpChannelState
}
}
if (aListeners!=null)
if (event!=null)
{
for (AsyncListener listener : aListeners)
if (aListeners!=null)
{
try
if (event.getThrowable()!=null)
{
if (event!=null && event.getThrowable()!=null)
{
event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
listener.onError(event);
}
else
listener.onComplete(event);
event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
}
catch(Exception e)
for (AsyncListener listener : aListeners)
{
LOG.warn(e);
try
{
if (event.getThrowable()!=null)
listener.onError(event);
else
listener.onComplete(event);
}
catch(Exception e)
{
LOG.warn(e);
}
}
}
}
if (event!=null)
event.completed();
}
}
protected void recycle()

View File

@ -314,7 +314,59 @@ write completed - - - ASYNC READY->owp
}
}
public void write(ByteBuffer buffer) throws IOException
{
_written+=buffer.remaining();
boolean complete=_channel.getResponse().isAllContentWritten(_written);
// Async or Blocking ?
while(true)
{
switch(_state.get())
{
case OPEN:
// process blocking below
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(State.READY, State.PENDING))
continue;
// Do the asynchronous writing from the callback
new AsyncWrite(buffer,complete).process();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case CLOSED:
throw new EofException("Closed");
}
break;
}
// handle blocking write
int len=BufferUtil.length(buffer);
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
_channel.write(_aggregate, complete && len==0);
// write any remaining content in the buffer directly
if (len>0)
_channel.write(buffer, complete);
else if (complete)
_channel.write(BufferUtil.EMPTY_BUFFER,complete);
if (complete)
closed();
}
@Override
public void write(int b) throws IOException
@ -623,17 +675,22 @@ write completed - - - ASYNC READY->owp
private class AsyncWrite extends AsyncFlush
{
private final byte[] _b;
private final int _off;
private final int _len;
private final ByteBuffer _buffer;
private final boolean _complete;
private final int _len;
public AsyncWrite(byte[] b, int off, int len, boolean complete)
{
_b=b;
_off=off;
_len=len;
_buffer=ByteBuffer.wrap(b, off, len);
_complete=complete;
_len=len;
}
public AsyncWrite(ByteBuffer buffer, boolean complete)
{
_buffer=buffer;
_complete=complete;
_len=buffer.remaining();
}
@Override
@ -649,14 +706,13 @@ write completed - - - ASYNC READY->owp
// TODO write comments
if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_aggregate.capacity()/4)
{
BufferUtil.append(_aggregate, _b, _off, _len);
BufferUtil.put(_buffer,_aggregate);
}
// TODO write comments
else if (_len>0 && !_flushed)
{
ByteBuffer buffer=ByteBuffer.wrap(_b, _off, _len);
_flushed=true;
_channel.write(buffer, _complete, this);
_channel.write(_buffer, _complete, this);
return false;
}
else if (_len==0 && !_flushed)

View File

@ -284,26 +284,9 @@ public class ResourceCache
{
try
{
int len=(int)resource.length();
if (len<0)
{
LOG.warn("invalid resource: "+String.valueOf(resource)+" "+len);
return null;
}
ByteBuffer buffer = BufferUtil.allocate(len);
int pos=BufferUtil.flipToFill(buffer);
if (resource.getFile()!=null)
BufferUtil.readFrom(resource.getFile(),buffer);
else
{
InputStream is = resource.getInputStream();
BufferUtil.readFrom(is,len,buffer);
is.close();
}
BufferUtil.flipToFlush(buffer,pos);
return buffer;
return BufferUtil.toBuffer(resource,true);
}
catch(IOException e)
catch(IOException|IllegalArgumentException e)
{
LOG.warn(e);
return null;
@ -316,30 +299,11 @@ public class ResourceCache
try
{
if (_useFileMappedBuffer && resource.getFile()!=null)
return BufferUtil.toBuffer(resource.getFile());
return BufferUtil.toMappedBuffer(resource.getFile());
int len=(int)resource.length();
if (len<0)
{
LOG.warn("invalid resource: "+String.valueOf(resource)+" "+len);
return null;
}
ByteBuffer buffer = BufferUtil.allocateDirect(len);
int pos=BufferUtil.flipToFill(buffer);
if (resource.getFile()!=null)
BufferUtil.readFrom(resource.getFile(),buffer);
else
{
InputStream is = resource.getInputStream();
BufferUtil.readFrom(is,len,buffer);
is.close();
}
BufferUtil.flipToFlush(buffer,pos);
return buffer;
return BufferUtil.toBuffer(resource,true);
}
catch(IOException e)
catch(IOException|IllegalArgumentException e)
{
LOG.warn(e);
return null;

View File

@ -531,7 +531,7 @@ public class ResourceHandler extends HandlerWrapper
resource.length()>_minMemoryMappedContentLength &&
resource instanceof FileResource)
{
ByteBuffer buffer = BufferUtil.toBuffer(resource.getFile());
ByteBuffer buffer = BufferUtil.toMappedBuffer(resource.getFile());
((HttpOutput)out).sendContent(buffer,callback);
}
else // Do a blocking write of a channel (if available) or input stream
@ -550,7 +550,7 @@ public class ResourceHandler extends HandlerWrapper
resource.length()>_minMemoryMappedContentLength &&
resource instanceof FileResource)
{
ByteBuffer buffer = BufferUtil.toBuffer(resource.getFile());
ByteBuffer buffer = BufferUtil.toMappedBuffer(resource.getFile());
((HttpOutput)out).sendContent(buffer);
}
else // Do a blocking write of a channel (if available) or input stream

View File

@ -27,11 +27,14 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.resource.Resource;
import org.hamcrest.Matchers;
import org.junit.After;
@ -119,9 +122,9 @@ public class HttpOutputTest
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("Transfer-Encoding: chunked"));
assertThat(response,containsString("400\tThis is a big file"));
assertThat(response,containsString("\r\n0\r\n"));
}
@Test
public void testSendChannelSimple() throws Exception
@ -141,8 +144,32 @@ public class HttpOutputTest
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testSendBigDirect() throws Exception
{
Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,true);
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("Content-Length"));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testSendBigInDirect() throws Exception
{
Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("Content-Length"));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testSendChannelBigChunked() throws Exception
{
@ -191,8 +218,175 @@ public class HttpOutputTest
assertThat(response,containsString("\r\n0\r\n"));
}
@Test
public void testWriteSmall() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._bytes=new byte[8];
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testWriteMed() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._bytes=new byte[4000];
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testWriteLarge() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._bytes=new byte[8192];
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testWriteBufferSmall() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._buffer=BufferUtil.allocate(8);
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testWriteBufferMed() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._buffer=BufferUtil.allocate(4000);
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testWriteBufferLarge() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._buffer=BufferUtil.allocate(8192);
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testAsyncWriteSmall() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._bytes=new byte[8];
_handler._async=true;
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testAsyncWriteMed() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._bytes=new byte[4000];
_handler._async=true;
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testAsyncWriteLarge() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._bytes=new byte[8192];
_handler._async=true;
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testAsyncWriteBufferSmall() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._buffer=BufferUtil.allocate(8);
_handler._async=true;
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testAsyncWriteBufferMed() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._buffer=BufferUtil.allocate(4000);
_handler._async=true;
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
@Test
public void testAsyncWriteBufferLarge() throws Exception
{
final Resource big = Resource.newClassPathResource("simple/big.txt");
_handler._content=BufferUtil.toBuffer(big,false);
_handler._buffer=BufferUtil.allocate(8192);
_handler._async=true;
String response=_connector.getResponses("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,Matchers.not(containsString("Content-Length")));
assertThat(response,containsString("400\tThis is a big file"));
}
static class ContentHandler extends AbstractHandler
{
boolean _async;
ByteBuffer _buffer;
byte[] _bytes;
ByteBuffer _content;
InputStream _contentInputStream;
ReadableByteChannel _contentChannel;
@ -202,7 +396,7 @@ public class HttpOutputTest
baseRequest.setHandled(true);
response.setContentType("text/plain");
HttpOutput out = (HttpOutput) response.getOutputStream();
final HttpOutput out = (HttpOutput) response.getOutputStream();
if (_contentInputStream!=null)
{
@ -218,6 +412,113 @@ public class HttpOutputTest
return;
}
if (_bytes!=null)
{
if (_async)
{
final AsyncContext async = request.startAsync();
out.setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
while (out.isReady())
{
int len=_content.remaining();
if (len>_bytes.length)
len=_bytes.length;
if (len==0)
{
async.complete();
break;
}
_content.get(_bytes,0,len);
out.write(_bytes,0,len);
}
}
@Override
public void onError(Throwable t)
{
t.printStackTrace();
async.complete();
}
});
return;
}
while(BufferUtil.hasContent(_content))
{
int len=_content.remaining();
if (len>_bytes.length)
len=_bytes.length;
_content.get(_bytes,0,len);
out.write(_bytes,0,len);
}
return;
}
if (_buffer!=null)
{
if (_async)
{
final AsyncContext async = request.startAsync();
out.setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
while (out.isReady())
{
if(BufferUtil.isEmpty(_content))
{
async.complete();
break;
}
BufferUtil.clearToFill(_buffer);
BufferUtil.put(_content,_buffer);
BufferUtil.flipToFlush(_buffer,0);
out.write(_buffer);
}
}
@Override
public void onError(Throwable t)
{
t.printStackTrace();
async.complete();
}
});
return;
}
while(BufferUtil.hasContent(_content))
{
BufferUtil.clearToFill(_buffer);
BufferUtil.put(_content,_buffer);
BufferUtil.flipToFlush(_buffer,0);
out.write(_buffer);
}
return;
}
if (_content!=null)
{
out.sendContent(_content);
_content=null;
return;
}
}
}

View File

@ -257,8 +257,6 @@ public class AsyncServletIOTest
return list;
}
static AtomicInteger _owp = new AtomicInteger();
static AtomicInteger _oda = new AtomicInteger();
static AtomicInteger _read = new AtomicInteger();
@ -376,7 +374,4 @@ public class AsyncServletIOTest
});
}
}
}

View File

@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
@ -41,6 +42,7 @@ import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.hamcrest.Matchers;
@ -48,10 +50,12 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.junit.Assert.assertEquals;
@RunWith(AdvancedRunner.class)
public class AsyncServletTest
{
protected AsyncServlet _servlet=new AsyncServlet();

View File

@ -1,3 +1,4 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.server.LEVEL=DEBUG
#org.eclipse.jetty.servlet.LEVEL=DEBUG

View File

@ -30,6 +30,8 @@ import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.charset.Charset;
import org.eclipse.jetty.util.resource.Resource;
/* ------------------------------------------------------------------------------- */
/**
@ -787,7 +789,7 @@ public class BufferUtil
return ByteBuffer.wrap(array, offset, length);
}
public static ByteBuffer toBuffer(File file) throws IOException
public static ByteBuffer toMappedBuffer(File file) throws IOException
{
try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
{
@ -795,6 +797,29 @@ public class BufferUtil
}
}
public static ByteBuffer toBuffer(Resource resource,boolean direct) throws IOException
{
int len=(int)resource.length();
if (len<0)
throw new IllegalArgumentException("invalid resource: "+String.valueOf(resource)+" len="+len);
ByteBuffer buffer = direct?BufferUtil.allocateDirect(len):BufferUtil.allocate(len);
int pos=BufferUtil.flipToFill(buffer);
if (resource.getFile()!=null)
BufferUtil.readFrom(resource.getFile(),buffer);
else
{
try (InputStream is = resource.getInputStream();)
{
BufferUtil.readFrom(is,len,buffer);
}
}
BufferUtil.flipToFlush(buffer,pos);
return buffer;
}
public static String toSummaryString(ByteBuffer buffer)
{
if (buffer == null)