Merge remote-tracking branch 'origin/jetty-9.2.x'

Conflicts:
	jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java
	jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
This commit is contained in:
Greg Wilkins 2014-12-05 08:11:54 +01:00
commit 0f4a4cdac6
4 changed files with 107 additions and 5 deletions

View File

@ -461,7 +461,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
}
else if (isCommitted())
{
_transport.abort(x);
abort(x);
if (!(x instanceof EofException))
LOG.warn("Could not send response error 500: "+x);
}

View File

@ -349,11 +349,27 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// Finish consuming the request
// If we are still expecting
if (_channel.isExpecting100Continue())
{
// close to seek EOF
_parser.close();
}
else if (_parser.inContentState() && _generator.isPersistent())
// Complete reading the request
_channel.getRequest().getHttpInput().consumeAll();
{
// If we are async, then we have problems to complete neatly
if (_channel.getRequest().getHttpInput().isAsync())
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed async input {}", this);
_channel.abort();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {}", this);
// Complete reading the request
_channel.getRequest().getHttpInput().consumeAll();
}
}
// Reset the channel, parsers and generator
_channel.recycle();

View File

@ -322,7 +322,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
public void consumeAll()
{
synchronized (lock())
{
{
try
{
while (!isFinished())

View File

@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
@ -65,6 +66,7 @@ public class AsyncServletIOTest
private static final Logger LOG = Log.getLogger(AsyncServletIOTest.class);
protected AsyncIOServlet _servlet0=new AsyncIOServlet();
protected AsyncIOServlet2 _servlet2=new AsyncIOServlet2();
protected AsyncIOServlet3 _servlet3=new AsyncIOServlet3();
protected int _port;
protected Server _server = new Server();
protected ServletHandler _servletHandler;
@ -89,9 +91,13 @@ public class AsyncServletIOTest
_servletHandler.addServletWithMapping(holder,"/path/*");
ServletHolder holder2=new ServletHolder(_servlet2);
holder.setAsyncSupported(true);
holder2.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder2,"/path2/*");
ServletHolder holder3=new ServletHolder(_servlet3);
holder3.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder3,"/path3/*");
_server.start();
_port=_connector.getLocalPort();
@ -209,6 +215,50 @@ public class AsyncServletIOTest
Assert.assertTrue(_servlet2.completed.await(5, TimeUnit.SECONDS));
}
@Test
public void testAsyncConsumeAll() throws Exception
{
StringBuilder request = new StringBuilder(512);
request.append("GET /ctx/path3/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: text/plain\r\n")
.append("Content-Length: 10\r\n")
.append("\r\n");
int port=_port;
try (Socket socket = new Socket("localhost",port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400);
// response line
String line = in.readLine();
LOG.debug("response-line: "+line);
Assert.assertThat(line,startsWith("HTTP/1.1 200 OK"));
// Skip headers
while (line!=null)
{
line = in.readLine();
LOG.debug("header-line: "+line);
if (line.length()==0)
break;
}
// Get body
line = in.readLine();
LOG.debug("body: "+line);
Assert.assertEquals("DONE",line);
// The connection should be aborted
line = in.readLine();
Assert.assertNull(line);
}
}
public synchronized List<String> process(String content,int... writes) throws Exception
{
return process(content.getBytes(StandardCharsets.ISO_8859_1),writes);
@ -507,4 +557,40 @@ public class AsyncServletIOTest
}
}
}
@SuppressWarnings("serial")
public class AsyncIOServlet3 extends HttpServlet
{
public CountDownLatch completed = new CountDownLatch(1);
@Override
public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException
{
AsyncContext async = request.startAsync();
request.getInputStream().setReadListener(new ReadListener()
{
@Override
public void onError(Throwable t)
{
}
@Override
public void onDataAvailable() throws IOException
{
}
@Override
public void onAllDataRead() throws IOException
{
}
});
response.setStatus(200);
response.getOutputStream().print("DONE");
async.complete();
}
}
}