426750 isReady() returns true at EOF

This commit is contained in:
Greg Wilkins 2014-02-05 17:34:22 +11:00
parent ba318ccbd3
commit 7725056234
3 changed files with 215 additions and 43 deletions

View File

@ -313,7 +313,9 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
boolean finished;
synchronized (lock())
{
if (_listener == null)
if (_contentState.isEOF())
return true;
if (_listener == null )
return true;
if (available() > 0)
return true;

View File

@ -63,13 +63,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private volatile Throwable _onError;
/*
ACTION OPEN ASYNC READY PENDING UNREADY
-------------------------------------------------------------------------------
setWriteListener() READY->owp ise ise ise ise
write() OPEN ise PENDING wpe wpe
flush() OPEN ise PENDING wpe wpe
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false
write completed - - - ASYNC READY->owp
ACTION OPEN ASYNC READY PENDING UNREADY CLOSED
-----------------------------------------------------------------------------------------------------
setWriteListener() READY->owp ise ise ise ise ise
write() OPEN ise PENDING wpe wpe eof
flush() OPEN ise PENDING wpe wpe eof
close() CLOSED CLOSED CLOSED CLOSED wpe CLOSED
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
write completed - - - ASYNC READY->owp -
*/
enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
@ -131,48 +133,66 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override
public void close()
{
OutputState state=_state.get();
while(state!=OutputState.CLOSED)
loop: while(true)
{
if (_state.compareAndSet(state,OutputState.CLOSED))
OutputState state=_state.get();
switch (state)
{
try
{
write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
}
catch(IOException e)
{
LOG.debug(e);
_channel.failed();
}
releaseBuffer();
return;
case CLOSED:
break loop;
case UNREADY:
throw new WritePendingException(); // TODO ?
default:
if (_state.compareAndSet(state,OutputState.CLOSED))
{
try
{
write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
}
catch(IOException e)
{
LOG.debug(e);
_channel.failed();
}
releaseBuffer();
return;
}
}
state=_state.get();
}
}
/* Called to indicated that the output is already closed and the state needs to be updated to match */
void closed()
{
OutputState state=_state.get();
while(state!=OutputState.CLOSED)
loop: while(true)
{
if (_state.compareAndSet(state,OutputState.CLOSED))
OutputState state=_state.get();
switch (state)
{
try
{
_channel.getResponse().closeOutput();
}
catch(IOException e)
{
LOG.debug(e);
_channel.failed();
}
releaseBuffer();
return;
case CLOSED:
break loop;
case UNREADY:
throw new WritePendingException(); // TODO ?
default:
if (_state.compareAndSet(state,OutputState.CLOSED))
{
try
{
_channel.getResponse().closeOutput();
}
catch(IOException e)
{
LOG.debug(e);
_channel.failed();
}
releaseBuffer();
return;
}
}
state=_state.get();
}
}
@ -667,8 +687,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return false;
case UNREADY:
return false;
case CLOSED:
return false;
return true;
}
}
}
@ -683,6 +704,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_writeListener.onError(new IOException(th));
close();
}
switch(_state.get())
{
case READY:
@ -700,6 +722,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case CLOSED:
try
{
new Throwable().printStackTrace();
// even though a write is not possible, because a close has
// occurred, we need to call onWritePossible to tell async
// producer that the last write completed.
@ -716,6 +739,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
}
@Override
public String toString()
{
return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
}
private abstract class AsyncICB extends IteratingCallback
{
@ -741,7 +769,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break;
case CLOSED:
_channel.getState().onWritePossible();
break;
default:

View File

@ -32,10 +32,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletResponse;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -55,7 +58,9 @@ import org.junit.Test;
// TODO need these on SPDY as well!
public class AsyncServletIOTest
{
protected AsyncIOServlet _servlet=new AsyncIOServlet();
protected AsyncIOServlet _servlet0=new AsyncIOServlet();
protected AsyncIOServlet2 _servlet2=new AsyncIOServlet2();
protected int _port;
protected Server _server = new Server();
@ -74,9 +79,16 @@ public class AsyncServletIOTest
context.setContextPath("/ctx");
_server.setHandler(context);
_servletHandler=context.getServletHandler();
ServletHolder holder=new ServletHolder(_servlet);
ServletHolder holder=new ServletHolder(_servlet0);
holder.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder,"/path/*");
ServletHolder holder2=new ServletHolder(_servlet2);
holder.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder2,"/path2/*");
_server.start();
_port=_connector.getLocalPort();
@ -147,6 +159,56 @@ public class AsyncServletIOTest
}
@Test
public void testAsync2() throws Exception
{
StringBuilder request = new StringBuilder(512);
request.append("GET /ctx/path2/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Connection: close\r\n")
.append("\r\n");
int port=_port;
List<String> list = new ArrayList<>();
try (Socket socket = new Socket("localhost",port);)
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes("ISO-8859-1"));
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400);
// response line
String line = in.readLine();
// System.err.println("resp: "+line);
Assert.assertThat(line,Matchers.startsWith("HTTP/1.1 200 OK"));
// Skip headers
while (line!=null)
{
line = in.readLine();
// System.err.println("line: "+line);
if (line.length()==0)
break;
}
// Get body slowly
while (true)
{
line = in.readLine();
// System.err.println("body: "+line);
if (line==null)
break;
list.add(line);
}
}
Assert.assertEquals(list.get(0),"data");
Assert.assertEquals(_servlet2.completed.get(),1);
}
protected void assertContains(String content,String response)
{
Assert.assertThat(response,Matchers.containsString(content));
@ -298,7 +360,7 @@ public class AsyncServletIOTest
throw new IllegalStateException();
// System.err.println("ODA");
while (in.isReady())
while (in.isReady() && !in.isFinished())
{
_oda.incrementAndGet();
int len=in.read(_buf);
@ -374,4 +436,85 @@ public class AsyncServletIOTest
});
}
}
public class AsyncIOServlet2 extends HttpServlet
{
public AtomicInteger completed = new AtomicInteger(0);
@Override
public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException
{
new SampleAsycListener(request,response);
}
class SampleAsycListener implements WriteListener, AsyncListener
{
final ServletResponse response;
final ServletOutputStream servletOutputStream;
final AsyncContext asyncContext;
SampleAsycListener(HttpServletRequest request,HttpServletResponse response) throws IOException
{
asyncContext = request.startAsync();
asyncContext.setTimeout(10000L);
asyncContext.addListener(this);
servletOutputStream = response.getOutputStream();
servletOutputStream.setWriteListener(this);
this.response=response;
}
volatile boolean written=false;
@Override
public void onWritePossible() throws IOException
{
if (!written)
{
written=true;
response.setContentLength(5);
servletOutputStream.write("data\n".getBytes());
}
if (servletOutputStream.isReady())
{
asyncContext.complete();
}
}
@Override
public void onError(final Throwable t)
{
t.printStackTrace();
asyncContext.complete();
}
@Override
public void onComplete(final AsyncEvent event) throws IOException
{
completed.incrementAndGet();
}
@Override
public void onTimeout(final AsyncEvent event) throws IOException
{
asyncContext.complete();
}
@Override
public void onError(final AsyncEvent event) throws IOException
{
asyncContext.complete();
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException
{
}
}
}
}