483427 - AsyncContext complete while pending async Reads/Writes

This commit is contained in:
Greg Wilkins 2015-12-02 16:07:59 +11:00
parent 5e40bf29aa
commit 4bbd060ca8
5 changed files with 214 additions and 18 deletions

View File

@ -138,18 +138,24 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
LOG.debug("doClose {}", this);
try
{
try
{
_channel.close();
}
catch (IOException e)
{
LOG.debug(e);
}
finally
{
super.doClose();
}
_channel.close();
}
catch (IOException e)
{
LOG.debug(e);
}
finally
{
super.doClose();
}
}
@Override
public void onClose()
{
try
{
super.onClose();
}
finally
{
@ -157,6 +163,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
_selector.onClose(this);
}
}
@Override
public int fill(ByteBuffer buffer) throws IOException

View File

@ -131,6 +131,8 @@ public abstract class FillInterest
public void onClose()
{
Callback callback = _interested.get();
if (LOG.isDebugEnabled())
LOG.debug("{} onClose {}",this,callback);
if (callback != null && _interested.compareAndSet(callback, null))
callback.failed(new ClosedChannelException());
}

View File

@ -297,7 +297,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*/
protected void endPointClosed(EndPoint endpoint)
{
endpoint.onClose();
}
/**

View File

@ -82,7 +82,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
setWriteListener() READY->owp ise ise ise ise ise
write() OPEN ise PENDING wpe wpe eof
flush() OPEN ise PENDING wpe wpe eof
close() CLOSED CLOSED CLOSED CLOSED wpe CLOSED
close() CLOSED CLOSED CLOSED CLOSED CLOSED CLOSED
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
write completed - - - ASYNC READY->owp -
*/
@ -196,10 +196,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return;
}
case UNREADY:
case PENDING:
{
if (_state.compareAndSet(state,OutputState.ERROR))
_writeListener.onError(_onError==null?new EofException("Async close"):_onError);
break;
if (!_state.compareAndSet(state,OutputState.CLOSED))
break;
IOException ex = new IOException("Closed while Pending/Unready");
LOG.warn(ex.toString());
LOG.debug(ex);
_channel.abort(ex);
}
default:
{
@ -321,6 +325,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return;
case PENDING:
return;
case UNREADY:
throw new WritePendingException();

View File

@ -18,9 +18,14 @@
package org.eclipse.jetty.servlet;
import static java.nio.charset.StandardCharsets.ISO_8859_1;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.IOException;
@ -57,6 +62,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -69,6 +75,7 @@ public class AsyncServletIOTest
protected AsyncIOServlet _servlet0=new AsyncIOServlet();
protected AsyncIOServlet2 _servlet2=new AsyncIOServlet2();
protected AsyncIOServlet3 _servlet3=new AsyncIOServlet3();
protected AsyncIOServlet4 _servlet4=new AsyncIOServlet4();
protected int _port;
protected Server _server = new Server();
protected ServletHandler _servletHandler;
@ -101,6 +108,10 @@ public class AsyncServletIOTest
holder3.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder3,"/path3/*");
ServletHolder holder4=new ServletHolder(_servlet4);
holder4.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder4,"/path4/*");
_server.start();
_port=_connector.getLocalPort();
@ -232,7 +243,7 @@ public class AsyncServletIOTest
int port=_port;
try (Socket socket = new Socket("localhost",port))
{
socket.setSoTimeout(1000000);
socket.setSoTimeout(10000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
@ -263,6 +274,8 @@ public class AsyncServletIOTest
}
}
public synchronized List<String> process(String content,int... writes) throws Exception
{
return process(content.getBytes(StandardCharsets.ISO_8859_1),writes);
@ -596,4 +609,173 @@ public class AsyncServletIOTest
async.complete();
}
}
@Test
public void testCompleteWhilePending() throws Exception
{
StringBuilder request = new StringBuilder(512);
request.append("POST /ctx/path4/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: text/plain\r\n")
.append("Content-Length: 20\r\n")
.append("\r\n")
.append("12345678\r\n");
int port=_port;
List<String> list = new ArrayList<>();
try (Socket socket = new Socket("localhost",port))
{
socket.setSoTimeout(10000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(ISO_8859_1));
out.flush();
Thread.sleep(100);
out.write("ABC".getBytes(ISO_8859_1));
out.flush();
Thread.sleep(100);
out.write("DEF".getBytes(ISO_8859_1));
out.flush();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// response line
String line = in.readLine();
LOG.debug("response-line: "+line);
Assert.assertThat(line,startsWith("HTTP/1.1 200 OK"));
boolean chunked=false;
// Skip headers
while (line!=null)
{
line = in.readLine();
LOG.debug("header-line: "+line);
chunked|="Transfer-Encoding: chunked".equals(line);
if (line.length()==0)
break;
}
assertTrue(chunked);
// Get body slowly
String last;
while (true)
{
last=line;
//Thread.sleep(1000);
line = in.readLine();
LOG.debug("body: "+line);
if (line==null)
break;
list.add(line);
}
LOG.debug("last: "+last);
// last non empty line should contain some X's
assertThat(last,containsString("X"));
// last non empty line should not contain end chunk
assertThat(last,not(containsString("0")));
}
assertTrue(_servlet4.completed.await(5, TimeUnit.SECONDS));
Thread.sleep(100);
assertEquals(2,_servlet4.onDA.get());
assertEquals(2,_servlet4.onWP.get());
}
@SuppressWarnings("serial")
public class AsyncIOServlet4 extends HttpServlet
{
public CountDownLatch completed = new CountDownLatch(1);
public AtomicInteger onDA = new AtomicInteger();
public AtomicInteger onWP = new AtomicInteger();
@Override
public void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException
{
final AsyncContext async = request.startAsync();
final ServletInputStream in = request.getInputStream();
final ServletOutputStream out = response.getOutputStream();
in.setReadListener(new ReadListener()
{
@Override
public void onError(Throwable t)
{
t.printStackTrace();
}
@Override
public void onDataAvailable() throws IOException
{
onDA.incrementAndGet();
if (onDA.get()>2)
return;
// Read all available content
while(in.isReady())
if (in.read()<0)
throw new IllegalStateException();
if (onDA.get()==1)
return;
final byte[] buffer = new byte[64*1024];
Arrays.fill(buffer,(byte)'X');
for (int i=199;i<buffer.length;i+=200)
buffer[i]=(byte)'\n';
// Once we read block, let's make ourselves write blocked
out.setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
onWP.incrementAndGet();
if (onWP.get()>2)
return;
while (out.isReady())
out.write(buffer);
if (onWP.get()==1)
return;
try
{
// As soon as we are write blocked, complete
async.complete();
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
completed.countDown();
}
}
@Override
public void onError(Throwable t)
{
t.printStackTrace();
}
});
}
@Override
public void onAllDataRead() throws IOException
{
throw new IllegalStateException();
}
});
}
}
}