Fix #5605 Unblock non container Threads

test and fixes for the write side.
This commit is contained in:
gregw 2021-02-02 17:49:17 +01:00
parent b3268eb3b5
commit 0d85c7d220
4 changed files with 155 additions and 6 deletions

View File

@ -377,7 +377,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public void onCompleted() public void onCompleted()
{ {
boolean complete = _input.consumeAll(); boolean complete = _input.consumeAll();
getEndPoint().cancelFillInterest(_input::getError); Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError);
if (LOG.isDebugEnabled())
LOG.debug("cancelled {}", this, cancelled);
// Handle connection upgrades // Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)

View File

@ -29,6 +29,7 @@ import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult; import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction; import java.nio.charset.CodingErrorAction;
import java.util.ResourceBundle; import java.util.ResourceBundle;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.servlet.RequestDispatcher; import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
@ -449,6 +450,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break; break;
case BLOCKED: case BLOCKED:
CancellationException cancelled = new CancellationException();
if (_writeBlocker.fail(cancelled))
_channel.abort(cancelled);
// An operation is in progress, so we soft close now
_softClose = true;
// then trigger a close from onWriteComplete
_state = State.CLOSE;
break;
case UNREADY: case UNREADY:
case PENDING: case PENDING:
// An operation is in progress, so we soft close now // An operation is in progress, so we soft close now
@ -1399,7 +1409,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
_state = State.OPEN; _state = State.OPEN;
_apiState = ApiState.BLOCKING; _apiState = ApiState.BLOCKING;
_softClose = false; _softClose = true; // Stay closed until next request
_interceptor = _channel; _interceptor = _channel;
HttpConfiguration config = _channel.getHttpConfiguration(); HttpConfiguration config = _channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize(); _bufferSize = config.getOutputBufferSize();

View File

@ -18,10 +18,15 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -44,6 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -395,4 +401,104 @@ public class BlockingTest
assertThat(readException.get(), instanceOf(IOException.class)); assertThat(readException.get(), instanceOf(IOException.class));
} }
} }
@Test
public void testBlockingWriteThenNormalComplete() throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentType("text/plain");
new Thread(() ->
{
try
{
byte[] data = new byte[16 * 1024];
Arrays.fill(data, (byte)'X');
data[data.length - 2] = '\r';
data[data.length - 1] = '\n';
OutputStream out = response.getOutputStream();
started.countDown();
while (true)
out.write(data);
}
catch (Throwable t)
{
readException.set(t);
stopped.countDown();
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on write
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
}
};
context.setHandler(handler);
server.start();
StringBuilder request = new StringBuilder();
request.append("GET /ctx/path/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("\r\n");
int port = connector.getLocalPort();
try (Socket socket = new Socket("localhost", port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1));
// Read the header
List<String> header = new ArrayList<>();
while (true)
{
String line = in.readLine();
if (line.length() == 0)
break;
header.add(line);
}
assertThat(header.get(0), containsString("200 OK"));
// read one line of content
String content = in.readLine();
assertThat(content, is("4000"));
content = in.readLine();
assertThat(content, startsWith("XXXXXXXX"));
// check that writing thread is stopped by end of request handling
assertTrue(stopped.await(10, TimeUnit.SECONDS));
// read until last line
String last = null;
while (true)
{
String line = in.readLine();
if (line == null)
break;
last = line;
}
// last line is not empty chunk, ie abnormal completion
assertThat(last, startsWith("XXXXX"));
}
}
} }

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.util;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.Objects;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -50,10 +51,10 @@ public class SharedBlockingCallback
{ {
private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class); private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
private static Throwable IDLE = new ConstantThrowable("IDLE"); private static final Throwable IDLE = new ConstantThrowable("IDLE");
private static Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED"); private static final Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED");
private static Throwable FAILED = new ConstantThrowable("FAILED"); private static final Throwable FAILED = new ConstantThrowable("FAILED");
private final ReentrantLock _lock = new ReentrantLock(); private final ReentrantLock _lock = new ReentrantLock();
private final Condition _idle = _lock.newCondition(); private final Condition _idle = _lock.newCondition();
@ -96,6 +97,26 @@ public class SharedBlockingCallback
} }
} }
public boolean fail(Throwable cause)
{
Objects.requireNonNull(cause);
_lock.lock();
try
{
if (_blocker._state == null)
{
_blocker._state = new BlockerFailedException(cause);
_complete.signalAll();
return true;
}
}
finally
{
_lock.unlock();
}
return false;
}
protected void notComplete(Blocker blocker) protected void notComplete(Blocker blocker)
{ {
LOG.warn("Blocker not complete {}", blocker); LOG.warn("Blocker not complete {}", blocker);
@ -165,10 +186,12 @@ public class SharedBlockingCallback
_state = cause; _state = cause;
_complete.signalAll(); _complete.signalAll();
} }
else if (_state instanceof BlockerTimeoutException) else if (_state instanceof BlockerTimeoutException || _state instanceof BlockerFailedException)
{ {
// Failure arrived late, block() already // Failure arrived late, block() already
// modified the state, nothing more to do. // modified the state, nothing more to do.
if (LOG.isDebugEnabled())
LOG.debug("Failed after {}", _state);
} }
else else
{ {
@ -297,4 +320,12 @@ public class SharedBlockingCallback
private static class BlockerTimeoutException extends TimeoutException private static class BlockerTimeoutException extends TimeoutException
{ {
} }
private static class BlockerFailedException extends Exception
{
public BlockerFailedException(Throwable cause)
{
super(cause);
}
}
} }