Issue #730 - "Slow" client causes IllegalStateException.

Fixed by allowing state ASYNC to call onWritePossible().
This commit is contained in:
Simone Bordet 2016-07-15 18:11:28 +02:00
parent a38e819907
commit 0433a8ca73
2 changed files with 112 additions and 13 deletions

View File

@ -921,14 +921,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
switch(_state.get())
{
case CLOSED:
// Even though a write is not possible, because a close has
// occurred, we need to call onWritePossible to tell async
// producer that the last write completed.
// So fall through
case ASYNC:
case READY:
case PENDING:
case UNREADY:
case READY:
// Even though a write is not possible, because a close has
// occurred, we need to call onWritePossible to tell async
// producer that the last write completed, so fall through.
case CLOSED:
try
{
_writeListener.onWritePossible();

View File

@ -18,19 +18,14 @@
package org.eclipse.jetty.servlet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -47,6 +42,8 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
@ -59,11 +56,20 @@ import org.eclipse.jetty.toolchain.test.http.SimpleHttpParser;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@RunWith (AdvancedRunner.class)
public class AsyncIOServletTest
{
@ -922,4 +928,97 @@ public class AsyncIOServletTest
Assert.assertFalse(oda.get());
}
@Test
public void testWriteFromOnDataAvailable() throws Exception
{
Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
CountDownLatch writeLatch = new CountDownLatch(1);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = request.startAsync();
request.getInputStream().setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
ServletInputStream input = request.getInputStream();
ServletOutputStream output = response.getOutputStream();
while (input.isReady())
{
byte[] buffer = new byte[512];
int read = input.read(buffer);
if (read < 0)
{
asyncContext.complete();
break;
}
if (output.isReady())
output.write(buffer, 0, read);
else
Assert.fail();
}
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable t)
{
errors.offer(t);
}
});
response.getOutputStream().setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
writeLatch.countDown();
}
@Override
public void onError(Throwable t)
{
errors.offer(t);
}
});
}
});
String content = "0123456789ABCDEF";
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
String request = "POST " + path + " HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"Transfer-Encoding: chunked\r\n" +
"\r\n" +
"10\r\n" +
content + "\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
assertTrue(writeLatch.await(5, TimeUnit.SECONDS));
request = "" +
"0\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
HttpTester.Input input = HttpTester.from(client.getInputStream());
HttpTester.Response response = HttpTester.parseResponse(input);
assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200));
assertThat(response.getContent(), Matchers.equalTo(content));
assertThat(errors, Matchers.hasSize(0));
}
}
}