From 9cc7be4842e96b995a29c2ab0c897969887f9568 Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 2 Feb 2021 14:03:38 +0100 Subject: [PATCH] Fix #5605 Unblock non container Threads Ensure that HttpInput is always closed to EOF, EarlyEOF or Error, so that non container threads doing blocking reads will not block forever, even if late. Delay recycling of HttpInput until next request is received. --- .../eclipse/jetty/io/AbstractEndPoint.java | 7 + .../java/org/eclipse/jetty/io/EndPoint.java | 3 + .../org/eclipse/jetty/io/FillInterest.java | 191 +++++++-- .../eclipse/jetty/server/HttpConnection.java | 5 +- .../org/eclipse/jetty/server/HttpInput.java | 14 + .../jetty/server/ProxyConnectionFactory.java | 7 + .../org/eclipse/jetty/server/Request.java | 4 +- .../eclipse/jetty/server/BlockingTest.java | 398 ++++++++++++++++++ 8 files changed, 591 insertions(+), 38 deletions(-) create mode 100644 jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 99237f853a7..c6635f732b0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -362,6 +363,12 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint _fillInterest.register(callback); } + @Override + public Throwable cancelFillInterest(Supplier cancellation) + { + return _fillInterest.cancel(cancellation); + } + @Override public boolean tryFillInterested(Callback callback) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index 16776dacbd3..99d4f3def3a 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -24,6 +24,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ReadPendingException; import java.nio.channels.WritePendingException; +import java.util.function.Supplier; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; @@ -223,6 +224,8 @@ public interface EndPoint extends Closeable */ boolean isFillInterested(); + Throwable cancelFillInterest(Supplier cancellation); + /** *

Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either * all the data has been flushed or an error occurs.

diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index fa7fc5fc88c..967282768f8 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -21,7 +21,9 @@ package org.eclipse.jetty.io; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadPendingException; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; @@ -42,6 +44,38 @@ public abstract class FillInterest { } + /** + * Cancel a fill interest registration. + * + * If there was a registration, then any {@link #fillable()}, {@link #onClose()} or {@link #onFail(Throwable)} + * calls are remembered and passed to the next registration. + * Since any actions resulting from a call to {@link #needsFillInterest()} cannot be unwound, a subsequent call to + * register will not call {@link #needsFillInterest()} again if it has already been called an no callback received. + * @param cancellation A supplier of the cancellation Throwable to use if there is an existing registration. If the + * suppler or the supplied Throwable is null, then a new {@link CancellationException} is used. + * @return The Throwable used to cancel an existing registration or null if there was no registration to cancel. + */ + public Throwable cancel(Supplier cancellation) + { + Cancelled cancelled = new Cancelled(); + while (true) + { + Callback callback = _interested.get(); + if (callback == null || callback instanceof Cancelled) + return null; + if (_interested.compareAndSet(callback, cancelled)) + { + Throwable cause = cancellation == null ? null : cancellation.get(); + if (cause == null) + cause = new CancellationException(); + if (LOG.isDebugEnabled()) + LOG.debug("cancelled {} {}",this, callback, cause); + callback.failed(cause); + return cause; + } + } + } + /** * Call to register interest in a callback when a read is possible. * The callback will be called either immediately if {@link #needsFillInterest()} @@ -68,26 +102,63 @@ public abstract class FillInterest * @return true if the register succeeded */ public boolean tryRegister(Callback callback) + { + return register(callback, null); + } + + /** + * Call to register interest in a callback when a read is possible. + * The callback will be called either immediately if {@link #needsFillInterest()} + * returns true or eventually once {@link #fillable()} is called. + * + * @param callback the callback to register + * @param cancellation A supplier of a {@link Throwable}, which if not null will be used to fail any existing registration + * @return true if the register succeeded + */ + public boolean register(Callback callback, Supplier cancellation) { if (callback == null) throw new IllegalArgumentException(); - if (!_interested.compareAndSet(null, callback)) - return false; - - if (LOG.isDebugEnabled()) - LOG.debug("interested {}", this); - - try + while (true) { - needsFillInterest(); - } - catch (Throwable e) - { - onFail(e); - } + Callback existing = _interested.get(); - return true; + if (existing != null && !(existing instanceof Cancelled) && cancellation == null) + return false; + + if (existing == callback) + return true; + + if (_interested.compareAndSet(existing, callback)) + { + if (LOG.isDebugEnabled()) + LOG.debug("interested {}->{}", existing, this); + if (existing == null) + { + try + { + needsFillInterest(); + } + catch (Throwable e) + { + onFail(e); + } + } + else if (existing instanceof Cancelled) + { + ((Cancelled)existing).apply(callback); + } + else + { + Throwable cause = cancellation.get(); + if (cause == null) + cause = new CancellationException(); + existing.failed(cause); + } + return true; + } + } } /** @@ -97,17 +168,19 @@ public abstract class FillInterest */ public boolean fillable() { - if (LOG.isDebugEnabled()) - LOG.debug("fillable {}", this); - Callback callback = _interested.get(); - if (callback != null && _interested.compareAndSet(callback, null)) + while (true) { - callback.succeeded(); - return true; + Callback callback = _interested.get(); + if (callback == null) + return false; + if (_interested.compareAndSet(callback, null)) + { + if (LOG.isDebugEnabled()) + LOG.debug("fillable {} {}",this, callback); + callback.succeeded(); + return true; + } } - if (LOG.isDebugEnabled()) - LOG.debug("{} lost race {}", this, callback); - return false; } /** @@ -115,7 +188,8 @@ public abstract class FillInterest */ public boolean isInterested() { - return _interested.get() != null; + Callback callback = _interested.get(); + return callback != null && !(callback instanceof Cancelled); } public InvocationType getCallbackInvocationType() @@ -132,24 +206,37 @@ public abstract class FillInterest */ public boolean onFail(Throwable cause) { - if (LOG.isDebugEnabled()) - LOG.debug("onFail " + this, cause); - Callback callback = _interested.get(); - if (callback != null && _interested.compareAndSet(callback, null)) + while (true) { - callback.failed(cause); - return true; + Callback callback = _interested.get(); + if (callback == null) + return false; + if (_interested.compareAndSet(callback, null)) + { + if (LOG.isDebugEnabled()) + LOG.debug("onFail {} {}",this, callback, cause); + callback.failed(cause); + return true; + } } - return false; } public void onClose() { - if (LOG.isDebugEnabled()) - LOG.debug("onClose {}", this); - Callback callback = _interested.get(); - if (callback != null && _interested.compareAndSet(callback, null)) - callback.failed(new ClosedChannelException()); + while (true) + { + Callback callback = _interested.get(); + if (callback == null) + return; + if (_interested.compareAndSet(callback, null)) + { + ClosedChannelException cause = new ClosedChannelException(); + if (LOG.isDebugEnabled()) + LOG.debug("onFail {} {}",this, callback, cause); + callback.failed(cause); + return; + } + } } @Override @@ -171,4 +258,36 @@ public abstract class FillInterest * @throws IOException if unable to fulfill interest in fill */ protected abstract void needsFillInterest() throws IOException; + + private static class Cancelled implements Callback + { + private final AtomicReference _result = new AtomicReference<>(); + + @Override + public void succeeded() + { + _result.compareAndSet(null, Boolean.TRUE); + } + + @Override + public void failed(Throwable x) + { + _result.compareAndSet(null, x == null ? new Exception() : x); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + + void apply(Callback callback) + { + Object result = _result.get(); + if (result == Boolean.TRUE) + callback.succeeded(); + else if (result instanceof Throwable) + callback.failed((Throwable)result); + } + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 3ae8bbe4bc6..7b503913b5b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -376,6 +376,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http @Override public void onCompleted() { + boolean complete = _input.consumeAll(); + getEndPoint().cancelFillInterest(_input::getError); + // Handle connection upgrades if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) { @@ -409,7 +412,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _parser.close(); } // else abort if we can't consume all - else if (_generator.isPersistent() && !_input.consumeAll()) + else if (_generator.isPersistent() && !complete) { if (LOG.isDebugEnabled()) LOG.debug("unconsumed input {} {}", this, _parser); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 600260de34a..724132c3e30 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -720,12 +720,17 @@ public class HttpInput extends ServletInputStream implements Runnable { produceContent(); if (_content == null && _intercepted == null && _inputQ.isEmpty()) + { + _state = EARLY_EOF; + _inputQ.notify(); return false; + } } catch (Throwable e) { LOG.debug(e); _state = new ErrorState(e); + _inputQ.notify(); return false; } } @@ -740,6 +745,15 @@ public class HttpInput extends ServletInputStream implements Runnable } } + public Throwable getError() + { + synchronized (_inputQ) + { + Throwable error = _state instanceof ErrorState ? ((ErrorState)_state)._error : null; + return error == null ? new IOException() : error; + } + } + public boolean isAsync() { synchronized (_inputQ) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java index 71cd03c274d..3938c0f272a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadPendingException; import java.nio.channels.WritePendingException; import java.nio.charset.StandardCharsets; +import java.util.function.Supplier; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.Connection; @@ -805,6 +806,12 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory _endp.fillInterested(callback); } + @Override + public Throwable cancelFillInterest(Supplier cancellation) + { + return _endp.cancelFillInterest(cancellation); + } + @Override public boolean flush(ByteBuffer... buffer) throws IOException { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index d1c27ef2a72..3cc4946c2d2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -1811,6 +1811,8 @@ public class Request implements HttpServletRequest */ public void setMetaData(org.eclipse.jetty.http.MetaData.Request request) { + if (_metaData == null) + _input.recycle(); _metaData = request; setMethod(request.getMethod()); @@ -1879,7 +1881,7 @@ public class Request implements HttpServletRequest getHttpChannelState().recycle(); _requestAttributeListeners.clear(); - _input.recycle(); + // Defer _input.recycle() until setMetaData on next request, so that late readers will fail _metaData = null; _originalURI = null; _contextPath = null; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java new file mode 100644 index 00000000000..62605a29dee --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java @@ -0,0 +1,398 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.AsyncContext; +import javax.servlet.DispatcherType; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockingTest +{ + private Server server; + ServerConnector connector; + private ContextHandler context; + + @BeforeEach + void setUp() + { + server = new Server(); + connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + context = new ContextHandler("/ctx"); + + HandlerList handlers = new HandlerList(); + handlers.setHandlers(new Handler[]{context, new DefaultHandler()}); + server.setHandler(handlers); + } + + @AfterEach + void tearDown() throws Exception + { + server.stop(); + } + + @Test + public void testBlockingReadThenNormalComplete() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("OK")); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testNormalCompleteThenBlockingRead() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + completed.await(10, TimeUnit.SECONDS); + Thread.sleep(500); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("OK")); + + completed.countDown(); + Thread.sleep(1000); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testStartAsyncThenBlockingReadThenTimeout() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + baseRequest.setHandled(true); + if (baseRequest.getDispatcherType() != DispatcherType.ERROR) + { + AsyncContext async = request.startAsync(); + async.setTimeout(100); + + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + completed.await(10, TimeUnit.SECONDS); + Thread.sleep(500); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(500)); + assertThat(response.getContent(), containsString("AsyncContext timeout")); + + completed.countDown(); + Thread.sleep(1000); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testBlockingReadThenSendError() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + if (baseRequest.getDispatcherType() != DispatcherType.ERROR) + { + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.sendError(499); + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(499)); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } +}