454157 - HttpInput.consumeAll spins if input is in async mode.

Added additional check to test whether consumeAll() could actually
consume the content, and if not, abort the channel.
This commit is contained in:
Simone Bordet 2014-12-05 14:12:01 +01:00
parent cce5c14e7c
commit d3763e0d1c
3 changed files with 85 additions and 7 deletions

View File

@ -366,7 +366,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {}", this); LOG.debug("unconsumed input {}", this);
// Complete reading the request // Complete reading the request
_channel.getRequest().getHttpInput().consumeAll(); if (!_channel.getRequest().getHttpInput().consumeAll())
_channel.abort();
} }
} }

View File

@ -277,10 +277,14 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
_channelState.onReadPossible(); _channelState.onReadPossible();
} }
public void consumeAll() public boolean consumeAll()
{ {
synchronized (lock()) synchronized (lock())
{ {
// Don't bother reading if we already know there was an error.
if (_onError != null)
return false;
try try
{ {
while (!isFinished()) while (!isFinished())
@ -291,10 +295,12 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
else else
consume(item, remaining(item)); consume(item, remaining(item));
} }
return true;
} }
catch (IOException e) catch (IOException e)
{ {
LOG.debug(e); LOG.debug(e);
return false;
} }
} }
} }

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.servlet; package org.eclipse.jetty.servlet;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThat;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
@ -30,10 +27,10 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.ReadListener; import javax.servlet.ReadListener;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener; import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
@ -48,6 +45,13 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public class AsyncIOServletTest public class AsyncIOServletTest
{ {
private Server server; private Server server;
@ -147,6 +151,73 @@ public class AsyncIOServletTest
} }
} }
@Test
public void testAsyncReadIdleTimeout() throws Exception
{
final int status = 567;
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
final AsyncContext asyncContext = request.startAsync(request, response);
asyncContext.setTimeout(0);
final ServletInputStream inputStream = request.getInputStream();
inputStream.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (inputStream.isReady() && !inputStream.isFinished())
inputStream.read();
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable t)
{
response.setStatus(status);
// Do not put Connection: close header here, the test
// verifies that the server closes no matter what.
asyncContext.complete();
}
});
}
});
server.stop();
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);
server.start();
String data1 = "0123456789";
String data2 = "ABCDEF";
// Only send the first chunk of data and then let it idle timeout.
String request = "GET " + path + " HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"Content-Length: " + (data1.length() + data2.length()) + "\r\n" +
"\r\n" +
data1;
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();
SimpleHttpParser parser = new SimpleHttpParser();
SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8")));
assertEquals(String.valueOf(status), response.getCode());
// Make sure the connection was closed by the server.
assertEquals(-1, client.getInputStream().read());
}
}
@Test @Test
public void testOnErrorThrows() throws Exception public void testOnErrorThrows() throws Exception
{ {