diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index b149476b64f..3fb493b1fad 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -385,6 +385,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http private boolean upgrade() { + // If we are fill interested, then a read is pending and we must abort + if (isFillInterested()) + { + LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint()); + abort(new IllegalStateException()); + } + Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE); if (connection == null) return false; @@ -416,6 +423,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http if (upgrade()) return; + // Drive to EOF, EarlyEOF or Error + boolean complete = _input.consumeAll(); + // Finish consuming the request // If we are still expecting if (_channel.isExpecting100Continue()) @@ -424,7 +434,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _parser.close(); } // else abort if we can't consume all - else if (_generator.isPersistent() && !_input.consumeAll()) + else if (_generator.isPersistent() && !complete) { if (LOG.isDebugEnabled()) LOG.debug("unconsumed input {} {}", this, _parser); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index a41ad47fe1f..ff2edc7e06e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -135,6 +135,7 @@ public class HttpInput extends ServletInputStream implements Runnable if (isFinished()) return !isError(); + //TODO move to early EOF and notify blocking reader return false; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index c3b7815d91d..a72ebc04e94 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -23,6 +23,8 @@ import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; +import java.util.ResourceBundle; +import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; @@ -435,10 +437,22 @@ public class HttpOutput extends ServletOutputStream implements Runnable case BLOCKED: case UNREADY: case PENDING: + LOG.warn("Pending write in complete {} {}", this, _channel); // An operation is in progress, so we soft close now _softClose = true; // then trigger a close from onWriteComplete _state = State.CLOSE; + + // But if we are blocked or there is more content to come, we must abort + // Note that this allows a pending async write to complete only if it is the last write + if (_apiState == ApiState.BLOCKED || !_channel.getResponse().isContentComplete(_written)) + { + CancellationException cancelled = new CancellationException(); + _writeBlocker.fail(cancelled); + _channel.abort(cancelled); + _state = State.CLOSED; + } + break; } break; @@ -1351,7 +1365,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable { _state = State.OPEN; _apiState = ApiState.BLOCKING; - _softClose = false; + _softClose = true; // Stay closed until next request _interceptor = _channel; HttpConfiguration config = _channel.getHttpConfiguration(); _bufferSize = config.getOutputBufferSize(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index 49b2b71f135..28e41714352 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -1680,6 +1680,11 @@ public class Request implements HttpServletRequest */ public void setMetaData(MetaData.Request request) { + if (_metaData == null && _input != null && _channel != null) + { + _input.recycle(); + _channel.getResponse().getHttpOutput().reopen(); + } _metaData = request; _method = request.getMethod(); _httpFields = request.getFields(); @@ -1771,7 +1776,7 @@ public class Request implements HttpServletRequest getHttpChannelState().recycle(); _requestAttributeListeners.clear(); - _input.recycle(); + // Defer _input.recycle() until setMetaData on next request, TODO replace with recycle and reopen in 10 _metaData = null; _httpFields = null; _trailers = null; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java new file mode 100644 index 00000000000..9a2f20b3828 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java @@ -0,0 +1,504 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.AsyncContext; +import javax.servlet.DispatcherType; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockingTest +{ + private Server server; + ServerConnector connector; + private ContextHandler context; + + @BeforeEach + void setUp() + { + server = new Server(); + connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + context = new ContextHandler("/ctx"); + + HandlerList handlers = new HandlerList(); + handlers.setHandlers(new Handler[]{context, new DefaultHandler()}); + server.setHandler(handlers); + } + + @AfterEach + void tearDown() throws Exception + { + server.stop(); + } + + @Test + public void testBlockingReadThenNormalComplete() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("OK")); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testNormalCompleteThenBlockingRead() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + completed.await(10, TimeUnit.SECONDS); + Thread.sleep(500); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), containsString("OK")); + + completed.countDown(); + Thread.sleep(1000); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testStartAsyncThenBlockingReadThenTimeout() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + baseRequest.setHandled(true); + if (baseRequest.getDispatcherType() != DispatcherType.ERROR) + { + AsyncContext async = request.startAsync(); + async.setTimeout(100); + + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + completed.await(10, TimeUnit.SECONDS); + Thread.sleep(500); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(500)); + assertThat(response.getContent(), containsString("AsyncContext timeout")); + + completed.countDown(); + Thread.sleep(1000); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testBlockingReadThenSendError() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + if (baseRequest.getDispatcherType() != DispatcherType.ERROR) + { + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.sendError(499); + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("POST /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: test/data\r\n") + .append("Content-Length: 2\r\n") + .append("\r\n") + .append("1"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertThat(response, notNullValue()); + assertThat(response.getStatus(), is(499)); + + // Async thread should have stopped + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testBlockingWriteThenNormalComplete() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + baseRequest.setHandled(true); + response.setStatus(200); + response.setContentType("text/plain"); + new Thread(() -> + { + try + { + byte[] data = new byte[16 * 1024]; + Arrays.fill(data, (byte)'X'); + data[data.length - 2] = '\r'; + data[data.length - 1] = '\n'; + OutputStream out = response.getOutputStream(); + started.countDown(); + while (true) + out.write(data); + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on write + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("GET /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("\r\n"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1)); + + // Read the header + List header = new ArrayList<>(); + while (true) + { + String line = in.readLine(); + if (line.length() == 0) + break; + header.add(line); + } + assertThat(header.get(0), containsString("200 OK")); + + // read one line of content + String content = in.readLine(); + assertThat(content, is("4000")); + content = in.readLine(); + assertThat(content, startsWith("XXXXXXXX")); + + // check that writing thread is stopped by end of request handling + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + + // read until last line + String last = null; + while (true) + { + String line = in.readLine(); + if (line == null) + break; + + last = line; + } + + // last line is not empty chunk, ie abnormal completion + assertThat(last, startsWith("XXXXX")); + } + } +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index 22b6c237187..a6b962c07fd 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -643,6 +643,7 @@ public class ResponseTest assertEquals("foo2/bar2;charset=utf-8", response.getContentType()); response.recycle(); + response.reopen(); response.setCharacterEncoding("utf16"); response.setContentType("text/html; charset=utf-8"); @@ -655,6 +656,7 @@ public class ResponseTest assertEquals("text/xml;charset=utf-8", response.getContentType()); response.recycle(); + response.reopen(); response.setCharacterEncoding("utf-16"); response.setContentType("foo/bar"); assertEquals("foo/bar;charset=utf-16", response.getContentType()); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index a40aabcb001..6ea4351dc80 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.util; import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; @@ -44,10 +45,10 @@ public class SharedBlockingCallback { private static final Logger LOG = LoggerFactory.getLogger(SharedBlockingCallback.class); - private static Throwable IDLE = new ConstantThrowable("IDLE"); - private static Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); + private static final Throwable IDLE = new ConstantThrowable("IDLE"); + private static final Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); - private static Throwable FAILED = new ConstantThrowable("FAILED"); + private static final Throwable FAILED = new ConstantThrowable("FAILED"); private final ReentrantLock _lock = new ReentrantLock(); private final Condition _idle = _lock.newCondition(); @@ -76,6 +77,26 @@ public class SharedBlockingCallback } } + public boolean fail(Throwable cause) + { + Objects.requireNonNull(cause); + _lock.lock(); + try + { + if (_blocker._state == null) + { + _blocker._state = new BlockerFailedException(cause); + _complete.signalAll(); + return true; + } + } + finally + { + _lock.unlock(); + } + return false; + } + protected void notComplete(Blocker blocker) { LOG.warn("Blocker not complete {}", blocker); @@ -145,10 +166,12 @@ public class SharedBlockingCallback _state = cause; _complete.signalAll(); } - else if (_state instanceof BlockerTimeoutException) + else if (_state instanceof BlockerTimeoutException || _state instanceof BlockerFailedException) { // Failure arrived late, block() already // modified the state, nothing more to do. + if (LOG.isDebugEnabled()) + LOG.debug("Failed after {}", _state); } else { @@ -261,4 +284,12 @@ public class SharedBlockingCallback private static class BlockerTimeoutException extends TimeoutException { } + + private static class BlockerFailedException extends Exception + { + public BlockerFailedException(Throwable cause) + { + super(cause); + } + } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java new file mode 100644 index 00000000000..4e4ac600bee --- /dev/null +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/BlockedIOTest.java @@ -0,0 +1,144 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http.client; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.util.DeferredContentProvider; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.BufferUtil; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockedIOTest extends AbstractTest +{ + @Override + public void init(Transport transport) throws IOException + { + setScenario(new TransportScenario(transport)); + } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testBlockingReadThenNormalComplete(Transport transport) throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AtomicReference rereadException = new AtomicReference<>(); + + init(transport); + scenario.start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + new Thread(() -> + { + try + { + int b = baseRequest.getHttpInput().read(); + if (b == '1') + { + started.countDown(); + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + } + catch (Throwable ex1) + { + readException.set(ex1); + try + { + if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) + throw new IllegalStateException(); + } + catch (Throwable ex2) + { + rereadException.set(ex2); + } + finally + { + stopped.countDown(); + } + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on second byte + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.setContentType("text/plain"); + response.getOutputStream().print("OK\r\n"); + } + }); + + DeferredContentProvider contentProvider = new DeferredContentProvider(); + CountDownLatch ok = new CountDownLatch(2); + scenario.client.POST(scenario.newURI()) + .content(contentProvider) + .onResponseContent((response, content) -> + { + assertThat(BufferUtil.toString(content), containsString("OK")); + ok.countDown(); + }) + .onResponseSuccess(response -> + { + try + { + assertThat(response.getStatus(), is(200)); + stopped.await(10, TimeUnit.SECONDS); + ok.countDown(); + } + catch (Throwable t) + { + t.printStackTrace(); + } + }) + .send(null); + contentProvider.offer(BufferUtil.toBuffer("1")); + + assertTrue(ok.await(10, TimeUnit.SECONDS)); + assertThat(readException.get(), instanceOf(IOException.class)); + assertThat(rereadException.get(), instanceOf(IOException.class)); + } +}