diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ContinueProtocolHandler.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ContinueProtocolHandler.java index 354852d5f99..4081d37f011 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ContinueProtocolHandler.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ContinueProtocolHandler.java @@ -64,6 +64,10 @@ public class ContinueProtocolHandler implements ProtocolHandler return new ContinueListener(); } + protected void onContinue(Request request) + { + } + protected class ContinueListener extends BufferingResponseListener { @Override @@ -72,7 +76,8 @@ public class ContinueProtocolHandler implements ProtocolHandler // Handling of success must be done here and not from onComplete(), // since the onComplete() is not invoked because the request is not completed yet. - HttpConversation conversation = ((HttpRequest)response.getRequest()).getConversation(); + Request request = response.getRequest(); + HttpConversation conversation = ((HttpRequest)request).getConversation(); // Mark the 100 Continue response as handled conversation.setAttribute(ATTRIBUTE, Boolean.TRUE); @@ -88,6 +93,7 @@ public class ContinueProtocolHandler implements ProtocolHandler // All good, continue exchange.resetResponse(); exchange.proceed(null); + onContinue(request); break; } default: @@ -98,7 +104,7 @@ public class ContinueProtocolHandler implements ProtocolHandler List listeners = exchange.getResponseListeners(); HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding()); notifier.forwardSuccess(listeners, contentResponse); - exchange.proceed(new HttpRequestException("Expectation failed", exchange.getRequest())); + exchange.proceed(new HttpRequestException("Expectation failed", request)); break; } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index 58f6fe33288..766bc308cc2 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -35,7 +35,6 @@ import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeaderValue; -import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -68,21 +67,23 @@ public abstract class HttpConnection implements Connection @Override public void send(Request request, Response.CompleteListener listener) { - ArrayList listeners = new ArrayList<>(2); - if (request.getTimeout() > 0) + HttpRequest httpRequest = (HttpRequest)request; + + ArrayList listeners = new ArrayList<>(httpRequest.getResponseListeners()); + if (httpRequest.getTimeout() > 0) { - TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(request); + TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(httpRequest); timeoutListener.schedule(getHttpClient().getScheduler()); listeners.add(timeoutListener); } if (listener != null) listeners.add(listener); - HttpExchange exchange = new HttpExchange(getHttpDestination(), (HttpRequest)request, listeners); + HttpExchange exchange = new HttpExchange(getHttpDestination(), httpRequest, listeners); SendFailure result = send(exchange); if (result != null) - request.abort(result.failure); + httpRequest.abort(result.failure); } protected abstract SendFailure send(HttpExchange exchange); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java index aaff1bad3ba..3a31b87d3fb 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -693,6 +693,11 @@ public class HttpRequest implements Request client.send(request, responseListeners); } + protected List getResponseListeners() + { + return responseListeners; + } + @Override public boolean abort(Throwable cause) { diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientExplicitConnectionTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientExplicitConnectionTest.java index 81455cbfda1..7e3f6edd200 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientExplicitConnectionTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientExplicitConnectionTest.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.client; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.client.api.Connection; @@ -27,7 +28,7 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; import org.eclipse.jetty.client.http.HttpDestinationOverHTTP; import org.eclipse.jetty.client.util.FutureResponseListener; -import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; @@ -65,7 +66,6 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe } } - @Slow @Test public void testExplicitConnectionIsClosedOnRemoteClose() throws Exception { @@ -98,4 +98,26 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe Assert.assertTrue(connectionPool.getActiveConnections().isEmpty()); Assert.assertTrue(connectionPool.getIdleConnections().isEmpty()); } + + @Test + public void testExplicitConnectionResponseListeners() throws Exception + { + start(new EmptyServerHandler()); + + Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort()); + FuturePromise futureConnection = new FuturePromise<>(); + destination.newConnection(futureConnection); + Connection connection = futureConnection.get(5, TimeUnit.SECONDS); + CountDownLatch responseLatch = new CountDownLatch(1); + Request request = client.newRequest(destination.getHost(), destination.getPort()) + .scheme(scheme) + .onResponseSuccess(response -> responseLatch.countDown()); + + FutureResponseListener listener = new FutureResponseListener(request); + connection.send(request, listener); + ContentResponse response = listener.get(5, TimeUnit.SECONDS); + + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 920a1759de6..b7c1ba43fbd 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -997,8 +997,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio protected void abort(Throwable failure) { - terminate(); notifyFailure(this, failure); + terminate(); } public boolean isDisconnected() diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index 8420752c1ca..f95c09d429f 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -162,8 +162,14 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen { dataInfo = queue.poll(); } + if (dataInfo == null) + { + DataInfo prevDataInfo = this.dataInfo; + if (prevDataInfo != null && prevDataInfo.last) + return Action.SUCCEEDED; return Action.IDLE; + } this.dataInfo = dataInfo; responseContent(dataInfo.exchange, dataInfo.buffer, this); @@ -176,11 +182,15 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool(); byteBufferPool.release(dataInfo.buffer); dataInfo.callback.succeeded(); - if (dataInfo.last) - responseSuccess(dataInfo.exchange); super.succeeded(); } + @Override + protected void onCompleteSuccess() + { + responseSuccess(dataInfo.exchange); + } + @Override protected void onCompleteFailure(Throwable failure) { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index 0af8f165e82..8295acc5c20 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -133,6 +133,12 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF getConnection().onSessionFailure(new IOException("HTTP/2 " + error + reason)); } + @Override + public void onFailure(Session session, Throwable failure) + { + getConnection().onSessionFailure(failure); + } + @Override public void onHeaders(Stream stream, HeadersFrame frame) { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index d7eb6261e22..7c3717fc3f0 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -44,7 +44,6 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.Invocable.InvocationType; public class HttpChannelOverHTTP2 extends HttpChannel { @@ -297,8 +296,10 @@ public class HttpChannelOverHTTP2 extends HttpChannel public void onFailure(Throwable failure) { - onEarlyEOF(); - getState().asyncError(failure); + if (onEarlyEOF()) + handle(); + else + getState().asyncError(failure); } protected void consumeInput() diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AbstractProxyServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AbstractProxyServlet.java index 8350febd4c3..2fc62333bdc 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AbstractProxyServlet.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AbstractProxyServlet.java @@ -40,7 +40,9 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.ContinueProtocolHandler; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.ProtocolHandlers; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpField; @@ -80,6 +82,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool; */ public abstract class AbstractProxyServlet extends HttpServlet { + protected static final String CLIENT_REQUEST_ATTRIBUTE = "org.eclipse.jetty.proxy.clientRequest"; protected static final Set HOP_HEADERS; static { @@ -324,8 +327,10 @@ public abstract class AbstractProxyServlet extends HttpServlet // Content must not be decoded, otherwise the client gets confused. client.getContentDecoderFactories().clear(); - // No protocol handlers, pass everything to the client. - client.getProtocolHandlers().clear(); + // Pass traffic to the client, only intercept what's necessary. + ProtocolHandlers protocolHandlers = client.getProtocolHandlers(); + protocolHandlers.clear(); + protocolHandlers.put(new ProxyContinueProtocolHandler()); return client; } @@ -427,6 +432,11 @@ public abstract class AbstractProxyServlet extends HttpServlet clientRequest.getHeader(HttpHeader.TRANSFER_ENCODING.asString()) != null; } + protected boolean expects100Continue(HttpServletRequest request) + { + return HttpHeaderValue.CONTINUE.asString().equals(request.getHeader(HttpHeader.EXPECT.asString())); + } + protected void copyRequestHeaders(HttpServletRequest clientRequest, Request proxyRequest) { // First clear possibly existing headers, as we are going to copy those from the client request. @@ -638,6 +648,9 @@ public abstract class AbstractProxyServlet extends HttpServlet int status = failure instanceof TimeoutException ? HttpStatus.GATEWAY_TIMEOUT_504 : HttpStatus.BAD_GATEWAY_502; + int serverStatus = serverResponse == null ? status : serverResponse.getStatus(); + if (expects100Continue(clientRequest) && serverStatus >= HttpStatus.OK_200) + status = serverStatus; sendProxyResponseError(clientRequest, proxyResponse, status); } } @@ -655,6 +668,12 @@ public abstract class AbstractProxyServlet extends HttpServlet clientRequest.getAsyncContext().complete(); } + protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest) + { + if (_log.isDebugEnabled()) + _log.debug("{} handling 100 Continue", getRequestId(clientRequest)); + } + /** *

Utility class that implement transparent proxy functionalities.

*

Configuration parameters:

@@ -733,4 +752,14 @@ public abstract class AbstractProxyServlet extends HttpServlet return rewrittenURI.toString(); } } + + class ProxyContinueProtocolHandler extends ContinueProtocolHandler + { + @Override + protected void onContinue(Request request) + { + HttpServletRequest clientRequest = (HttpServletRequest)request.getAttributes().get(CLIENT_REQUEST_ATTRIBUTE); + AbstractProxyServlet.this.onContinue(clientRequest, request); + } + } } diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java index 491258acdc5..03ca4943d89 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java @@ -41,7 +41,6 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.ContentDecoder; import org.eclipse.jetty.client.GZIPContentDecoder; -import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; @@ -66,9 +65,10 @@ import org.eclipse.jetty.util.component.Destroyable; */ public class AsyncMiddleManServlet extends AbstractProxyServlet { - private static final String PROXY_REQUEST_COMMITTED = AsyncMiddleManServlet.class.getName() + ".proxyRequestCommitted"; - private static final String CLIENT_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".clientTransformer"; - private static final String SERVER_TRANSFORMER = AsyncMiddleManServlet.class.getName() + ".serverTransformer"; + private static final String PROXY_REQUEST_CONTENT_COMMITTED_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".proxyRequestContentCommitted"; + private static final String CLIENT_TRANSFORMER_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".clientTransformer"; + private static final String SERVER_TRANSFORMER_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".serverTransformer"; + private static final String CONTINUE_ACTION_ATTRIBUTE = AsyncMiddleManServlet.class.getName() + ".continueAction"; @Override protected void service(HttpServletRequest clientRequest, HttpServletResponse proxyResponse) throws ServletException, IOException @@ -91,8 +91,6 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet .method(clientRequest.getMethod()) .version(HttpVersion.fromString(clientRequest.getProtocol())); - boolean hasContent = hasContent(clientRequest); - copyRequestHeaders(clientRequest, proxyRequest); addProxyHeaders(clientRequest, proxyRequest); @@ -105,16 +103,43 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet // If there is content, the send of the proxy request // is delayed and performed when the content arrives, // to allow optimization of the Content-Length header. - if (hasContent) - proxyRequest.content(newProxyContentProvider(clientRequest, proxyResponse, proxyRequest)); + if (hasContent(clientRequest)) + { + DeferredContentProvider provider = newProxyContentProvider(clientRequest, proxyResponse, proxyRequest); + proxyRequest.content(provider); + + if (expects100Continue(clientRequest)) + { + proxyRequest.attribute(CLIENT_REQUEST_ATTRIBUTE, clientRequest); + proxyRequest.attribute(CONTINUE_ACTION_ATTRIBUTE, (Runnable)() -> + { + try + { + ServletInputStream input = clientRequest.getInputStream(); + input.setReadListener(newProxyReadListener(clientRequest, proxyResponse, proxyRequest, provider)); + } + catch (Throwable failure) + { + onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, failure); + } + }); + sendProxyRequest(clientRequest, proxyResponse, proxyRequest); + } + else + { + ServletInputStream input = clientRequest.getInputStream(); + input.setReadListener(newProxyReadListener(clientRequest, proxyResponse, proxyRequest, provider)); + } + } else + { sendProxyRequest(clientRequest, proxyResponse, proxyRequest); + } } - protected ContentProvider newProxyContentProvider(final HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest) throws IOException + protected DeferredContentProvider newProxyContentProvider(final HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest) throws IOException { - ServletInputStream input = clientRequest.getInputStream(); - DeferredContentProvider provider = new DeferredContentProvider() + return new DeferredContentProvider() { @Override public boolean offer(ByteBuffer buffer, Callback callback) @@ -124,8 +149,6 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet return super.offer(buffer, callback); } }; - input.setReadListener(newProxyReadListener(clientRequest, proxyResponse, proxyRequest, provider)); - return provider; } protected ReadListener newProxyReadListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider) @@ -154,6 +177,14 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet return ContentTransformer.IDENTITY; } + @Override + protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest) + { + super.onContinue(clientRequest, proxyRequest); + Runnable action = (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE); + action.run(); + } + private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List output) throws IOException { try @@ -197,10 +228,10 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet private void cleanup(HttpServletRequest clientRequest) { - ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER); + ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER_ATTRIBUTE); if (clientTransformer instanceof Destroyable) ((Destroyable)clientTransformer).destroy(); - ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER); + ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER_ATTRIBUTE); if (serverTransformer instanceof Destroyable) ((Destroyable)serverTransformer).destroy(); } @@ -237,6 +268,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet private final Request proxyRequest; private final DeferredContentProvider provider; private final int contentLength; + private final boolean expects100Continue; private int length; protected ProxyReader(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest, DeferredContentProvider provider) @@ -246,6 +278,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet this.proxyRequest = proxyRequest; this.provider = provider; this.contentLength = clientRequest.getContentLength(); + this.expects100Continue = expects100Continue(clientRequest); } @Override @@ -321,15 +354,13 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet private void process(ByteBuffer content, Callback callback, boolean finished) throws IOException { - ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER); + ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER_ATTRIBUTE); if (transformer == null) { transformer = newClientRequestContentTransformer(clientRequest, proxyRequest); - clientRequest.setAttribute(CLIENT_TRANSFORMER, transformer); + clientRequest.setAttribute(CLIENT_TRANSFORMER_ATTRIBUTE, transformer); } - boolean committed = clientRequest.getAttribute(PROXY_REQUEST_COMMITTED) != null; - int contentBytes = content.remaining(); // Skip transformation for empty non-last buffers. @@ -361,11 +392,15 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet if (_log.isDebugEnabled()) _log.debug("{} upstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes); - if (!committed && (size > 0 || finished)) + boolean contentCommitted = clientRequest.getAttribute(PROXY_REQUEST_CONTENT_COMMITTED_ATTRIBUTE) != null; + if (!contentCommitted && (size > 0 || finished)) { - proxyRequest.header(HttpHeader.CONTENT_LENGTH, null); - clientRequest.setAttribute(PROXY_REQUEST_COMMITTED, true); - sendProxyRequest(clientRequest, proxyResponse, proxyRequest); + clientRequest.setAttribute(PROXY_REQUEST_CONTENT_COMMITTED_ATTRIBUTE, true); + if (!expects100Continue) + { + proxyRequest.header(HttpHeader.CONTENT_LENGTH, null); + sendProxyRequest(clientRequest, proxyResponse, proxyRequest); + } } if (size == 0) @@ -401,6 +436,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet @Override public void onBegin(Response serverResponse) { + response = serverResponse; proxyResponse.setStatus(serverResponse.getStatus()); } @@ -430,11 +466,11 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet clientRequest.setAttribute(WRITE_LISTENER_ATTRIBUTE, proxyWriter); } - ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER); + ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER_ATTRIBUTE); if (transformer == null) { transformer = newServerResponseContentTransformer(clientRequest, proxyResponse, serverResponse); - clientRequest.setAttribute(SERVER_TRANSFORMER, transformer); + clientRequest.setAttribute(SERVER_TRANSFORMER_ATTRIBUTE, transformer); } length += contentBytes; @@ -502,7 +538,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet if (contentLength < 0) { ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE); - ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER); + ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER_ATTRIBUTE); transform(transformer, BufferUtil.EMPTY_BUFFER, true, buffers); @@ -544,7 +580,6 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet @Override public void onComplete(Result result) { - response = result.getResponse(); if (result.isSucceeded()) complete.succeeded(); else diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java index d636ad89ec2..a0af592f4c6 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ProxyServlet.java @@ -18,9 +18,12 @@ package org.eclipse.jetty.proxy; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.servlet.AsyncContext; @@ -29,13 +32,16 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.AsyncContentProvider; import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.InputStreamContentProvider; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingCallback; /** *

Servlet 3.0 asynchronous proxy servlet.

@@ -47,6 +53,8 @@ import org.eclipse.jetty.util.Callback; */ public class ProxyServlet extends AbstractProxyServlet { + private static final String CONTINUE_ACTION_ATTRIBUTE = ProxyServlet.class.getName() + ".continueAction"; + @Override protected void service(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { @@ -83,7 +91,30 @@ public class ProxyServlet extends AbstractProxyServlet proxyRequest.timeout(getTimeout(), TimeUnit.MILLISECONDS); if (hasContent(request)) - proxyRequest.content(proxyRequestContent(request, response, proxyRequest)); + { + if (expects100Continue(request)) + { + DeferredContentProvider deferred = new DeferredContentProvider(); + proxyRequest.content(deferred); + proxyRequest.attribute(CLIENT_REQUEST_ATTRIBUTE, request); + proxyRequest.attribute(CONTINUE_ACTION_ATTRIBUTE, (Runnable)() -> + { + try + { + ContentProvider provider = proxyRequestContent(request, response, proxyRequest); + new DelegatingContentProvider(request, proxyRequest, response, provider, deferred).iterate(); + } + catch (Throwable failure) + { + onClientRequestFailure(request, proxyRequest, response, failure); + } + }); + } + else + { + proxyRequest.content(proxyRequestContent(request, response, proxyRequest)); + } + } sendProxyRequest(request, response, proxyRequest); } @@ -114,6 +145,15 @@ public class ProxyServlet extends AbstractProxyServlet } } + @Override + protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest) + { + super.onContinue(clientRequest, proxyRequest); + Runnable action = (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE); + Executor executor = getHttpClient().getExecutor(); + executor.execute(action); + } + /** *

Convenience extension of {@link ProxyServlet} that offers transparent proxy functionalities.

* @@ -240,4 +280,81 @@ public class ProxyServlet extends AbstractProxyServlet onClientRequestFailure(request, proxyRequest, response, failure); } } + + private class DelegatingContentProvider extends IteratingCallback implements AsyncContentProvider.Listener + { + private final HttpServletRequest clientRequest; + private final Request proxyRequest; + private final HttpServletResponse proxyResponse; + private final Iterator iterator; + private final DeferredContentProvider deferred; + + private DelegatingContentProvider(HttpServletRequest clientRequest, Request proxyRequest, HttpServletResponse proxyResponse, ContentProvider provider, DeferredContentProvider deferred) + { + this.clientRequest = clientRequest; + this.proxyRequest = proxyRequest; + this.proxyResponse = proxyResponse; + this.iterator = provider.iterator(); + this.deferred = deferred; + if (provider instanceof AsyncContentProvider) + ((AsyncContentProvider)provider).setListener(this); + } + + @Override + protected Action process() throws Exception + { + if (!iterator.hasNext()) + return Action.SUCCEEDED; + + ByteBuffer buffer = iterator.next(); + if (buffer == null) + return Action.IDLE; + + deferred.offer(buffer, this); + return Action.SCHEDULED; + } + + @Override + public void succeeded() + { + if (iterator instanceof Callback) + ((Callback)iterator).succeeded(); + super.succeeded(); + } + + @Override + protected void onCompleteSuccess() + { + try + { + if (iterator instanceof Closeable) + ((Closeable)iterator).close(); + deferred.close(); + } + catch (Throwable x) + { + _log.ignore(x); + } + } + + @Override + protected void onCompleteFailure(Throwable failure) + { + if (iterator instanceof Callback) + ((Callback)iterator).failed(failure); + onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, failure); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + + @Override + public void onContent() + { + iterate(); + } + } } diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java index f3add4ba420..3415c883d71 100644 --- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java +++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java @@ -53,6 +53,7 @@ import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; @@ -72,9 +73,12 @@ import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.http.HttpDestinationOverHTTP; import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Server; @@ -187,9 +191,12 @@ public class ProxyServletTest @After public void dispose() throws Exception { - client.stop(); - proxy.stop(); - server.stop(); + if (client != null) + client.stop(); + if (proxy != null) + proxy.stop(); + if (server != null) + server.stop(); } @Test @@ -1233,5 +1240,227 @@ public class ProxyServletTest Assert.assertEquals(200, response.getStatus()); } - // TODO: test proxy authentication + @Test + public void testExpect100ContinueRespond100Continue() throws Exception + { + CountDownLatch serverLatch1 = new CountDownLatch(1); + CountDownLatch serverLatch2 = new CountDownLatch(1); + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + serverLatch1.countDown(); + + try + { + serverLatch2.await(5, TimeUnit.SECONDS); + } + catch (Throwable x) + { + throw new InterruptedIOException(); + } + + // Send the 100 Continue. + ServletInputStream input = request.getInputStream(); + + // Echo the content. + IO.copy(input, response.getOutputStream()); + } + }); + startProxy(); + startClient(); + + byte[] content = new byte[1024]; + CountDownLatch contentLatch = new CountDownLatch(1); + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest("localhost", serverConnector.getLocalPort()) + .header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()) + .content(new BytesContentProvider(content)) + .onRequestContent((request, buffer) -> contentLatch.countDown()) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded()) + { + if (result.getResponse().getStatus() == HttpStatus.OK_200) + { + if (Arrays.equals(content, getContent())) + clientLatch.countDown(); + } + } + } + }); + + // Wait until we arrive on the server. + Assert.assertTrue(serverLatch1.await(5, TimeUnit.SECONDS)); + // The client should not send the content yet. + Assert.assertFalse(contentLatch.await(1, TimeUnit.SECONDS)); + + // Make the server send the 100 Continue. + serverLatch2.countDown(); + + // The client has sent the content. + Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testExpect100ContinueRespond100ContinueDelayedRequestContent() throws Exception + { + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + // Send the 100 Continue. + ServletInputStream input = request.getInputStream(); + // Echo the content. + IO.copy(input, response.getOutputStream()); + } + }); + startProxy(); + startClient(); + + byte[] content = new byte[1024]; + new Random().nextBytes(content); + int chunk1 = content.length / 2; + DeferredContentProvider contentProvider = new DeferredContentProvider(); + contentProvider.offer(ByteBuffer.wrap(content, 0, chunk1)); + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest("localhost", serverConnector.getLocalPort()) + .header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()) + .content(contentProvider) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded()) + { + if (result.getResponse().getStatus() == HttpStatus.OK_200) + { + if (Arrays.equals(content, getContent())) + clientLatch.countDown(); + } + } + } + }); + + // Wait a while and then offer more content. + Thread.sleep(1000); + contentProvider.offer(ByteBuffer.wrap(content, chunk1, content.length - chunk1)); + contentProvider.close(); + + Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testExpect100ContinueRespond100ContinueSomeRequestContentThenFailure() throws Exception + { + CountDownLatch serverLatch = new CountDownLatch(1); + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + // Send the 100 Continue. + ServletInputStream input = request.getInputStream(); + try + { + // Echo the content. + IO.copy(input, response.getOutputStream()); + } + catch (IOException x) + { + serverLatch.countDown(); + } + } + }); + startProxy(); + startClient(); + + long idleTimeout = 1000; + client.setIdleTimeout(idleTimeout); + + byte[] content = new byte[1024]; + new Random().nextBytes(content); + int chunk1 = content.length / 2; + DeferredContentProvider contentProvider = new DeferredContentProvider(); + contentProvider.offer(ByteBuffer.wrap(content, 0, chunk1)); + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest("localhost", serverConnector.getLocalPort()) + .header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()) + .content(contentProvider) + .send(result -> + { + if (result.isFailed()) + clientLatch.countDown(); + }); + + // Wait more than the idle timeout to break the connection. + Thread.sleep(2 * idleTimeout); + + Assert.assertTrue(serverLatch.await(555, TimeUnit.SECONDS)); + Assert.assertTrue(clientLatch.await(555, TimeUnit.SECONDS)); + } + + @Test + public void testExpect100ContinueRespond417ExpectationFailed() throws Exception + { + CountDownLatch serverLatch1 = new CountDownLatch(1); + CountDownLatch serverLatch2 = new CountDownLatch(1); + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + serverLatch1.countDown(); + + try + { + serverLatch2.await(5, TimeUnit.SECONDS); + } + catch (Throwable x) + { + throw new InterruptedIOException(); + } + + // Send the 417 Expectation Failed. + response.setStatus(HttpStatus.EXPECTATION_FAILED_417); + } + }); + startProxy(); + startClient(); + + byte[] content = new byte[1024]; + CountDownLatch contentLatch = new CountDownLatch(1); + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest("localhost", serverConnector.getLocalPort()) + .header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()) + .content(new BytesContentProvider(content)) + .onRequestContent((request, buffer) -> contentLatch.countDown()) + .send(result -> + { + if (result.isFailed()) + { + if (result.getResponse().getStatus() == HttpStatus.EXPECTATION_FAILED_417) + clientLatch.countDown(); + } + }); + + // Wait until we arrive on the server. + Assert.assertTrue(serverLatch1.await(5, TimeUnit.SECONDS)); + // The client should not send the content yet. + Assert.assertFalse(contentLatch.await(1, TimeUnit.SECONDS)); + + // Make the server send the 417 Expectation Failed. + serverLatch2.countDown(); + + // The client should not send the content. + Assert.assertFalse(contentLatch.await(1, TimeUnit.SECONDS)); + Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java index c5b348e60cf..6668678874c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java @@ -222,8 +222,8 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque // If we have no request yet, just close if (_metadata.getMethod() == null) _httpConnection.close(); - else - onEarlyEOF(); + else if (onEarlyEOF()) + handle(); } @Override diff --git a/tests/test-http-client-transport/pom.xml b/tests/test-http-client-transport/pom.xml index ca0ea2a2225..5f79cb73e66 100644 --- a/tests/test-http-client-transport/pom.xml +++ b/tests/test-http-client-transport/pom.xml @@ -75,7 +75,7 @@ org.eclipse.jetty - jetty-server + jetty-servlet ${project.version} test diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java index 493f801a8f5..1fb649238df 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java @@ -21,6 +21,8 @@ package org.eclipse.jetty.http.client; import java.util.ArrayList; import java.util.List; +import javax.servlet.http.HttpServlet; + import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClientTransport; @@ -40,6 +42,8 @@ import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.toolchain.test.TestTracker; import org.eclipse.jetty.util.SocketAddressResolver; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -67,6 +71,8 @@ public abstract class AbstractTest protected SslContextFactory sslContextFactory; protected Server server; protected ServerConnector connector; + protected ServletContextHandler context; + protected String servletPath = "/servlet"; protected HttpClient client; public AbstractTest(Transport transport) @@ -81,6 +87,22 @@ public abstract class AbstractTest startClient(); } + public void start(HttpServlet servlet) throws Exception + { + startServer(servlet); + startClient(); + } + + protected void startServer(HttpServlet servlet) throws Exception + { + context = new ServletContextHandler(); + context.setContextPath("/"); + ServletHolder holder = new ServletHolder(servlet); + holder.setAsyncSupported(true); + context.addServlet(holder, servletPath); + startServer(context); + } + protected void startServer(Handler handler) throws Exception { sslContextFactory = new SslContextFactory(); @@ -228,9 +250,19 @@ public abstract class AbstractTest @After public void stop() throws Exception + { + stopClient(); + stopServer(); + } + + protected void stopClient() throws Exception { if (client != null) client.stop(); + } + + protected void stopServer() throws Exception + { if (server != null) server.stop(); } diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java similarity index 53% rename from jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java rename to tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java index e8b5940b240..409f075931f 100644 --- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java @@ -16,13 +16,11 @@ // ======================================================================== // -package org.eclipse.jetty.servlet; +package org.eclipse.jetty.http.client; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.Socket; +import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Queue; @@ -43,107 +41,106 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Destination; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; +import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.DeferredContentProvider; +import org.eclipse.jetty.client.util.StringContentProvider; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpChannel; -import org.eclipse.jetty.server.HttpConnectionFactory; -import org.eclipse.jetty.server.LocalConnector; -import org.eclipse.jetty.server.LocalConnector.LocalEndPoint; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; -import org.eclipse.jetty.toolchain.test.http.SimpleHttpParser; -import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse; -import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.log.StacklessLogging; import org.hamcrest.Matchers; -import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -public class AsyncIOServletTest +public class AsyncIOServletTest extends AbstractTest { - private Server server; - private ServerConnector connector; - private LocalConnector local; - private String path = "/path"; - private static final ThreadLocal scope = new ThreadLocal<>(); + private static final ThreadLocal scope = new ThreadLocal<>(); - public void startServer(HttpServlet servlet) throws Exception + public AsyncIOServletTest(Transport transport) { - startServer(servlet, 30000); + super(transport == Transport.FCGI ? null : transport); } - public void startServer(HttpServlet servlet, long idleTimeout) throws Exception + @Override + protected void startServer(Handler handler) throws Exception { - server = new Server(); - connector = new ServerConnector(server); - connector.setIdleTimeout(idleTimeout); - connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setDelayDispatchUntilContent(false); - server.addConnector(connector); - local = new LocalConnector(server); - server.addConnector(local); - - ServletContextHandler context = new ServletContextHandler(server, "/", false, false); - ServletHolder holder = new ServletHolder(servlet); - holder.setAsyncSupported(true); - context.addServlet(holder, path); - - context.addEventListener(new ContextHandler.ContextScopeListener() + if (handler == context) { - @Override - public void enterScope(Context context, Request request, Object reason) + // Add this listener before the context is started, so it's durable. + context.addEventListener(new ContextHandler.ContextScopeListener() { - if (scope.get() != null) + @Override + public void enterScope(Context context, Request request, Object reason) { - System.err.println(Thread.currentThread() + " Already entered scope!!!"); - scope.get().printStackTrace(); - throw new IllegalStateException(); + checkScope(); + scope.set(new RuntimeException()); } - scope.set(new Throwable()); - } - @Override - public void exitScope(Context context, Request request) - { - if (scope.get() == null) - throw new IllegalStateException(); - scope.set(null); - } - }); - - server.start(); - } - - private static void assertScope() - { - if (scope.get() == null) - Assert.fail("Not in scope"); - } - - @After - public void stopServer() throws Exception - { - server.stop(); - if (scope.get() != null) - { - System.err.println("Still in scope after stop!"); - scope.get().printStackTrace(); - throw new IllegalStateException("Didn't leave scope"); + @Override + public void exitScope(Context context, Request request) + { + assertScope(); + scope.set(null); + } + }); } + super.startServer(handler); + } + + private void assertScope() + { + Assert.assertNotNull("Not in scope", scope.get()); + } + + private void checkScope() + { + RuntimeException callScope = scope.get(); + if (callScope != null) + throw callScope; + } + + protected void stopServer() throws Exception + { + super.stopServer(); + checkScope(); scope.set(null); } + private void sleep(long ms) throws IOException + { + try + { + Thread.sleep(ms); + } + catch (InterruptedException e) + { + throw new InterruptedIOException(); + } + } + @Test public void testAsyncReadThrowsException() throws Exception { @@ -159,7 +156,7 @@ public class AsyncIOServletTest private void testAsyncReadThrows(final Throwable throwable) throws Exception { final CountDownLatch latch = new CountDownLatch(1); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -193,53 +190,36 @@ public class AsyncIOServletTest Assert.assertThat("onError message", t.getMessage(), is(throwable.getMessage())); latch.countDown(); response.setStatus(500); - asyncContext.complete(); } }); } }); - String data = "0123456789"; - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Length: " + data.length() + "\r\n" + - "\r\n" + - data; + ContentResponse response = client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(new StringContentProvider("0123456789")) + .timeout(5, TimeUnit.SECONDS) + .send(); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - client.setSoTimeout(5000); - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("500 Server Error")); - while (line.length() > 0) - { - line = in.readLine(); - } - line = in.readLine(); - - assertTrue(latch.await(5, TimeUnit.SECONDS)); - } + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus()); } @Test public void testAsyncReadIdleTimeout() throws Exception { - final int status = 567; - startServer(new HttpServlet() + int status = 567; + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { assertScope(); - final AsyncContext asyncContext = request.startAsync(request, response); + AsyncContext asyncContext = request.startAsync(request, response); asyncContext.setTimeout(0); - final ServletInputStream inputStream = request.getInputStream(); + ServletInputStream inputStream = request.getInputStream(); inputStream.setReadListener(new ReadListener() { @Override @@ -267,39 +247,51 @@ public class AsyncIOServletTest } }); } - }, 1000); - - String data1 = "0123456789"; - String data2 = "ABCDEF"; - // Only send the first chunk of data and then let it idle timeout. - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Length: " + (data1.length() + data2.length()) + "\r\n" + - "\r\n" + - data1; - - try (Socket client = new Socket("localhost", connector.getLocalPort())) + }); + connector.setIdleTimeout(1000); + CountDownLatch closeLatch = new CountDownLatch(1); + connector.addBean(new Connection.Listener() { - client.setSoTimeout(5000); - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); + @Override + public void onOpened(Connection connection) + { + } - SimpleHttpParser parser = new SimpleHttpParser(); - SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8"))); + @Override + public void onClosed(Connection connection) + { + closeLatch.countDown(); + } + }); - assertEquals(String.valueOf(status), response.getCode()); + String data = "0123456789"; + DeferredContentProvider content = new DeferredContentProvider(); + content.offer(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))); + CountDownLatch responseLatch = new CountDownLatch(1); + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(content) + .onResponseSuccess(r -> responseLatch.countDown()) + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + assertEquals(status, result.getResponse().getStatus()); + clientLatch.countDown(); + }); - // Make sure the connection was closed by the server. - assertEquals(-1, client.getInputStream().read()); - } + assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + content.close(); + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); } @Test public void testOnErrorThrows() throws Exception { final AtomicInteger errors = new AtomicInteger(); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException @@ -341,24 +333,15 @@ public class AsyncIOServletTest } }); - String data = "0123456789"; - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Length: " + data.length() + "\r\n" + - "\r\n" + - data; - - try (Socket client = new Socket("localhost", connector.getLocalPort()); - StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) + try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); + ContentResponse response = client.newRequest(newURI()) + .path(servletPath) + .content(new StringContentProvider("0123456789")) + .timeout(5, TimeUnit.SECONDS) + .send(); - SimpleHttpParser parser = new SimpleHttpParser(); - SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8"))); - - Assert.assertEquals("500", response.getCode()); + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus()); Assert.assertEquals(1, errors.get()); } } @@ -377,8 +360,8 @@ public class AsyncIOServletTest private void testAsyncWriteThrows(final Throwable throwable) throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - startServer(new HttpServlet() + CountDownLatch latch = new CountDownLatch(1); + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -411,35 +394,25 @@ public class AsyncIOServletTest } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "\r\n"; + ContentResponse response = client.newRequest(newURI()) + .path(servletPath) + .timeout(5, TimeUnit.SECONDS) + .send(); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - - SimpleHttpParser parser = new SimpleHttpParser(); - SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8"))); - - assertTrue(latch.await(5, TimeUnit.SECONDS)); - Assert.assertEquals("500", response.getCode()); - } + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus()); } - @Test public void testAsyncWriteClosed() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); String text = "Now is the winter of our discontent. How Now Brown Cow. The quick brown fox jumped over the lazy dog.\n"; for (int i = 0; i < 10; i++) text = text + text; - final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + byte[] data = text.getBytes(StandardCharsets.UTF_8); - startServer(new HttpServlet() + CountDownLatch errorLatch = new CountDownLatch(1); + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -455,22 +428,12 @@ public class AsyncIOServletTest public void onWritePossible() throws IOException { assertScope(); - while (out.isReady()) - { - try - { - Thread.sleep(100); - out.write(data); - } - catch (IOException e) - { - throw e; - } - catch (Exception e) - { - e.printStackTrace(); - } - } + + // Wait for the failure to arrive to + // the server while we are about to write. + sleep(1000); + + out.write(data); } @Override @@ -478,45 +441,37 @@ public class AsyncIOServletTest { assertScope(); async.complete(); - latch.countDown(); + errorLatch.countDown(); } }); } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "\r\n"; + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .path(servletPath) + .onResponseHeaders(response -> + { + if (response.getStatus() == HttpStatus.OK_200) + response.abort(new IOException("explicitly_closed_by_test")); + }) + .send(result -> + { + if (result.isFailed()) + clientLatch.countDown(); + }); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("200 OK")); - while (line.length() > 0) - line = in.readLine(); - line = in.readLine(); - assertThat(line, not(containsString(" "))); - line = in.readLine(); - assertThat(line, containsString("discontent. How Now Brown Cow. The ")); - } - - if (!latch.await(5, TimeUnit.SECONDS)) - Assert.fail(); + assertTrue(errorLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); } - @Test public void testIsReadyAtEOF() throws Exception { String text = "TEST\n"; - final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -524,16 +479,41 @@ public class AsyncIOServletTest assertScope(); response.flushBuffer(); - final AsyncContext async = request.startAsync(); - final ServletInputStream in = request.getInputStream(); - final ServletOutputStream out = response.getOutputStream(); + AsyncContext async = request.startAsync(); + ServletInputStream input = request.getInputStream(); + ServletOutputStream output = response.getOutputStream(); - in.setReadListener(new ReadListener() + input.setReadListener(new ReadListener() { transient int _i = 0; transient boolean _minusOne = false; transient boolean _finished = false; + @Override + public void onDataAvailable() throws IOException + { + assertScope(); + while (input.isReady() && !input.isFinished()) + { + int b = input.read(); + if (b == -1) + _minusOne = true; + else if (data[_i++] != b) + throw new IllegalStateException(); + } + + if (input.isFinished()) + _finished = true; + } + + @Override + public void onAllDataRead() throws IOException + { + assertScope(); + output.write(String.format("i=%d eof=%b finished=%b", _i, _minusOne, _finished).getBytes(StandardCharsets.ISO_8859_1)); + async.complete(); + } + @Override public void onError(Throwable t) { @@ -541,68 +521,29 @@ public class AsyncIOServletTest t.printStackTrace(); async.complete(); } - - @Override - public void onDataAvailable() throws IOException - { - assertScope(); - while (in.isReady() && !in.isFinished()) - { - int b = in.read(); - if (b == -1) - _minusOne = true; - else if (data[_i++] != b) - throw new IllegalStateException(); - } - - if (in.isFinished()) - _finished = true; - } - - @Override - public void onAllDataRead() throws IOException - { - assertScope(); - out.write(String.format("i=%d eof=%b finished=%b", _i, _minusOne, _finished).getBytes(StandardCharsets.ISO_8859_1)); - async.complete(); - } }); } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + data.length + "\r\n" + - "Connection: close\r\n" + - "\r\n"; + ContentResponse response = client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .header(HttpHeader.CONNECTION, "close") + .content(new StringContentProvider(text)) + .timeout(5, TimeUnit.SECONDS) + .send(); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - output.write(data); - output.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("200 OK")); - while (line.length() > 0) - line = in.readLine(); - line = in.readLine(); - assertThat(line, containsString("i=" + data.length + " eof=true finished=true")); - } + String responseContent = response.getContentAsString(); + assertThat(responseContent, containsString("i=" + data.length + " eof=true finished=true")); } - @Test public void testOnAllDataRead() throws Exception { String text = "X"; - final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); - - startServer(new HttpServlet() + byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + String success = "SUCCESS"; + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -617,21 +558,13 @@ public class AsyncIOServletTest in.setReadListener(new ReadListener() { - @Override - public void onError(Throwable t) - { - assertScope(); - t.printStackTrace(); - async.complete(); - } - @Override public void onDataAvailable() throws IOException { assertScope(); try { - Thread.sleep(1000); + sleep(1000); if (!in.isReady()) throw new IllegalStateException(); if (in.read() != 'X') @@ -641,9 +574,9 @@ public class AsyncIOServletTest if (in.read() != -1) throw new IllegalStateException(); } - catch (Exception e) + catch (IOException x) { - e.printStackTrace(); + throw new UncheckedIOException(x); } } @@ -651,64 +584,10 @@ public class AsyncIOServletTest public void onAllDataRead() throws IOException { assertScope(); - out.write("OK\n".getBytes(StandardCharsets.ISO_8859_1)); + out.write(success.getBytes(StandardCharsets.ISO_8859_1)); async.complete(); } - }); - } - }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + data.length + "\r\n" + - "Connection: close\r\n" + - "\r\n"; - - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - client.setSoTimeout(5000); - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - Thread.sleep(100); - output.write(data); - output.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("200 OK")); - while (line.length() > 0) - line = in.readLine(); - line = in.readLine(); - assertThat(line, containsString("OK")); - } - } - - @Test - public void testOtherThreadOnAllDataRead() throws Exception - { - String text = "X"; - final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); - - startServer(new HttpServlet() - { - @Override - protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException - { - assertScope(); - response.flushBuffer(); - - final AsyncContext async = request.startAsync(); - async.setTimeout(500000); - final ServletInputStream in = request.getInputStream(); - final ServletOutputStream out = response.getOutputStream(); - - if (request.getDispatcherType() == DispatcherType.ERROR) - throw new IllegalStateException(); - - in.setReadListener(new ReadListener() - { @Override public void onError(Throwable t) { @@ -716,7 +595,70 @@ public class AsyncIOServletTest t.printStackTrace(); async.complete(); } + }); + } + }); + CountDownLatch clientLatch = new CountDownLatch(1); + DeferredContentProvider content = new DeferredContentProvider() + { + @Override + public long getLength() + { + return data.length; + } + }; + client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(content) + .timeout(5, TimeUnit.SECONDS) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded()) + { + Response response = result.getResponse(); + String content = getContentAsString(); + if (response.getStatus() == HttpStatus.OK_200 && success.equals(content)) + clientLatch.countDown(); + } + } + }); + + sleep(100); + content.offer(ByteBuffer.wrap(data)); + content.close(); + + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testOtherThreadOnAllDataRead() throws Exception + { + String text = "X"; + byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + String success = "SUCCESS"; + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException + { + assertScope(); + response.flushBuffer(); + + AsyncContext async = request.startAsync(); + async.setTimeout(0); + ServletInputStream input = request.getInputStream(); + ServletOutputStream output = response.getOutputStream(); + + if (request.getDispatcherType() == DispatcherType.ERROR) + throw new IllegalStateException(); + + input.setReadListener(new ReadListener() + { @Override public void onDataAvailable() throws IOException { @@ -726,19 +668,19 @@ public class AsyncIOServletTest assertScope(); try { - Thread.sleep(1000); - if (!in.isReady()) + sleep(1000); + if (!input.isReady()) throw new IllegalStateException(); - if (in.read() != 'X') + if (input.read() != 'X') throw new IllegalStateException(); - if (!in.isReady()) + if (!input.isReady()) throw new IllegalStateException(); - if (in.read() != -1) + if (input.read() != -1) throw new IllegalStateException(); } - catch (Exception e) + catch (IOException x) { - e.printStackTrace(); + throw new UncheckedIOException(x); } }); } @@ -746,49 +688,59 @@ public class AsyncIOServletTest @Override public void onAllDataRead() throws IOException { - out.write("OK\n".getBytes(StandardCharsets.ISO_8859_1)); + output.write(success.getBytes(StandardCharsets.ISO_8859_1)); + async.complete(); + } + + @Override + public void onError(Throwable t) + { + assertScope(); + t.printStackTrace(); async.complete(); } }); } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + data.length + "\r\n" + - "Connection: close\r\n" + - "\r\n"; + CountDownLatch clientLatch = new CountDownLatch(1); + DeferredContentProvider content = new DeferredContentProvider(); + client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(content) + .timeout(5, TimeUnit.SECONDS) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded()) + { + Response response = result.getResponse(); + String content = getContentAsString(); + if (response.getStatus() == HttpStatus.OK_200 && success.equals(content)) + clientLatch.countDown(); + } + } + }); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - client.setSoTimeout(500000); - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - Thread.sleep(100); - output.write(data); - output.flush(); + sleep(100); + content.offer(ByteBuffer.wrap(data)); + content.close(); - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("200 OK")); - while (line.length() > 0) - line = in.readLine(); - line = in.readLine(); - assertThat(line, containsString("OK")); - } + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); } - @Test public void testCompleteBeforeOnAllDataRead() throws Exception { String text = "XYZ"; - final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); - final AtomicBoolean allDataRead = new AtomicBoolean(false); + byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + String success = "SUCCESS"; + AtomicBoolean allDataRead = new AtomicBoolean(false); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException @@ -796,29 +748,22 @@ public class AsyncIOServletTest assertScope(); response.flushBuffer(); - final AsyncContext async = request.startAsync(); - final ServletInputStream in = request.getInputStream(); - final ServletOutputStream out = response.getOutputStream(); + AsyncContext async = request.startAsync(); + ServletInputStream input = request.getInputStream(); + ServletOutputStream output = response.getOutputStream(); - in.setReadListener(new ReadListener() + input.setReadListener(new ReadListener() { - @Override - public void onError(Throwable t) - { - assertScope(); - t.printStackTrace(); - } - @Override public void onDataAvailable() throws IOException { assertScope(); - while (in.isReady()) + while (input.isReady()) { - int b = in.read(); + int b = input.read(); if (b < 0) { - out.write("OK\n".getBytes(StandardCharsets.ISO_8859_1)); + output.write(success.getBytes(StandardCharsets.ISO_8859_1)); async.complete(); return; } @@ -829,57 +774,46 @@ public class AsyncIOServletTest public void onAllDataRead() throws IOException { assertScope(); - out.write("BAD!!!\n".getBytes(StandardCharsets.ISO_8859_1)); + output.write("FAILURE".getBytes(StandardCharsets.ISO_8859_1)); allDataRead.set(true); throw new IllegalStateException(); } + + @Override + public void onError(Throwable t) + { + assertScope(); + t.printStackTrace(); + } }); } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + data.length + "\r\n" + - "Connection: close\r\n" + - "\r\n"; + ContentResponse response = client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .header(HttpHeader.CONNECTION, "close") + .content(new StringContentProvider(text)) + .timeout(5, TimeUnit.SECONDS) + .send(); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - Thread.sleep(100); - output.write(data); - output.flush(); - - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); - String line = in.readLine(); - assertThat(line, containsString("200 OK")); - while (line.length() > 0) - { - line = in.readLine(); - } - line = in.readLine(); - assertThat(line, containsString("OK")); - Assert.assertFalse(allDataRead.get()); - } + assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); + assertThat(response.getContentAsString(), Matchers.equalTo(success)); } - @Test public void testEmptyAsyncRead() throws Exception { - final AtomicBoolean oda = new AtomicBoolean(); - final CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean oda = new AtomicBoolean(); + CountDownLatch latch = new CountDownLatch(1); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { assertScope(); - final AsyncContext asyncContext = request.startAsync(request, response); + AsyncContext asyncContext = request.startAsync(request, response); response.setStatus(200); response.getOutputStream().close(); request.getInputStream().setReadListener(new ReadListener() @@ -910,24 +844,15 @@ public class AsyncIOServletTest } }); - String request = "GET " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Connection: close\r\n" + - "\r\n"; + ContentResponse response = client.newRequest(newURI()) + .path(servletPath) + .header(HttpHeader.CONNECTION, "close") + .timeout(5, TimeUnit.SECONDS) + .send(); - try (Socket client = new Socket("localhost", connector.getLocalPort())) - { - OutputStream output = client.getOutputStream(); - output.write(request.getBytes("UTF-8")); - output.flush(); - - String response = IO.toString(client.getInputStream()); - assertThat(response, containsString(" 200 OK")); - // wait for onAllDataRead BEFORE closing client - latch.await(); - } - - // ODA not called at all! + assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + // onDataAvailable must not be called. Assert.assertFalse(oda.get()); } @@ -936,7 +861,7 @@ public class AsyncIOServletTest { Queue errors = new ConcurrentLinkedQueue<>(); CountDownLatch writeLatch = new CountDownLatch(1); - startServer(new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException @@ -955,16 +880,11 @@ public class AsyncIOServletTest int read = input.read(buffer); if (read < 0) { - //if (output.isReady()) - { - asyncContext.complete(); - } + asyncContext.complete(); break; } if (output.isReady()) - { output.write(buffer, 0, read); - } else Assert.fail(); } @@ -986,10 +906,7 @@ public class AsyncIOServletTest @Override public void onWritePossible() throws IOException { - if (writeLatch.getCount() == 0) - asyncContext.complete(); - else - writeLatch.countDown(); + writeLatch.countDown(); } @Override @@ -1002,29 +919,127 @@ public class AsyncIOServletTest }); String content = "0123456789ABCDEF"; + DeferredContentProvider contentProvider = new DeferredContentProvider(); + contentProvider.offer(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(contentProvider) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded()) + { + Response response = result.getResponse(); + assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); + assertThat(getContentAsString(), Matchers.equalTo(content)); + assertThat(errors, Matchers.hasSize(0)); + clientLatch.countDown(); + } + } + }); - try (LocalEndPoint endp = local.connect()) + assertTrue(writeLatch.await(5, TimeUnit.SECONDS)); + + contentProvider.close(); + + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testAsyncReadEarlyEOF() throws Exception + { + // SSLEngine receives the close alert from the client, and when + // the server passes the response to encrypt and write, SSLEngine + // only generates the close alert back, without encrypting the + // response, so we need to skip the transports over TLS. + Assume.assumeThat(transport, Matchers.not(Matchers.isOneOf(Transport.HTTPS, Transport.H2))); + + String content = "jetty"; + int responseCode = HttpStatus.NO_CONTENT_204; + CountDownLatch readLatch = new CountDownLatch(content.length()); + CountDownLatch errorLatch = new CountDownLatch(1); + start(new HttpServlet() { - String request = "POST " + path + " HTTP/1.1\r\n" + - "Host: localhost:" + connector.getLocalPort() + "\r\n" + - "Transfer-Encoding: chunked\r\n" + - "\r\n" + - Integer.toHexString(content.length()) + "\r\n" + - content + "\r\n"; - endp.addInput(ByteBuffer.wrap(request.getBytes("UTF-8"))); + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + AsyncContext asyncContext = request.startAsync(); + ServletInputStream input = request.getInputStream(); + input.setReadListener(new ReadListener() + { + @Override + public void onDataAvailable() throws IOException + { + while (input.isReady() && !input.isFinished()) + { + int read = input.read(); + System.err.printf("%x%n", read); + readLatch.countDown(); + } + } - assertTrue(writeLatch.await(5, TimeUnit.SECONDS)); + @Override + public void onAllDataRead() throws IOException + { + } - request = "" + - "0\r\n" + - "\r\n"; - endp.addInput(ByteBuffer.wrap(request.getBytes("UTF-8"))); + @Override + public void onError(Throwable x) + { + response.setStatus(responseCode); + asyncContext.complete(); + errorLatch.countDown(); + } + }); + } + }); - HttpTester.Response response = HttpTester.parseResponse(endp.getResponse()); + CountDownLatch responseLatch = new CountDownLatch(1); + DeferredContentProvider contentProvider = new DeferredContentProvider(); + contentProvider.offer(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); + org.eclipse.jetty.client.api.Request request = client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(contentProvider) + .onResponseSuccess(response -> responseLatch.countDown()); - assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); - assertThat(response.getContent(), Matchers.equalTo(content)); - assertThat(errors, Matchers.hasSize(0)); + Destination destination = client.getDestination(getScheme(), "localhost", connector.getLocalPort()); + FuturePromise promise = new FuturePromise<>(); + destination.newConnection(promise); + org.eclipse.jetty.client.api.Connection connection = promise.get(5, TimeUnit.SECONDS); + CountDownLatch clientLatch = new CountDownLatch(1); + connection.send(request, result -> + { + assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode)); + clientLatch.countDown(); + }); + + assertTrue(readLatch.await(5, TimeUnit.SECONDS)); + + switch (transport) + { + case HTTP: + case HTTPS: + ((HttpConnectionOverHTTP)connection).getEndPoint().shutdownOutput(); + break; + case H2C: + case H2: + Session session = ((HttpConnectionOverHTTP2)connection).getSession(); + ((HTTP2Session)session).getEndPoint().shutdownOutput(); + break; + default: + Assert.fail(); } + + // Wait for the response to arrive before finishing the request. + assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + contentProvider.close(); + + assertTrue(errorLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java index 5567ed81f7d..67b38b3c344 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java @@ -488,6 +488,28 @@ public class HttpClientTest extends AbstractTest Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); } + @Test + public void testResponseWithContentCompleteListenerInvokedOnce() throws Exception + { + start(new EmptyServerHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + super.handle(target, baseRequest, request, response); + response.getWriter().write("Jetty"); + } + }); + + AtomicInteger completes = new AtomicInteger(); + client.newRequest(newURI()) + .send(result -> completes.incrementAndGet()); + + sleep(1000); + + Assert.assertEquals(1, completes.get()); + } + private void sleep(long time) throws IOException { try