From 0d85c7d2200fd5c495205234907cb9aad70c14de Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 2 Feb 2021 17:49:17 +0100 Subject: [PATCH] Fix #5605 Unblock non container Threads test and fixes for the write side. --- .../eclipse/jetty/server/HttpConnection.java | 4 +- .../org/eclipse/jetty/server/HttpOutput.java | 12 +- .../eclipse/jetty/server/BlockingTest.java | 106 ++++++++++++++++++ .../jetty/util/SharedBlockingCallback.java | 39 ++++++- 4 files changed, 155 insertions(+), 6 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 7b503913b5b..78f96cf9ccb 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -377,7 +377,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http public void onCompleted() { boolean complete = _input.consumeAll(); - getEndPoint().cancelFillInterest(_input::getError); + Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError); + if (LOG.isDebugEnabled()) + LOG.debug("cancelled {}", this, cancelled); // Handle connection upgrades if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 799fa5fe339..143931091b7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -29,6 +29,7 @@ import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; import java.util.ResourceBundle; +import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; @@ -449,6 +450,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable break; case BLOCKED: + CancellationException cancelled = new CancellationException(); + if (_writeBlocker.fail(cancelled)) + _channel.abort(cancelled); + // An operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + break; + case UNREADY: case PENDING: // An operation is in progress, so we soft close now @@ -1399,7 +1409,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable { _state = State.OPEN; _apiState = ApiState.BLOCKING; - _softClose = false; + _softClose = true; // Stay closed until next request _interceptor = _channel; HttpConfiguration config = _channel.getHttpConfiguration(); _bufferSize = config.getOutputBufferSize(); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java index 62605a29dee..9a2f20b3828 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingTest.java @@ -18,10 +18,15 @@ package org.eclipse.jetty.server; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.OutputStream; import java.net.Socket; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -44,6 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -395,4 +401,104 @@ public class BlockingTest assertThat(readException.get(), instanceOf(IOException.class)); } } + + @Test + public void testBlockingWriteThenNormalComplete() throws Exception + { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch stopped = new CountDownLatch(1); + AtomicReference readException = new AtomicReference<>(); + AbstractHandler handler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + baseRequest.setHandled(true); + response.setStatus(200); + response.setContentType("text/plain"); + new Thread(() -> + { + try + { + byte[] data = new byte[16 * 1024]; + Arrays.fill(data, (byte)'X'); + data[data.length - 2] = '\r'; + data[data.length - 1] = '\n'; + OutputStream out = response.getOutputStream(); + started.countDown(); + while (true) + out.write(data); + } + catch (Throwable t) + { + readException.set(t); + stopped.countDown(); + } + }).start(); + + try + { + // wait for thread to start and read first byte + started.await(10, TimeUnit.SECONDS); + // give it time to block on write + Thread.sleep(1000); + } + catch (Throwable e) + { + throw new ServletException(e); + } + } + }; + context.setHandler(handler); + server.start(); + + StringBuilder request = new StringBuilder(); + request.append("GET /ctx/path/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("\r\n"); + + int port = connector.getLocalPort(); + try (Socket socket = new Socket("localhost", port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1)); + + // Read the header + List header = new ArrayList<>(); + while (true) + { + String line = in.readLine(); + if (line.length() == 0) + break; + header.add(line); + } + assertThat(header.get(0), containsString("200 OK")); + + // read one line of content + String content = in.readLine(); + assertThat(content, is("4000")); + content = in.readLine(); + assertThat(content, startsWith("XXXXXXXX")); + + // check that writing thread is stopped by end of request handling + assertTrue(stopped.await(10, TimeUnit.SECONDS)); + + // read until last line + String last = null; + while (true) + { + String line = in.readLine(); + if (line == null) + break; + + last = line; + } + + // last line is not empty chunk, ie abnormal completion + assertThat(last, startsWith("XXXXX")); + } + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index eea89c8ea24..daad08cc502 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.util; import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -50,10 +51,10 @@ public class SharedBlockingCallback { private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class); - private static Throwable IDLE = new ConstantThrowable("IDLE"); - private static Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); + private static final Throwable IDLE = new ConstantThrowable("IDLE"); + private static final Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); - private static Throwable FAILED = new ConstantThrowable("FAILED"); + private static final Throwable FAILED = new ConstantThrowable("FAILED"); private final ReentrantLock _lock = new ReentrantLock(); private final Condition _idle = _lock.newCondition(); @@ -96,6 +97,26 @@ public class SharedBlockingCallback } } + public boolean fail(Throwable cause) + { + Objects.requireNonNull(cause); + _lock.lock(); + try + { + if (_blocker._state == null) + { + _blocker._state = new BlockerFailedException(cause); + _complete.signalAll(); + return true; + } + } + finally + { + _lock.unlock(); + } + return false; + } + protected void notComplete(Blocker blocker) { LOG.warn("Blocker not complete {}", blocker); @@ -165,10 +186,12 @@ public class SharedBlockingCallback _state = cause; _complete.signalAll(); } - else if (_state instanceof BlockerTimeoutException) + else if (_state instanceof BlockerTimeoutException || _state instanceof BlockerFailedException) { // Failure arrived late, block() already // modified the state, nothing more to do. + if (LOG.isDebugEnabled()) + LOG.debug("Failed after {}", _state); } else { @@ -297,4 +320,12 @@ public class SharedBlockingCallback private static class BlockerTimeoutException extends TimeoutException { } + + private static class BlockerFailedException extends Exception + { + public BlockerFailedException(Throwable cause) + { + super(cause); + } + } }