From 772505623471ef140b69d0911fafc38dcefb0baa Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 5 Feb 2014 17:34:22 +1100 Subject: [PATCH] 426750 isReady() returns true at EOF --- .../org/eclipse/jetty/server/HttpInput.java | 4 +- .../org/eclipse/jetty/server/HttpOutput.java | 105 +++++++----- .../jetty/servlet/AsyncServletIOTest.java | 149 +++++++++++++++++- 3 files changed, 215 insertions(+), 43 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index a66ed167b5c..771ff2c47ec 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -313,7 +313,9 @@ public abstract class HttpInput extends ServletInputStream implements Runnabl boolean finished; synchronized (lock()) { - if (_listener == null) + if (_contentState.isEOF()) + return true; + if (_listener == null ) return true; if (available() > 0) return true; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 1b676da6534..99a94235d12 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -63,13 +63,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable private volatile Throwable _onError; /* - ACTION OPEN ASYNC READY PENDING UNREADY - ------------------------------------------------------------------------------- - setWriteListener() READY->owp ise ise ise ise - write() OPEN ise PENDING wpe wpe - flush() OPEN ise PENDING wpe wpe - isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false - write completed - - - ASYNC READY->owp + ACTION OPEN ASYNC READY PENDING UNREADY CLOSED + ----------------------------------------------------------------------------------------------------- + setWriteListener() READY->owp ise ise ise ise ise + write() OPEN ise PENDING wpe wpe eof + flush() OPEN ise PENDING wpe wpe eof + close() CLOSED CLOSED CLOSED CLOSED wpe CLOSED + isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true + write completed - - - ASYNC READY->owp - + */ enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED } private final AtomicReference _state=new AtomicReference<>(OutputState.OPEN); @@ -131,48 +133,66 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void close() { - OutputState state=_state.get(); - while(state!=OutputState.CLOSED) + loop: while(true) { - if (_state.compareAndSet(state,OutputState.CLOSED)) + OutputState state=_state.get(); + switch (state) { - try - { - write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding()); - } - catch(IOException e) - { - LOG.debug(e); - _channel.failed(); - } - releaseBuffer(); - return; + case CLOSED: + break loop; + + case UNREADY: + throw new WritePendingException(); // TODO ? + + default: + if (_state.compareAndSet(state,OutputState.CLOSED)) + { + try + { + write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding()); + } + catch(IOException e) + { + LOG.debug(e); + _channel.failed(); + } + releaseBuffer(); + return; + } } - state=_state.get(); } } /* Called to indicated that the output is already closed and the state needs to be updated to match */ void closed() { - OutputState state=_state.get(); - while(state!=OutputState.CLOSED) + loop: while(true) { - if (_state.compareAndSet(state,OutputState.CLOSED)) + OutputState state=_state.get(); + switch (state) { - try - { - _channel.getResponse().closeOutput(); - } - catch(IOException e) - { - LOG.debug(e); - _channel.failed(); - } - releaseBuffer(); - return; + case CLOSED: + break loop; + + case UNREADY: + throw new WritePendingException(); // TODO ? + + default: + if (_state.compareAndSet(state,OutputState.CLOSED)) + { + try + { + _channel.getResponse().closeOutput(); + } + catch(IOException e) + { + LOG.debug(e); + _channel.failed(); + } + releaseBuffer(); + return; + } } - state=_state.get(); } } @@ -667,8 +687,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable return false; case UNREADY: return false; + case CLOSED: - return false; + return true; } } } @@ -683,6 +704,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _writeListener.onError(new IOException(th)); close(); } + switch(_state.get()) { case READY: @@ -700,6 +722,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable case CLOSED: try { + new Throwable().printStackTrace(); // even though a write is not possible, because a close has // occurred, we need to call onWritePossible to tell async // producer that the last write completed. @@ -716,6 +739,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable } } + @Override + public String toString() + { + return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get()); + } private abstract class AsyncICB extends IteratingCallback { @@ -741,7 +769,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable break; case CLOSED: - _channel.getState().onWritePossible(); break; default: diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java index a5a0554a55f..e38d39f68c3 100644 --- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java +++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java @@ -32,10 +32,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; import javax.servlet.ReadListener; import javax.servlet.ServletException; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; +import javax.servlet.ServletResponse; import javax.servlet.WriteListener; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; @@ -55,7 +58,9 @@ import org.junit.Test; // TODO need these on SPDY as well! public class AsyncServletIOTest { - protected AsyncIOServlet _servlet=new AsyncIOServlet(); + protected AsyncIOServlet _servlet0=new AsyncIOServlet(); + protected AsyncIOServlet2 _servlet2=new AsyncIOServlet2(); + protected int _port; protected Server _server = new Server(); @@ -74,9 +79,16 @@ public class AsyncServletIOTest context.setContextPath("/ctx"); _server.setHandler(context); _servletHandler=context.getServletHandler(); - ServletHolder holder=new ServletHolder(_servlet); + + + ServletHolder holder=new ServletHolder(_servlet0); holder.setAsyncSupported(true); _servletHandler.addServletWithMapping(holder,"/path/*"); + + ServletHolder holder2=new ServletHolder(_servlet2); + holder.setAsyncSupported(true); + _servletHandler.addServletWithMapping(holder2,"/path2/*"); + _server.start(); _port=_connector.getLocalPort(); @@ -146,6 +158,56 @@ public class AsyncServletIOTest process("Hello!!!\r\n",10); } + + @Test + public void testAsync2() throws Exception + { + StringBuilder request = new StringBuilder(512); + request.append("GET /ctx/path2/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Connection: close\r\n") + .append("\r\n"); + + int port=_port; + List list = new ArrayList<>(); + try (Socket socket = new Socket("localhost",port);) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes("ISO-8859-1")); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400); + + // response line + String line = in.readLine(); + // System.err.println("resp: "+line); + Assert.assertThat(line,Matchers.startsWith("HTTP/1.1 200 OK")); + + // Skip headers + while (line!=null) + { + line = in.readLine(); + // System.err.println("line: "+line); + if (line.length()==0) + break; + } + + // Get body slowly + while (true) + { + line = in.readLine(); + // System.err.println("body: "+line); + if (line==null) + break; + list.add(line); + } + } + + Assert.assertEquals(list.get(0),"data"); + Assert.assertEquals(_servlet2.completed.get(),1); + } + + protected void assertContains(String content,String response) { @@ -298,7 +360,7 @@ public class AsyncServletIOTest throw new IllegalStateException(); // System.err.println("ODA"); - while (in.isReady()) + while (in.isReady() && !in.isFinished()) { _oda.incrementAndGet(); int len=in.read(_buf); @@ -374,4 +436,85 @@ public class AsyncServletIOTest }); } } + + + + public class AsyncIOServlet2 extends HttpServlet + { + public AtomicInteger completed = new AtomicInteger(0); + + @Override + public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException + { + new SampleAsycListener(request,response); + } + + class SampleAsycListener implements WriteListener, AsyncListener + { + final ServletResponse response; + final ServletOutputStream servletOutputStream; + final AsyncContext asyncContext; + + SampleAsycListener(HttpServletRequest request,HttpServletResponse response) throws IOException + { + asyncContext = request.startAsync(); + asyncContext.setTimeout(10000L); + asyncContext.addListener(this); + servletOutputStream = response.getOutputStream(); + servletOutputStream.setWriteListener(this); + this.response=response; + } + + volatile boolean written=false; + @Override + public void onWritePossible() throws IOException + { + if (!written) + { + written=true; + response.setContentLength(5); + servletOutputStream.write("data\n".getBytes()); + } + + if (servletOutputStream.isReady()) + { + asyncContext.complete(); + } + } + + @Override + public void onError(final Throwable t) + { + t.printStackTrace(); + asyncContext.complete(); + } + + @Override + public void onComplete(final AsyncEvent event) throws IOException + { + completed.incrementAndGet(); + } + + @Override + public void onTimeout(final AsyncEvent event) throws IOException + { + asyncContext.complete(); + } + + @Override + public void onError(final AsyncEvent event) throws IOException + { + asyncContext.complete(); + } + + @Override + public void onStartAsync(AsyncEvent event) throws IOException + { + + } + + } + } + + }