From 7db7ef30204a4ff897e49ec82960dff90078ed1f Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 5 Feb 2015 14:26:34 +1100 Subject: [PATCH] Added test cases and improved isReady and isFinished handling --- .../jetty/server/HttpChannelState.java | 32 +-- .../jetty/server/HttpConfiguration.java | 2 +- .../org/eclipse/jetty/server/HttpInput.java | 182 ++++++++++++------ .../eclipse/jetty/server/HttpInputTest.java | 22 ++- .../jetty/servlet/AsyncIOServletTest.java | 102 +++++++++- 5 files changed, 247 insertions(+), 93 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index a6650b7dc95..35d0f1b78ac 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -296,19 +296,6 @@ public class HttpChannelState throw new IllegalStateException(this.getStatusString()); } - if (_asyncRead) - { - _state=State.ASYNC_IO; - _asyncRead=false; - return Action.READ_CALLBACK; - } - - if (_asyncWrite) - { - _asyncWrite=false; - _state=State.ASYNC_IO; - return Action.WRITE_CALLBACK; - } if (_async!=null) { @@ -327,8 +314,25 @@ public class HttpChannelState _state=State.DISPATCHED; _async=null; return Action.ASYNC_EXPIRED; - case EXPIRING: case STARTED: + if (_asyncRead) + { + _state=State.ASYNC_IO; + _asyncRead=false; + return Action.READ_CALLBACK; + } + + if (_asyncWrite) + { + _asyncWrite=false; + _state=State.ASYNC_IO; + return Action.WRITE_CALLBACK; + } + scheduleTimeout(); + _state=State.ASYNC_WAIT; + return Action.WAIT; + + case EXPIRING: scheduleTimeout(); _state=State.ASYNC_WAIT; return Action.WAIT; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java index d9c833fe94c..fc19accd3be 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java @@ -54,7 +54,7 @@ public class HttpConfiguration private boolean _sendServerVersion = true; private boolean _sendXPoweredBy = false; private boolean _sendDateHeader = true; - private boolean _delayDispatchUntilContent = false; + private boolean _delayDispatchUntilContent = false; // TODO change to true /* ------------------------------------------------------------ */ /** diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 5d0e6118af7..c314315454e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -27,6 +27,7 @@ import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.util.ArrayQueue; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -112,7 +113,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable { int read = read(_oneByteBuffer, 0, 1); if (read==0) - throw new IllegalStateException("unready"); + throw new IllegalStateException("unready read=0"); return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; } @@ -148,12 +149,9 @@ public abstract class HttpInput extends ServletInputStream implements Runnable } /** - * Access the next content to be consumed from. Returning the next item does not consume it - * and it may be returned multiple times until it is consumed. - *

- * Calls to {@link #get(Content, byte[], int, int)} - * or {@link #skip(Content, int)} are required to consume data from the content. - * + * Get the next content from the inputQ, calling {@link #produceContent()} + * if need be. EOF is processed and state changed. + * * @return the content or null if none available. * @throws IOException if retrieving the content fails */ @@ -161,16 +159,21 @@ public abstract class HttpInput extends ServletInputStream implements Runnable { if (!Thread.holdsLock(_inputQ)) throw new IllegalStateException(); - Content content = pollInputQ(); + Content content = pollContent(); if (content==null && !isFinished()) { produceContent(); - content = pollInputQ(); + content = pollContent(); } return content; } - protected Content pollInputQ() + /** Poll the inputQ for Content. + * Consumed buffers and {@link PoisonPillContent}s are removed and + * EOF state updated if need be. + * @return Content or null + */ + protected Content pollContent() { if (!Thread.holdsLock(_inputQ)) throw new IllegalStateException(); @@ -203,7 +206,55 @@ public abstract class HttpInput extends ServletInputStream implements Runnable return content; } - + + /** + * Get the next readable from the inputQ, calling {@link #produceContent()} + * if need be. EOF is NOT processed and state is not changed. + * + * @return the content or EOF or null if none available. + * @throws IOException if retrieving the content fails + */ + protected Content nextReadable() throws IOException + { + if (!Thread.holdsLock(_inputQ)) + throw new IllegalStateException(); + Content content = pollReadable(); + if (content==null && !isFinished()) + { + produceContent(); + content = pollReadable(); + } + return content; + } + + /** Poll the inputQ for Content or EOF. + * Consumed buffers and non EOF {@link PoisonPillContent}s are removed. + * EOF state is not updated. + * @return Content, EOF or null + */ + protected Content pollReadable() + { + if (!Thread.holdsLock(_inputQ)) + throw new IllegalStateException(); + + // Items are removed only when they are fully consumed. + Content content = _inputQ.peekUnsafe(); + + // Skip consumed items at the head of the queue except EOF + while (content != null) + { + if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0) + return content; + + _inputQ.pollUnsafe(); + content.succeeded(); + if (LOG.isDebugEnabled()) + LOG.debug("{} consumed {}", this, content); + content = _inputQ.peekUnsafe(); + } + + return content; + } /** * @param item the content @@ -214,7 +265,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable return item.remaining(); } - /** * Copies the given content into the given byte buffer. * @@ -230,7 +280,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable content.getContent().get(buffer, offset, l); _contentConsumed+=l; if (l>0 && !content.hasContent()) - pollInputQ(); // hungry succeed + pollContent(); // hungry succeed return l; } @@ -248,7 +298,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable buffer.position(buffer.position()+l); _contentConsumed+=l; if (l>0 && !content.hasContent()) - pollInputQ(); // hungry succeed + pollContent(); // hungry succeed } @@ -391,34 +441,29 @@ public abstract class HttpInput extends ServletInputStream implements Runnable @Override public boolean isReady() { - synchronized (_inputQ) + try { - if (_listener == null ) - return true; - if (_unready) - return false; - if (_state instanceof EOFState) - return true; - - if (_inputQ.isEmpty()) + synchronized (_inputQ) { - try - { - produceContent(); - } - catch(IOException e) - { - failed(e); - } - } - - if (!_inputQ.isEmpty()) - return true; + if (_listener == null ) + return true; + if (_unready) + return false; + if (_state instanceof EOFState) + return true; + if (nextReadable()!=null) + return true; - _unready = true; + _unready = true; + } + unready(); + return false; + } + catch(IOException e) + { + LOG.ignore(e); + return true; } - unready(); - return false; } protected void unready() @@ -429,15 +474,28 @@ public abstract class HttpInput extends ServletInputStream implements Runnable public void setReadListener(ReadListener readListener) { readListener = Objects.requireNonNull(readListener); - synchronized (_inputQ) + boolean content; + try { - if (_state != STREAM) - throw new IllegalStateException("state=" + _state); - _state = ASYNC; - _listener = readListener; - _unready = true; + synchronized (_inputQ) + { + if (_state != STREAM) + throw new IllegalStateException("state=" + _state); + _state = ASYNC; + _listener = readListener; + _unready = true; + content=nextContent()!=null; + } } - onReadPossible(); + catch(IOException e) + { + throw new RuntimeIOException(e); + } + + if (content) + onReadPossible(); + else + unready(); } public void failed(Throwable x) @@ -488,30 +546,28 @@ public abstract class HttpInput extends ServletInputStream implements Runnable try { - if (error != null) - listener.onError(error); - else if (aeof) - listener.onAllDataRead(); - else - { + if (aeof) + listener.onAllDataRead(); + else if (error == null) listener.onDataAvailable(); - synchronized (_inputQ) - { - if (_state==AEOF) - { - _state=EOF; - aeof=true; - } - } - if (aeof) - listener.onAllDataRead(); - } + else + listener.onError(error); } catch (Throwable e) { LOG.warn(e.toString()); LOG.debug(e); - listener.onError(e); + try + { + if (aeof || error==null) + listener.onError(e); + } + catch (Throwable e2) + { + LOG.warn(e2.toString()); + LOG.debug(e2); + throw new RuntimeIOException(e2); + } } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java index bb9f0cecbed..f0da531888a 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java @@ -27,22 +27,21 @@ import static org.junit.Assert.fail; import java.io.EOFException; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Queue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.ReadListener; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.hamcrest.Matchers; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class HttpInputTest { Queue _history = new ConcurrentArrayQueue() @@ -328,7 +327,8 @@ public class HttpInputTest public void testAsyncEmpty() throws Exception { _in.setReadListener(_listener); - assertThat(_history.poll(),equalTo("onReadPossible")); + assertThat(_history.poll(),equalTo("produceContent 0")); + assertThat(_history.poll(),equalTo("unready")); assertThat(_history.poll(),nullValue()); _in.run(); @@ -349,7 +349,8 @@ public class HttpInputTest public void testAsyncRead() throws Exception { _in.setReadListener(_listener); - assertThat(_history.poll(),equalTo("onReadPossible")); + assertThat(_history.poll(),equalTo("produceContent 0")); + assertThat(_history.poll(),equalTo("unready")); assertThat(_history.poll(),nullValue()); _in.run(); @@ -401,7 +402,8 @@ public class HttpInputTest public void testAsyncEOF() throws Exception { _in.setReadListener(_listener); - assertThat(_history.poll(),equalTo("onReadPossible")); + assertThat(_history.poll(),equalTo("produceContent 0")); + assertThat(_history.poll(),equalTo("unready")); assertThat(_history.poll(),nullValue()); _in.run(); @@ -423,7 +425,8 @@ public class HttpInputTest public void testAsyncReadEOF() throws Exception { _in.setReadListener(_listener); - assertThat(_history.poll(),equalTo("onReadPossible")); + assertThat(_history.poll(),equalTo("produceContent 0")); + assertThat(_history.poll(),equalTo("unready")); assertThat(_history.poll(),nullValue()); _in.run(); @@ -475,7 +478,8 @@ public class HttpInputTest public void testAsyncError() throws Exception { _in.setReadListener(_listener); - assertThat(_history.poll(),equalTo("onReadPossible")); + assertThat(_history.poll(),equalTo("produceContent 0")); + assertThat(_history.poll(),equalTo("unready")); assertThat(_history.poll(),nullValue()); _in.run(); assertThat(_history.poll(),equalTo("onDataAvailable")); diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java index 2818d1b6189..84afe8c1c2c 100644 --- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java +++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java @@ -39,6 +39,7 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.toolchain.test.AdvancedRunner; @@ -69,6 +70,7 @@ public class AsyncIOServletTest server = new Server(); connector = new ServerConnector(server); connector.setIdleTimeout(30000); + connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setDelayDispatchUntilContent(false); server.addConnector(connector); context = new ServletContextHandler(server, "/", false, false); @@ -149,11 +151,16 @@ public class AsyncIOServletTest output.write(request.getBytes("UTF-8")); output.flush(); - SimpleHttpParser parser = new SimpleHttpParser(); - SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8"))); - + BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); + String line=in.readLine(); + assertThat(line, containsString("500 Server Error")); + while (line.length()>0) + { + line=in.readLine(); + } + line=in.readLine(); + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); - Assert.assertEquals("500", response.getCode()); } } @@ -248,10 +255,10 @@ public class AsyncIOServletTest } @Override - public void onError(Throwable t) + public void onError(final Throwable t) { errors.incrementAndGet(); - throw new NullPointerException("explicitly_thrown_by_test_2"); + throw new NullPointerException("explicitly_thrown_by_test_2"){{this.initCause(t);}}; } }); } @@ -486,6 +493,7 @@ public class AsyncIOServletTest { OutputStream output = client.getOutputStream(); output.write(request.getBytes("UTF-8")); + output.flush(); output.write(data); output.flush(); @@ -588,4 +596,86 @@ public class AsyncIOServletTest assertThat(line, containsString("OK")); } } + + + @Test + public void testCompleteBeforeOnAllDataRead() throws Exception + { + String text = "XYZ"; + final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + final AtomicBoolean allDataRead = new AtomicBoolean(false); + + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException + { + response.flushBuffer(); + + final AsyncContext async = request.startAsync(); + final ServletInputStream in = request.getInputStream(); + final ServletOutputStream out = response.getOutputStream(); + + in.setReadListener(new ReadListener() + { + @Override + public void onError(Throwable t) + { + t.printStackTrace(); + } + + @Override + public void onDataAvailable() throws IOException + { + while (in.isReady()) + { + int b = in.read(); + if (b<0) + { + out.write("OK\n".getBytes(StandardCharsets.ISO_8859_1)); + async.complete(); + return; + } + } + } + + @Override + public void onAllDataRead() throws IOException + { + out.write("BAD!!!\n".getBytes(StandardCharsets.ISO_8859_1)); + allDataRead.set(true); + throw new IllegalStateException(); + } + }); + } + }); + + String request = "GET " + path + " HTTP/1.1\r\n" + + "Host: localhost:" + connector.getLocalPort() + "\r\n" + + "Content-Type: text/plain\r\n"+ + "Content-Length: "+data.length+"\r\n" + + "Connection: close\r\n" + + "\r\n"; + + try (Socket client = new Socket("localhost", connector.getLocalPort())) + { + OutputStream output = client.getOutputStream(); + output.write(request.getBytes("UTF-8")); + output.flush(); + Thread.sleep(100); + output.write(data); + output.flush(); + + BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); + String line=in.readLine(); + assertThat(line, containsString("200 OK")); + while (line.length()>0) + { + line=in.readLine(); + } + line=in.readLine(); + assertThat(line, containsString("OK")); + Assert.assertFalse(allDataRead.get()); + } + } }