diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 200b51ffb8f..a17382f0543 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -991,8 +991,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio protected void abort(Throwable failure) { - terminate(); notifyFailure(this, failure); + terminate(); } public boolean isDisconnected() diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index f163d27a2a0..0f251a95ecd 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -134,6 +134,12 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF getConnection().onSessionFailure(new IOException("HTTP/2 " + error + reason)); } + @Override + public void onFailure(Session session, Throwable failure) + { + getConnection().onSessionFailure(failure); + } + @Override public void onHeaders(Stream stream, HeadersFrame frame) { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index 4cf3396dca3..02ba47cf825 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -296,8 +296,10 @@ public class HttpChannelOverHTTP2 extends HttpChannel public void onFailure(Throwable failure) { - onEarlyEOF(); - getState().asyncError(failure); + if (onEarlyEOF()) + handle(); + else + getState().asyncError(failure); } protected void consumeInput() diff --git a/tests/test-http-client-transport/pom.xml b/tests/test-http-client-transport/pom.xml index 8a45e70a261..b835c863c81 100644 --- a/tests/test-http-client-transport/pom.xml +++ b/tests/test-http-client-transport/pom.xml @@ -61,7 +61,7 @@ org.eclipse.jetty - jetty-server + jetty-servlet ${project.version} test diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java index 493f801a8f5..1fb649238df 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java @@ -21,6 +21,8 @@ package org.eclipse.jetty.http.client; import java.util.ArrayList; import java.util.List; +import javax.servlet.http.HttpServlet; + import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClientTransport; @@ -40,6 +42,8 @@ import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.toolchain.test.TestTracker; import org.eclipse.jetty.util.SocketAddressResolver; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -67,6 +71,8 @@ public abstract class AbstractTest protected SslContextFactory sslContextFactory; protected Server server; protected ServerConnector connector; + protected ServletContextHandler context; + protected String servletPath = "/servlet"; protected HttpClient client; public AbstractTest(Transport transport) @@ -81,6 +87,22 @@ public abstract class AbstractTest startClient(); } + public void start(HttpServlet servlet) throws Exception + { + startServer(servlet); + startClient(); + } + + protected void startServer(HttpServlet servlet) throws Exception + { + context = new ServletContextHandler(); + context.setContextPath("/"); + ServletHolder holder = new ServletHolder(servlet); + holder.setAsyncSupported(true); + context.addServlet(holder, servletPath); + startServer(context); + } + protected void startServer(Handler handler) throws Exception { sslContextFactory = new SslContextFactory(); @@ -228,9 +250,19 @@ public abstract class AbstractTest @After public void stop() throws Exception + { + stopClient(); + stopServer(); + } + + protected void stopClient() throws Exception { if (client != null) client.stop(); + } + + protected void stopServer() throws Exception + { if (server != null) server.stop(); } diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java similarity index 54% rename from jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java rename to tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java index 91a6a7d5eca..409f075931f 100644 --- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java @@ -16,13 +16,12 @@ // ======================================================================== // -package org.eclipse.jetty.servlet; +package org.eclipse.jetty.http.client; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.Socket; +import java.io.InterruptedIOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -42,105 +41,106 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Destination; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; +import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.DeferredContentProvider; +import org.eclipse.jetty.client.util.StringContentProvider; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpChannel; -import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; -import org.eclipse.jetty.toolchain.test.AdvancedRunner; -import org.eclipse.jetty.toolchain.test.http.SimpleHttpParser; -import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse; -import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.log.StacklessLogging; import org.hamcrest.Matchers; -import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; -import org.junit.runner.RunWith; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -@RunWith(AdvancedRunner.class) -public class AsyncIOServletTest +public class AsyncIOServletTest extends AbstractTest { - private Server server; - private ServerConnector connector; - private String path = "/path"; - private static final ThreadLocal scope = new ThreadLocal<>(); + private static final ThreadLocal scope = new ThreadLocal<>(); - public void startServer(HttpServlet servlet) throws Exception + public AsyncIOServletTest(Transport transport) { - startServer(servlet, 30000); + super(transport == Transport.FCGI ? null : transport); } - public void startServer(HttpServlet servlet, long idleTimeout) throws Exception + @Override + protected void startServer(Handler handler) throws Exception { - server = new Server(); - connector = new ServerConnector(server); - connector.setIdleTimeout(idleTimeout); - connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setDelayDispatchUntilContent(false); - server.addConnector(connector); - - ServletContextHandler context = new ServletContextHandler(server, "/", false, false); - ServletHolder holder = new ServletHolder(servlet); - holder.setAsyncSupported(true); - context.addServlet(holder, path); - - context.addEventListener(new ContextHandler.ContextScopeListener() + if (handler == context) { - @Override - public void enterScope(Context context, Request request, Object reason) + // Add this listener before the context is started, so it's durable. + context.addEventListener(new ContextHandler.ContextScopeListener() { - if (scope.get() != null) + @Override + public void enterScope(Context context, Request request, Object reason) { - System.err.println(Thread.currentThread() + " Already entered scope!!!"); - scope.get().printStackTrace(); - throw new IllegalStateException(); + checkScope(); + scope.set(new RuntimeException()); } - scope.set(new Throwable()); - } - @Override - public void exitScope(Context context, Request request) - { - if (scope.get() == null) - throw new IllegalStateException(); - scope.set(null); - } - }); - - server.start(); - } - - private static void assertScope() - { - if (scope.get() == null) - Assert.fail("Not in scope"); - } - - @After - public void stopServer() throws Exception - { - server.stop(); - if (scope.get() != null) - { - System.err.println("Still in scope after stop!"); - scope.get().printStackTrace(); - throw new IllegalStateException("Didn't leave scope"); + @Override + public void exitScope(Context context, Request request) + { + assertScope(); + scope.set(null); + } + }); } + super.startServer(handler); + } + + private void assertScope() + { + Assert.assertNotNull("Not in scope", scope.get()); + } + + private void checkScope() + { + RuntimeException callScope = scope.get(); + if (callScope != null) + throw callScope; + } + + protected void stopServer() throws Exception + { + super.stopServer(); + checkScope(); scope.set(null); } + private void sleep(long ms) throws IOException + { + try + { + Thread.sleep(ms); + } + catch (InterruptedException e) + { + throw new InterruptedIOException(); + } + } + @Test public void testAsyncReadThrowsException() throws Exception { @@ -156,7 +156,7 @@ public class AsyncIOServletTest private void testAsyncReadThrows(final Throwable throwable) throws Exception { final CountDownLatch latch = new CountDownLatch(1); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -190,52 +190,36 @@ public class AsyncIOServletTest Assert.assertThat("onError message", t.getMessage(), is(throwable.getMessage())); latch.countDown(); response.setStatus(500); - asyncContext.complete(); } }); } }); - String data = "0123456789"; - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Length: " + data.length() + "\r\n" + - "\r\n" + - data; + ContentResponse response = client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(new StringContentProvider("0123456789")) + .timeout(5, TimeUnit.SECONDS) + .send(); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - client.setSoTimeout(5000); - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("500 Server Error")); - while (line.length() > 0) - { - line = in.readLine(); - } - - assertTrue(latch.await(5, TimeUnit.SECONDS)); - } + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus()); } @Test public void testAsyncReadIdleTimeout() throws Exception { - final int status = 567; - startServer(new HttpServlet() + int status = 567; + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { assertScope(); - final AsyncContext asyncContext = request.startAsync(request, response); + AsyncContext asyncContext = request.startAsync(request, response); asyncContext.setTimeout(0); - final ServletInputStream inputStream = request.getInputStream(); + ServletInputStream inputStream = request.getInputStream(); inputStream.setReadListener(new ReadListener() { @Override @@ -263,39 +247,51 @@ public class AsyncIOServletTest } }); } - }, 1000); - - String data1 = "0123456789"; - String data2 = "ABCDEF"; - // Only send the first chunk of data and then let it idle timeout. - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Length: " + (data1.length() + data2.length()) + "\r\n" + - "\r\n" + - data1; - - try (Socket client = new Socket("localhost", connector.getLocalPort())) + }); + connector.setIdleTimeout(1000); + CountDownLatch closeLatch = new CountDownLatch(1); + connector.addBean(new Connection.Listener() { - client.setSoTimeout(5000); - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); + @Override + public void onOpened(Connection connection) + { + } - SimpleHttpParser parser = new SimpleHttpParser(); - SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8"))); + @Override + public void onClosed(Connection connection) + { + closeLatch.countDown(); + } + }); - assertEquals(String.valueOf(status), response.getCode()); + String data = "0123456789"; + DeferredContentProvider content = new DeferredContentProvider(); + content.offer(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))); + CountDownLatch responseLatch = new CountDownLatch(1); + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(content) + .onResponseSuccess(r -> responseLatch.countDown()) + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + assertEquals(status, result.getResponse().getStatus()); + clientLatch.countDown(); + }); - // Make sure the connection was closed by the server. - assertEquals(-1, client.getInputStream().read()); - } + assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + content.close(); + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); } @Test public void testOnErrorThrows() throws Exception { final AtomicInteger errors = new AtomicInteger(); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException @@ -337,24 +333,15 @@ public class AsyncIOServletTest } }); - String data = "0123456789"; - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Length: " + data.length() + "\r\n" + - "\r\n" + - data; - - try (Socket client = new Socket("localhost", connector.getLocalPort()); - StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) + try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); + ContentResponse response = client.newRequest(newURI()) + .path(servletPath) + .content(new StringContentProvider("0123456789")) + .timeout(5, TimeUnit.SECONDS) + .send(); - SimpleHttpParser parser = new SimpleHttpParser(); - SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8"))); - - Assert.assertEquals("500", response.getCode()); + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus()); Assert.assertEquals(1, errors.get()); } } @@ -373,8 +360,8 @@ public class AsyncIOServletTest private void testAsyncWriteThrows(final Throwable throwable) throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - startServer(new HttpServlet() + CountDownLatch latch = new CountDownLatch(1); + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -407,35 +394,25 @@ public class AsyncIOServletTest } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "\r\n"; + ContentResponse response = client.newRequest(newURI()) + .path(servletPath) + .timeout(5, TimeUnit.SECONDS) + .send(); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - - SimpleHttpParser parser = new SimpleHttpParser(); - SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8"))); - - assertTrue(latch.await(5, TimeUnit.SECONDS)); - Assert.assertEquals("500", response.getCode()); - } + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus()); } - @Test public void testAsyncWriteClosed() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); String text = "Now is the winter of our discontent. How Now Brown Cow. The quick brown fox jumped over the lazy dog.\n"; for (int i = 0; i < 10; i++) text = text + text; - final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + byte[] data = text.getBytes(StandardCharsets.UTF_8); - startServer(new HttpServlet() + CountDownLatch errorLatch = new CountDownLatch(1); + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -451,22 +428,12 @@ public class AsyncIOServletTest public void onWritePossible() throws IOException { assertScope(); - while (out.isReady()) - { - try - { - Thread.sleep(100); - out.write(data); - } - catch (IOException e) - { - throw e; - } - catch (Exception e) - { - e.printStackTrace(); - } - } + + // Wait for the failure to arrive to + // the server while we are about to write. + sleep(1000); + + out.write(data); } @Override @@ -474,45 +441,37 @@ public class AsyncIOServletTest { assertScope(); async.complete(); - latch.countDown(); + errorLatch.countDown(); } }); } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "\r\n"; + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .path(servletPath) + .onResponseHeaders(response -> + { + if (response.getStatus() == HttpStatus.OK_200) + response.abort(new IOException("explicitly_closed_by_test")); + }) + .send(result -> + { + if (result.isFailed()) + clientLatch.countDown(); + }); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("200 OK")); - while (line.length() > 0) - line = in.readLine(); - line = in.readLine(); - assertThat(line, not(containsString(" "))); - line = in.readLine(); - assertThat(line, containsString("discontent. How Now Brown Cow. The ")); - } - - if (!latch.await(5, TimeUnit.SECONDS)) - Assert.fail(); + assertTrue(errorLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); } - @Test public void testIsReadyAtEOF() throws Exception { String text = "TEST\n"; - final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -520,16 +479,41 @@ public class AsyncIOServletTest assertScope(); response.flushBuffer(); - final AsyncContext async = request.startAsync(); - final ServletInputStream in = request.getInputStream(); - final ServletOutputStream out = response.getOutputStream(); + AsyncContext async = request.startAsync(); + ServletInputStream input = request.getInputStream(); + ServletOutputStream output = response.getOutputStream(); - in.setReadListener(new ReadListener() + input.setReadListener(new ReadListener() { transient int _i = 0; transient boolean _minusOne = false; transient boolean _finished = false; + @Override + public void onDataAvailable() throws IOException + { + assertScope(); + while (input.isReady() && !input.isFinished()) + { + int b = input.read(); + if (b == -1) + _minusOne = true; + else if (data[_i++] != b) + throw new IllegalStateException(); + } + + if (input.isFinished()) + _finished = true; + } + + @Override + public void onAllDataRead() throws IOException + { + assertScope(); + output.write(String.format("i=%d eof=%b finished=%b", _i, _minusOne, _finished).getBytes(StandardCharsets.ISO_8859_1)); + async.complete(); + } + @Override public void onError(Throwable t) { @@ -537,68 +521,29 @@ public class AsyncIOServletTest t.printStackTrace(); async.complete(); } - - @Override - public void onDataAvailable() throws IOException - { - assertScope(); - while (in.isReady() && !in.isFinished()) - { - int b = in.read(); - if (b == -1) - _minusOne = true; - else if (data[_i++] != b) - throw new IllegalStateException(); - } - - if (in.isFinished()) - _finished = true; - } - - @Override - public void onAllDataRead() throws IOException - { - assertScope(); - out.write(String.format("i=%d eof=%b finished=%b", _i, _minusOne, _finished).getBytes(StandardCharsets.ISO_8859_1)); - async.complete(); - } }); } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + data.length + "\r\n" + - "Connection: close\r\n" + - "\r\n"; + ContentResponse response = client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .header(HttpHeader.CONNECTION, "close") + .content(new StringContentProvider(text)) + .timeout(5, TimeUnit.SECONDS) + .send(); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - output.write(data); - output.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("200 OK")); - while (line.length() > 0) - line = in.readLine(); - line = in.readLine(); - assertThat(line, containsString("i=" + data.length + " eof=true finished=true")); - } + String responseContent = response.getContentAsString(); + assertThat(responseContent, containsString("i=" + data.length + " eof=true finished=true")); } - @Test public void testOnAllDataRead() throws Exception { String text = "X"; - final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); - - startServer(new HttpServlet() + byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + String success = "SUCCESS"; + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -613,21 +558,13 @@ public class AsyncIOServletTest in.setReadListener(new ReadListener() { - @Override - public void onError(Throwable t) - { - assertScope(); - t.printStackTrace(); - async.complete(); - } - @Override public void onDataAvailable() throws IOException { assertScope(); try { - Thread.sleep(1000); + sleep(1000); if (!in.isReady()) throw new IllegalStateException(); if (in.read() != 'X') @@ -637,9 +574,9 @@ public class AsyncIOServletTest if (in.read() != -1) throw new IllegalStateException(); } - catch (Exception e) + catch (IOException x) { - e.printStackTrace(); + throw new UncheckedIOException(x); } } @@ -647,64 +584,10 @@ public class AsyncIOServletTest public void onAllDataRead() throws IOException { assertScope(); - out.write("OK\n".getBytes(StandardCharsets.ISO_8859_1)); + out.write(success.getBytes(StandardCharsets.ISO_8859_1)); async.complete(); } - }); - } - }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + data.length + "\r\n" + - "Connection: close\r\n" + - "\r\n"; - - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - client.setSoTimeout(5000); - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - Thread.sleep(100); - output.write(data); - output.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("200 OK")); - while (line.length() > 0) - line = in.readLine(); - line = in.readLine(); - assertThat(line, containsString("OK")); - } - } - - @Test - public void testOtherThreadOnAllDataRead() throws Exception - { - String text = "X"; - final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); - - startServer(new HttpServlet() - { - @Override - protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException - { - assertScope(); - response.flushBuffer(); - - final AsyncContext async = request.startAsync(); - async.setTimeout(500000); - final ServletInputStream in = request.getInputStream(); - final ServletOutputStream out = response.getOutputStream(); - - if (request.getDispatcherType() == DispatcherType.ERROR) - throw new IllegalStateException(); - - in.setReadListener(new ReadListener() - { @Override public void onError(Throwable t) { @@ -712,7 +595,70 @@ public class AsyncIOServletTest t.printStackTrace(); async.complete(); } + }); + } + }); + CountDownLatch clientLatch = new CountDownLatch(1); + DeferredContentProvider content = new DeferredContentProvider() + { + @Override + public long getLength() + { + return data.length; + } + }; + client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(content) + .timeout(5, TimeUnit.SECONDS) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded()) + { + Response response = result.getResponse(); + String content = getContentAsString(); + if (response.getStatus() == HttpStatus.OK_200 && success.equals(content)) + clientLatch.countDown(); + } + } + }); + + sleep(100); + content.offer(ByteBuffer.wrap(data)); + content.close(); + + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testOtherThreadOnAllDataRead() throws Exception + { + String text = "X"; + byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + String success = "SUCCESS"; + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException + { + assertScope(); + response.flushBuffer(); + + AsyncContext async = request.startAsync(); + async.setTimeout(0); + ServletInputStream input = request.getInputStream(); + ServletOutputStream output = response.getOutputStream(); + + if (request.getDispatcherType() == DispatcherType.ERROR) + throw new IllegalStateException(); + + input.setReadListener(new ReadListener() + { @Override public void onDataAvailable() throws IOException { @@ -722,19 +668,19 @@ public class AsyncIOServletTest assertScope(); try { - Thread.sleep(1000); - if (!in.isReady()) + sleep(1000); + if (!input.isReady()) throw new IllegalStateException(); - if (in.read() != 'X') + if (input.read() != 'X') throw new IllegalStateException(); - if (!in.isReady()) + if (!input.isReady()) throw new IllegalStateException(); - if (in.read() != -1) + if (input.read() != -1) throw new IllegalStateException(); } - catch (Exception e) + catch (IOException x) { - e.printStackTrace(); + throw new UncheckedIOException(x); } }); } @@ -742,49 +688,59 @@ public class AsyncIOServletTest @Override public void onAllDataRead() throws IOException { - out.write("OK\n".getBytes(StandardCharsets.ISO_8859_1)); + output.write(success.getBytes(StandardCharsets.ISO_8859_1)); + async.complete(); + } + + @Override + public void onError(Throwable t) + { + assertScope(); + t.printStackTrace(); async.complete(); } }); } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + data.length + "\r\n" + - "Connection: close\r\n" + - "\r\n"; + CountDownLatch clientLatch = new CountDownLatch(1); + DeferredContentProvider content = new DeferredContentProvider(); + client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(content) + .timeout(5, TimeUnit.SECONDS) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded()) + { + Response response = result.getResponse(); + String content = getContentAsString(); + if (response.getStatus() == HttpStatus.OK_200 && success.equals(content)) + clientLatch.countDown(); + } + } + }); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - client.setSoTimeout(500000); - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - Thread.sleep(100); - output.write(data); - output.flush(); + sleep(100); + content.offer(ByteBuffer.wrap(data)); + content.close(); - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("200 OK")); - while (line.length() > 0) - line = in.readLine(); - line = in.readLine(); - assertThat(line, containsString("OK")); - } + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); } - @Test public void testCompleteBeforeOnAllDataRead() throws Exception { String text = "XYZ"; - final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); - final AtomicBoolean allDataRead = new AtomicBoolean(false); + byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + String success = "SUCCESS"; + AtomicBoolean allDataRead = new AtomicBoolean(false); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -792,29 +748,22 @@ public class AsyncIOServletTest assertScope(); response.flushBuffer(); - final AsyncContext async = request.startAsync(); - final ServletInputStream in = request.getInputStream(); - final ServletOutputStream out = response.getOutputStream(); + AsyncContext async = request.startAsync(); + ServletInputStream input = request.getInputStream(); + ServletOutputStream output = response.getOutputStream(); - in.setReadListener(new ReadListener() + input.setReadListener(new ReadListener() { - @Override - public void onError(Throwable t) - { - assertScope(); - t.printStackTrace(); - } - @Override public void onDataAvailable() throws IOException { assertScope(); - while (in.isReady()) + while (input.isReady()) { - int b = in.read(); + int b = input.read(); if (b < 0) { - out.write("OK\n".getBytes(StandardCharsets.ISO_8859_1)); + output.write(success.getBytes(StandardCharsets.ISO_8859_1)); async.complete(); return; } @@ -825,57 +774,46 @@ public class AsyncIOServletTest public void onAllDataRead() throws IOException { assertScope(); - out.write("BAD!!!\n".getBytes(StandardCharsets.ISO_8859_1)); + output.write("FAILURE".getBytes(StandardCharsets.ISO_8859_1)); allDataRead.set(true); throw new IllegalStateException(); } + + @Override + public void onError(Throwable t) + { + assertScope(); + t.printStackTrace(); + } }); } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + data.length + "\r\n" + - "Connection: close\r\n" + - "\r\n"; + ContentResponse response = client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .header(HttpHeader.CONNECTION, "close") + .content(new StringContentProvider(text)) + .timeout(5, TimeUnit.SECONDS) + .send(); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - Thread.sleep(100); - output.write(data); - output.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("200 OK")); - while (line.length() > 0) - { - line = in.readLine(); - } - line = in.readLine(); - assertThat(line, containsString("OK")); - Assert.assertFalse(allDataRead.get()); - } + assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); + assertThat(response.getContentAsString(), Matchers.equalTo(success)); } - @Test public void testEmptyAsyncRead() throws Exception { - final AtomicBoolean oda = new AtomicBoolean(); - final CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean oda = new AtomicBoolean(); + CountDownLatch latch = new CountDownLatch(1); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { assertScope(); - final AsyncContext asyncContext = request.startAsync(request, response); + AsyncContext asyncContext = request.startAsync(request, response); response.setStatus(200); response.getOutputStream().close(); request.getInputStream().setReadListener(new ReadListener() @@ -906,24 +844,15 @@ public class AsyncIOServletTest } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Connection: close\r\n" + - "\r\n"; + ContentResponse response = client.newRequest(newURI()) + .path(servletPath) + .header(HttpHeader.CONNECTION, "close") + .timeout(5, TimeUnit.SECONDS) + .send(); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - - String response = IO.toString(client.getInputStream()); - assertThat(response, containsString(" 200 OK")); - // wait for onAllDataRead BEFORE closing client - latch.await(); - } - - // ODA not called at all! + assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + // onDataAvailable must not be called. Assert.assertFalse(oda.get()); } @@ -932,7 +861,7 @@ public class AsyncIOServletTest { Queue errors = new ConcurrentLinkedQueue<>(); CountDownLatch writeLatch = new CountDownLatch(1); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException @@ -990,34 +919,127 @@ public class AsyncIOServletTest }); String content = "0123456789ABCDEF"; + DeferredContentProvider contentProvider = new DeferredContentProvider(); + contentProvider.offer(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(contentProvider) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded()) + { + Response response = result.getResponse(); + assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); + assertThat(getContentAsString(), Matchers.equalTo(content)); + assertThat(errors, Matchers.hasSize(0)); + clientLatch.countDown(); + } + } + }); - try (Socket client = new Socket("localhost", connector.getLocalPort())) + assertTrue(writeLatch.await(5, TimeUnit.SECONDS)); + + contentProvider.close(); + + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testAsyncReadEarlyEOF() throws Exception + { + // SSLEngine receives the close alert from the client, and when + // the server passes the response to encrypt and write, SSLEngine + // only generates the close alert back, without encrypting the + // response, so we need to skip the transports over TLS. + Assume.assumeThat(transport, Matchers.not(Matchers.isOneOf(Transport.HTTPS, Transport.H2))); + + String content = "jetty"; + int responseCode = HttpStatus.NO_CONTENT_204; + CountDownLatch readLatch = new CountDownLatch(content.length()); + CountDownLatch errorLatch = new CountDownLatch(1); + start(new HttpServlet() { - OutputStream output = client.getOutputStream(); + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + AsyncContext asyncContext = request.startAsync(); + ServletInputStream input = request.getInputStream(); + input.setReadListener(new ReadListener() + { + @Override + public void onDataAvailable() throws IOException + { + while (input.isReady() && !input.isFinished()) + { + int read = input.read(); + System.err.printf("%x%n", read); + readLatch.countDown(); + } + } - String request = "POST " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Transfer-Encoding: chunked\r\n" + - "\r\n" + - "10\r\n" + - content + "\r\n"; - output.write(request.getBytes("UTF-8")); - output.flush(); + @Override + public void onAllDataRead() throws IOException + { + } - assertTrue(writeLatch.await(5, TimeUnit.SECONDS)); + @Override + public void onError(Throwable x) + { + response.setStatus(responseCode); + asyncContext.complete(); + errorLatch.countDown(); + } + }); + } + }); - request = "" + - "0\r\n" + - "\r\n"; - output.write(request.getBytes("UTF-8")); - output.flush(); + CountDownLatch responseLatch = new CountDownLatch(1); + DeferredContentProvider contentProvider = new DeferredContentProvider(); + contentProvider.offer(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); + org.eclipse.jetty.client.api.Request request = client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(contentProvider) + .onResponseSuccess(response -> responseLatch.countDown()); - HttpTester.Input input = HttpTester.from(client.getInputStream()); - HttpTester.Response response = HttpTester.parseResponse(input); + Destination destination = client.getDestination(getScheme(), "localhost", connector.getLocalPort()); + FuturePromise promise = new FuturePromise<>(); + destination.newConnection(promise); + org.eclipse.jetty.client.api.Connection connection = promise.get(5, TimeUnit.SECONDS); + CountDownLatch clientLatch = new CountDownLatch(1); + connection.send(request, result -> + { + assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode)); + clientLatch.countDown(); + }); - assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); - assertThat(response.getContent(), Matchers.equalTo(content)); - assertThat(errors, Matchers.hasSize(0)); + assertTrue(readLatch.await(5, TimeUnit.SECONDS)); + + switch (transport) + { + case HTTP: + case HTTPS: + ((HttpConnectionOverHTTP)connection).getEndPoint().shutdownOutput(); + break; + case H2C: + case H2: + Session session = ((HttpConnectionOverHTTP2)connection).getSession(); + ((HTTP2Session)session).getEndPoint().shutdownOutput(); + break; + default: + Assert.fail(); } + + // Wait for the response to arrive before finishing the request. + assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + contentProvider.close(); + + assertTrue(errorLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); } }